From 9acb599d16764a813f5481164cdaeec3b3d01f27 Mon Sep 17 00:00:00 2001 From: Julian Maurice Date: Wed, 30 Mar 2022 14:21:50 +0200 Subject: [PATCH] Bug 27783: Replace --job-type by --queue This patch adds a new column background_jobs.queue, which default to 'default' By default, new jobs are enqueued into this default queue, and the background job worker will subscribe to the default queue unless told otherwise by the --job-queue option The new job UpdateElasticIndex is automatically enqueued in another queue named 'index' So you can have 'default' worker with misc/background_jobs_worker.pl and a dedicated indexing worker with misc/background_jobs_worker.pl --queue index This is to address bug 27344 comment #15 Signed-off-by: Martin Renvoize Signed-off-by: Kyle M Hall Signed-off-by: Martin Renvoize Signed-off-by: Fridolin Somers --- Koha/BackgroundJob.pm | 4 ++- misc/background_jobs_worker.pl | 51 ++++++++++++---------------------- 2 files changed, 21 insertions(+), 34 deletions(-) diff --git a/Koha/BackgroundJob.pm b/Koha/BackgroundJob.pm index b22da6d0a3..9d05f172bf 100644 --- a/Koha/BackgroundJob.pm +++ b/Koha/BackgroundJob.pm @@ -97,6 +97,7 @@ sub enqueue { my $job_type = $self->job_type; my $job_size = $params->{job_size}; my $job_args = $params->{job_args}; + my $job_queue = $params->{job_queue} // 'default'; my $borrowernumber = (C4::Context->userenv) ? C4::Context->userenv->{number} : undef; my $json_args = encode_json $job_args; @@ -105,6 +106,7 @@ sub enqueue { { status => 'new', type => $job_type, + queue => $job_queue, size => $job_size, data => $json_args, enqueued_on => dt_from_string, @@ -129,7 +131,7 @@ sub enqueue { # Also, here we just want the Koha instance's name, but it's not in the config... # Picking a random id (memcached_namespace) from the config my $namespace = C4::Context->config('memcached_namespace'); - $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } ) + $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_queue), body => $json_args } ) or Koha::Exceptions::Exception->throw('Job has not been enqueued'); } catch { $self->status('failed')->store; diff --git a/misc/background_jobs_worker.pl b/misc/background_jobs_worker.pl index b503ebde9f..c9495d0e87 100755 --- a/misc/background_jobs_worker.pl +++ b/misc/background_jobs_worker.pl @@ -21,26 +21,27 @@ background_jobs_worker.pl - Worker script that will process background jobs =head1 SYNOPSIS -./background_jobs_worker.pl [--job-type] +./background_jobs_worker.pl [--job-queue QUEUE] =head1 DESCRIPTION -This script will connect to the Stomp server (RabbitMQ) and subscribe to the different destination queues available. -You can specify some queues only (using --job-type) if you want to run several workers that will handle their own jobs. +This script will connect to the Stomp server (RabbitMQ) and subscribe to the queues passed in parameter (or the 'default' queue), +or if a Stomp server is not active it will poll the database every 10s for new jobs in the passed queue. + +You can specify some queues only (using --job-queue, which is repeatable) if you want to run several workers that will handle their own jobs. =head1 OPTIONS =over -=item B<--job-type> +=item B<--job-queue> -Give the job types this worker will process. +Repeatable. Give the job queues this worker will process. The different values available are: - batch_biblio_record_modification - batch_authority_record_modification - update_elastic_index + default + index =back @@ -54,14 +55,18 @@ use Getopt::Long; use Koha::BackgroundJobs; -my ( $help, @job_types ); +my ( $help, @queues ); GetOptions( 'h|help' => \$help, - 'job-type:s' => \@job_types, + 'job-queue=s' => \@queues, ) || pod2usage(1); pod2usage(0) if $help; +unless (@queues) { + push @queues, 'default'; +} + my $conn; try { $conn = Koha::BackgroundJob->connect; @@ -69,31 +74,11 @@ try { warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_; }; -my @available_job_types = qw( - batch_biblio_record_modification - batch_authority_record_modification - batch_item_record_modification - batch_biblio_record_deletion - batch_authority_record_deletion - batch_item_record_deletion - batch_hold_cancel - update_elastic_index -); - -if ( @job_types ) { - for my $job_type ( @job_types ) { - pod2usage( -verbose => 1, -msg => sprintf "You specify an invalid --job-type value: %s\n", $job_type ) - unless grep { $_ eq $job_type } @available_job_types; - } -} else { - @job_types = @available_job_types; -} - if ( $conn ) { # FIXME cf note in Koha::BackgroundJob about $namespace my $namespace = C4::Context->config('memcached_namespace'); - for my $job_type ( @job_types ) { - $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $job_type), ack => 'client' }); + for my $queue (@queues) { + $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $queue), ack => 'client' }); } } while (1) { @@ -115,7 +100,7 @@ while (1) { $conn->ack( { frame => $frame } ); # FIXME depending on success? } else { - my $jobs = Koha::BackgroundJobs->search({ status => 'new' }); + my $jobs = Koha::BackgroundJobs->search({ status => 'new', queue => \@queues }); while ( my $job = $jobs->next ) { my $args = decode_json($job->data); process_job( $job, { job_id => $job->id, %$args } ); -- 2.39.5