1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
37 use Search::Elasticsearch;
41 use List::Util qw( sum0 reduce );
44 use Encode qw(encode);
46 use Scalar::Util qw(looks_like_number);
# Accessor generation: 'index' and 'index_name' are created read-only,
# 'sort_fields' is created read/write.
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
# Declared with 'our', so they are package variables of this package.
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
69 The Elasticsearch index name with Koha instance prefix.
# Constructor body (the 'sub' header and argument unpacking are not shown in
# this extract; $class and $params are presumably set up there - TODO confirm).
82 # Check for a valid index
# Throws Koha::Exceptions::MissingParameter when no 'index' parameter is given.
83 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
# The full index name is "<configured index_name>_<index>".
84 my $config = _read_configuration();
85 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
# Delegate object construction to the parent class.
87 my $self = $class->SUPER::new(@_);
91 =head2 get_elasticsearch
93 my $elasticsearch_client = $self->get_elasticsearch();
95 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
96 instance level and will be reused if the method is called multiple times.
# Return the Search::Elasticsearch client for this object, creating it on
# first use and caching it in $self->{elasticsearch}.
100 sub get_elasticsearch {
# Only build a client when none has been cached yet.
102 unless (defined $self->{elasticsearch}) {
103 $self->{elasticsearch} = Search::Elasticsearch->new(
# Constructor arguments come from get_elasticsearch_params().
104 $self->get_elasticsearch_params()
# Hand back the cached (or freshly created) client.
107 return $self->{elasticsearch};
110 =head2 get_elasticsearch_params
112 my $params = $self->get_elasticsearch_params();
114 This provides a hashref that contains the parameters for connecting to the
115 Elasticsearch servers, in the form:
118 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119 'index_name' => 'koha_instance_index',
122 This is configured by the following in the C<config> block in koha-conf.xml:
125 <server>127.0.0.1:9200</server>
126 <server>anotherserver:9200</server>
127 <index_name>koha_instance</index_name>
# Build the parameters used to connect to Elasticsearch from the
# configuration returned by _read_configuration().
132 sub get_elasticsearch_params {
# _read_configuration() throws when mandatory entries are missing.
137 $conf = _read_configuration();
# NOTE(review): $_ is presumably the exception inside a catch block whose
# surrounding lines are not shown in this extract - confirm.
139 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
143 # Extract relevant parts of configuration
145 nodes => $conf->{nodes}
# Default the connection pool to 'Static' unless one is already set.
147 $params->{cxn_pool} //= 'Static';
152 =head2 get_elasticsearch_settings
154 my $settings = $self->get_elasticsearch_settings();
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
159 A hashref containing the settings is returned.
# Load the index settings from a YAML file; the parsed result is kept in a
# 'state' variable so the file is read only once per process.
163 sub get_elasticsearch_settings {
166 # Use state to speed up repeated calls
167 state $settings = undef;
168 if (!defined $settings) {
# The 'elasticsearch_index_config' config entry overrides the file location;
# otherwise index_config.yaml under 'intranetdir' is used.
169 my $config_file = C4::Context->config('elasticsearch_index_config');
170 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171 $settings = LoadFile( $config_file );
177 =head2 get_elasticsearch_mappings
179 my $mappings = $self->get_elasticsearch_mappings();
181 This provides the mappings that get passed to Elasticsearch when an index is
# Build the Elasticsearch mapping definition for this object's index from the
# database mappings. Results are cached per index name, and the set of
# sortable field names is stored through the sort_fields accessor.
186 sub get_elasticsearch_mappings {
189 # Use state to speed up repeated calls
# Cache key is $self->index; the block below runs only on a cache miss.
193 if (!defined $all_mappings{$self->index}) {
194 $sort_fields{$self->index} = {};
195 # Clone the general mapping to break ties with the original hash
197 data => clone(_get_elasticsearch_field_config('general', ''))
199 my $marcflavour = lc C4::Context->preference('marcflavour');
200 $self->_foreach_mapping(
202 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
# Skip mappings that belong to another MARC flavour.
203 return if $marc_type ne $marcflavour;
204 # TODO if this gets any sort of complexity to it, it should
205 # be broken out into its own function.
207 # TODO be aware of date formats, but this requires pre-parsing
208 # as ES will simply reject anything with an invalid date.
# Translate the Koha field type to an Elasticsearch type; 'text' is the default.
209 my $es_type = 'text';
210 if ($type eq 'boolean') {
211 $es_type = 'boolean';
212 } elsif ($type eq 'number' || $type eq 'sum') {
213 $es_type = 'integer';
214 } elsif ($type eq 'isbn' || $type eq 'stdno') {
# Properties written per field: the base name plus the "__facet",
# "__suggestion" and "__sort" variants (their guarding conditions are only
# partly visible in this extract).
219 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
228 # Sort is a bit special as it can be true, false, undef.
229 # We care about "true" or "undef",
230 # "undef" means to do the default thing, which is make it sortable.
231 if (!defined $sort || $sort) {
232 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233 $sort_fields{$self->index}{$name} = 1;
237 $all_mappings{$self->index} = $mappings;
# Publish this index's sortable field names via the sort_fields accessor.
239 $self->sort_fields(\%{$sort_fields{$self->index}});
241 return $all_mappings{$self->index};
244 =head2 raw_elasticsearch_mappings
246 Return elasticsearch mapping as it is in database.
247 marc_type: marc21|unimarc|normarc
249 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
# Collect the search field / MARC mappings stored in the database into a
# nested structure keyed by index name and then by search field name.
# An optional $marc_type restricts the output to that MARC flavour.
253 sub raw_elasticsearch_mappings {
254 my ( $marc_type ) = @_;
256 my $schema = Koha::Database->new()->schema();
# Search fields are walked in ascending name order.
258 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
261 while ( my $search_field = $search_fields->next ) {
# Fetch this field's MARC mappings, ordered by MARC type and then MARC field.
263 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
264 { search_field_id => $search_field->id },
266 join => 'search_marc_map',
267 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
271 while ( my $marc_to_field = $marc_to_fields->next ) {
273 my $marc_map = $marc_to_field->search_marc_map;
# Filter by MARC flavour only when the caller supplied one.
275 next if $marc_type && $marc_map->marc_type ne $marc_type;
# label and type are always copied; facet_order, weight, opac and
# staff_client only when they are defined on the search field.
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
# Append one entry per MARC mapping; false facet/suggestible values are
# normalised to '' while sort is passed through unchanged.
284 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
286 facet => $marc_to_field->facet || '',
287 marc_type => $marc_map->marc_type,
288 marc_field => $marc_map->marc_field,
289 sort => $marc_to_field->sort,
290 suggestible => $marc_to_field->suggestible || ''
299 =head2 _get_elasticsearch_field_config
301 Get the Elasticsearch field config for the given purpose and data type.
303 $mapping = _get_elasticsearch_field_config('search', 'text');
# Look up the Elasticsearch field configuration for a purpose (e.g. 'search',
# 'facet', 'sort') and data type, from a YAML file that is parsed once and
# kept in a 'state' variable. Dies when the purpose is not configured.
307 sub _get_elasticsearch_field_config {
309 my ( $purpose, $type ) = @_;
311 # Use state to speed up repeated calls
312 state $settings = undef;
313 if (!defined $settings) {
# The 'elasticsearch_field_config' config entry overrides the file location;
# otherwise field_config.yaml under 'intranetdir' is used.
314 my $config_file = C4::Context->config('elasticsearch_field_config');
315 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
316 $settings = LoadFile( $config_file );
319 if (!defined $settings->{$purpose}) {
320 die "Field purpose $purpose not defined in field config";
# NOTE(review): as shown this return is unconditional; the guard that selects
# it (presumably an empty $type, as passed for 'general') is not visible in
# this extract - confirm.
323 return $settings->{$purpose};
# Prefer the type-specific entry, then fall back to the purpose's 'default'.
325 if (defined $settings->{$purpose}{$type}) {
326 return $settings->{$purpose}{$type};
328 if (defined $settings->{$purpose}{'default'}) {
329 return $settings->{$purpose}{'default'};
334 =head2 _load_elasticsearch_mappings
336 Load Elasticsearch mappings in the format of mappings.yaml.
338 $indexes = _load_elasticsearch_mappings();
# Parse and return the mappings YAML file.
342 sub _load_elasticsearch_mappings {
# The 'elasticsearch_index_mappings' config entry overrides the file location;
# otherwise mappings.yaml under 'intranetdir' is used.
343 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345 return LoadFile( $mappings_yaml );
# Replace the search fields and MARC mappings stored in the database with the
# contents of the mappings YAML file, then clear the cached search field lists.
348 sub reset_elasticsearch_mappings {
350 my $indexes = $self->_load_elasticsearch_mappings();
# Existing rows are removed before the file contents are inserted.
352 Koha::SearchMarcMaps->delete;
353 Koha::SearchFields->delete;
355 while ( my ( $index_name, $fields ) = each %$indexes ) {
356 while ( my ( $field_name, $data ) = each %$fields ) {
# Copy only the attributes actually present for this field in the file.
358 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
# staff_client and opac default to 1 when not set.
361 $sf_params{staff_client} //= 1;
362 $sf_params{opac} //= 1;
364 $sf_params{name} = $field_name;
# Search fields are looked up by the 'name' key and created when absent.
366 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
368 my $mappings = $data->{mappings};
369 for my $mapping ( @$mappings ) {
370 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371 index_name => $index_name,
372 marc_type => $mapping->{marc_type},
373 marc_field => $mapping->{marc_field}
# Link field and MARC map; facet/suggestible default to 0, search to 1,
# and sort is passed through unchanged.
375 $search_field->add_to_search_marc_maps($marc_field, {
376 facet => $mapping->{facet} || 0,
377 suggestible => $mapping->{suggestible} || 0,
378 sort => $mapping->{sort},
379 search => $mapping->{search} // 1
# Drop the cached staff client and OPAC search field lists.
385 my $cache = Koha::Caches->get_instance();
386 $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
387 $cache->clear_from_cache('elasticsearch_search_fields_opac');
389 # FIXME return the mappings?
392 # This overrides the accessor provided by Class::Accessor so that if
393 # sort_fields isn't set, then it'll generate it.
# (The 'sub' header and the branch conditions are not shown in this extract.)
# Call that forwards the received arguments to the underlying accessor.
397 $self->_sort_fields_accessor(@_);
# Read the current value from the underlying accessor.
400 my $val = $self->_sort_fields_accessor();
403 # This will populate the accessor as a side effect
404 $self->get_elasticsearch_mappings();
405 return $self->_sort_fields_accessor();
408 =head2 _process_mappings($mappings, $data, $record_document, $meta)
410 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
412 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
413 Since we group all mappings by MARC field targets C<$mappings> will contain
414 all targets for C<$data> and thus we need to fetch the MARC field only once.
415 C<$mappings> will be applied to C<$record_document> and new field values added.
416 The method has no return value.
422 Arrayref of mappings containing arrayrefs in the format
423 [C<$target>, C<$options>] where C<$target> is the name of the target field and
424 C<$options> is a hashref containing processing directives for this particular
429 The source data from a MARC record field.
431 =item C<$record_document>
433 Hashref representing the Elasticsearch document on which mappings should be
438 A hashref containing metadata useful for enforcing per mapping rules. For
439 example for providing extra context for mapping options, or treating mapping
440 targets differently depending on type (sort, search, facet etc). Combining
441 this metadata with the mapping options and metadata allows us to mutate the
442 data per mapping, or even replace it with other data retrieved from the
445 Current properties are:
447 C<altscript>: A boolean value indicating whether an alternate script presentation is being
450 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
451 'subfield' or 'subfields_group'.
453 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
455 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
456 is 'subfields_group'.
458 C<field>: The original C<MARC::Field> object.
# Apply every mapping in $mappings to a single piece of MARC data and append
# the resulting value(s) to $record_document.
#
# Parameters:
#   $_self           - invocant (unused).
#   $mappings        - arrayref of [$target, $options] pairs.
#   $data            - the source string taken from the MARC record.
#   $record_document - hashref of target field name => arrayref of values;
#                      modified in place.
#   $meta            - hashref with context; 'altscript' and 'field' are read.
#
# Options honoured per mapping, in this order:
#   substr                         - [$start, $length] slice of $data.
#   value_callbacks                - arrayref of code refs applied in sequence.
#   property                       - wrap the value as { $property => $value }.
#   nonfiling_characters_indicator - indicator number whose numeric value gives
#                                    the count of leading characters to drop.
#
# Returns nothing.
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    MAPPING:
    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        next MAPPING if $target =~ /__sort$/ && $meta->{altscript};

        # Copy (scalar) data since there can be multiple targets with
        # differing options that may each mutate the value.
        my $_data = $data;
        $record_document->{$target} //= [];

        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            # A start offset beyond the end of the data yields an empty string.
            $_data = length($data) > $start ? substr($data, $start, $length) : '';
        }
        if (defined $options->{value_callbacks}) {
            # Thread the value through each callback in order.
            $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
        }
        if (defined $options->{property}) {
            $_data = { $options->{property} => $_data };
        }
        if (defined $options->{nonfiling_characters_indicator}) {
            my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;

            # Only strip from plain strings. When the indicator asks for at
            # least as many characters as the value holds, the result is an
            # empty string; an unguarded substr() past the end would return
            # undef and emit a "substr outside of string" warning.
            if ($nonfiling_chars && defined $_data && !ref $_data) {
                $_data = length($_data) > $nonfiling_chars
                    ? substr($_data, $nonfiling_chars)
                    : '';
            }
        }
        push @{$record_document->{$target}}, $_data;
    }
    return;
}
503 =head2 marc_records_to_documents($marc_records)
505 my $record_documents = $self->marc_records_to_documents($marc_records);
507 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
509 Returns array of hash references, representing Elasticsearch documents,
510 acceptable as body payload in C<Search::Elasticsearch> requests.
514 =item C<$marc_documents>
516 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
# Convert an arrayref of MARC records into an arrayref of Elasticsearch
# document hashrefs, using the rules from _get_marc_mapping_rules().
# Many lines of this sub are not shown in this extract; comments below
# describe only what the visible lines do.
522 sub marc_records_to_documents {
523 my ($self, $records) = @_;
524 my $rules = $self->_get_marc_mapping_rules();
525 my $control_fields_rules = $rules->{control_fields};
526 my $data_fields_rules = $rules->{data_fields};
527 my $marcflavour = lc C4::Context->preference('marcflavour');
# True when the ElasticsearchMARCFormat preference is 'ARRAY'.
528 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
530 my @record_documents;
532 foreach my $record (@{$records}) {
533 my $record_document = {};
# Leader mappings are applied to the record leader.
534 my $mappings = $rules->{leader};
536 $self->_process_mappings($mappings, $record->leader(), $record_document, {
538 data_source => 'leader'
542 foreach my $field ($record->fields()) {
# Control fields are mapped on their whole data value, by tag.
543 if ($field->is_control_field()) {
544 my $mappings = $control_fields_rules->{$field->tag()};
546 $self->_process_mappings($mappings, $field->data(), $record_document, {
548 data_source => 'control_field',
555 my $tag = $field->tag();
556 # Handle alternate scripts in MARC 21
# For MARC 21 field 880, subfield 6 is matched against "<3 chars>-<digits>".
558 if ($marcflavour eq 'marc21' && $tag eq '880') {
559 my $sub6 = $field->subfield('6');
560 if ($sub6 =~ /^(...)-\d+/) {
566 my $data_field_rules = $data_fields_rules->{$tag};
567 if ($data_field_rules) {
568 my $subfields_mappings = $data_field_rules->{subfields};
# Mappings stored under '*' are added to every subfield's own mappings.
569 my $wildcard_mappings = $subfields_mappings->{'*'};
570 foreach my $subfield ($field->subfields()) {
571 my ($code, $data) = @{$subfield};
572 my $mappings = $subfields_mappings->{$code} // [];
573 if ($wildcard_mappings) {
574 $mappings = [@{$mappings}, @{$wildcard_mappings}];
577 $self->_process_mappings($mappings, $data, $record_document, {
578 altscript => $altscript,
579 data_source => 'subfield',
585 if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
586 # Used by the authority linker the match-heading field requires a specific syntax
587 # that is specified in C4/Heading
588 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
589 next unless $heading;
590 push @{$record_document->{'match-heading'}}, $heading->search_form;
# Subfield groups: the values of the grouped subfields are joined into one
# string before the group's mappings are applied.
594 my $subfields_join_mappings = $data_field_rules->{subfields_join};
595 if ($subfields_join_mappings) {
596 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
597 # Map each subfield to values, remove empty values, join with space
602 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
606 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
607 altscript => $altscript,
608 data_source => 'subfields_group',
609 codes => $subfields_group,
614 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
615 # Used by the authority linker the match-heading field requires a specific syntax
616 # that is specified in C4/Heading
617 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
618 next unless $heading;
619 push @{$record_document->{'match-heading'}}, $heading->search_form;
# Fill in configured default values for fields that received no value.
626 foreach my $field (keys %{$rules->{defaults}}) {
627 unless (defined $record_document->{$field}) {
628 $record_document->{$field} = $rules->{defaults}->{$field};
# 'sum' fields are collapsed to the sum of their non-reference values that
# contain a number.
631 foreach my $field (@{$rules->{sum}}) {
632 if (defined $record_document->{$field}) {
633 # TODO: validate numeric? filter?
634 # TODO: Or should only accept fields without nested values?
635 # TODO: Quick and dirty, improve if needed
636 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
639 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
640 foreach my $field (@{$rules->{isbn}}) {
641 if (defined $record_document->{$field}) {
643 foreach my $input_isbn (@{$record_document->{$field}}) {
644 my $isbn = Business::ISBN->new($input_isbn);
645 if (defined $isbn && $isbn->is_valid) {
646 my $isbn13 = $isbn->as_isbn13->as_string;
647 push @isbns, $isbn13;
649 push @isbns, $isbn13;
651 my $isbn10 = $isbn->as_isbn10;
653 $isbn10 = $isbn10->as_string;
654 push @isbns, $isbn10;
656 push @isbns, $isbn10;
# Values that do not parse as a valid ISBN are kept as entered.
659 push @isbns, $input_isbn;
662 $record_document->{$field} = \@isbns;
666 # Remove duplicate values and collapse sort fields
667 foreach my $field (keys %{$record_document}) {
668 if (ref($record_document->{$field}) eq 'ARRAY') {
669 @{$record_document->{$field}} = do {
# Hash values with a defined 'input' key are de-duplicated on that key.
671 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
673 if ($field =~ /__sort$/) {
674 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
675 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
680 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
681 $record->encoding('UTF-8');
# The record itself is stored in the document; 'marc_format' names the
# serialisation used: 'ARRAY', 'base64ISO2709' or 'MARCXML'.
683 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
684 $record_document->{'marc_format'} = 'ARRAY';
688 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
689 local $SIG{__WARN__} = sub {
690 push @warnings, $_[0];
692 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
695 # Suppress warnings if record length exceeded
696 unless (substr($record->leader(), 0, 5) eq '99999') {
697 foreach my $warning (@warnings) {
701 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
702 $record_document->{'marc_format'} = 'MARCXML';
705 $record_document->{'marc_format'} = 'base64ISO2709';
708 push @record_documents, $record_document;
710 return \@record_documents;
713 =head2 _marc_to_array($record)
715 my @fields = _marc_to_array($record)
717 Convert a MARC::Record to an array modeled after MARC-in-JSON
718 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
724 A MARC::Record object
# Body of the MARC record to MARC-in-JSON style conversion (the 'sub' header is
# not shown in this extract). Builds $data with a 'leader' entry and a 'fields'
# list.
731 my ($self, $record) = @_;
734 leader => $record->leader(),
737 for my $field ($record->fields()) {
738 my $tag = $field->tag();
# Control fields become a single { tag => data } pair.
739 if ($field->is_control_field()) {
740 push @{$data->{fields}}, {$tag => $field->data()};
# Other fields: each subfield becomes a { code => contents } pair, kept in
# record order.
743 foreach my $subfield ($field->subfields()) {
744 my ($code, $contents) = @{$subfield};
745 push @{$subfields}, {$code => $contents};
# The entry pushed for such a field carries both indicators and the subfield list.
747 push @{$data->{fields}}, {
749 ind1 => $field->indicator(1),
750 ind2 => $field->indicator(2),
751 subfields => $subfields
759 =head2 _array_to_marc($data)
761 my $record = _array_to_marc($data)
763 Convert an array modeled after MARC-in-JSON to a MARC::Record
769 An array modeled after MARC-in-JSON
770 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
# Body of the MARC-in-JSON style structure to MARC::Record conversion (the
# 'sub' header is not shown in this extract).
777 my ($self, $data) = @_;
779 my $record = MARC::Record->new();
781 $record->leader($data->{leader});
782 for my $field (@{$data->{fields}}) {
# Each entry is a single-key hash: the key is the tag, the value the content.
783 my $tag = (keys %{$field})[0];
784 $field = $field->{$tag};
# A hash value is a field with indicators and subfields; anything else is
# passed to MARC::Field->new as the field's data.
786 if (ref($field) eq 'HASH') {
# Flatten the { code => value } pairs into the code/value list that
# MARC::Field->new receives.
788 foreach my $subfield (@{$field->{subfields}}) {
789 my $code = (keys %{$subfield})[0];
790 push @subfields, $code;
791 push @subfields, $subfield->{$code};
793 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
795 $marc_field = MARC::Field->new($tag, $field)
797 $record->append_fields($marc_field);
803 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
805 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
807 Get mappings, an internal data structure later used by
808 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
809 data for a MARC mapping.
811 The returned C<$mappings> is not to be confused with mappings provided by
812 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
813 provided by C<_foreach_mapping> and expands it to this internal data structure.
814 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
815 is then applied to each MARC target (leader, control field data, subfield or
816 joined subfields) and integrated into the mapping rules data structure used in
817 C<marc_records_to_documents> to transform MARC records into Elasticsearch
824 Boolean indicating whether to create a facet field for this mapping.
826 =item C<$suggestible>
828 Boolean indicating whether to create a suggestion field for this mapping.
832 Boolean indicating whether to create a sort field for this mapping.
836 Boolean indicating whether to create a search field for this mapping.
838 =item C<$target_name>
840 Elasticsearch document target field name.
842 =item C<$target_type>
844 Elasticsearch document target field type.
848 An optional range as a string in the format "<START>-<END>" or "<START>",
849 where "<START>" and "<END>" are integers specifying a range that will be used
850 for extracting a substring from MARC data as Elasticsearch field target value.
852 The first character position is "0", and the range is inclusive,
853 so "0-2" means the first three characters of MARC data.
855 If only "<START>" is provided only one character at position "<START>" will
# Expand one database mapping into the list of [$target, $options] pairs that
# _process_mappings consumes.
#
# Parameters:
#   $_self        - invocant (unused).
#   $facet        - true to emit a "<name>__facet" target.
#   $suggestible  - true to emit a "<name>__suggestion" target.
#   $sort         - true OR undef to emit a "<name>__sort" target.
#   $search       - true to emit the plain "<name>" target.
#   $target_name  - Elasticsearch field name.
#   $target_type  - field type; 'boolean' adds a normalising value callback.
#   $range        - optional "<START>" or "<START>-<END>" (inclusive, 0-based)
#                   substring specification.
#
# Returns a list of arrayrefs [$target, \%options]. Every mapping owns its own
# options hash so later per-mapping tweaks cannot leak into sibling mappings.
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
    my @mappings;

    my $default_options = {};
    if (defined $range) {
        # TODO: use value_callback instead?
        my ($start, $end) = map { int } split /-/, $range, 2;

        # substr arguments: offset and length. A bare "<START>" selects
        # exactly one character.
        $default_options->{substr} = [ $start, defined $end ? $end - $start + 1 : 1 ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if (($target_type // '') eq 'boolean') {
        push @{ $default_options->{value_callbacks} }, sub {
            my ($value) = @_;
            # Trim whitespace at both ends
            $value =~ s/^\s+|\s+$//g;
            return $value ? 'true' : 'false';
        };
    }

    if ($search) {
        push @mappings, [ $target_name, { %{$default_options} } ];
    }

    my @suffixes;
    push @suffixes, 'facet'      if $facet;
    push @suffixes, 'suggestion' if $suggestible;
    # An undefined $sort means "default", which is to make the field sortable.
    push @suffixes, 'sort'       if !defined $sort || $sort;

    foreach my $suffix (@suffixes) {
        # Important! Make a shallow clone, or we end up with the same hashref
        # shared by all mappings.
        my %options = %{$default_options};

        # TODO: Hack, fix later in less hideous manner
        $options{property} = 'input' if $suffix eq 'suggestion';

        push @mappings, [ "${target_name}__$suffix", \%options ];
    }

    return @mappings;
}
917 =head2 _get_marc_mapping_rules
919 my $mapping_rules = $self->_get_marc_mapping_rules()
921 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
923 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
924 each call to C<MARC::Record>->field) we create an optimized structure of mapping
925 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
927 We can then iterate through all MARC fields for each record and apply all relevant
928 rules once per field instead of retrieving fields multiple times for each mapping rule
929 which is terribly slow.
933 # TODO: This structure can be used for processing multiple MARC::Records so is currently
934 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
935 # memory cache which it is currently not. The performance gain of caching
936 # would probably be marginal, but to do this could be a further improvement.
# Build the rule structure used by marc_records_to_documents from the database
# mappings of the current MARC flavour. Visible top-level keys written here:
# control_fields, data_fields, leader, sum, isbn and defaults.
938 sub _get_marc_mapping_rules {
940 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field spec: 3-digit tag, optional subfield codes with parenthesised groups,
# optional "_/<start>[-<end>]" range. Leader spec: "leader" plus optional range.
941 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
942 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
945 'control_fields' => {},
952 $self->_foreach_mapping(sub {
953 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
# Skip mappings that belong to another MARC flavour.
954 return if $marc_type ne $marcflavour;
# Types that need post-processing are recorded by field name.
956 if ($type eq 'sum') {
957 push @{$rules->{sum}}, $name;
958 push @{$rules->{sum}}, $name."__sort" if $sort;
960 elsif ($type eq 'isbn') {
961 push @{$rules->{isbn}}, $name;
963 elsif ($type eq 'boolean') {
964 # boolean gets special handling, if value doesn't exist for a field,
966 $rules->{defaults}->{$name} = 'false';
969 if ($marc_field =~ $field_spec_regexp) {
974 # Parse and separate subfields form subfield groups
976 my $subfield_group = '';
# Walk the subfield spec one character at a time; codes inside parentheses
# are accumulated into a group, other codes are individual subfields.
979 foreach my $token (split //, $2) {
982 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
983 "Unmatched opening parenthesis for $marc_field"
990 elsif ($token eq ")") {
992 if ($subfield_group) {
993 push @subfield_groups, $subfield_group;
994 $subfield_group = '';
999 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1000 "Unmatched closing parenthesis for $marc_field"
1004 elsif ($open_group) {
1005 $subfield_group .= $token;
1008 push @subfields, $token;
# '*' is the wildcard subfield key that marc_records_to_documents applies to
# every subfield.
1013 push @subfields, '*';
1016 my $range = defined $3 ? $3 : undef;
1017 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
# Tags numerically below 10 are stored as control field rules.
1018 if ($field_tag < 10) {
1019 $rules->{control_fields}->{$field_tag} //= [];
1020 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1023 $rules->{data_fields}->{$field_tag} //= {};
1024 foreach my $subfield (@subfields) {
1025 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1026 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1028 foreach my $subfield_group (@subfield_groups) {
1029 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1030 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1034 elsif ($marc_field =~ $leader_regexp) {
1035 my $range = defined $1 ? $1 : undef;
1036 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1037 push @{$rules->{leader}}, @mappings;
# Anything matching neither pattern is rejected.
1040 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1041 "Invalid MARC field expression: $marc_field"
1046 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1047 if ($marcflavour eq 'marc21') {
1048 # Nonfiling characters processing for sort fields
# NOTE(review): the comparisons below read $Koha::SearchEngine::BIBLIOS_INDEX
# and $Koha::SearchEngine::AUTHORITIES_INDEX, not the constants declared in
# this package - confirm Koha::SearchEngine is loaded and defines them.
1050 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1051 # Format is: nonfiling characters indicator => field names list
1053 1 => [130, 630, 730, 740],
1054 2 => [222, 240, 242, 243, 245, 440, 830]
1057 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1060 2 => [130, 430, 530]
# Only the subfield 'a' "__sort" mappings of the listed tags are flagged.
1063 foreach my $indicator (keys %title_fields) {
1064 foreach my $field_tag (@{$title_fields{$indicator}}) {
1065 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1066 foreach my $mapping (@{$mappings}) {
1067 if ($mapping->[0] =~ /__sort$/) {
1068 # Mark this as to be processed for nonfiling characters indicator
1069 # later on in _process_mappings
1070 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1080 =head2 _foreach_mapping
1082 $self->_foreach_mapping(
1084 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1087 return unless $marc_type eq 'marc21';
1088 print "Data comes from: " . $marc_field . "\n";
1092 This allows you to apply a function to each entry in the elasticsearch mappings
1093 table, in order to build the mappings for whatever is needed.
1095 In the provided function, the fields are:
1101 The field name for elasticsearch (corresponds to the 'mapping' column in the
1106 The type for this value, e.g. 'string'.
1110 True if this value should be facetised. This only really makes sense if the
1111 field is understood by the facet processing code anyway.
1115 True if this is a field that a) needs special sort handling, and b) if it
1116 should be sorted on. False if a) but not b). Undef if not a). This allows,
1117 for example, author to be sorted on but not everything marked with "author"
1118 to be included in that sort.
1122 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1123 'unimarc', 'normarc'.
1125 =item C<$marc_field>
1127 A string that describes the MARC field that contains the data to extract.
# Call $sub once per mapping row of this object's index. The visible argument
# list is: lower-cased field name, type, facet, suggestible, sort, search,
# marc_type, marc_field.
1133 sub _foreach_mapping {
1134 my ( $self, $sub ) = @_;
1136 # TODO use a caching framework here
# Search fields joined to their MARC maps, restricted to $self->index.
1137 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1139 'search_marc_map.index_name' => $self->index,
1141 { join => { search_marc_to_fields => 'search_marc_map' },
# Columns pulled from the joined tables; they are read back below with
# get_column().
1143 'search_marc_to_fields.facet',
1144 'search_marc_to_fields.suggestible',
1145 'search_marc_to_fields.sort',
1146 'search_marc_to_fields.search',
1147 'search_marc_map.marc_type',
1148 'search_marc_map.marc_field',
1161 while ( my $search_field = $search_fields->next ) {
1163 # Force lower case on indexed field names for case insensitive
1164 # field name searches
1165 lc($search_field->name),
1166 $search_field->type,
1167 $search_field->get_column('facet'),
1168 $search_field->get_column('suggestible'),
1169 $search_field->get_column('sort'),
1170 $search_field->get_column('search'),
1171 $search_field->get_column('marc_type'),
1172 $search_field->get_column('marc_field'),
1177 =head2 process_error
1179 die process_error($@);
1181 This parses an Elasticsearch error message and produces a human-readable
1182 result from it. This result is probably missing all the useful information
1183 that you might want in diagnosing an issue, so the warning is also logged.
1185 Note that currently the resulting message is not internationalised. This
1186 will happen eventually by some method or other.
# Body of the error-message translation (the 'sub' header is not shown in this
# extract): logs the raw message and returns a generic user-facing string.
1191 my ($self, $msg) = @_;
1193 warn $msg; # simple logging
1195 # This is super-primitive
# Messages containing "ParseException" get the "rephrase" text.
1196 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
# Everything else gets the generic failure text.
1198 return "Unable to perform your search. Please try again.\n";
1201 =head2 _read_configuration
1203 my $conf = _read_configuration();
1205 Reads the I<configuration file> and returns a hash structure with the
1206 configuration information. It raises an exception if mandatory entries
1209 The hashref structure has the following form:
1212 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1213 'index_name' => 'koha_instance',
1216 This is configured by the following in the C<config> block in koha-conf.xml:
1219 <server>127.0.0.1:9200</server>
1220 <server>anotherserver:9200</server>
1221 <index_name>koha_instance</index_name>
# Read the 'elasticsearch' configuration entry and return a hashref with
# 'nodes' (always an arrayref) and 'index_name'. Throws
# Koha::Exceptions::Config::MissingEntry when the entry itself, its server
# list or its index name is missing.
1226 sub _read_configuration {
1230 my $conf = C4::Context->config('elasticsearch');
1231 unless ( defined $conf ) {
1232 Koha::Exceptions::Config::MissingEntry->throw(
1233 "Missing <elasticsearch> entry in koha-conf.xml"
1237 if ( $conf && $conf->{server} ) {
1238 my $nodes = $conf->{server};
# An array of servers is used as is; a single server is wrapped in an arrayref.
1239 if ( ref($nodes) eq 'ARRAY' ) {
1240 $configuration->{nodes} = $nodes;
1243 $configuration->{nodes} = [$nodes];
1247 Koha::Exceptions::Config::MissingEntry->throw(
1248 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1252 if ( defined $conf->{index_name} ) {
1253 $configuration->{index_name} = $conf->{index_name};
1256 Koha::Exceptions::Config::MissingEntry->throw(
1257 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1261 return $configuration;
1264 =head2 get_facetable_fields
1266 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1268 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
# Return the search fields from a fixed list of names: first those with a
# facet_order set (ordered by facet_order), then those without one.
1272 sub get_facetable_fields {
1275 # These should correspond to the ES field names, as opposed to the CCL
1276 # things that zebra uses.
1277 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
# Fields whose facet_order is not NULL.
1278 my @faceted_fields = Koha::SearchFields->search(
1279 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
# Fields whose facet_order is NULL.
1281 my @not_faceted_fields = Koha::SearchFields->search(
1282 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1284 # This could certainly be improved
1285 return ( @faceted_fields, @not_faceted_fields );
1296 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1298 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1300 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>