Bug 35819: Adjust es_indexer_daemon
Signed-off-by: Tomas Cohen Arazi <tomascohen@theke.io> Signed-off-by: Marcel de Rooy <m.de.rooy@rijksmuseum.nl> [EDIT] Add forgotten module Signed-off-by: Katrin Fischer <katrin.fischer@bsz-bw.de> (cherry picked from commit7c1d4716be
) Signed-off-by: Fridolin Somers <fridolin.somers@biblibre.com> (cherry picked from commitfebc2570de
) Signed-off-by: Lucas Gass <lucas@bywatersolutions.com>
This commit is contained in:
parent
d8dc98f7c5
commit
f1891bcc45
1 changed files with 40 additions and 11 deletions
|
@ -56,6 +56,7 @@ use Try::Tiny;
|
|||
use Pod::Usage;
|
||||
use Getopt::Long;
|
||||
use List::MoreUtils qw( natatime );
|
||||
use Time::HiRes;
|
||||
|
||||
use C4::Context;
|
||||
use Koha::Logger;
|
||||
|
@ -65,6 +66,10 @@ use Koha::SearchEngine::Indexer;
|
|||
|
||||
|
||||
my ( $help, $batch_size );
|
||||
|
||||
my $not_found_retries = {};
|
||||
my $max_retries = $ENV{MAX_RETRIES} || 10;
|
||||
|
||||
GetOptions(
|
||||
'h|help' => \$help,
|
||||
'b|batch_size=s' => \$batch_size
|
||||
|
@ -116,23 +121,47 @@ while (1) {
|
|||
my $body = $frame->body;
|
||||
decode_json($body); # TODO Should this be from_json? Check utf8 flag.
|
||||
} catch {
|
||||
$logger->warn(sprintf "Frame not processed - %s", $_);
|
||||
Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_);
|
||||
return;
|
||||
} finally {
|
||||
$conn->ack( { frame => $frame } );
|
||||
};
|
||||
|
||||
next unless $args;
|
||||
|
||||
# FIXME This means we need to have create the DB entry before
|
||||
# It could work in a first step, but then we will want to handle job that will be created from the message received
|
||||
my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
|
||||
|
||||
unless ($job) {
|
||||
$logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
|
||||
unless ( $args ) {
|
||||
Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame does not have correct args, ignoring it");
|
||||
$conn->nack( { frame => $frame, requeue => 'false' } );
|
||||
next;
|
||||
}
|
||||
|
||||
my $job = Koha::BackgroundJobs->find( $args->{job_id} );
|
||||
|
||||
if ( $job && $job->status ne 'new' ) {
|
||||
Koha::Logger->get( { interface => 'worker' } )
|
||||
->warn( sprintf "Job %s has wrong status %s", $args->{job_id}, $job->status );
|
||||
|
||||
# nack without requeue, we do not want to process this frame again
|
||||
$conn->nack( { frame => $frame, requeue => 'false' } );
|
||||
next;
|
||||
}
|
||||
|
||||
unless ($job) {
|
||||
if ( ++$not_found_retries->{$args->{job_id}} >= $max_retries ) {
|
||||
Koha::Logger->get( { interface => 'worker' } )
|
||||
->warn( sprintf "Job %s not found, no more retry", $args->{job_id} );
|
||||
|
||||
# nack without requeue, we do not want to process this frame again
|
||||
$conn->nack( { frame => $frame, requeue => 'false' } );
|
||||
next;
|
||||
}
|
||||
|
||||
Koha::Logger->get( { interface => 'worker' } )
|
||||
->debug( sprintf "Job %s not found, will retry later", $args->{job_id} );
|
||||
|
||||
# nack to force requeue
|
||||
$conn->nack( { frame => $frame, requeue => 'true' } );
|
||||
Time::HiRes::sleep(0.5);
|
||||
next;
|
||||
}
|
||||
$conn->ack( { frame => $frame } );
|
||||
|
||||
push @jobs, $job;
|
||||
if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
|
||||
commit(@jobs);
|
||||
|
|
Loading…
Reference in a new issue