From f1891bcc450d03ab379ab2bb164f3f2c83d1a8ee Mon Sep 17 00:00:00 2001 From: Jonathan Druart Date: Tue, 30 Jan 2024 09:30:38 +0100 Subject: [PATCH] Bug 35819: Adjust es_indexer_daemon Signed-off-by: Tomas Cohen Arazi Signed-off-by: Marcel de Rooy [EDIT] Add forgotten module Signed-off-by: Katrin Fischer (cherry picked from commit 7c1d4716beba5aa0d2c77ad460b4e6982cfee236) Signed-off-by: Fridolin Somers (cherry picked from commit febc2570de8feea490bd53c1f4434eafb3b5d960) Signed-off-by: Lucas Gass --- misc/workers/es_indexer_daemon.pl | 45 +++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/misc/workers/es_indexer_daemon.pl b/misc/workers/es_indexer_daemon.pl index 44bcdbf9a5..5f34345391 100755 --- a/misc/workers/es_indexer_daemon.pl +++ b/misc/workers/es_indexer_daemon.pl @@ -56,6 +56,7 @@ use Try::Tiny; use Pod::Usage; use Getopt::Long; use List::MoreUtils qw( natatime ); +use Time::HiRes; use C4::Context; use Koha::Logger; @@ -65,6 +66,10 @@ use Koha::SearchEngine::Indexer; my ( $help, $batch_size ); + +my $not_found_retries = {}; +my $max_retries = $ENV{MAX_RETRIES} || 10; + GetOptions( 'h|help' => \$help, 'b|batch_size=s' => \$batch_size @@ -116,22 +121,46 @@ while (1) { my $body = $frame->body; decode_json($body); # TODO Should this be from_json? Check utf8 flag. } catch { - $logger->warn(sprintf "Frame not processed - %s", $_); + Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_); return; - } finally { - $conn->ack( { frame => $frame } ); }; - next unless $args; + unless ( $args ) { + Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame does not have correct args, ignoring it"); + $conn->nack( { frame => $frame, requeue => 'false' } ); + next; + } + + my $job = Koha::BackgroundJobs->find( $args->{job_id} ); - # FIXME This means we need to have create the DB entry before - # It could work in a first step, but then we will want to handle job that will be created from the message received - my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next; + if ( $job && $job->status ne 'new' ) { + Koha::Logger->get( { interface => 'worker' } ) + ->warn( sprintf "Job %s has wrong status %s", $args->{job_id}, $job->status ); + + # nack without requeue, we do not want to process this frame again + $conn->nack( { frame => $frame, requeue => 'false' } ); + next; + } unless ($job) { - $logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} ); + if ( ++$not_found_retries->{$args->{job_id}} >= $max_retries ) { + Koha::Logger->get( { interface => 'worker' } ) + ->warn( sprintf "Job %s not found, no more retry", $args->{job_id} ); + + # nack without requeue, we do not want to process this frame again + $conn->nack( { frame => $frame, requeue => 'false' } ); + next; + } + + Koha::Logger->get( { interface => 'worker' } ) + ->debug( sprintf "Job %s not found, will retry later", $args->{job_id} ); + + # nack to force requeue + $conn->nack( { frame => $frame, requeue => 'true' } ); + Time::HiRes::sleep(0.5); next; } + $conn->ack( { frame => $frame } ); push @jobs, $job; if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) { -- 2.39.5