#!/usr/bin/perl

# This file is part of Koha.
#
# Koha is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Koha is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <http://www.gnu.org/licenses>.

=head1 NAME

es_indexer_daemon.pl - Worker script that processes background Elasticsearch indexing jobs

=head1 SYNOPSIS

./es_indexer_daemon.pl --batch_size=X

Options:

   -b --batch_size          how many jobs to commit per batch (default: 10)
   -h --help                brief help message

=head1 OPTIONS

=over 8

=item B<--help>

Prints a brief help message and exits.

=item B<--batch_size>

How many jobs to commit per batch. Defaults to 10. A partial batch is
committed as soon as no further job arrives within 0.1 seconds.

=back

=head1 DESCRIPTION

This script connects to the Stomp server (RabbitMQ), subscribes to the
Elasticsearch queue and commits the received jobs in batches (see
B<--batch_size>). If no Stomp server is available, it polls the database
every 10 seconds for new jobs in the Elasticsearch queue and processes all
the jobs it finds together.

=cut

use Modern::Perl;
use JSON qw( decode_json );
use Try::Tiny;
use Pod::Usage;
use Getopt::Long;
use List::MoreUtils qw( natatime );
use Time::HiRes;

use C4::Context;
use Koha::Logger;
use Koha::BackgroundJobs;
use Koha::SearchEngine;
use Koha::SearchEngine::Indexer;

my ( $help, $batch_size );

# job_id => number of frames seen for a job that was not (yet) in the database.
my $not_found_retries = {};
my $max_retries       = $ENV{MAX_RETRIES} || 10;

# '=i' makes Getopt::Long reject non-integer batch sizes instead of silently
# comparing them numerically as 0 later on.
GetOptions(
    'h|help'         => \$help,
    'b|batch_size=i' => \$batch_size,
) || pod2usage(1);

pod2usage(0) if $help;

$batch_size //= 10;

warn "Not using Elasticsearch"
    unless ( C4::Context->preference('SearchEngine') // '' ) eq 'Elasticsearch';

my $logger = Koha::Logger->get( { interface => 'worker' } );

my $conn;
try {
    $conn = Koha::BackgroundJob->connect;
} catch {
    warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
};

if ($conn) {

    # FIXME cf note in Koha::BackgroundJob about $namespace
    my $namespace = C4::Context->config('memcached_namespace');
    $conn->subscribe(
        {
            destination      => sprintf( "/queue/%s-%s", $namespace, 'elastic_index' ),
            ack              => 'client',
            'prefetch-count' => 1,
        }
    );
}

my $biblio_indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
my $auth_indexer   = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::AUTHORITIES_INDEX } );
my $config         = $biblio_indexer->get_elasticsearch_params;
my $at_a_time      = $config->{chunk_size} // 5000;

# Jobs received (and already acked) but not yet committed.
my @jobs = ();

while (1) {
    if ($conn) {
        my $frame = $conn->receive_frame;

        # An undefined frame means no message; maybe log connection
        # problems - the client will reconnect automatically.
        if ( defined $frame ) {
            my $job = _job_from_frame($frame);
            push @jobs, $job if $job;
        }

        next unless @jobs;

        # Single flush point, evaluated after every frame (including rejected
        # ones), so buffered jobs are never left waiting while receive_frame
        # blocks for a new message. Commit when the batch is full, when the
        # broker gave us nothing, or when no further frame arrives within 0.1s.
        if ( @jobs >= $batch_size || !defined $frame || !$conn->can_read( { timeout => '0.1' } ) ) {
            commit(@jobs);
            @jobs = ();
        }
    } else {

        # No broker: poll the database for pending jobs every 10 seconds.
        @jobs = Koha::BackgroundJobs->search( { status => 'new', queue => 'elastic_index' } )->as_list;
        commit(@jobs);
        @jobs = ();
        sleep 10;
    }
}

$conn->disconnect if $conn;

