3 # This file is part of Koha.
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 es_indexer_daemon.pl - Worker script that will process background Elasticsearch jobs
24 ./es_indexer_daemon.pl --batch_size=X
28 -b --batch_size how many jobs to commit (default: 10)
29 --help brief help message
37 Prints a brief help message and exits.
41 How many jobs to commit per batch. Defaults to 10; a partial batch is committed after 0.1 seconds if no more jobs are incoming.
47 This script will connect to the Stomp server (RabbitMQ) and subscribe to the Elasticsearch queue, processing batches every second.
48 If a Stomp server is not active, it will poll the database every 10s for new jobs in the Elasticsearch queue
49 and process them in batches every second.
54 use JSON qw( decode_json );
61 use Koha::BackgroundJobs;
62 use Koha::SearchEngine;
63 use Koha::SearchEngine::Indexer;
# Command-line option handling and start-up checks.
# NOTE(review): the surrounding GetOptions call and the place where the
# batch_size default is applied are not visible in this excerpt.
66 my ( $help, $batch_size );
# NOTE(review): '=s' accepts any string, but $batch_size is later compared
# numerically ('@jobs >= $batch_size'); '=i' may be what is intended - confirm.
69 'b|batch_size=s' => \$batch_size
# Print the usage text and exit with status 0 when help was requested.
72 pod2usage(0) if $help;
# This only warns: the script keeps running even when the SearchEngine
# preference is not 'Elasticsearch'.
76 warn "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch';
# Logger bound to the 'worker' interface; used for the warnings further down.
78 my $logger = Koha::Logger->get({ interface => 'worker' });
# Connect to the message broker.
# NOTE(review): the declaration of $conn and the try/catch that presumably
# wraps these two statements are not visible in this excerpt.
82 $conn = Koha::BackgroundJob->connect;
# A connection failure is reported as a warning only; per the message, jobs
# are still processed without the broker.
84 warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
88 # FIXME cf note in Koha::BackgroundJob about $namespace
89 my $namespace = C4::Context->config('memcached_namespace');
# Queue name has the form "/queue/<memcached_namespace>-elastic_index".
# NOTE(review): presumably an argument of a subscribe call - confirm.
92 destination => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
# presumably limits delivery to one unacknowledged frame at a time - confirm
94 'prefetch-count' => 1,
# One indexer per Elasticsearch index: bibliographic records and authorities.
98 my $biblio_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
99 my $auth_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::AUTHORITIES_INDEX });
# Job intake: read frames from the broker, look up the matching job rows and
# collect them in @jobs until a batch is due.
# NOTE(review): the enclosing loop, the try/catch wrappers and the
# declarations of @jobs and $args are not visible in this excerpt.
105 my $frame = $conn->receive_frame;
106 if ( !defined $frame ) {
107 # maybe log connection problems
108 next; # will reconnect automatically
# Decode the JSON body of the frame; presumably the result is what is
# referred to as $args below - the assignment is not visible here.
112 my $body = $frame->body;
113 decode_json($body); # TODO Should this be from_json? Check utf8 flag.
# A frame that cannot be processed is logged as a warning
# (presumably from a catch handler, $_ being the error - confirm).
115 $logger->warn(sprintf "Frame not processed - %s", $_);
# Acknowledge the frame to the broker.
118 $conn->ack( { frame => $frame } );
123 # FIXME This means the DB entry needs to have been created beforehand
124 # It could work as a first step, but eventually we will want to handle jobs created from the message received
# Only a job row with the given id and status 'new' is picked up.
125 my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
128 $logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
# A batch is due when it is full, or when no further frame becomes readable
# within 0.1 seconds.
133 if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
# Database path: load every job with status 'new' in the 'elastic_index' queue.
# NOTE(review): presumably the branch used when no broker connection exists - confirm.
139 @jobs = Koha::BackgroundJobs->search(
140 { status => 'new', queue => 'elastic_index' } )->as_list;
# Batch processing: gather the record ids of all jobs in @jobs and push them
# to the authority and biblio indexers.
# NOTE(review): the enclosing sub header, the try/catch wrappers, the else
# line and the update calls around the status fields are not visible here.
# Resultset covering every job of the batch, selected by id.
155 my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] });
# The scalar reference passes NOW() through as literal SQL; presumably part
# of a bulk update that marks the batch as started - confirm.
160 started_on => \'NOW()',
# Collect the record ids of each job, grouped by record type.
163 for my $job (@jobs) {
165 $job->json->decode( $job->data );
# A job whose data cannot be decoded is logged and stored with status 'failed'
# (presumably from a catch handler - confirm).
167 $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
168 $job->status('failed')->store;
# 'biblioserver' jobs feed @bib_records; the other push presumably sits in an
# else branch and covers authority records - confirm.
172 if ( $args->{record_server} eq 'biblioserver' ) {
173 push @bib_records, @{ $args->{record_ids} };
175 push @auth_records, @{ $args->{record_ids} };
# Each indexer gets its list in a single update_index call; failures are
# logged as warnings.
181 $auth_indexer->update_index( \@auth_records );
183 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
188 $biblio_indexer->update_index( \@bib_records );
190 $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
# presumably the fields of a final bulk update marking the batch as
# finished - confirm.
197 status => 'finished',
198 ended_on => \'NOW()',