1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
35 use Carp qw( carp croak );
36 use Clone qw( clone );
38 use Readonly qw( Readonly );
39 use Search::Elasticsearch;
40 use Try::Tiny qw( catch try );
43 use List::Util qw( sum0 );
45 use MIME::Base64 qw( encode_base64 );
46 use Encode qw( encode );
48 use Scalar::Util qw( looks_like_number );
50 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
51 __PACKAGE__->mk_accessors(qw( sort_fields ));
53 # Constants to refer to the standard index names
54 Readonly our $BIBLIOS_INDEX => 'biblios';
55 Readonly our $AUTHORITIES_INDEX => 'authorities';
59 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
67 The name of the index to use, generally 'biblios' or 'authorities'.
71 The Elasticsearch index name with Koha instance prefix.
84 # Check for a valid index
85 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
86 my $config = _read_configuration();
87 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
89 my $self = $class->SUPER::new(@_);
93 =head2 get_elasticsearch
95 my $elasticsearch_client = $self->get_elasticsearch();
97 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
98 instance level and will be reused if method is called multiple times.
102 sub get_elasticsearch {
104 unless (defined $self->{elasticsearch}) {
105 $self->{elasticsearch} = Search::Elasticsearch->new(
106 $self->get_elasticsearch_params()
109 return $self->{elasticsearch};
112 =head2 get_elasticsearch_params
114 my $params = $self->get_elasticsearch_params();
116 This provides a hashref that contains the parameters for connecting to the
117 ElasicSearch servers, in the form:
120 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
121 'index_name' => 'koha_instance_index',
124 This is configured by the following in the C<config> block in koha-conf.xml:
127 <server>127.0.0.1:9200</server>
128 <server>anotherserver:9200</server>
129 <index_name>koha_instance</index_name>
134 sub get_elasticsearch_params {
139 $conf = _read_configuration();
141 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
149 =head2 get_elasticsearch_settings
151 my $settings = $self->get_elasticsearch_settings();
153 This provides the settings provided to Elasticsearch when an index is created.
154 These can do things like define tokenization methods.
156 A hashref containing the settings is returned.
160 sub get_elasticsearch_settings {
163 # Use state to speed up repeated calls
164 state $settings = undef;
165 if (!defined $settings) {
166 my $config_file = C4::Context->config('elasticsearch_index_config');
167 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
168 $settings = YAML::XS::LoadFile( $config_file );
174 =head2 get_elasticsearch_mappings
176 my $mappings = $self->get_elasticsearch_mappings();
178 This provides the mappings that get passed to Elasticsearch when an index is
183 sub get_elasticsearch_mappings {
186 # Use state to speed up repeated calls
190 if (!defined $all_mappings{$self->index}) {
191 $sort_fields{$self->index} = {};
192 # Clone the general mapping to break ties with the original hash
193 my $mappings = clone(_get_elasticsearch_field_config('general', ''));
194 my $marcflavour = lc C4::Context->preference('marcflavour');
195 $self->_foreach_mapping(
197 my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type ) = @_;
198 return if $marc_type ne $marcflavour;
199 # TODO if this gets any sort of complexity to it, it should
200 # be broken out into its own function.
202 # TODO be aware of date formats, but this requires pre-parsing
203 # as ES will simply reject anything with an invalid date.
204 my $es_type = 'text';
205 if ($type eq 'boolean') {
206 $es_type = 'boolean';
207 } elsif ($type eq 'number' || $type eq 'sum') {
208 $es_type = 'integer';
209 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211 } elsif ($type eq 'year') {
213 } elsif ($type eq 'callnumber') {
214 $es_type = 'cn_sort';
218 $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222 $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
225 $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227 # Sort should be defined in mappings as 1 (Yes) or 0 (No)
228 # Previously, we also supported ~ (Undef) in the file
229 # "undef" means to do the default thing, which is make it sortable.
230 # This is preserved in order to not cause breakages for existing installs
231 if (!defined $sort || $sort) {
232 $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233 $sort_fields{$self->index}{$name} = 1;
237 if( $self->index eq 'authorities' ){
238 $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text');
239 $mappings->{properties}{ 'subject-heading-thesaurus' } = _get_elasticsearch_field_config('search', 'text');
241 $all_mappings{$self->index} = $mappings;
243 $self->sort_fields(\%{$sort_fields{$self->index}});
244 return $all_mappings{$self->index};
247 =head2 raw_elasticsearch_mappings
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
256 sub raw_elasticsearch_mappings {
257 my ( $marc_type ) = @_;
259 my $schema = Koha::Database->new()->schema();
261 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
264 while ( my $search_field = $search_fields->next ) {
266 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267 { search_field_id => $search_field->id },
269 join => 'search_marc_map',
270 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
274 while ( my $marc_to_field = $marc_to_fields->next ) {
276 my $marc_map = $marc_to_field->search_marc_map;
278 next if $marc_type && $marc_map->marc_type ne $marc_type;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
288 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
290 facet => $marc_to_field->facet || '',
291 marc_type => $marc_map->marc_type,
292 marc_field => $marc_map->marc_field,
293 sort => $marc_to_field->sort,
294 suggestible => $marc_to_field->suggestible || '',
295 filter => $marc_to_field->filter || ''
304 =head2 _get_elasticsearch_field_config
306 Get the Elasticsearch field config for the given purpose and data type.
308 $mapping = _get_elasticsearch_field_config('search', 'text');
312 sub _get_elasticsearch_field_config {
314 my ( $purpose, $type ) = @_;
316 # Use state to speed up repeated calls
317 state $settings = undef;
318 if (!defined $settings) {
319 my $config_file = C4::Context->config('elasticsearch_field_config');
320 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
321 local $YAML::XS::Boolean = 'JSON::PP';
322 $settings = YAML::XS::LoadFile( $config_file );
325 if (!defined $settings->{$purpose}) {
326 die "Field purpose $purpose not defined in field config";
329 return $settings->{$purpose};
331 if (defined $settings->{$purpose}{$type}) {
332 return $settings->{$purpose}{$type};
334 if (defined $settings->{$purpose}{'default'}) {
335 return $settings->{$purpose}{'default'};
340 =head2 _load_elasticsearch_mappings
342 Load Elasticsearch mappings in the format of mappings.yaml.
344 $indexes = _load_elasticsearch_mappings();
348 sub _load_elasticsearch_mappings {
349 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
350 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
351 return YAML::XS::LoadFile( $mappings_yaml );
354 sub reset_elasticsearch_mappings {
356 my $indexes = $self->_load_elasticsearch_mappings();
358 Koha::SearchMarcMaps->delete;
359 Koha::SearchFields->delete;
361 while ( my ( $index_name, $fields ) = each %$indexes ) {
362 while ( my ( $field_name, $data ) = each %$fields ) {
364 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
367 $sf_params{staff_client} //= 1;
368 $sf_params{opac} //= 1;
370 $sf_params{name} = $field_name;
372 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
374 my $mappings = $data->{mappings};
375 for my $mapping ( @$mappings ) {
376 my $marc_field = Koha::SearchMarcMaps->find_or_create({
377 index_name => $index_name,
378 marc_type => $mapping->{marc_type},
379 marc_field => $mapping->{marc_field}
381 $search_field->add_to_search_marc_maps($marc_field, {
382 facet => $mapping->{facet} || 0,
383 suggestible => $mapping->{suggestible} || 0,
384 sort => $mapping->{sort} // 1,
385 search => $mapping->{search} // 1,
386 filter => $mapping->{filter} // ''
392 $self->clear_search_fields_cache();
394 # FIXME return the mappings?
397 # This overrides the accessor provided by Class::Accessor so that if
398 # sort_fields isn't set, then it'll generate it.
402 $self->_sort_fields_accessor(@_);
405 my $val = $self->_sort_fields_accessor();
408 # This will populate the accessor as a side effect
409 $self->get_elasticsearch_mappings();
410 return $self->_sort_fields_accessor();
413 =head2 _process_mappings($mappings, $data, $record_document, $meta)
415 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
417 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
418 Since we group all mappings by MARC field targets C<$mappings> will contain
419 all targets for C<$data> and thus we need to fetch the MARC field only once.
420 C<$mappings> will be applied to C<$record_document> and new field values added.
421 The method has no return value.
427 Arrayref of mappings containing arrayrefs in the format
428 [C<$target>, C<$options>] where C<$target> is the name of the target field and
429 C<$options> is a hashref containing processing directives for this particular
434 The source data from a MARC record field.
436 =item C<$record_document>
438 Hashref representing the Elasticsearch document on which mappings should be
443 A hashref containing metadata useful for enforcing per mapping rules. For
444 example for providing extra context for mapping options, or treating mapping
445 targets differently depending on type (sort, search, facet etc). Combining
446 this metadata with the mapping options and metadata allows us to mutate the
447 data per mapping, or even replace it with other data retrieved from the
450 Current properties are:
452 C<altscript>: A boolean value indicating whether an alternate script presentation is being
455 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
456 'subfield' or 'subfields_group'.
458 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
460 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
461 is 'subfields_group'.
463 C<field>: The original C<MARC::Record> object.
469 sub _process_mappings {
470 my ($_self, $mappings, $data, $record_document, $meta) = @_;
471 foreach my $mapping (@{$mappings}) {
472 my ($target, $options) = @{$mapping};
474 # Don't process sort fields for alternate scripts
475 my $sort = $target =~ /__sort$/;
476 if ($sort && $meta->{altscript}) {
480 # Copy (scalar) data since can have multiple targets
481 # with differing options for (possibly) mutating data
482 # so need a different copy for each
483 my $data_copy = $data;
484 if (defined $options->{substr}) {
485 my ($start, $length) = @{$options->{substr}};
486 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
489 # Add data to values array for callbacks processing
490 my $values = [$data_copy];
492 # Value callbacks takes subfield data (or values from previous
493 # callbacks) as argument, and returns a possibly different list of values.
494 # Note that the returned list may also be empty.
495 if (defined $options->{value_callbacks}) {
496 foreach my $callback (@{$options->{value_callbacks}}) {
497 # Pass each value to current callback which returns a list
498 # (scalar is fine too) resulting either in a list or
499 # a list of lists that will be flattened by perl.
500 # The next callback will receive the possibly expanded list of values.
501 $values = [ map { $callback->($_) } @{$values} ];
505 # Skip mapping if all values has been removed
506 next unless @{$values};
508 if (defined $options->{property}) {
509 $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
511 if (defined $options->{nonfiling_characters_indicator}) {
512 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
513 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
514 # Nonfiling chars does not make sense for multiple values
515 # Only apply on first element
516 $values->[0] = substr $values->[0], $nonfiling_chars;
519 $values = [ grep(!/^$/, @{$values}) ];
521 $record_document->{$target} //= [];
522 push @{$record_document->{$target}}, @{$values};
526 =head2 marc_records_to_documents($marc_records)
528 my $record_documents = $self->marc_records_to_documents($marc_records);
530 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
532 Returns array of hash references, representing Elasticsearch documents,
533 acceptable as body payload in C<Search::Elasticsearch> requests.
537 =item C<$marc_documents>
539 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
545 sub marc_records_to_documents {
546 my ($self, $records) = @_;
547 my $rules = $self->_get_marc_mapping_rules();
548 my $control_fields_rules = $rules->{control_fields};
549 my $data_fields_rules = $rules->{data_fields};
550 my $marcflavour = lc C4::Context->preference('marcflavour');
551 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
553 my @record_documents;
555 my %auth_match_headings;
556 if( $self->index eq 'authorities' ){
557 my @auth_types = Koha::Authority::Types->search->as_list;
558 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
561 foreach my $record (@{$records}) {
562 my $record_document = {};
564 if ( $self->index eq 'authorities' ){
565 my $authtypecode = GuessAuthTypeCode( $record );
567 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
568 my $field = $record->field( $auth_match_headings{ $authtypecode } );
569 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
570 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
573 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
577 my $mappings = $rules->{leader};
579 $self->_process_mappings($mappings, $record->leader(), $record_document, {
581 data_source => 'leader'
585 foreach my $field ($record->fields()) {
586 if ($field->is_control_field()) {
587 my $mappings = $control_fields_rules->{$field->tag()};
589 $self->_process_mappings($mappings, $field->data(), $record_document, {
591 data_source => 'control_field',
598 my $tag = $field->tag();
599 # Handle alternate scripts in MARC 21
601 if ($marcflavour eq 'marc21' && $tag eq '880') {
602 my $sub6 = $field->subfield('6');
603 if ($sub6 =~ /^(...)-\d+/) {
609 my $data_field_rules = $data_fields_rules->{$tag};
610 if ($data_field_rules) {
611 my $subfields_mappings = $data_field_rules->{subfields};
612 my $wildcard_mappings = $subfields_mappings->{'*'};
613 foreach my $subfield ($field->subfields()) {
614 my ($code, $data) = @{$subfield};
615 my $mappings = $subfields_mappings->{$code} // [];
616 if ($wildcard_mappings) {
617 $mappings = [@{$mappings}, @{$wildcard_mappings}];
620 $self->_process_mappings($mappings, $data, $record_document, {
621 altscript => $altscript,
622 data_source => 'subfield',
630 my $subfields_join_mappings = $data_field_rules->{subfields_join};
631 if ($subfields_join_mappings) {
632 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
633 my $data_field = $field->clone; #copy field to preserve for alt scripts
634 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
635 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
637 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
638 altscript => $altscript,
639 data_source => 'subfields_group',
640 codes => $subfields_group,
651 if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
652 foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
653 my $data_field_rules = $data_fields_rules->{$field->tag()};
654 if ($data_field_rules) {
655 my $subfields_mappings = $data_field_rules->{subfields};
656 my $wildcard_mappings = $subfields_mappings->{'*'};
657 foreach my $subfield ($field->subfields()) {
658 my ($code, $data) = @{$subfield};
660 push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
661 push @mappings, @$wildcard_mappings if $wildcard_mappings;
662 # Do not include "see from" into these kind of fields
663 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
665 $self->_process_mappings(\@mappings, $data, $record_document, {
666 data_source => 'subfield',
674 my $subfields_join_mappings = $data_field_rules->{subfields_join};
675 if ($subfields_join_mappings) {
676 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
677 my $data_field = $field->clone;
678 # remove empty subfields, otherwise they are printed as a space
679 $data_field->delete_subfield(match => qr/^$/);
680 my $data = $data_field->as_string( $subfields_group );
682 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
683 # Do not include "see from" into these kind of fields
684 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
685 $self->_process_mappings(\@mappings, $data, $record_document, {
686 data_source => 'subfields_group',
687 codes => $subfields_group,
698 foreach my $field (keys %{$rules->{defaults}}) {
699 unless (defined $record_document->{$field}) {
700 $record_document->{$field} = $rules->{defaults}->{$field};
703 foreach my $field (@{$rules->{sum}}) {
704 if (defined $record_document->{$field}) {
705 # TODO: validate numeric? filter?
706 # TODO: Or should only accept fields without nested values?
707 # TODO: Quick and dirty, improve if needed
708 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
711 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
712 foreach my $field (@{$rules->{isbn}}) {
713 if (defined $record_document->{$field}) {
715 foreach my $input_isbn (@{$record_document->{$field}}) {
716 my $isbn = Business::ISBN->new($input_isbn);
717 if (defined $isbn && $isbn->is_valid) {
718 my $isbn13 = $isbn->as_isbn13->as_string;
719 push @isbns, $isbn13;
721 push @isbns, $isbn13;
723 my $isbn10 = $isbn->as_isbn10;
725 $isbn10 = $isbn10->as_string;
726 push @isbns, $isbn10;
728 push @isbns, $isbn10;
731 push @isbns, $input_isbn;
734 $record_document->{$field} = \@isbns;
738 # Remove duplicate values and collapse sort fields
739 foreach my $field (keys %{$record_document}) {
740 if (ref($record_document->{$field}) eq 'ARRAY') {
741 @{$record_document->{$field}} = do {
743 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
745 if ($field =~ /__sort$/) {
746 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
747 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
752 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
753 $record->encoding('UTF-8');
755 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
756 $record_document->{'marc_format'} = 'ARRAY';
760 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
761 local $SIG{__WARN__} = sub {
762 push @warnings, $_[0];
764 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
767 # Suppress warnings if record length exceeded
768 unless (substr($record->leader(), 0, 5) eq '99999') {
769 foreach my $warning (@warnings) {
773 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
774 $record_document->{'marc_format'} = 'MARCXML';
777 $record_document->{'marc_format'} = 'base64ISO2709';
781 # Check if there is at least one available item
782 if ($self->index eq $BIBLIOS_INDEX) {
783 my ($tag, $code) = C4::Biblio::GetMarcFromKohaField('biblio.biblionumber');
784 my $field = $record->field($tag);
786 my $biblionumber = $field->is_control_field ? $field->data : $field->subfield($code);
787 my $avail_items = Koha::Items->search({
788 biblionumber => $biblionumber,
793 $record_document->{available} = $avail_items ? \1 : \0;
797 push @record_documents, $record_document;
799 return \@record_documents;
802 =head2 _marc_to_array($record)
804 my @fields = _marc_to_array($record)
806 Convert a MARC::Record to an array modeled after MARC-in-JSON
807 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
813 A MARC::Record object
820 my ($self, $record) = @_;
823 leader => $record->leader(),
826 for my $field ($record->fields()) {
827 my $tag = $field->tag();
828 if ($field->is_control_field()) {
829 push @{$data->{fields}}, {$tag => $field->data()};
832 foreach my $subfield ($field->subfields()) {
833 my ($code, $contents) = @{$subfield};
834 push @{$subfields}, {$code => $contents};
836 push @{$data->{fields}}, {
838 ind1 => $field->indicator(1),
839 ind2 => $field->indicator(2),
840 subfields => $subfields
848 =head2 _array_to_marc($data)
850 my $record = _array_to_marc($data)
852 Convert an array modeled after MARC-in-JSON to a MARC::Record
858 An array modeled after MARC-in-JSON
859 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
866 my ($self, $data) = @_;
868 my $record = MARC::Record->new();
870 $record->leader($data->{leader});
871 for my $field (@{$data->{fields}}) {
872 my $tag = (keys %{$field})[0];
873 $field = $field->{$tag};
875 if (ref($field) eq 'HASH') {
877 foreach my $subfield (@{$field->{subfields}}) {
878 my $code = (keys %{$subfield})[0];
879 push @subfields, $code;
880 push @subfields, $subfield->{$code};
882 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
884 $marc_field = MARC::Field->new($tag, $field)
886 $record->append_fields($marc_field);
892 =head2 _field_mappings($facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range)
894 my @mappings = _field_mappings( $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range )
896 Get mappings, an internal data structure later used by
897 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
898 data for a MARC mapping.
900 The returned C<$mappings> is not to to be confused with mappings provided by
901 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
902 provided by C<_foreach_mapping> and expands it to this internal data structure.
903 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
904 is then applied to each MARC target (leader, control field data, subfield or
905 joined subfields) and integrated into the mapping rules data structure used in
906 C<marc_records_to_documents> to transform MARC records into Elasticsearch
913 Boolean indicating whether to create a facet field for this mapping.
915 =item C<$suggestible>
917 Boolean indicating whether to create a suggestion field for this mapping.
921 Boolean indicating whether to create a sort field for this mapping.
925 Boolean indicating whether to create a search field for this mapping.
927 =item C<$target_name>
929 Elasticsearch document target field name.
931 =item C<$target_type>
933 Elasticsearch document target field type.
937 An optional range as a string in the format "<START>-<END>" or "<START>",
938 where "<START>" and "<END>" are integers specifying a range that will be used
939 for extracting a substring from MARC data as Elasticsearch field target value.
941 The first character position is "0", and the range is inclusive,
942 so "0-2" means the first three characters of MARC data.
944 If only "<START>" is provided only one character at position "<START>" will
951 sub _field_mappings {
952 my ( $_self, $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range ) = @_;
953 my %mapping_defaults = ();
956 my $substr_args = undef;
957 if (defined $range) {
958 # TODO: use value_callback instead?
959 my ($start, $end) = map(int, split /-/, $range, 2);
960 $substr_args = [$start];
961 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
963 my $default_options = {};
965 $default_options->{substr} = $substr_args;
968 # TODO: Should probably have per type value callback/hook
969 # but hard code for now
970 if ($target_type eq 'boolean') {
971 $default_options->{value_callbacks} //= [];
972 push @{$default_options->{value_callbacks}}, sub {
974 # Trim whitespace at both ends
975 $value =~ s/^\s+|\s+$//g;
976 return $value ? 'true' : 'false';
979 elsif ($target_type eq 'year') {
980 $default_options->{value_callbacks} //= [];
981 # Only accept years containing digits and "u"
982 push @{$default_options->{value_callbacks}}, sub {
984 # Replace "u" with "0" for sorting
985 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
989 if ( defined $filter && $filter eq 'punctuation' ) {
990 $default_options->{value_callbacks} //= [];
991 push @{ $default_options->{value_callbacks} }, sub {
994 # Trim punctuation marks from field
996 s/[\x00-\x1F,\x21-\x2F,\x3A-\x40,\x5B-\x60,\x7B-\x89,\x8B,\x8D,\x8F,\x90-\x99,\x9B,\x9D,\xA0-\xBF,\xD7,\xF7]//g;
1002 my $mapping = [$target_name, $default_options];
1003 push @mappings, $mapping;
1007 push @suffixes, 'facet' if $facet;
1008 push @suffixes, 'suggestion' if $suggestible;
1009 push @suffixes, 'sort' if !defined $sort || $sort;
1011 foreach my $suffix (@suffixes) {
1012 my $mapping = ["${target_name}__$suffix"];
1013 # TODO: Hack, fix later in less hideous manner
1014 if ($suffix eq 'suggestion') {
1015 push @{$mapping}, {%{$default_options}, property => 'input'};
1018 # Important! Make shallow clone, or we end up with the same hashref
1019 # shared by all mappings
1020 push @{$mapping}, {%{$default_options}};
1022 push @mappings, $mapping;
1027 =head2 _get_marc_mapping_rules
1029 my $mapping_rules = $self->_get_marc_mapping_rules()
1031 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1033 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
1034 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1035 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1037 We can then iterate through all MARC fields for each record and apply all relevant
1038 rules once per fields instead of retreiving fields multiple times for each mapping rule
1039 which is terribly slow.
1043 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1044 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1045 # memory cache which it is currently not. The performance gain of caching
1046 # would probably be marginal, but to do this could be a further improvement.
1048 sub _get_marc_mapping_rules {
1050 my $marcflavour = lc C4::Context->preference('marcflavour');
1051 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1052 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1055 'control_fields' => {},
1056 'data_fields' => {},
1062 $self->_foreach_mapping(sub {
1063 my ($name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type, $marc_field) = @_;
1064 return if $marc_type ne $marcflavour;
1066 if ($type eq 'sum') {
1067 push @{$rules->{sum}}, $name;
1068 push @{$rules->{sum}}, $name."__sort" if $sort;
1070 elsif ($type eq 'isbn') {
1071 push @{$rules->{isbn}}, $name;
1073 elsif ($type eq 'boolean') {
1074 # boolean gets special handling, if value doesn't exist for a field,
1075 # it is set to false
1076 $rules->{defaults}->{$name} = 'false';
1079 if ($marc_field =~ $field_spec_regexp) {
1083 my @subfield_groups;
1084 # Parse and separate subfields form subfield groups
1086 my $subfield_group = '';
1089 foreach my $token (split //, $2) {
1090 if ($token eq "(") {
1092 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1093 "Unmatched opening parenthesis for $marc_field"
1100 elsif ($token eq ")") {
1102 if ($subfield_group) {
1103 push @subfield_groups, $subfield_group;
1104 $subfield_group = '';
1109 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1110 "Unmatched closing parenthesis for $marc_field"
1114 elsif ($open_group) {
1115 $subfield_group .= $token;
1118 push @subfields, $token;
1123 push @subfields, '*';
1126 my $range = defined $3 ? $3 : undef;
1127 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1128 if ($field_tag < 10) {
1129 $rules->{control_fields}->{$field_tag} //= [];
1130 push @{$rules->{control_fields}->{$field_tag}}, @{clone(\@mappings)};
1133 $rules->{data_fields}->{$field_tag} //= {};
1134 foreach my $subfield (@subfields) {
1135 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1136 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @{clone(\@mappings)};
1138 foreach my $subfield_group (@subfield_groups) {
1139 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1140 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @{clone(\@mappings)};
1144 elsif ($marc_field =~ $leader_regexp) {
1145 my $range = defined $1 ? $1 : undef;
1146 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1147 push @{$rules->{leader}}, @{clone(\@mappings)};
1150 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1151 "Invalid MARC field expression: $marc_field"
1156 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1157 if ($marcflavour eq 'marc21') {
1158 # Nonfiling characters processing for sort fields
1160 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1161 # Format is: nonfiling characters indicator => field names list
1163 1 => [130, 630, 730, 740],
1164 2 => [222, 240, 242, 243, 245, 440, 830]
1167 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1170 2 => [130, 430, 530]
1173 foreach my $indicator (keys %title_fields) {
1174 foreach my $field_tag (@{$title_fields{$indicator}}) {
1175 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1176 foreach my $mapping (@{$mappings}) {
1177 if ($mapping->[0] =~ /__sort$/) {
1178 # Mark this as to be processed for nonfiling characters indicator
1179 # later on in _process_mappings
1180 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1187 if( $self->index eq 'authorities' ){
1188 push @{$rules->{control_fields}->{'008'}}, ['subject-heading-thesaurus', { 'substr' => [ 11, 1 ] } ];
1189 push @{$rules->{data_fields}->{'040'}->{subfields}->{f}}, ['subject-heading-thesaurus', { } ];
1195 =head2 _foreach_mapping
1197 $self->_foreach_mapping(
1199 my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type,
1202 return unless $marc_type eq 'marc21';
1203 print "Data comes from: " . $marc_field . "\n";
1207 This allows you to apply a function to each entry in the elasticsearch mappings
1208 table, in order to build the mappings for whatever is needed.
1210 In the provided function, the files are:
1216 The field name for elasticsearch (corresponds to the 'mapping' column in the
1221 The type for this value, e.g. 'string'.
1225 True if this value should be facetised. This only really makes sense if the
1226 field is understood by the facet processing code anyway.
1230 True if this is a field that a) needs special sort handling, and b) if it
1231 should be sorted on. False if a) but not b). Undef if not a). This allows,
1232 for example, author to be sorted on but not everything marked with "author"
1233 to be included in that sort.
1237 True if this value should be searchable.
1241 Contains a string that represents a filter defined in the indexing code. Currently supports
1242 the option 'punctuation'
1246 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1249 =item C<$marc_field>
1251 A string that describes the MARC field that contains the data to extract.
1257 sub _foreach_mapping {
1258 my ( $self, $sub ) = @_;
1260 # TODO use a caching framework here
1261 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1263 'search_marc_map.index_name' => $self->index,
1265 { join => { search_marc_to_fields => 'search_marc_map' },
1267 'search_marc_to_fields.facet',
1268 'search_marc_to_fields.suggestible',
1269 'search_marc_to_fields.sort',
1270 'search_marc_to_fields.search',
1271 'search_marc_to_fields.filter',
1272 'search_marc_map.marc_type',
1273 'search_marc_map.marc_field',
1287 while ( my $search_field = $search_fields->next ) {
1289 # Force lower case on indexed field names for case insensitive
1290 # field name searches
1291 lc($search_field->name),
1292 $search_field->type,
1293 $search_field->get_column('facet'),
1294 $search_field->get_column('suggestible'),
1295 $search_field->get_column('sort'),
1296 $search_field->get_column('search'),
1297 $search_field->get_column('filter'),
1298 $search_field->get_column('marc_type'),
1299 $search_field->get_column('marc_field'),
1304 =head2 process_error
1306 die process_error($@);
1308 This parses an Elasticsearch error message and produces a human-readable
1309 result from it. This result is probably missing all the useful information
1310 that you might want in diagnosing an issue, so the warning is also logged.
1312 Note that currently the resulting message is not internationalised. This
1313 will happen eventually by some method or other.
1318 my ($self, $msg) = @_;
1320 warn $msg; # simple logging
1322 # This is super-primitive
1323 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1325 return "Unable to perform your search. Please try again.\n";
1328 =head2 _read_configuration
1330 my $conf = _read_configuration();
1332 Reads the I<configuration file> and returns a hash structure with the
1333 configuration information. It raises an exception if mandatory entries
1336 The hashref structure has the following form:
1339 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1340 'index_name' => 'koha_instance',
1343 This is configured by the following in the C<config> block in koha-conf.xml:
1346 <server>127.0.0.1:9200</server>
1347 <server>anotherserver:9200</server>
1348 <index_name>koha_instance</index_name>
1353 sub _read_configuration {
1357 my $conf = C4::Context->config('elasticsearch');
1358 unless ( defined $conf ) {
1359 Koha::Exceptions::Config::MissingEntry->throw(
1360 "Missing <elasticsearch> entry in koha-conf.xml"
1364 unless ( exists $conf->{server} ) {
1365 Koha::Exceptions::Config::MissingEntry->throw(
1366 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1370 unless ( exists $conf->{index_name} ) {
1371 Koha::Exceptions::Config::MissingEntry->throw(
1372 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1376 while ( my ( $var, $val ) = each %$conf ) {
1377 if ( $var eq 'server' ) {
1378 if ( ref($val) eq 'ARRAY' ) {
1379 $configuration->{nodes} = $val;
1382 $configuration->{nodes} = [$val];
1385 $configuration->{$var} = $val;
1389 $configuration->{cxn_pool} //= 'Static';
1391 return $configuration;
1394 =head2 get_facetable_fields
1396 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1398 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1402 sub get_facetable_fields {
1405 # These should correspond to the ES field names, as opposed to the CCL
1406 # things that zebra uses.
1407 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1408 my @faceted_fields = Koha::SearchFields->search(
1409 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1411 my @not_faceted_fields = Koha::SearchFields->search(
1412 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1414 # This could certainly be improved
1415 return ( @faceted_fields, @not_faceted_fields );
1418 =head2 clear_search_fields_cache
1420 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1422 Clear cached values for ES search fields
1426 sub clear_search_fields_cache {
1428 my $cache = Koha::Caches->get_instance();
1429 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1430 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1431 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1432 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1444 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1446 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1448 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>