Koha/misc/workers/es_indexer_daemon.pl
Nick Clemens 7496a603cd
Bug 35086: Also split chunks when indexing from background job
The ES background indexer is designed to combine background jobs into batches whose size is set by the 'batch_size' option.

While this is helpful for combining individual updates, it can be problematic when there are several large batch modifications, or when the worker has stopped and is restarted.

This patch uses the same logic as the indexer to split the records sent directly for indexing into chunks, as sketched below.
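
For illustration, a minimal sketch of that chunking pattern (the chunk size and record ids here are hypothetical; the daemon reads chunk_size from its Elasticsearch configuration):

    use Modern::Perl;
    use List::MoreUtils qw( natatime );

    my $chunk_size = 5000;                # hypothetical; the daemon reads this from its config
    my @record_ids = ( 1 .. 12_000 );     # hypothetical queued record ids
    my $iter       = natatime $chunk_size, @record_ids;
    while ( my @chunk = $iter->() ) {
        # each pass gets at most $chunk_size ids, so no single
        # index request grows unbounded
        printf "would index %d records\n", scalar @chunk;
    }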

To test:
1 - Follow test plan on previous patch
2 - Confirm items are correctly indexed and jobs marked

Signed-off-by: David Nind <david@davidnind.com>

Signed-off-by: Jonathan Druart <jonathan.druart@bugs.koha-community.org>
Signed-off-by: Katrin Fischer <katrin.fischer@bsz-bw.de>
2024-01-16 12:06:00 +01:00

#!/usr/bin/perl

# This file is part of Koha.
#
# Koha is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Koha is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <http://www.gnu.org/licenses>.

=head1 NAME

es_indexer_daemon.pl - Worker script that processes background Elasticsearch jobs

=head1 SYNOPSIS

./es_indexer_daemon.pl --batch_size=X

Options:

   -b --batch_size    how many jobs to commit per batch (default: 10)
   --help             brief help message

=head1 OPTIONS

=over 8

=item B<--help>

Print a brief help message and exit.

=item B<--batch_size>

How many jobs to commit per batch. Defaults to 10; a batch is committed after 0.1 seconds if no more jobs are incoming.

=back

=head1 DESCRIPTION

This script connects to the Stomp server (RabbitMQ), subscribes to the Elasticsearch queue, and processes jobs in batches.
If no Stomp server is available, it polls the database every 10 seconds for new jobs in the Elasticsearch queue
and processes them in batches.
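
For example, to commit up to 50 jobs per batch:

  ./es_indexer_daemon.pl --batch_size=50
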
=cut
use Modern::Perl;
use JSON qw( decode_json );
use Try::Tiny;
use Pod::Usage;
use Getopt::Long;
use List::MoreUtils qw( natatime );

use C4::Context;
use Koha::Logger;
use Koha::BackgroundJobs;
use Koha::SearchEngine;
use Koha::SearchEngine::Indexer;
my ( $help, $batch_size );

GetOptions(
    'h|help'         => \$help,
    'b|batch_size=s' => \$batch_size
) || pod2usage(1);

pod2usage(0) if $help;

$batch_size //= 10;
warn "Not using Elasticsearch" unless C4::Context->preference('SearchEngine') eq 'Elasticsearch';
my $logger = Koha::Logger->get({ interface => 'worker' });
my $conn;
try {
$conn = Koha::BackgroundJob->connect;
} catch {
warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
};
if ($conn) {

    # FIXME cf note in Koha::BackgroundJob about $namespace
    my $namespace = C4::Context->config('memcached_namespace');
    $conn->subscribe(
        {
            destination      => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
            ack              => 'client',
            'prefetch-count' => 1,
        }
    );
}
my $biblio_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
my $auth_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::AUTHORITIES_INDEX } );
my $config = $biblio_indexer->get_elasticsearch_params;
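# Split indexing requests into chunks of at most chunk_size record ids,
# as returned by get_elasticsearch_params; fall back to 5000 when unset.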
my $at_a_time = $config->{chunk_size} // 5000;
my @jobs = ();
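# Main loop: with a broker connection, consume one frame at a time and
# commit once $batch_size jobs have accumulated or no frame arrives within
# 0.1 seconds; without a broker, poll the database every 10 seconds instead.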
while (1) {
    if ($conn) {
        my $frame = $conn->receive_frame;
        if ( !defined $frame ) {

            # maybe log connection problems
            next;    # will reconnect automatically
        }

        my $args = try {
            my $body = $frame->body;
            decode_json($body);    # TODO Should this be from_json? Check utf8 flag.
        } catch {
            $logger->warn( sprintf "Frame not processed - %s", $_ );
            return;
        } finally {
            $conn->ack( { frame => $frame } );
        };

        next unless $args;

        # FIXME This means the DB entry must have been created beforehand.
        # That works as a first step, but eventually we will want to handle
        # jobs created from the received message itself.
        my $job = Koha::BackgroundJobs->search( { id => $args->{job_id}, status => 'new' } )->next;
        unless ($job) {
            $logger->warn( sprintf "Job %s not found, or has wrong status", $args->{job_id} );
            next;
        }

        push @jobs, $job;
        if ( @jobs >= $batch_size || !$conn->can_read( { timeout => '0.1' } ) ) {
            commit(@jobs);
            @jobs = ();
        }
    } else {
        @jobs = Koha::BackgroundJobs->search( { status => 'new', queue => 'elastic_index' } )->as_list;
        commit(@jobs);
        @jobs = ();
        sleep 10;
    }
}
$conn->disconnect;
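# Mark the given jobs as started, collect their record ids per index
# (biblios vs authorities), send the ids to Elasticsearch in chunks of
# $at_a_time, then mark the jobs as finished.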
sub commit {
    my (@jobs) = @_;

    my @bib_records;
    my @auth_records;

    my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] } );

    # Start
    $jobs->update(
        {
            progress   => 0,
            status     => 'started',
            started_on => \'NOW()',
        }
    );

    for my $job (@jobs) {
        my $args = try {
            $job->json->decode( $job->data );
        } catch {
            $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
            $job->status('failed')->store;
            return;
        };
        next unless $args;

        if ( $args->{record_server} eq 'biblioserver' ) {
            push @bib_records, @{ $args->{record_ids} };
        } else {
            push @auth_records, @{ $args->{record_ids} };
        }
    }
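    # Index the accumulated ids in chunks of at most $at_a_time records,
    # so that one oversized batch cannot overwhelm Elasticsearch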
    if (@auth_records) {
        my $auth_chunks = natatime $at_a_time, @auth_records;
        while ( ( my @auth_chunk = $auth_chunks->() ) ) {
            try {
                $auth_indexer->update_index( \@auth_chunk );
            } catch {
                $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
            };
        }
    }

    if (@bib_records) {
        my $biblio_chunks = natatime $at_a_time, @bib_records;
        while ( ( my @bib_chunk = $biblio_chunks->() ) ) {
            try {
                $biblio_indexer->update_index( \@bib_chunk );
            } catch {
                $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
            };
        }
    }
    # Finish
    $jobs->update(
        {
            progress => 1,
            status   => 'finished',
            ended_on => \'NOW()',
        }
    );
}