3 # This inserts records from a Koha database into Elasticsearch
5 # Copyright 2014 Catalyst IT
7 # This file is part of Koha.
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
45 Inserts records from a Koha database into Elasticsearch.
51 =item B<-c|--commit>=C<count>
53 Specify how many records will be batched up before they're added to Elasticsearch.
54 Higher values should be faster, but will use more RAM. Default is 5000.
58 Delete the index and recreate it before indexing.
62 Reload mappings from files (specified in koha-conf.xml) before indexing.
65 =item B<-a|--authorities>
67 Index the authorities only. Combining this with B<-b> is the same as
68 specifying neither and so both get indexed.
72 Index the biblios only. Combining this with B<-a> is the same as
73 specifying neither and so both get indexed.
77 Index the records in descending id order. Intended to index newer records before older records.
78 Default is to index in ascending order.
79 Does not work with --bnumber or --authid.
81 =item B<-bn|--bnumber>
83 Only index the supplied biblionumber, mostly for testing purposes. May be
88 Only index the supplied authority id, mostly for testing purposes. May be
93 Pass some additional SQL to limit the records to be indexed.
95 =item B<-p|--processes>
97 Number of processes to use for indexing. This can be used to do more indexing
98 work in parallel on multicore systems. By default, a single process is used.
100 =item B<-v|--verbose>
102 By default, this program only emits warnings and errors. This makes it talk
103 more. Add more to make it even more wordy, in particular when debugging.
115 =head1 IMPLEMENTATION
120 use Getopt::Long qw( GetOptions );
123 use Koha::MetadataRecord::Authority;
124 use Koha::BiblioUtils;
125 use Koha::SearchEngine::Elasticsearch;
126 use Koha::SearchEngine::Elasticsearch::Indexer;
129 use Pod::Usage qw( pod2usage );
130 use Try::Tiny qw( catch try );
134 my ($delete, $reset, $help, $man, $processes);
135 my ($index_biblios, $index_authorities);
136 my (@biblionumbers,@authids,$where);
139 $|=1; # flushes output
142 'c|commit=i' => \$commit,
143 'd|delete' => \$delete,
144 'r|reset' => \$reset,
145 'a|authorities' => \$index_authorities,
146 'b|biblios' => \$index_biblios,
148 'bn|bnumber=i' => \@biblionumbers,
149 'ai|authid=i' => \@authids,
150 'w|where=s' => \$where,
151 'p|processes=i' => \$processes,
152 'v|verbose+' => \$verbose,
157 # Default is to do both
158 unless ($index_authorities || $index_biblios) {
159 $index_authorities = $index_biblios = 1;
162 if ($processes && ( @biblionumbers || @authids) ) {
163 die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
166 pod2usage(1) if $help;
167 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
172 Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
176 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
177 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
180 my $slice_count = ( $processes //= 1 );
181 my %iterator_options;
183 if ($slice_count > 1) {
184 # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
186 for (my $proc = 1; $proc < $slice_count; $proc++) {
188 die "Failed to fork a child process\n" unless defined $pid;
190 # Child process, give it a slice to process
191 $slice_index = $proc;
195 # Fudge the commit count a bit to spread out the Elasticsearch commits
196 $commit *= 1 + 0.10 * $slice_index;
197 $commit = int( $commit );
198 _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
199 $iterator_options{slice} = { index => $slice_index, count => $slice_count };
203 $iterator_options{desc} = 1;
207 $iterator_options{where} = $where;
211 if ($index_biblios) {
212 _log(1, "Indexing biblios\n");
213 if (@biblionumbers) {
215 my $r = shift @biblionumbers;
216 return () unless defined $r;
217 return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
220 my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
225 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
227 if ($index_authorities) {
228 _log(1, "Indexing authorities\n");
231 my $r = shift @authids;
232 return () unless defined $r;
233 my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
237 my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
242 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
245 if ($slice_index == 0) {
246 # Main process, wait for children
247 for (my $proc = 1; $proc < $processes; $proc++) {
252 =head1 INTERNAL METHODS
254 =head2 _verify_index_state
256 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
258 Checks the index state and recreates it if requested.
262 sub _verify_index_state {
263 my ( $index_name, $recreate ) = @_;
265 _log(1, "Checking state of $index_name index\n");
266 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
269 _log(1, "Dropping and recreating $index_name index\n");
270 $indexer->drop_index() if $indexer->index_exists();
271 $indexer->create_index();
273 elsif (!$indexer->index_exists) {
274 # Create the index if it does not exist
275 $indexer->create_index();
276 } elsif ($indexer->is_index_status_ok) {
277 # Update mappings unless the index is in some kind of problematic state
278 $indexer->update_mappings();
279 } elsif ($indexer->is_index_status_recreate_required) {
280 warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
286 _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
288 Does the actual reindexing. $callback is a function that always returns the next record.
289 For each index we iterate through the records, committing at specified count
294 my ( $next, $index_name ) = @_;
296 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
299 my $commit_count = $commit;
300 my ( @id_buffer, @commit_buffer );
301 while ( my $record = $next->() ) {
302 my $id = $record->id // $record->authid;
303 my $record = $record->record;
305 if ( $verbose == 1 ) {
306 _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
311 push @id_buffer, $id;
312 push @commit_buffer, $record;
313 if ( !( --$commit_count ) ) {
314 _log( 1, "Committing $commit records...\n" );
317 $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
318 _handle_response($response);
319 _log( 1, "Commit complete\n" );
321 _log(1,"Elasticsearch exception thrown: ".$_->type."\n");
322 _log(2,"Details: ".$_->details."\n");
324 $commit_count = $commit;
330 # There are probably uncommitted records
331 _log( 1, "Committing final records...\n" );
332 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
333 _handle_response($response);
334 _log( 1, "Total $count records indexed\n" );
341 Checks some basic stuff to ensure that it's sane before we start.
346 # Do we have an elasticsearch block defined?
347 my $conf = C4::Context->config('elasticsearch');
348 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
351 =head2 _handle_response
353 Parses the response returned by update_index and displays errors depending on the verbosity of the script.
357 sub _handle_response {
359 if( $response->{errors} eq 'true' ){
360 _log( 1, "There were errors during indexing\n" );
362 foreach my $item (@{$response->{items}}){
363 next unless defined $item->{index}->{error};
364 print "Record #" . $item->{index}->{_id} . " " .
365 $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
366 $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
374 _log($level, "Message\n");
376 Output progress information.
378 Will output the message if verbosity level is set to $level or more. Will not
379 include a trailing newline automatically.
384 my ($level, $msg) = @_;
386 print "[$$] $msg" if ($verbose >= $level);