Bug 22417: Process the jobs even if the message broker is not reachable

Signed-off-by: Kyle M Hall <kyle@bywatersolutions.com>

Signed-off-by: Jonathan Druart <jonathan.druart@bugs.koha-community.org>
This commit is contained in:
Jonathan Druart 2020-09-02 11:59:10 +02:00
parent 98503334cc
commit 5b5083bc7b
2 changed files with 53 additions and 27 deletions

View file

@ -19,6 +19,7 @@ use Modern::Perl;
use JSON qw( encode_json decode_json );
use Carp qw( croak );
use Net::Stomp;
use Try::Tiny;
use C4::Context;
use Koha::DateUtils qw( dt_from_string );
@ -104,14 +105,22 @@ sub enqueue {
$job_args->{job_id} = $job_id;
$json_args = encode_json $job_args;
my $conn = $self->connect;
# This namespace is wrong, it must be a vhost instead.
# But to do so it needs to be created on the server => much more work when a new Koha instance is created.
# Also, here we just want the Koha instance's name, but it's not in the config...
# Picking a random id (memcached_namespace) from the config
my $namespace = C4::Context->config('memcached_namespace');
$conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
or Koha::Exceptions::Exception->throw('Job has not been enqueued');
try {
my $conn = $self->connect;
# This namespace is wrong, it must be a vhost instead.
# But to do so it needs to be created on the server => much more work when a new Koha instance is created.
# Also, here we just want the Koha instance's name, but it's not in the config...
# Picking a random id (memcached_namespace) from the config
my $namespace = C4::Context->config('memcached_namespace');
$conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
or Koha::Exceptions::Exception->throw('Job has not been enqueued');
} catch {
if ( ref($_) eq 'Koha::Exceptions::Exception' ) {
$_->rethrow;
} else {
warn sprintf "The job has not been sent to the message broker: (%s)", $_;
}
};
}
);

View file

@ -17,33 +17,50 @@
use Modern::Perl;
use JSON qw( encode_json decode_json );
use Try::Tiny;
use Koha::BackgroundJobs;
my $conn = Koha::BackgroundJob->connect;
my $conn;
try {
$conn = Koha::BackgroundJob->connect;
} catch {
warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
};
my @job_types = qw( batch_biblio_record_modification batch_authority_record_modification );
# FIXME cf note in Koha::BackgroundJob about $namespace
my $namespace = C4::Context->config('memcached_namespace');
for my $job_type ( @job_types ) {
$conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $job_type), ack => 'client' });
if ( $conn ) {
# FIXME cf note in Koha::BackgroundJob about $namespace
my $namespace = C4::Context->config('memcached_namespace');
for my $job_type ( @job_types ) {
$conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $job_type), ack => 'client' });
}
}
while (1) {
my $frame = $conn->receive_frame;
if ( !defined $frame ) {
# maybe log connection problems
next; # will reconnect automatically
if ( $conn ) {
my $frame = $conn->receive_frame;
if ( !defined $frame ) {
# maybe log connection problems
next; # will reconnect automatically
}
my $body = $frame->body;
my $args = decode_json($body);
# FIXME This means we need to have create the DB entry before
# It could work in a first step, but then we will want to handle job that will be created from the message received
my $job = Koha::BackgroundJobs->find($args->{job_id});
my $success = $job->process( $args );
$conn->ack( { frame => $frame } ); # FIXME depending on $success?
} else {
my $jobs = Koha::BackgroundJobs->search({ status => 'new' });
while ( my $job = $jobs->next ) {
my $args = decode_json($job->data);
$job->process( { job_id => $job->id, %$args } );
}
sleep 10;
}
my $body = $frame->body;
my $args = decode_json($body);
# FIXME This means we need to have create the DB entry before
# It could work in a first step, but then we will want to handle job that will be created from the message received
my $job = Koha::BackgroundJobs->find($args->{job_id});
my $success = $job->process( $args );
$conn->ack( { frame => $frame } ); # FIXME depending on $success?
}
$conn->disconnect;