1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
37 use Search::Elasticsearch;
41 use List::Util qw( sum0 reduce );
44 use Encode qw(encode);
46 use Scalar::Util qw(looks_like_number);
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
# 'index' and 'index_name' get read-only accessors; 'sort_fields' gets a
# read/write accessor.
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
69 The Elasticsearch index name with Koha instance prefix.
82 # Check for a valid index
83 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
# The full index name is the configured index_name, an underscore, and the
# short index name passed in by the caller.
84 my $config = _read_configuration();
85 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
# Object construction itself is delegated to the parent class.
87 my $self = $class->SUPER::new(@_);
91 =head2 get_elasticsearch
93 my $elasticsearch_client = $self->get_elasticsearch();
95 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
96 instance level and will be reused if method is called multiple times.
100 sub get_elasticsearch {
# Lazily creates the Search::Elasticsearch client on first use and stores it
# in $self->{elasticsearch}; later calls return the stored client.
102 unless (defined $self->{elasticsearch}) {
103 $self->{elasticsearch} = Search::Elasticsearch->new(
104 $self->get_elasticsearch_params()
107 return $self->{elasticsearch};
110 =head2 get_elasticsearch_params
112 my $params = $self->get_elasticsearch_params();
114 This provides a hashref that contains the parameters for connecting to the
115 Elasticsearch servers, in the form:
118 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119 'index_name' => 'koha_instance_index',
122 This is configured by the following in the C<config> block in koha-conf.xml:
125 <server>127.0.0.1:9200</server>
126 <server>anotherserver:9200</server>
127 <index_name>koha_instance</index_name>
132 sub get_elasticsearch_params {
# Builds the client connection parameters from the configuration returned by
# _read_configuration().
137 $conf = _read_configuration();
# NOTE(review): $_ presumably holds a caught exception here (the enclosing
# catch block is not visible) -- confirm. Only the MissingEntry configuration
# exception is singled out.
139 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
143 # Extract relevant parts of configuration
145 nodes => $conf->{nodes}
# Default the connection pool to 'Static' when none has been set.
147 $params->{cxn_pool} //= 'Static';
152 =head2 get_elasticsearch_settings
154 my $settings = $self->get_elasticsearch_settings();
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
159 A hashref containing the settings is returned.
163 sub get_elasticsearch_settings {
# Loads the index settings from a YAML file once and keeps them in a state
# variable for subsequent calls.
166 # Use state to speed up repeated calls
167 state $settings = undef;
168 if (!defined $settings) {
# Prefer the path from the 'elasticsearch_index_config' config entry; fall
# back to index_config.yaml below the 'intranetdir' directory.
169 my $config_file = C4::Context->config('elasticsearch_index_config');
170 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171 $settings = LoadFile( $config_file );
177 =head2 get_elasticsearch_mappings
179 my $mappings = $self->get_elasticsearch_mappings();
181 This provides the mappings that get passed to Elasticsearch when an index is
186 sub get_elasticsearch_mappings {
# Builds the Elasticsearch field mappings for this object's index (only when
# none are stored yet for that index name) and records which fields are
# sortable.
189 # Use state to speed up repeated calls
193 if (!defined $all_mappings{$self->index}) {
194 $sort_fields{$self->index} = {};
195 # Clone the general mapping to break ties with the original hash
197 data => clone(_get_elasticsearch_field_config('general', ''))
199 my $marcflavour = lc C4::Context->preference('marcflavour');
200 $self->_foreach_mapping(
202 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
# Ignore mappings that belong to a MARC flavour other than the configured one.
203 return if $marc_type ne $marcflavour;
204 # TODO if this gets any sort of complexity to it, it should
205 # be broken out into its own function.
207 # TODO be aware of date formats, but this requires pre-parsing
208 # as ES will simply reject anything with an invalid date.
# Translate the Koha field type into an Elasticsearch type; 'text' is the
# starting value.
209 my $es_type = 'text';
210 if ($type eq 'boolean') {
211 $es_type = 'boolean';
212 } elsif ($type eq 'number' || $type eq 'sum') {
213 $es_type = 'integer';
214 } elsif ($type eq 'isbn' || $type eq 'stdno') {
# NOTE(review): the guards around the next three assignments are not visible
# here; presumably they depend on $search, $facet and $suggestible -- confirm.
219 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
228 # Sort is a bit special as it can be true, false, undef.
229 # We care about "true" or "undef",
230 # "undef" means to do the default thing, which is make it sortable.
231 if (!defined $sort || $sort) {
232 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233 $sort_fields{$self->index}{$name} = 1;
237 $all_mappings{$self->index} = $mappings;
# Expose the sortable field names for this index through the sort_fields
# accessor.
239 $self->sort_fields(\%{$sort_fields{$self->index}});
241 return $all_mappings{$self->index};
244 =head2 raw_elasticsearch_mappings
246 Return elasticsearch mapping as it is in database.
247 marc_type: marc21|unimarc|normarc
249 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
253 sub raw_elasticsearch_mappings {
# Collects the search field mappings stored in the database, keyed by index
# name and then by search field name.
# The first argument is the MARC type itself (no invocant is unpacked); when
# it is false, maps of every MARC type are included.
254 my ( $marc_type ) = @_;
256 my $schema = Koha::Database->new()->schema();
258 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
261 while ( my $search_field = $search_fields->next ) {
263 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
264 { search_field_id => $search_field->id },
266 join => 'search_marc_map',
267 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
271 while ( my $marc_to_field = $marc_to_fields->next ) {
273 my $marc_map = $marc_to_field->search_marc_map;
# When a MARC type was requested, skip maps of any other type.
275 next if $marc_type && $marc_map->marc_type ne $marc_type;
# label and type are always copied; facet_order, weight, opac and
# staff_client only when they are defined.
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
# One entry per MARC map: facet and suggestible fall back to an empty string,
# sort is passed through unchanged.
284 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
286 facet => $marc_to_field->facet || '',
287 marc_type => $marc_map->marc_type,
288 marc_field => $marc_map->marc_field,
289 sort => $marc_to_field->sort,
290 suggestible => $marc_to_field->suggestible || ''
299 =head2 _get_elasticsearch_field_config
301 Get the Elasticsearch field config for the given purpose and data type.
303 $mapping = _get_elasticsearch_field_config('search', 'text');
307 sub _get_elasticsearch_field_config {
# Returns the field configuration for a purpose (e.g. 'search', 'facet',
# 'sort') and data type, read from a YAML file that is loaded once.
309 my ( $purpose, $type ) = @_;
311 # Use state to speed up repeated calls
312 state $settings = undef;
313 if (!defined $settings) {
# Prefer the path from the 'elasticsearch_field_config' config entry; fall
# back to field_config.yaml below the 'intranetdir' directory.
314 my $config_file = C4::Context->config('elasticsearch_field_config');
315 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
316 $settings = LoadFile( $config_file );
# An unknown purpose is a fatal error.
319 if (!defined $settings->{$purpose}) {
320 die "Field purpose $purpose not defined in field config";
# NOTE(review): the condition guarding this return is not visible here;
# presumably it applies when no type is given (the 'general' lookup in this
# file passes an empty type) -- confirm.
323 return $settings->{$purpose};
# Look for a configuration for the exact type first, then fall back to the
# purpose's 'default' entry.
325 if (defined $settings->{$purpose}{$type}) {
326 return $settings->{$purpose}{$type};
328 if (defined $settings->{$purpose}{'default'}) {
329 return $settings->{$purpose}{'default'};
334 =head2 _load_elasticsearch_mappings
336 Load Elasticsearch mappings in the format of mappings.yaml.
338 $indexes = _load_elasticsearch_mappings();
342 sub _load_elasticsearch_mappings {
# Loads the mappings file and returns its contents. The path comes from the
# 'elasticsearch_index_mappings' config entry, falling back to mappings.yaml
# below the 'intranetdir' directory.
343 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345 return LoadFile( $mappings_yaml );
348 sub reset_elasticsearch_mappings {
# Replaces the search fields and MARC maps stored in the database with the
# contents of the mappings file.
350 my $indexes = $self->_load_elasticsearch_mappings();
# Remove every existing MARC map and search field before re-creating them.
352 Koha::SearchMarcMaps->delete;
353 Koha::SearchFields->delete;
355 while ( my ( $index_name, $fields ) = each %$indexes ) {
356 while ( my ( $field_name, $data ) = each %$fields ) {
# Copy only the attributes that are actually present in the mapping data.
358 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
# staff_client and opac default to 1 when they are not defined.
361 $sf_params{staff_client} //= 1;
362 $sf_params{opac} //= 1;
364 $sf_params{name} = $field_name;
# The search field is looked up (or created) by its name.
366 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
368 my $mappings = $data->{mappings};
369 for my $mapping ( @$mappings ) {
370 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371 index_name => $index_name,
372 marc_type => $mapping->{marc_type},
373 marc_field => $mapping->{marc_field}
# Link the search field to the MARC map: facet and suggestible default to 0,
# search defaults to 1, and sort is passed through as given.
375 $search_field->add_to_search_marc_maps($marc_field, {
376 facet => $mapping->{facet} || 0,
377 suggestible => $mapping->{suggestible} || 0,
378 sort => $mapping->{sort},
379 search => $mapping->{search} // 1
# The cached search field lists no longer match the database.
385 $self->clear_search_fields_cache();
387 # FIXME return the mappings?
390 # This overrides the accessor provided by Class::Accessor so that if
391 # sort_fields isn't set, then it'll generate it.
# NOTE(review): the conditions selecting between the calls below are not
# visible here; presumably the first call is the setter path (arguments are
# passed straight through) and the rest is the getter path -- confirm.
395 $self->_sort_fields_accessor(@_);
398 my $val = $self->_sort_fields_accessor();
401 # This will populate the accessor as a side effect
402 $self->get_elasticsearch_mappings();
403 return $self->_sort_fields_accessor();
406 =head2 _process_mappings($mappings, $data, $record_document, $meta)
408 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
410 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
411 Since we group all mappings by MARC field targets C<$mappings> will contain
412 all targets for C<$data> and thus we need to fetch the MARC field only once.
413 C<$mappings> will be applied to C<$record_document> and new field values added.
414 The method has no return value.
420 Arrayref of mappings containing arrayrefs in the format
421 [C<$target>, C<$options>] where C<$target> is the name of the target field and
422 C<$options> is a hashref containing processing directives for this particular
427 The source data from a MARC record field.
429 =item C<$record_document>
431 Hashref representing the Elasticsearch document on which mappings should be
436 A hashref containing metadata useful for enforcing per mapping rules. For
437 example for providing extra context for mapping options, or treating mapping
438 targets differently depending on type (sort, search, facet etc). Combining
439 this metadata with the mapping options and metadata allows us to mutate the
440 data per mapping, or even replace it with other data retrieved from the
443 Current properties are:
445 C<altscript>: A boolean value indicating whether an alternate script presentation is being
448 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
449 'subfield' or 'subfields_group'.
451 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
453 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
454 is 'subfields_group'.
456 C<field>: The original C<MARC::Field> object.
462 sub _process_mappings {
# Applies every [target, options] mapping to $data and appends the resulting
# value to that target's list in $record_document.
463 my ($_self, $mappings, $data, $record_document, $meta) = @_;
464 foreach my $mapping (@{$mappings}) {
465 my ($target, $options) = @{$mapping};
467 # Don't process sort fields for alternate scripts
468 my $sort = $target =~ /__sort$/;
469 if ($sort && $meta->{altscript}) {
473 # Copy (scalar) data since can have multiple targets
474 # with differing options for (possibly) mutating data
475 # so need a different copy for each
# Make sure the target holds an array reference before values are pushed.
477 $record_document->{$target} //= [];
# 'substr' option: [start, length]. When the data is not longer than the
# start offset the value becomes an empty string.
478 if (defined $options->{substr}) {
479 my ($start, $length) = @{$options->{substr}};
480 $_data = length($data) > $start ? substr $data, $start, $length : '';
# 'value_callbacks' option: run the value through each callback in order,
# feeding every result into the next callback.
482 if (defined $options->{value_callbacks}) {
483 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
# 'property' option -- NOTE(review): presumably wraps the value in a hash
# keyed by the property name (the assignment itself is not visible here).
485 if (defined $options->{property}) {
487 $options->{property} => $_data
# 'nonfiling_characters_indicator' option: the named indicator of the field
# gives the number of leading characters to drop; a non-numeric indicator
# counts as 0.
490 if (defined $options->{nonfiling_characters_indicator}) {
491 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
492 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
493 if ($nonfiling_chars) {
494 $_data = substr $_data, $nonfiling_chars;
497 push @{$record_document->{$target}}, $_data;
501 =head2 marc_records_to_documents($marc_records)
503 my $record_documents = $self->marc_records_to_documents($marc_records);
505 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
507 Returns array of hash references, representing Elasticsearch documents,
508 acceptable as body payload in C<Search::Elasticsearch> requests.
512 =item C<$marc_records>
514 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
520 sub marc_records_to_documents {
# Converts each record in $records into an Elasticsearch document hash by
# applying the mapping rules, and returns a reference to the list of documents.
521 my ($self, $records) = @_;
522 my $rules = $self->_get_marc_mapping_rules();
523 my $control_fields_rules = $rules->{control_fields};
524 my $data_fields_rules = $rules->{data_fields};
525 my $marcflavour = lc C4::Context->preference('marcflavour');
# True when the ElasticsearchMARCFormat preference is 'ARRAY'.
526 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
528 my @record_documents;
530 foreach my $record (@{$records}) {
531 my $record_document = {};
# Leader rules are applied to the record's leader string.
532 my $mappings = $rules->{leader};
534 $self->_process_mappings($mappings, $record->leader(), $record_document, {
536 data_source => 'leader'
540 foreach my $field ($record->fields()) {
# Control field rules are looked up by tag and applied to the field data.
541 if ($field->is_control_field()) {
542 my $mappings = $control_fields_rules->{$field->tag()};
544 $self->_process_mappings($mappings, $field->data(), $record_document, {
546 data_source => 'control_field',
553 my $tag = $field->tag();
554 # Handle alternate scripts in MARC 21
# The pattern captures the first three characters of subfield 6.
# NOTE(review): presumably that is the tag of the linked field and replaces
# $tag; the use of the capture is not visible here -- confirm.
556 if ($marcflavour eq 'marc21' && $tag eq '880') {
557 my $sub6 = $field->subfield('6');
558 if ($sub6 =~ /^(...)-\d+/) {
564 my $data_field_rules = $data_fields_rules->{$tag};
565 if ($data_field_rules) {
566 my $subfields_mappings = $data_field_rules->{subfields};
567 my $wildcard_mappings = $subfields_mappings->{'*'};
568 foreach my $subfield ($field->subfields()) {
569 my ($code, $data) = @{$subfield};
# Mappings registered under '*' are added to the code-specific mappings of
# every subfield.
570 my $mappings = $subfields_mappings->{$code} // [];
571 if ($wildcard_mappings) {
572 $mappings = [@{$mappings}, @{$wildcard_mappings}];
575 $self->_process_mappings($mappings, $data, $record_document, {
576 altscript => $altscript,
577 data_source => 'subfield',
583 if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
584 # Used by the authority linker the match-heading field requires a specific syntax
585 # that is specified in C4/Heading
586 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
587 next unless $heading;
588 push @{$record_document->{'match-heading'}}, $heading->search_form;
# Joined subfield groups: each key is a string of subfield codes.
592 my $subfields_join_mappings = $data_field_rules->{subfields_join};
593 if ($subfields_join_mappings) {
594 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
595 # Map each subfield to values, remove empty values, join with space
600 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
604 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
605 altscript => $altscript,
606 data_source => 'subfields_group',
607 codes => $subfields_group,
612 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
613 # Used by the authority linker the match-heading field requires a specific syntax
614 # that is specified in C4/Heading
615 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
616 next unless $heading;
617 push @{$record_document->{'match-heading'}}, $heading->search_form;
# Fill in the configured default for every field that received no value.
624 foreach my $field (keys %{$rules->{defaults}}) {
625 unless (defined $record_document->{$field}) {
626 $record_document->{$field} = $rules->{defaults}->{$field};
# 'sum' fields: replace the list of values with a single total.
629 foreach my $field (@{$rules->{sum}}) {
630 if (defined $record_document->{$field}) {
631 # TODO: validate numeric? filter?
632 # TODO: Or should only accept fields without nested values?
633 # TODO: Quick and dirty, improve if needed
# Only non-reference values containing a digit sequence take part in the sum.
634 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
637 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
638 foreach my $field (@{$rules->{isbn}}) {
639 if (defined $record_document->{$field}) {
641 foreach my $input_isbn (@{$record_document->{$field}}) {
642 my $isbn = Business::ISBN->new($input_isbn);
643 if (defined $isbn && $isbn->is_valid) {
# NOTE(review): each form is pushed twice; presumably the value is stripped
# of dashes between the two pushes (those lines are not visible here).
644 my $isbn13 = $isbn->as_isbn13->as_string;
645 push @isbns, $isbn13;
647 push @isbns, $isbn13;
649 my $isbn10 = $isbn->as_isbn10;
651 $isbn10 = $isbn10->as_string;
652 push @isbns, $isbn10;
654 push @isbns, $isbn10;
# NOTE(review): presumably the fallback for input that is not a valid ISBN --
# the value is kept as entered.
657 push @isbns, $input_isbn;
660 $record_document->{$field} = \@isbns;
664 # Remove duplicate values and collapse sort fields
665 foreach my $field (keys %{$record_document}) {
666 if (ref($record_document->{$field}) eq 'ARRAY') {
# Hash values with a defined 'input' key are compared by that key; every
# other value is compared by itself.
667 @{$record_document->{$field}} = do {
669 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
671 if ($field =~ /__sort$/) {
672 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
673 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
678 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
679 $record->encoding('UTF-8');
# NOTE(review): the branching between the ARRAY, base64 ISO 2709 and MARCXML
# representations below is only partly visible here; each branch stores the
# record together with a matching 'marc_format' marker.
681 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
682 $record_document->{'marc_format'} = 'ARRAY';
686 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
687 local $SIG{__WARN__} = sub {
688 push @warnings, $_[0];
690 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
693 # Suppress warnings if record length exceeded
694 unless (substr($record->leader(), 0, 5) eq '99999') {
695 foreach my $warning (@warnings) {
699 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
700 $record_document->{'marc_format'} = 'MARCXML';
703 $record_document->{'marc_format'} = 'base64ISO2709';
706 push @record_documents, $record_document;
708 return \@record_documents;
711 =head2 _marc_to_array($record)
713 my @fields = _marc_to_array($record)
715 Convert a MARC::Record to an array modeled after MARC-in-JSON
716 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
722 A MARC::Record object
729 my ($self, $record) = @_;
# Builds a MARC-in-JSON style structure from the record: its leader plus a
# list of fields.
732 leader => $record->leader(),
735 for my $field ($record->fields()) {
736 my $tag = $field->tag();
# A control field becomes a single-key hash { tag => data }.
737 if ($field->is_control_field()) {
738 push @{$data->{fields}}, {$tag => $field->data()};
# For a data field every subfield becomes a { code => contents } hash, and
# the field entry carries both indicators plus that subfield list.
741 foreach my $subfield ($field->subfields()) {
742 my ($code, $contents) = @{$subfield};
743 push @{$subfields}, {$code => $contents};
745 push @{$data->{fields}}, {
747 ind1 => $field->indicator(1),
748 ind2 => $field->indicator(2),
749 subfields => $subfields
757 =head2 _array_to_marc($data)
759 my $record = _array_to_marc($data)
761 Convert an array modeled after MARC-in-JSON to a MARC::Record
767 An array modeled after MARC-in-JSON
768 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
775 my ($self, $data) = @_;
# Rebuilds a MARC::Record from a structure holding a 'leader' and a list of
# 'fields'.
777 my $record = MARC::Record->new();
779 $record->leader($data->{leader});
780 for my $field (@{$data->{fields}}) {
# Each entry is treated as a single-key hash: the key is the tag, the value
# is the field content.
781 my $tag = (keys %{$field})[0];
782 $field = $field->{$tag};
# A hash value describes a data field (indicators and subfields); any other
# value is passed to MARC::Field->new as the data of the field.
784 if (ref($field) eq 'HASH') {
# Flatten the { code => value } hashes into a code, value, code, value list.
786 foreach my $subfield (@{$field->{subfields}}) {
787 my $code = (keys %{$subfield})[0];
788 push @subfields, $code;
789 push @subfields, $subfield->{$code};
791 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
793 $marc_field = MARC::Field->new($tag, $field)
795 $record->append_fields($marc_field);
801 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
803 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
805 Get mappings, an internal data structure later used by
806 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
807 data for a MARC mapping.
809 The returned C<$mappings> is not to be confused with mappings provided by
810 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
811 provided by C<_foreach_mapping> and expands it to this internal data structure.
812 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
813 is then applied to each MARC target (leader, control field data, subfield or
814 joined subfields) and integrated into the mapping rules data structure used in
815 C<marc_records_to_documents> to transform MARC records into Elasticsearch
822 Boolean indicating whether to create a facet field for this mapping.
824 =item C<$suggestible>
826 Boolean indicating whether to create a suggestion field for this mapping.
830 Boolean indicating whether to create a sort field for this mapping.
834 Boolean indicating whether to create a search field for this mapping.
836 =item C<$target_name>
838 Elasticsearch document target field name.
840 =item C<$target_type>
842 Elasticsearch document target field type.
846 An optional range as a string in the format "<START>-<END>" or "<START>",
847 where "<START>" and "<END>" are integers specifying a range that will be used
848 for extracting a substring from MARC data as Elasticsearch field target value.
850 The first character position is "0", and the range is inclusive,
851 so "0-2" means the first three characters of MARC data.
853 If only "<START>" is provided only one character at position "<START>" will
860 sub _field_mappings {
# Expands one mapping row into a list of [target field name, options] pairs:
# the plain target plus the '__facet', '__suggestion' and '__sort' variants
# that apply.
861 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
# NOTE(review): no use of %mapping_defaults is visible here.
862 my %mapping_defaults = ();
865 my $substr_args = undef;
# Turn the inclusive "<START>-<END>" range into [offset, length] arguments;
# a lone "<START>" selects a single character.
866 if (defined $range) {
867 # TODO: use value_callback instead?
868 my ($start, $end) = map(int, split /-/, $range, 2);
869 $substr_args = [$start];
870 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
872 my $default_options = {};
874 $default_options->{substr} = $substr_args;
877 # TODO: Should probably have per type value callback/hook
878 # but hard code for now
# Boolean targets get a callback that, after trimming, turns any true value
# into 'true' and everything else into 'false'.
879 if ($target_type eq 'boolean') {
880 $default_options->{value_callbacks} //= [];
881 push @{$default_options->{value_callbacks}}, sub {
883 # Trim whitespace at both ends
884 $value =~ s/^\s+|\s+$//g;
885 return $value ? 'true' : 'false';
# The plain target uses the target name unchanged.
# NOTE(review): presumably only added when $search is true; the guard is not
# visible here -- confirm.
890 my $mapping = [$target_name, $default_options];
891 push @mappings, $mapping;
# Suffixed targets: facet and suggestion are opt-in; sort is added unless
# $sort is defined and false.
895 push @suffixes, 'facet' if $facet;
896 push @suffixes, 'suggestion' if $suggestible;
897 push @suffixes, 'sort' if !defined $sort || $sort;
899 foreach my $suffix (@suffixes) {
900 my $mapping = ["${target_name}__$suffix"];
901 # TODO: Hack, fix later in less hideous manner
# Suggestion targets additionally carry the 'property' option set to 'input'.
902 if ($suffix eq 'suggestion') {
903 push @{$mapping}, {%{$default_options}, property => 'input'};
906 # Important! Make shallow clone, or we end up with the same hashref
907 # shared by all mappings
908 push @{$mapping}, {%{$default_options}};
910 push @mappings, $mapping;
915 =head2 _get_marc_mapping_rules
917 my $mapping_rules = $self->_get_marc_mapping_rules()
919 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
921 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
922 each call to C<MARC::Record>->field) we create an optimized structure of mapping
923 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
925 We can then iterate through all MARC fields for each record and apply all relevant
926 rules once per field instead of retrieving fields multiple times for each mapping rule
927 which is terribly slow.
931 # TODO: This structure can be used for processing multiple MARC::Records so is currently
932 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
933 # memory cache which it is currently not. The performance gain of caching
934 # would probably be marginal, but to do this could be a further improvement.
936 sub _get_marc_mapping_rules {
# Builds the rule structure used to convert MARC records: leader rules,
# control field rules and data field rules keyed by tag, plus the lists of
# 'sum' and 'isbn' fields and the per-field default values.
938 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field expression: a three-digit tag, optional subfield codes (parentheses
# mark groups), and an optional "_/START" or "_/START-END" character range.
939 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
# Leader expression: "leader" with the same optional character range.
940 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
943 'control_fields' => {},
950 $self->_foreach_mapping(sub {
951 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
# Ignore mappings that belong to a MARC flavour other than the configured one.
952 return if $marc_type ne $marcflavour;
# Remember fields that need post-processing by type.
954 if ($type eq 'sum') {
955 push @{$rules->{sum}}, $name;
956 push @{$rules->{sum}}, $name."__sort" if $sort;
958 elsif ($type eq 'isbn') {
959 push @{$rules->{isbn}}, $name;
961 elsif ($type eq 'boolean') {
962 # boolean gets special handling, if value doesn't exist for a field,
964 $rules->{defaults}->{$name} = 'false';
967 if ($marc_field =~ $field_spec_regexp) {
972 # Parse and separate subfields from subfield groups
974 my $subfield_group = '';
# The subfield part is read one character at a time: characters inside
# parentheses are collected into a group, the others are single subfields.
977 foreach my $token (split //, $2) {
980 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
981 "Unmatched opening parenthesis for $marc_field"
988 elsif ($token eq ")") {
990 if ($subfield_group) {
991 push @subfield_groups, $subfield_group;
992 $subfield_group = '';
997 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
998 "Unmatched closing parenthesis for $marc_field"
1002 elsif ($open_group) {
1003 $subfield_group .= $token;
1006 push @subfields, $token;
# NOTE(review): presumably reached when the expression names no subfields, so
# the mapping applies to all of them -- the guard is not visible here.
1011 push @subfields, '*';
1014 my $range = defined $3 ? $3 : undef;
1015 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
# Tags below 10 are stored as control field rules; all others as data field
# rules, per subfield and per subfield group.
1016 if ($field_tag < 10) {
1017 $rules->{control_fields}->{$field_tag} //= [];
1018 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1021 $rules->{data_fields}->{$field_tag} //= {};
1022 foreach my $subfield (@subfields) {
1023 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1024 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1026 foreach my $subfield_group (@subfield_groups) {
1027 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1028 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1032 elsif ($marc_field =~ $leader_regexp) {
1033 my $range = defined $1 ? $1 : undef;
1034 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1035 push @{$rules->{leader}}, @mappings;
# Anything matching neither pattern is rejected.
1038 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1039 "Invalid MARC field expression: $marc_field"
1044 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1045 if ($marcflavour eq 'marc21') {
1046 # Nonfiling characters processing for sort fields
1048 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1049 # Format is: nonfiling characters indicator => field names list
1051 1 => [130, 630, 730, 740],
1052 2 => [222, 240, 242, 243, 245, 440, 830]
1055 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1058 2 => [130, 430, 530]
# Only the sort mappings of subfield 'a' of the listed tags are flagged.
1061 foreach my $indicator (keys %title_fields) {
1062 foreach my $field_tag (@{$title_fields{$indicator}}) {
1063 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1064 foreach my $mapping (@{$mappings}) {
1065 if ($mapping->[0] =~ /__sort$/) {
1066 # Mark this as to be processed for nonfiling characters indicator
1067 # later on in _process_mappings
1068 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1078 =head2 _foreach_mapping
1080 $self->_foreach_mapping(
1082 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1085 return unless $marc_type eq 'marc21';
1086 print "Data comes from: " . $marc_field . "\n";
1090 This allows you to apply a function to each entry in the elasticsearch mappings
1091 table, in order to build the mappings for whatever is needed.
1093 In the provided function, the fields are:
1099 The field name for elasticsearch (corresponds to the 'mapping' column in the
1104 The type for this value, e.g. 'string'.
1108 True if this value should be facetised. This only really makes sense if the
1109 field is understood by the facet processing code anyway.
1113 True if this is a field that a) needs special sort handling, and b) if it
1114 should be sorted on. False if a) but not b). Undef if not a). This allows,
1115 for example, author to be sorted on but not everything marked with "author"
1116 to be included in that sort.
1120 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1121 'unimarc', 'normarc'.
1123 =item C<$marc_field>
1125 A string that describes the MARC field that contains the data to extract.
1131 sub _foreach_mapping {
# Iterates over the search fields joined to their MARC maps for this
# object's index.
1132 my ( $self, $sub ) = @_;
1134 # TODO use a caching framework here
# Restrict the rows to MARC maps of this index and pull the per-mapping
# columns in through the join.
1135 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1137 'search_marc_map.index_name' => $self->index,
1139 { join => { search_marc_to_fields => 'search_marc_map' },
1141 'search_marc_to_fields.facet',
1142 'search_marc_to_fields.suggestible',
1143 'search_marc_to_fields.sort',
1144 'search_marc_to_fields.search',
1145 'search_marc_map.marc_type',
1146 'search_marc_map.marc_field',
# The values below are listed in the order in which the callbacks in this
# file unpack them: name, type, facet, suggestible, sort, search, marc_type,
# marc_field.
1159 while ( my $search_field = $search_fields->next ) {
1161 # Force lower case on indexed field names for case insensitive
1162 # field name searches
1163 lc($search_field->name),
1164 $search_field->type,
1165 $search_field->get_column('facet'),
1166 $search_field->get_column('suggestible'),
1167 $search_field->get_column('sort'),
1168 $search_field->get_column('search'),
1169 $search_field->get_column('marc_type'),
1170 $search_field->get_column('marc_field'),
1175 =head2 process_error
1177 die process_error($@);
1179 This parses an Elasticsearch error message and produces a human-readable
1180 result from it. This result is probably missing all the useful information
1181 that you might want in diagnosing an issue, so the warning is also logged.
1183 Note that currently the resulting message is not internationalised. This
1184 will happen eventually by some method or other.
1189 my ($self, $msg) = @_;
# Logs the raw message with warn and returns one of two fixed user-facing
# strings.
1191 warn $msg; # simple logging
1193 # This is super-primitive
# Messages mentioning ParseException are reported as a query problem; all
# others get the generic failure text.
1194 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1196 return "Unable to perform your search. Please try again.\n";
1199 =head2 _read_configuration
1201 my $conf = _read_configuration();
1203 Reads the I<configuration file> and returns a hash structure with the
1204 configuration information. It raises an exception if mandatory entries
1207 The hashref structure has the following form:
1210 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1211 'index_name' => 'koha_instance',
1214 This is configured by the following in the C<config> block in koha-conf.xml:
1217 <server>127.0.0.1:9200</server>
1218 <server>anotherserver:9200</server>
1219 <index_name>koha_instance</index_name>
1224 sub _read_configuration {
# Reads the 'elasticsearch' configuration entry and returns a hashref with
# 'nodes' and 'index_name'; a MissingEntry exception is thrown when the entry
# or one of its <server> / <index_name> parts is missing.
1228 my $conf = C4::Context->config('elasticsearch');
1229 unless ( defined $conf ) {
1230 Koha::Exceptions::Config::MissingEntry->throw(
1231 "Missing <elasticsearch> entry in koha-conf.xml"
1235 if ( $conf && $conf->{server} ) {
1236 my $nodes = $conf->{server};
# 'nodes' always ends up as an array reference: a list of servers is used as
# is, a single value is wrapped in a one-element list.
1237 if ( ref($nodes) eq 'ARRAY' ) {
1238 $configuration->{nodes} = $nodes;
1241 $configuration->{nodes} = [$nodes];
1245 Koha::Exceptions::Config::MissingEntry->throw(
1246 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1250 if ( defined $conf->{index_name} ) {
1251 $configuration->{index_name} = $conf->{index_name};
1254 Koha::Exceptions::Config::MissingEntry->throw(
1255 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1259 return $configuration;
1262 =head2 get_facetable_fields
1264 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1266 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1270 sub get_facetable_fields {
# Returns the search fields from a fixed list of names: first those with a
# facet_order (ordered by it), then those without one.
1273 # These should correspond to the ES field names, as opposed to the CCL
1274 # things that zebra uses.
1275 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1276 my @faceted_fields = Koha::SearchFields->search(
1277 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1279 my @not_faceted_fields = Koha::SearchFields->search(
1280 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1282 # This could certainly be improved
1283 return ( @faceted_fields, @not_faceted_fields );
1286 =head2 clear_search_fields_cache
1288 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1290 Clear cached values for ES search fields
1294 sub clear_search_fields_cache {
# Removes the four cached search field lists: staff client and OPAC, each
# for the biblios and authorities indexes.
1296 my $cache = Koha::Caches->get_instance();
1297 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1298 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1299 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1300 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1312 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1314 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1316 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>