From abe70811098da409af6cb9e97a4a3d00b4f0d3ef Mon Sep 17 00:00:00 2001 From: Ere Maijala Date: Fri, 23 Nov 2018 10:43:34 +0200 Subject: [PATCH] Bug 21872: Add multiprocess support to Elasticsearch indexing utility Test plan: 1. Time execution without -p parameter 2. Time execution with -p 2 or -p3 or -p 4 depending on CPU core count Signed-off-by: Josef Moravec Signed-off-by: Martin Renvoize Signed-off-by: Nick Clemens --- Koha/BiblioUtils.pm | 30 +++- Koha/MetadataRecord/Authority.pm | 38 +++++- misc/search_tools/rebuild_elastic_search.pl | 143 +++++++++++++++----- 3 files changed, 175 insertions(+), 36 deletions(-) diff --git a/Koha/BiblioUtils.pm b/Koha/BiblioUtils.pm index 73fc12aa4b..c83d94569d 100644 --- a/Koha/BiblioUtils.pm +++ b/Koha/BiblioUtils.pm @@ -101,20 +101,46 @@ sub get_from_biblionumber { =head2 get_all_biblios_iterator - my $it = Koha::BiblioUtils->get_all_biblios_iterator(); + my $it = Koha::BiblioUtils->get_all_biblios_iterator(%options); This will provide an iterator object that will, one by one, provide the Koha::BiblioUtils of each biblio. This will include the item data. The iterator is a Koha::MetadataIterator object. +Possible options are: + +=over 4 + +=item C + +slice may be defined as a hash of two values: index and count. index +is the slice number to process and count is total number of slices. +With this information the iterator returns just the given slice of +records instead of all. + +=back + =cut sub get_all_biblios_iterator { + my ($self, %options) = @_; + + my $search_terms = {}; + my ($slice_modulo, $slice_count); + if ($options{slice}) { + $slice_count = $options{slice}->{count}; + $slice_modulo = $options{slice}->{index}; + $slice_modulo = 0 if ($slice_modulo == $slice_count); + + $search_terms = \[ ' mod(biblionumber, ?) = ?', $slice_count, $slice_modulo]; + } + my $database = Koha::Database->new(); my $schema = $database->schema(); my $rs = - $schema->resultset('Biblio')->search( {}, + $schema->resultset('Biblio')->search( + $search_terms, { columns => [qw/ biblionumber /] } ); my $next_func = sub { # Warn and skip bad records, otherwise we break the loop diff --git a/Koha/MetadataRecord/Authority.pm b/Koha/MetadataRecord/Authority.pm index 062d19c961..db14c5d634 100644 --- a/Koha/MetadataRecord/Authority.pm +++ b/Koha/MetadataRecord/Authority.pm @@ -151,20 +151,54 @@ sub authorized_heading { =head2 get_all_authorities_iterator - my $it = Koha::MetadataRecord::Authority->get_all_authorities_iterator(); + my $it = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%options); This will provide an iterator object that will, one by one, provide the Koha::MetadataRecord::Authority of each authority. The iterator is a Koha::MetadataIterator object. +Possible options are: + +=over 4 + +=item C + +slice may be defined as a hash of two values: index and count. index +is the slice number to process and count is total number of slices. +With this information the iterator returns just the given slice of +records instead of all. + +=back + =cut sub get_all_authorities_iterator { + my ($self, %options) = @_; + + my $search_terms = { + marcxml => { '!=', undef } + }; + my ($slice_modulo, $slice_count); + if ($options{slice}) { + $slice_count = $options{slice}->{count}; + $slice_modulo = $options{slice}->{index}; + $slice_modulo = 0 if ($slice_modulo == $slice_count); + + $search_terms->{authid} = \[ ' mod ? = ?', $slice_count, $slice_modulo]; + $search_terms = { + '-and' => [ + $search_terms, + \[ ' mod(authid, ?) = ?', $slice_count, $slice_modulo] + ] + }; + } + my $database = Koha::Database->new(); my $schema = $database->schema(); my $rs = - $schema->resultset('AuthHeader')->search( { marcxml => { '!=', undef } }, + $schema->resultset('AuthHeader')->search( + $search_terms, { columns => [qw/ authid authtypecode marcxml /] } ); my $next_func = sub { my $row = $rs->next(); diff --git a/misc/search_tools/rebuild_elastic_search.pl b/misc/search_tools/rebuild_elastic_search.pl index a233cf5d0f..588a955b3f 100755 --- a/misc/search_tools/rebuild_elastic_search.pl +++ b/misc/search_tools/rebuild_elastic_search.pl @@ -64,6 +64,11 @@ Only index the supplied biblionumber, mostly for testing purposes. May be repeated. This also applies to authorities via authid, so if you're using it, you probably only want to do one or the other at a time. +=item B<-p|--processes> + +Number of processes to use for indexing. This can be used to do more indexing +work in parallel on multicore systems. By default, a single process is used. + =item B<-v|--verbose> By default, this program only emits warnings and errors. This makes it talk @@ -79,6 +84,8 @@ Full documentation. =back +=head1 IMPLEMENTATION + =cut use autodie; @@ -95,21 +102,22 @@ use Pod::Usage; my $verbose = 0; my $commit = 5000; -my ($delete, $help, $man); +my ($delete, $help, $man, $processes); my ($index_biblios, $index_authorities); -my (@biblionumbers); +my (@record_numbers); $|=1; # flushes output GetOptions( - 'c|commit=i' => \$commit, - 'd|delete' => \$delete, + 'c|commit=i' => \$commit, + 'd|delete' => \$delete, 'a|authorities' => \$index_authorities, - 'b|biblios' => \$index_biblios, - 'bn|bnumber=i' => \@biblionumbers, - 'v|verbose+' => \$verbose, - 'h|help' => \$help, - 'man' => \$man, + 'b|biblios' => \$index_biblios, + 'bn|bnumber=i' => \@record_numbers, + 'p|processes=i' => \$processes, + 'v|verbose+' => \$verbose, + 'h|help' => \$help, + 'man' => \$man, ); # Default is to do both @@ -120,49 +128,94 @@ unless ($index_authorities || $index_biblios) { pod2usage(1) if $help; pod2usage( -exitstatus => 0, -verbose => 2 ) if $man; -sanity_check(); +_sanity_check(); + +_verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios); +_verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities); + +my $slice_index = 0; +my $slice_count = $processes // 1; + +if ($slice_count > 1) { + # Fire up child processes for processing slices from 2 on. This main process will handle slice 1. + $slice_index = 1; + for (my $proc = 2; $proc <= $processes; $proc++) { + my $pid = fork(); + die "Failed to fork a child process\n" unless defined $pid; + if ($pid == 0) { + # Child process, give it a slice to process + $slice_index = $proc; + last; + } + } + # Fudge the commit count a bit to spread out the Elasticsearch commits + $commit *= 1 + 0.10 * ($slice_index - 1); +} + +my %iterator_options; +if ($slice_index) { + _log(1, "Processing slice $slice_index of $slice_count\n"); + $iterator_options{slice} = { index => $slice_index, count => $slice_count }; +} my $next; if ($index_biblios) { _log(1, "Indexing biblios\n"); - if (@biblionumbers) { + if (@record_numbers) { $next = sub { - my $r = shift @biblionumbers; + my $r = shift @record_numbers; return () unless defined $r; return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 )); }; } else { - my $records = Koha::BiblioUtils->get_all_biblios_iterator(); + my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options); $next = sub { $records->next(); } } - do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX); + _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX); } if ($index_authorities) { _log(1, "Indexing authorities\n"); - if (@biblionumbers) { + if (@record_numbers) { $next = sub { - my $r = shift @biblionumbers; + my $r = shift @record_numbers; return () unless defined $r; my $a = Koha::MetadataRecord::Authority->get_from_authid($r); return ($r, $a->record); }; } else { - my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(); + my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options); $next = sub { $records->next(); } } - do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX); + _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX); } -sub do_reindex { - my ( $next, $index_name ) = @_; +if ($slice_index == 1) { + # Main process, wait for children + for (my $proc = 2; $proc <= $processes; $proc++) { + wait(); + } +} + +=head2 _verify_index_state + + _ verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1); + +Checks the index state and recreates it if requested. +=cut + +sub _verify_index_state { + my ( $index_name, $recreate ) = @_; + + _log(1, "Checking state of $index_name index\n"); my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } ); - if ($delete) { + if ($recreate) { + _log(1, "Dropping and recreating $index_name index\n"); $indexer->drop_index() if $indexer->index_exists(); $indexer->create_index(); } @@ -175,6 +228,20 @@ sub do_reindex { } elsif ($indexer->is_index_status_recreate_required) { warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/; } +} + +=head2 _do_reindex + + _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX); + +Does the actual reindexing. $callback is a function that always returns the next record. + +=cut + +sub _do_reindex { + my ( $next, $index_name ) = @_; + + my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } ); my $count = 0; my $commit_count = $commit; @@ -192,12 +259,12 @@ sub do_reindex { push @id_buffer, $id; push @commit_buffer, $record; if ( !( --$commit_count ) ) { - _log( 1, "Committing $commit records..." ); + _log( 1, "Committing $commit records...\n" ); $indexer->update_index( \@id_buffer, \@commit_buffer ); $commit_count = $commit; @id_buffer = (); @commit_buffer = (); - _log( 1, " done\n" ); + _log( 1, "Commit complete\n" ); } } @@ -207,21 +274,33 @@ sub do_reindex { _log( 1, "Total $count records indexed\n" ); } -# Checks some basic stuff to ensure that it's sane before we start. -sub sanity_check { +=head2 _sanity_check + + _sanity_check(); + +Checks some basic stuff to ensure that it's sane before we start. + +=cut + +sub _sanity_check { # Do we have an elasticsearch block defined? my $conf = C4::Context->config('elasticsearch'); die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf ); } -# Output progress information. -# -# _log($level, $msg); -# -# Will output $msg if the verbosity setting is set to $level or more. Will -# not include a trailing newline. +=head2 _log + + _log($level, "Message\n"); + +Output progress information. + +Will output the message if verbosity level is set to $level or more. Will not +include a trailing newline automatically. + +=cut + sub _log { my ($level, $msg) = @_; - print $msg if ($verbose >= $level); + print "[$$] $msg" if ($verbose >= $level); } -- 2.39.5