1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
37 use Search::Elasticsearch;
41 use List::Util qw( sum0 reduce );
44 use Encode qw(encode);
46 use Scalar::Util qw(looks_like_number);
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
75 my $self = $class->SUPER::new(@_);
76 # Check for a valid index
77 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
81 =head2 get_elasticsearch
83 my $elasticsearch_client = $self->get_elasticsearch();
85 Returns a C<Search::Elasticsearch> client. The client is cached on the C<Koha::SearchEngine::Elasticsearch>
86 instance and will be reused if the method is called multiple times.
90 sub get_elasticsearch {
92 unless (defined $self->{elasticsearch}) {
93 $self->{elasticsearch} = Search::Elasticsearch->new(
94 $self->get_elasticsearch_params()
97 return $self->{elasticsearch};
100 =head2 get_elasticsearch_params
102 my $params = $self->get_elasticsearch_params();
104 This provides a hashref that contains the parameters for connecting to the
105 Elasticsearch servers, in the form:
108 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
109 'index_name' => 'koha_instance_index',
112 This is configured by the following in the C<config> block in koha-conf.xml:
115 <server>127.0.0.1:9200</server>
116 <server>anotherserver:9200</server>
117 <index_name>koha_instance</index_name>
122 sub get_elasticsearch_params {
125 # Copy the hash so that we're not modifying the original
126 my $conf = C4::Context->config('elasticsearch');
127 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
128 my $es = { %{ $conf } };
130 # Helpfully, the multiple server lines end up in an array for us anyway
131 # if there are multiple ones, but not if there's only one.
132 my $server = $es->{server};
133 delete $es->{server};
134 if ( ref($server) eq 'ARRAY' ) {
136 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
137 $es->{nodes} = $server;
140 $es->{nodes} = [$server];
143 die "No elasticsearch servers were specified in koha-conf.xml.\n";
145 die "No elasticsearch index_name was specified in koha-conf.xml.\n"
146 if ( !$es->{index_name} );
147 # Append the name of this particular index to our namespace
148 $es->{index_name} .= '_' . $self->index;
150 $es->{key_prefix} = 'es_';
151 $es->{cxn_pool} //= 'Static';
152 $es->{request_timeout} //= 60;
157 =head2 get_elasticsearch_settings
159 my $settings = $self->get_elasticsearch_settings();
161 This provides the settings provided to Elasticsearch when an index is created.
162 These can do things like define tokenization methods.
164 A hashref containing the settings is returned.
168 sub get_elasticsearch_settings {
171 # Use state to speed up repeated calls
172 state $settings = undef;
173 if (!defined $settings) {
174 my $config_file = C4::Context->config('elasticsearch_index_config');
175 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
176 $settings = LoadFile( $config_file );
182 =head2 get_elasticsearch_mappings
184 my $mappings = $self->get_elasticsearch_mappings();
186 This provides the mappings that get passed to Elasticsearch when an index is
191 sub get_elasticsearch_mappings {
194 # Use state to speed up repeated calls
198 if (!defined $all_mappings{$self->index}) {
199 $sort_fields{$self->index} = {};
200 # Clone the general mapping to break ties with the original hash
202 data => clone(_get_elasticsearch_field_config('general', ''))
204 my $marcflavour = lc C4::Context->preference('marcflavour');
205 $self->_foreach_mapping(
207 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
208 return if $marc_type ne $marcflavour;
209 # TODO if this gets any sort of complexity to it, it should
210 # be broken out into its own function.
212 # TODO be aware of date formats, but this requires pre-parsing
213 # as ES will simply reject anything with an invalid date.
214 my $es_type = 'text';
215 if ($type eq 'boolean') {
216 $es_type = 'boolean';
217 } elsif ($type eq 'number' || $type eq 'sum') {
218 $es_type = 'integer';
219 } elsif ($type eq 'isbn' || $type eq 'stdno') {
224 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
228 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
231 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
233 # Sort is a bit special as it can be true, false, undef.
234 # We care about "true" or "undef",
235 # "undef" means to do the default thing, which is make it sortable.
236 if (!defined $sort || $sort) {
237 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
238 $sort_fields{$self->index}{$name} = 1;
242 $all_mappings{$self->index} = $mappings;
244 $self->sort_fields(\%{$sort_fields{$self->index}});
246 return $all_mappings{$self->index};
249 =head2 raw_elasticsearch_mappings
251 Returns the Elasticsearch mappings as they are stored in the database.
252 marc_type: marc21|unimarc|normarc
254 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
258 sub raw_elasticsearch_mappings {
259 my ( $marc_type ) = @_;
261 my $schema = Koha::Database->new()->schema();
263 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
266 while ( my $search_field = $search_fields->next ) {
268 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
269 { search_field_id => $search_field->id },
271 join => 'search_marc_map',
272 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
276 while ( my $marc_to_field = $marc_to_fields->next ) {
278 my $marc_map = $marc_to_field->search_marc_map;
280 next if $marc_type && $marc_map->marc_type ne $marc_type;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
284 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
285 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
286 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
287 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
289 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
291 facet => $marc_to_field->facet || '',
292 marc_type => $marc_map->marc_type,
293 marc_field => $marc_map->marc_field,
294 sort => $marc_to_field->sort,
295 suggestible => $marc_to_field->suggestible || ''
304 =head2 _get_elasticsearch_field_config
306 Get the Elasticsearch field config for the given purpose and data type.
308 $mapping = _get_elasticsearch_field_config('search', 'text');
312 sub _get_elasticsearch_field_config {
314 my ( $purpose, $type ) = @_;
316 # Use state to speed up repeated calls
317 state $settings = undef;
318 if (!defined $settings) {
319 my $config_file = C4::Context->config('elasticsearch_field_config');
320 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
321 $settings = LoadFile( $config_file );
324 if (!defined $settings->{$purpose}) {
325 die "Field purpose $purpose not defined in field config";
328 return $settings->{$purpose};
330 if (defined $settings->{$purpose}{$type}) {
331 return $settings->{$purpose}{$type};
333 if (defined $settings->{$purpose}{'default'}) {
334 return $settings->{$purpose}{'default'};
339 =head2 _load_elasticsearch_mappings
341 Load Elasticsearch mappings in the format of mappings.yaml.
343 $indexes = _load_elasticsearch_mappings();
347 sub _load_elasticsearch_mappings {
348 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
349 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
350 return LoadFile( $mappings_yaml );
353 sub reset_elasticsearch_mappings {
355 my $indexes = $self->_load_elasticsearch_mappings();
357 Koha::SearchMarcMaps->delete;
358 Koha::SearchFields->delete;
360 while ( my ( $index_name, $fields ) = each %$indexes ) {
361 while ( my ( $field_name, $data ) = each %$fields ) {
363 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
366 $sf_params{staff_client} //= 1;
367 $sf_params{opac} //= 1;
369 $sf_params{name} = $field_name;
371 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
373 my $mappings = $data->{mappings};
374 for my $mapping ( @$mappings ) {
375 my $marc_field = Koha::SearchMarcMaps->find_or_create({
376 index_name => $index_name,
377 marc_type => $mapping->{marc_type},
378 marc_field => $mapping->{marc_field}
380 $search_field->add_to_search_marc_maps($marc_field, {
381 facet => $mapping->{facet} || 0,
382 suggestible => $mapping->{suggestible} || 0,
383 sort => $mapping->{sort},
384 search => $mapping->{search} // 1
390 my $cache = Koha::Caches->get_instance();
391 $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
392 $cache->clear_from_cache('elasticsearch_search_fields_opac');
394 # FIXME return the mappings?
397 # This overrides the accessor provided by Class::Accessor so that if
398 # sort_fields isn't set, then it'll generate it.
402 $self->_sort_fields_accessor(@_);
405 my $val = $self->_sort_fields_accessor();
408 # This will populate the accessor as a side effect
409 $self->get_elasticsearch_mappings();
410 return $self->_sort_fields_accessor();
413 =head2 _process_mappings($mappings, $data, $record_document, $meta)
415 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
417 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
418 Since we group all mappings by MARC field targets C<$mappings> will contain
419 all targets for C<$data> and thus we need to fetch the MARC field only once.
420 C<$mappings> will be applied to C<$record_document> and new field values added.
421 The method has no return value.
427 Arrayref of mappings containing arrayrefs in the format
428 [C<$target>, C<$options>] where C<$target> is the name of the target field and
429 C<$options> is a hashref containing processing directives for this particular
434 The source data from a MARC record field.
436 =item C<$record_document>
438 Hashref representing the Elasticsearch document on which mappings should be
443 A hashref containing metadata useful for enforcing per mapping rules. For
444 example for providing extra context for mapping options, or treating mapping
445 targets differently depending on type (sort, search, facet etc). Combining
446 this metadata with the mapping options and metadata allows us to mutate the
447 data per mapping, or even replace it with other data retrieved from the
450 Current properties are:
452 C<altscript>: A boolean value indicating whether an alternate script presentation is being
455 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
456 'subfield' or 'subfields_group'.
458 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
460 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
461 is 'subfields_group'.
463 C<field>: The original C<MARC::Field> object.
469 sub _process_mappings {
470 my ($_self, $mappings, $data, $record_document, $meta) = @_;
471 foreach my $mapping (@{$mappings}) {
472 my ($target, $options) = @{$mapping};
474 # Don't process sort fields for alternate scripts
475 my $sort = $target =~ /__sort$/;
476 if ($sort && $meta->{altscript}) {
480 # Copy (scalar) data since can have multiple targets
481 # with differing options for (possibly) mutating data
482 # so need a different copy for each
484 $record_document->{$target} //= [];
485 if (defined $options->{substr}) {
486 my ($start, $length) = @{$options->{substr}};
487 $_data = length($data) > $start ? substr $data, $start, $length : '';
489 if (defined $options->{value_callbacks}) {
490 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
492 if (defined $options->{property}) {
494 $options->{property} => $_data
497 if (defined $options->{nonfiling_characters_indicator}) {
498 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
499 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
500 if ($nonfiling_chars) {
501 $_data = substr $_data, $nonfiling_chars;
504 push @{$record_document->{$target}}, $_data;
508 =head2 marc_records_to_documents($marc_records)
510 my $record_documents = $self->marc_records_to_documents($marc_records);
512 Using the mappings stored in the database, converts C<$marc_records> to Elasticsearch documents.
514 Returns a reference to an array of hash references, representing Elasticsearch documents,
515 acceptable as body payload in C<Search::Elasticsearch> requests.
519 =item C<$marc_records>
521 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
527 sub marc_records_to_documents {
528 my ($self, $records) = @_;
529 my $rules = $self->_get_marc_mapping_rules();
530 my $control_fields_rules = $rules->{control_fields};
531 my $data_fields_rules = $rules->{data_fields};
532 my $marcflavour = lc C4::Context->preference('marcflavour');
533 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
535 my @record_documents;
537 foreach my $record (@{$records}) {
538 my $record_document = {};
539 my $mappings = $rules->{leader};
541 $self->_process_mappings($mappings, $record->leader(), $record_document, {
543 data_source => 'leader'
547 foreach my $field ($record->fields()) {
548 if ($field->is_control_field()) {
549 my $mappings = $control_fields_rules->{$field->tag()};
551 $self->_process_mappings($mappings, $field->data(), $record_document, {
553 data_source => 'control_field',
560 my $tag = $field->tag();
561 # Handle alternate scripts in MARC 21
563 if ($marcflavour eq 'marc21' && $tag eq '880') {
564 my $sub6 = $field->subfield('6');
565 if ($sub6 =~ /^(...)-\d+/) {
571 my $data_field_rules = $data_fields_rules->{$tag};
572 if ($data_field_rules) {
573 my $subfields_mappings = $data_field_rules->{subfields};
574 my $wildcard_mappings = $subfields_mappings->{'*'};
575 foreach my $subfield ($field->subfields()) {
576 my ($code, $data) = @{$subfield};
577 my $mappings = $subfields_mappings->{$code} // [];
578 if ($wildcard_mappings) {
579 $mappings = [@{$mappings}, @{$wildcard_mappings}];
582 $self->_process_mappings($mappings, $data, $record_document, {
583 altscript => $altscript,
584 data_source => 'subfield',
590 if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
591 # Used by the authority linker; the match-heading field requires a specific syntax
592 # that is specified in C4/Heading
593 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
594 next unless $heading;
595 push @{$record_document->{'match-heading'}}, $heading->search_form;
599 my $subfields_join_mappings = $data_field_rules->{subfields_join};
600 if ($subfields_join_mappings) {
601 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
602 # Map each subfield to values, remove empty values, join with space
607 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
611 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
612 altscript => $altscript,
613 data_source => 'subfields_group',
614 codes => $subfields_group,
619 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
620 # Used by the authority linker; the match-heading field requires a specific syntax
621 # that is specified in C4/Heading
622 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
623 next unless $heading;
624 push @{$record_document->{'match-heading'}}, $heading->search_form;
631 foreach my $field (keys %{$rules->{defaults}}) {
632 unless (defined $record_document->{$field}) {
633 $record_document->{$field} = $rules->{defaults}->{$field};
636 foreach my $field (@{$rules->{sum}}) {
637 if (defined $record_document->{$field}) {
638 # TODO: validate numeric? filter?
639 # TODO: Or should only accept fields without nested values?
640 # TODO: Quick and dirty, improve if needed
641 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
644 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
645 foreach my $field (@{$rules->{isbn}}) {
646 if (defined $record_document->{$field}) {
648 foreach my $input_isbn (@{$record_document->{$field}}) {
649 my $isbn = Business::ISBN->new($input_isbn);
650 if (defined $isbn && $isbn->is_valid) {
651 my $isbn13 = $isbn->as_isbn13->as_string;
652 push @isbns, $isbn13;
654 push @isbns, $isbn13;
656 my $isbn10 = $isbn->as_isbn10;
658 $isbn10 = $isbn10->as_string;
659 push @isbns, $isbn10;
661 push @isbns, $isbn10;
664 push @isbns, $input_isbn;
667 $record_document->{$field} = \@isbns;
671 # Remove duplicate values and collapse sort fields
672 foreach my $field (keys %{$record_document}) {
673 if (ref($record_document->{$field}) eq 'ARRAY') {
674 @{$record_document->{$field}} = do {
676 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
678 if ($field =~ /__sort$/) {
679 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
680 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
685 # TODO: Perhaps we should check that $record_document is non-empty, but it really should never be empty
686 $record->encoding('UTF-8');
688 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
689 $record_document->{'marc_format'} = 'ARRAY';
693 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
694 local $SIG{__WARN__} = sub {
695 push @warnings, $_[0];
697 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
700 # Suppress warnings if record length exceeded
701 unless (substr($record->leader(), 0, 5) eq '99999') {
702 foreach my $warning (@warnings) {
706 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
707 $record_document->{'marc_format'} = 'MARCXML';
710 $record_document->{'marc_format'} = 'base64ISO2709';
713 push @record_documents, $record_document;
715 return \@record_documents;
718 =head2 _marc_to_array($record)
720 my @fields = _marc_to_array($record)
722 Convert a MARC::Record to an array modeled after MARC-in-JSON
723 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
729 A MARC::Record object
736 my ($self, $record) = @_;
739 leader => $record->leader(),
742 for my $field ($record->fields()) {
743 my $tag = $field->tag();
744 if ($field->is_control_field()) {
745 push @{$data->{fields}}, {$tag => $field->data()};
748 foreach my $subfield ($field->subfields()) {
749 my ($code, $contents) = @{$subfield};
750 push @{$subfields}, {$code => $contents};
752 push @{$data->{fields}}, {
754 ind1 => $field->indicator(1),
755 ind2 => $field->indicator(2),
756 subfields => $subfields
764 =head2 _array_to_marc($data)
766 my $record = _array_to_marc($data)
768 Convert an array modeled after MARC-in-JSON to a MARC::Record
774 An array modeled after MARC-in-JSON
775 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
782 my ($self, $data) = @_;
784 my $record = MARC::Record->new();
786 $record->leader($data->{leader});
787 for my $field (@{$data->{fields}}) {
788 my $tag = (keys %{$field})[0];
789 $field = $field->{$tag};
791 if (ref($field) eq 'HASH') {
793 foreach my $subfield (@{$field->{subfields}}) {
794 my $code = (keys %{$subfield})[0];
795 push @subfields, $code;
796 push @subfields, $subfield->{$code};
798 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
800 $marc_field = MARC::Field->new($tag, $field)
802 $record->append_fields($marc_field);
808 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
810 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
812 Get mappings, an internal data structure later used by
813 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
814 data for a MARC mapping.
816 The returned C<$mappings> is not to be confused with mappings provided by
817 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
818 provided by C<_foreach_mapping> and expands it to this internal data structure.
819 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
820 is then applied to each MARC target (leader, control field data, subfield or
821 joined subfields) and integrated into the mapping rules data structure used in
822 C<marc_records_to_documents> to transform MARC records into Elasticsearch
829 Boolean indicating whether to create a facet field for this mapping.
831 =item C<$suggestible>
833 Boolean indicating whether to create a suggestion field for this mapping.
837 Boolean indicating whether to create a sort field for this mapping.
841 Boolean indicating whether to create a search field for this mapping.
843 =item C<$target_name>
845 Elasticsearch document target field name.
847 =item C<$target_type>
849 Elasticsearch document target field type.
853 An optional range as a string in the format "<START>-<END>" or "<START>",
854 where "<START>" and "<END>" are integers specifying a range that will be used
855 for extracting a substring from MARC data as Elasticsearch field target value.
857 The first character position is "0", and the range is inclusive,
858 so "0-2" means the first three characters of MARC data.
860 If only "<START>" is provided only one character at position "<START>" will
867 sub _field_mappings {
868 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
869 my %mapping_defaults = ();
872 my $substr_args = undef;
873 if (defined $range) {
874 # TODO: use value_callback instead?
875 my ($start, $end) = map(int, split /-/, $range, 2);
876 $substr_args = [$start];
877 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
879 my $default_options = {};
881 $default_options->{substr} = $substr_args;
884 # TODO: Should probably have per type value callback/hook
885 # but hard code for now
886 if ($target_type eq 'boolean') {
887 $default_options->{value_callbacks} //= [];
888 push @{$default_options->{value_callbacks}}, sub {
890 # Trim whitespace at both ends
891 $value =~ s/^\s+|\s+$//g;
892 return $value ? 'true' : 'false';
897 my $mapping = [$target_name, $default_options];
898 push @mappings, $mapping;
902 push @suffixes, 'facet' if $facet;
903 push @suffixes, 'suggestion' if $suggestible;
904 push @suffixes, 'sort' if !defined $sort || $sort;
906 foreach my $suffix (@suffixes) {
907 my $mapping = ["${target_name}__$suffix"];
908 # TODO: Hack, fix later in less hideous manner
909 if ($suffix eq 'suggestion') {
910 push @{$mapping}, {%{$default_options}, property => 'input'};
913 # Important! Make shallow clone, or we end up with the same hashref
914 # shared by all mappings
915 push @{$mapping}, {%{$default_options}};
917 push @mappings, $mapping;
922 =head2 _get_marc_mapping_rules
924 my $mapping_rules = $self->_get_marc_mapping_rules()
926 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
928 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
929 each call to C<MARC::Record>->field) we create an optimized structure of mapping
930 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
932 We can then iterate through all MARC fields for each record and apply all relevant
933 rules once per field instead of retrieving fields multiple times for each mapping rule
934 which is terribly slow.
938 # TODO: This structure can be used for processing multiple MARC::Records so is currently
939 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
940 # memory cache which it is currently not. The performance gain of caching
941 # would probably be marginal, but to do this could be a further improvement.
943 sub _get_marc_mapping_rules {
945 my $marcflavour = lc C4::Context->preference('marcflavour');
946 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
947 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
950 'control_fields' => {},
957 $self->_foreach_mapping(sub {
958 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
959 return if $marc_type ne $marcflavour;
961 if ($type eq 'sum') {
962 push @{$rules->{sum}}, $name;
963 push @{$rules->{sum}}, $name."__sort" if $sort;
965 elsif ($type eq 'isbn') {
966 push @{$rules->{isbn}}, $name;
968 elsif ($type eq 'boolean') {
969 # boolean gets special handling, if value doesn't exist for a field,
971 $rules->{defaults}->{$name} = 'false';
974 if ($marc_field =~ $field_spec_regexp) {
979 # Parse and separate subfields from subfield groups
981 my $subfield_group = '';
984 foreach my $token (split //, $2) {
987 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
988 "Unmatched opening parenthesis for $marc_field"
995 elsif ($token eq ")") {
997 if ($subfield_group) {
998 push @subfield_groups, $subfield_group;
999 $subfield_group = '';
1004 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1005 "Unmatched closing parenthesis for $marc_field"
1009 elsif ($open_group) {
1010 $subfield_group .= $token;
1013 push @subfields, $token;
1018 push @subfields, '*';
1021 my $range = defined $3 ? $3 : undef;
1022 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1023 if ($field_tag < 10) {
1024 $rules->{control_fields}->{$field_tag} //= [];
1025 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1028 $rules->{data_fields}->{$field_tag} //= {};
1029 foreach my $subfield (@subfields) {
1030 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1031 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1033 foreach my $subfield_group (@subfield_groups) {
1034 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1035 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1039 elsif ($marc_field =~ $leader_regexp) {
1040 my $range = defined $1 ? $1 : undef;
1041 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1042 push @{$rules->{leader}}, @mappings;
1045 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1046 "Invalid MARC field expression: $marc_field"
1051 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1052 if ($marcflavour eq 'marc21') {
1053 # Nonfiling characters processing for sort fields
1055 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1056 # Format is: nonfiling characters indicator => field names list
1058 1 => [130, 630, 730, 740],
1059 2 => [222, 240, 242, 243, 245, 440, 830]
1062 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1065 2 => [130, 430, 530]
1068 foreach my $indicator (keys %title_fields) {
1069 foreach my $field_tag (@{$title_fields{$indicator}}) {
1070 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1071 foreach my $mapping (@{$mappings}) {
1072 if ($mapping->[0] =~ /__sort$/) {
1073 # Mark this as to be processed for nonfiling characters indicator
1074 # later on in _process_mappings
1075 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1085 =head2 _foreach_mapping
1087 $self->_foreach_mapping(
1089 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1092 return unless $marc_type eq 'marc21';
1093 print "Data comes from: " . $marc_field . "\n";
1097 This allows you to apply a function to each entry in the elasticsearch mappings
1098 table, in order to build the mappings for whatever is needed.
1100 In the provided function, the arguments are:
1106 The field name for elasticsearch (corresponds to the 'mapping' column in the
1111 The type for this value, e.g. 'string'.
1115 True if this value should be facetised. This only really makes sense if the
1116 field is understood by the facet processing code anyway.
1120 True if this is a field that a) needs special sort handling, and b) if it
1121 should be sorted on. False if a) but not b). Undef if not a). This allows,
1122 for example, author to be sorted on but not everything marked with "author"
1123 to be included in that sort.
1127 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1128 'unimarc', 'normarc'.
1130 =item C<$marc_field>
1132 A string that describes the MARC field that contains the data to extract.
1138 sub _foreach_mapping {
1139 my ( $self, $sub ) = @_;
1141 # TODO use a caching framework here
1142 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1144 'search_marc_map.index_name' => $self->index,
1146 { join => { search_marc_to_fields => 'search_marc_map' },
1148 'search_marc_to_fields.facet',
1149 'search_marc_to_fields.suggestible',
1150 'search_marc_to_fields.sort',
1151 'search_marc_to_fields.search',
1152 'search_marc_map.marc_type',
1153 'search_marc_map.marc_field',
1166 while ( my $search_field = $search_fields->next ) {
1168 # Force lower case on indexed field names for case insensitive
1169 # field name searches
1170 lc($search_field->name),
1171 $search_field->type,
1172 $search_field->get_column('facet'),
1173 $search_field->get_column('suggestible'),
1174 $search_field->get_column('sort'),
1175 $search_field->get_column('search'),
1176 $search_field->get_column('marc_type'),
1177 $search_field->get_column('marc_field'),
1182 =head2 process_error
1184 die process_error($@);
1186 This parses an Elasticsearch error message and produces a human-readable
1187 result from it. This result is probably missing all the useful information
1188 that you might want in diagnosing an issue, so the warning is also logged.
1190 Note that currently the resulting message is not internationalised. This
1191 will happen eventually by some method or other.
1196 my ($self, $msg) = @_;
1198 warn $msg; # simple logging
1200 # This is super-primitive
1201 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1203 return "Unable to perform your search. Please try again.\n";
1206 =head2 _read_configuration
1208 my $conf = _read_configuration();
1210 Reads the I<configuration file> and returns a hash structure with the
1211 configuration information. It raises an exception if mandatory entries
1214 The hashref structure has the following form:
1217 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1218 'index_name' => 'koha_instance',
1221 This is configured by the following in the C<config> block in koha-conf.xml:
1224 <server>127.0.0.1:9200</server>
1225 <server>anotherserver:9200</server>
1226 <index_name>koha_instance</index_name>
1231 sub _read_configuration {
1235 my $conf = C4::Context->config('elasticsearch');
1236 Koha::Exceptions::Config::MissingEntry->throw(
1237 "Missing 'elasticsearch' block in config file")
1238 unless defined $conf;
1240 if ( $conf && $conf->{server} ) {
1241 my $nodes = $conf->{server};
1242 if ( ref($nodes) eq 'ARRAY' ) {
1243 $configuration->{nodes} = $nodes;
1246 $configuration->{nodes} = [$nodes];
1250 Koha::Exceptions::Config::MissingEntry->throw(
1251 "Missing 'server' entry in config file for elasticsearch");
1254 if ( defined $conf->{index_name} ) {
1255 $configuration->{index_name} = $conf->{index_name};
1258 Koha::Exceptions::Config::MissingEntry->throw(
1259 "Missing 'index_name' entry in config file for elasticsearch");
1262 return $configuration;
1265 =head2 get_facetable_fields
1267 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1269 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1273 sub get_facetable_fields {
1276 # These should correspond to the ES field names, as opposed to the CCL
1277 # things that zebra uses.
1278 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1279 my @faceted_fields = Koha::SearchFields->search(
1280 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1282 my @not_faceted_fields = Koha::SearchFields->search(
1283 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1285 # This could certainly be improved
1286 return ( @faceted_fields, @not_faceted_fields );
1297 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1299 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1301 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>