Bug 22828: Elasticsearch - display errors encountered during indexing on the command...
[koha.git] / misc / search_tools / rebuild_elasticsearch.pl
1 #!/usr/bin/perl
2
3 # This inserts records from a Koha database into elastic search
4
5 # Copyright 2014 Catalyst IT
6 #
7 # This file is part of Koha.
8 #
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
21
22 =head1 NAME
23
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
25
26 =head1 SYNOPSIS
27
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
30 [B<-v|--verbose>]
31 [B<-h|--help>]
32 [B<--man>]
33
34 =head1 DESCRIPTION
35
36 Inserts records from a Koha database into Elasticsearch.
37
38 =head1 OPTIONS
39
40 =over
41
42 =item B<-c|--commit>=C<count>
43
44 Specify how many records will be batched up before they're added to Elasticsearch.
45 Higher should be faster, but will cause more RAM usage. Default is 5000.
46
47 =item B<-d|--delete>
48
49 Delete the index and recreate it before indexing.
50
51 =item B<-a|--authorities>
52
53 Index the authorities only. Combining this with B<-b> is the same as
54 specifying neither and so both get indexed.
55
56 =item B<-b|--biblios>
57
58 Index the biblios only. Combining this with B<-a> is the same as
59 specifying neither and so both get indexed.
60
61 =item B<-bn|--bnumber>
62
63 Only index the supplied biblionumber, mostly for testing purposes. May be
64 repeated.
65
66 =item B<-ai|--authid>
67
68 Only index the supplied authority id, mostly for testing purposes. May be
69 repeated.
70
71 =item B<-p|--processes>
72
73 Number of processes to use for indexing. This can be used to do more indexing
74 work in parallel on multicore systems. By default, a single process is used.
75
76 =item B<-v|--verbose>
77
78 By default, this program only emits warnings and errors. This makes it talk
79 more. Add more to make it even more wordy, in particular when debugging.
80
81 =item B<-h|--help>
82
83 Help!
84
85 =item B<--man>
86
87 Full documentation.
88
89 =back
90
91 =head1 IMPLEMENTATION
92
93 =cut
94
95 use autodie;
96 use Getopt::Long;
97 use Koha::Script;
98 use C4::Context;
99 use Koha::MetadataRecord::Authority;
100 use Koha::BiblioUtils;
101 use Koha::SearchEngine::Elasticsearch::Indexer;
102 use MARC::Field;
103 use MARC::Record;
104 use Modern::Perl;
105 use Pod::Usage;
106
107 my $verbose = 0;
108 my $commit = 5000;
109 my ($delete, $help, $man, $processes);
110 my ($index_biblios, $index_authorities);
111 my (@biblionumbers,@authids);
112
113 $|=1; # flushes output
114
115 GetOptions(
116     'c|commit=i'    => \$commit,
117     'd|delete'      => \$delete,
118     'a|authorities' => \$index_authorities,
119     'b|biblios'     => \$index_biblios,
120     'bn|bnumber=i' => \@biblionumbers,
121     'ai|authid=i'  => \@authids,
122     'p|processes=i' => \$processes,
123     'v|verbose+'    => \$verbose,
124     'h|help'        => \$help,
125     'man'           => \$man,
126 );
127
128 # Default is to do both
129 unless ($index_authorities || $index_biblios) {
130     $index_authorities = $index_biblios = 1;
131 }
132
133 if ($processes && ( @biblionumbers || @authids) ) {
134     die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
135 }
136
137 pod2usage(1) if $help;
138 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
139
140 _sanity_check();
141
142 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
143 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
144
145 my $slice_index = 0;
146 my $slice_count = ( $processes //= 1 );
147 my %iterator_options;
148
149 if ($slice_count > 1) {
150     # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
151     $slice_index = 0;
152     for (my $proc = 1; $proc < $slice_count; $proc++) {
153         my $pid = fork();
154         die "Failed to fork a child process\n" unless defined $pid;
155         if ($pid == 0) {
156             # Child process, give it a slice to process
157             $slice_index = $proc;
158             last;
159         }
160     }
161     # Fudge the commit count a bit to spread out the Elasticsearch commits
162     $commit *= 1 + 0.10 * $slice_index;
163     _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
164     $iterator_options{slice} = { index => $slice_index, count => $slice_count };
165 }
166
167 my $next;
168 if ($index_biblios) {
169     _log(1, "Indexing biblios\n");
170     if (@biblionumbers) {
171         $next = sub {
172             my $r = shift @biblionumbers;
173             return () unless defined $r;
174             return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
175         };
176     } else {
177         my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
178         $next = sub {
179             $records->next();
180         }
181     }
182     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
183 }
184 if ($index_authorities) {
185     _log(1, "Indexing authorities\n");
186     if (@authids) {
187         $next = sub {
188             my $r = shift @authids;
189             return () unless defined $r;
190             my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
191             return ($r, $a);
192         };
193     } else {
194         my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
195         $next = sub {
196             $records->next();
197         }
198     }
199     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
200 }
201
202 if ($slice_index == 0) {
203     # Main process, wait for children
204     for (my $proc = 1; $proc < $processes; $proc++) {
205         wait();
206     }
207 }
208
209 =head1 INTERNAL METHODS
210
211 =head2 _verify_index_state
212
213     _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
214
215 Checks the index state and recreates it if requested.
216
217 =cut
218
219 sub _verify_index_state {
220     my ( $index_name, $recreate ) = @_;
221
222     _log(1, "Checking state of $index_name index\n");
223     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
224
225     if ($recreate) {
226         _log(1, "Dropping and recreating $index_name index\n");
227         $indexer->drop_index() if $indexer->index_exists();
228         $indexer->create_index();
229     }
230     elsif (!$indexer->index_exists) {
231         # Create index if does not exist
232         $indexer->create_index();
233     } elsif ($indexer->is_index_status_ok) {
234         # Update mapping unless index is some kind of problematic state
235         $indexer->update_mappings();
236     } elsif ($indexer->is_index_status_recreate_required) {
237         warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
238     }
239 }
240
241 =head2 _do_reindex
242
243     _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
244
245 Does the actual reindexing. $callback is a function that always returns the next record.
246 For each index we iterate through the records, committing at specified count
247
248 =cut
249
250 sub _do_reindex {
251     my ( $next, $index_name ) = @_;
252
253     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
254
255     my $count        = 0;
256     my $commit_count = $commit;
257     my ( @id_buffer, @commit_buffer );
258     while ( my $record = $next->() ) {
259         my $id     = $record->id // $record->authid;
260         my $record = $record->record;
261         $count++;
262         if ( $verbose == 1 ) {
263             _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
264         } else {
265             _log( 2, "$id\n" );
266         }
267
268         push @id_buffer,     $id;
269         push @commit_buffer, $record;
270         if ( !( --$commit_count ) ) {
271             _log( 1, "Committing $commit records...\n" );
272             my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
273             _handle_response($response);
274             $commit_count  = $commit;
275             @id_buffer     = ();
276             @commit_buffer = ();
277             _log( 1, "Commit complete\n" );
278         }
279     }
280
281     # There are probably uncommitted records
282     _log( 1, "Committing final records...\n" );
283     my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
284     _handle_response($response);
285     _log( 1, "Total $count records indexed\n" );
286 }
287
288 =head2 _sanity_check
289
290     _sanity_check();
291
292 Checks some basic stuff to ensure that it's sane before we start.
293
294 =cut
295
296 sub _sanity_check {
297     # Do we have an elasticsearch block defined?
298     my $conf = C4::Context->config('elasticsearch');
299     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
300 }
301
302 =head2 _handle_response
303
304 Parse the return from update_index and display errors depending on verbosity of the script
305
306 =cut
307
308 sub _handle_response {
309     my ($response) = @_;
310     if( $response->{errors} eq 'true' ){
311         _log( 1, "There were errors during indexing\n" );
312         if ( $verbose > 1 ){
313             foreach my $item (@{$response->{items}}){
314                 next unless defined $item->{index}->{error};
315                 print "Record #" . $item->{index}->{_id} . " " .
316                       $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
317                       $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
318             }
319         }
320     }
321 }
322
323 =head2 _log
324
325     _log($level, "Message\n");
326
327 Output progress information.
328
329 Will output the message if verbosity level is set to $level or more. Will not
330 include a trailing newline automatically.
331
332 =cut
333
334 sub _log {
335     my ($level, $msg) = @_;
336
337     print "[$$] $msg" if ($verbose >= $level);
338 }