3 # This file is part of Koha.
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 es_indexer_daemon.pl - Worker script that will process background Elasticsearch jobs
24 ./es_indexer_daemon.pl --batch_size=X
28 -b --batch_size how many jobs to commit (default: 10)
29 --help brief help message
37 Print a brief help message and exit.
41 How many jobs to commit per batch. Defaults to 10; the batch is committed after 0.1 seconds if no more jobs are incoming.
47 This script will connect to the Stomp server (RabbitMQ) and subscribe to the Elasticsearch queue, processing batches every second.
48 If a Stomp server is not active, it will poll the database every 10s for new jobs in the Elasticsearch queue
49 and process them in batches every second.
54 use JSON qw( decode_json );
58 use List::MoreUtils qw( natatime );
63 use Koha::BackgroundJobs;
64 use Koha::SearchEngine;
65 use Koha::SearchEngine::Indexer;
# --- Option parsing and worker setup (excerpted; intervening lines not shown) ---
68 my ( $help, $batch_size );
# Per-job retry counter for frames whose job row is not yet visible in the DB;
# retries are capped by MAX_RETRIES (environment override, default 10).
70 my $not_found_retries = {};
71 my $max_retries = $ENV{MAX_RETRIES} || 10;
# GetOptions spec fragment: -b / --batch_size takes a value.
75 'b|batch_size=s' => \$batch_size
78 pod2usage(0) if $help;
# Warn (but keep running) when this instance is not configured for Elasticsearch.
82 warn "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch';
84 my $logger = Koha::Logger->get({ interface => 'worker' });
# Attempt the message-broker (STOMP/RabbitMQ) connection; on failure the daemon
# keeps going and falls back to polling the database for jobs.
88 $conn = Koha::BackgroundJob->connect;
90 warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
94 # FIXME cf note in Koha::BackgroundJob about $namespace
95 my $namespace = C4::Context->config('memcached_namespace');
# Subscribe to this instance's elastic_index queue; prefetch-count=1 so the
# broker delivers one unacked frame at a time.
98 destination => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
100 'prefetch-count' => 1,
# One indexer per index type; chunk size comes from the ES config (default 5000).
104 my $biblio_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
105 my $auth_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::AUTHORITIES_INDEX } );
106 my $config = $biblio_indexer->get_elasticsearch_params;
107 my $at_a_time = $config->{chunk_size} // 5000;
# --- Broker-mode loop body (excerpted): receive one STOMP frame, validate the
# --- referenced job, and ack/nack accordingly. Enclosing loop/try not shown.
114 my $frame = $conn->receive_frame;
115 if ( !defined $frame ) {
116 # maybe log connection problems
117 next; # will reconnect automatically
# Decode the JSON payload; a decode failure is logged below and the frame skipped.
121 my $body = $frame->body;
122 decode_json($body); # TODO Should this be from_json? Check utf8 flag.
124 Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame not processed - %s", $_);
# Frames without usable args are discarded: nack with requeue=false.
129 Koha::Logger->get({ interface => 'worker' })->warn(sprintf "Frame does not have correct args, ignoring it");
130 $conn->nack( { frame => $frame, requeue => 'false' } );
# Look up the background job the frame refers to.
134 my $job = Koha::BackgroundJobs->find( $args->{job_id} );
# Job exists but is no longer 'new' (already picked up elsewhere): discard frame.
136 if ( $job && $job->status ne 'new' ) {
137 Koha::Logger->get( { interface => 'worker' } )
138 ->warn( sprintf "Job %s has wrong status %s", $args->{job_id}, $job->status );
140 # nack without requeue, we do not want to process this frame again
141 $conn->nack( { frame => $frame, requeue => 'false' } );
# Job row not found — presumably the enqueuing transaction has not committed
# yet (TODO confirm). Retry up to $max_retries times, then drop the frame.
146 if ( ++$not_found_retries->{$args->{job_id}} >= $max_retries ) {
147 Koha::Logger->get( { interface => 'worker' } )
148 ->warn( sprintf "Job %s not found, no more retry", $args->{job_id} );
150 # nack without requeue, we do not want to process this frame again
151 $conn->nack( { frame => $frame, requeue => 'false' } );
155 Koha::Logger->get( { interface => 'worker' } )
156 ->debug( sprintf "Job %s not found, will retry later", $args->{job_id} );
158 # nack to force requeue
159 $conn->nack( { frame => $frame, requeue => 'true' } );
# Short sleep so an immediately requeued frame is not re-consumed in a tight loop.
160 Time::HiRes::sleep(0.5);
# Frame accepted: ack so the broker will not redeliver it.
163 $conn->ack( { frame => $frame } );
# Flush the accumulated batch when it reaches $batch_size, or when no further
# frame arrives within 0.1 seconds.
166 if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
# --- Fallback and batch processing (excerpted; surrounding control flow not shown) ---
# Without a broker connection: poll the DB for new jobs on the elastic_index queue.
172 @jobs = Koha::BackgroundJobs->search(
173 { status => 'new', queue => 'elastic_index' } )->as_list;
# Re-fetch the batch as a resultset and mark it started before indexing.
188 my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] });
193 started_on => \'NOW()',
# Collect record ids per index type; a job whose data cannot be decoded is
# marked failed and (presumably) skipped — surrounding flow not visible here.
196 for my $job (@jobs) {
198 $job->json->decode( $job->data );
200 $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
201 $job->status('failed')->store;
205 if ( $args->{record_server} eq 'biblioserver' ) {
206 push @bib_records, @{ $args->{record_ids} };
208 push @auth_records, @{ $args->{record_ids} };
# Index authority records in chunks of $at_a_time; failures are logged, not fatal.
213 my $auth_chunks = natatime $at_a_time, @auth_records;
214 while ( ( my @auth_chunk = $auth_chunks->() ) ) {
216 $auth_indexer->update_index( \@auth_chunk );
218 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
# Same chunked update for bibliographic records.
223 my $biblio_chunks = natatime $at_a_time, @bib_records;
224 while ( ( my @bib_chunk = $biblio_chunks->() ) ) {
226 $biblio_indexer->update_index( \@bib_chunk );
228 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
# Mark the whole batch finished with an end timestamp.
236 status => 'finished',
237 ended_on => \'NOW()',