Bug 35345: Add --where option to rebuild_elasticsearch.pl
[koha.git] / misc / search_tools / rebuild_elasticsearch.pl
1 #!/usr/bin/perl
2
3 # This inserts records from a Koha database into elastic search
4
5 # Copyright 2014 Catalyst IT
6 #
7 # This file is part of Koha.
8 #
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
21
22 =head1 NAME
23
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
25
26 =head1 SYNOPSIS
27
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
30 [B<-d|--delete>]
31 [B<-r|--reset>]
32 [B<-a|--authorities>]
33 [B<-b|--biblios>]
34 [B<--desc>]
35 [B<-bn|--bnumber>]
36 [B<-ai|--authid>]
37 [B<-w|--where SQL>]
38 [B<-p|--processes>]
39 [B<-v|--verbose>]
40 [B<-h|--help>]
41 [B<--man>]
42
43 =head1 DESCRIPTION
44
45 Inserts records from a Koha database into Elasticsearch.
46
47 =head1 OPTIONS
48
49 =over
50
51 =item B<-c|--commit>=C<count>
52
53 Specify how many records will be batched up before they're added to Elasticsearch.
54 Higher should be faster, but will cause more RAM usage. Default is 5000.
55
56 =item B<-d|--delete>
57
58 Delete the index and recreate it before indexing.
59
60 =item B<-r|--reset>
61
62 Reload mappings from files (specified in koha-conf.xml) before indexing.
63 Implies --delete.
64
65 =item B<-a|--authorities>
66
67 Index the authorities only. Combining this with B<-b> is the same as
68 specifying neither and so both get indexed.
69
70 =item B<-b|--biblios>
71
72 Index the biblios only. Combining this with B<-a> is the same as
73 specifying neither and so both get indexed.
74
75 =item B<--desc>
76
77 Index the records in descending id order. Intended to index newer record before older records.
78 Default is to index in ascending order.
79 Does not work with --bnumber or --authid
80
81 =item B<-bn|--bnumber>
82
83 Only index the supplied biblionumber, mostly for testing purposes. May be
84 repeated.
85
86 =item B<-ai|--authid>
87
88 Only index the supplied authority id, mostly for testing purposes. May be
89 repeated.
90
91 =item B<-w|--where>
92
93 Pass some additional SQL to limit the records to be indexed.
94
95 =item B<-p|--processes>
96
97 Number of processes to use for indexing. This can be used to do more indexing
98 work in parallel on multicore systems. By default, a single process is used.
99
100 =item B<-v|--verbose>
101
102 By default, this program only emits warnings and errors. This makes it talk
103 more. Add more to make it even more wordy, in particular when debugging.
104
105 =item B<-h|--help>
106
107 Help!
108
109 =item B<--man>
110
111 Full documentation.
112
113 =back
114
115 =head1 IMPLEMENTATION
116
117 =cut
118
119 use autodie;
120 use Getopt::Long qw( GetOptions );
121 use Koha::Script;
122 use C4::Context;
123 use Koha::MetadataRecord::Authority;
124 use Koha::BiblioUtils;
125 use Koha::SearchEngine::Elasticsearch;
126 use Koha::SearchEngine::Elasticsearch::Indexer;
127 use MARC::Field;
128 use Modern::Perl;
129 use Pod::Usage qw( pod2usage );
130 use Try::Tiny qw( catch try );
131
132 my $verbose = 0;
133 my $commit = 5000;
134 my ($delete, $reset, $help, $man, $processes);
135 my ($index_biblios, $index_authorities);
136 my (@biblionumbers,@authids,$where);
137 my $desc;
138
139 $|=1; # flushes output
140
141 GetOptions(
142     'c|commit=i'    => \$commit,
143     'd|delete'      => \$delete,
144     'r|reset'       => \$reset,
145     'a|authorities' => \$index_authorities,
146     'b|biblios'     => \$index_biblios,
147     'desc'          => \$desc,
148     'bn|bnumber=i'  => \@biblionumbers,
149     'ai|authid=i'   => \@authids,
150     'w|where=s'     => \$where,
151     'p|processes=i' => \$processes,
152     'v|verbose+'    => \$verbose,
153     'h|help'        => \$help,
154     'man'           => \$man,
155 );
156
157 # Default is to do both
158 unless ($index_authorities || $index_biblios) {
159     $index_authorities = $index_biblios = 1;
160 }
161
162 if ($processes && ( @biblionumbers || @authids) ) {
163     die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
164 }
165
166 pod2usage(1) if $help;
167 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
168
169 _sanity_check();
170
171 if ($reset){
172     Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
173     $delete = 1;
174 }
175
176 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
177 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
178
179 my $slice_index = 0;
180 my $slice_count = ( $processes //= 1 );
181 my %iterator_options;
182
183 if ($slice_count > 1) {
184     # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
185     $slice_index = 0;
186     for (my $proc = 1; $proc < $slice_count; $proc++) {
187         my $pid = fork();
188         die "Failed to fork a child process\n" unless defined $pid;
189         if ($pid == 0) {
190             # Child process, give it a slice to process
191             $slice_index = $proc;
192             last;
193         }
194     }
195     # Fudge the commit count a bit to spread out the Elasticsearch commits
196     $commit *= 1 + 0.10 * $slice_index;
197     $commit = int( $commit );
198     _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
199     $iterator_options{slice} = { index => $slice_index, count => $slice_count };
200 }
201
202 if( $desc ){
203     $iterator_options{desc} = 1;
204 }
205
206 if ($where) {
207     $iterator_options{where} = $where;
208 }
209
210 my $next;
211 if ($index_biblios) {
212     _log(1, "Indexing biblios\n");
213     if (@biblionumbers) {
214         $next = sub {
215             my $r = shift @biblionumbers;
216             return () unless defined $r;
217             return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
218         };
219     } else {
220         my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
221         $next = sub {
222             $records->next();
223         }
224     }
225     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
226 }
227 if ($index_authorities) {
228     _log(1, "Indexing authorities\n");
229     if (@authids) {
230         $next = sub {
231             my $r = shift @authids;
232             return () unless defined $r;
233             my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
234             return ($r, $a);
235         };
236     } else {
237         my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
238         $next = sub {
239             $records->next();
240         }
241     }
242     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
243 }
244
245 if ($slice_index == 0) {
246     # Main process, wait for children
247     for (my $proc = 1; $proc < $processes; $proc++) {
248         wait();
249     }
250 }
251
252 =head1 INTERNAL METHODS
253
254 =head2 _verify_index_state
255
256     _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
257
258 Checks the index state and recreates it if requested.
259
260 =cut
261
262 sub _verify_index_state {
263     my ( $index_name, $recreate ) = @_;
264
265     _log(1, "Checking state of $index_name index\n");
266     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
267
268     if ($recreate) {
269         _log(1, "Dropping and recreating $index_name index\n");
270         $indexer->drop_index() if $indexer->index_exists();
271         $indexer->create_index();
272     }
273     elsif (!$indexer->index_exists) {
274         # Create index if does not exist
275         $indexer->create_index();
276     } elsif ($indexer->is_index_status_ok) {
277         # Update mapping unless index is some kind of problematic state
278         $indexer->update_mappings();
279     } elsif ($indexer->is_index_status_recreate_required) {
280         warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
281     }
282 }
283
284 =head2 _do_reindex
285
286     _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
287
288 Does the actual reindexing. $callback is a function that always returns the next record.
289 For each index we iterate through the records, committing at specified count
290
291 =cut
292
293 sub _do_reindex {
294     my ( $next, $index_name ) = @_;
295
296     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
297
298     my $count        = 0;
299     my $commit_count = $commit;
300     my ( @id_buffer, @commit_buffer );
301     while ( my $record = $next->() ) {
302         my $id     = $record->id // $record->authid;
303         my $record = $record->record;
304         $count++;
305         if ( $verbose == 1 ) {
306             _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
307         } else {
308             _log( 2, "$id\n" );
309         }
310
311         push @id_buffer,     $id;
312         push @commit_buffer, $record;
313         if ( !( --$commit_count ) ) {
314             _log( 1, "Committing $commit records...\n" );
315             my $response;
316             try{
317                 $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
318                 _handle_response($response);
319                 _log( 1, "Commit complete\n" );
320             } catch {
321                 _log(1,"Elasticsearch exception thrown: ".$_->type."\n");
322                 _log(2,"Details: ".$_->details."\n");
323             };
324             $commit_count  = $commit;
325             @id_buffer     = ();
326             @commit_buffer = ();
327         }
328     }
329
330     # There are probably uncommitted records
331     _log( 1, "Committing final records...\n" );
332     my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
333     _handle_response($response);
334     _log( 1, "Total $count records indexed\n" );
335 }
336
337 =head2 _sanity_check
338
339     _sanity_check();
340
341 Checks some basic stuff to ensure that it's sane before we start.
342
343 =cut
344
345 sub _sanity_check {
346     # Do we have an elasticsearch block defined?
347     my $conf = C4::Context->config('elasticsearch');
348     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
349 }
350
351 =head2 _handle_response
352
353 Parse the return from update_index and display errors depending on verbosity of the script
354
355 =cut
356
357 sub _handle_response {
358     my ($response) = @_;
359     if( $response->{errors} eq 'true' ){
360         _log( 1, "There were errors during indexing\n" );
361         if ( $verbose > 1 ){
362             foreach my $item (@{$response->{items}}){
363                 next unless defined $item->{index}->{error};
364                 print "Record #" . $item->{index}->{_id} . " " .
365                       $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
366                       $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
367             }
368         }
369     }
370 }
371
372 =head2 _log
373
374     _log($level, "Message\n");
375
376 Output progress information.
377
378 Will output the message if verbosity level is set to $level or more. Will not
379 include a trailing newline automatically.
380
381 =cut
382
383 sub _log {
384     my ($level, $msg) = @_;
385
386     print "[$$] $msg" if ($verbose >= $level);
387 }