From 83fb050b2f18e0f279df505c3ef5eb7942ecc992 Mon Sep 17 00:00:00 2001 From: Tomas Cohen Arazi Date: Tue, 16 Jan 2024 14:46:20 -0300 Subject: [PATCH] Bug 35819: Notify NACK and requeue when Job ID not found This patch makes the worker reject the incoming frame for putting the message back in the queue, in the event the job id doesn't exist yet. Which is the case when some actions are being triggered inside a transaction which hasn't been commited to the DB yet. To test you will need 3 KTD shells (a) mysql: $ ktd --shell k$ sudo koha-mysql kohadev (b) logs: $ ktd --shell # for restarting the worker and looking at the logs k$ sudo koha-worker --restart kohadev ; tail -f /var/log/koha/kohadev/worker-*.log (c) running the test: $ ktd --shell 1. Have (a), (b) and (c) terminals ready 2. On (c), run: $ perl -MKoha::Database -MKoha::BackgroundJob::BatchUpdateBiblioHoldsQueue -e 'Koha::Database->schema->txn_do( sub { Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue->new->enqueue({ biblio_ids => [ 1 ] }); sleep 1; } );' => FAIL: * (b) shows (once) an error about a job not existing 3. On (a) run: > SELECT * FROM background_jobs; => FAIL: Notice the job ID mentioned on 2 stands as 'new'. 4. Apply this patch 5. Ctrl+c on (b), and re-run to launch the worker with the patch applied 6. Repeat 2 => SUCCESS (partial): The error about the job not existing is displayed many times 7. Repeat 3 => SUCCESS: The job ID mentioned on 6 stands as 'finished'. 8. Sign off :-D Discussion: * The `requeue` header I added is correct, but it is the default behavior anyway. I prefered to make it explicit, though. * To avoid that bunch of retries, we should requeue with some delay. I didn't manage to make it work (yet) but there's a 'delay' plugin for rabbit [1]. We already install the 'stomp' plugin in koha-common.postinst. But this plugin requires downloading it. Which would require further investigation. * As Nick and Marcel pointed, we need to revisit the whole architecture, the need of a MQ (DB polling wouldn't have this problem), etc. But that's for another place. [1] https://hevodata.com/learn/rabbitmq-delayed-message/#:~:text=To%20delay%20a%20message%2C%20the,to%20queues%20or%20other%20exchanges. Signed-off-by: Marcel de Rooy Signed-off-by: Tomas Cohen Arazi Signed-off-by: Marcel de Rooy Signed-off-by: Katrin Fischer (cherry picked from commit 1a51c2e973a1b6ce1116bbc243853112be96f0ea) Signed-off-by: Fridolin Somers --- misc/workers/background_jobs_worker.pl | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/misc/workers/background_jobs_worker.pl b/misc/workers/background_jobs_worker.pl index d2096fa8cf..d96205f1ff 100755 --- a/misc/workers/background_jobs_worker.pl +++ b/misc/workers/background_jobs_worker.pl @@ -122,19 +122,22 @@ while (1) { } catch { Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_); return; - } finally { - $conn->ack( { frame => $frame } ); }; - next unless $args; + my $job; - # FIXME This means we need to have create the DB entry before - # It could work in a first step, but then we will want to handle job that will be created from the message received - my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next; + if ($args) { + $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next; + unless ($job) { + Koha::Logger->get( { interface => 'worker' } ) + ->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} ); - unless( $job ) { - Koha::Logger->get( { interface => 'worker' } ) - ->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} ); + # nack to force requeue + $conn->nack( { frame => $frame, requeue => 1 } ); + next; + } + $conn->ack( { frame => $frame } ); + } else { next; } -- 2.39.5