# Decode one Stomp frame and ack or nack it.
# Returns the Koha::BackgroundJob to index when the frame is acked, or an
# empty list when the frame was rejected or requeued.
sub _job_from_frame {
    my ($frame) = @_;

    my $args = try {
        my $body = $frame->body;
        decode_json($body);    # TODO Should this be from_json? Check utf8 flag.
    } catch {
        $logger->warn( sprintf "Frame not processed - %s", $_ );
        return;
    };

    # Valid JSON that is not an object, or that lacks job_id, cannot be processed.
    unless ( ref($args) eq 'HASH' && defined $args->{job_id} ) {
        $logger->warn("Frame does not have correct args, ignoring it");
        $conn->nack( { frame => $frame, requeue => 'false' } );
        return;
    }

    my $job_id = $args->{job_id};
    my $job    = Koha::BackgroundJobs->find($job_id);

    unless ($job) {

        # The frame may arrive before the job row is visible in the database,
        # so requeue a few times before giving up.
        $not_found_retries->{$job_id} //= 0;
        if ( ++$not_found_retries->{$job_id} >= $max_retries ) {
            delete $not_found_retries->{$job_id};
            $logger->warn( sprintf "Job %s not found, no more retry", $job_id );

            # nack without requeue, we do not want to process this frame again
            $conn->nack( { frame => $frame, requeue => 'false' } );
            return;
        }

        $logger->debug( sprintf "Job %s not found, will retry later", $job_id );

        # nack to force requeue
        $conn->nack( { frame => $frame, requeue => 'true' } );
        Time::HiRes::sleep(0.5);
        return;
    }

    # The job exists: drop its retry counter so the hash does not grow forever.
    delete $not_found_retries->{$job_id};

    if ( $job->status ne 'new' ) {
        $logger->warn( sprintf "Job %s has wrong status %s", $job_id, $job->status );

        # nack without requeue, we do not want to process this frame again
        $conn->nack( { frame => $frame, requeue => 'false' } );
        return;
    }

    $conn->ack( { frame => $frame } );
    return $job;
}

# Index the records referenced by the given jobs and update their status.
#
# Every job is marked 'started' first. A job whose payload cannot be decoded
# is marked 'failed'. Every other job is marked 'finished' once its record ids
# have been sent to the authority/biblio indexer in chunks of $at_a_time.
# Indexing errors are only logged, because a chunk mixes records from several
# jobs and a failure cannot be attributed to a single job.
sub commit {
    my (@jobs) = @_;

    return unless @jobs;

    my $jobs = Koha::BackgroundJobs->search( { id => [ map { $_->id } @jobs ] } );

    # Start
    $jobs->update(
        {
            progress   => 0,
            status     => 'started',
            started_on => \'NOW()',
        }
    );

    my ( @bib_records, @auth_records, @finished_ids );
    for my $job (@jobs) {
        my $args = try {
            $job->json->decode( $job->data );
        } catch {
            $logger->warn( sprintf "Cannot decode data for job id=%s", $job->id );
            $job->status('failed')->store;
            return;
        };
        next unless $args;

        push @finished_ids, $job->id;

        # A payload without record_ids contributes nothing, rather than
        # dying on an undef array dereference and taking the daemon down.
        my @record_ids = @{ $args->{record_ids} // [] };
        if ( ( $args->{record_server} // '' ) eq 'biblioserver' ) {
            push @bib_records, @record_ids;
        } else {
            push @auth_records, @record_ids;
        }
    }

    _update_index_in_chunks( $auth_indexer,   @auth_records );
    _update_index_in_chunks( $biblio_indexer, @bib_records );

    # Finish - only the jobs that were not marked 'failed' above, so their
    # failed status is not overwritten.
    if (@finished_ids) {
        Koha::BackgroundJobs->search( { id => \@finished_ids } )->update(
            {
                progress => 1,
                status   => 'finished',
                ended_on => \'NOW()',
            }
        );
    }

    return;
}

# Send the record ids to $indexer in chunks of $at_a_time, logging (not
# propagating) any indexing error so one bad chunk does not stop the rest.
# Does nothing when no record ids are given.
sub _update_index_in_chunks {
    my ( $indexer, @records ) = @_;

    my $chunks = natatime $at_a_time, @records;
    while ( my @chunk = $chunks->() ) {
        try {
            $indexer->update_index( \@chunk );
        } catch {
            $logger->warn( sprintf "Update of elastic index failed with: %s", $_ );
        };
    }

    return;
}