Main Koha release repository https://koha-community.org
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

357 lines
9.8 KiB

#!/usr/bin/perl
# This inserts records from a Koha database into elastic search
# Copyright 2014 Catalyst IT
#
# This file is part of Koha.
#
# Koha is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Koha is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <http://www.gnu.org/licenses>.
=head1 NAME
rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
=head1 SYNOPSIS
B<rebuild_elasticsearch.pl>
[B<-c|--commit>=C<count>]
[B<-d|--delete>]
[B<-r|--reset>]
[B<-a|--authorities>]
[B<-b|--biblios>]
[B<-bn|--bnumber>]
[B<-ai|--authid>]
[B<-p|--processes>]
[B<-v|--verbose>]
[B<-h|--help>]
[B<--man>]
=head1 DESCRIPTION
Inserts records from a Koha database into Elasticsearch.
=head1 OPTIONS
=over
=item B<-c|--commit>=C<count>
Specify how many records will be batched up before they're added to Elasticsearch.
Higher should be faster, but will cause more RAM usage. Default is 5000.
=item B<-d|--delete>
Delete the index and recreate it before indexing.
=item B<-r|--reset>
Reload mappings from files (specified in koha-conf.xml) before indexing.
Implies --delete.
=item B<-a|--authorities>
Index the authorities only. Combining this with B<-b> is the same as
specifying neither and so both get indexed.
=item B<-b|--biblios>
Index the biblios only. Combining this with B<-a> is the same as
specifying neither and so both get indexed.
=item B<-bn|--bnumber>
Only index the supplied biblionumber, mostly for testing purposes. May be
repeated.
=item B<-ai|--authid>
Only index the supplied authority id, mostly for testing purposes. May be
repeated.
=item B<-p|--processes>
Number of processes to use for indexing. This can be used to do more indexing
work in parallel on multicore systems. By default, a single process is used.
=item B<-v|--verbose>
By default, this program only emits warnings and errors. This makes it talk
more. Add more to make it even more wordy, in particular when debugging.
=item B<-h|--help>
Help!
=item B<--man>
Full documentation.
=back
=head1 IMPLEMENTATION
=cut
use autodie;
use Getopt::Long;
use Koha::Script;
use C4::Context;
use Koha::MetadataRecord::Authority;
use Koha::BiblioUtils;
use Koha::SearchEngine::Elasticsearch;
use Koha::SearchEngine::Elasticsearch::Indexer;
use MARC::Field;
use MARC::Record;
use Modern::Perl;
use Pod::Usage;
my $verbose = 0;
my $commit = 5000;
my ($delete, $reset, $help, $man, $processes);
my ($index_biblios, $index_authorities);
my (@biblionumbers,@authids);
$|=1; # flushes output
GetOptions(
'c|commit=i' => \$commit,
'd|delete' => \$delete,
'r|reset' => \$reset,
'a|authorities' => \$index_authorities,
'b|biblios' => \$index_biblios,
'bn|bnumber=i' => \@biblionumbers,
'ai|authid=i' => \@authids,
'p|processes=i' => \$processes,
'v|verbose+' => \$verbose,
'h|help' => \$help,
'man' => \$man,
);
# Default is to do both
unless ($index_authorities || $index_biblios) {
$index_authorities = $index_biblios = 1;
}
if ($processes && ( @biblionumbers || @authids) ) {
die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
}
pod2usage(1) if $help;
pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
_sanity_check();
if ($reset){
Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
$delete = 1;
}
_verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
_verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
my $slice_index = 0;
my $slice_count = ( $processes //= 1 );
my %iterator_options;
if ($slice_count > 1) {
# Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
$slice_index = 0;
for (my $proc = 1; $proc < $slice_count; $proc++) {
my $pid = fork();
die "Failed to fork a child process\n" unless defined $pid;
if ($pid == 0) {
# Child process, give it a slice to process
$slice_index = $proc;
last;
}
}
# Fudge the commit count a bit to spread out the Elasticsearch commits
$commit *= 1 + 0.10 * $slice_index;
_log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
$iterator_options{slice} = { index => $slice_index, count => $slice_count };
}
my $next;
if ($index_biblios) {
_log(1, "Indexing biblios\n");
if (@biblionumbers) {
$next = sub {
my $r = shift @biblionumbers;
return () unless defined $r;
return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
};
} else {
my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
$next = sub {
$records->next();
}
}
_do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
}
if ($index_authorities) {
_log(1, "Indexing authorities\n");
if (@authids) {
$next = sub {
my $r = shift @authids;
return () unless defined $r;
my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
return ($r, $a);
};
} else {
my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
$next = sub {
$records->next();
}
}
_do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
}
if ($slice_index == 0) {
# Main process, wait for children
for (my $proc = 1; $proc < $processes; $proc++) {
wait();
}
}
=head1 INTERNAL METHODS
=head2 _verify_index_state
_verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
Checks the index state and recreates it if requested.
=cut
sub _verify_index_state {
my ( $index_name, $recreate ) = @_;
_log(1, "Checking state of $index_name index\n");
my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
if ($recreate) {
_log(1, "Dropping and recreating $index_name index\n");
$indexer->drop_index() if $indexer->index_exists();
$indexer->create_index();
}
elsif (!$indexer->index_exists) {
# Create index if does not exist
$indexer->create_index();
} elsif ($indexer->is_index_status_ok) {
# Update mapping unless index is some kind of problematic state
$indexer->update_mappings();
} elsif ($indexer->is_index_status_recreate_required) {
warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
}
}
=head2 _do_reindex
_do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
Does the actual reindexing. $callback is a function that always returns the next record.
For each index we iterate through the records, committing at specified count
=cut
sub _do_reindex {
my ( $next, $index_name ) = @_;
my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
my $count = 0;
my $commit_count = $commit;
my ( @id_buffer, @commit_buffer );
while ( my $record = $next->() ) {
my $id = $record->id // $record->authid;
my $record = $record->record;
$count++;
if ( $verbose == 1 ) {
_log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
} else {
_log( 2, "$id\n" );
}
push @id_buffer, $id;
push @commit_buffer, $record;
if ( !( --$commit_count ) ) {
_log( 1, "Committing $commit records...\n" );
my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
_handle_response($response);
$commit_count = $commit;
@id_buffer = ();
@commit_buffer = ();
_log( 1, "Commit complete\n" );
}
}
# There are probably uncommitted records
_log( 1, "Committing final records...\n" );
my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
_handle_response($response);
_log( 1, "Total $count records indexed\n" );
}
=head2 _sanity_check
_sanity_check();
Checks some basic stuff to ensure that it's sane before we start.
=cut
sub _sanity_check {
# Do we have an elasticsearch block defined?
my $conf = C4::Context->config('elasticsearch');
die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
}
=head2 _handle_response
Parse the return from update_index and display errors depending on verbosity of the script
=cut
sub _handle_response {
my ($response) = @_;
if( $response->{errors} eq 'true' ){
_log( 1, "There were errors during indexing\n" );
if ( $verbose > 1 ){
foreach my $item (@{$response->{items}}){
next unless defined $item->{index}->{error};
print "Record #" . $item->{index}->{_id} . " " .
$item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
$item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
}
}
}
}
=head2 _log
_log($level, "Message\n");
Output progress information.
Will output the message if verbosity level is set to $level or more. Will not
include a trailing newline automatically.
=cut
sub _log {
my ($level, $msg) = @_;
print "[$$] $msg" if ($verbose >= $level);
}