3 # This inserts records from a Koha database into Elasticsearch
5 # Copyright 2014 Catalyst IT
7 # This file is part of Koha.
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
36 Inserts records from a Koha database into Elasticsearch.
42 =item B<-c|--commit>=C<count>
44 Specify how many records will be batched up before they're added to Elasticsearch.
45 Higher values should be faster, but will use more RAM. The default is 5000.
49 Delete the index and recreate it before indexing.
51 =item B<-a|--authorities>
53 Index the authorities only. Combining this with B<-b> is the same as
54 specifying neither and so both get indexed.
58 Index the biblios only. Combining this with B<-a> is the same as
59 specifying neither and so both get indexed.
61 =item B<-bn|--bnumber>
63 Only index the supplied biblionumber, mostly for testing purposes. May be
68 Only index the supplied authority id, mostly for testing purposes. May be
71 =item B<-p|--processes>
73 Number of processes to use for indexing. This can be used to do more indexing
74 work in parallel on multicore systems. By default, a single process is used.
78 By default, this program only emits warnings and errors. This option makes it
79 more talkative. Repeat it to make the output even more verbose, in particular when debugging.
99 use Koha::MetadataRecord::Authority;
100 use Koha::BiblioUtils;
101 use Koha::SearchEngine::Elasticsearch::Indexer;
# Command-line state, filled in by the option specifications below.
109 my ($delete, $help, $man, $processes);
110 my ($index_biblios, $index_authorities);
# Record ids collected from the bn|bnumber and ai|authid options.
111 my (@biblionumbers,@authids);
113 $|=1; # flushes output
# Option specifications, each mapping a switch to the variable it sets.
# NOTE(review): the call these pairs are passed to is not visible in this
# excerpt -- presumably Getopt::Long's GetOptions; confirm.
116 'c|commit=i' => \$commit,
117 'd|delete' => \$delete,
118 'a|authorities' => \$index_authorities,
119 'b|biblios' => \$index_biblios,
120 'bn|bnumber=i' => \@biblionumbers,
121 'ai|authid=i' => \@authids,
122 'p|processes=i' => \$processes,
123 'v|verbose+' => \$verbose,
128 # Default is to do both
129 unless ($index_authorities || $index_biblios) {
130 $index_authorities = $index_biblios = 1;
# Parallel processing is refused when explicit record ids were supplied.
133 if ($processes && ( @biblionumbers || @authids) ) {
134 die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
# Usage output on request; note these checks run after the validation above.
137 pod2usage(1) if $help;
138 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
# Check each index that is about to be filled; $delete is passed through as
# the "recreate" argument of _verify_index_state.
142 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
143 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
# One slice per process; $processes falls back to 1 when it is undefined.
146 my $slice_count = ( $processes //= 1 );
# Extra arguments for the record iterators; the slice description is stored
# here further down.
147 my %iterator_options;
149 if ($slice_count > 1) {
150 # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
# $proc starts at 1, so $slice_count - 1 children are started.
152 for (my $proc = 1; $proc < $slice_count; $proc++) {
# NOTE(review): the statement that sets $pid is not visible in this excerpt --
# presumably fork(); confirm.
154 die "Failed to fork a child process\n" unless defined $pid;
156 # Child process, give it a slice to process
157 $slice_index = $proc;
# Fudge the commit count a bit to spread out the Elasticsearch commits.
# The result is truncated to a whole number: _do_reindex counts the batch
# size down by one per record and commits only when the counter reaches
# exactly zero, which a fractional batch size (e.g. 1234 * 1.1 = 1357.4)
# would never do, leaving every record buffered until the final flush.
$commit = int( $commit * ( 1 + 0.10 * $slice_index ) );
_log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
# Tell the record iterators which slice (0-based index, out of count) this
# process is responsible for.
$iterator_options{slice} = { index => $slice_index, count => $slice_count };
168 if ($index_biblios) {
169 _log(1, "Indexing biblios\n");
# Explicit biblionumbers were given: hand them out one at a time.
# NOTE(review): the anonymous sub enclosing the returns below is not visible
# in this excerpt.
170 if (@biblionumbers) {
# Take the next requested biblionumber; an empty list signals the end.
172 my $r = shift @biblionumbers;
173 return () unless defined $r;
# Return the id together with the record fetched with item_data => 1.
174 return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
# Otherwise iterate over all biblios, passing along any slice options.
177 my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
# Feed the records into the biblio index.
182 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
184 if ($index_authorities) {
185 _log(1, "Indexing authorities\n");
# Take the next requested authority id; an empty list signals the end.
# NOTE(review): the enclosing condition and anonymous sub are not visible in
# this excerpt.
188 my $r = shift @authids;
189 return () unless defined $r;
# NOTE(review): "my $a" shadows the $a used by sort blocks; harmless as far as
# visible here, but a different name would be clearer.
190 my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
# Otherwise iterate over all authorities, passing along any slice options.
194 my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
# Feed the records into the authorities index.
199 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
# Children were assigned a slice index of 1 or more, so index 0 identifies
# the main process.
202 if ($slice_index == 0) {
203 # Main process, wait for children
# One iteration per child started ($processes - 1 of them).
# NOTE(review): the loop body is not visible in this excerpt -- presumably a
# wait call; confirm.
204 for (my $proc = 1; $proc < $processes; $proc++) {
209 =head1 INTERNAL METHODS
211 =head2 _verify_index_state
213 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
215 Checks the index state and recreates it if requested.
# Make sure the named Elasticsearch index is ready to receive records.
#   $index_name - name of the index, handed to the Indexer constructor
#   $recreate   - flag supplied by the caller (the script passes $delete)
219 sub _verify_index_state {
220 my ( $index_name, $recreate ) = @_;
222 _log(1, "Checking state of $index_name index\n");
223 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
# Recreate branch: drop the index if it is there, then create it afresh.
# NOTE(review): the condition guarding this branch is not visible in this
# excerpt -- presumably a test of $recreate; confirm.
226 _log(1, "Dropping and recreating $index_name index\n");
227 $indexer->drop_index() if $indexer->index_exists();
228 $indexer->create_index();
230 elsif (!$indexer->index_exists) {
231 # Create the index if it does not exist
232 $indexer->create_index();
233 } elsif ($indexer->is_index_status_ok) {
234 # Update mappings unless the index is in some kind of problematic state
235 $indexer->update_mappings();
236 } elsif ($indexer->is_index_status_recreate_required) {
# Only a warning is issued on this path.
237 warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
243 _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
245 Does the actual reindexing. $callback is a function that returns the next record each time it is called, and a false value once there are no more records.
246 For each index we iterate through the records, committing at specified count
# Arguments: $next is a code reference yielding one record per call;
# $index_name selects the index the Indexer writes to.
251 my ( $next, $index_name ) = @_;
253 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
# Countdown of records left before the next batch commit.
256 my $commit_count = $commit;
# Parallel buffers: record ids and the matching records, in the same order.
257 my ( @id_buffer, @commit_buffer );
# Loop until the callback returns a false value.
258 while ( my $record = $next->() ) {
# Use the wrapper's id, falling back to its authid when id is undefined.
259 my $id = $record->id // $record->authid;
# From here on $record is whatever ->record returns; this declaration shadows
# the loop variable of the same name.
260 my $record = $record->record;
# At verbosity exactly 1, report progress every 1000 records.
# NOTE(review): the code maintaining $count and the other branch of this
# condition are not visible in this excerpt.
262 if ( $verbose == 1 ) {
263 _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
268 push @id_buffer, $id;
269 push @commit_buffer, $record;
# Commit the batch once the countdown reaches exactly zero.
# NOTE(review): a fractional or non-positive $commit never reaches zero here,
# so no batch would be committed before the final flush below -- confirm that
# $commit is always a positive whole number.
270 if ( !( --$commit_count ) ) {
271 _log( 1, "Committing $commit records...\n" );
272 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
273 _handle_response($response);
# Restart the countdown for the next batch.
# NOTE(review): the statements emptying the two buffers are not visible in
# this excerpt; confirm they follow here.
274 $commit_count = $commit;
277 _log( 1, "Commit complete\n" );
281 # There are probably uncommitted records
282 _log( 1, "Committing final records...\n" );
283 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
284 _handle_response($response);
285 _log( 1, "Total $count records indexed\n" );
292 Checks some basic configuration to ensure that the setup is sane before we start.
297 # Do we have an elasticsearch block defined?
# The 'elasticsearch' entry is read from the Koha configuration through
# C4::Context.
298 my $conf = C4::Context->config('elasticsearch');
# Abort with a plain message (the trailing newline suppresses the file and
# line suffix) when that entry is missing or false.
299 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
302 =head2 _handle_response
304 Parses the response from update_index and displays errors depending on the verbosity of the script.
# Report indexing errors found in a response returned by update_index.
# NOTE(review): the line unpacking $response from the arguments is not
# visible in this excerpt.
308 sub _handle_response {
# NOTE(review): the errors field is compared with the string 'true'; confirm
# that update_index really returns it in that form rather than as a boolean.
310 if( $response->{errors} eq 'true' ){
311 _log( 1, "There were errors during indexing\n" );
# Walk the per-record results and skip those without an index error.
313 foreach my $item (@{$response->{items}}){
314 next unless defined $item->{index}->{error};
# One line per failed record: its id, the error reason and type, then the
# type and reason of the underlying cause. Printed directly, not via _log.
315 print "Record #" . $item->{index}->{_id} . " " .
316 $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
317 $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
325 _log($level, "Message\n");
327 Output progress information.
329 Will output the message if verbosity level is set to $level or more. Will not
330 include a trailing newline automatically.
# $level is the minimum verbosity at which $msg is shown.
335 my ($level, $msg) = @_;
# The message is prefixed with the current process id ($$), presumably so
# that output from parallel workers can be told apart. No newline is added.
337 print "[$$] $msg" if ($verbose >= $level);