3 # This inserts records from a Koha database into Elasticsearch
5 # Copyright 2014 Catalyst IT
7 # This file is part of Koha.
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
44 Inserts records from a Koha database into Elasticsearch.
50 =item B<-c|--commit>=C<count>
52 Specify how many records will be batched up before they're added to Elasticsearch.
53 Higher should be faster, but will cause more RAM usage. Default is 5000.
57 Delete the index and recreate it before indexing.
61 Reload mappings from files (specified in koha-conf.xml) before indexing.
64 =item B<-a|--authorities>
66 Index the authorities only. Combining this with B<-b> is the same as
67 specifying neither and so both get indexed.
71 Index the biblios only. Combining this with B<-a> is the same as
72 specifying neither and so both get indexed.
76 Index the records in descending id order. Intended to index newer records before older records.
77 Default is to index in ascending order.
78 Does not work with --bnumber or --authid.
80 =item B<-bn|--bnumber>
82 Only index the supplied biblionumber, mostly for testing purposes. May be
87 Only index the supplied authority id, mostly for testing purposes. May be
90 =item B<-p|--processes>
92 Number of processes to use for indexing. This can be used to do more indexing
93 work in parallel on multicore systems. By default, a single process is used.
97 By default, this program only emits warnings and errors. This makes it talk
98 more. Add more to make it even more wordy, in particular when debugging.
110 =head1 IMPLEMENTATION
115 use Getopt::Long qw( GetOptions );
118 use Koha::MetadataRecord::Authority;
119 use Koha::BiblioUtils;
120 use Koha::SearchEngine::Elasticsearch;
121 use Koha::SearchEngine::Elasticsearch::Indexer;
124 use Pod::Usage qw( pod2usage );
125 use Try::Tiny qw( catch try );
# Targets for the command-line options bound in the option table below.
# $commit and $verbose are bound there as well, but their declarations are
# not visible in this listing.
129 my ($delete, $reset, $help, $man, $processes);
130 my ($index_biblios, $index_authorities);
# Array targets: these options may be repeated, each occurrence adding one id.
131 my (@biblionumbers,@authids);
134 $|=1; # flushes output
# Option table. "=i" requires an integer value; "+" makes the option a counter
# that is incremented every time it appears on the command line.
# NOTE(review): presumably the argument list of a GetOptions(...) call; the
# line that opens the call is not visible in this listing -- confirm.
137 'c|commit=i' => \$commit,
138 'd|delete' => \$delete,
139 'r|reset' => \$reset,
140 'a|authorities' => \$index_authorities,
141 'b|biblios' => \$index_biblios,
143 'bn|bnumber=i' => \@biblionumbers,
144 'ai|authid=i' => \@authids,
145 'p|processes=i' => \$processes,
146 'v|verbose+' => \$verbose,
151 # Default is to do both
152 unless ($index_authorities || $index_biblios) {
153 $index_authorities = $index_biblios = 1;
# Parallel processing is rejected when explicit record ids were supplied.
# Note that this check runs before the help/man handling below.
156 if ($processes && ( @biblionumbers || @authids) ) {
157 die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
# $help: print the usage message with exit status 1.
# $man: print the full manual (-verbose => 2) and exit with status 0.
160 pod2usage(1) if $help;
161 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
# NOTE(review): the lines between the pod2usage calls and this statement are
# not visible in this listing; presumably this reset is guarded by $reset
# (the -r|--reset option) -- confirm.
166 Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
# Check each selected index before indexing starts; $delete is passed as the
# "recreate" flag of _verify_index_state.
170 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
171 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
# One slice per process. $processes is defaulted to 1 here when it was not
# given, and the same value becomes the slice count.
174 my $slice_count = ( $processes //= 1 );
# Options handed to the record iterators further down (slice and/or desc).
175 my %iterator_options;
177 if ($slice_count > 1) {
178 # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
# The loop starts at 1, so it runs $slice_count - 1 times.
180 for (my $proc = 1; $proc < $slice_count; $proc++) {
# NOTE(review): the statement that assigns $pid is not visible in this
# listing; given the error text it is presumably a fork() call -- confirm.
182 die "Failed to fork a child process\n" unless defined $pid;
184 # Child process, give it a slice to process
# NOTE(review): the condition that identifies the child, and the declaration
# of $slice_index, are not visible in this listing.
185 $slice_index = $proc;
189 # Fudge the commit count a bit to spread out the Elasticsearch commits
# The batch size grows by 10% per slice index (slice index 0 keeps the
# configured value) and is then truncated to an integer.
190 $commit *= 1 + 0.10 * $slice_index;
191 $commit = int( $commit );
# The log message numbers slices from 1; the iterator option uses the
# zero-based index.
192 _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
193 $iterator_options{slice} = { index => $slice_index, count => $slice_count };
# NOTE(review): the condition guarding this assignment is not visible in this
# listing; presumably the descending-order option -- confirm.
197 $iterator_options{desc} = 1;
# Biblio pass: runs only when biblios were selected (or defaulted) above.
201 if ($index_biblios) {
202 _log(1, "Indexing biblios\n");
# Explicit biblionumbers were given on the command line.
203 if (@biblionumbers) {
# NOTE(review): the next three statements look like the body of a callback
# whose opening line is not visible in this listing. Each call takes the next
# id off @biblionumbers, returns an empty list once none are left, and
# otherwise returns the id together with the record fetched with item data.
205 my $r = shift @biblionumbers;
206 return () unless defined $r;
207 return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
# No explicit ids: iterate over all biblios, passing the slice/desc options.
210 my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
# $next is declared and assigned on lines not visible in this listing.
215 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
# Authority pass: runs only when authorities were selected (or defaulted) above.
217 if ($index_authorities) {
218 _log(1, "Indexing authorities\n");
# NOTE(review): the branch condition and the opening of the callback that
# contains the next statements are not visible in this listing. The visible
# part takes the next id off @authids, returns an empty list once none are
# left, and otherwise looks the authority up by id.
221 my $r = shift @authids;
222 return () unless defined $r;
# NOTE(review): a lexical named $a hides the package variable that sort
# blocks use; a more descriptive name would avoid surprises.
223 my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
# No explicit ids: iterate over all authorities, passing the slice/desc options.
227 my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
# $next is declared and assigned on lines not visible in this listing.
232 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
# Only the process whose slice index is 0 enters this branch.
235 if ($slice_index == 0) {
236 # Main process, wait for children
# One iteration per child ($processes - 1 in total). The loop body is not
# visible in this listing.
237 for (my $proc = 1; $proc < $processes; $proc++) {
242 =head1 INTERNAL METHODS
244 =head2 _verify_index_state
246 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
248 Checks the index state and recreates it if requested.
# Inspect one Elasticsearch index and bring it into a usable state.
#   $index_name - name of the index to check
#   $recreate   - flag passed by the caller (the script passes $delete)
252 sub _verify_index_state {
253 my ( $index_name, $recreate ) = @_;
255 _log(1, "Checking state of $index_name index\n");
256 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
# NOTE(review): the condition that opens this first branch is not visible in
# this listing; given the log text and the $recreate parameter it is
# presumably a test of $recreate -- confirm.
259 _log(1, "Dropping and recreating $index_name index\n");
# Drop only when the index is actually there, then create it unconditionally.
260 $indexer->drop_index() if $indexer->index_exists();
261 $indexer->create_index();
263 elsif (!$indexer->index_exists) {
264 # Create index if it does not exist
265 $indexer->create_index();
266 } elsif ($indexer->is_index_status_ok) {
267 # Update mappings unless the index is in some kind of problematic state
268 $indexer->update_mappings();
269 } elsif ($indexer->is_index_status_recreate_required) {
# The visible part of this branch only warns; it does not recreate the index.
270 warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
276 _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
278 Does the actual reindexing. $callback is a function that always returns the next record.
279 For each index we iterate through the records, committing at specified count
# Body of the reindexing routine (its "sub" line is not visible in this listing).
#   $next       - callback; each call yields the next record, a false value
#                 ends the loop
#   $index_name - index the records are written to
284 my ( $next, $index_name ) = @_;
286 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
# Countdown to the next batch commit; it is restarted from $commit after a
# batch has been sent.
289 my $commit_count = $commit;
# Parallel buffers: record ids and the matching records, passed to
# update_index together.
290 my ( @id_buffer, @commit_buffer );
291 while ( my $record = $next->() ) {
# Use id when it is defined, otherwise fall back to authid.
292 my $id = $record->id // $record->authid;
# NOTE(review): this declares a second $record that hides the loop variable
# for the rest of the loop body; a distinct name would read more clearly.
293 my $record = $record->record;
# At verbosity exactly 1, report progress every 1000 records. $count is
# declared and updated on lines not visible in this listing, as is whatever
# follows this branch.
295 if ( $verbose == 1 ) {
296 _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
301 push @id_buffer, $id;
302 push @commit_buffer, $record;
# Pre-decrement: the condition is true when the countdown reaches zero.
# NOTE(review): if $commit is ever less than 1 the countdown never hits zero
# and everything is held until the final commit -- confirm inputs are >= 1.
303 if ( !( --$commit_count ) ) {
304 _log( 1, "Committing $commit records...\n" );
# NOTE(review): the declaration of $response and the surrounding error
# handling are not visible in this listing. The $_->type / $_->details calls
# below suggest a Try::Tiny catch block where $_ holds the exception -- confirm.
307 $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
308 _handle_response($response);
309 _log( 1, "Commit complete\n" );
# Exception type is logged at level 1, the details only at level 2.
311 _log(1,"Elasticsearch exception thrown: ".$_->type."\n");
312 _log(2,"Details: ".$_->details."\n");
314 $commit_count = $commit;
# NOTE(review): the lines that follow the countdown reset (presumably
# clearing the buffers and closing the loop) are not visible in this listing.
320 # There are probably uncommitted records
# Send whatever is still buffered, then report the total.
321 _log( 1, "Committing final records...\n" );
322 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
323 _handle_response($response);
324 _log( 1, "Total $count records indexed\n" );
331 Checks some basic stuff to ensure that it's sane before we start.
# Body of the sanity check (its "sub" line is not visible in this listing).
336 # Do we have an elasticsearch block defined?
337 my $conf = C4::Context->config('elasticsearch');
# Abort when the lookup returns a false value. The trailing newline keeps
# Perl from appending the file name and line number to the message.
338 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
341 =head2 _handle_response
343 Parse the return from update_index and display errors depending on verbosity of the script
# Report errors found in the structure returned by update_index.
347 sub _handle_response {
# NOTE(review): $response is presumably unpacked from @_ on a line that is
# not visible in this listing.
# NOTE(review): this is a string comparison against 'true'; confirm that
# update_index delivers the errors flag in that form rather than as a boolean.
349 if( $response->{errors} eq 'true' ){
350 _log( 1, "There were errors during indexing\n" );
# Walk the per-record results and skip entries that carry no index error.
# NOTE(review): a guard on the verbosity level may sit on the line before
# this loop, which is not visible in this listing.
352 foreach my $item (@{$response->{items}}){
353 next unless defined $item->{index}->{error};
# Written with print directly rather than through _log. Output format:
#   Record #<id> <reason> (<type>) : <cause type> (<cause reason>)
# NOTE(review): caused_by is dereferenced unconditionally; confirm it is
# present on every error entry, otherwise this emits "uninitialized" warnings.
354 print "Record #" . $item->{index}->{_id} . " " .
355 $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
356 $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
364 _log($level, "Message\n");
366 Output progress information.
368 Will output the message if verbosity level is set to $level or more. Will not
369 include a trailing newline automatically.
# Body of the logging helper (its "sub" line is not visible in this listing).
#   $level - minimum verbosity at which the message is printed
#   $msg   - text to print; it is emitted as-is, so callers supply the newline
374 my ($level, $msg) = @_;
# Each message is prefixed with the current process id ($$), presumably so
# that output from parallel indexing processes can be told apart.
376 print "[$$] $msg" if ($verbose >= $level);