Bug 22417: Process the jobs even if the message broker is not reachable
[koha.git] / Koha / BackgroundJob.pm
1 package Koha::BackgroundJob;
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19 use JSON qw( encode_json decode_json );
20 use Carp qw( croak );
21 use Net::Stomp;
22 use Try::Tiny;
23
24 use C4::Context;
25 use Koha::DateUtils qw( dt_from_string );
26 use Koha::Exceptions;
27 use Koha::BackgroundJob::BatchUpdateBiblio;
28 use Koha::BackgroundJob::BatchUpdateAuthority;
29
30 use base qw( Koha::Object );
31
32 =head1 NAME
33
34 Koha::BackgroundJob - Koha BackgroundJob Object class
35
36 This is a base class for BackgroundJob, some methods must be subclassed.
37
38 Example of usage:
39
40 Producer:
41 my $job_id = Koha::BackgroundJob->enqueue(
42     {
43         job_type => $job_type,
44         job_size => $job_size,
45         job_args => $job_args
46     }
47 );
48
49 Consumer:
50 Koha::BackgrounJobs->find($job_id)->process;
51 See also C<misc/background_jobs_worker.pl> for a full example
52
53 =head1 API
54
55 =head2 Class methods
56
57 =head3 connect
58
59 Connect to the message broker using default guest/guest credential
60
61 =cut
62
63 sub connect {
64     my ( $self );
65     my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
66     $stomp->connect( { login => 'guest', passcode => 'guest' } );
67     return $stomp;
68 }
69
70 =head3 enqueue
71
72 Enqueue a new job. It will insert a new row in the DB table and notify the broker that a new job has been enqueued.
73
74 C<job_size> is the size of the job
75 C<job_args> is the arguments of the job. It's a structure that will be JSON encoded.
76
77 Return the job_id of the newly created job.
78
79 =cut
80
81 sub enqueue {
82     my ( $self, $params ) = @_;
83
84     my $job_type = $self->job_type;
85     my $job_size = $params->{job_size};
86     my $job_args = $params->{job_args};
87
88     my $borrowernumber = C4::Context->userenv->{number}; # FIXME Handle non GUI calls
89     my $json_args = encode_json $job_args;
90     my $job_id;
91     $self->_result->result_source->schema->txn_do(
92         sub {
93             $self->set(
94                 {
95                     status         => 'new',
96                     type           => $job_type,
97                     size           => $job_size,
98                     data           => $json_args,
99                     enqueued_on    => dt_from_string,
100                     borrowernumber => $borrowernumber,
101                 }
102             )->store;
103
104             $job_id = $self->id;
105             $job_args->{job_id} = $job_id;
106             $json_args = encode_json $job_args;
107
108             try {
109                 my $conn = $self->connect;
110                 # This namespace is wrong, it must be a vhost instead.
111                 # But to do so it needs to be created on the server => much more work when a new Koha instance is created.
112                 # Also, here we just want the Koha instance's name, but it's not in the config...
113                 # Picking a random id (memcached_namespace) from the config
114                 my $namespace = C4::Context->config('memcached_namespace');
115                 $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
116                   or Koha::Exceptions::Exception->throw('Job has not been enqueued');
117             } catch {
118                 if ( ref($_) eq 'Koha::Exceptions::Exception' ) {
119                     $_->rethrow;
120                 } else {
121                     warn sprintf "The job has not been sent to the message broker: (%s)", $_;
122                 }
123             };
124         }
125     );
126
127     return $job_id;
128 }
129
130 =head3 process
131
132 Process the job!
133
134 =cut
135
136 sub process {
137     my ( $self, $args ) = @_;
138
139     my $job_type = $self->type;
140     return $job_type eq 'batch_biblio_record_modification'
141       ? Koha::BackgroundJob::BatchUpdateBiblio->process($args)
142       : $job_type eq 'batch_authority_record_modification'
143       ? Koha::BackgroundJob::BatchUpdateAuthority->process($args)
144       : Koha::Exceptions::Exception->throw('->process called without valid job_type');
145 }
146
147 =head3 job_type
148
149 Return the job type of the job. Must be a string.
150
151 =cut
152
153 sub job_type { croak "This method must be subclassed" }
154
155 =head3 messages
156
157 Messages let during the processing of the job.
158
159 =cut
160
161 sub messages {
162     my ( $self ) = @_;
163
164     my @messages;
165     my $data_dump = decode_json $self->data;
166     if ( exists $data_dump->{messages} ) {
167         @messages = @{ $data_dump->{messages} };
168     }
169
170     return @messages;
171 }
172
173 =head3 report
174
175 Report of the job.
176
177 =cut
178
179 sub report {
180     my ( $self ) = @_;
181
182     my $data_dump = decode_json $self->data;
183     return $data_dump->{report};
184 }
185
186 =head3 cancel
187
188 Cancel a job.
189
190 =cut
191
192 sub cancel {
193     my ( $self ) = @_;
194     $self->status('cancelled')->store;
195 }
196
197 sub _type {
198     return 'BackgroundJob';
199 }
200
201 1;