From 9a1de9b3ed4018f46d8ab7583e9064786a415328 Mon Sep 17 00:00:00 2001 From: Nick Clemens Date: Mon, 9 Jan 2023 13:43:43 +0000 Subject: [PATCH] Bug 32594: Add a dedicated Elasticsearch biblio indexing background queue Currently we generate large numbers of single record reindexes for circulation and other actions. It can take a long time to process these as we need to load the ES settings for each. This patch updates the Elasticsearch background jobs to throw records into a new queue that can be processed by its own worker and adds a dedicated worker that batches the jobs every 1 second. To test: 1 - Apply patches, set SearchEngine system preference to 'Elasticsearch' 2 - perl misc/workers/es_indexer_daemon.pl 3 - Leave this running in the terminal and perform actions in the staff interface: - Checking out a bib - Returning a bib - Editing a single bib - Editing a single item - Batch editing bibs - Batch editing items 4 - Confirm for each action that records are updated in search/search results 5 - Stop the script 6 - set SearchEngine system preference to 'Zebra' 7 - perl misc/workers/es_indexer_daemon.pl 8 - Script dies as Elasticsearch not enabled Signed-off-by: David Nind Signed-off-by: Emily Lamancusa Bug 32594: (follow-up) Adjust logging per bug 32612 JD amended patch: tidy! There were tabs here... 
Signed-off-by: Jonathan Druart Signed-off-by: Tomas Cohen Arazi (cherry picked from commit aba2453ad65f8cb4cc9ea609dbf5bb3ed6f241db) Signed-off-by: Martin Renvoize --- Koha/BackgroundJob/UpdateElasticIndex.pm | 4 +- misc/workers/background_jobs_worker.pl | 13 +- misc/workers/es_indexer_daemon.pl | 186 +++++++++++++++++++++++ 3 files changed, 197 insertions(+), 6 deletions(-) create mode 100755 misc/workers/es_indexer_daemon.pl diff --git a/Koha/BackgroundJob/UpdateElasticIndex.pm b/Koha/BackgroundJob/UpdateElasticIndex.pm index 432d34fa97..55c50a1295 100644 --- a/Koha/BackgroundJob/UpdateElasticIndex.pm +++ b/Koha/BackgroundJob/UpdateElasticIndex.pm @@ -105,10 +105,12 @@ sub enqueue { my $record_server = $args->{record_server}; my @record_ids = @{ $args->{record_ids} }; + # elastic_index queue will be handled by the es_indexer_daemon script $self->SUPER::enqueue({ - job_size => 1, + job_size => 1, # Each index is a single job, regardless of the amount of records included job_args => {record_server => $record_server, record_ids => \@record_ids}, + job_queue => 'elastic_index' }); } diff --git a/misc/workers/background_jobs_worker.pl b/misc/workers/background_jobs_worker.pl index 4eb191de47..c54610458f 100755 --- a/misc/workers/background_jobs_worker.pl +++ b/misc/workers/background_jobs_worker.pl @@ -59,6 +59,7 @@ use Pod::Usage; use Getopt::Long; use Parallel::ForkManager; +use C4::Context; use Koha::Logger; use Koha::BackgroundJobs; use C4::Context; @@ -95,11 +96,13 @@ if ( $conn ) { # FIXME cf note in Koha::BackgroundJob about $namespace my $namespace = C4::Context->config('memcached_namespace'); for my $queue (@queues) { - $conn->subscribe({ - destination => sprintf("/queue/%s-%s", $namespace, $queue), - ack => 'client', - 'prefetch-count' => 1, - }); + $conn->subscribe( + { + destination => sprintf( "/queue/%s-%s", $namespace, $queue ), + ack => 'client', + 'prefetch-count' => 1, + } + ); } } while (1) { diff --git a/misc/workers/es_indexer_daemon.pl 
b/misc/workers/es_indexer_daemon.pl new file mode 100755 index 0000000000..502c2fe6ae --- /dev/null +++ b/misc/workers/es_indexer_daemon.pl @@ -0,0 +1,186 @@ +#!/usr/bin/perl + +# This file is part of Koha. +# +# Koha is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# Koha is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Koha; if not, see . + +=head1 NAME + +es_indexer_daemon.pl - Worker script that will process background Elasticsearch jobs + +=head1 SYNOPSIS + +./es_indexer_daemon.pl --batch_size=X + +Options: + + --help brief help message + -b --batch_size how many jobs to commit + +=head1 OPTIONS + +=over 8 + +=item B<--help> + +Print a brief help message and exits. + +=item B<--batch_size> + +How many jobs to commit per batch. Defaults to 10, will commit after .1 seconds if no more jobs incoming. + +=back + +=head1 DESCRIPTION + +This script will connect to the Stomp server (RabbitMQ) and subscribe to the Elasticsearch queue, processing batches every second. +If a Stomp server is not active it will poll the database every 10s for new jobs in the Elasticsearch queue +and process them in batches every second. 
+ +=cut + +use Modern::Perl; +use JSON qw( decode_json ); +use Try::Tiny; +use Pod::Usage; +use Getopt::Long; + +use C4::Context; +use Koha::Logger; +use Koha::BackgroundJobs; +use Koha::SearchEngine; +use Koha::SearchEngine::Indexer; + + +my ( $help, $batch_size ); +GetOptions( + 'h|help' => \$help, + 'b|batch_size=s' => \$batch_size +) || pod2usage(1); + +pod2usage(0) if $help; + +$batch_size //= 10; + +die "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch'; + +my $logger = Koha::Logger->get({ interface => 'worker' }); + +my $conn; +try { + $conn = Koha::BackgroundJob->connect; +} catch { + warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_; +}; + +if ( $conn ) { + # FIXME cf note in Koha::BackgroundJob about $namespace + my $namespace = C4::Context->config('memcached_namespace'); + $conn->subscribe( + { + destination => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ), + ack => 'client', + 'prefetch-count' => 1, + } + ); +} +my $biblio_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX }); +my $auth_indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::AUTHORITIES_INDEX }); +my @jobs = (); + +while (1) { + + if ( $conn ) { + my $frame = $conn->receive_frame; + if ( !defined $frame ) { + # maybe log connection problems + next; # will reconnect automatically + } + + my $args = try { + my $body = $frame->body; + decode_json($body); # TODO Should this be from_json? Check utf8 flag. 
+ } catch { + $logger->warn(sprintf "Frame not processed - %s", $_); + return; + } finally { + $conn->ack( { frame => $frame } ); + }; + + next unless $args; + + # FIXME This means we need to have create the DB entry before + # It could work in a first step, but then we will want to handle job that will be created from the message received + my $job = Koha::BackgroundJobs->find($args->{job_id}); + + unless ( $job ) { + $logger->warn(sprintf "No job found for id=%s", $args->{job_id}); + next; + } + + push @jobs, $job; + if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) { + commit(@jobs); + @jobs = (); + } + + } else { + @jobs = Koha::BackgroundJobs->search( + { status => 'new', queue => 'elastic_index' } )->as_list; + commit(@jobs); + @jobs = (); + sleep 10; + } + +} +$conn->disconnect; + +sub commit { + my (@jobs) = @_; + + my @bib_records; + my @auth_records; + for my $job (@jobs) { + my $args = try { + $job->json->decode( $job->data ); + } catch { + $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id ); + $job->status('failed')->store; + return; + }; + next unless $args; + if ( $args->{record_server} eq 'biblioserver' ) { + push @bib_records, @{ $args->{record_ids} }; + } else { + push @auth_records, @{ $args->{record_ids} }; + } + } + + if (@auth_records) { + try { + $auth_indexer->update_index( \@auth_records ); + } catch { + $logger->warn( sprintf "Update of elastic index failed with: %s", $_ ); + }; + } + if (@bib_records) { + try { + $biblio_indexer->update_index( \@bib_records ); + } catch { + $logger->warn( sprintf "Update of elastic index failed with: %s", $_ ); + }; + } + + Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] } )->update( { status => 'finished', progress => 1 }, { no_triggers => 1 } ); +} -- 2.39.5