3 # This inserts records from a Koha database into Elasticsearch
5 # Copyright 2014 Catalyst IT
7 # This file is part of Koha.
9 # Koha is free software; you can redistribute it and/or modify it under the
10 # terms of the GNU General Public License as published by the Free Software
11 # Foundation; either version 3 of the License, or (at your option) any later
14 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
15 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
16 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License along
19 # with Koha; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
36 Inserts records from a Koha database into Elasticsearch.
42 =item B<-c|--commit>=C<count>
44 Specify how many records will be batched up before they're added to Elasticsearch.
45 Higher should be faster, but will cause more RAM usage. Default is 5000.
49 Delete the index and recreate it before indexing.
51 =item B<-a|--authorities>
53 Index the authorities only. Combining this with B<-b> is the same as
54 specifying neither and so both get indexed.
58 Index the biblios only. Combining this with B<-a> is the same as
59 specifying neither and so both get indexed.
61 =item B<-bn|--bnumber>
63 Only index the supplied biblionumber, mostly for testing purposes. May be
64 repeated. This also applies to authorities via authid, so if you're using it,
65 you probably only want to do one or the other at a time.
67 =item B<-p|--processes>
69 Number of processes to use for indexing. This can be used to do more indexing
70 work in parallel on multicore systems. By default, a single process is used.
74 By default, this program only emits warnings and errors. This makes it talk
75 more. Add more to make it even more wordy, in particular when debugging.
95 use Koha::MetadataRecord::Authority;
96 use Koha::BiblioUtils;
97 use Koha::SearchEngine::Elasticsearch::Indexer;
105 my ($delete, $help, $man, $processes);
106 my ($index_biblios, $index_authorities);
107 my (@record_numbers);
109 $|=1; # flushes output
112 'c|commit=i' => \$commit,
113 'd|delete' => \$delete,
114 'a|authorities' => \$index_authorities,
115 'b|biblios' => \$index_biblios,
116 'bn|bnumber=i' => \@record_numbers,
117 'p|processes=i' => \$processes,
118 'v|verbose+' => \$verbose,
123 # Default is to do both
124 unless ($index_authorities || $index_biblios) {
125 $index_authorities = $index_biblios = 1;
128 if ($processes && @record_numbers) {
129 die "Argument p|processes cannot be combined with bn|bnumber";
132 pod2usage(1) if $help;
133 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
137 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
138 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
141 my $slice_count = ( $processes //= 1 );
142 my %iterator_options;
144 if ($slice_count > 1) {
145 # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
147 for (my $proc = 1; $proc < $slice_count; $proc++) {
149 die "Failed to fork a child process\n" unless defined $pid;
151 # Child process, give it a slice to process
152 $slice_index = $proc;
156 # Fudge the commit count a bit to spread out the Elasticsearch commits
157 $commit *= 1 + 0.10 * $slice_index;
158 _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
159 $iterator_options{slice} = { index => $slice_index, count => $slice_count };
163 if ($index_biblios) {
164 _log(1, "Indexing biblios\n");
165 if (@record_numbers) {
167 my $r = shift @record_numbers;
168 return () unless defined $r;
169 return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
172 my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
177 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
179 if ($index_authorities) {
180 _log(1, "Indexing authorities\n");
181 if (@record_numbers) {
183 my $r = shift @record_numbers;
184 return () unless defined $r;
185 my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
186 return ($r, $a->record);
189 my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
194 _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
197 if ($slice_index == 0) {
198 # Main process, wait for children
199 for (my $proc = 1; $proc < $processes; $proc++) {
204 =head1 INTERNAL METHODS
206 =head2 _verify_index_state
208 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
210 Checks the index state and recreates it if requested.
214 sub _verify_index_state {
215 my ( $index_name, $recreate ) = @_;
217 _log(1, "Checking state of $index_name index\n");
218 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
221 _log(1, "Dropping and recreating $index_name index\n");
222 $indexer->drop_index() if $indexer->index_exists();
223 $indexer->create_index();
225 elsif (!$indexer->index_exists) {
226 # Create the index if it does not exist
227 $indexer->create_index();
228 } elsif ($indexer->is_index_status_ok) {
229 # Update mappings unless the index is in some kind of problematic state
230 $indexer->update_mappings();
231 } elsif ($indexer->is_index_status_recreate_required) {
232 warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
238 _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
240 Does the actual reindexing. $callback is a function that always returns the next record.
241 For each index we iterate through the records, committing at specified count
246 my ( $next, $index_name ) = @_;
248 my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
251 my $commit_count = $commit;
252 my ( @id_buffer, @commit_buffer );
253 while ( my $record = $next->() ) {
254 my $id = $record->id;
255 my $record = $record->record;
257 if ( $verbose == 1 ) {
258 _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
263 push @id_buffer, $id;
264 push @commit_buffer, $record;
265 if ( !( --$commit_count ) ) {
266 _log( 1, "Committing $commit records...\n" );
267 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
268 _handle_response($response);
269 $commit_count = $commit;
272 _log( 1, "Commit complete\n" );
276 # There are probably uncommitted records
277 _log( 1, "Committing final records...\n" );
278 my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
279 _handle_response($response);
280 _log( 1, "Total $count records indexed\n" );
287 Checks some basic stuff to ensure that it's sane before we start.
292 # Do we have an elasticsearch block defined?
293 my $conf = C4::Context->config('elasticsearch');
294 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
297 =head2 _handle_response
299 Parse the return from update_index and display errors depending on verbosity of the script
303 sub _handle_response {
305 if( $response->{errors} eq 'true' ){
306 _log( 1, "There were errors during indexing\n" );
308 foreach my $item (@{$response->{items}}){
309 next unless defined $item->{index}->{error};
310 print "Record #" . $item->{index}->{_id} . " " .
311 $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
312 $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
320 _log($level, "Message\n");
322 Output progress information.
324 Will output the message if verbosity level is set to $level or more. Will not
325 include a trailing newline automatically.
330 my ($level, $msg) = @_;
332 print "[$$] $msg" if ($verbose >= $level);