Bug 24352: Correct location and collection labels in OPAC search results
[koha.git] / misc / search_tools / rebuild_elasticsearch.pl
1 #!/usr/bin/perl
2
3 # This inserts records from a Koha database into elastic search
4
5 # Copyright 2014 Catalyst IT
6 #
7 # This file is part of Koha.
8 #
9 # Koha is free software; you can redistribute it and/or modify it under the
10 # terms of the GNU General Public License as published by the Free Software
11 # Foundation; either version 3 of the License, or (at your option) any later
12 # version.
13 #
14 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
15 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
16 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License along
19 # with Koha; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
22 =head1 NAME
23
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
25
26 =head1 SYNOPSIS
27
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
30 [B<-v|--verbose>]
31 [B<-h|--help>]
32 [B<--man>]
33
34 =head1 DESCRIPTION
35
36 Inserts records from a Koha database into Elasticsearch.
37
38 =head1 OPTIONS
39
40 =over
41
42 =item B<-c|--commit>=C<count>
43
44 Specify how many records will be batched up before they're added to Elasticsearch.
45 Higher should be faster, but will cause more RAM usage. Default is 5000.
46
47 =item B<-d|--delete>
48
49 Delete the index and recreate it before indexing.
50
51 =item B<-a|--authorities>
52
53 Index the authorities only. Combining this with B<-b> is the same as
54 specifying neither and so both get indexed.
55
56 =item B<-b|--biblios>
57
58 Index the biblios only. Combining this with B<-a> is the same as
59 specifying neither and so both get indexed.
60
61 =item B<-bn|--bnumber>
62
63 Only index the supplied biblionumber, mostly for testing purposes. May be
64 repeated. This also applies to authorities via authid, so if you're using it,
65 you probably only want to do one or the other at a time.
66
67 =item B<-p|--processes>
68
69 Number of processes to use for indexing. This can be used to do more indexing
70 work in parallel on multicore systems. By default, a single process is used.
71
72 =item B<-v|--verbose>
73
74 By default, this program only emits warnings and errors. This makes it talk
75 more. Add more to make it even more wordy, in particular when debugging.
76
77 =item B<-h|--help>
78
79 Help!
80
81 =item B<--man>
82
83 Full documentation.
84
85 =back
86
87 =head1 IMPLEMENTATION
88
89 =cut
90
91 use autodie;
92 use Getopt::Long;
93 use Koha::Script;
94 use C4::Context;
95 use Koha::MetadataRecord::Authority;
96 use Koha::BiblioUtils;
97 use Koha::SearchEngine::Elasticsearch::Indexer;
98 use MARC::Field;
99 use MARC::Record;
100 use Modern::Perl;
101 use Pod::Usage;
102
103 my $verbose = 0;
104 my $commit = 5000;
105 my ($delete, $help, $man, $processes);
106 my ($index_biblios, $index_authorities);
107 my (@record_numbers);
108
109 $|=1; # flushes output
110
111 GetOptions(
112     'c|commit=i'    => \$commit,
113     'd|delete'      => \$delete,
114     'a|authorities' => \$index_authorities,
115     'b|biblios'     => \$index_biblios,
116     'bn|bnumber=i'  => \@record_numbers,
117     'p|processes=i' => \$processes,
118     'v|verbose+'    => \$verbose,
119     'h|help'        => \$help,
120     'man'           => \$man,
121 );
122
123 # Default is to do both
124 unless ($index_authorities || $index_biblios) {
125     $index_authorities = $index_biblios = 1;
126 }
127
128 if ($processes && @record_numbers) {
129     die "Argument p|processes cannot be combined with bn|bnumber";
130 }
131
132 pod2usage(1) if $help;
133 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
134
135 _sanity_check();
136
137 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
138 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
139
140 my $slice_index = 0;
141 my $slice_count = ( $processes //= 1 );
142 my %iterator_options;
143
144 if ($slice_count > 1) {
145     # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
146     $slice_index = 0;
147     for (my $proc = 1; $proc < $slice_count; $proc++) {
148         my $pid = fork();
149         die "Failed to fork a child process\n" unless defined $pid;
150         if ($pid == 0) {
151             # Child process, give it a slice to process
152             $slice_index = $proc;
153             last;
154         }
155     }
156     # Fudge the commit count a bit to spread out the Elasticsearch commits
157     $commit *= 1 + 0.10 * $slice_index;
158     _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
159     $iterator_options{slice} = { index => $slice_index, count => $slice_count };
160 }
161
162 my $next;
163 if ($index_biblios) {
164     _log(1, "Indexing biblios\n");
165     if (@record_numbers) {
166         $next = sub {
167             my $r = shift @record_numbers;
168             return () unless defined $r;
169             return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
170         };
171     } else {
172         my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
173         $next = sub {
174             $records->next();
175         }
176     }
177     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
178 }
179 if ($index_authorities) {
180     _log(1, "Indexing authorities\n");
181     if (@record_numbers) {
182         $next = sub {
183             my $r = shift @record_numbers;
184             return () unless defined $r;
185             my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
186             return ($r, $a->record);
187         };
188     } else {
189         my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
190         $next = sub {
191             $records->next();
192         }
193     }
194     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
195 }
196
197 if ($slice_index == 0) {
198     # Main process, wait for children
199     for (my $proc = 1; $proc < $processes; $proc++) {
200         wait();
201     }
202 }
203
204 =head1 INTERNAL METHODS
205
206 =head2 _verify_index_state
207
208     _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
209
210 Checks the index state and recreates it if requested.
211
212 =cut
213
214 sub _verify_index_state {
215     my ( $index_name, $recreate ) = @_;
216
217     _log(1, "Checking state of $index_name index\n");
218     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
219
220     if ($recreate) {
221         _log(1, "Dropping and recreating $index_name index\n");
222         $indexer->drop_index() if $indexer->index_exists();
223         $indexer->create_index();
224     }
225     elsif (!$indexer->index_exists) {
226         # Create index if does not exist
227         $indexer->create_index();
228     } elsif ($indexer->is_index_status_ok) {
229         # Update mapping unless index is some kind of problematic state
230         $indexer->update_mappings();
231     } elsif ($indexer->is_index_status_recreate_required) {
232         warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
233     }
234 }
235
236 =head2 _do_reindex
237
238     _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
239
240 Does the actual reindexing. $callback is a function that always returns the next record.
241 For each index we iterate through the records, committing at specified count
242
243 =cut
244
245 sub _do_reindex {
246     my ( $next, $index_name ) = @_;
247
248     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
249
250     my $count        = 0;
251     my $commit_count = $commit;
252     my ( @id_buffer, @commit_buffer );
253     while ( my $record = $next->() ) {
254         my $id     = $record->id;
255         my $record = $record->record;
256         $count++;
257         if ( $verbose == 1 ) {
258             _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
259         } else {
260             _log( 2, "$id\n" );
261         }
262
263         push @id_buffer,     $id;
264         push @commit_buffer, $record;
265         if ( !( --$commit_count ) ) {
266             _log( 1, "Committing $commit records...\n" );
267             my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
268             _handle_response($response);
269             $commit_count  = $commit;
270             @id_buffer     = ();
271             @commit_buffer = ();
272             _log( 1, "Commit complete\n" );
273         }
274     }
275
276     # There are probably uncommitted records
277     _log( 1, "Committing final records...\n" );
278     my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
279     _handle_response($response);
280     _log( 1, "Total $count records indexed\n" );
281 }
282
283 =head2 _sanity_check
284
285     _sanity_check();
286
287 Checks some basic stuff to ensure that it's sane before we start.
288
289 =cut
290
291 sub _sanity_check {
292     # Do we have an elasticsearch block defined?
293     my $conf = C4::Context->config('elasticsearch');
294     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
295 }
296
297 =head2 _handle_response
298
299 Parse the return from update_index and display errors depending on verbosity of the script
300
301 =cut
302
303 sub _handle_response {
304     my ($response) = @_;
305     if( $response->{errors} eq 'true' ){
306         _log( 1, "There were errors during indexing\n" );
307         if ( $verbose > 1 ){
308             foreach my $item (@{$response->{items}}){
309                 next unless defined $item->{index}->{error};
310                 print "Record #" . $item->{index}->{_id} . " " .
311                       $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
312                       $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
313             }
314         }
315     }
316 }
317
318 =head2 _log
319
320     _log($level, "Message\n");
321
322 Output progress information.
323
324 Will output the message if verbosity level is set to $level or more. Will not
325 include a trailing newline automatically.
326
327 =cut
328
329 sub _log {
330     my ($level, $msg) = @_;
331
332     print "[$$] $msg" if ($verbose >= $level);
333 }