Bug 35819: Notify NACK and requeue when Job ID not found

This patch makes the worker reject the incoming frame for putting the
message back in the queue, in the event the job id doesn't exist yet.
Which is the case when some actions are being triggered inside a
transaction which hasn't been commited to the DB yet.

To test you will need 3 KTD shells
(a) mysql:
   $ ktd --shell
  k$ sudo koha-mysql kohadev

(b) logs:
   $ ktd --shell
   # for restarting the worker and looking at the logs
  k$ sudo koha-worker --restart kohadev  ; tail -f /var/log/koha/kohadev/worker-*.log
(c) running the test:
   $ ktd --shell

1. Have (a), (b) and (c) terminals ready
2. On (c), run:
   $ perl -MKoha::Database -MKoha::BackgroundJob::BatchUpdateBiblioHoldsQueue -e 'Koha::Database->schema->txn_do( sub { Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue->new->enqueue({ biblio_ids => [ 1 ] }); sleep 1;  } );'
=> FAIL:
   * (b) shows (once) an error about a job not existing
3. On (a) run:
   > SELECT * FROM background_jobs;
=> FAIL: Notice the job ID mentioned on 2 stands as 'new'.
4. Apply this patch
5. Ctrl+c on (b), and re-run to launch the worker with the patch applied
6. Repeat 2
=> SUCCESS (partial): The error about the job not existing is displayed
many times
7. Repeat 3
=> SUCCESS: The job ID mentioned on 6 stands as 'finished'.
8. Sign off :-D

Discussion:

* The `requeue` header I added is correct, but it is the default
  behavior anyway. I prefered to make it explicit, though.

* To avoid that bunch of retries, we should requeue with some delay. I
  didn't manage to make it work (yet) but there's a 'delay' plugin for
  rabbit [1]. We already install the 'stomp' plugin in
  koha-common.postinst. But this plugin requires downloading it. Which
  would require further investigation.

* As Nick and Marcel pointed, we need to revisit the whole architecture,
  the need of a MQ (DB polling wouldn't have this problem), etc. But
  that's for another place.

[1] https://hevodata.com/learn/rabbitmq-delayed-message/#:~:text=To%20delay%20a%20message%2C%20the,to%20queues%20or%20other%20exchanges.

Signed-off-by: Marcel de Rooy <m.de.rooy@rijksmuseum.nl>
Signed-off-by: Tomas Cohen Arazi <tomascohen@theke.io>

Signed-off-by: Marcel de Rooy <m.de.rooy@rijksmuseum.nl>
Signed-off-by: Katrin Fischer <katrin.fischer@bsz-bw.de>
This commit is contained in:
Tomás Cohen Arazi 2024-01-16 14:46:20 -03:00 committed by Katrin Fischer
parent 90cd0e4ccf
commit 1a51c2e973
Signed by: kfischer
GPG key ID: 0EF6E2C03357A834

View file

@ -122,19 +122,22 @@ while (1) {
} catch {
Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_);
return;
} finally {
$conn->ack( { frame => $frame } );
};
next unless $args;
my $job;
# FIXME This means we need to have create the DB entry before
# It could work in a first step, but then we will want to handle job that will be created from the message received
my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
if ($args) {
$job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
unless ($job) {
Koha::Logger->get( { interface => 'worker' } )
->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
unless( $job ) {
Koha::Logger->get( { interface => 'worker' } )
->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
# nack to force requeue
$conn->nack( { frame => $frame, requeue => 1 } );
next;
}
$conn->ack( { frame => $frame } );
} else {
next;
}