61e7aa374e
Sometimes we need to re-index only a subset of our bibliographic data or
authorities. Currently this is only possible by enumerating every id (-bn or
-ai), which does not work well when indexing e.g. 100,000 records of a
2,000,000-record database. Re-indexing everything is also overkill.

This patch adds a `--where` flag to misc/search_tools/rebuild_elasticsearch.pl
which can take arbitrary SQL (that of course has to match the respective
tables) and adds it as an additional condition on the resultset to index.

To test, start koha-testing-docker with Elasticsearch enabled, for example via
`ktd --es7 up`.

Before applying the patch, rebuild_elasticsearch.pl will index all data:

Biblios:
$ misc/search_tools/rebuild_elasticsearch.pl -b -v
[12387] Checking state of biblios index
[12387] Indexing biblios
[12387] Committing final records...
[12387] Total 435 records indexed
(there might be a warning regarding a broken biblio, which can be ignored)

Authorities:
$ misc/search_tools/rebuild_elasticsearch.pl -a -v
[12546] Checking state of authorities index
[12546] Indexing authorities
[12546] 1000 records processed
[12546] Committing final records...
[12546] Total 1706 records indexed

Now apply the patch.

Biblios, limited to a range of biblionumbers:
$ misc/search_tools/rebuild_elasticsearch.pl -b -v --where "biblionumber between 100 and 150"
[12765] Checking state of biblios index
[12765] Indexing biblios
[12765] Committing final records...
[12765] Total 50 records indexed

Note that only 50 records were indexed (instead of the whole set of 435 records).

Authorities, limited by authtypecode:
$ misc/search_tools/rebuild_elasticsearch.pl -a -v --where "authtypecode = 'GEOGR_NAME'"
[12848] Checking state of authorities index
[12848] Indexing authorities
[12848] Committing final records...
[12848] Total 142 records indexed

Again, only 142 records were indexed.

Sponsored-by: Steiermärkische Landesbibliothek
Sponsored-by: HKS3 / koha-support.eu
Signed-off-by: David Nind <david@davidnind.com>
Signed-off-by: Nick Clemens <nick@bywatersolutions.com>
Signed-off-by: Katrin Fischer <katrin.fischer@bsz-bw.de>
387 lines
11 KiB
Perl
Executable file
#!/usr/bin/perl

# This inserts records from a Koha database into elastic search
#
# Copyright 2014 Catalyst IT
#
# This file is part of Koha.
#
# Koha is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Koha is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <http://www.gnu.org/licenses>.

=head1 NAME

rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch

=head1 SYNOPSIS

B<rebuild_elasticsearch.pl>
[B<-c|--commit>=C<count>]
[B<-d|--delete>]
[B<-r|--reset>]
[B<-a|--authorities>]
[B<-b|--biblios>]
[B<--desc>]
[B<-bn|--bnumber>]
[B<-ai|--authid>]
[B<-w|--where SQL>]
[B<-p|--processes>]
[B<-v|--verbose>]
[B<-h|--help>]
[B<--man>]

=head1 DESCRIPTION

Inserts records from a Koha database into Elasticsearch.

=head1 OPTIONS

=over

=item B<-c|--commit>=C<count>

Specify how many records will be batched up before they're added to Elasticsearch.
Higher should be faster, but will cause more RAM usage. Default is 5000.

=item B<-d|--delete>

Delete the index and recreate it before indexing.

=item B<-r|--reset>

Reload mappings from files (specified in koha-conf.xml) before indexing.
Implies --delete.

=item B<-a|--authorities>

Index the authorities only. Combining this with B<-b> is the same as
specifying neither and so both get indexed.

=item B<-b|--biblios>

Index the biblios only. Combining this with B<-a> is the same as
specifying neither and so both get indexed.

=item B<--desc>

Index the records in descending id order. Intended to index newer records before older records.
Default is to index in ascending order.
Does not work with --bnumber or --authid.

=item B<-bn|--bnumber>

Only index the supplied biblionumber, mostly for testing purposes. May be
repeated.

=item B<-ai|--authid>

Only index the supplied authority id, mostly for testing purposes. May be
repeated.

=item B<-w|--where>

Pass an additional SQL condition to limit the records to be indexed. The SQL
has to match the columns of the respective table (for example C<biblionumber>
for biblios or C<authtypecode> for authorities).

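For example, to index only a range of biblio records, or only authorities of a
single type:

  misc/search_tools/rebuild_elasticsearch.pl -b --where "biblionumber between 100 and 150"
  misc/search_tools/rebuild_elasticsearch.pl -a --where "authtypecode = 'GEOGR_NAME'"
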
=item B<-p|--processes>

Number of processes to use for indexing. This can be used to do more indexing
work in parallel on multicore systems. By default, a single process is used.

=item B<-v|--verbose>

By default, this program only emits warnings and errors. This makes it talk
more. Add more to make it even more wordy, in particular when debugging.

=item B<-h|--help>

Help!

=item B<--man>

Full documentation.

=back

=head1 IMPLEMENTATION

=cut

use autodie;
use Getopt::Long qw( GetOptions );
use Koha::Script;
use C4::Context;
use Koha::MetadataRecord::Authority;
use Koha::BiblioUtils;
use Koha::SearchEngine::Elasticsearch;
use Koha::SearchEngine::Elasticsearch::Indexer;
use MARC::Field;
use Modern::Perl;
use Pod::Usage qw( pod2usage );
use Try::Tiny qw( catch try );

my $verbose = 0;
my $commit = 5000;
my ($delete, $reset, $help, $man, $processes);
my ($index_biblios, $index_authorities);
my (@biblionumbers, @authids, $where);
my $desc;

$|=1; # flushes output

GetOptions(
    'c|commit=i'    => \$commit,
    'd|delete'      => \$delete,
    'r|reset'       => \$reset,
    'a|authorities' => \$index_authorities,
    'b|biblios'     => \$index_biblios,
    'desc'          => \$desc,
    'bn|bnumber=i'  => \@biblionumbers,
    'ai|authid=i'   => \@authids,
    'w|where=s'     => \$where,
    'p|processes=i' => \$processes,
    'v|verbose+'    => \$verbose,
    'h|help'        => \$help,
    'man'           => \$man,
);

# Default is to do both
unless ($index_authorities || $index_biblios) {
    $index_authorities = $index_biblios = 1;
}

if ($processes && ( @biblionumbers || @authids) ) {
    die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
}

pod2usage(1) if $help;
pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;

_sanity_check();

if ($reset){
    Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
    $delete = 1;
}

_verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
_verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);

my $slice_index = 0;
my $slice_count = ( $processes //= 1 );
my %iterator_options;

if ($slice_count > 1) {
    # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
    $slice_index = 0;
    for (my $proc = 1; $proc < $slice_count; $proc++) {
        my $pid = fork();
        die "Failed to fork a child process\n" unless defined $pid;
        if ($pid == 0) {
            # Child process, give it a slice to process
            $slice_index = $proc;
            last;
        }
    }
    # Fudge the commit count a bit to spread out the Elasticsearch commits
    $commit *= 1 + 0.10 * $slice_index;
    $commit = int( $commit );
    _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
    $iterator_options{slice} = { index => $slice_index, count => $slice_count };
}

if( $desc ){
    $iterator_options{desc} = 1;
}

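# Forward the raw --where SQL fragment to the record iterators; it is applied
# as an additional condition on the resultset of records to be indexed.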
if ($where) {
    $iterator_options{where} = $where;
}

my $next;
if ($index_biblios) {
    _log(1, "Indexing biblios\n");
    if (@biblionumbers) {
        $next = sub {
            my $r = shift @biblionumbers;
            return () unless defined $r;
            return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
        };
    } else {
        my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
        $next = sub {
            $records->next();
        }
    }
    _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
}
if ($index_authorities) {
    _log(1, "Indexing authorities\n");
    if (@authids) {
        $next = sub {
            my $r = shift @authids;
            return () unless defined $r;
            my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
            return ($r, $a);
        };
    } else {
        my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
        $next = sub {
            $records->next();
        }
    }
    _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
}

if ($slice_index == 0) {
    # Main process, wait for children
    for (my $proc = 1; $proc < $processes; $proc++) {
        wait();
    }
}

=head1 INTERNAL METHODS

=head2 _verify_index_state

    _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);

Checks the index state and recreates it if requested.

=cut

sub _verify_index_state {
    my ( $index_name, $recreate ) = @_;

    _log(1, "Checking state of $index_name index\n");
    my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );

    if ($recreate) {
        _log(1, "Dropping and recreating $index_name index\n");
        $indexer->drop_index() if $indexer->index_exists();
        $indexer->create_index();
    }
    elsif (!$indexer->index_exists) {
        # Create the index if it does not exist
        $indexer->create_index();
    } elsif ($indexer->is_index_status_ok) {
        # Update mappings unless the index is in some kind of problematic state
        $indexer->update_mappings();
    } elsif ($indexer->is_index_status_recreate_required) {
        warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
    }
}

=head2 _do_reindex

    _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);

Does the actual reindexing. $callback is a function that returns the next record on each call.
For each index we iterate through the records, committing after every C<--commit> records.

=cut

sub _do_reindex {
    my ( $next, $index_name ) = @_;

    my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );

    my $count = 0;
    my $commit_count = $commit;
    my ( @id_buffer, @commit_buffer );
    while ( my $record = $next->() ) {
        my $id = $record->id // $record->authid;
        my $record = $record->record;
        $count++;
        if ( $verbose == 1 ) {
            _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
        } else {
            _log( 2, "$id\n" );
        }

        push @id_buffer, $id;
        push @commit_buffer, $record;
        if ( !( --$commit_count ) ) {
            _log( 1, "Committing $commit records...\n" );
            my $response;
            try{
                $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
                _handle_response($response);
                _log( 1, "Commit complete\n" );
            } catch {
                _log(1,"Elasticsearch exception thrown: ".$_->type."\n");
                _log(2,"Details: ".$_->details."\n");
            };
            $commit_count = $commit;
            @id_buffer = ();
            @commit_buffer = ();
        }
    }

    # There are probably uncommitted records
    _log( 1, "Committing final records...\n" );
    my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
    _handle_response($response);
    _log( 1, "Total $count records indexed\n" );
}

=head2 _sanity_check

    _sanity_check();

Checks some basic stuff to ensure that it's sane before we start.

=cut

sub _sanity_check {
    # Do we have an elasticsearch block defined?
    my $conf = C4::Context->config('elasticsearch');
    die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
}

=head2 _handle_response

Parse the return value from update_index and display errors depending on the verbosity of the script.

=cut

sub _handle_response {
    my ($response) = @_;
    if( $response->{errors} eq 'true' ){
        _log( 1, "There were errors during indexing\n" );
        if ( $verbose > 1 ){
            foreach my $item (@{$response->{items}}){
                next unless defined $item->{index}->{error};
                print "Record #" . $item->{index}->{_id} . " " .
                    $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
                    $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
            }
        }
    }
}

=head2 _log

    _log($level, "Message\n");

Output progress information.

Will output the message if verbosity level is set to $level or more. Will not
include a trailing newline automatically.

=cut

sub _log {
    my ($level, $msg) = @_;

    print "[$$] $msg" if ($verbose >= $level);
}