Bug 32305: Counterpart for es_indexer_daemon.pl
[koha.git] / misc / workers / es_indexer_daemon.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 =head1 NAME
19
20 es_indexer_daemon.pl - Worker script that will process background Elasticsearch jobs
21
22 =head1 SYNOPSIS
23
24 ./es_indexer_daemon.pl --batch_size=X
25
26 Options:
27
28    -b --batch_size          how many jobs to commit (default: 10)
29    --help                   brief help message
30
31 =head1 OPTIONS
32
33 =over 8
34
35 =item B<--help>
36
37 Print a brief help message and exits.
38
39 =item B<--batch_size>
40
41 How many jobs to commit per batch. Defaults to 10, will commit after .1 seconds if no more jobs incoming.
42
43 =back
44
45 =head1 DESCRIPTION
46
47 This script will connect to the Stomp server (RabbitMQ) and subscribe to the Elasticsearch queue, processing batches every second.
48 If a Stomp server is not active it will poll the database every 10s for new jobs in the Elasticsearch queue
49 and process them in batches every second.
50
51 =cut
52
53 use Modern::Perl;
54 use JSON qw( decode_json );
55 use Try::Tiny;
56 use Pod::Usage;
57 use Getopt::Long;
58
59 use C4::Context;
60 use Koha::Logger;
61 use Koha::BackgroundJobs;
62 use Koha::SearchEngine;
63 use Koha::SearchEngine::Indexer;
64
65
66 my ( $help, $batch_size );
67 GetOptions(
68     'h|help' => \$help,
69     'b|batch_size=s' => \$batch_size
70 ) || pod2usage(1);
71
72 pod2usage(0) if $help;
73
74 $batch_size //= 10;
75
76 warn "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch';
77
78 my $logger = Koha::Logger->get({ interface =>  'worker' });
79
80 my $conn;
81 try {
82     $conn = Koha::BackgroundJob->connect;
83 } catch {
84     warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
85 };
86
87 if ( $conn ) {
88     # FIXME cf note in Koha::BackgroundJob about $namespace
89     my $namespace = C4::Context->config('memcached_namespace');
90     $conn->subscribe(
91         {
92             destination      => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
93             ack              => 'client',
94             'prefetch-count' => 1,
95         }
96     );
97 }
98 my $biblio_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
99 my $auth_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::AUTHORITIES_INDEX });
100 my @jobs = ();
101
102 while (1) {
103
104     if ( $conn ) {
105         my $frame = $conn->receive_frame;
106         if ( !defined $frame ) {
107             # maybe log connection problems
108             next;    # will reconnect automatically
109         }
110
111         my $args = try {
112             my $body = $frame->body;
113             decode_json($body); # TODO Should this be from_json? Check utf8 flag.
114         } catch {
115             $logger->warn(sprintf "Frame not processed - %s", $_);
116             return;
117         } finally {
118             $conn->ack( { frame => $frame } );
119         };
120
121         next unless $args;
122
123         # FIXME This means we need to have create the DB entry before
124         # It could work in a first step, but then we will want to handle job that will be created from the message received
125         my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
126
127         unless ($job) {
128             $logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
129             next;
130         }
131
132         push @jobs, $job;
133         if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
134             commit(@jobs);
135             @jobs = ();
136         }
137
138     } else {
139         @jobs = Koha::BackgroundJobs->search(
140             { status => 'new', queue => 'elastic_index' } )->as_list;
141         commit(@jobs);
142         @jobs = ();
143         sleep 10;
144     }
145
146 }
147 $conn->disconnect;
148
149 sub commit {
150     my (@jobs) = @_;
151
152     my @bib_records;
153     my @auth_records;
154
155     my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] });
156     # Start
157     $jobs->update({
158         progress => 0,
159         status => 'started',
160         started_on => \'NOW()',
161     });
162
163     for my $job (@jobs) {
164         my $args = try {
165             $job->json->decode( $job->data );
166         } catch {
167             $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
168             $job->status('failed')->store;
169             return;
170         };
171         next unless $args;
172         if ( $args->{record_server} eq 'biblioserver' ) {
173             push @bib_records, @{ $args->{record_ids} };
174         } else {
175             push @auth_records, @{ $args->{record_ids} };
176         }
177     }
178
179     if (@auth_records) {
180         try {
181             $auth_indexer->update_index( \@auth_records );
182         } catch {
183             $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
184         };
185     }
186     if (@bib_records) {
187         try {
188             $biblio_indexer->update_index( \@bib_records );
189         } catch {
190             $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
191         };
192     }
193
194     # Finish
195     $jobs->update({
196         progress => 1,
197         status => 'finished',
198         ended_on => \'NOW()',
199     });
200 }