1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
35 use Carp qw( carp croak );
36 use Clone qw( clone );
38 use Readonly qw( Readonly );
39 use Search::Elasticsearch;
40 use Try::Tiny qw( catch try );
43 use List::Util qw( sum0 );
45 use MIME::Base64 qw( encode_base64 );
46 use Encode qw( encode );
48 use Scalar::Util qw( looks_like_number );
# Class::Accessor-generated accessors: 'index' and 'index_name' are read-only.
50 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
# 'sort_fields' is generated as a read/write accessor.
51 __PACKAGE__->mk_accessors(qw( sort_fields ));
53 # Constants to refer to the standard index names
54 Readonly our $BIBLIOS_INDEX => 'biblios';
55 Readonly our $AUTHORITIES_INDEX => 'authorities';
59 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
67 The name of the index to use, generally 'biblios' or 'authorities'.
71 The Elasticsearch index name with Koha instance prefix.
# NOTE(review): these lines read like the constructor body; the enclosing `sub`
# line and the unpacking of $class / $params are not visible in this excerpt.
84 # Check for a valid index
85 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
86 my $config = _read_configuration();
# The physical index name is the configured index_name joined to the logical
# index name with an underscore.
87 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
# Object construction itself is delegated to the parent class.
89 my $self = $class->SUPER::new(@_);
93 =head2 get_elasticsearch
95 my $elasticsearch_client = $self->get_elasticsearch();
97 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
98 instance level and will be reused if method is called multiple times.
sub get_elasticsearch {
    my ($self) = @_;

    # Create the client on first use only; every later call hands back the
    # instance stored on this object.
    $self->{elasticsearch} //= Search::Elasticsearch->new(
        $self->get_elasticsearch_params()
    );

    return $self->{elasticsearch};
}
112 =head2 get_elasticsearch_params
114 my $params = $self->get_elasticsearch_params();
116 This provides a hashref that contains the parameters for connecting to the
117 Elasticsearch servers, in the form:
120 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
121 'index_name' => 'koha_instance_index',
124 This is configured by the following in the C<config> block in koha-conf.xml:
127 <server>127.0.0.1:9200</server>
128 <server>anotherserver:9200</server>
129 <index_name>koha_instance</index_name>
# Reads the Elasticsearch connection configuration.
# NOTE(review): most of this sub's body is not visible in this excerpt; only the
# configuration read and one exception-class check are shown.
134 sub get_elasticsearch_params {
139 $conf = _read_configuration();
# $_ is compared against the MissingEntry configuration exception class here;
# presumably this sits inside a Try::Tiny catch block - confirm against the full file.
141 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
149 =head2 get_elasticsearch_settings
151 my $settings = $self->get_elasticsearch_settings();
153 This provides the settings provided to Elasticsearch when an index is created.
154 These can do things like define tokenization methods.
156 A hashref containing the settings is returned.
# Loads the index settings from a YAML file. The parsed structure is kept in a
# `state` variable, so the file is only read while that variable is undefined.
160 sub get_elasticsearch_settings {
163 # Use state to speed up repeated calls
164 state $settings = undef;
165 if (!defined $settings) {
# The 'elasticsearch_index_config' configuration entry wins when set; otherwise
# fall back to index_config.yaml below the configured 'intranetdir'.
166 my $config_file = C4::Context->config('elasticsearch_index_config');
167 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
168 $settings = YAML::XS::LoadFile( $config_file );
174 =head2 get_elasticsearch_mappings
176 my $mappings = $self->get_elasticsearch_mappings();
178 This provides the mappings that get passed to Elasticsearch when an index is
# Builds (once per index name) the Elasticsearch mapping structure for this
# object's index and records which fields are sortable.
# NOTE(review): several lines of this sub (the declarations of %all_mappings and
# %sort_fields, some $es_type assignments and the conditions guarding the
# search/facet/suggestion properties) are not visible in this excerpt.
183 sub get_elasticsearch_mappings {
186 # Use state to speed up repeated calls
# Only do the work when nothing has been stored yet for this index.
190 if (!defined $all_mappings{$self->index}) {
191 $sort_fields{$self->index} = {};
192 # Clone the general mapping to break ties with the original hash
193 my $mappings = clone(_get_elasticsearch_field_config('general', ''));
194 my $marcflavour = lc C4::Context->preference('marcflavour');
195 $self->_foreach_mapping(
197 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
# Mappings defined for another MARC flavour are ignored.
198 return if $marc_type ne $marcflavour;
199 # TODO if this gets any sort of complexity to it, it should
200 # be broken out into its own function.
202 # TODO be aware of date formats, but this requires pre-parsing
203 # as ES will simply reject anything with an invalid date.
# Translate the Koha search field type into the type key used to look up the
# field configuration; 'text' is the fallback.
204 my $es_type = 'text';
205 if ($type eq 'boolean') {
206 $es_type = 'boolean';
207 } elsif ($type eq 'number' || $type eq 'sum') {
208 $es_type = 'integer';
209 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211 } elsif ($type eq 'year') {
213 } elsif ($type eq 'callnumber') {
214 $es_type = 'cn_sort';
# Each purpose gets its own property name: the bare field name for search and
# '__facet', '__suggestion' and '__sort' suffixed names for the other purposes.
218 $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222 $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
225 $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227 # Sort should be defined in mappings as 1 (Yes) or 0 (No)
228 # Previously, we also supported ~ (Undef) in the file
229 # "undef" means to do the default thing, which is make it sortable.
230 # This is preserved in order to not cause breakages for existing installs
231 if (!defined $sort || $sort) {
232 $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233 $sort_fields{$self->index}{$name} = 1;
# The authorities index always gets these two extra text search properties.
237 if( $self->index eq 'authorities' ){
238 $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text');
239 $mappings->{properties}{ 'subject-heading-thesaurus' } = _get_elasticsearch_field_config('search', 'text');
241 $all_mappings{$self->index} = $mappings;
# \%{ ... } yields a reference to the same per-index hash (it is not a copy).
243 $self->sort_fields(\%{$sort_fields{$self->index}});
244 return $all_mappings{$self->index};
247 =head2 raw_elasticsearch_mappings
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
# Collects the search field definitions and their MARC mappings from the
# database into a nested hash keyed by index name, then search field name.
# Takes the MARC type as its only argument (no invocant is unpacked); when it is
# true, only mappings of that MARC type are included.
256 sub raw_elasticsearch_mappings {
257 my ( $marc_type ) = @_;
259 my $schema = Koha::Database->new()->schema();
# All search fields, ordered by name.
261 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
264 while ( my $search_field = $search_fields->next ) {
# The MARC mappings linked to this search field, ordered by MARC type and field.
266 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267 { search_field_id => $search_field->id },
269 join => 'search_marc_map',
270 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
274 while ( my $marc_to_field = $marc_to_fields->next ) {
276 my $marc_map = $marc_to_field->search_marc_map;
278 next if $marc_type && $marc_map->marc_type ne $marc_type;
# Field-level attributes. label, type and mandatory are always copied; the
# remaining ones only when they are defined on the search field.
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
# One entry per MARC mapping is appended to the field's 'mappings' list; false
# facet/suggestible values are stored as empty strings.
288 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
290 facet => $marc_to_field->facet || '',
291 marc_type => $marc_map->marc_type,
292 marc_field => $marc_map->marc_field,
293 sort => $marc_to_field->sort,
294 suggestible => $marc_to_field->suggestible || ''
303 =head2 _get_elasticsearch_field_config
305 Get the Elasticsearch field config for the given purpose and data type.
307 $mapping = _get_elasticsearch_field_config('search', 'text');
# Looks up the field configuration for a purpose ('search', 'facet', ...) and a
# data type in the field configuration YAML file.
# Dies when the purpose is not present in the configuration.
311 sub _get_elasticsearch_field_config {
313 my ( $purpose, $type ) = @_;
315 # Use state to speed up repeated calls
316 state $settings = undef;
317 if (!defined $settings) {
# The 'elasticsearch_field_config' configuration entry wins when set; otherwise
# fall back to field_config.yaml below the configured 'intranetdir'.
318 my $config_file = C4::Context->config('elasticsearch_field_config');
319 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
# Have YAML::XS load booleans as JSON::PP booleans, for the duration of this load only.
320 local $YAML::XS::Boolean = 'JSON::PP';
321 $settings = YAML::XS::LoadFile( $config_file );
324 if (!defined $settings->{$purpose}) {
325 die "Field purpose $purpose not defined in field config";
# NOTE(review): the condition guarding this whole-purpose return is not visible
# in this excerpt; callers pass '' as the type for the 'general' purpose, so it
# is presumably an empty-type check - confirm.
328 return $settings->{$purpose};
# Prefer the type-specific entry, then the purpose's 'default' entry.
330 if (defined $settings->{$purpose}{$type}) {
331 return $settings->{$purpose}{$type};
333 if (defined $settings->{$purpose}{'default'}) {
334 return $settings->{$purpose}{'default'};
339 =head2 _load_elasticsearch_mappings
341 Load Elasticsearch mappings in the format of mappings.yaml.
343 $indexes = _load_elasticsearch_mappings();
sub _load_elasticsearch_mappings {
    # An explicit 'elasticsearch_index_mappings' configuration entry takes
    # precedence; otherwise use the mappings.yaml shipped below 'intranetdir'.
    my $yaml_path = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';

    return YAML::XS::LoadFile($yaml_path);
}
# Replaces the search field and MARC mapping rows in the database with the
# contents of the mappings YAML file, then clears the search fields cache.
353 sub reset_elasticsearch_mappings {
355 my $indexes = $self->_load_elasticsearch_mappings();
# Start from a clean slate: existing MARC maps and search fields are deleted first.
357 Koha::SearchMarcMaps->delete;
358 Koha::SearchFields->delete;
360 while ( my ( $index_name, $fields ) = each %$indexes ) {
361 while ( my ( $field_name, $data ) = each %$fields ) {
# Copy only the attributes that are actually present in the YAML entry.
363 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
# Fields are searchable in both the staff client and the OPAC unless the file says otherwise.
366 $sf_params{staff_client} //= 1;
367 $sf_params{opac} //= 1;
369 $sf_params{name} = $field_name;
# Looked up by the 'name' key, so a field name that occurs under several indexes
# maps to a single search field row.
371 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
373 my $mappings = $data->{mappings};
374 for my $mapping ( @$mappings ) {
375 my $marc_field = Koha::SearchMarcMaps->find_or_create({
376 index_name => $index_name,
377 marc_type => $mapping->{marc_type},
378 marc_field => $mapping->{marc_field}
# Link the field to the MARC map: facet and suggestible default to 0, sort and
# search default to 1 when absent from the file.
380 $search_field->add_to_search_marc_maps($marc_field, {
381 facet => $mapping->{facet} || 0,
382 suggestible => $mapping->{suggestible} || 0,
383 sort => $mapping->{sort} // 1,
384 search => $mapping->{search} // 1
390 $self->clear_search_fields_cache();
392 # FIXME return the mappings?
395 # This overrides the accessor provided by Class::Accessor so that if
396 # sort_fields isn't set, then it'll generate it.
# NOTE(review): the `sub` line and the conditions around the statements below
# are not visible in this excerpt.
# Setter path: arguments are forwarded to the underlying accessor.
400 $self->_sort_fields_accessor(@_);
# Getter path: read the currently stored value first.
403 my $val = $self->_sort_fields_accessor();
406 # This will populate the accessor as a side effect
407 $self->get_elasticsearch_mappings();
408 return $self->_sort_fields_accessor();
411 =head2 _process_mappings($mappings, $data, $record_document, $meta)
413 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
415 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
416 Since we group all mappings by MARC field targets C<$mappings> will contain
417 all targets for C<$data> and thus we need to fetch the MARC field only once.
418 C<$mappings> will be applied to C<$record_document> and new field values added.
419 The method has no return value.
425 Arrayref of mappings containing arrayrefs in the format
426 [C<$target>, C<$options>] where C<$target> is the name of the target field and
427 C<$options> is a hashref containing processing directives for this particular
432 The source data from a MARC record field.
434 =item C<$record_document>
436 Hashref representing the Elasticsearch document on which mappings should be
441 A hashref containing metadata useful for enforcing per mapping rules. For
442 example for providing extra context for mapping options, or treating mapping
443 targets differently depending on type (sort, search, facet etc). Combining
444 this metadata with the mapping options and metadata allows us to mutate the
445 data per mapping, or even replace it with other data retrieved from the
448 Current properties are:
450 C<altscript>: A boolean value indicating whether an alternate script presentation is being
453 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
454 'subfield' or 'subfields_group'.
456 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
458 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
459 is 'subfields_group'.
461 C<field>: The original C<MARC::Field> object.
# Apply every [$target, $options] pair in $mappings to the MARC value $data and
# append the resulting values to the arrayref $record_document->{$target}.
#
# Options honoured, in this order:
#   substr          - [$start, $length]: keep only that slice of $data
#   value_callbacks - subs turning one value into a (possibly empty) list
#   property        - wrap each non-empty value as { $property => $value }
#   nonfiling_characters_indicator - number of the indicator on $meta->{field}
#       holding the count of leading characters to strip from the first value
#
# A true $meta->{altscript} suppresses targets ending in "__sort".
# $record_document is modified in place; nothing meaningful is returned.
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    MAPPING:
    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        my $sort = $target =~ /__sort$/;
        next MAPPING if $sort && $meta->{altscript};

        # Copy (scalar) data since can have multiple targets
        # with differing options for (possibly) mutating data
        # so need a different copy for each
        my $data_copy = $data;
        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
        }

        # Add data to values array for callbacks processing
        my $values = [$data_copy];

        # Value callbacks take subfield data (or values from previous
        # callbacks) as argument, and return a possibly different list of values.
        # Note that the returned list may also be empty.
        if (defined $options->{value_callbacks}) {
            foreach my $callback (@{$options->{value_callbacks}}) {
                # Pass each value to the current callback, which returns a list
                # (a scalar is fine too); perl flattens the per-value lists.
                # The next callback receives the possibly expanded list of values.
                $values = [ map { $callback->($_) } @{$values} ];
            }
        }

        # Skip mapping if all values have been removed
        next MAPPING unless @{$values};

        if (defined $options->{property}) {
            my $property = $options->{property};
            # Wrap every non-empty value (including '0'); undefined and empty
            # values are dropped here instead of leaking through unwrapped.
            $values = [
                map { ( defined($_) && length($_) ) ? +{ $property => $_ } : () } @{$values}
            ];
        }

        if (defined $options->{nonfiling_characters_indicator}) {
            my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
            # Nonfiling chars does not make sense for multiple values
            # Only apply on first element, and only when it is a plain string.
            # The count is clamped to the value's length so substr can never be
            # asked for an offset outside of the string (which would yield
            # undef plus warnings), and non-positive counts are a no-op.
            if (   $nonfiling_chars > 0
                && @{$values}
                && defined $values->[0]
                && !ref $values->[0] )
            {
                $values->[0] = ( $nonfiling_chars < length( $values->[0] ) )
                    ? substr( $values->[0], $nonfiling_chars )
                    : '';
            }
        }

        # Drop undefined and empty values
        $values = [ grep { defined($_) && !/^$/ } @{$values} ];

        $record_document->{$target} //= [];
        push @{$record_document->{$target}}, @{$values};
    }

    return;
}
524 =head2 marc_records_to_documents($marc_records)
526 my $record_documents = $self->marc_records_to_documents($marc_records);
528 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
530 Returns array of hash references, representing Elasticsearch documents,
531 acceptable as body payload in C<Search::Elasticsearch> requests.
535 =item C<$marc_records>
537 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
# Converts an arrayref of MARC records into an arrayref of Elasticsearch
# document hashrefs, using the rules from _get_marc_mapping_rules().
# NOTE(review): many lines of this sub (closing braces, some conditions and some
# hash entries) are not visible in this excerpt; comments below only describe
# what the visible lines show.
543 sub marc_records_to_documents {
544 my ($self, $records) = @_;
545 my $rules = $self->_get_marc_mapping_rules();
546 my $control_fields_rules = $rules->{control_fields};
547 my $data_fields_rules = $rules->{data_fields};
548 my $marcflavour = lc C4::Context->preference('marcflavour');
549 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
551 my @record_documents;
# For the authorities index, map each authority type code to its
# auth_tag_to_report value; used below to pick the heading field.
553 my %auth_match_headings;
554 if( $self->index eq 'authorities' ){
555 my @auth_types = Koha::Authority::Types->search->as_list;
556 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
559 foreach my $record (@{$records}) {
560 my $record_document = {};
562 if ( $self->index eq 'authorities' ){
563 my $authtypecode = GuessAuthTypeCode( $record );
565 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
566 my $field = $record->field( $auth_match_headings{ $authtypecode } );
567 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
568 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
# NOTE(review): if the record has no 001 field, field('001') returns nothing and
# the ->as_string call below dies instead of warning - confirm and guard.
571 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
# Leader mappings are applied to the whole leader string.
575 my $mappings = $rules->{leader};
577 $self->_process_mappings($mappings, $record->leader(), $record_document, {
579 data_source => 'leader'
583 foreach my $field ($record->fields()) {
# Control fields carry a single data string and are looked up by tag only.
584 if ($field->is_control_field()) {
585 my $mappings = $control_fields_rules->{$field->tag()};
587 $self->_process_mappings($mappings, $field->data(), $record_document, {
589 data_source => 'control_field',
596 my $tag = $field->tag();
597 # Handle alternate scripts in MARC 21
# NOTE(review): subfield('6') is undef when the 880 field has no $6, in which
# case the match below emits an "uninitialized value" warning - confirm.
599 if ($marcflavour eq 'marc21' && $tag eq '880') {
600 my $sub6 = $field->subfield('6');
601 if ($sub6 =~ /^(...)-\d+/) {
607 my $data_field_rules = $data_fields_rules->{$tag};
608 if ($data_field_rules) {
609 my $subfields_mappings = $data_field_rules->{subfields};
610 my $wildcard_mappings = $subfields_mappings->{'*'};
611 foreach my $subfield ($field->subfields()) {
612 my ($code, $data) = @{$subfield};
# Mappings for this specific subfield code, followed by the '*' wildcard
# mappings when there are any.
613 my $mappings = $subfields_mappings->{$code} // [];
614 if ($wildcard_mappings) {
615 $mappings = [@{$mappings}, @{$wildcard_mappings}];
618 $self->_process_mappings($mappings, $data, $record_document, {
619 altscript => $altscript,
620 data_source => 'subfield',
# Joined subfield groups: the selected subfields are rendered as one string and
# mapped as a single value.
628 my $subfields_join_mappings = $data_field_rules->{subfields_join};
629 if ($subfields_join_mappings) {
630 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
631 my $data_field = $field->clone; #copy field to preserve for alt scripts
632 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
633 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
635 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
636 altscript => $altscript,
637 data_source => 'subfields_group',
638 codes => $subfields_group,
# Optionally index "see from" headings for bibliographic records. The same
# subfield and joined-subfield processing is applied to the fields returned by
# the EmbedSeeFromHeadings filter, minus sort/facet/suggestion targets.
649 if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
650 foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
651 my $data_field_rules = $data_fields_rules->{$field->tag()};
652 if ($data_field_rules) {
653 my $subfields_mappings = $data_field_rules->{subfields};
654 my $wildcard_mappings = $subfields_mappings->{'*'};
655 foreach my $subfield ($field->subfields()) {
656 my ($code, $data) = @{$subfield};
658 push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
659 push @mappings, @$wildcard_mappings if $wildcard_mappings;
660 # Do not include "see from" into these kind of fields
661 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
663 $self->_process_mappings(\@mappings, $data, $record_document, {
664 data_source => 'subfield',
672 my $subfields_join_mappings = $data_field_rules->{subfields_join};
673 if ($subfields_join_mappings) {
674 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
675 my $data_field = $field->clone;
676 # remove empty subfields, otherwise they are printed as a space
677 $data_field->delete_subfield(match => qr/^$/);
678 my $data = $data_field->as_string( $subfields_group );
680 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
681 # Do not include "see from" into these kind of fields
682 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
683 $self->_process_mappings(\@mappings, $data, $record_document, {
684 data_source => 'subfields_group',
685 codes => $subfields_group,
# Fill in default values for fields that received no value at all.
696 foreach my $field (keys %{$rules->{defaults}}) {
697 unless (defined $record_document->{$field}) {
698 $record_document->{$field} = $rules->{defaults}->{$field};
# 'sum' fields are collapsed from a list of values into one total.
701 foreach my $field (@{$rules->{sum}}) {
702 if (defined $record_document->{$field}) {
703 # TODO: validate numeric? filter?
704 # TODO: Or should only accept fields without nested values?
705 # TODO: Quick and dirty, improve if needed
# NOTE(review): the pattern is unanchored, so any non-reference value that merely
# contains a digit is passed on to sum0.
706 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
709 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
710 foreach my $field (@{$rules->{isbn}}) {
711 if (defined $record_document->{$field}) {
713 foreach my $input_isbn (@{$record_document->{$field}}) {
714 my $isbn = Business::ISBN->new($input_isbn);
715 if (defined $isbn && $isbn->is_valid) {
716 my $isbn13 = $isbn->as_isbn13->as_string;
717 push @isbns, $isbn13;
719 push @isbns, $isbn13;
721 my $isbn10 = $isbn->as_isbn10;
723 $isbn10 = $isbn10->as_string;
724 push @isbns, $isbn10;
726 push @isbns, $isbn10;
# Values that do not parse as a valid ISBN are kept as entered.
729 push @isbns, $input_isbn;
732 $record_document->{$field} = \@isbns;
736 # Remove duplicate values and collapse sort fields
737 foreach my $field (keys %{$record_document}) {
738 if (ref($record_document->{$field}) eq 'ARRAY') {
# Hash values with a defined 'input' key are de-duplicated on that key, all
# other values on the value itself; first occurrences are kept.
739 @{$record_document->{$field}} = do {
741 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
743 if ($field =~ /__sort$/) {
744 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
745 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
750 # TODO: Perhaps should check if $record_document non empty, but really should never be the case
751 $record->encoding('UTF-8');
# Store the full record alongside the indexed fields; 'marc_format' names the
# serialisation that was used ('ARRAY', 'base64ISO2709' or 'MARCXML').
753 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
754 $record_document->{'marc_format'} = 'ARRAY';
758 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
759 local $SIG{__WARN__} = sub {
760 push @warnings, $_[0];
762 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
765 # Suppress warnings if record length exceeded
766 unless (substr($record->leader(), 0, 5) eq '99999') {
767 foreach my $warning (@warnings) {
771 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
772 $record_document->{'marc_format'} = 'MARCXML';
775 $record_document->{'marc_format'} = 'base64ISO2709';
779 # Check if there is at least one available item
780 if ($self->index eq $BIBLIOS_INDEX) {
781 my ($tag, $code) = C4::Biblio::GetMarcFromKohaField('biblio.biblionumber');
782 my $field = $record->field($tag);
784 my $biblionumber = $field->is_control_field ? $field->data : $field->subfield($code);
785 my $avail_items = Koha::Items->search({
786 biblionumber => $biblionumber,
# Stored as a reference to 1 or 0 rather than a plain number.
791 $record_document->{available} = $avail_items ? \1 : \0;
795 push @record_documents, $record_document;
797 return \@record_documents;
800 =head2 _marc_to_array($record)
802 my @fields = _marc_to_array($record)
804 Convert a MARC::Record to an array modeled after MARC-in-JSON
805 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
811 A MARC::Record object
# NOTE(review): the `sub` line of this routine and the initialisation of $data
# are not visible in this excerpt.
# Builds a structure holding the record leader plus a 'fields' list with one
# single-key hash per field, keyed by tag.
818 my ($self, $record) = @_;
821 leader => $record->leader(),
824 for my $field ($record->fields()) {
825 my $tag = $field->tag();
# Control fields map their tag straight to the field data.
826 if ($field->is_control_field()) {
827 push @{$data->{fields}}, {$tag => $field->data()};
# Data fields: one single-key hash per subfield (code => contents), in the order
# returned by subfields().
830 foreach my $subfield ($field->subfields()) {
831 my ($code, $contents) = @{$subfield};
832 push @{$subfields}, {$code => $contents};
834 push @{$data->{fields}}, {
836 ind1 => $field->indicator(1),
837 ind2 => $field->indicator(2),
838 subfields => $subfields
846 =head2 _array_to_marc($data)
848 my $record = _array_to_marc($data)
850 Convert an array modeled after MARC-in-JSON to a MARC::Record
856 An array modeled after MARC-in-JSON
857 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
# Convert a MARC-in-JSON style structure back into a MARC::Record.
#
# $data is a hashref with a 'leader' string and a 'fields' arrayref. Each
# field is a single-key hash keyed by tag whose value is either a plain string
# (control field data) or a hashref with 'ind1', 'ind2' and 'subfields' (a
# list of single-key { code => contents } hashes).
#
# Returns the new MARC::Record. $data itself is left untouched, so the same
# structure can be converted more than once.
sub _array_to_marc {
    my ($self, $data) = @_;

    my $record = MARC::Record->new();

    $record->leader($data->{leader});
    for my $entry (@{$data->{fields}}) {
        my ($tag) = keys %{$entry};

        # Work on a separate variable: the loop variable aliases the element
        # of the caller's array, so assigning to it would overwrite the input.
        my $content = $entry->{$tag};

        my $marc_field;
        if (ref($content) eq 'HASH') {
            # Flatten [ { a => 'x' }, { b => 'y' } ] into ( a => 'x', b => 'y' )
            my @subfields;
            foreach my $subfield (@{$content->{subfields}}) {
                my ($code) = keys %{$subfield};
                push @subfields, $code, $subfield->{$code};
            }
            $marc_field = MARC::Field->new($tag, $content->{ind1}, $content->{ind2}, @subfields);
        }
        else {
            $marc_field = MARC::Field->new($tag, $content);
        }
        $record->append_fields($marc_field);
    }

    return $record;
}
890 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
892 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
894 Get mappings, an internal data structure later used by
895 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
896 data for a MARC mapping.
898 The returned C<$mappings> is not to be confused with mappings provided by
899 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
900 provided by C<_foreach_mapping> and expands it to this internal data structure.
901 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
902 is then applied to each MARC target (leader, control field data, subfield or
903 joined subfields) and integrated into the mapping rules data structure used in
904 C<marc_records_to_documents> to transform MARC records into Elasticsearch
911 Boolean indicating whether to create a facet field for this mapping.
913 =item C<$suggestible>
915 Boolean indicating whether to create a suggestion field for this mapping.
919 Boolean indicating whether to create a sort field for this mapping.
923 Boolean indicating whether to create a search field for this mapping.
925 =item C<$target_name>
927 Elasticsearch document target field name.
929 =item C<$target_type>
931 Elasticsearch document target field type.
935 An optional range as a string in the format "<START>-<END>" or "<START>",
936 where "<START>" and "<END>" are integers specifying a range that will be used
937 for extracting a substring from MARC data as Elasticsearch field target value.
939 The first character position is "0", and the range is inclusive,
940 so "0-2" means the first three characters of MARC data.
942 If only "<START>" is provided only one character at position "<START>" will
# Expands one mapping's flags into [$target_field, $options] pairs: the plain
# field plus '__facet', '__suggestion' and '__sort' suffixed targets as requested.
# NOTE(review): the declarations of @mappings and @suffixes, a few conditions,
# the callbacks' argument unpacking and the return are not visible in this excerpt.
949 sub _field_mappings {
950 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
# NOTE(review): %mapping_defaults is not referenced by any visible line.
951 my %mapping_defaults = ();
954 my $substr_args = undef;
955 if (defined $range) {
956 # TODO: use value_callback instead?
# "START-END" is inclusive, so the substr length is END - START + 1; a bare
# "START" selects exactly one character.
957 my ($start, $end) = map(int, split /-/, $range, 2);
958 $substr_args = [$start];
959 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
961 my $default_options = {};
963 $default_options->{substr} = $substr_args;
966 # TODO: Should probably have per type value callback/hook
967 # but hard code for now
968 if ($target_type eq 'boolean') {
969 $default_options->{value_callbacks} //= [];
970 push @{$default_options->{value_callbacks}}, sub {
972 # Trim whitespace at both ends
973 $value =~ s/^\s+|\s+$//g;
# Perl truthiness decides the result: '' and '0' become 'false', anything else 'true'.
974 return $value ? 'true' : 'false';
977 elsif ($target_type eq 'year') {
978 $default_options->{value_callbacks} //= [];
979 # Only accept years containing digits and "u"
980 push @{$default_options->{value_callbacks}}, sub {
982 # Replace "u" with "0" for sorting
# Every run of four characters made of digits, "u" or whitespace becomes one
# value; "u" and whitespace are both replaced by "0".
983 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
988 my $mapping = [$target_name, $default_options];
989 push @mappings, $mapping;
# An undefined $sort counts as sortable, matching the default used when the
# index mappings are built.
993 push @suffixes, 'facet' if $facet;
994 push @suffixes, 'suggestion' if $suggestible;
995 push @suffixes, 'sort' if !defined $sort || $sort;
997 foreach my $suffix (@suffixes) {
998 my $mapping = ["${target_name}__$suffix"];
999 # TODO: Hack, fix later in less hideous manner
# Suggestion targets additionally get property => 'input', so their values are
# wrapped in a hash when the mapping is processed.
1000 if ($suffix eq 'suggestion') {
1001 push @{$mapping}, {%{$default_options}, property => 'input'};
1004 # Important! Make shallow clone, or we end up with the same hashref
1005 # shared by all mappings
1006 push @{$mapping}, {%{$default_options}};
1008 push @mappings, $mapping;
1013 =head2 _get_marc_mapping_rules
1015 my $mapping_rules = $self->_get_marc_mapping_rules()
1017 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1019 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
1020 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1021 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1023 We can then iterate through all MARC fields for each record and apply all relevant
1024 rules once per field instead of retrieving fields multiple times for each mapping rule
1025 which is terribly slow.
1029 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1030 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1031 # memory cache which it is currently not. The performance gain of caching
1032 # would probably be marginal, but to do this could be a further improvement.
# Builds the rule structure used to convert MARC records: mappings grouped by
# leader, control field tag and data field tag/subfield, plus the lists of
# 'sum', 'isbn' and default-valued fields.
# NOTE(review): several lines (the $self unpacking, the remaining keys of $rules,
# some declarations and conditions, and the return) are not visible in this excerpt.
1034 sub _get_marc_mapping_rules {
1036 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field expression: a three-digit tag, optional subfield codes (parentheses mark
# groups to join), and an optional "_/START" or "_/START-END" character range.
1037 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
# Leader expression: "leader" with the same optional character range.
1038 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1041 'control_fields' => {},
1042 'data_fields' => {},
1048 $self->_foreach_mapping(sub {
1049 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
1050 return if $marc_type ne $marcflavour;
1052 if ($type eq 'sum') {
1053 push @{$rules->{sum}}, $name;
# NOTE(review): an undefined $sort is treated as "sortable" where the '__sort'
# targets are created, but not here - confirm whether the "__sort" field of a
# 'sum' mapping with undefined sort should be summed too.
1054 push @{$rules->{sum}}, $name."__sort" if $sort;
1056 elsif ($type eq 'isbn') {
1057 push @{$rules->{isbn}}, $name;
1059 elsif ($type eq 'boolean') {
1060 # boolean gets special handling, if value doesn't exist for a field,
1061 # it is set to false
1062 $rules->{defaults}->{$name} = 'false';
1065 if ($marc_field =~ $field_spec_regexp) {
1069 my @subfield_groups;
1070 # Parse and separate subfields from subfield groups
1072 my $subfield_group = '';
# Walk the subfield specification one character at a time: characters inside
# parentheses accumulate into a group, the others are single subfield codes.
1075 foreach my $token (split //, $2) {
1076 if ($token eq "(") {
1078 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1079 "Unmatched opening parenthesis for $marc_field"
1086 elsif ($token eq ")") {
1088 if ($subfield_group) {
1089 push @subfield_groups, $subfield_group;
1090 $subfield_group = '';
1095 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1096 "Unmatched closing parenthesis for $marc_field"
1100 elsif ($open_group) {
1101 $subfield_group .= $token;
1104 push @subfields, $token;
# '*' is the wildcard key: its mappings are applied to every subfield of the tag.
1109 push @subfields, '*';
1112 my $range = defined $3 ? $3 : undef;
1113 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
# Tags below 010 are control fields and carry no subfields.
1114 if ($field_tag < 10) {
1115 $rules->{control_fields}->{$field_tag} //= [];
1116 push @{$rules->{control_fields}->{$field_tag}}, @{clone(\@mappings)};
# Each subfield and subfield group receives its own deep copy of the mappings,
# so options set on one copy later do not affect the others.
1119 $rules->{data_fields}->{$field_tag} //= {};
1120 foreach my $subfield (@subfields) {
1121 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1122 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @{clone(\@mappings)};
1124 foreach my $subfield_group (@subfield_groups) {
1125 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1126 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @{clone(\@mappings)};
1130 elsif ($marc_field =~ $leader_regexp) {
1131 my $range = defined $1 ? $1 : undef;
1132 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1133 push @{$rules->{leader}}, @{clone(\@mappings)};
1136 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1137 "Invalid MARC field expression: $marc_field"
1142 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1143 if ($marcflavour eq 'marc21') {
1144 # Nonfiling characters processing for sort fields
# NOTE(review): these comparisons read $Koha::SearchEngine::BIBLIOS_INDEX and
# $Koha::SearchEngine::AUTHORITIES_INDEX, while this package declares its own
# $BIBLIOS_INDEX / $AUTHORITIES_INDEX - confirm that Koha::SearchEngine is loaded
# and defines both, otherwise neither branch can match.
1146 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1147 # Format is: nonfiling characters indicator => field names list
1149 1 => [130, 630, 730, 740],
1150 2 => [222, 240, 242, 243, 245, 440, 830]
1153 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1156 2 => [130, 430, 530]
# Only the "__sort" mappings of subfield a of the listed tags are flagged.
1159 foreach my $indicator (keys %title_fields) {
1160 foreach my $field_tag (@{$title_fields{$indicator}}) {
1161 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1162 foreach my $mapping (@{$mappings}) {
1163 if ($mapping->[0] =~ /__sort$/) {
1164 # Mark this as to be processed for nonfiling characters indicator
1165 # later on in _process_mappings
1166 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
# Authorities always index 'subject-heading-thesaurus' from 008 position 11
# (one character) and from 040$f.
1173 if( $self->index eq 'authorities' ){
1174 push @{$rules->{control_fields}->{'008'}}, ['subject-heading-thesaurus', { 'substr' => [ 11, 1 ] } ];
1175 push @{$rules->{data_fields}->{'040'}->{subfields}->{f}}, ['subject-heading-thesaurus', { } ];
1181 =head2 _foreach_mapping
1183 $self->_foreach_mapping(
1185 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1188 return unless $marc_type eq 'marc21';
1189 print "Data comes from: " . $marc_field . "\n";
1193 This allows you to apply a function to each entry in the elasticsearch mappings
1194 table, in order to build the mappings for whatever is needed.
1196 In the provided function, the fields are:
1202 The field name for elasticsearch (corresponds to the 'mapping' column in the
1207 The type for this value, e.g. 'string'.
1211 True if this value should be facetised. This only really makes sense if the
1212 field is understood by the facet processing code anyway.
1216 True if this is a field that a) needs special sort handling, and b) if it
1217 should be sorted on. False if a) but not b). Undef if not a). This allows,
1218 for example, author to be sorted on but not everything marked with "author"
1219 to be included in that sort.
1223 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1226 =item C<$marc_field>
1228 A string that describes the MARC field that contains the data to extract.
# Looks up the search fields mapped for this object's index and walks them one
# row at a time. $sub is the caller-supplied code reference that the values
# collected in the loop below are intended for.
1234 sub _foreach_mapping {
1235 my ( $self, $sub ) = @_;
1237 # TODO use a caching framework here
# Only rows whose search_marc_map.index_name equals $self->index are selected.
1238 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1240 'search_marc_map.index_name' => $self->index,
# Join through search_marc_to_fields to search_marc_map so that the per-mapping
# flags and the MARC type/field columns listed below come back with each row.
1242 { join => { search_marc_to_fields => 'search_marc_map' },
1244 'search_marc_to_fields.facet',
1245 'search_marc_to_fields.suggestible',
1246 'search_marc_to_fields.sort',
1247 'search_marc_to_fields.search',
1248 'search_marc_map.marc_type',
1249 'search_marc_map.marc_field',
# Per-row values, in order: name, type, facet, suggestible, sort, search,
# marc_type, marc_field. Name and type use the row's own accessors; the other
# six are the joined columns selected above and are read with get_column().
1262 while ( my $search_field = $search_fields->next ) {
1264 # Force lower case on indexed field names for case insensitive
1265 # field name searches
1266 lc($search_field->name),
1267 $search_field->type,
1268 $search_field->get_column('facet'),
1269 $search_field->get_column('suggestible'),
1270 $search_field->get_column('sort'),
1271 $search_field->get_column('search'),
1272 $search_field->get_column('marc_type'),
1273 $search_field->get_column('marc_field'),
1278 =head2 process_error
1280 die process_error($@);
1282 This parses an Elasticsearch error message and produces a human-readable
1283 result from it. This result is probably missing all the useful information
1284 that you might want in diagnosing an issue, so the warning is also logged.
1286 Note that currently the resulting message is not internationalised. This
1287 will happen eventually by some method or other.
# $msg is the raw error text. It is passed to warn unchanged because the
# strings returned below carry none of its detail.
1292 my ($self, $msg) = @_;
1294 warn $msg; # simple logging
1296 # This is super-primitive
# Messages containing ParseException or parse_exception are reported as a
# query problem the user can fix by rephrasing; every other message falls
# through to the generic failure text.
1297 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1299 return "Unable to perform your search. Please try again.\n";
1302 =head2 _read_configuration
1304 my $conf = _read_configuration();
1306 Reads the I<configuration file> and returns a hash structure with the
1307 configuration information. It raises an exception if mandatory entries
1310 The hashref structure has the following form:
1313 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1314 'index_name' => 'koha_instance',
1317 This is configured by the following in the C<config> block in koha-conf.xml:
1320 <server>127.0.0.1:9200</server>
1321 <server>anotherserver:9200</server>
1322 <index_name>koha_instance</index_name>
# Builds the Elasticsearch configuration hash from the 'elasticsearch' entry
# returned by C4::Context->config.
# Throws Koha::Exceptions::Config::MissingEntry when that entry is undefined or
# when it lacks a 'server' or 'index_name' key.
# Returns $configuration, with the server value stored under 'nodes' and
# 'cxn_pool' defaulted to 'Static'.
1327 sub _read_configuration {
1331 my $conf = C4::Context->config('elasticsearch');
1332 unless ( defined $conf ) {
1333 Koha::Exceptions::Config::MissingEntry->throw(
1334 "Missing <elasticsearch> entry in koha-conf.xml"
# The two checks below use exists, so they test only that the key is present,
# not that its value is defined or non-empty.
1338 unless ( exists $conf->{server} ) {
1339 Koha::Exceptions::Config::MissingEntry->throw(
1340 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1344 unless ( exists $conf->{index_name} ) {
1345 Koha::Exceptions::Config::MissingEntry->throw(
1346 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
# Copy the entry key by key. 'server' is renamed to 'nodes' and always stored
# as an array reference: an array reference is used as-is, any other value is
# wrapped in a one-element array. All other keys are copied through unchanged.
1350 while ( my ( $var, $val ) = each %$conf ) {
1351 if ( $var eq 'server' ) {
1352 if ( ref($val) eq 'ARRAY' ) {
1353 $configuration->{nodes} = $val;
1356 $configuration->{nodes} = [$val];
1359 $configuration->{$var} = $val;
# //= assigns only when cxn_pool is undefined, so a value supplied in the
# configuration entry takes precedence over the 'Static' default.
1363 $configuration->{cxn_pool} //= 'Static';
1365 return $configuration;
1368 =head2 get_facetable_fields
1370 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1372 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
# Runs two Koha::SearchFields searches over the same fixed list of field names
# and returns their results back to back: first the fields that have a
# facet_order (sorted by it), then the fields that have none.
1376 sub get_facetable_fields {
1379 # These should correspond to the ES field names, as opposed to the CCL
1380 # things that zebra uses.
1381 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
# Fields from the list whose facet_order is set, in facet_order sequence.
1382 my @faceted_fields = Koha::SearchFields->search(
1383 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
# Fields from the list whose facet_order is not set.
# NOTE(review): every row matched here has no facet_order, so ordering by that
# column presumably has no effect on this second list - confirm.
1385 my @not_faceted_fields = Koha::SearchFields->search(
1386 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1388 # This could certainly be improved
1389 return ( @faceted_fields, @not_faceted_fields );
1392 =head2 clear_search_fields_cache
1394 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1396 Clear cached values for ES search fields
# Removes the four cached search-field entries from the Koha::Caches instance:
# one per interface (staff client, OPAC) and index (biblios, authorities).
1400 sub clear_search_fields_cache {
1402 my $cache = Koha::Caches->get_instance();
# The keys are spelled out literally, so a new interface/index combination
# needs its own line here.
1403 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1404 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1405 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1406 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1418 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1420 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1422 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>