1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
31 use C4::AuthoritiesMarc;
38 use Search::Elasticsearch;
42 use List::Util qw( sum0 reduce all );
45 use Encode qw(encode);
47 use Scalar::Util qw(looks_like_number);
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
66 The name of the index to use, generally 'biblios' or 'authorities'.
70 The Elasticsearch index name with Koha instance prefix.
83 # Check for a valid index
84 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85 my $config = _read_configuration();
86 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
88 my $self = $class->SUPER::new(@_);
92 =head2 get_elasticsearch
94 my $elasticsearch_client = $self->get_elasticsearch();
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
97 instance level and will be reused if method is called multiple times.
sub get_elasticsearch {
    # Return the Search::Elasticsearch client for this instance. The client
    # is built from get_elasticsearch_params() the first time it is needed
    # and kept in $self->{elasticsearch} for every later call.
    my ($self) = @_;

    $self->{elasticsearch} //= Search::Elasticsearch->new(
        $self->get_elasticsearch_params()
    );

    return $self->{elasticsearch};
}
111 =head2 get_elasticsearch_params
113 my $params = $self->get_elasticsearch_params();
115 This provides a hashref that contains the parameters for connecting to the
116 Elasticsearch servers, in the form:
119 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120 'index_name' => 'koha_instance_index',
123 This is configured by the following in the C<config> block in koha-conf.xml:
126 <server>127.0.0.1:9200</server>
127 <server>anotherserver:9200</server>
128 <index_name>koha_instance</index_name>
133 sub get_elasticsearch_params {
138 $conf = _read_configuration();
140 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
148 =head2 get_elasticsearch_settings
150 my $settings = $self->get_elasticsearch_settings();
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
155 A hashref containing the settings is returned.
sub get_elasticsearch_settings {
    # Return the index settings hashref loaded from the YAML index
    # configuration. The file is parsed once per process; later calls are
    # answered from the state variable.
    my ($self) = @_;

    state $settings;
    return $settings if defined $settings;

    # A path configured in koha-conf.xml wins over the file shipped under
    # intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_config')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
    $settings = LoadFile($yaml_path);

    return $settings;
}
173 =head2 get_elasticsearch_mappings
175 my $mappings = $self->get_elasticsearch_mappings();
177 This provides the mappings that get passed to Elasticsearch when an index is
182 sub get_elasticsearch_mappings {
185 # Use state to speed up repeated calls
189 if (!defined $all_mappings{$self->index}) {
190 $sort_fields{$self->index} = {};
191 # Clone the general mapping to break ties with the original hash
193 data => clone(_get_elasticsearch_field_config('general', ''))
195 my $marcflavour = lc C4::Context->preference('marcflavour');
196 $self->_foreach_mapping(
198 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199 return if $marc_type ne $marcflavour;
200 # TODO if this gets any sort of complexity to it, it should
201 # be broken out into its own function.
203 # TODO be aware of date formats, but this requires pre-parsing
204 # as ES will simply reject anything with an invalid date.
205 my $es_type = 'text';
206 if ($type eq 'boolean') {
207 $es_type = 'boolean';
208 } elsif ($type eq 'number' || $type eq 'sum') {
209 $es_type = 'integer';
210 } elsif ($type eq 'isbn' || $type eq 'stdno') {
212 } elsif ($type eq 'year') {
217 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226 # Sort is a bit special as it can be true, false, undef.
227 # We care about "true" or "undef",
228 # "undef" means to do the default thing, which is make it sortable.
229 if (!defined $sort || $sort) {
230 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231 $sort_fields{$self->index}{$name} = 1;
235 $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236 $all_mappings{$self->index} = $mappings;
238 $self->sort_fields(\%{$sort_fields{$self->index}});
239 return $all_mappings{$self->index};
242 =head2 raw_elasticsearch_mappings
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
251 sub raw_elasticsearch_mappings {
252 my ( $marc_type ) = @_;
254 my $schema = Koha::Database->new()->schema();
256 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
259 while ( my $search_field = $search_fields->next ) {
261 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262 { search_field_id => $search_field->id },
264 join => 'search_marc_map',
265 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
269 while ( my $marc_to_field = $marc_to_fields->next ) {
271 my $marc_map = $marc_to_field->search_marc_map;
273 next if $marc_type && $marc_map->marc_type ne $marc_type;
275 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284 facet => $marc_to_field->facet || '',
285 marc_type => $marc_map->marc_type,
286 marc_field => $marc_map->marc_field,
287 sort => $marc_to_field->sort,
288 suggestible => $marc_to_field->suggestible || ''
297 =head2 _get_elasticsearch_field_config
299 Get the Elasticsearch field config for the given purpose and data type.
301 $mapping = _get_elasticsearch_field_config('search', 'text');
sub _get_elasticsearch_field_config {
    # Look up the Elasticsearch field configuration for a purpose (for
    # example 'search' or 'facet') and a data type.
    #
    # Returns the whole purpose section when $type is the empty string,
    # otherwise the entry for $type, falling back to the 'default' entry.
    # Returns nothing when neither exists. Dies when the purpose itself is
    # not present in the field configuration.
    my ( $purpose, $type ) = @_;

    # The YAML file is parsed once per process and kept in a state variable.
    state $settings;
    unless ( defined $settings ) {
        my $yaml_path = C4::Context->config('elasticsearch_field_config')
            || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
        $settings = LoadFile($yaml_path);
    }

    my $section = $settings->{$purpose};
    die "Field purpose $purpose not defined in field config" unless defined $section;

    return $section if $type eq '';

    foreach my $candidate ( $type, 'default' ) {
        return $section->{$candidate} if defined $section->{$candidate};
    }

    return;
}
332 =head2 _load_elasticsearch_mappings
334 Load Elasticsearch mappings in the format of mappings.yaml.
336 $indexes = _load_elasticsearch_mappings();
sub _load_elasticsearch_mappings {
    # Parse and return the mappings YAML. The path comes from the
    # 'elasticsearch_index_mappings' config entry when set, otherwise from
    # the mappings.yaml shipped under intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';

    return LoadFile($yaml_path);
}
sub reset_elasticsearch_mappings {
    # Replace the search field and MARC mapping rows with the contents of
    # the mappings YAML, then clear the cached search field lists.
    my ($self) = @_;

    my $indexes = $self->_load_elasticsearch_mappings();

    # Start from empty tables; everything is recreated below.
    Koha::SearchMarcMaps->delete;
    Koha::SearchFields->delete;

    foreach my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        foreach my $field_name ( keys %{$fields} ) {
            my $definition = $fields->{$field_name};

            # Copy only the attributes the YAML actually provides.
            my %field_attrs;
            foreach my $attr (qw/ type label weight staff_client opac facet_order /) {
                $field_attrs{$attr} = $definition->{$attr} if exists $definition->{$attr};
            }

            # Fields are searchable in both interfaces unless stated otherwise.
            $field_attrs{staff_client} //= 1;
            $field_attrs{opac}         //= 1;
            $field_attrs{name} = $field_name;

            my $search_field = Koha::SearchFields->find_or_create( \%field_attrs, { key => 'name' } );

            my $marc_rules = $definition->{mappings};
            foreach my $rule ( @{$marc_rules} ) {
                my $marc_map = Koha::SearchMarcMaps->find_or_create({
                    index_name => $index_name,
                    marc_type  => $rule->{marc_type},
                    marc_field => $rule->{marc_field}
                });

                # 'sort' is passed through as-is: undef is a meaningful value.
                $search_field->add_to_search_marc_maps($marc_map, {
                    facet       => $rule->{facet} || 0,
                    suggestible => $rule->{suggestible} || 0,
                    sort        => $rule->{sort},
                    search      => $rule->{search} // 1
                });
            }
        }
    }

    $self->clear_search_fields_cache();

    # FIXME return the mappings?
}
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
393 $self->_sort_fields_accessor(@_);
396 my $val = $self->_sort_fields_accessor();
399 # This will populate the accessor as a side effect
400 $self->get_elasticsearch_mappings();
401 return $self->_sort_fields_accessor();
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
425 The source data from a MARC record field.
427 =item C<$record_document>
429 Hashref representing the Elasticsearch document on which mappings should be
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
441 Current properties are:
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
446 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
449 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
454 C<field>: The original C<MARC::Field> object.
sub _process_mappings {
    # Apply every mapping in $mappings to the scalar $data and append the
    # resulting values to $record_document->{<target>}, an arrayref that is
    # created on demand. Nothing is returned.
    #
    #   $mappings        - arrayref of [ $target, \%options ] pairs
    #   $data            - source string taken from the MARC record
    #   $record_document - hashref of the document being built (mutated)
    #   $meta            - hashref; this sub reads {altscript} and {field}
    #
    # Options honoured per mapping:
    #   substr                         => [ $start, $length ]
    #   value_callbacks                => [ coderef, ... ]
    #   property                       => key used to wrap each value in a hashref
    #   nonfiling_characters_indicator => indicator number queried on $meta->{field}
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        next if $target =~ /__sort$/ && $meta->{altscript};

        # Copy (scalar) data since there can be multiple targets with
        # differing options for (possibly) mutating data, so each mapping
        # needs its own copy.
        my $data_copy = $data;
        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            # An undefined or too short source yields the empty string
            # instead of a "substr outside of string" warning.
            $data_copy = ( defined $data && length($data) > $start )
                ? substr($data_copy, $start, $length)
                : '';
        }

        # Add data to values array for callbacks processing
        my $values = [$data_copy];

        # Value callbacks take subfield data (or values from previous
        # callbacks) as argument and return a possibly different list of
        # values. The returned list may also be empty.
        if (defined $options->{value_callbacks}) {
            foreach my $callback (@{$options->{value_callbacks}}) {
                # Each value is passed to the current callback, which returns
                # a list (a scalar is fine too); perl flattens the results and
                # the next callback receives the possibly expanded list.
                $values = [ map { $callback->($_) } @{$values} ];
            }
        }

        # Skip mapping if all values have been removed
        next unless @{$values};

        if (defined $options->{property}) {
            # Wrap every non-empty value, including '0'. Undefined and empty
            # values are dropped here rather than leaking through unwrapped.
            $values = [
                map { ( defined $_ && length $_ ) ? +{ $options->{property} => $_ } : () }
                    @{$values}
            ];
        }

        if (defined $options->{nonfiling_characters_indicator}) {
            my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
            $nonfiling_chars = 0 if $nonfiling_chars < 0;

            # Nonfiling chars does not make sense for multiple values,
            # only apply on first element. Skipping more characters than the
            # value holds leaves an empty string, which is filtered out below.
            my $first = $values->[0];
            if ( defined $first && !ref $first ) {
                $values->[0] = $nonfiling_chars < length($first)
                    ? substr($first, $nonfiling_chars)
                    : '';
            }
        }

        # Remove undefined and empty values
        $values = [ grep { defined $_ && !/^$/ } @{$values} ];

        $record_document->{$target} //= [];
        push @{$record_document->{$target}}, @{$values};
    }

    return;
}
517 =head2 marc_records_to_documents($marc_records)
519 my $record_documents = $self->marc_records_to_documents($marc_records);
521 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
523 Returns array of hash references, representing Elasticsearch documents,
524 acceptable as body payload in C<Search::Elasticsearch> requests.
528 =item C<$marc_records>
530 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
536 sub marc_records_to_documents {
537 my ($self, $records) = @_;
538 my $rules = $self->_get_marc_mapping_rules();
539 my $control_fields_rules = $rules->{control_fields};
540 my $data_fields_rules = $rules->{data_fields};
541 my $marcflavour = lc C4::Context->preference('marcflavour');
542 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
544 my @record_documents;
546 my %auth_match_headings;
547 if( $self->index eq 'authorities' ){
548 my @auth_types = Koha::Authority::Types->search();
549 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
552 foreach my $record (@{$records}) {
553 my $record_document = {};
555 if ( $self->index eq 'authorities' ){
556 my $authtypecode = GuessAuthTypeCode( $record );
558 my $field = $record->field( $auth_match_headings{ $authtypecode } );
559 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
562 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
566 my $mappings = $rules->{leader};
568 $self->_process_mappings($mappings, $record->leader(), $record_document, {
570 data_source => 'leader'
574 foreach my $field ($record->fields()) {
575 if ($field->is_control_field()) {
576 my $mappings = $control_fields_rules->{$field->tag()};
578 $self->_process_mappings($mappings, $field->data(), $record_document, {
580 data_source => 'control_field',
587 my $tag = $field->tag();
588 # Handle alternate scripts in MARC 21
590 if ($marcflavour eq 'marc21' && $tag eq '880') {
591 my $sub6 = $field->subfield('6');
592 if ($sub6 =~ /^(...)-\d+/) {
598 my $data_field_rules = $data_fields_rules->{$tag};
599 if ($data_field_rules) {
600 my $subfields_mappings = $data_field_rules->{subfields};
601 my $wildcard_mappings = $subfields_mappings->{'*'};
602 foreach my $subfield ($field->subfields()) {
603 my ($code, $data) = @{$subfield};
604 my $mappings = $subfields_mappings->{$code} // [];
605 if ($wildcard_mappings) {
606 $mappings = [@{$mappings}, @{$wildcard_mappings}];
609 $self->_process_mappings($mappings, $data, $record_document, {
610 altscript => $altscript,
611 data_source => 'subfield',
619 my $subfields_join_mappings = $data_field_rules->{subfields_join};
620 if ($subfields_join_mappings) {
621 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
622 my $data_field = $field->clone; #copy field to preserve for alt scripts
623 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
624 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
626 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
627 altscript => $altscript,
628 data_source => 'subfields_group',
629 codes => $subfields_group,
639 foreach my $field (keys %{$rules->{defaults}}) {
640 unless (defined $record_document->{$field}) {
641 $record_document->{$field} = $rules->{defaults}->{$field};
644 foreach my $field (@{$rules->{sum}}) {
645 if (defined $record_document->{$field}) {
646 # TODO: validate numeric? filter?
647 # TODO: Or should only accept fields without nested values?
648 # TODO: Quick and dirty, improve if needed
649 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
652 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
653 foreach my $field (@{$rules->{isbn}}) {
654 if (defined $record_document->{$field}) {
656 foreach my $input_isbn (@{$record_document->{$field}}) {
657 my $isbn = Business::ISBN->new($input_isbn);
658 if (defined $isbn && $isbn->is_valid) {
659 my $isbn13 = $isbn->as_isbn13->as_string;
660 push @isbns, $isbn13;
662 push @isbns, $isbn13;
664 my $isbn10 = $isbn->as_isbn10;
666 $isbn10 = $isbn10->as_string;
667 push @isbns, $isbn10;
669 push @isbns, $isbn10;
672 push @isbns, $input_isbn;
675 $record_document->{$field} = \@isbns;
679 # Remove duplicate values and collapse sort fields
680 foreach my $field (keys %{$record_document}) {
681 if (ref($record_document->{$field}) eq 'ARRAY') {
682 @{$record_document->{$field}} = do {
684 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
686 if ($field =~ /__sort$/) {
687 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
688 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
693 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
694 $record->encoding('UTF-8');
696 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
697 $record_document->{'marc_format'} = 'ARRAY';
701 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
702 local $SIG{__WARN__} = sub {
703 push @warnings, $_[0];
705 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
708 # Suppress warnings if record length exceeded
709 unless (substr($record->leader(), 0, 5) eq '99999') {
710 foreach my $warning (@warnings) {
714 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
715 $record_document->{'marc_format'} = 'MARCXML';
718 $record_document->{'marc_format'} = 'base64ISO2709';
721 push @record_documents, $record_document;
723 return \@record_documents;
726 =head2 _marc_to_array($record)
728 my @fields = _marc_to_array($record)
730 Convert a MARC::Record to an array modeled after MARC-in-JSON
731 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
737 A MARC::Record object
744 my ($self, $record) = @_;
747 leader => $record->leader(),
750 for my $field ($record->fields()) {
751 my $tag = $field->tag();
752 if ($field->is_control_field()) {
753 push @{$data->{fields}}, {$tag => $field->data()};
756 foreach my $subfield ($field->subfields()) {
757 my ($code, $contents) = @{$subfield};
758 push @{$subfields}, {$code => $contents};
760 push @{$data->{fields}}, {
762 ind1 => $field->indicator(1),
763 ind2 => $field->indicator(2),
764 subfields => $subfields
772 =head2 _array_to_marc($data)
774 my $record = _array_to_marc($data)
776 Convert an array modeled after MARC-in-JSON to a MARC::Record
782 An array modeled after MARC-in-JSON
783 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
790 my ($self, $data) = @_;
792 my $record = MARC::Record->new();
794 $record->leader($data->{leader});
795 for my $field (@{$data->{fields}}) {
796 my $tag = (keys %{$field})[0];
797 $field = $field->{$tag};
799 if (ref($field) eq 'HASH') {
801 foreach my $subfield (@{$field->{subfields}}) {
802 my $code = (keys %{$subfield})[0];
803 push @subfields, $code;
804 push @subfields, $subfield->{$code};
806 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
808 $marc_field = MARC::Field->new($tag, $field)
810 $record->append_fields($marc_field);
816 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
818 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
820 Get mappings, an internal data structure later used by
821 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
822 data for a MARC mapping.
824 The returned C<$mappings> is not to be confused with mappings provided by
825 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
826 provided by C<_foreach_mapping> and expands it to this internal data structure.
827 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
828 is then applied to each MARC target (leader, control field data, subfield or
829 joined subfields) and integrated into the mapping rules data structure used in
830 C<marc_records_to_documents> to transform MARC records into Elasticsearch
837 Boolean indicating whether to create a facet field for this mapping.
839 =item C<$suggestible>
841 Boolean indicating whether to create a suggestion field for this mapping.
845 Boolean indicating whether to create a sort field for this mapping.
849 Boolean indicating whether to create a search field for this mapping.
851 =item C<$target_name>
853 Elasticsearch document target field name.
855 =item C<$target_type>
857 Elasticsearch document target field type.
861 An optional range as a string in the format "<START>-<END>" or "<START>",
862 where "<START>" and "<END>" are integers specifying a range that will be used
863 for extracting a substring from MARC data as Elasticsearch field target value.
865 The first character position is "0", and the range is inclusive,
866 so "0-2" means the first three characters of MARC data.
868 If only "<START>" is provided only one character at position "<START>" will
sub _field_mappings {
    # Expand one mapping row into the list of [ $target, \%options ] pairs
    # consumed by _process_mappings: the plain search target (when $search
    # is true) followed by the __facet, __suggestion and __sort targets that
    # are enabled. A sort target is produced when $sort is undefined or true.
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;

    my %base_options;

    # "<START>-<END>" is inclusive; a lone "<START>" selects one character.
    # TODO: use value_callback instead?
    if (defined $range) {
        my ($first, $last) = map { int $_ } split /-/, $range, 2;
        $base_options{substr} = [ $first, ( defined $last ? $last - $first + 1 : 1 ) ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    my %callback_for = (
        boolean => sub {
            my ($raw) = @_;
            # Trim whitespace at both ends
            $raw =~ s/^\s+|\s+$//g;
            return $raw ? 'true' : 'false';
        },
        # Only accept years containing digits and "u"
        year => sub {
            my ($raw) = @_;
            # Replace "u" with "0" for sorting
            return map { s/[u\s]/0/gr } ( $raw =~ /[0-9u\s]{4}/g );
        },
    );
    if ( my $callback = $callback_for{$target_type} ) {
        $base_options{value_callbacks} = [$callback];
    }

    my @targets;
    push @targets, [ $target_name, \%base_options ] if $search;

    my @suffixes = (
        ( $facet                     ? 'facet'      : () ),
        ( $suggestible               ? 'suggestion' : () ),
        ( ( !defined $sort || $sort ) ? 'sort'      : () ),
    );

    foreach my $suffix (@suffixes) {
        # Important! Every suffixed target gets its own shallow copy so the
        # option hashes are never shared between mappings.
        my %options = %base_options;

        # TODO: Hack, fix later in less hideous manner
        $options{property} = 'input' if $suffix eq 'suggestion';

        push @targets, [ "${target_name}__$suffix", \%options ];
    }

    return @targets;
}
939 =head2 _get_marc_mapping_rules
941 my $mapping_rules = $self->_get_marc_mapping_rules()
943 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
945 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
946 each call to C<MARC::Record>->field) we create an optimized structure of mapping
947 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
949 We can then iterate through all MARC fields for each record and apply all relevant
950 rules once per field instead of retrieving fields multiple times for each mapping rule
951 which is terribly slow.
955 # TODO: This structure can be used for processing multiple MARC::Records so is currently
956 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
957 # memory cache which it is currently not. The performance gain of caching
958 # would probably be marginal, but to do this could be a further improvement.
960 sub _get_marc_mapping_rules {
962 my $marcflavour = lc C4::Context->preference('marcflavour');
963 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
964 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
967 'control_fields' => {},
974 $self->_foreach_mapping(sub {
975 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
976 return if $marc_type ne $marcflavour;
978 if ($type eq 'sum') {
979 push @{$rules->{sum}}, $name;
980 push @{$rules->{sum}}, $name."__sort" if $sort;
982 elsif ($type eq 'isbn') {
983 push @{$rules->{isbn}}, $name;
985 elsif ($type eq 'boolean') {
986 # boolean gets special handling, if value doesn't exist for a field,
988 $rules->{defaults}->{$name} = 'false';
991 if ($marc_field =~ $field_spec_regexp) {
996 # Parse and separate subfields from subfield groups
998 my $subfield_group = '';
1001 foreach my $token (split //, $2) {
1002 if ($token eq "(") {
1004 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1005 "Unmatched opening parenthesis for $marc_field"
1012 elsif ($token eq ")") {
1014 if ($subfield_group) {
1015 push @subfield_groups, $subfield_group;
1016 $subfield_group = '';
1021 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1022 "Unmatched closing parenthesis for $marc_field"
1026 elsif ($open_group) {
1027 $subfield_group .= $token;
1030 push @subfields, $token;
1035 push @subfields, '*';
1038 my $range = defined $3 ? $3 : undef;
1039 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1040 if ($field_tag < 10) {
1041 $rules->{control_fields}->{$field_tag} //= [];
1042 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1045 $rules->{data_fields}->{$field_tag} //= {};
1046 foreach my $subfield (@subfields) {
1047 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1048 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1050 foreach my $subfield_group (@subfield_groups) {
1051 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1052 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1056 elsif ($marc_field =~ $leader_regexp) {
1057 my $range = defined $1 ? $1 : undef;
1058 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1059 push @{$rules->{leader}}, @mappings;
1062 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1063 "Invalid MARC field expression: $marc_field"
1068 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1069 if ($marcflavour eq 'marc21') {
1070 # Nonfiling characters processing for sort fields
1072 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1073 # Format is: nonfiling characters indicator => field names list
1075 1 => [130, 630, 730, 740],
1076 2 => [222, 240, 242, 243, 245, 440, 830]
1079 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1082 2 => [130, 430, 530]
1085 foreach my $indicator (keys %title_fields) {
1086 foreach my $field_tag (@{$title_fields{$indicator}}) {
1087 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1088 foreach my $mapping (@{$mappings}) {
1089 if ($mapping->[0] =~ /__sort$/) {
1090 # Mark this as to be processed for nonfiling characters indicator
1091 # later on in _process_mappings
1092 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1102 =head2 _foreach_mapping
1104 $self->_foreach_mapping(
1106 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1109 return unless $marc_type eq 'marc21';
1110 print "Data comes from: " . $marc_field . "\n";
1114 This allows you to apply a function to each entry in the elasticsearch mappings
1115 table, in order to build the mappings for whatever is needed.
1117 In the provided function, the fields are:
1123 The field name for elasticsearch (corresponds to the 'mapping' column in the
1128 The type for this value, e.g. 'string'.
1132 True if this value should be facetised. This only really makes sense if the
1133 field is understood by the facet processing code anyway.
1137 True if this is a field that a) needs special sort handling, and b) if it
1138 should be sorted on. False if a) but not b). Undef if not a). This allows,
1139 for example, author to be sorted on but not everything marked with "author"
1140 to be included in that sort.
1144 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1145 'unimarc', 'normarc'.
1147 =item C<$marc_field>
1149 A string that describes the MARC field that contains the data to extract.
1155 sub _foreach_mapping {
1156 my ( $self, $sub ) = @_;
1158 # TODO use a caching framework here
1159 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1161 'search_marc_map.index_name' => $self->index,
1163 { join => { search_marc_to_fields => 'search_marc_map' },
1165 'search_marc_to_fields.facet',
1166 'search_marc_to_fields.suggestible',
1167 'search_marc_to_fields.sort',
1168 'search_marc_to_fields.search',
1169 'search_marc_map.marc_type',
1170 'search_marc_map.marc_field',
1183 while ( my $search_field = $search_fields->next ) {
1185 # Force lower case on indexed field names for case insensitive
1186 # field name searches
1187 lc($search_field->name),
1188 $search_field->type,
1189 $search_field->get_column('facet'),
1190 $search_field->get_column('suggestible'),
1191 $search_field->get_column('sort'),
1192 $search_field->get_column('search'),
1193 $search_field->get_column('marc_type'),
1194 $search_field->get_column('marc_field'),
1199 =head2 process_error
1201 die process_error($@);
1203 This parses an Elasticsearch error message and produces a human-readable
1204 result from it. This result is probably missing all the useful information
1205 that you might want in diagnosing an issue, so the warning is also logged.
1207 Note that currently the resulting message is not internationalised. This
1208 will happen eventually by some method or other.
1213 my ($self, $msg) = @_;
1215 warn $msg; # simple logging
1217 # This is super-primitive
1218 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1220 return "Unable to perform your search. Please try again.\n";
1223 =head2 _read_configuration
1225 my $conf = _read_configuration();
1227 Reads the I<configuration file> and returns a hash structure with the
1228 configuration information. It raises an exception if mandatory entries
1231 The hashref structure has the following form:
1234 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1235 'index_name' => 'koha_instance',
1238 This is configured by the following in the C<config> block in koha-conf.xml:
1241 <server>127.0.0.1:9200</server>
1242 <server>anotherserver:9200</server>
1243 <index_name>koha_instance</index_name>
sub _read_configuration {
    # Build the connection settings hashref from the <elasticsearch> entry
    # of koha-conf.xml:
    #
    #   nodes      - arrayref of servers (a single <server> is wrapped)
    #   index_name - value of <index_name>
    #   cxn_pool   - value of <cxn_pool>, 'Static' when absent
    #
    # Throws Koha::Exceptions::Config::MissingEntry when the entry itself,
    # its <server> or its <index_name> is missing.
    my $conf = C4::Context->config('elasticsearch');

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch> entry in koha-conf.xml"
    ) unless defined $conf;

    my $servers = $conf && $conf->{server};
    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch>/<server> entry in koha-conf.xml"
    ) unless $servers;

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
    ) unless defined $conf->{index_name};

    return {
        nodes      => ( ref($servers) eq 'ARRAY' ? $servers : [$servers] ),
        index_name => $conf->{index_name},
        cxn_pool   => $conf->{cxn_pool} // 'Static',
    };
}
1288 =head2 get_facetable_fields
1290 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1292 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
sub get_facetable_fields {
    # Return the search fields that can be offered as facets: first those
    # with a facet_order, sorted by it, then those without one.
    my ($self) = @_;

    # These should correspond to the ES field names, as opposed to the CCL
    # things that zebra uses.
    my @candidate_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );

    my %name_filter = ( name => { -in => \@candidate_names } );
    my %by_order    = ( order_by => ['facet_order'] );

    my @ordered = Koha::SearchFields->search(
        { %name_filter, facet_order => { '!=' => undef } }, {%by_order}
    );
    my @unordered = Koha::SearchFields->search(
        { %name_filter, facet_order => undef }, {%by_order}
    );

    # This could certainly be improved
    return ( @ordered, @unordered );
}
1312 =head2 clear_search_fields_cache
1314 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1316 Clear cached values for ES search fields
sub clear_search_fields_cache {
    # Drop the cached search field lists for both interfaces (staff client
    # and OPAC) of both indexes (biblios and authorities).
    my $cache = Koha::Caches->get_instance();

    my @cache_keys;
    foreach my $index (qw( biblios authorities )) {
        foreach my $interface (qw( staff_client opac )) {
            push @cache_keys, "elasticsearch_search_fields_${interface}_${index}";
        }
    }

    # The final call stays the sub's last expression so callers see the same
    # return value as before.
    my $last_key = pop @cache_keys;
    $cache->clear_from_cache($_) for @cache_keys;
    return $cache->clear_from_cache($last_key);
}
1338 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1340 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1342 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>