Bug 35819: Adjust es_indexer_daemon
[koha.git] / misc / workers / es_indexer_daemon.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 =head1 NAME
19
20 es_indexer_daemon.pl - Worker script that will process background Elasticsearch jobs
21
22 =head1 SYNOPSIS
23
24 ./es_indexer_daemon.pl --batch_size=X
25
26 Options:
27
28    -b --batch_size          how many jobs to commit (default: 10)
29    --help                   brief help message
30
31 =head1 OPTIONS
32
33 =over 8
34
35 =item B<--help>
36
37 Print a brief help message and exits.
38
39 =item B<--batch_size>
40
41 How many jobs to commit per batch. Defaults to 10, will commit after .1 seconds if no more jobs incoming.
42
43 =back
44
45 =head1 DESCRIPTION
46
47 This script will connect to the Stomp server (RabbitMQ) and subscribe to the Elasticsearch queue, processing batches every second.
48 If a Stomp server is not active it will poll the database every 10s for new jobs in the Elasticsearch queue
49 and process them in batches every second.
50
51 =cut
52
53 use Modern::Perl;
54 use JSON qw( decode_json );
55 use Try::Tiny;
56 use Pod::Usage;
57 use Getopt::Long;
58 use List::MoreUtils qw( natatime );
59 use Time::HiRes;
60
61 use C4::Context;
62 use Koha::Logger;
63 use Koha::BackgroundJobs;
64 use Koha::SearchEngine;
65 use Koha::SearchEngine::Indexer;
66
67
68 my ( $help, $batch_size );
69
70 my $not_found_retries = {};
71 my $max_retries = $ENV{MAX_RETRIES} || 10;
72
73 GetOptions(
74     'h|help' => \$help,
75     'b|batch_size=s' => \$batch_size
76 ) || pod2usage(1);
77
78 pod2usage(0) if $help;
79
80 $batch_size //= 10;
81
82 warn "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch';
83
84 my $logger = Koha::Logger->get({ interface =>  'worker' });
85
86 my $conn;
87 try {
88     $conn = Koha::BackgroundJob->connect;
89 } catch {
90     warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
91 };
92
93 if ( $conn ) {
94     # FIXME cf note in Koha::BackgroundJob about $namespace
95     my $namespace = C4::Context->config('memcached_namespace');
96     $conn->subscribe(
97         {
98             destination      => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
99             ack              => 'client',
100             'prefetch-count' => 1,
101         }
102     );
103 }
104 my $biblio_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
105 my $auth_indexer   = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::AUTHORITIES_INDEX } );
106 my $config         = $biblio_indexer->get_elasticsearch_params;
107 my $at_a_time      = $config->{chunk_size} // 5000;
108
109 my @jobs = ();
110
111 while (1) {
112
113     if ( $conn ) {
114         my $frame = $conn->receive_frame;
115         if ( !defined $frame ) {
116             # maybe log connection problems
117             next;    # will reconnect automatically
118         }
119
120         my $args = try {
121             my $body = $frame->body;
122             decode_json($body); # TODO Should this be from_json? Check utf8 flag.
123         } catch {
124             Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_);
125             return;
126         };
127
128         unless ( $args ) {
129             Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame does not have correct args, ignoring it");
130             $conn->nack( { frame => $frame, requeue => 'false' } );
131             next;
132         }
133
134         my $job = Koha::BackgroundJobs->find( $args->{job_id} );
135
136         if ( $job && $job->status ne 'new' ) {
137             Koha::Logger->get( { interface => 'worker' } )
138                 ->warn( sprintf "Job %s has wrong status %s", $args->{job_id}, $job->status );
139
140             # nack without requeue, we do not want to process this frame again
141             $conn->nack( { frame => $frame, requeue => 'false' } );
142             next;
143         }
144
145         unless ($job) {
146             if ( ++$not_found_retries->{$args->{job_id}} >= $max_retries ) {
147                 Koha::Logger->get( { interface => 'worker' } )
148                     ->warn( sprintf "Job %s not found, no more retry", $args->{job_id} );
149
150                 # nack without requeue, we do not want to process this frame again
151                 $conn->nack( { frame => $frame, requeue => 'false' } );
152                 next;
153             }
154
155             Koha::Logger->get( { interface => 'worker' } )
156                 ->debug( sprintf "Job %s not found, will retry later", $args->{job_id} );
157
158             # nack to force requeue
159             $conn->nack( { frame => $frame, requeue => 'true' } );
160             Time::HiRes::sleep(0.5);
161             next;
162         }
163         $conn->ack( { frame => $frame } );
164
165         push @jobs, $job;
166         if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
167             commit(@jobs);
168             @jobs = ();
169         }
170
171     } else {
172         @jobs = Koha::BackgroundJobs->search(
173             { status => 'new', queue => 'elastic_index' } )->as_list;
174         commit(@jobs);
175         @jobs = ();
176         sleep 10;
177     }
178
179 }
180 $conn->disconnect;
181
182 sub commit {
183     my (@jobs) = @_;
184
185     my @bib_records;
186     my @auth_records;
187
188     my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] });
189     # Start
190     $jobs->update({
191         progress => 0,
192         status => 'started',
193         started_on => \'NOW()',
194     });
195
196     for my $job (@jobs) {
197         my $args = try {
198             $job->json->decode( $job->data );
199         } catch {
200             $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
201             $job->status('failed')->store;
202             return;
203         };
204         next unless $args;
205         if ( $args->{record_server} eq 'biblioserver' ) {
206             push @bib_records, @{ $args->{record_ids} };
207         } else {
208             push @auth_records, @{ $args->{record_ids} };
209         }
210     }
211
212     if (@auth_records) {
213         my $auth_chunks = natatime $at_a_time, @auth_records;
214         while ( ( my @auth_chunk = $auth_chunks->() ) ) {
215             try {
216                 $auth_indexer->update_index( \@auth_chunk );
217             } catch {
218                 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
219             };
220         }
221     }
222     if (@bib_records) {
223         my $biblio_chunks = natatime $at_a_time, @bib_records;
224         while ( ( my @bib_chunk = $biblio_chunks->() ) ) {
225             try {
226                 $biblio_indexer->update_index( \@bib_chunk );
227             } catch {
228                 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
229             };
230         }
231     }
232
233     # Finish
234     $jobs->update({
235         progress => 1,
236         status => 'finished',
237         ended_on => \'NOW()',
238     });
239 }