1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
36 use Search::Elasticsearch;
40 use List::Util qw( sum0 reduce );
43 use Encode qw(encode);
45 use Scalar::Util qw(looks_like_number);
47 __PACKAGE__->mk_ro_accessors(qw( index ));
48 __PACKAGE__->mk_accessors(qw( sort_fields ));
50 # Constants to refer to the standard index names
51 Readonly our $BIBLIOS_INDEX => 'biblios';
52 Readonly our $AUTHORITIES_INDEX => 'authorities';
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
64 The name of the index to use, generally 'biblios' or 'authorities'.
74 my $self = $class->SUPER::new(@_);
75 # Check for a valid index
76 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
80 =head2 get_elasticsearch
82 my $elasticsearch_client = $self->get_elasticsearch();
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
85 instance level and will be reused if method is called multiple times.
89 sub get_elasticsearch {
# Lazily create the Search::Elasticsearch client: it is built from
# get_elasticsearch_params() on first use and stored in
# $self->{elasticsearch}, so later calls on this instance return the
# same object.
91 unless (defined $self->{elasticsearch}) {
92 my $conf = $self->get_elasticsearch_params();
93 $self->{elasticsearch} = Search::Elasticsearch->new($conf);
95 return $self->{elasticsearch};
98 =head2 get_elasticsearch_params
100 my $params = $self->get_elasticsearch_params();
102 This provides a hashref that contains the parameters for connecting to the
103 Elasticsearch servers, in the form:
106 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107 'index_name' => 'koha_instance_index',
110 This is configured by the following in the C<config> block in koha-conf.xml:
113 <server>127.0.0.1:9200</server>
114 <server>anotherserver:9200</server>
115 <index_name>koha_instance</index_name>
120 sub get_elasticsearch_params {
# Builds the connection parameters from the 'elasticsearch' block of
# koha-conf.xml. Dies when the block, the server list or the index_name
# entry is missing.
123 # Copy the hash so that we're not modifying the original
124 my $conf = C4::Context->config('elasticsearch');
125 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
# Shallow copy: top-level keys can be changed below without touching $conf.
126 my $es = { %{ $conf } };
128 # Helpfully, the multiple server lines end up in an array for us anyway
129 # if there are multiple ones, but not if there's only one.
130 my $server = $es->{server};
# The 'server' key is replaced by 'nodes', which always holds an arrayref.
131 delete $es->{server};
132 if ( ref($server) eq 'ARRAY' ) {
134 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135 $es->{nodes} = $server;
138 $es->{nodes} = [$server];
141 die "No elasticsearch servers were specified in koha-conf.xml.\n";
143 die "No elasticsearch index_name was specified in koha-conf.xml.\n"
144 if ( !$es->{index_name} );
145 # Append the name of this particular index to our namespace
146 $es->{index_name} .= '_' . $self->index;
# key_prefix is always overwritten; cxn_pool and request_timeout are only
# given a default when the configuration does not already define them.
148 $es->{key_prefix} = 'es_';
149 $es->{cxn_pool} //= 'Static';
150 $es->{request_timeout} //= 60;
155 =head2 get_elasticsearch_settings
157 my $settings = $self->get_elasticsearch_settings();
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
162 A hashref containing the settings is returned.
166 sub get_elasticsearch_settings {
169 # Use state to speed up repeated calls
# $settings is a state variable, so the YAML file is parsed at most once
# and the same structure is reused by every later call of this sub.
170 state $settings = undef;
171 if (!defined $settings) {
# The path comes from the 'elasticsearch_index_config' config entry when
# it is set, otherwise from index_config.yaml below 'intranetdir'.
172 my $config_file = C4::Context->config('elasticsearch_index_config');
173 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174 $settings = LoadFile( $config_file );
180 =head2 get_elasticsearch_mappings
182 my $mappings = $self->get_elasticsearch_mappings();
184 This provides the mappings that get passed to Elasticsearch when an index is
189 sub get_elasticsearch_mappings {
# Mappings are built once per index name and kept in %all_mappings; the
# names of sortable fields are collected in %sort_fields under the same key.
192 # Use state to speed up repeated calls
196 if (!defined $all_mappings{$self->index}) {
197 $sort_fields{$self->index} = {};
198 # Clone the general mapping to break ties with the original hash
200 data => clone(_get_elasticsearch_field_config('general', ''))
202 my $marcflavour = lc C4::Context->preference('marcflavour');
203 $self->_foreach_mapping(
205 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
# Only mapping rows for the configured MARC flavour contribute.
206 return if $marc_type ne $marcflavour;
207 # TODO if this gets any sort of complexity to it, it should
208 # be broken out into its own function.
210 # TODO be aware of date formats, but this requires pre-parsing
211 # as ES will simply reject anything with an invalid date.
# Translate the Koha field type into the type name used to look up the
# field config; 'text' is the fallback.
212 my $es_type = 'text';
213 if ($type eq 'boolean') {
214 $es_type = 'boolean';
215 } elsif ($type eq 'number' || $type eq 'sum') {
216 $es_type = 'integer';
217 } elsif ($type eq 'isbn' || $type eq 'stdno') {
# Each purpose gets its own property: the plain name for search, and the
# name suffixed with __facet, __suggestion or __sort for the others.
222 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
226 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
229 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
231 # Sort is a bit special as it can be true, false, undef.
232 # We care about "true" or "undef",
233 # "undef" means to do the default thing, which is make it sortable.
234 if (!defined $sort || $sort) {
235 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236 $sort_fields{$self->index}{$name} = 1;
240 $all_mappings{$self->index} = $mappings;
# Publish the sortable field names for this index through the sort_fields
# accessor.
242 $self->sort_fields(\%{$sort_fields{$self->index}});
244 return $all_mappings{$self->index};
247 =head2 _get_elasticsearch_field_config
249 Get the Elasticsearch field config for the given purpose and data type.
251 $mapping = _get_elasticsearch_field_config('search', 'text');
255 sub _get_elasticsearch_field_config {
# $purpose selects a top-level section of the field config (callers in this
# file pass 'general', 'search', 'facet', 'suggestible' and 'sort'); $type
# selects an entry inside that section.
257 my ( $purpose, $type ) = @_;
259 # Use state to speed up repeated calls
260 state $settings = undef;
261 if (!defined $settings) {
# The path comes from the 'elasticsearch_field_config' config entry when it
# is set, otherwise from field_config.yaml below 'intranetdir'.
262 my $config_file = C4::Context->config('elasticsearch_field_config');
263 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
264 $settings = LoadFile( $config_file );
# An unknown purpose is a configuration error and is fatal.
267 if (!defined $settings->{$purpose}) {
268 die "Field purpose $purpose not defined in field config";
271 return $settings->{$purpose};
# Prefer the entry for the exact type, then the section's 'default' entry.
273 if (defined $settings->{$purpose}{$type}) {
274 return $settings->{$purpose}{$type};
276 if (defined $settings->{$purpose}{'default'}) {
277 return $settings->{$purpose}{'default'};
282 =head2 _load_elasticsearch_mappings
284 Load Elasticsearch mappings in the format of mappings.yaml.
286 $indexes = _load_elasticsearch_mappings();
290 sub _load_elasticsearch_mappings {
# The path comes from the 'elasticsearch_index_mappings' config entry when
# it is set, otherwise from mappings.yaml below 'intranetdir'. The file is
# parsed on every call; nothing is cached here.
291 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
292 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
293 return LoadFile( $mappings_yaml );
296 sub reset_elasticsearch_mappings {
# Replaces the stored search fields and MARC maps with the contents of the
# mappings file loaded by _load_elasticsearch_mappings().
298 my $indexes = $self->_load_elasticsearch_mappings();
# Existing rows are removed first; everything below recreates them.
300 Koha::SearchMarcMaps->delete;
301 Koha::SearchFields->delete;
303 while ( my ( $index_name, $fields ) = each %$indexes ) {
304 while ( my ( $field_name, $data ) = each %$fields ) {
# Copy only the attributes that are actually present in the file.
306 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
# staff_client and opac default to 1 when they are absent or undefined.
309 $sf_params{staff_client} //= 1;
310 $sf_params{opac} //= 1;
312 $sf_params{name} = $field_name;
# The same field name can occur under several indexes, so look it up by
# name before creating it.
314 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
316 my $mappings = $data->{mappings};
317 for my $mapping ( @$mappings ) {
318 my $marc_field = Koha::SearchMarcMaps->find_or_create({
319 index_name => $index_name,
320 marc_type => $mapping->{marc_type},
321 marc_field => $mapping->{marc_field}
# facet and suggestible default to 0 and search defaults to 1; sort is
# passed through unchanged so that an undefined value stays undefined.
323 $search_field->add_to_search_marc_maps($marc_field, {
324 facet => $mapping->{facet} || 0,
325 suggestible => $mapping->{suggestible} || 0,
326 sort => $mapping->{sort},
327 search => $mapping->{search} // 1
334 # This overrides the accessor provided by Class::Accessor so that if
335 # sort_fields isn't set, then it'll generate it.
339 $self->_sort_fields_accessor(@_);
342 my $val = $self->_sort_fields_accessor();
345 # This will populate the accessor as a side effect
346 $self->get_elasticsearch_mappings();
347 return $self->_sort_fields_accessor();
350 =head2 _process_mappings($mappings, $data, $record_document, $meta)
352 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
354 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
355 Since we group all mappings by MARC field targets C<$mappings> will contain
356 all targets for C<$data> and thus we need to fetch the MARC field only once.
357 C<$mappings> will be applied to C<$record_document> and new field values added.
358 The method has no return value.
364 Arrayref of mappings containing arrayrefs in the format
365 [C<$target>, C<$options>] where C<$target> is the name of the target field and
366 C<$options> is a hashref containing processing directives for this particular
371 The source data from a MARC record field.
373 =item C<$record_document>
375 Hashref representing the Elasticsearch document on which mappings should be
380 A hashref containing metadata useful for enforcing per mapping rules. For
381 example for providing extra context for mapping options, or treating mapping
382 targets differently depending on type (sort, search, facet etc). Combining
383 this metadata with the mapping options and metadata allows us to mutate the
384 data per mapping, or even replace it with other data retrieved from the
387 Current properties are:
389 C<altscript>: A boolean value indicating whether an alternate script presentation is being
392 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
393 'subfield' or 'subfields_group'.
395 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
397 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
398 is 'subfields_group'.
400 C<field>: The original C<MARC::Field> object.
406 sub _process_mappings {
# Applies every [target, options] pair in $mappings to $data and appends the
# resulting value to the matching array in $record_document.
407 my ($_self, $mappings, $data, $record_document, $meta) = @_;
408 foreach my $mapping (@{$mappings}) {
409 my ($target, $options) = @{$mapping};
411 # Don't process sort fields for alternate scripts
412 my $sort = $target =~ /__sort$/;
413 if ($sort && $meta->{altscript}) {
417 # Copy (scalar) data since can have multiple targets
418 # with differing options for (possibly) mutating data
419 # so need a different copy for each
# Make sure the target holds an arrayref before values are pushed onto it.
421 $record_document->{$target} //= [];
422 if (defined $options->{substr}) {
# 'substr' holds [start, length]; a start at or beyond the end of the data
# yields an empty string instead of calling substr out of range.
423 my ($start, $length) = @{$options->{substr}};
424 $_data = length($data) > $start ? substr $data, $start, $length : '';
426 if (defined $options->{value_callbacks}) {
# Run the callbacks in order, feeding each one the previous result.
427 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
429 if (defined $options->{property}) {
431 $options->{property} => $_data
434 if (defined $options->{nonfiling_characters_indicator}) {
# The option names the indicator whose value is the number of leading
# characters to drop; a non-numeric indicator value counts as 0.
435 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
436 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
437 if ($nonfiling_chars) {
438 $_data = substr $_data, $nonfiling_chars;
441 push @{$record_document->{$target}}, $_data;
445 =head2 marc_records_to_documents($marc_records)
447 my $record_documents = $self->marc_records_to_documents($marc_records);
449 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
451 Returns array of hash references, representing Elasticsearch documents,
452 acceptable as body payload in C<Search::Elasticsearch> requests.
456 =item C<$marc_records>
458 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
464 sub marc_records_to_documents {
# Builds one document hashref per record, using the rule structure from
# _get_marc_mapping_rules(), and returns a reference to the list of them.
465 my ($self, $records) = @_;
466 my $rules = $self->_get_marc_mapping_rules();
467 my $control_fields_rules = $rules->{control_fields};
468 my $data_fields_rules = $rules->{data_fields};
469 my $marcflavour = lc C4::Context->preference('marcflavour');
470 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
472 my @record_documents;
474 foreach my $record (@{$records}) {
475 my $record_document = {};
# Leader rules are applied to the leader string.
476 my $mappings = $rules->{leader};
478 $self->_process_mappings($mappings, $record->leader(), $record_document, {
480 data_source => 'leader'
484 foreach my $field ($record->fields()) {
485 if ($field->is_control_field()) {
# Control field rules are keyed by tag and receive the whole field data.
486 my $mappings = $control_fields_rules->{$field->tag()};
488 $self->_process_mappings($mappings, $field->data(), $record_document, {
490 data_source => 'control_field',
497 my $tag = $field->tag();
498 # Handle alternate scripts in MARC 21
500 if ($marcflavour eq 'marc21' && $tag eq '880') {
# Subfield 6 of an 880 field starts with three characters followed by
# '-' and digits; the three characters are captured here.
501 my $sub6 = $field->subfield('6');
502 if ($sub6 =~ /^(...)-\d+/) {
508 my $data_field_rules = $data_fields_rules->{$tag};
509 if ($data_field_rules) {
510 my $subfields_mappings = $data_field_rules->{subfields};
511 my $wildcard_mappings = $subfields_mappings->{'*'};
512 foreach my $subfield ($field->subfields()) {
513 my ($code, $data) = @{$subfield};
# Rules for this subfield code, plus any wildcard ('*') rules for the tag.
514 my $mappings = $subfields_mappings->{$code} // [];
515 if ($wildcard_mappings) {
516 $mappings = [@{$mappings}, @{$wildcard_mappings}];
519 $self->_process_mappings($mappings, $data, $record_document, {
520 altscript => $altscript,
521 data_source => 'subfield',
527 if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
528 # Used by the authority linker the match-heading field requires a specific syntax
529 # that is specified in C4/Heading
530 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
531 next unless $heading;
532 push @{$record_document->{'match-heading'}}, $heading->search_form;
536 my $subfields_join_mappings = $data_field_rules->{subfields_join};
537 if ($subfields_join_mappings) {
# Each key is a string of subfield codes whose values are combined into a
# single value before the group's mappings are applied.
538 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
539 # Map each subfield to values, remove empty values, join with space
544 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
548 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
549 altscript => $altscript,
550 data_source => 'subfields_group',
551 codes => $subfields_group,
556 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
557 # Used by the authority linker the match-heading field requires a specific syntax
558 # that is specified in C4/Heading
559 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560 next unless $heading;
561 push @{$record_document->{'match-heading'}}, $heading->search_form;
# Fields that collected no value get their configured default.
568 foreach my $field (keys %{$rules->{defaults}}) {
569 unless (defined $record_document->{$field}) {
570 $record_document->{$field} = $rules->{defaults}->{$field};
# 'sum' fields are collapsed into one number: the sum of all non-reference
# values that contain digits (0 when there are none).
573 foreach my $field (@{$rules->{sum}}) {
574 if (defined $record_document->{$field}) {
575 # TODO: validate numeric? filter?
576 # TODO: Or should only accept fields without nested values?
577 # TODO: Quick and dirty, improve if needed
578 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
581 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
582 foreach my $field (@{$rules->{isbn}}) {
583 if (defined $record_document->{$field}) {
585 foreach my $input_isbn (@{$record_document->{$field}}) {
586 my $isbn = Business::ISBN->new($input_isbn);
587 if (defined $isbn && $isbn->is_valid) {
588 my $isbn13 = $isbn->as_isbn13->as_string;
589 push @isbns, $isbn13;
591 push @isbns, $isbn13;
593 my $isbn10 = $isbn->as_isbn10;
595 $isbn10 = $isbn10->as_string;
596 push @isbns, $isbn10;
598 push @isbns, $isbn10;
601 push @isbns, $input_isbn;
604 $record_document->{$field} = \@isbns;
608 # Remove duplicate values and collapse sort fields
609 foreach my $field (keys %{$record_document}) {
610 if (ref($record_document->{$field}) eq 'ARRAY') {
611 @{$record_document->{$field}} = do {
# Hash values that carry a defined 'input' key are de-duplicated on that
# key's value; every other value is de-duplicated on itself.
613 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
615 if ($field =~ /__sort$/) {
616 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
617 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
622 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
623 $record->encoding('UTF-8');
# The record itself is stored in the document too; 'marc_format' records
# which of the representations below was used.
625 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
626 $record_document->{'marc_format'} = 'ARRAY';
630 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
631 local $SIG{__WARN__} = sub {
632 push @warnings, $_[0];
634 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
637 # Suppress warnings if record length exceeded
638 unless (substr($record->leader(), 0, 5) eq '99999') {
639 foreach my $warning (@warnings) {
643 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
644 $record_document->{'marc_format'} = 'MARCXML';
647 $record_document->{'marc_format'} = 'base64ISO2709';
650 push @record_documents, $record_document;
652 return \@record_documents;
655 =head2 _marc_to_array($record)
657 my @fields = _marc_to_array($record)
659 Convert a MARC::Record to an array modeled after MARC-in-JSON
660 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
666 A MARC::Record object
673 my ($self, $record) = @_;
676 leader => $record->leader(),
679 for my $field ($record->fields()) {
680 my $tag = $field->tag();
681 if ($field->is_control_field()) {
682 push @{$data->{fields}}, {$tag => $field->data()};
685 foreach my $subfield ($field->subfields()) {
686 my ($code, $contents) = @{$subfield};
687 push @{$subfields}, {$code => $contents};
689 push @{$data->{fields}}, {
691 ind1 => $field->indicator(1),
692 ind2 => $field->indicator(2),
693 subfields => $subfields
701 =head2 _array_to_marc($data)
703 my $record = _array_to_marc($data)
705 Convert an array modeled after MARC-in-JSON to a MARC::Record
711 An array modeled after MARC-in-JSON
712 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
719 my ($self, $data) = @_;
721 my $record = MARC::Record->new();
723 $record->leader($data->{leader});
724 for my $field (@{$data->{fields}}) {
725 my $tag = (keys %{$field})[0];
726 $field = $field->{$tag};
728 if (ref($field) eq 'HASH') {
730 foreach my $subfield (@{$field->{subfields}}) {
731 my $code = (keys %{$subfield})[0];
732 push @subfields, $code;
733 push @subfields, $subfield->{$code};
735 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
737 $marc_field = MARC::Field->new($tag, $field)
739 $record->append_fields($marc_field);
745 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
747 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
749 Get mappings, an internal data structure later used by
750 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
751 data for a MARC mapping.
753 The returned C<$mappings> is not to be confused with mappings provided by
754 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
755 provided by C<_foreach_mapping> and expands it to this internal data structure.
756 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
757 is then applied to each MARC target (leader, control field data, subfield or
758 joined subfields) and integrated into the mapping rules data structure used in
759 C<marc_records_to_documents> to transform MARC records into Elasticsearch
766 Boolean indicating whether to create a facet field for this mapping.
768 =item C<$suggestible>
770 Boolean indicating whether to create a suggestion field for this mapping.
774 Boolean indicating whether to create a sort field for this mapping.
778 Boolean indicating whether to create a search field for this mapping.
780 =item C<$target_name>
782 Elasticsearch document target field name.
784 =item C<$target_type>
786 Elasticsearch document target field type.
790 An optional range as a string in the format "<START>-<END>" or "<START>",
791 where "<START>" and "<END>" are integers specifying a range that will be used
792 for extracting a substring from MARC data as Elasticsearch field target value.
794 The first character position is "0", and the range is inclusive,
795 so "0-2" means the first three characters of MARC data.
797 If only "<START>" is provided only one character at position "<START>" will
804 sub _field_mappings {
# Expands one mapping row into a list of [target, options] pairs: targets are
# the plain field name and the name suffixed with __facet, __suggestion and
# __sort, depending on the flags passed in.
805 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
806 my %mapping_defaults = ();
809 my $substr_args = undef;
810 if (defined $range) {
811 # TODO: use value_callback instead?
# Turn "START-END" or "START" into [start, length]: the range is inclusive,
# and a lone START selects exactly one character.
812 my ($start, $end) = map(int, split /-/, $range, 2);
813 $substr_args = [$start];
814 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
816 my $default_options = {};
818 $default_options->{substr} = $substr_args;
821 # TODO: Should probably have per type value callback/hook
822 # but hard code for now
823 if ($target_type eq 'boolean') {
# Boolean targets get a callback that trims the value and maps it to the
# strings 'true' or 'false' according to Perl truthiness.
824 $default_options->{value_callbacks} //= [];
825 push @{$default_options->{value_callbacks}}, sub {
827 # Trim whitespace at both ends
828 $value =~ s/^\s+|\s+$//g;
829 return $value ? 'true' : 'false';
834 my $mapping = [$target_name, $default_options];
835 push @mappings, $mapping;
# An undefined $sort counts as sortable, the same rule as in
# get_elasticsearch_mappings.
839 push @suffixes, 'facet' if $facet;
840 push @suffixes, 'suggestion' if $suggestible;
841 push @suffixes, 'sort' if !defined $sort || $sort;
843 foreach my $suffix (@suffixes) {
844 my $mapping = ["${target_name}__$suffix"];
845 # TODO: Hack, fix later in less hideous manner
846 if ($suffix eq 'suggestion') {
# Suggestion targets additionally carry property => 'input', which
# _process_mappings reads as $options->{property}.
847 push @{$mapping}, {%{$default_options}, property => 'input'};
850 # Important! Make shallow clone, or we end up with the same hashref
851 # shared by all mappings
852 push @{$mapping}, {%{$default_options}};
854 push @mappings, $mapping;
859 =head2 _get_marc_mapping_rules
861 my $mapping_rules = $self->_get_marc_mapping_rules()
863 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
865 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
866 each call to C<MARC::Record>->field) we create an optimized structure of mapping
867 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
869 We can then iterate through all MARC fields for each record and apply all relevant
870 rules once per field instead of retrieving fields multiple times for each mapping rule
871 which is terribly slow.
875 # TODO: This structure can be used for processing multiple MARC::Records so is currently
876 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
877 # memory cache which it is currently not. The performance gain of caching
878 # would probably be marginal, but to do this could be a further improvement.
880 sub _get_marc_mapping_rules {
# Builds the rule structure consumed by marc_records_to_documents. Keys used
# below: leader, control_fields, data_fields, sum, isbn and defaults.
882 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field expression: a three-digit tag, optional subfield codes and
# parenthesised groups, then an optional "_/START" or "_/START-END" range.
883 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
# Leader expression: the word "leader" with the same optional range suffix.
884 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
887 'control_fields' => {},
894 $self->_foreach_mapping(sub {
895 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
# Only mapping rows for the configured MARC flavour contribute.
896 return if $marc_type ne $marcflavour;
898 if ($type eq 'sum') {
899 push @{$rules->{sum}}, $name;
# NOTE(review): _field_mappings creates the __sort target when $sort is
# undefined as well, but this line only registers it when $sort is true.
# Confirm that is intended.
900 push @{$rules->{sum}}, $name."__sort" if $sort;
902 elsif ($type eq 'isbn') {
903 push @{$rules->{isbn}}, $name;
905 elsif ($type eq 'boolean') {
906 # boolean gets special handling, if value doesn't exist for a field,
908 $rules->{defaults}->{$name} = 'false';
911 if ($marc_field =~ $field_spec_regexp) {
916 # Parse and separate subfields from subfield groups
918 my $subfield_group = '';
# Walk the subfield spec one character at a time: parentheses open and
# close a group, characters inside a group extend it, and any other
# character is a stand-alone subfield code.
921 foreach my $token (split //, $2) {
924 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
925 "Unmatched opening parenthesis for $marc_field"
932 elsif ($token eq ")") {
934 if ($subfield_group) {
935 push @subfield_groups, $subfield_group;
936 $subfield_group = '';
941 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
942 "Unmatched closing parenthesis for $marc_field"
946 elsif ($open_group) {
947 $subfield_group .= $token;
950 push @subfields, $token;
955 push @subfields, '*';
958 my $range = defined $3 ? $3 : undef;
959 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
# Tags numerically below 10 are filed as control fields; all other tags
# are filed per subfield code and per subfield group.
960 if ($field_tag < 10) {
961 $rules->{control_fields}->{$field_tag} //= [];
962 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
965 $rules->{data_fields}->{$field_tag} //= {};
966 foreach my $subfield (@subfields) {
967 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
968 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
970 foreach my $subfield_group (@subfield_groups) {
971 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
972 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
976 elsif ($marc_field =~ $leader_regexp) {
977 my $range = defined $1 ? $1 : undef;
978 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
979 push @{$rules->{leader}}, @mappings;
982 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
983 "Invalid MARC field expression: $marc_field"
988 # Marc-flavour specific rule tweaks, could/should also provide hook for this
989 if ($marcflavour eq 'marc21') {
990 # Nonfiling characters processing for sort fields
# NOTE(review): the index constants are read from the Koha::SearchEngine
# namespace here, not from this package. Confirm that package defines them.
992 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
993 # Format is: nonfiling characters indicator => field names list
995 1 => [130, 630, 730, 740],
996 2 => [222, 240, 242, 243, 245, 440, 830]
999 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1002 2 => [130, 430, 530]
1005 foreach my $indicator (keys %title_fields) {
1006 foreach my $field_tag (@{$title_fields{$indicator}}) {
# Only the subfield 'a' rules of the listed tags are inspected.
1007 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1008 foreach my $mapping (@{$mappings}) {
1009 if ($mapping->[0] =~ /__sort$/) {
1010 # Mark this as to be processed for nonfiling characters indicator
1011 # later on in _process_mappings
1012 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1022 =head2 _foreach_mapping
1024 $self->_foreach_mapping(
1026 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1029 return unless $marc_type eq 'marc21';
1030 print "Data comes from: " . $marc_field . "\n";
1034 This allows you to apply a function to each entry in the elasticsearch mappings
1035 table, in order to build the mappings for whatever is needed.
1037 In the provided function, the fields are:
1043 The field name for elasticsearch (corresponds to the 'mapping' column in the
1048 The type for this value, e.g. 'string'.
1052 True if this value should be facetised. This only really makes sense if the
1053 field is understood by the facet processing code anyway.
1057 True if this is a field that a) needs special sort handling, and b) if it
1058 should be sorted on. False if a) but not b). Undef if not a). This allows,
1059 for example, author to be sorted on but not everything marked with "author"
1060 to be included in that sort.
1064 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1065 'unimarc', 'normarc'.
1067 =item C<$marc_field>
1069 A string that describes the MARC field that contains the data to extract.
1070 These are of a form suited to Catmandu's MARC fixers.
1076 sub _foreach_mapping {
# Calls $sub once for every search field / MARC map combination stored for
# this object's index.
1077 my ( $self, $sub ) = @_;
1079 # TODO use a caching framework here
# Restrict the rows to this index and pull the per-mapping columns in
# through the join, so they can be read with get_column() below.
1080 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1082 'search_marc_map.index_name' => $self->index,
1084 { join => { search_marc_to_fields => 'search_marc_map' },
1086 'search_marc_to_fields.facet',
1087 'search_marc_to_fields.suggestible',
1088 'search_marc_to_fields.sort',
1089 'search_marc_to_fields.search',
1090 'search_marc_map.marc_type',
1091 'search_marc_map.marc_field',
1104 while ( my $search_field = $search_fields->next ) {
# Values are listed in this order: name, type, facet, suggestible, sort,
# search, marc_type, marc_field.
1106 # Force lower case on indexed field names for case insensitive
1107 # field name searches
1108 lc($search_field->name),
1109 $search_field->type,
1110 $search_field->get_column('facet'),
1111 $search_field->get_column('suggestible'),
1112 $search_field->get_column('sort'),
1113 $search_field->get_column('search'),
1114 $search_field->get_column('marc_type'),
1115 $search_field->get_column('marc_field'),
1120 =head2 process_error
1122 die process_error($@);
1124 This parses an Elasticsearch error message and produces a human-readable
1125 result from it. This result is probably missing all the useful information
1126 that you might want in diagnosing an issue, so the warning is also logged.
1128 Note that currently the resulting message is not internationalised. This
1129 will happen eventually by some method or other.
1134 my ($self, $msg) = @_;
1136 warn $msg; # simple logging
1138 # This is super-primitive
1139 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1141 return "Unable to perform your search. Please try again.\n";
1144 =head2 _read_configuration
1146 my $conf = _read_configuration();
1148 Reads the I<configuration file> and returns a hash structure with the
1149 configuration information. It raises an exception if mandatory entries
1152 The hashref structure has the following form:
1155 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1156 'index_name' => 'koha_instance',
1159 This is configured by the following in the C<config> block in koha-conf.xml:
1162 <server>127.0.0.1:9200</server>
1163 <server>anotherserver:9200</server>
1164 <index_name>koha_instance</index_name>
1169 sub _read_configuration {
# Reads the 'elasticsearch' config block into a hashref with 'nodes' and
# 'index_name'. Each missing mandatory entry raises
# Koha::Exceptions::Config::MissingEntry.
1173 my $conf = C4::Context->config('elasticsearch');
1174 Koha::Exceptions::Config::MissingEntry->throw(
1175 "Missing 'elasticsearch' block in config file")
1176 unless defined $conf;
1178 if ( $conf && $conf->{server} ) {
# 'server' is either a single value or an arrayref; 'nodes' always ends up
# holding an arrayref.
1179 my $nodes = $conf->{server};
1180 if ( ref($nodes) eq 'ARRAY' ) {
1181 $configuration->{nodes} = $nodes;
1184 $configuration->{nodes} = [$nodes];
1188 Koha::Exceptions::Config::MissingEntry->throw(
1189 "Missing 'server' entry in config file for elasticsearch");
# The index name is copied unchanged; no per-index suffix is added here.
1192 if ( defined $conf->{index_name} ) {
1193 $configuration->{index_name} = $conf->{index_name};
1196 Koha::Exceptions::Config::MissingEntry->throw(
1197 "Missing 'index_name' entry in config file for elasticsearch");
1200 return $configuration;
1203 =head2 get_facetable_fields
1205 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1207 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1211 sub get_facetable_fields {
# Returns the search fields from the fixed name list below: first those that
# have a facet_order, ordered by it, then those whose facet_order is unset.
1214 # These should correspond to the ES field names, as opposed to the CCL
1215 # things that zebra uses.
1216 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1217 my @faceted_fields = Koha::SearchFields->search(
1218 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1220 my @not_faceted_fields = Koha::SearchFields->search(
1221 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1223 # This could certainly be improved
1224 return ( @faceted_fields, @not_faceted_fields );
1235 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1237 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1239 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>