1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
37 use Search::Elasticsearch;
41 use List::Util qw( sum0 reduce );
44 use Encode qw(encode);
46 use Scalar::Util qw(looks_like_number);
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
75 my $self = $class->SUPER::new(@_);
76 # Check for a valid index
77 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
81 =head2 get_elasticsearch
83 my $elasticsearch_client = $self->get_elasticsearch();
85 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
86 instance level and will be reused if method is called multiple times.
90 sub get_elasticsearch {
92 unless (defined $self->{elasticsearch}) {
93 my $conf = $self->get_elasticsearch_params();
94 $self->{elasticsearch} = Search::Elasticsearch->new($conf);
96 return $self->{elasticsearch};
99 =head2 get_elasticsearch_params
101 my $params = $self->get_elasticsearch_params();
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
107 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108 'index_name' => 'koha_instance_index',
111 This is configured by the following in the C<config> block in koha-conf.xml:
114 <server>127.0.0.1:9200</server>
115 <server>anotherserver:9200</server>
116 <index_name>koha_instance</index_name>
121 sub get_elasticsearch_params {
124 # Copy the hash so that we're not modifying the original
125 my $conf = C4::Context->config('elasticsearch');
126 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127 my $es = { %{ $conf } };
129 # Helpfully, the multiple server lines end up in an array for us anyway
130 # if there are multiple ones, but not if there's only one.
131 my $server = $es->{server};
132 delete $es->{server};
133 if ( ref($server) eq 'ARRAY' ) {
135 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136 $es->{nodes} = $server;
139 $es->{nodes} = [$server];
142 die "No elasticsearch servers were specified in koha-conf.xml.\n";
144 die "No elasticsearch index_name was specified in koha-conf.xml.\n"
145 if ( !$es->{index_name} );
146 # Append the name of this particular index to our namespace
147 $es->{index_name} .= '_' . $self->index;
149 $es->{key_prefix} = 'es_';
150 $es->{cxn_pool} //= 'Static';
151 $es->{request_timeout} //= 60;
156 =head2 get_elasticsearch_settings
158 my $settings = $self->get_elasticsearch_settings();
160 This provides the settings provided to Elasticsearch when an index is created.
161 These can do things like define tokenization methods.
163 A hashref containing the settings is returned.
167 sub get_elasticsearch_settings {
170 # Use state to speed up repeated calls
171 state $settings = undef;
172 if (!defined $settings) {
173 my $config_file = C4::Context->config('elasticsearch_index_config');
174 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
175 $settings = LoadFile( $config_file );
181 =head2 get_elasticsearch_mappings
183 my $mappings = $self->get_elasticsearch_mappings();
185 This provides the mappings that get passed to Elasticsearch when an index is
190 sub get_elasticsearch_mappings {
193 # Use state to speed up repeated calls
197 if (!defined $all_mappings{$self->index}) {
198 $sort_fields{$self->index} = {};
199 # Clone the general mapping to break ties with the original hash
201 data => clone(_get_elasticsearch_field_config('general', ''))
203 my $marcflavour = lc C4::Context->preference('marcflavour');
204 $self->_foreach_mapping(
206 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
207 return if $marc_type ne $marcflavour;
208 # TODO if this gets any sort of complexity to it, it should
209 # be broken out into its own function.
211 # TODO be aware of date formats, but this requires pre-parsing
212 # as ES will simply reject anything with an invalid date.
213 my $es_type = 'text';
214 if ($type eq 'boolean') {
215 $es_type = 'boolean';
216 } elsif ($type eq 'number' || $type eq 'sum') {
217 $es_type = 'integer';
218 } elsif ($type eq 'isbn' || $type eq 'stdno') {
223 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
227 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
230 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
232 # Sort is a bit special as it can be true, false, undef.
233 # We care about "true" or "undef",
234 # "undef" means to do the default thing, which is make it sortable.
235 if (!defined $sort || $sort) {
236 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
237 $sort_fields{$self->index}{$name} = 1;
241 $all_mappings{$self->index} = $mappings;
243 $self->sort_fields(\%{$sort_fields{$self->index}});
245 return $all_mappings{$self->index};
248 =head2 raw_elasticsearch_mappings
250 Return elasticsearch mapping as it is in database.
251 marc_type: marc21|unimarc|normarc
253 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
257 sub raw_elasticsearch_mappings {
258 my ( $marc_type ) = @_;
260 my $schema = Koha::Database->new()->schema();
262 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
265 while ( my $search_field = $search_fields->next ) {
267 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
268 { search_field_id => $search_field->id },
270 join => 'search_marc_map',
271 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
275 while ( my $marc_to_field = $marc_to_fields->next ) {
277 my $marc_map = $marc_to_field->search_marc_map;
279 next if $marc_type && $marc_map->marc_type ne $marc_type;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
288 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
290 facet => $marc_to_field->facet || '',
291 marc_type => $marc_map->marc_type,
292 marc_field => $marc_map->marc_field,
293 sort => $marc_to_field->sort,
294 suggestible => $marc_to_field->suggestible || ''
303 =head2 _get_elasticsearch_field_config
305 Get the Elasticsearch field config for the given purpose and data type.
307 $mapping = _get_elasticsearch_field_config('search', 'text');
311 sub _get_elasticsearch_field_config {
313 my ( $purpose, $type ) = @_;
315 # Use state to speed up repeated calls
316 state $settings = undef;
317 if (!defined $settings) {
318 my $config_file = C4::Context->config('elasticsearch_field_config');
319 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
320 $settings = LoadFile( $config_file );
323 if (!defined $settings->{$purpose}) {
324 die "Field purpose $purpose not defined in field config";
327 return $settings->{$purpose};
329 if (defined $settings->{$purpose}{$type}) {
330 return $settings->{$purpose}{$type};
332 if (defined $settings->{$purpose}{'default'}) {
333 return $settings->{$purpose}{'default'};
338 =head2 _load_elasticsearch_mappings
340 Load Elasticsearch mappings in the format of mappings.yaml.
342 $indexes = _load_elasticsearch_mappings();
346 sub _load_elasticsearch_mappings {
347 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
348 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
349 return LoadFile( $mappings_yaml );
352 sub reset_elasticsearch_mappings {
354 my $indexes = $self->_load_elasticsearch_mappings();
356 Koha::SearchMarcMaps->delete;
357 Koha::SearchFields->delete;
359 while ( my ( $index_name, $fields ) = each %$indexes ) {
360 while ( my ( $field_name, $data ) = each %$fields ) {
362 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
365 $sf_params{staff_client} //= 1;
366 $sf_params{opac} //= 1;
368 $sf_params{name} = $field_name;
370 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
372 my $mappings = $data->{mappings};
373 for my $mapping ( @$mappings ) {
374 my $marc_field = Koha::SearchMarcMaps->find_or_create({
375 index_name => $index_name,
376 marc_type => $mapping->{marc_type},
377 marc_field => $mapping->{marc_field}
379 $search_field->add_to_search_marc_maps($marc_field, {
380 facet => $mapping->{facet} || 0,
381 suggestible => $mapping->{suggestible} || 0,
382 sort => $mapping->{sort},
383 search => $mapping->{search} // 1
389 my $cache = Koha::Caches->get_instance();
390 $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
391 $cache->clear_from_cache('elasticsearch_search_fields_opac');
393 # FIXME return the mappings?
396 # This overrides the accessor provided by Class::Accessor so that if
397 # sort_fields isn't set, then it'll generate it.
401 $self->_sort_fields_accessor(@_);
404 my $val = $self->_sort_fields_accessor();
407 # This will populate the accessor as a side effect
408 $self->get_elasticsearch_mappings();
409 return $self->_sort_fields_accessor();
412 =head2 _process_mappings($mappings, $data, $record_document, $meta)
414 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
416 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
417 Since we group all mappings by MARC field targets C<$mappings> will contain
418 all targets for C<$data> and thus we need to fetch the MARC field only once.
419 C<$mappings> will be applied to C<$record_document> and new field values added.
420 The method has no return value.
426 Arrayref of mappings containing arrayrefs in the format
427 [C<$target>, C<$options>] where C<$target> is the name of the target field and
428 C<$options> is a hashref containing processing directives for this particular
433 The source data from a MARC record field.
435 =item C<$record_document>
437 Hashref representing the Elasticsearch document on which mappings should be
442 A hashref containing metadata useful for enforcing per mapping rules. For
443 example for providing extra context for mapping options, or treating mapping
444 targets differently depending on type (sort, search, facet etc). Combining
445 this metadata with the mapping options and metadata allows us to mutate the
446 data per mapping, or even replace it with other data retrieved from the
449 Current properties are:
451 C<altscript>: A boolean value indicating whether an alternate script presentation is being
454 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
455 'subfield' or 'subfields_group'.
457 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
459 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
460 is 'subfields_group'.
462 C<field>: The original C<MARC::Record> object.
468 sub _process_mappings {
469 my ($_self, $mappings, $data, $record_document, $meta) = @_;
470 foreach my $mapping (@{$mappings}) {
471 my ($target, $options) = @{$mapping};
473 # Don't process sort fields for alternate scripts
474 my $sort = $target =~ /__sort$/;
475 if ($sort && $meta->{altscript}) {
479 # Copy (scalar) data since can have multiple targets
480 # with differing options for (possibly) mutating data
481 # so need a different copy for each
483 $record_document->{$target} //= [];
484 if (defined $options->{substr}) {
485 my ($start, $length) = @{$options->{substr}};
486 $_data = length($data) > $start ? substr $data, $start, $length : '';
488 if (defined $options->{value_callbacks}) {
489 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
491 if (defined $options->{property}) {
493 $options->{property} => $_data
496 if (defined $options->{nonfiling_characters_indicator}) {
497 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
498 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
499 if ($nonfiling_chars) {
500 $_data = substr $_data, $nonfiling_chars;
503 push @{$record_document->{$target}}, $_data;
507 =head2 marc_records_to_documents($marc_records)
509 my $record_documents = $self->marc_records_to_documents($marc_records);
511 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
513 Returns array of hash references, representing Elasticsearch documents,
514 acceptable as body payload in C<Search::Elasticsearch> requests.
518 =item C<$marc_documents>
520 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
526 sub marc_records_to_documents {
527 my ($self, $records) = @_;
528 my $rules = $self->_get_marc_mapping_rules();
529 my $control_fields_rules = $rules->{control_fields};
530 my $data_fields_rules = $rules->{data_fields};
531 my $marcflavour = lc C4::Context->preference('marcflavour');
532 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
534 my @record_documents;
536 foreach my $record (@{$records}) {
537 my $record_document = {};
538 my $mappings = $rules->{leader};
540 $self->_process_mappings($mappings, $record->leader(), $record_document, {
542 data_source => 'leader'
546 foreach my $field ($record->fields()) {
547 if ($field->is_control_field()) {
548 my $mappings = $control_fields_rules->{$field->tag()};
550 $self->_process_mappings($mappings, $field->data(), $record_document, {
552 data_source => 'control_field',
559 my $tag = $field->tag();
560 # Handle alternate scripts in MARC 21
562 if ($marcflavour eq 'marc21' && $tag eq '880') {
563 my $sub6 = $field->subfield('6');
564 if ($sub6 =~ /^(...)-\d+/) {
570 my $data_field_rules = $data_fields_rules->{$tag};
571 if ($data_field_rules) {
572 my $subfields_mappings = $data_field_rules->{subfields};
573 my $wildcard_mappings = $subfields_mappings->{'*'};
574 foreach my $subfield ($field->subfields()) {
575 my ($code, $data) = @{$subfield};
576 my $mappings = $subfields_mappings->{$code} // [];
577 if ($wildcard_mappings) {
578 $mappings = [@{$mappings}, @{$wildcard_mappings}];
581 $self->_process_mappings($mappings, $data, $record_document, {
582 altscript => $altscript,
583 data_source => 'subfield',
589 if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
590 # Used by the authority linker the match-heading field requires a specific syntax
591 # that is specified in C4/Heading
592 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
593 next unless $heading;
594 push @{$record_document->{'match-heading'}}, $heading->search_form;
598 my $subfields_join_mappings = $data_field_rules->{subfields_join};
599 if ($subfields_join_mappings) {
600 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
601 # Map each subfield to values, remove empty values, join with space
606 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
610 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
611 altscript => $altscript,
612 data_source => 'subfields_group',
613 codes => $subfields_group,
618 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
619 # Used by the authority linker the match-heading field requires a specific syntax
620 # that is specified in C4/Heading
621 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
622 next unless $heading;
623 push @{$record_document->{'match-heading'}}, $heading->search_form;
630 foreach my $field (keys %{$rules->{defaults}}) {
631 unless (defined $record_document->{$field}) {
632 $record_document->{$field} = $rules->{defaults}->{$field};
635 foreach my $field (@{$rules->{sum}}) {
636 if (defined $record_document->{$field}) {
637 # TODO: validate numeric? filter?
638 # TODO: Or should only accept fields without nested values?
639 # TODO: Quick and dirty, improve if needed
640 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
643 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
644 foreach my $field (@{$rules->{isbn}}) {
645 if (defined $record_document->{$field}) {
647 foreach my $input_isbn (@{$record_document->{$field}}) {
648 my $isbn = Business::ISBN->new($input_isbn);
649 if (defined $isbn && $isbn->is_valid) {
650 my $isbn13 = $isbn->as_isbn13->as_string;
651 push @isbns, $isbn13;
653 push @isbns, $isbn13;
655 my $isbn10 = $isbn->as_isbn10;
657 $isbn10 = $isbn10->as_string;
658 push @isbns, $isbn10;
660 push @isbns, $isbn10;
663 push @isbns, $input_isbn;
666 $record_document->{$field} = \@isbns;
670 # Remove duplicate values and collapse sort fields
671 foreach my $field (keys %{$record_document}) {
672 if (ref($record_document->{$field}) eq 'ARRAY') {
673 @{$record_document->{$field}} = do {
675 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
677 if ($field =~ /__sort$/) {
678 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
679 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
684 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
685 $record->encoding('UTF-8');
687 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
688 $record_document->{'marc_format'} = 'ARRAY';
692 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
693 local $SIG{__WARN__} = sub {
694 push @warnings, $_[0];
696 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
699 # Suppress warnings if record length exceeded
700 unless (substr($record->leader(), 0, 5) eq '99999') {
701 foreach my $warning (@warnings) {
705 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
706 $record_document->{'marc_format'} = 'MARCXML';
709 $record_document->{'marc_format'} = 'base64ISO2709';
712 push @record_documents, $record_document;
714 return \@record_documents;
717 =head2 _marc_to_array($record)
719 my @fields = _marc_to_array($record)
721 Convert a MARC::Record to an array modeled after MARC-in-JSON
722 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
728 A MARC::Record object
735 my ($self, $record) = @_;
738 leader => $record->leader(),
741 for my $field ($record->fields()) {
742 my $tag = $field->tag();
743 if ($field->is_control_field()) {
744 push @{$data->{fields}}, {$tag => $field->data()};
747 foreach my $subfield ($field->subfields()) {
748 my ($code, $contents) = @{$subfield};
749 push @{$subfields}, {$code => $contents};
751 push @{$data->{fields}}, {
753 ind1 => $field->indicator(1),
754 ind2 => $field->indicator(2),
755 subfields => $subfields
763 =head2 _array_to_marc($data)
765 my $record = _array_to_marc($data)
767 Convert an array modeled after MARC-in-JSON to a MARC::Record
773 An array modeled after MARC-in-JSON
774 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
781 my ($self, $data) = @_;
783 my $record = MARC::Record->new();
785 $record->leader($data->{leader});
786 for my $field (@{$data->{fields}}) {
787 my $tag = (keys %{$field})[0];
788 $field = $field->{$tag};
790 if (ref($field) eq 'HASH') {
792 foreach my $subfield (@{$field->{subfields}}) {
793 my $code = (keys %{$subfield})[0];
794 push @subfields, $code;
795 push @subfields, $subfield->{$code};
797 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
799 $marc_field = MARC::Field->new($tag, $field)
801 $record->append_fields($marc_field);
807 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
809 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
811 Get mappings, an internal data structure later used by
812 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
813 data for a MARC mapping.
815 The returned C<$mappings> is not to to be confused with mappings provided by
816 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
817 provided by C<_foreach_mapping> and expands it to this internal data structure.
818 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
819 is then applied to each MARC target (leader, control field data, subfield or
820 joined subfields) and integrated into the mapping rules data structure used in
821 C<marc_records_to_documents> to transform MARC records into Elasticsearch
828 Boolean indicating whether to create a facet field for this mapping.
830 =item C<$suggestible>
832 Boolean indicating whether to create a suggestion field for this mapping.
836 Boolean indicating whether to create a sort field for this mapping.
840 Boolean indicating whether to create a search field for this mapping.
842 =item C<$target_name>
844 Elasticsearch document target field name.
846 =item C<$target_type>
848 Elasticsearch document target field type.
852 An optional range as a string in the format "<START>-<END>" or "<START>",
853 where "<START>" and "<END>" are integers specifying a range that will be used
854 for extracting a substring from MARC data as Elasticsearch field target value.
856 The first character position is "0", and the range is inclusive,
857 so "0-2" means the first three characters of MARC data.
859 If only "<START>" is provided only one character at position "<START>" will
866 sub _field_mappings {
867 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
868 my %mapping_defaults = ();
871 my $substr_args = undef;
872 if (defined $range) {
873 # TODO: use value_callback instead?
874 my ($start, $end) = map(int, split /-/, $range, 2);
875 $substr_args = [$start];
876 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
878 my $default_options = {};
880 $default_options->{substr} = $substr_args;
883 # TODO: Should probably have per type value callback/hook
884 # but hard code for now
885 if ($target_type eq 'boolean') {
886 $default_options->{value_callbacks} //= [];
887 push @{$default_options->{value_callbacks}}, sub {
889 # Trim whitespace at both ends
890 $value =~ s/^\s+|\s+$//g;
891 return $value ? 'true' : 'false';
896 my $mapping = [$target_name, $default_options];
897 push @mappings, $mapping;
901 push @suffixes, 'facet' if $facet;
902 push @suffixes, 'suggestion' if $suggestible;
903 push @suffixes, 'sort' if !defined $sort || $sort;
905 foreach my $suffix (@suffixes) {
906 my $mapping = ["${target_name}__$suffix"];
907 # TODO: Hack, fix later in less hideous manner
908 if ($suffix eq 'suggestion') {
909 push @{$mapping}, {%{$default_options}, property => 'input'};
912 # Important! Make shallow clone, or we end up with the same hashref
913 # shared by all mappings
914 push @{$mapping}, {%{$default_options}};
916 push @mappings, $mapping;
921 =head2 _get_marc_mapping_rules
923 my $mapping_rules = $self->_get_marc_mapping_rules()
925 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
927 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
928 each call to C<MARC::Record>->field) we create an optimized structure of mapping
929 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
931 We can then iterate through all MARC fields for each record and apply all relevant
932 rules once per fields instead of retreiving fields multiple times for each mapping rule
933 which is terribly slow.
937 # TODO: This structure can be used for processing multiple MARC::Records so is currently
938 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
939 # memory cache which it is currently not. The performance gain of caching
940 # would probably be marginal, but to do this could be a further improvement.
942 sub _get_marc_mapping_rules {
944 my $marcflavour = lc C4::Context->preference('marcflavour');
945 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
946 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
949 'control_fields' => {},
956 $self->_foreach_mapping(sub {
957 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
958 return if $marc_type ne $marcflavour;
960 if ($type eq 'sum') {
961 push @{$rules->{sum}}, $name;
962 push @{$rules->{sum}}, $name."__sort" if $sort;
964 elsif ($type eq 'isbn') {
965 push @{$rules->{isbn}}, $name;
967 elsif ($type eq 'boolean') {
968 # boolean gets special handling, if value doesn't exist for a field,
970 $rules->{defaults}->{$name} = 'false';
973 if ($marc_field =~ $field_spec_regexp) {
978 # Parse and separate subfields form subfield groups
980 my $subfield_group = '';
983 foreach my $token (split //, $2) {
986 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
987 "Unmatched opening parenthesis for $marc_field"
994 elsif ($token eq ")") {
996 if ($subfield_group) {
997 push @subfield_groups, $subfield_group;
998 $subfield_group = '';
1003 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1004 "Unmatched closing parenthesis for $marc_field"
1008 elsif ($open_group) {
1009 $subfield_group .= $token;
1012 push @subfields, $token;
1017 push @subfields, '*';
1020 my $range = defined $3 ? $3 : undef;
1021 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1022 if ($field_tag < 10) {
1023 $rules->{control_fields}->{$field_tag} //= [];
1024 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1027 $rules->{data_fields}->{$field_tag} //= {};
1028 foreach my $subfield (@subfields) {
1029 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1030 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1032 foreach my $subfield_group (@subfield_groups) {
1033 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1034 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1038 elsif ($marc_field =~ $leader_regexp) {
1039 my $range = defined $1 ? $1 : undef;
1040 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1041 push @{$rules->{leader}}, @mappings;
1044 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1045 "Invalid MARC field expression: $marc_field"
1050 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1051 if ($marcflavour eq 'marc21') {
1052 # Nonfiling characters processing for sort fields
1054 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1055 # Format is: nonfiling characters indicator => field names list
1057 1 => [130, 630, 730, 740],
1058 2 => [222, 240, 242, 243, 245, 440, 830]
1061 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1064 2 => [130, 430, 530]
1067 foreach my $indicator (keys %title_fields) {
1068 foreach my $field_tag (@{$title_fields{$indicator}}) {
1069 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1070 foreach my $mapping (@{$mappings}) {
1071 if ($mapping->[0] =~ /__sort$/) {
1072 # Mark this as to be processed for nonfiling characters indicator
1073 # later on in _process_mappings
1074 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1084 =head2 _foreach_mapping
1086 $self->_foreach_mapping(
1088 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1091 return unless $marc_type eq 'marc21';
1092 print "Data comes from: " . $marc_field . "\n";
1096 This allows you to apply a function to each entry in the elasticsearch mappings
1097 table, in order to build the mappings for whatever is needed.
1099 In the provided function, the files are:
1105 The field name for elasticsearch (corresponds to the 'mapping' column in the
1110 The type for this value, e.g. 'string'.
1114 True if this value should be facetised. This only really makes sense if the
1115 field is understood by the facet processing code anyway.
1119 True if this is a field that a) needs special sort handling, and b) if it
1120 should be sorted on. False if a) but not b). Undef if not a). This allows,
1121 for example, author to be sorted on but not everything marked with "author"
1122 to be included in that sort.
1126 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1127 'unimarc', 'normarc'.
1129 =item C<$marc_field>
1131 A string that describes the MARC field that contains the data to extract.
1132 These are of a form suited to Catmandu's MARC fixers.
1138 sub _foreach_mapping {
1139 my ( $self, $sub ) = @_;
1141 # TODO use a caching framework here
1142 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1144 'search_marc_map.index_name' => $self->index,
1146 { join => { search_marc_to_fields => 'search_marc_map' },
1148 'search_marc_to_fields.facet',
1149 'search_marc_to_fields.suggestible',
1150 'search_marc_to_fields.sort',
1151 'search_marc_to_fields.search',
1152 'search_marc_map.marc_type',
1153 'search_marc_map.marc_field',
1166 while ( my $search_field = $search_fields->next ) {
1168 # Force lower case on indexed field names for case insensitive
1169 # field name searches
1170 lc($search_field->name),
1171 $search_field->type,
1172 $search_field->get_column('facet'),
1173 $search_field->get_column('suggestible'),
1174 $search_field->get_column('sort'),
1175 $search_field->get_column('search'),
1176 $search_field->get_column('marc_type'),
1177 $search_field->get_column('marc_field'),
1182 =head2 process_error
1184 die process_error($@);
1186 This parses an Elasticsearch error message and produces a human-readable
1187 result from it. This result is probably missing all the useful information
1188 that you might want in diagnosing an issue, so the warning is also logged.
1190 Note that currently the resulting message is not internationalised. This
1191 will happen eventually by some method or other.
1196 my ($self, $msg) = @_;
1198 warn $msg; # simple logging
1200 # This is super-primitive
1201 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1203 return "Unable to perform your search. Please try again.\n";
1206 =head2 _read_configuration
1208 my $conf = _read_configuration();
1210 Reads the I<configuration file> and returns a hash structure with the
1211 configuration information. It raises an exception if mandatory entries
1214 The hashref structure has the following form:
1217 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1218 'index_name' => 'koha_instance',
1221 This is configured by the following in the C<config> block in koha-conf.xml:
1224 <server>127.0.0.1:9200</server>
1225 <server>anotherserver:9200</server>
1226 <index_name>koha_instance</index_name>
1231 sub _read_configuration {
1235 my $conf = C4::Context->config('elasticsearch');
1236 Koha::Exceptions::Config::MissingEntry->throw(
1237 "Missing 'elasticsearch' block in config file")
1238 unless defined $conf;
1240 if ( $conf && $conf->{server} ) {
1241 my $nodes = $conf->{server};
1242 if ( ref($nodes) eq 'ARRAY' ) {
1243 $configuration->{nodes} = $nodes;
1246 $configuration->{nodes} = [$nodes];
1250 Koha::Exceptions::Config::MissingEntry->throw(
1251 "Missing 'server' entry in config file for elasticsearch");
1254 if ( defined $conf->{index_name} ) {
1255 $configuration->{index_name} = $conf->{index_name};
1258 Koha::Exceptions::Config::MissingEntry->throw(
1259 "Missing 'index_name' entry in config file for elasticsearch");
1262 return $configuration;
1265 =head2 get_facetable_fields
1267 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1269 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1273 sub get_facetable_fields {
1276 # These should correspond to the ES field names, as opposed to the CCL
1277 # things that zebra uses.
1278 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1279 my @faceted_fields = Koha::SearchFields->search(
1280 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1282 my @not_faceted_fields = Koha::SearchFields->search(
1283 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1285 # This could certainly be improved
1286 return ( @faceted_fields, @not_faceted_fields );
1297 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1299 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1301 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>