Bug 35819: Adjust es_indexer_daemon

Signed-off-by: Tomas Cohen Arazi <tomascohen@theke.io>

Signed-off-by: Marcel de Rooy <m.de.rooy@rijksmuseum.nl>
[EDIT] Add forgotten module
Signed-off-by: Katrin Fischer <katrin.fischer@bsz-bw.de>
(cherry picked from commit 7c1d4716be)
Signed-off-by: Fridolin Somers <fridolin.somers@biblibre.com>
This commit is contained in:
Jonathan Druart 2024-01-30 09:30:38 +01:00 committed by Fridolin Somers
parent f8af6d5028
commit febc2570de

View file

@ -56,6 +56,7 @@ use Try::Tiny;
use Pod::Usage;
use Getopt::Long;
use List::MoreUtils qw( natatime );
use Time::HiRes;
use C4::Context;
use Koha::Logger;
@ -65,6 +66,10 @@ use Koha::SearchEngine::Indexer;
my ( $help, $batch_size );
my $not_found_retries = {};
my $max_retries = $ENV{MAX_RETRIES} || 10;
GetOptions(
'h|help' => \$help,
'b|batch_size=s' => \$batch_size
@ -116,23 +121,47 @@ while (1) {
my $body = $frame->body;
decode_json($body); # TODO Should this be from_json? Check utf8 flag.
} catch {
$logger->warn(sprintf "Frame not processed - %s", $_);
Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_);
return;
} finally {
$conn->ack( { frame => $frame } );
};
next unless $args;
# FIXME This means we need to have create the DB entry before
# It could work in a first step, but then we will want to handle job that will be created from the message received
my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
unless ($job) {
$logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
unless ( $args ) {
Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame does not have correct args, ignoring it");
$conn->nack( { frame => $frame, requeue => 'false' } );
next;
}
my $job = Koha::BackgroundJobs->find( $args->{job_id} );
if ( $job && $job->status ne 'new' ) {
Koha::Logger->get( { interface => 'worker' } )
->warn( sprintf "Job %s has wrong status %s", $args->{job_id}, $job->status );
# nack without requeue, we do not want to process this frame again
$conn->nack( { frame => $frame, requeue => 'false' } );
next;
}
unless ($job) {
if ( ++$not_found_retries->{$args->{job_id}} >= $max_retries ) {
Koha::Logger->get( { interface => 'worker' } )
->warn( sprintf "Job %s not found, no more retry", $args->{job_id} );
# nack without requeue, we do not want to process this frame again
$conn->nack( { frame => $frame, requeue => 'false' } );
next;
}
Koha::Logger->get( { interface => 'worker' } )
->debug( sprintf "Job %s not found, will retry later", $args->{job_id} );
# nack to force requeue
$conn->nack( { frame => $frame, requeue => 'true' } );
Time::HiRes::sleep(0.5);
next;
}
$conn->ack( { frame => $frame } );
push @jobs, $job;
if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
commit(@jobs);