1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
31 use C4::AuthoritiesMarc;
38 use Search::Elasticsearch;
42 use List::Util qw( sum0 reduce all );
45 use Encode qw(encode);
47 use Scalar::Util qw(looks_like_number);
# Accessors generated through Class::Accessor (the base class of this package):
# `index` and `index_name` are read-only, `sort_fields` is read-write.
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
52 # Constants to refer to the standard index names
# Both are package variables of this package, made read-only via Readonly.
53 Readonly our $BIBLIOS_INDEX => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
66 The name of the index to use, generally 'biblios' or 'authorities'.
70 The Elasticsearch index name with Koha instance prefix.
# NOTE(review): the enclosing `sub` line is not visible in this listing; the
# SUPER::new call below suggests this is the constructor body - confirm.
83 # Check for a valid index
# NOTE(review): Koha::Exceptions::MissingParameter is not among the visible
# `use` lines - confirm the class is loaded before this can throw.
84 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
# The full Elasticsearch index name is the configured instance prefix and the
# logical index name joined with an underscore.
85 my $config = _read_configuration();
86 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
# Object construction itself is delegated to the parent class.
88 my $self = $class->SUPER::new(@_);
92 =head2 get_elasticsearch
94 my $elasticsearch_client = $self->get_elasticsearch();
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
97 instance level and will be reused if method is called multiple times.
# Return the Search::Elasticsearch client for this instance.
# The client is created on first use with the parameters from
# get_elasticsearch_params() and stored in $self->{elasticsearch}; later calls
# return that stored object.
101 sub get_elasticsearch {
# Only build a client when none has been stored yet.
103 unless (defined $self->{elasticsearch}) {
104 $self->{elasticsearch} = Search::Elasticsearch->new(
105 $self->get_elasticsearch_params()
108 return $self->{elasticsearch};
111 =head2 get_elasticsearch_params
113 my $params = $self->get_elasticsearch_params();
115 This provides a hashref that contains the parameters for connecting to the
116 Elasticsearch servers, in the form:
119 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120 'index_name' => 'koha_instance_index',
123 This is configured by the following in the C<config> block in koha-conf.xml:
126 <server>127.0.0.1:9200</server>
127 <server>anotherserver:9200</server>
128 <index_name>koha_instance</index_name>
# Return the connection parameters for the Elasticsearch client, obtained
# from _read_configuration().
133 sub get_elasticsearch_params {
138 $conf = _read_configuration();
# A missing configuration entry is singled out by its exception class.
# NOTE(review): the surrounding handler is not visible in this listing; `$_`
# presumably holds the caught exception - confirm.
140 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
148 =head2 get_elasticsearch_settings
150 my $settings = $self->get_elasticsearch_settings();
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
155 A hashref containing the settings is returned.
# Load the index settings from a YAML file.
# The path is taken from the `elasticsearch_index_config` entry of the Koha
# configuration, falling back to
# admin/searchengine/elasticsearch/index_config.yaml under `intranetdir`.
159 sub get_elasticsearch_settings {
162 # Use state to speed up repeated calls
# The `state` variable keeps its value between calls, so the file is only
# loaded while $settings is still undefined.
163 state $settings = undef;
164 if (!defined $settings) {
165 my $config_file = C4::Context->config('elasticsearch_index_config');
166 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167 $settings = LoadFile( $config_file );
173 =head2 get_elasticsearch_mappings
175 my $mappings = $self->get_elasticsearch_mappings();
177 This provides the mappings that get passed to Elasticsearch when an index is
# Build (and cache per index name) the Elasticsearch mapping definition for
# this index from the search field configuration.
# Side effect: sets the `sort_fields` accessor to the hash of sortable field
# names collected for this index.
# Returns the cached mapping structure stored under $self->index.
182 sub get_elasticsearch_mappings {
185 # Use state to speed up repeated calls
# Mappings and sort fields are keyed by $self->index so that each index is
# only computed once.
189 if (!defined $all_mappings{$self->index}) {
190 $sort_fields{$self->index} = {};
191 # Clone the general mapping to break ties with the original hash
193 data => clone(_get_elasticsearch_field_config('general', ''))
195 my $marcflavour = lc C4::Context->preference('marcflavour');
196 $self->_foreach_mapping(
198 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
# Mappings configured for another MARC flavour are ignored.
199 return if $marc_type ne $marcflavour;
200 # TODO if this gets any sort of complexity to it, it should
201 # be broken out into its own function.
203 # TODO be aware of date formats, but this requires pre-parsing
204 # as ES will simply reject anything with an invalid date.
# Translate the Koha field type into the type key used to look up the field
# config; anything not matched below stays 'text'.
205 my $es_type = 'text';
206 if ($type eq 'boolean') {
207 $es_type = 'boolean';
208 } elsif ($type eq 'number' || $type eq 'sum') {
209 $es_type = 'integer';
210 } elsif ($type eq 'isbn' || $type eq 'stdno') {
212 } elsif ($type eq 'year') {
# Each field may get up to four properties: the plain search field plus
# `__facet`, `__suggestion` and `__sort` variants.
# NOTE(review): the conditions guarding the first three assignments are not
# visible in this listing.
217 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226 # Sort is a bit special as it can be true, false, undef.
227 # We care about "true" or "undef",
228 # "undef" means to do the default thing, which is make it sortable.
229 if (!defined $sort || $sort) {
230 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231 $sort_fields{$self->index}{$name} = 1;
# The authorities index additionally gets a text `match-heading` property.
235 $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236 $all_mappings{$self->index} = $mappings;
# Publish the sort fields through the accessor on every call, including
# calls answered from the cache.
238 $self->sort_fields(\%{$sort_fields{$self->index}});
239 return $all_mappings{$self->index};
242 =head2 raw_elasticsearch_mappings
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
# Collect the search field mappings as stored in the database into a nested
# hash keyed by index name and then by search field name.
# $marc_type: optional MARC flavour; when true, only mappings of that flavour
# are included.
# NOTE(review): $marc_type is read as the first argument, so this is written
# as a plain function; calling it as a method would put the invocant in
# $marc_type - confirm how callers invoke it.
251 sub raw_elasticsearch_mappings {
252 my ( $marc_type ) = @_;
254 my $schema = Koha::Database->new()->schema();
# Search fields are visited in ascending name order.
256 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
259 while ( my $search_field = $search_fields->next ) {
# Fetch the MARC mappings attached to this search field, ordered by MARC
# type and then MARC field.
261 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262 { search_field_id => $search_field->id },
264 join => 'search_marc_map',
265 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
269 while ( my $marc_to_field = $marc_to_fields->next ) {
271 my $marc_map = $marc_to_field->search_marc_map;
273 next if $marc_type && $marc_map->marc_type ne $marc_type;
# Field-level attributes; label and type are always copied, the remaining
# ones only when they are defined.
275 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
# One entry per MARC mapping; false facet/suggestible values become ''
# while sort is passed through unchanged.
282 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284 facet => $marc_to_field->facet || '',
285 marc_type => $marc_map->marc_type,
286 marc_field => $marc_map->marc_field,
287 sort => $marc_to_field->sort,
288 suggestible => $marc_to_field->suggestible || ''
297 =head2 _get_elasticsearch_field_config
299 Get the Elasticsearch field config for the given purpose and data type.
301 $mapping = _get_elasticsearch_field_config('search', 'text');
# Look up the field configuration for a purpose (such as 'search', 'facet',
# 'suggestible', 'sort' or 'general') and a data type.
# Dies when the purpose is not present in the field config file.
305 sub _get_elasticsearch_field_config {
307 my ( $purpose, $type ) = @_;
309 # Use state to speed up repeated calls
# The YAML file is only loaded while $settings is still undefined. Its path
# comes from the `elasticsearch_field_config` config entry, falling back to
# field_config.yaml under `intranetdir`.
310 state $settings = undef;
311 if (!defined $settings) {
312 my $config_file = C4::Context->config('elasticsearch_field_config');
313 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314 $settings = LoadFile( $config_file );
317 if (!defined $settings->{$purpose}) {
318 die "Field purpose $purpose not defined in field config";
# NOTE(review): the condition guarding this return is not visible in this
# listing; get_elasticsearch_mappings passes '' as the type to obtain the
# whole 'general' section, so it presumably tests for an empty type - confirm.
321 return $settings->{$purpose};
# Prefer the type-specific entry, then the purpose's 'default' entry.
323 if (defined $settings->{$purpose}{$type}) {
324 return $settings->{$purpose}{$type};
326 if (defined $settings->{$purpose}{'default'}) {
327 return $settings->{$purpose}{'default'};
332 =head2 _load_elasticsearch_mappings
334 Load Elasticsearch mappings in the format of mappings.yaml.
336 $indexes = _load_elasticsearch_mappings();
# Load the default mappings from YAML and return the parsed structure.
# The path is taken from the `elasticsearch_index_mappings` config entry,
# falling back to admin/searchengine/elasticsearch/mappings.yaml under
# `intranetdir`. No arguments are read.
340 sub _load_elasticsearch_mappings {
341 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
342 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
343 return LoadFile( $mappings_yaml );
# Replace the search field and MARC mapping rows in the database with the
# contents of the mappings YAML file, then clear the search fields cache.
# Destructive: existing Koha::SearchMarcMaps and Koha::SearchFields rows are
# deleted before anything is recreated.
# NOTE(review): no transaction is visible around the delete/recreate sequence
# in this listing - confirm whether a failure midway can leave the tables
# partially filled.
346 sub reset_elasticsearch_mappings {
348 my $indexes = $self->_load_elasticsearch_mappings();
350 Koha::SearchMarcMaps->delete;
351 Koha::SearchFields->delete;
# Outer level: index name => fields; inner level: field name => definition.
353 while ( my ( $index_name, $fields ) = each %$indexes ) {
354 while ( my ( $field_name, $data ) = each %$fields ) {
# Copy only the attributes actually present in the YAML definition.
356 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
# Fields are searchable in both interfaces unless the YAML says otherwise.
359 $sf_params{staff_client} //= 1;
360 $sf_params{opac} //= 1;
362 $sf_params{name} = $field_name;
# The same field name can appear under several indexes, hence
# find_or_create keyed on 'name'.
364 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
366 my $mappings = $data->{mappings};
367 for my $mapping ( @$mappings ) {
368 my $marc_field = Koha::SearchMarcMaps->find_or_create({
369 index_name => $index_name,
370 marc_type => $mapping->{marc_type},
371 marc_field => $mapping->{marc_field}
# Link the field to the MARC map. facet and suggestible default to 0,
# search defaults to 1, and sort is stored as given (it may be undef).
373 $search_field->add_to_search_marc_maps($marc_field, {
374 facet => $mapping->{facet} || 0,
375 suggestible => $mapping->{suggestible} || 0,
376 sort => $mapping->{sort},
377 search => $mapping->{search} // 1
383 $self->clear_search_fields_cache();
385 # FIXME return the mappings?
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
# NOTE(review): the `sub` line and the branch conditions are not visible in
# this listing. The visible statements show a setter path (arguments passed
# on to _sort_fields_accessor) and a getter path that falls back to
# get_elasticsearch_mappings() - confirm the exact conditions.
393 $self->_sort_fields_accessor(@_);
396 my $val = $self->_sort_fields_accessor();
399 # This will populate the accessor as a side effect
400 $self->get_elasticsearch_mappings();
401 return $self->_sort_fields_accessor();
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
425 The source data from a MARC record field.
427 =item C<$record_document>
429 Hashref representing the Elasticsearch document on which mappings should be
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
441 Current properties are:
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
446 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
449 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
454 C<field>: The original C<MARC::Field> object (its indicators are read for nonfiling characters processing).
# Apply every [target, options] mapping in $mappings to one piece of MARC
# data and append the resulting values to $record_document->{$target}.
# $meta carries context for the data: `altscript`, and `field` when the
# nonfiling characters indicator has to be read.
460 sub _process_mappings {
461 my ($_self, $mappings, $data, $record_document, $meta) = @_;
462 foreach my $mapping (@{$mappings}) {
463 my ($target, $options) = @{$mapping};
465 # Don't process sort fields for alternate scripts
466 my $sort = $target =~ /__sort$/;
467 if ($sort && $meta->{altscript}) {
471 # Copy (scalar) data since can have multiple targets
472 # with differing options for (possibly) mutating data
473 # so need a different copy for each
474 my $data_copy = $data;
# The `substr` option is [start, length]; data not longer than `start`
# yields an empty string instead of a substr call past the end.
475 if (defined $options->{substr}) {
476 my ($start, $length) = @{$options->{substr}};
477 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
480 # Add data to values array for callbacks processing
481 my $values = [$data_copy];
483 # Value callbacks takes subfield data (or values from previous
484 # callbacks) as argument, and returns a possibly different list of values.
485 # Note that the returned list may also be empty.
486 if (defined $options->{value_callbacks}) {
487 foreach my $callback (@{$options->{value_callbacks}}) {
488 # Pass each value to current callback which returns a list
489 # (scalar is fine too) resulting either in a list or
490 # a list of lists that will be flattened by perl.
491 # The next callback will receive the possibly expanded list of values.
492 $values = [ map { $callback->($_) } @{$values} ];
496 # Skip mapping if all values have been removed
497 next unless @{$values};
# The `property` option wraps each value in a single-key hash
# (_field_mappings sets it to 'input' for suggestion targets).
# NOTE(review): for a false $_ the `if` modifier appears to make the map
# block yield the false value itself rather than nothing. Empty strings are
# removed by the grep further down, but a literal 0 would remain as a bare
# scalar next to the hashes - confirm this is intended.
499 if (defined $options->{property}) {
500 $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
# Strip the number of leading characters given by the field indicator; a
# non-numeric indicator counts as 0.
502 if (defined $options->{nonfiling_characters_indicator}) {
503 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
504 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
505 # Nonfiling chars does not make sense for multiple values
506 # Only apply on first element
507 $values->[0] = substr $values->[0], $nonfiling_chars;
# Drop values that are empty strings.
510 $values = [ grep(!/^$/, @{$values}) ];
# The target array is created even when no values remain.
512 $record_document->{$target} //= [];
513 push @{$record_document->{$target}}, @{$values};
517 =head2 marc_records_to_documents($marc_records)
519 my $record_documents = $self->marc_records_to_documents($marc_records);
521 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
523 Returns array of hash references, representing Elasticsearch documents,
524 acceptable as body payload in C<Search::Elasticsearch> requests.
528 =item C<$marc_records>
530 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
# Convert MARC records into Elasticsearch documents using the mapping rules
# from _get_marc_mapping_rules().
# $records: arrayref of MARC record objects.
# Returns an arrayref of hashrefs, one document per record, in input order.
536 sub marc_records_to_documents {
537 my ($self, $records) = @_;
# The rules are computed once per call and shared by all records of the batch.
538 my $rules = $self->_get_marc_mapping_rules();
539 my $control_fields_rules = $rules->{control_fields};
540 my $data_fields_rules = $rules->{data_fields};
541 my $marcflavour = lc C4::Context->preference('marcflavour');
542 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
544 my @record_documents;
# For authorities, map each authority type code to the tag holding its heading.
546 my %auth_match_headings;
547 if( $self->index eq 'authorities' ){
548 my @auth_types = Koha::Authority::Types->search();
549 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
552 foreach my $record (@{$records}) {
553 my $record_document = {};
# Authorities: index the search form of the main heading as `match-heading`.
555 if ( $self->index eq 'authorities' ){
556 my $authtypecode = GuessAuthTypeCode( $record );
558 my $field = $record->field( $auth_match_headings{ $authtypecode } );
559 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
# NOTE(review): if the record has no 001 field, `field('001')` presumably
# returns undef and the method call would die inside this warning - confirm
# that records always carry a 001 here.
562 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
# Leader mappings are applied to the leader string.
566 my $mappings = $rules->{leader};
568 $self->_process_mappings($mappings, $record->leader(), $record_document, {
570 data_source => 'leader'
# Walk every field once and apply whatever rules exist for its tag.
574 foreach my $field ($record->fields()) {
575 if ($field->is_control_field()) {
576 my $mappings = $control_fields_rules->{$field->tag()};
578 $self->_process_mappings($mappings, $field->data(), $record_document, {
580 data_source => 'control_field',
587 my $tag = $field->tag();
588 # Handle alternate scripts in MARC 21
# An 880 field names the tag it is an alternate for in the first three
# characters of subfield 6.
590 if ($marcflavour eq 'marc21' && $tag eq '880') {
591 my $sub6 = $field->subfield('6');
592 if ($sub6 =~ /^(...)-\d+/) {
598 my $data_field_rules = $data_fields_rules->{$tag};
599 if ($data_field_rules) {
600 my $subfields_mappings = $data_field_rules->{subfields};
# Mappings under '*' apply to every subfield of the tag.
601 my $wildcard_mappings = $subfields_mappings->{'*'};
602 foreach my $subfield ($field->subfields()) {
603 my ($code, $data) = @{$subfield};
604 my $mappings = $subfields_mappings->{$code} // [];
605 if ($wildcard_mappings) {
606 $mappings = [@{$mappings}, @{$wildcard_mappings}];
609 $self->_process_mappings($mappings, $data, $record_document, {
610 altscript => $altscript,
611 data_source => 'subfield',
# Subfield groups: the values of the listed subfield codes are joined into
# one string before the group's mappings are applied.
619 my $subfields_join_mappings = $data_field_rules->{subfields_join};
620 if ($subfields_join_mappings) {
621 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
622 # Map each subfield to values, remove empty values, join with space
627 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
631 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
632 altscript => $altscript,
633 data_source => 'subfields_group',
634 codes => $subfields_group,
# Fill in configured defaults for fields that received no value.
644 foreach my $field (keys %{$rules->{defaults}}) {
645 unless (defined $record_document->{$field}) {
646 $record_document->{$field} = $rules->{defaults}->{$field};
# Fields of type 'sum' are collapsed from a list of values to a single total.
649 foreach my $field (@{$rules->{sum}}) {
650 if (defined $record_document->{$field}) {
651 # TODO: validate numeric? filter?
652 # TODO: Or should only accept fields without nested values?
653 # TODO: Quick and dirty, improve if needed
# NOTE(review): the pattern is unanchored, so any non-reference value that
# merely contains a digit is passed to sum0 - see the TODOs above.
654 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
657 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
658 foreach my $field (@{$rules->{isbn}}) {
659 if (defined $record_document->{$field}) {
661 foreach my $input_isbn (@{$record_document->{$field}}) {
662 my $isbn = Business::ISBN->new($input_isbn);
663 if (defined $isbn && $isbn->is_valid) {
664 my $isbn13 = $isbn->as_isbn13->as_string;
665 push @isbns, $isbn13;
667 push @isbns, $isbn13;
669 my $isbn10 = $isbn->as_isbn10;
671 $isbn10 = $isbn10->as_string;
672 push @isbns, $isbn10;
674 push @isbns, $isbn10;
# The unmodified input value is kept as well.
# NOTE(review): the branch this belongs to is not visible - presumably the
# invalid-ISBN case.
677 push @isbns, $input_isbn;
680 $record_document->{$field} = \@isbns;
684 # Remove duplicate values and collapse sort fields
685 foreach my $field (keys %{$record_document}) {
686 if (ref($record_document->{$field}) eq 'ARRAY') {
# Hash values with a defined `input` key are deduplicated on that key,
# everything else on the value itself; the first occurrence is kept.
687 @{$record_document->{$field}} = do {
689 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
691 if ($field =~ /__sort$/) {
692 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
# Sort fields end up as a single space-joined string, truncated to 255 characters.
693 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
698 # TODO: Perhaps should check if $record_document non empty, but really should never be the case
699 $record->encoding('UTF-8');
# The MARC record itself is stored in the document in one of three formats,
# named in `marc_format`: 'ARRAY', 'base64ISO2709' or 'MARCXML'.
# NOTE(review): the branch conditions are only partly visible in this
# listing; the ARRAY form is presumably selected by $use_array - confirm.
701 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
702 $record_document->{'marc_format'} = 'ARRAY';
706 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
707 local $SIG{__WARN__} = sub {
708 push @warnings, $_[0];
710 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
713 # Suppress warnings if record length exceeded
# A leader starting with '99999' is the marker checked for an over-long record.
714 unless (substr($record->leader(), 0, 5) eq '99999') {
715 foreach my $warning (@warnings) {
# MARCXML replaces the ISO 2709 payload.
# NOTE(review): presumably only when warnings were captured above - the
# condition is not visible.
719 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
720 $record_document->{'marc_format'} = 'MARCXML';
723 $record_document->{'marc_format'} = 'base64ISO2709';
726 push @record_documents, $record_document;
728 return \@record_documents;
731 =head2 _marc_to_array($record)
733 my @fields = _marc_to_array($record)
735 Convert a MARC::Record to an array modeled after MARC-in-JSON
736 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
742 A MARC::Record object
# Body of the MARC record to array-structure conversion (the `sub` line is
# not visible in this listing).
# Builds a hash with the record leader and a `fields` list: control fields
# become { tag => data }, data fields carry both indicators and a list of
# { code => contents } subfield hashes.
749 my ($self, $record) = @_;
752 leader => $record->leader(),
755 for my $field ($record->fields()) {
756 my $tag = $field->tag();
757 if ($field->is_control_field()) {
758 push @{$data->{fields}}, {$tag => $field->data()};
# Subfields are kept as an ordered list of single-key hashes so that order
# and repeated codes survive.
761 foreach my $subfield ($field->subfields()) {
762 my ($code, $contents) = @{$subfield};
763 push @{$subfields}, {$code => $contents};
765 push @{$data->{fields}}, {
767 ind1 => $field->indicator(1),
768 ind2 => $field->indicator(2),
769 subfields => $subfields
777 =head2 _array_to_marc($data)
779 my $record = _array_to_marc($data)
781 Convert an array modeled after MARC-in-JSON to a MARC::Record
787 An array modeled after MARC-in-JSON
788 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
# Body of the array-structure to MARC::Record conversion (the `sub` line is
# not visible in this listing).
# $data: hash with `leader` and a `fields` list, as laid out by the visible
# lines of _marc_to_array.
795 my ($self, $data) = @_;
797 my $record = MARC::Record->new();
799 $record->leader($data->{leader});
800 for my $field (@{$data->{fields}}) {
# Each entry is a single-key hash, so the first key is the tag.
801 my $tag = (keys %{$field})[0];
# NOTE(review): the foreach variable aliases the array element, so this
# assignment overwrites the entry inside $data->{fields}; the caller's
# structure is modified and a second conversion of the same $data would not
# see the original entries - confirm callers do not reuse $data.
802 $field = $field->{$tag};
# A hash value means a data field (indicators plus subfields); any other
# value is passed to MARC::Field->new as control field data.
804 if (ref($field) eq 'HASH') {
806 foreach my $subfield (@{$field->{subfields}}) {
807 my $code = (keys %{$subfield})[0];
808 push @subfields, $code;
809 push @subfields, $subfield->{$code};
811 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
813 $marc_field = MARC::Field->new($tag, $field)
815 $record->append_fields($marc_field);
821 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
823 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
825 Get mappings, an internal data structure later used by
826 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
827 data for a MARC mapping.
829 The returned C<$mappings> is not to be confused with mappings provided by
830 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
831 provided by C<_foreach_mapping> and expands it to this internal data structure.
832 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
833 is then applied to each MARC target (leader, control field data, subfield or
834 joined subfields) and integrated into the mapping rules data structure used in
835 C<marc_records_to_documents> to transform MARC records into Elasticsearch
842 Boolean indicating whether to create a facet field for this mapping.
844 =item C<$suggestible>
846 Boolean indicating whether to create a suggestion field for this mapping.
850 Boolean indicating whether to create a sort field for this mapping.
854 Boolean indicating whether to create a search field for this mapping.
856 =item C<$target_name>
858 Elasticsearch document target field name.
860 =item C<$target_type>
862 Elasticsearch document target field type.
866 An optional range as a string in the format "<START>-<END>" or "<START>",
867 where "<START>" and "<END>" are integers specifying a range that will be used
868 for extracting a substring from MARC data as Elasticsearch field target value.
870 The first character position is "0", and the range is inclusive,
871 so "0-2" means the first three characters of MARC data.
873 If only "<START>" is provided only one character at position "<START>" will
# Expand one configured mapping into the list of [target, options] pairs
# consumed by _process_mappings: the plain target plus `__facet`,
# `__suggestion` and `__sort` variants as requested.
880 sub _field_mappings {
881 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
882 my %mapping_defaults = ();
# Translate the inclusive range "START-END" (or a single "START") into
# substr arguments [START, LENGTH]; a lone START selects one character.
885 my $substr_args = undef;
886 if (defined $range) {
887 # TODO: use value_callback instead?
888 my ($start, $end) = map(int, split /-/, $range, 2);
889 $substr_args = [$start];
890 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
892 my $default_options = {};
894 $default_options->{substr} = $substr_args;
897 # TODO: Should probably have per type value callback/hook
898 # but hard code for now
# Booleans: the trimmed value is reduced to the strings 'true' or 'false'
# using Perl truthiness, so '' and '0' both give 'false'.
899 if ($target_type eq 'boolean') {
900 $default_options->{value_callbacks} //= [];
901 push @{$default_options->{value_callbacks}}, sub {
903 # Trim whitespace at both ends
904 $value =~ s/^\s+|\s+$//g;
905 return $value ? 'true' : 'false';
908 elsif ($target_type eq 'year') {
909 $default_options->{value_callbacks} //= [];
910 # Only accept runs of four characters made of digits, "u" or whitespace
911 push @{$default_options->{value_callbacks}}, sub {
913 # Replace "u" and whitespace with "0" for sorting
# Every four-character run found in the value yields one result, so a
# single input can produce several years or none.
914 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
# Plain target.
# NOTE(review): the guard on $search is not visible in this listing.
919 my $mapping = [$target_name, $default_options];
920 push @mappings, $mapping;
# An undefined $sort counts as sortable, matching get_elasticsearch_mappings.
924 push @suffixes, 'facet' if $facet;
925 push @suffixes, 'suggestion' if $suggestible;
926 push @suffixes, 'sort' if !defined $sort || $sort;
928 foreach my $suffix (@suffixes) {
929 my $mapping = ["${target_name}__$suffix"];
930 # TODO: Hack, fix later in less hideous manner
# Suggestion targets additionally wrap their values under the 'input' key.
931 if ($suffix eq 'suggestion') {
932 push @{$mapping}, {%{$default_options}, property => 'input'};
935 # Important! Make shallow clone, or we end up with the same hashref
936 # shared by all mappings
937 push @{$mapping}, {%{$default_options}};
939 push @mappings, $mapping;
944 =head2 _get_marc_mapping_rules
946 my $mapping_rules = $self->_get_marc_mapping_rules()
948 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
950 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
951 each call to C<MARC::Record>->field) we create an optimized structure of mapping
952 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
954 We can then iterate through all MARC fields for each record and apply all relevant
955 rules once per field instead of retrieving fields multiple times for each mapping rule
956 which is terribly slow.
960 # TODO: This structure can be used for processing multiple MARC::Records so is currently
961 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
962 # memory cache which it is currently not. The performance gain of caching
963 # would probably be marginal, but to do this could be a further improvement.
# Build the rule structure used by marc_records_to_documents. The keys read
# there are `leader`, `control_fields`, `data_fields` (per tag: `subfields`
# and `subfields_join`), `defaults`, `sum` and `isbn`.
# Throws Koha::Exceptions::Elasticsearch::MARCFieldExprParseError for MARC
# field expressions that cannot be parsed.
965 sub _get_marc_mapping_rules {
967 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field expression: three-digit tag, optional subfield codes and
# parenthesised groups, optional "_/START" or "_/START-END" character range.
968 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
# Leader expression: "leader" with the same optional character range.
969 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
972 'control_fields' => {},
979 $self->_foreach_mapping(sub {
980 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
981 return if $marc_type ne $marcflavour;
# Type-specific post-processing lists.
# NOTE(review): the `__sort` variant is only registered for summing when
# $sort is true, whereas _field_mappings also creates the `__sort` target
# when $sort is undef - confirm whether undef should be covered here too.
983 if ($type eq 'sum') {
984 push @{$rules->{sum}}, $name;
985 push @{$rules->{sum}}, $name."__sort" if $sort;
987 elsif ($type eq 'isbn') {
988 push @{$rules->{isbn}}, $name;
990 elsif ($type eq 'boolean') {
991 # boolean gets special handling, if value doesn't exist for a field,
993 $rules->{defaults}->{$name} = 'false';
996 if ($marc_field =~ $field_spec_regexp) {
1000 my @subfield_groups;
1001 # Parse and separate subfields from subfield groups
1003 my $subfield_group = '';
# $2 holds the subfield part of the expression; it is split into single
# characters, and codes inside parentheses are collected into a group.
1006 foreach my $token (split //, $2) {
1007 if ($token eq "(") {
1009 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1010 "Unmatched opening parenthesis for $marc_field"
1017 elsif ($token eq ")") {
1019 if ($subfield_group) {
1020 push @subfield_groups, $subfield_group;
1021 $subfield_group = '';
1026 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1027 "Unmatched closing parenthesis for $marc_field"
1031 elsif ($open_group) {
1032 $subfield_group .= $token;
1035 push @subfields, $token;
# '*' is the wildcard key that marc_records_to_documents applies to every
# subfield.
# NOTE(review): presumably used when the expression names no subfields - the
# condition is not visible.
1040 push @subfields, '*';
1043 my $range = defined $3 ? $3 : undef;
1044 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
# Tags below 010 are control fields and have no subfields.
1045 if ($field_tag < 10) {
1046 $rules->{control_fields}->{$field_tag} //= [];
1047 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1050 $rules->{data_fields}->{$field_tag} //= {};
# The same mapping arrayrefs are pushed under every listed subfield and
# group, so they are shared between those entries rather than copied.
1051 foreach my $subfield (@subfields) {
1052 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1053 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1055 foreach my $subfield_group (@subfield_groups) {
1056 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1057 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1061 elsif ($marc_field =~ $leader_regexp) {
1062 my $range = defined $1 ? $1 : undef;
1063 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1064 push @{$rules->{leader}}, @mappings;
1067 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1068 "Invalid MARC field expression: $marc_field"
1073 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1074 if ($marcflavour eq 'marc21') {
1075 # Nonfiling characters processing for sort fields
# NOTE(review): the index constants are read from package
# Koha::SearchEngine, while this file declares $BIBLIOS_INDEX and
# $AUTHORITIES_INDEX in its own package - confirm Koha::SearchEngine defines
# them and is loaded, otherwise these comparisons are made against undef.
1077 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1078 # Format is: nonfiling characters indicator => field names list
1080 1 => [130, 630, 730, 740],
1081 2 => [222, 240, 242, 243, 245, 440, 830]
1084 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1087 2 => [130, 430, 530]
1090 foreach my $indicator (keys %title_fields) {
1091 foreach my $field_tag (@{$title_fields{$indicator}}) {
# Only the sort mappings registered under subfield `a` of these tags are marked.
# NOTE(review): this lookup autovivifies {data_fields}{tag}{subfields} for
# tags that have no rules. Because mapping arrayrefs are shared between
# subfields of one expression, the marker set here is also seen when the
# same mapping is reached through another subfield - confirm both are
# acceptable.
1092 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1093 foreach my $mapping (@{$mappings}) {
1094 if ($mapping->[0] =~ /__sort$/) {
1095 # Mark this as to be processed for nonfiling characters indicator
1096 # later on in _process_mappings
1097 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1107 =head2 _foreach_mapping
1109 $self->_foreach_mapping(
1111 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1114 return unless $marc_type eq 'marc21';
1115 print "Data comes from: " . $marc_field . "\n";
1119 This allows you to apply a function to each entry in the elasticsearch mappings
1120 table, in order to build the mappings for whatever is needed.
1122 In the provided function, the arguments are:
1128 The field name for elasticsearch (corresponds to the 'mapping' column in the
1133 The type for this value, e.g. 'string'.
1137 True if this value should be facetised. This only really makes sense if the
1138 field is understood by the facet processing code anyway.
1142 True if this is a field that a) needs special sort handling, and b) if it
1143 should be sorted on. False if a) but not b). Undef if not a). This allows,
1144 for example, author to be sorted on but not everything marked with "author"
1145 to be included in that sort.
1149 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1150 'unimarc', 'normarc'.
1152 =item C<$marc_field>
1154 A string that describes the MARC field that contains the data to extract.
# Call $sub once for every mapping configured for this index.
# The callback receives, in order: lower-cased field name, type, facet,
# suggestible, sort, search, marc_type, marc_field.
1160 sub _foreach_mapping {
1161 my ( $self, $sub ) = @_;
1163 # TODO use a caching framework here
# Search fields are joined to their MARC maps through search_marc_to_fields
# and restricted to maps whose index_name is this index.
1164 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1166 'search_marc_map.index_name' => $self->index,
1168 { join => { search_marc_to_fields => 'search_marc_map' },
# Columns taken from the joined tables; they are read back with get_column() below.
1170 'search_marc_to_fields.facet',
1171 'search_marc_to_fields.suggestible',
1172 'search_marc_to_fields.sort',
1173 'search_marc_to_fields.search',
1174 'search_marc_map.marc_type',
1175 'search_marc_map.marc_field',
1188 while ( my $search_field = $search_fields->next ) {
1190 # Force lower case on indexed field names for case insensitive
1191 # field name searches
1192 lc($search_field->name),
1193 $search_field->type,
1194 $search_field->get_column('facet'),
1195 $search_field->get_column('suggestible'),
1196 $search_field->get_column('sort'),
1197 $search_field->get_column('search'),
1198 $search_field->get_column('marc_type'),
1199 $search_field->get_column('marc_field'),
1204 =head2 process_error
1206 die process_error($@);
1208 This parses an Elasticsearch error message and produces a human-readable
1209 result from it. This result is probably missing all the useful information
1210 that you might want in diagnosing an issue, so the warning is also logged.
1212 Note that currently the resulting message is not internationalised. This
1213 will happen eventually by some method or other.
# Body of the error translation routine (the `sub` line is not visible in
# this listing).
# Logs the raw message with warn and returns one of two fixed user-facing
# strings.
1218 my ($self, $msg) = @_;
1220 warn $msg; # simple logging
1222 # This is super-primitive
# Messages mentioning ParseException are reported as a query the engine could
# not understand; everything else gets the generic failure text.
1223 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1225 return "Unable to perform your search. Please try again.\n";
1228 =head2 _read_configuration
1230 my $conf = _read_configuration();
1232 Reads the I<configuration file> and returns a hash structure with the
1233 configuration information. It raises an exception if mandatory entries
1236 The hashref structure has the following form:
1239 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1240 'index_name' => 'koha_instance',
1243 This is configured by the following in the C<config> block in koha-conf.xml:
1246 <server>127.0.0.1:9200</server>
1247 <server>anotherserver:9200</server>
1248 <index_name>koha_instance</index_name>
# Read the `elasticsearch` section of the Koha configuration and return a
# hashref with `nodes` (always an arrayref), `index_name` and `cxn_pool`.
# Throws Koha::Exceptions::Config::MissingEntry when the section, its server
# entry or its index_name entry is missing.
1253 sub _read_configuration {
1257 my $conf = C4::Context->config('elasticsearch');
1258 unless ( defined $conf ) {
1259 Koha::Exceptions::Config::MissingEntry->throw(
1260 "Missing <elasticsearch> entry in koha-conf.xml"
# `server` may be a single value or a list; a single value is wrapped in an
# arrayref so `nodes` has one shape.
1264 if ( $conf && $conf->{server} ) {
1265 my $nodes = $conf->{server};
1266 if ( ref($nodes) eq 'ARRAY' ) {
1267 $configuration->{nodes} = $nodes;
1270 $configuration->{nodes} = [$nodes];
1274 Koha::Exceptions::Config::MissingEntry->throw(
1275 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1279 if ( defined $conf->{index_name} ) {
1280 $configuration->{index_name} = $conf->{index_name};
1283 Koha::Exceptions::Config::MissingEntry->throw(
1284 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
# The connection pool setting is optional and defaults to 'Static'.
1288 $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1290 return $configuration;
1293 =head2 get_facetable_fields
1295 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1297 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
# Return the Koha::SearchFields rows for a fixed list of facet-capable field
# names: first those with a facet_order (sorted by it), then those without one.
1301 sub get_facetable_fields {
1304 # These should correspond to the ES field names, as opposed to the CCL
1305 # things that zebra uses.
1306 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
# Fields that have a facet order set, in that order.
1307 my @faceted_fields = Koha::SearchFields->search(
1308 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
# Fields from the same list with no facet order set.
1310 my @not_faceted_fields = Koha::SearchFields->search(
1311 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1313 # This could certainly be improved
1314 return ( @faceted_fields, @not_faceted_fields );
1317 =head2 clear_search_fields_cache
1319 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1321 Clear cached values for ES search fields
# Remove the four cached search field lists (staff client and OPAC, for
# biblios and authorities) from the Koha cache.
1325 sub clear_search_fields_cache {
1327 my $cache = Koha::Caches->get_instance();
1328 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1329 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1330 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1331 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1343 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1345 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1347 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>