Browse Source

Bug 19893: Alternative optimized indexing for Elasticsearch

Implement optimized indexing for Elasticsearch

How to test:
1) Time a full elasticsearch re-index without this patch by running the with the -d flag:
   `koha-shell <instance_name> -c "time -d"`.
2) Apply this patch.
3) Time a full re-index again, it should be about twice at fast (for a
   couple of thousand biblios, with fewer biblios results may be more

Sponsored-by: Gothenburg University Library
Signed-off-by: Ere Maijala <>
Signed-off-by: Martin Renvoize <>

Signed-off-by: Nick Clemens <>
David Gustafsson 6 years ago
committed by Nick Clemens
  1. 228
  2. 137
  3. 63
  4. 4
  5. 8
  6. 7
  7. 240
  8. 164
  9. 113


@ -34,6 +34,12 @@ use Search::Elasticsearch;
use Try::Tiny;
use YAML::Syck;
use List::Util qw( sum0 reduce );
use Search::Elasticsearch;
use MARC::File::XML;
use MIME::Base64;
use Encode qw(encode);
__PACKAGE__->mk_ro_accessors(qw( index ));
__PACKAGE__->mk_accessors(qw( sort_fields ));
@ -67,6 +73,19 @@ sub new {
return $self;
sub get_elasticsearch {
my $self = shift @_;
unless (defined $self->{elasticsearch}) {
my $conf = $self->get_elasticsearch_params();
$self->{elasticsearch} = Search::Elasticsearch->new(
client => "5_0::Direct",
nodes => $conf->{nodes},
cxn_pool => 'Sniff'
return $self->{elasticsearch};
=head2 get_elasticsearch_params
my $params = $self->get_elasticsearch_params();
@ -281,48 +300,205 @@ sub sort_fields {
return $self->_sort_fields_accessor();
# Provides the rules for data conversion.
sub get_fixer_rules {
sub marc_records_to_documents {
my ($self, $records) = @_;
my $rules = $self->get_marc_mapping_rules();
my $control_fields_rules = $rules->{control_fields};
my $data_fields_rules = $rules->{data_fields};
my $marcflavour = lc C4::Context->preference('marcflavour');
my $serialization_format = C4::Context->preference('ElasticsearchMARCSerializationFormat');
my @record_documents;
sub _process_mappings {
my ($mappings, $data, $record_document) = @_;
foreach my $mapping (@{$mappings}) {
my ($target, $options) = @{$mapping};
# Copy (scalar) data since can have multiple targets
# with differing options for (possibly) mutating data
# so need a different copy for each
my $_data = $data;
$record_document->{$target} //= [];
if (defined $options->{substr}) {
my ($start, $length) = @{$options->{substr}};
$_data = length($data) > $start ? substr $data, $start, $length : '';
if (defined $options->{value_callbacks}) {
$_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
if (defined $options->{property}) {
$_data = {
$options->{property} => $_data
push @{$record_document->{$target}}, $_data;
foreach my $record (@{$records}) {
my $record_document = {};
my $mappings = $rules->{leader};
if ($mappings) {
_process_mappings($mappings, $record->leader(), $record_document);
foreach my $field ($record->fields()) {
if($field->is_control_field()) {
my $mappings = $control_fields_rules->{$field->tag()};
if ($mappings) {
_process_mappings($mappings, $field->data(), $record_document);
else {
my $subfields_mappings = $data_fields_rules->{$field->tag()};
if ($subfields_mappings) {
my $wildcard_mappings = $subfields_mappings->{'*'};
foreach my $subfield ($field->subfields()) {
my ($code, $data) = @{$subfield};
my $mappings = $subfields_mappings->{$code} // [];
if ($wildcard_mappings) {
$mappings = [@{$mappings}, @{$wildcard_mappings}];
if (@{$mappings}) {
_process_mappings($mappings, $data, $record_document);
foreach my $field (keys %{$rules->{defaults}}) {
unless (defined $record_document->{$field}) {
$record_document->{$field} = $rules->{defaults}->{$field};
foreach my $field (@{$rules->{sum}}) {
if (defined $record_document->{$field}) {
# TODO: validate numeric? filter?
# TODO: Or should only accept fields without nested values?
# TODO: Quick and dirty, improve if needed
$record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
# TODO: Perhaps should check if $records_document non empty, but really should never be the case
if ($serialization_format eq 'base64ISO2709') {
$record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
else {
$record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
my $id = $record->subfield('999', 'c');
push @record_documents, [$id, $record_document];
return \@record_documents;
# Provides the rules for marc to Elasticsearch JSON document conversion.
sub get_marc_mapping_rules {
my ($self) = @_;
my $marcflavour = lc C4::Context->preference('marcflavour');
my @rules;
sub {
sub _field_mappings {
my ($facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
my %mapping_defaults = ();
my @mappings;
my $substr_args = undef;
if ($range) {
# TODO: use value_callback instead?
my ($start, $end) = map(int, split /-/, $range, 2);
$substr_args = [$start];
push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
my $default_options = {};
if ($substr_args) {
$default_options->{substr} = $substr_args;
# TODO: Should probably have per type value callback/hook
# but hard code for now
if ($target_type eq 'boolean') {
$default_options->{value_callbacks} //= [];
push @{$default_options->{value_callbacks}}, sub {
my ($value) = @_;
# Trim whitespace at both ends
$value =~ s/^\s+|\s+$//g;
return $value ? 'true' : 'false';
my $mapping = [$target_name, $default_options];
push @mappings, $mapping;
my @suffixes = ();
push @suffixes, 'facet' if $facet;
push @suffixes, 'suggestion' if $suggestible;
push @suffixes, 'sort' if !defined $sort || $sort;
foreach my $suffix (@suffixes) {
my $mapping = ["${target_name}__$suffix"];
# Hack, fix later in less hideous manner
if ($suffix eq 'suggestion') {
push @{$mapping}, {%{$default_options}, property => 'input'};
else {
push @{$mapping}, $default_options;
push @mappings, $mapping;
return @mappings;
my $field_spec_regexp = qr/^([0-9]{3})([0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
my $rules = {
'leader' => [],
'control_fields' => {},
'data_fields' => {},
'sum' => [],
'defaults' => {}
$self->_foreach_mapping(sub {
my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
return if $marc_type ne $marcflavour;
my $options ='';
push @rules, "marc_map('$marc_field','${name}.\$append', $options)";
if ($facet) {
push @rules, "marc_map('$marc_field','${name}__facet.\$append', $options)";
if ($type eq 'sum') {
push @{$rules->{sum}}, $name;
if ($suggestible) {
push @rules,
#"marc_map('$marc_field','${name}__suggestion.input.\$append', '')"; #must not have nested data structures in .input
elsif($type eq 'boolean') {
# boolean gets special handling, if value doesn't exist for a field,
# it is set to false
$rules->{defaults}->{$name} = 'false';
if ( $type eq 'boolean' ) {
# boolean gets special handling, basically if it doesn't exist,
# it's added and set to false. Otherwise we can't query it.
push @rules,
"unless exists('$name') add_field('$name', 0) end";
if ($marc_field =~ $field_spec_regexp) {
my $field_tag = $1;
my $subfields = defined $2 ? $2 : '*';
my $range = defined $3 ? $3 : undef;
if ($field_tag < 10) {
$rules->{control_fields}->{$field_tag} //= [];
my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
push @{$rules->{control_fields}->{$field_tag}}, @mappings;
if ($type eq 'sum' ) {
push @rules, "sum('$name')";
else {
$rules->{data_fields}->{$field_tag} //= {};
foreach my $subfield (split //, $subfields) {
$rules->{data_fields}->{$field_tag}->{$subfield} //= [];
my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
push @{$rules->{data_fields}->{$field_tag}->{$subfield}}, @mappings;
if ($self->sort_fields()->{$name}) {
if ($sort || !defined $sort) {
push @rules, "marc_map('$marc_field','${name}__sort.\$append', $options)";
elsif ($marc_field =~ $leader_regexp) {
my $range = defined $1 ? $1 : undef;
my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
push @{$rules->{leader}}, @mappings;
push @rules, "move_field(_id,es_id)"; #Also you must set the Catmandu::Store::ElasticSearch->new(key_prefix: 'es_');
return \@rules;
else {
die("Invalid marc field: $marc_field");
return $rules;
=head2 _foreach_mapping


@ -66,24 +66,64 @@ sub update_index {
$self->_sanitise_records($biblionums, $records);
my $from = $self->_convert_marc_to_json($records);
if ( !$self->store ) {
my $params = $self->get_elasticsearch_params();
index_settings => $self->get_elasticsearch_settings(),
index_mappings => $self->get_elasticsearch_mappings(),
return 1;
#print Data::Dumper::Dumper( $from->to_array );
sub bulk_index {
my ($self, $records) = @_;
my $conf = $self->get_elasticsearch_params();
my $elasticsearch = $self->get_elasticsearch();
my $documents = $self->marc_records_to_documents($records);
my @body;
foreach my $document_info (@{$documents}) {
my ($id, $document) = @{$document_info};
push @body, {
index => {
_id => $id
push @body, $document;
if (@body) {
my $response = $elasticsearch->bulk(
index => $conf->{index_name},
type => 'data', # is just hard coded in
body => \@body
# TODO: handle response
return 1;
sub ensure_mappings_updated {
my ($self) = @_;
unless ($self->{_mappings_updated}) {
sub update_mappings {
my ($self) = @_;
my $conf = $self->get_elasticsearch_params();
my $elasticsearch = $self->get_elasticsearch();
my $mappings = $self->get_elasticsearch_mappings();
foreach my $type (keys %{$mappings}) {
my $response = $elasticsearch->indices->put_mapping(
index => $conf->{index_name},
type => $type,
body => {
$type => $mappings->{$type}
# TODO: process response, produce errors etc
$self->{_mappings_updated} = 1;
=head2 $indexer->update_index_background($biblionums, $records)
This has exactly the same API as C<update_index_background> however it'll
@ -139,28 +179,6 @@ sub delete_index_background {
=head2 $indexer->create_index();
Create an index on the Elasticsearch server.
sub create_index {
my ($self) = @_;
if (!$self->store) {
my $params = $self->get_elasticsearch_params();
index_settings => $self->get_elasticsearch_settings(),
index_mappings => $self->get_elasticsearch_mappings(),
=head2 $indexer->drop_index();
Drops the index from the elasticsearch server. Calling C<update_index>
@ -170,22 +188,35 @@ after this will recreate it again.
sub drop_index {
my ($self) = @_;
if ($self->index_exists) {
my $conf = $self->get_elasticsearch_params();
my $elasticsearch = $self->get_elasticsearch();
my $response = $elasticsearch->indices->delete(index => $conf->{index_name});
# TODO: Handle response? Convert errors to exceptions/die
if (!$self->store) {
# If this index doesn't exist, this will create it. Then it'll be
# deleted. That's not the end of the world however.
my $params = $self->get_elasticsearch_params();
index_settings => $self->get_elasticsearch_settings(),
index_mappings => $self->get_elasticsearch_mappings(),
sub create_index {
my ($self) = @_;
my $conf = $self->get_elasticsearch_params();
my $settings = $self->get_elasticsearch_settings();
my $elasticsearch = $self->get_elasticsearch();
my $response = $elasticsearch->indices->create(
index => $conf->{index_name},
body => {
settings => $settings
# TODO: Handle response? Convert errors to exceptions/die
my $store = $self->store;
sub index_exists {
my ($self) = @_;
my $conf = $self->get_elasticsearch_params();
my $elasticsearch = $self->get_elasticsearch();
return $elasticsearch->indices->exists(
index => $conf->{index_name},
sub _sanitise_records {
@ -209,16 +240,6 @@ sub _sanitise_records {
sub _convert_marc_to_json {
my $self = shift;
my $records = shift;
my $importer =
Catmandu::Importer::MARC->new( records => $records, id => '999c' );
my $fixer = Catmandu::Fix->new( fixes => $self->get_fixer_rules() );
$importer = $fixer->fix($importer);
return $importer;


@ -49,9 +49,10 @@ use Koha::SearchEngine::QueryBuilder;
use Koha::SearchEngine::Search;
use MARC::Record;
use Catmandu::Store::ElasticSearch;
use MARC::File::XML;
use Data::Dumper; #TODO remove
use Carp qw(cluck);
use MIME::Base64;
Koha::SearchEngine::Elasticsearch::Search->mk_accessors(qw( store ));
@ -157,14 +158,12 @@ sub search_compat {
my $results = $self->search($query, undef, $results_per_page, %options);
# Convert each result into a MARC::Record
my (@records, $index);
$index = $offset; # opac-search expects results to be put in the
my @records;
# opac-search expects results to be put in the
# right place in the array, according to $offset
my $index = $offset;
$results->each(sub {
# The results come in an array for some reason
my $marc_json = $_[0]->{record};
my $marc = $self->json2marc($marc_json);
$records[$index++] = $marc;
$records[$index++] = $self->decode_record_from_result(@_);
# consumers of this expect a name-spaced result, we provide the default
# configuration.
@ -196,14 +195,14 @@ sub search_auth_compat {
sub {
my %result;
my $record = $_[0];
my $marc_json = $record->{record};
# I wonder if these should be real values defined in the mapping
# rather than hard-coded conversions.
my $record = $_[0];
# Handle legacy nested arrays indexed with splitting enabled.
my $authid = $record->{ 'Local-number' }[0];
$authid = @$authid[0] if (ref $authid eq 'ARRAY');
$result{authid} = $authid;
# TODO put all this info into the record at index time so we
@ -219,7 +218,7 @@ sub search_auth_compat {
# it's not reproduced here yet.
my $authtype = $rs->single;
my $auth_tag_to_report = $authtype ? $authtype->auth_tag_to_report : "";
my $marc = $self->json2marc($marc_json);
my $marc = $self->decode_record_from_result(@_);
my $mainentry = $marc->field($auth_tag_to_report);
my $reported_tag;
if ($mainentry) {
@ -338,9 +337,7 @@ sub simple_search_compat {
my $results = $self->search($query, undef, $max_results, %options);
my @records;
$results->each(sub {
# The results come in an array for some reason
my $marc_json = $_[0]->{record};
my $marc = $self->json2marc($marc_json);
my $marc = $self->decode_record_from_result(@_);
push @records, $marc;
return (undef, \@records, $results->total);
@ -361,43 +358,23 @@ sub extract_biblionumber {
return Koha::SearchEngine::Search::extract_biblionumber( $searchresultrecord );
=head2 json2marc
=head2 decode_record_from_result
my $marc_record = $self->decode_record_from_result(@result);
my $marc = $self->json2marc($marc_json);
Converts the form of marc (based on its JSON, but as a Perl structure) that
Catmandu stores into a MARC::Record object.
Extracts marc data from Elasticsearch result and decodes to MARC::Record object
sub json2marc {
my ( $self, $marcjson ) = @_;
my $marc = MARC::Record->new();
# fields are like:
# [ '245', '1', '2', 'a' => 'Title', 'b' => 'Subtitle' ]
# or
# [ '001', undef, undef, '_', 'a value' ]
# conveniently, this is the form that MARC::Field->new() likes
foreach my $field (@$marcjson) {
next if @$field < 5;
if ( $field->[0] eq 'LDR' ) {
$marc->leader( $field->[4] );
sub decode_record_from_result {
# Result is passed in as array, will get flattened
# and first element will be $result
my ( $self, $result ) = @_;
if (C4::Context->preference('ElasticsearchMARCSerializationFormat') eq 'MARCXML') {
return MARC::Record->new_from_xml($result->{marc_data}, 'UTF-8', uc C4::Context->preference('marcflavour'));
else {
my $tag = $field->[0];
my $marc_field;
if ( MARC::Field->is_controlfield_tag( $field->[0] ) ) {
$marc_field = MARC::Field->new($field->[0], $field->[4]);
} else {
$marc_field = MARC::Field->new(@$field);
return MARC::Record->new_from_usmarc(decode_base64($result->{marc_data}));
return $marc;
=head2 max_result_window


@ -5,9 +5,11 @@ general:
type: string
analyzer: analyser_standard
store: true
type: text
analyzer: keyword
index: false
# Search fields


@ -427,3 +427,11 @@ Administration:
Zebra: Zebra
Elasticsearch: Elasticsearch
- "Use"
- pref: ElasticsearchMARCSerializationFormat
default: MARCXML
base64ISO2709: base64ISO2709
- "as serialization format for MARC records stored in Elasticsearch index. base64ISO2709 is faster and will use less space but have a maximum record length which could cause issues with very large records."


@ -160,8 +160,13 @@ sub do_reindex {
my ( $next, $index_name ) = @_;
my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
if ($delete) {
$indexer->drop_index() if $indexer->index_exists();
elsif (!$indexer->index_exists()) {
# Create index if does not exist


@ -17,11 +17,15 @@
use Modern::Perl;
use Test::More tests => 3;
use Test::More tests => 4;
use Test::Exception;
use t::lib::Mocks;
use Test::MockModule;
use MARC::Record;
use Koha::SearchEngine::Elasticsearch;
subtest '_read_configuration() tests' => sub {
@ -108,3 +112,237 @@ subtest 'get_elasticsearch_mappings() tests' => sub {
$mappings = $es->get_elasticsearch_mappings();
is( $mappings->{data}{_all}{type}, 'string', 'Field mappings parsed correctly' );
subtest 'Koha::SearchEngine::Elasticsearch::marc_records_to_documents () tests' => sub {
plan tests => 29;
t::lib::Mocks::mock_preference('marcflavour', 'MARC21');
my @mappings = (
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => undef,
marc_type => 'marc21',
marc_field => '100a',
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => 1,
marc_type => 'marc21',
marc_field => '110a',
name => 'title',
type => 'string',
facet => 0,
suggestible => 1,
sort => 1,
marc_type => 'marc21',
marc_field => '245a',
name => 'unimarc_title',
type => 'string',
facet => 0,
suggestible => 1,
sort => 1,
marc_type => 'unimarc',
marc_field => '245a',
name => 'title',
type => 'string',
facet => 0,
suggestible => undef,
sort => 0,
marc_type => 'marc21',
marc_field => '220',
name => 'sum_item_price',
type => 'sum',
facet => 0,
suggestible => 0,
sort => 0,
marc_type => 'marc21',
marc_field => '952g',
name => 'items_withdrawn_status',
type => 'boolean',
facet => 0,
suggestible => 0,
sort => 0,
marc_type => 'marc21',
marc_field => '9520',
name => 'type_of_record',
type => 'string',
facet => 0,
suggestible => 0,
sort => 0,
marc_type => 'marc21',
marc_field => 'leader_/6',
name => 'type_of_record_and_bib_level',
type => 'string',
facet => 0,
suggestible => 0,
sort => 0,
marc_type => 'marc21',
marc_field => 'leader_/6-7',
my $se = Test::MockModule->new('Koha::SearchEngine::Elasticsearch');
$se->mock('_foreach_mapping', sub {
my ($self, $sub) = @_;
foreach my $map (@mappings) {
my $see = Koha::SearchEngine::Elasticsearch->new({ index => 'biblios' });
my $marc_record_1 = MARC::Record->new();
$marc_record_1->leader(' cam 22 a 4500');
MARC::Field->new('100', '', '', a => 'Author 1'),
MARC::Field->new('110', '', '', a => 'Corp Author'),
MARC::Field->new('210', '', '', a => 'Title 1'),
MARC::Field->new('245', '', '', a => 'Title: first record'),
MARC::Field->new('999', '', '', c => '1234567'),
# ' ' for testing trimming of white space in boolean value callback:
MARC::Field->new('952', '', '', 0 => ' ', g => '123.30'),
MARC::Field->new('952', '', '', 0 => 0, g => '127.20'),
my $marc_record_2 = MARC::Record->new();
$marc_record_2->leader(' cam 22 a 4500');
MARC::Field->new('100', '', '', a => 'Author 2'),
# MARC::Field->new('210', '', '', a => 'Title 2'),
# MARC::Field->new('245', '', '', a => 'Title: second record'),
MARC::Field->new('999', '', '', c => '1234568'),
MARC::Field->new('952', '', '', 0 => 1, g => 'string where should be numeric'),
my $records = [$marc_record_1, $marc_record_2];
$see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
my $docs = $see->marc_records_to_documents($records);
# First record:
is(scalar @{$docs}, 2, 'Two records converted to documents');
is($docs->[0][0], '1234567', 'First document biblionumber should be set as first element in document touple');
is(scalar @{$docs->[0][1]->{author}}, 2, 'First document author field should contain two values');
is_deeply($docs->[0][1]->{author}, ['Author 1', 'Corp Author'], 'First document author field should be set correctly');
is(scalar @{$docs->[0][1]->{author__sort}}, 2, 'First document author__sort field should have two values');
is_deeply($docs->[0][1]->{author__sort}, ['Author 1', 'Corp Author'], 'First document author__sort field should be set correctly');
is(scalar @{$docs->[0][1]->{title__sort}}, 1, 'First document title__sort field should have one value');
is_deeply($docs->[0][1]->{title__sort}, ['Title: first record'], 'First document title__sort field should be set correctly');
is(scalar @{$docs->[0][1]->{author__suggestion}}, 2, 'First document author__suggestion field should contain two values');
'input' => 'Author 1'
'input' => 'Corp Author'
'First document author__suggestion field should be set correctly'
is(scalar @{$docs->[0][1]->{title__suggestion}}, 1, 'First document title__suggestion field should contain one value');
[{ 'input' => 'Title: first record' }],
'First document title__suggestion field should be set correctly'
ok(!(defined $docs->[0][1]->{title__facet}), 'First document should have no title__facet field');
is(scalar @{$docs->[0][1]->{author__facet}}, 2, 'First document author__facet field should have two values');
['Author 1', 'Corp Author'],
'First document author__facet field should be set correctly'
is(scalar @{$docs->[0][1]->{items_withdrawn_status}}, 2, 'First document items_withdrawn_status field should have two values');
['false', 'false'],
'First document items_withdrawn_status field should be set correctly'
'First document sum_item_price field should be set correctly'
ok(defined $docs->[0][1]->{marc_xml}, 'First document marc_xml field should be set');
is(scalar @{$docs->[0][1]->{type_of_record}}, 1, 'First document type_of_record field should have one value');
'First document type_of_record field should be set correctly'
is(scalar @{$docs->[0][1]->{type_of_record_and_bib_level}}, 1, 'First document type_of_record_and_bib_level field should have one value');
'First document type_of_record_and_bib_level field should be set correctly'
# Second record:
is(scalar @{$docs->[1][1]->{author}}, 1, 'Second document author field should contain one value');
is_deeply($docs->[1][1]->{author}, ['Author 2'], 'Second document author field should be set correctly');
is(scalar @{$docs->[1][1]->{items_withdrawn_status}}, 1, 'Second document items_withdrawn_status field should have one value');
'Second document items_withdrawn_status field should be set correctly'
'Second document sum_item_price field should be set correctly'
# Mappings marc_type:
ok(!(defined $docs->[0][1]->{unimarc_title}), "No mapping when marc_type doesn't match marc flavour");


@ -1,164 +0,0 @@
# Copyright 2015 Catalyst IT
# This file is part of Koha.
# Koha is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# Koha is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <>
use Modern::Perl;
use Test::More tests => 2;
use Test::MockModule;
use t::lib::Mocks;
use MARC::Record;
my $schema = Koha::Database->schema;
subtest 'get_fixer_rules() tests' => sub {
plan tests => 49;
t::lib::Mocks::mock_preference( 'marcflavour', 'MARC21' );
my @mappings;
my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
$se->mock( '_foreach_mapping', sub {
my ($self, $sub ) = @_;
foreach my $map ( @mappings ) {
my $see = Koha::SearchEngine::Elasticsearch->new({ index => 'biblios' });
@mappings = (
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => undef,
marc_type => 'marc21',
marc_field => '100a',
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => 1,
marc_type => 'marc21',
marc_field => '110a',
$see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
my $result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
is( $result->[3], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
is( $result->[4], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[8], q{move_field(_id,es_id)});
$mappings[0]->{type} = 'boolean';
$mappings[1]->{type} = 'boolean';
$result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
is( $result->[3], q{unless exists('} . $mappings[0]->{name} . q{') add_field('} . $mappings[0]->{name} . q{', 0) end});
is( $result->[4], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
is( $result->[8], q{unless exists('} . $mappings[1]->{name} . q{') add_field('} . $mappings[1]->{name} . q{', 0) end});
is( $result->[9], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[10], q{move_field(_id,es_id)});
$mappings[0]->{type} = 'sum';
$mappings[1]->{type} = 'sum';
$result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
is( $result->[3], q{sum('} . $mappings[0]->{name} . q{')});
is( $result->[4], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
is( $result->[8], q{sum('} . $mappings[1]->{name} . q{')});
is( $result->[9], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[10], q{move_field(_id,es_id)});
$mappings[0]->{type} = 'string';
$mappings[0]->{facet} = 0;
$mappings[1]->{type} = 'string';
$mappings[1]->{facet} = 0;
$result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
is( $result->[3], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[4], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[6], q{move_field(_id,es_id)});
$mappings[0]->{suggestible} = 0;
$mappings[1]->{suggestible} = 0;
$result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
is( $result->[2], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[3], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[4], q{move_field(_id,es_id)});
$mappings[0]->{sort} = 0;
$mappings[1]->{sort} = undef;
$see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
$result = $see->get_fixer_rules();
is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
is( $result->[1], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
is( $result->[2], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
is( $result->[3], q{move_field(_id,es_id)});
t::lib::Mocks::mock_preference( 'marcflavour', 'UNIMARC' );
$result = $see->get_fixer_rules();
is( $result->[0], q{move_field(_id,es_id)});
is( $result->[1], undef, q{No mapping when marc_type doesn't match marchflavour} );


@ -17,7 +17,7 @@
use Modern::Perl;
use Test::More tests => 7;
use Test::More tests => 3;
use Test::MockModule;
use t::lib::Mocks;
@ -41,12 +41,7 @@ $marc_record->append_fields(
MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
MARC::Field->new( '245', '', '', 'a' => 'Title' )
my $records = [$marc_record];
ok( my $converted = $indexer->_convert_marc_to_json($records),
'Convert some records' );
is( $converted->count, 1, 'One converted record' );
@ -57,109 +52,3 @@ SKIP: {
ok( $indexer->update_index(undef, $records), 'Update Index' );
subtest 'create_index() tests' => sub {
plan tests => 3;
my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
$se->mock( 'get_elasticsearch_params', sub {
my ($self, $sub ) = @_;
my $method = $se->original( 'get_elasticsearch_params' );
my $params = $method->( $self );
$params->{index_name} .= '__test';
return $params;
my $indexer;
$indexer = Koha::SearchEngine::Elasticsearch::Indexer->new({ 'index' => 'biblios' }),
'Creating a new indexer object'
'Creating an index'
'Dropping the index'
subtest '_convert_marc_to_json() tests' => sub {
plan tests => 4;
t::lib::Mocks::mock_preference( 'marcflavour', 'MARC21' );
my @mappings = (
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => '~',
marc_type => 'marc21',
marc_field => '100a',
name => 'author',
type => 'string',
facet => 1,
suggestible => 1,
sort => '~',
marc_type => 'marc21',
marc_field => '110a',
my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
$se->mock( '_foreach_mapping', sub {
my ($self, $sub ) = @_;
foreach my $map ( @mappings ) {
my $marc_record = MARC::Record->new();
MARC::Field->new( '001', '1234567' ),
MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
MARC::Field->new( '100', '', '', 'a' => 'Author' ),
MARC::Field->new( '110', '', '', 'a' => 'Corp Author' ),
MARC::Field->new( '245', '', '', 'a' => 'Title' ),
my $marc_record_2 = MARC::Record->new();
MARC::Field->new( '001', '1234567' ),
MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
MARC::Field->new( '100', '', '', 'a' => 'Author' ),
MARC::Field->new( '245', '', '', 'a' => 'Title' ),
my @records = ( $marc_record, $marc_record_2 );
my $importer = Koha::SearchEngine::Elasticsearch::Indexer->new({ index => 'biblios' })->_convert_marc_to_json( \@records );
my $conv = $importer->next();
is( $conv->{author}[0], "Author", "First mapped author should be 100a");
is( $conv->{author}[1], "Corp Author", "Second mapped author should be 110a");
$conv = $importer->next();
is( $conv->{author}[0], "Author", "First mapped author should be 100a");
is( scalar @{$conv->{author}} , 1, "We should map field only if exists, shouldn't add extra nulls");
