3 # This file is part of Koha.
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 background_jobs_worker.pl - Worker script that will process background jobs
24 ./background_jobs_worker.pl [--queue QUEUE]
28 This script will connect to the Stomp server (RabbitMQ) and subscribe to the queues passed as parameters (or to the 'default' queue),
29 or, if a Stomp server is not active, it will poll the database every 10s for new jobs in those queues.
31 You can specify some queues only (using --queue, which is repeatable) if you want to run several workers that will handle their own jobs.
39 Repeatable. Give the job queues this worker will process.
41 The different values available are:
51 use JSON qw( decode_json );
57 use Koha::BackgroundJobs;
59 my ( $help, @queues );
62 'queue=s' => \@queues,
65 pod2usage(0) if $help;
68 push @queues, 'default';
73 $conn = Koha::BackgroundJob->connect;
75 warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
79 # FIXME cf note in Koha::BackgroundJob about $namespace
80 my $namespace = C4::Context->config('memcached_namespace');
81 for my $queue (@queues) {
83 destination => sprintf("/queue/%s-%s", $namespace, $queue),
85 'prefetch-count' => 1,
91 my $frame = $conn->receive_frame;
92 if ( !defined $frame ) {
93 # maybe log connection problems
94 next; # will reconnect automatically
98 my $body = $frame->body;
99 decode_json($body); # TODO Should this be from_json? Check utf8 flag.
101 Koha::Logger->get->warn(sprintf "Frame not processed - %s", $_);
104 $conn->ack( { frame => $frame } );
109 # FIXME This means we need to have created the DB entry beforehand
110 # It could work as a first step, but later we will want to handle jobs that are created from the received message
111 my $job = Koha::BackgroundJobs->find($args->{job_id});
114 Koha::Logger->get->warn(sprintf "No job found for id=%s", $args->{job_id});
118 process_job( $job, $args );
121 my $jobs = Koha::BackgroundJobs->search({ status => 'new', queue => \@queues });
122 while ( my $job = $jobs->next ) {
124 $job->json->decode($job->data);
126 Koha::Logger->get->warn(sprintf "Cannot decode data for job id=%s", $job->id);
127 $job->status('failed')->store;
133 process_job( $job, { job_id => $job->id, %$args } );
142 my ( $job, $args ) = @_;
150 die "fork failed!" unless defined $pid;
153 $job->process( $args );
155 $job->status('failed')->store;