1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
35 use Carp qw( carp croak );
36 use Clone qw( clone );
38 use Readonly qw( Readonly );
39 use Search::Elasticsearch;
40 use Try::Tiny qw( catch try );
43 use List::Util qw( sum0 );
45 use MIME::Base64 qw( encode_base64 );
46 use Encode qw( encode );
48 use Scalar::Util qw( looks_like_number );
50 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
51 __PACKAGE__->mk_accessors(qw( sort_fields ));
53 # Constants to refer to the standard index names
54 Readonly our $BIBLIOS_INDEX => 'biblios';
55 Readonly our $AUTHORITIES_INDEX => 'authorities';
59 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
67 The name of the index to use, generally 'biblios' or 'authorities'.
71 The Elasticsearch index name with Koha instance prefix.
84 # Check for a valid index
85 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
86 my $config = _read_configuration();
87 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
89 my $self = $class->SUPER::new(@_);
93 =head2 get_elasticsearch
95 my $elasticsearch_client = $self->get_elasticsearch();
97 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
98 instance level and will be reused if method is called multiple times.
102 sub get_elasticsearch {
104 unless (defined $self->{elasticsearch}) {
105 $self->{elasticsearch} = Search::Elasticsearch->new(
106 $self->get_elasticsearch_params()
109 return $self->{elasticsearch};
112 =head2 get_elasticsearch_params
114 my $params = $self->get_elasticsearch_params();
116 This provides a hashref that contains the parameters for connecting to the
117 ElasicSearch servers, in the form:
120 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
121 'index_name' => 'koha_instance_index',
124 This is configured by the following in the C<config> block in koha-conf.xml:
127 <server>127.0.0.1:9200</server>
128 <server>anotherserver:9200</server>
129 <index_name>koha_instance</index_name>
134 sub get_elasticsearch_params {
139 $conf = _read_configuration();
141 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
149 =head2 get_elasticsearch_settings
151 my $settings = $self->get_elasticsearch_settings();
153 This provides the settings provided to Elasticsearch when an index is created.
154 These can do things like define tokenization methods.
156 A hashref containing the settings is returned.
160 sub get_elasticsearch_settings {
163 # Use state to speed up repeated calls
164 state $settings = undef;
165 if (!defined $settings) {
166 my $config_file = C4::Context->config('elasticsearch_index_config');
167 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
168 $settings = YAML::XS::LoadFile( $config_file );
174 =head2 get_elasticsearch_mappings
176 my $mappings = $self->get_elasticsearch_mappings();
178 This provides the mappings that get passed to Elasticsearch when an index is
183 sub get_elasticsearch_mappings {
186 # Use state to speed up repeated calls
190 if (!defined $all_mappings{$self->index}) {
191 $sort_fields{$self->index} = {};
192 # Clone the general mapping to break ties with the original hash
193 my $mappings = clone(_get_elasticsearch_field_config('general', ''));
194 my $marcflavour = lc C4::Context->preference('marcflavour');
195 $self->_foreach_mapping(
197 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
198 return if $marc_type ne $marcflavour;
199 # TODO if this gets any sort of complexity to it, it should
200 # be broken out into its own function.
202 # TODO be aware of date formats, but this requires pre-parsing
203 # as ES will simply reject anything with an invalid date.
204 my $es_type = 'text';
205 if ($type eq 'boolean') {
206 $es_type = 'boolean';
207 } elsif ($type eq 'number' || $type eq 'sum') {
208 $es_type = 'integer';
209 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211 } elsif ($type eq 'year') {
213 } elsif ($type eq 'callnumber') {
214 $es_type = 'cn_sort';
218 $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222 $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
225 $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227 # Sort is a bit special as it can be true, false, undef.
228 # We care about "true" or "undef",
229 # "undef" means to do the default thing, which is make it sortable.
230 if (!defined $sort || $sort) {
231 $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
232 $sort_fields{$self->index}{$name} = 1;
236 $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
237 $all_mappings{$self->index} = $mappings;
239 $self->sort_fields(\%{$sort_fields{$self->index}});
240 return $all_mappings{$self->index};
243 =head2 raw_elasticsearch_mappings
245 Return elasticsearch mapping as it is in database.
246 marc_type: marc21|unimarc
248 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
252 sub raw_elasticsearch_mappings {
253 my ( $marc_type ) = @_;
255 my $schema = Koha::Database->new()->schema();
257 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
260 while ( my $search_field = $search_fields->next ) {
262 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
263 { search_field_id => $search_field->id },
265 join => 'search_marc_map',
266 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
270 while ( my $marc_to_field = $marc_to_fields->next ) {
272 my $marc_map = $marc_to_field->search_marc_map;
274 next if $marc_type && $marc_map->marc_type ne $marc_type;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
284 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
286 facet => $marc_to_field->facet || '',
287 marc_type => $marc_map->marc_type,
288 marc_field => $marc_map->marc_field,
289 sort => $marc_to_field->sort,
290 suggestible => $marc_to_field->suggestible || ''
299 =head2 _get_elasticsearch_field_config
301 Get the Elasticsearch field config for the given purpose and data type.
303 $mapping = _get_elasticsearch_field_config('search', 'text');
307 sub _get_elasticsearch_field_config {
309 my ( $purpose, $type ) = @_;
311 # Use state to speed up repeated calls
312 state $settings = undef;
313 if (!defined $settings) {
314 my $config_file = C4::Context->config('elasticsearch_field_config');
315 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
316 local $YAML::XS::Boolean = 'JSON::PP';
317 $settings = YAML::XS::LoadFile( $config_file );
320 if (!defined $settings->{$purpose}) {
321 die "Field purpose $purpose not defined in field config";
324 return $settings->{$purpose};
326 if (defined $settings->{$purpose}{$type}) {
327 return $settings->{$purpose}{$type};
329 if (defined $settings->{$purpose}{'default'}) {
330 return $settings->{$purpose}{'default'};
335 =head2 _load_elasticsearch_mappings
337 Load Elasticsearch mappings in the format of mappings.yaml.
339 $indexes = _load_elasticsearch_mappings();
343 sub _load_elasticsearch_mappings {
344 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
345 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
346 return YAML::XS::LoadFile( $mappings_yaml );
349 sub reset_elasticsearch_mappings {
351 my $indexes = $self->_load_elasticsearch_mappings();
353 Koha::SearchMarcMaps->delete;
354 Koha::SearchFields->delete;
356 while ( my ( $index_name, $fields ) = each %$indexes ) {
357 while ( my ( $field_name, $data ) = each %$fields ) {
359 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
362 $sf_params{staff_client} //= 1;
363 $sf_params{opac} //= 1;
365 $sf_params{name} = $field_name;
367 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
369 my $mappings = $data->{mappings};
370 for my $mapping ( @$mappings ) {
371 my $marc_field = Koha::SearchMarcMaps->find_or_create({
372 index_name => $index_name,
373 marc_type => $mapping->{marc_type},
374 marc_field => $mapping->{marc_field}
376 $search_field->add_to_search_marc_maps($marc_field, {
377 facet => $mapping->{facet} || 0,
378 suggestible => $mapping->{suggestible} || 0,
379 sort => $mapping->{sort} // 1,
380 search => $mapping->{search} // 1
386 $self->clear_search_fields_cache();
388 # FIXME return the mappings?
391 # This overrides the accessor provided by Class::Accessor so that if
392 # sort_fields isn't set, then it'll generate it.
396 $self->_sort_fields_accessor(@_);
399 my $val = $self->_sort_fields_accessor();
402 # This will populate the accessor as a side effect
403 $self->get_elasticsearch_mappings();
404 return $self->_sort_fields_accessor();
407 =head2 _process_mappings($mappings, $data, $record_document, $meta)
409 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
411 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
412 Since we group all mappings by MARC field targets C<$mappings> will contain
413 all targets for C<$data> and thus we need to fetch the MARC field only once.
414 C<$mappings> will be applied to C<$record_document> and new field values added.
415 The method has no return value.
421 Arrayref of mappings containing arrayrefs in the format
422 [C<$target>, C<$options>] where C<$target> is the name of the target field and
423 C<$options> is a hashref containing processing directives for this particular
428 The source data from a MARC record field.
430 =item C<$record_document>
432 Hashref representing the Elasticsearch document on which mappings should be
437 A hashref containing metadata useful for enforcing per mapping rules. For
438 example for providing extra context for mapping options, or treating mapping
439 targets differently depending on type (sort, search, facet etc). Combining
440 this metadata with the mapping options and metadata allows us to mutate the
441 data per mapping, or even replace it with other data retrieved from the
444 Current properties are:
446 C<altscript>: A boolean value indicating whether an alternate script presentation is being
449 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
450 'subfield' or 'subfields_group'.
452 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
454 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
455 is 'subfields_group'.
457 C<field>: The original C<MARC::Record> object.
463 sub _process_mappings {
464 my ($_self, $mappings, $data, $record_document, $meta) = @_;
465 foreach my $mapping (@{$mappings}) {
466 my ($target, $options) = @{$mapping};
468 # Don't process sort fields for alternate scripts
469 my $sort = $target =~ /__sort$/;
470 if ($sort && $meta->{altscript}) {
474 # Copy (scalar) data since can have multiple targets
475 # with differing options for (possibly) mutating data
476 # so need a different copy for each
477 my $data_copy = $data;
478 if (defined $options->{substr}) {
479 my ($start, $length) = @{$options->{substr}};
480 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
483 # Add data to values array for callbacks processing
484 my $values = [$data_copy];
486 # Value callbacks takes subfield data (or values from previous
487 # callbacks) as argument, and returns a possibly different list of values.
488 # Note that the returned list may also be empty.
489 if (defined $options->{value_callbacks}) {
490 foreach my $callback (@{$options->{value_callbacks}}) {
491 # Pass each value to current callback which returns a list
492 # (scalar is fine too) resulting either in a list or
493 # a list of lists that will be flattened by perl.
494 # The next callback will receive the possibly expanded list of values.
495 $values = [ map { $callback->($_) } @{$values} ];
499 # Skip mapping if all values has been removed
500 next unless @{$values};
502 if (defined $options->{property}) {
503 $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
505 if (defined $options->{nonfiling_characters_indicator}) {
506 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
507 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
508 # Nonfiling chars does not make sense for multiple values
509 # Only apply on first element
510 $values->[0] = substr $values->[0], $nonfiling_chars;
513 $values = [ grep(!/^$/, @{$values}) ];
515 $record_document->{$target} //= [];
516 push @{$record_document->{$target}}, @{$values};
520 =head2 marc_records_to_documents($marc_records)
522 my $record_documents = $self->marc_records_to_documents($marc_records);
524 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
526 Returns array of hash references, representing Elasticsearch documents,
527 acceptable as body payload in C<Search::Elasticsearch> requests.
531 =item C<$marc_documents>
533 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
539 sub marc_records_to_documents {
540 my ($self, $records) = @_;
541 my $rules = $self->_get_marc_mapping_rules();
542 my $control_fields_rules = $rules->{control_fields};
543 my $data_fields_rules = $rules->{data_fields};
544 my $marcflavour = lc C4::Context->preference('marcflavour');
545 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
547 my @record_documents;
549 my %auth_match_headings;
550 if( $self->index eq 'authorities' ){
551 my @auth_types = Koha::Authority::Types->search->as_list;
552 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
555 foreach my $record (@{$records}) {
556 my $record_document = {};
558 if ( $self->index eq 'authorities' ){
559 my $authtypecode = GuessAuthTypeCode( $record );
561 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
562 my $field = $record->field( $auth_match_headings{ $authtypecode } );
563 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
564 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
567 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
571 my $mappings = $rules->{leader};
573 $self->_process_mappings($mappings, $record->leader(), $record_document, {
575 data_source => 'leader'
579 foreach my $field ($record->fields()) {
580 if ($field->is_control_field()) {
581 my $mappings = $control_fields_rules->{$field->tag()};
583 $self->_process_mappings($mappings, $field->data(), $record_document, {
585 data_source => 'control_field',
592 my $tag = $field->tag();
593 # Handle alternate scripts in MARC 21
595 if ($marcflavour eq 'marc21' && $tag eq '880') {
596 my $sub6 = $field->subfield('6');
597 if ($sub6 =~ /^(...)-\d+/) {
603 my $data_field_rules = $data_fields_rules->{$tag};
604 if ($data_field_rules) {
605 my $subfields_mappings = $data_field_rules->{subfields};
606 my $wildcard_mappings = $subfields_mappings->{'*'};
607 foreach my $subfield ($field->subfields()) {
608 my ($code, $data) = @{$subfield};
609 my $mappings = $subfields_mappings->{$code} // [];
610 if ($wildcard_mappings) {
611 $mappings = [@{$mappings}, @{$wildcard_mappings}];
614 $self->_process_mappings($mappings, $data, $record_document, {
615 altscript => $altscript,
616 data_source => 'subfield',
624 my $subfields_join_mappings = $data_field_rules->{subfields_join};
625 if ($subfields_join_mappings) {
626 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
627 my $data_field = $field->clone; #copy field to preserve for alt scripts
628 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
629 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
631 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
632 altscript => $altscript,
633 data_source => 'subfields_group',
634 codes => $subfields_group,
645 if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
646 foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
647 my $data_field_rules = $data_fields_rules->{$field->tag()};
648 if ($data_field_rules) {
649 my $subfields_mappings = $data_field_rules->{subfields};
650 my $wildcard_mappings = $subfields_mappings->{'*'};
651 foreach my $subfield ($field->subfields()) {
652 my ($code, $data) = @{$subfield};
654 push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
655 push @mappings, @$wildcard_mappings if $wildcard_mappings;
656 # Do not include "see from" into these kind of fields
657 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
659 $self->_process_mappings(\@mappings, $data, $record_document, {
660 data_source => 'subfield',
668 my $subfields_join_mappings = $data_field_rules->{subfields_join};
669 if ($subfields_join_mappings) {
670 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
671 my $data_field = $field->clone;
672 # remove empty subfields, otherwise they are printed as a space
673 $data_field->delete_subfield(match => qr/^$/);
674 my $data = $data_field->as_string( $subfields_group );
676 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
677 # Do not include "see from" into these kind of fields
678 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
679 $self->_process_mappings(\@mappings, $data, $record_document, {
680 data_source => 'subfields_group',
681 codes => $subfields_group,
692 foreach my $field (keys %{$rules->{defaults}}) {
693 unless (defined $record_document->{$field}) {
694 $record_document->{$field} = $rules->{defaults}->{$field};
697 foreach my $field (@{$rules->{sum}}) {
698 if (defined $record_document->{$field}) {
699 # TODO: validate numeric? filter?
700 # TODO: Or should only accept fields without nested values?
701 # TODO: Quick and dirty, improve if needed
702 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
705 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
706 foreach my $field (@{$rules->{isbn}}) {
707 if (defined $record_document->{$field}) {
709 foreach my $input_isbn (@{$record_document->{$field}}) {
710 my $isbn = Business::ISBN->new($input_isbn);
711 if (defined $isbn && $isbn->is_valid) {
712 my $isbn13 = $isbn->as_isbn13->as_string;
713 push @isbns, $isbn13;
715 push @isbns, $isbn13;
717 my $isbn10 = $isbn->as_isbn10;
719 $isbn10 = $isbn10->as_string;
720 push @isbns, $isbn10;
722 push @isbns, $isbn10;
725 push @isbns, $input_isbn;
728 $record_document->{$field} = \@isbns;
732 # Remove duplicate values and collapse sort fields
733 foreach my $field (keys %{$record_document}) {
734 if (ref($record_document->{$field}) eq 'ARRAY') {
735 @{$record_document->{$field}} = do {
737 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
739 if ($field =~ /__sort$/) {
740 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
741 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
746 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
747 $record->encoding('UTF-8');
749 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
750 $record_document->{'marc_format'} = 'ARRAY';
754 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
755 local $SIG{__WARN__} = sub {
756 push @warnings, $_[0];
758 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
761 # Suppress warnings if record length exceeded
762 unless (substr($record->leader(), 0, 5) eq '99999') {
763 foreach my $warning (@warnings) {
767 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
768 $record_document->{'marc_format'} = 'MARCXML';
771 $record_document->{'marc_format'} = 'base64ISO2709';
775 # Check if there is at least one available item
776 if ($self->index eq $BIBLIOS_INDEX) {
777 my ($tag, $code) = C4::Biblio::GetMarcFromKohaField('biblio.biblionumber');
778 my $field = $record->field($tag);
780 my $biblionumber = $field->is_control_field ? $field->data : $field->subfield($code);
781 my $avail_items = Koha::Items->search({
782 biblionumber => $biblionumber,
790 $record_document->{available} = $avail_items ? \1 : \0;
794 push @record_documents, $record_document;
796 return \@record_documents;
799 =head2 _marc_to_array($record)
801 my @fields = _marc_to_array($record)
803 Convert a MARC::Record to an array modeled after MARC-in-JSON
804 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
810 A MARC::Record object
817 my ($self, $record) = @_;
820 leader => $record->leader(),
823 for my $field ($record->fields()) {
824 my $tag = $field->tag();
825 if ($field->is_control_field()) {
826 push @{$data->{fields}}, {$tag => $field->data()};
829 foreach my $subfield ($field->subfields()) {
830 my ($code, $contents) = @{$subfield};
831 push @{$subfields}, {$code => $contents};
833 push @{$data->{fields}}, {
835 ind1 => $field->indicator(1),
836 ind2 => $field->indicator(2),
837 subfields => $subfields
845 =head2 _array_to_marc($data)
847 my $record = _array_to_marc($data)
849 Convert an array modeled after MARC-in-JSON to a MARC::Record
855 An array modeled after MARC-in-JSON
856 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
863 my ($self, $data) = @_;
865 my $record = MARC::Record->new();
867 $record->leader($data->{leader});
868 for my $field (@{$data->{fields}}) {
869 my $tag = (keys %{$field})[0];
870 $field = $field->{$tag};
872 if (ref($field) eq 'HASH') {
874 foreach my $subfield (@{$field->{subfields}}) {
875 my $code = (keys %{$subfield})[0];
876 push @subfields, $code;
877 push @subfields, $subfield->{$code};
879 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
881 $marc_field = MARC::Field->new($tag, $field)
883 $record->append_fields($marc_field);
889 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
891 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
893 Get mappings, an internal data structure later used by
894 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
895 data for a MARC mapping.
897 The returned C<$mappings> is not to to be confused with mappings provided by
898 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
899 provided by C<_foreach_mapping> and expands it to this internal data structure.
900 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
901 is then applied to each MARC target (leader, control field data, subfield or
902 joined subfields) and integrated into the mapping rules data structure used in
903 C<marc_records_to_documents> to transform MARC records into Elasticsearch
910 Boolean indicating whether to create a facet field for this mapping.
912 =item C<$suggestible>
914 Boolean indicating whether to create a suggestion field for this mapping.
918 Boolean indicating whether to create a sort field for this mapping.
922 Boolean indicating whether to create a search field for this mapping.
924 =item C<$target_name>
926 Elasticsearch document target field name.
928 =item C<$target_type>
930 Elasticsearch document target field type.
934 An optional range as a string in the format "<START>-<END>" or "<START>",
935 where "<START>" and "<END>" are integers specifying a range that will be used
936 for extracting a substring from MARC data as Elasticsearch field target value.
938 The first character position is "0", and the range is inclusive,
939 so "0-2" means the first three characters of MARC data.
941 If only "<START>" is provided only one character at position "<START>" will
948 sub _field_mappings {
949 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
950 my %mapping_defaults = ();
953 my $substr_args = undef;
954 if (defined $range) {
955 # TODO: use value_callback instead?
956 my ($start, $end) = map(int, split /-/, $range, 2);
957 $substr_args = [$start];
958 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
960 my $default_options = {};
962 $default_options->{substr} = $substr_args;
965 # TODO: Should probably have per type value callback/hook
966 # but hard code for now
967 if ($target_type eq 'boolean') {
968 $default_options->{value_callbacks} //= [];
969 push @{$default_options->{value_callbacks}}, sub {
971 # Trim whitespace at both ends
972 $value =~ s/^\s+|\s+$//g;
973 return $value ? 'true' : 'false';
976 elsif ($target_type eq 'year') {
977 $default_options->{value_callbacks} //= [];
978 # Only accept years containing digits and "u"
979 push @{$default_options->{value_callbacks}}, sub {
981 # Replace "u" with "0" for sorting
982 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
987 my $mapping = [$target_name, $default_options];
988 push @mappings, $mapping;
992 push @suffixes, 'facet' if $facet;
993 push @suffixes, 'suggestion' if $suggestible;
994 push @suffixes, 'sort' if !defined $sort || $sort;
996 foreach my $suffix (@suffixes) {
997 my $mapping = ["${target_name}__$suffix"];
998 # TODO: Hack, fix later in less hideous manner
999 if ($suffix eq 'suggestion') {
1000 push @{$mapping}, {%{$default_options}, property => 'input'};
1003 # Important! Make shallow clone, or we end up with the same hashref
1004 # shared by all mappings
1005 push @{$mapping}, {%{$default_options}};
1007 push @mappings, $mapping;
1012 =head2 _get_marc_mapping_rules
1014 my $mapping_rules = $self->_get_marc_mapping_rules()
1016 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1018 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
1019 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1020 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1022 We can then iterate through all MARC fields for each record and apply all relevant
1023 rules once per fields instead of retreiving fields multiple times for each mapping rule
1024 which is terribly slow.
1028 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1029 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1030 # memory cache which it is currently not. The performance gain of caching
1031 # would probably be marginal, but to do this could be a further improvement.
1033 sub _get_marc_mapping_rules {
1035 my $marcflavour = lc C4::Context->preference('marcflavour');
1036 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1037 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1040 'control_fields' => {},
1041 'data_fields' => {},
1047 $self->_foreach_mapping(sub {
1048 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
1049 return if $marc_type ne $marcflavour;
1051 if ($type eq 'sum') {
1052 push @{$rules->{sum}}, $name;
1053 push @{$rules->{sum}}, $name."__sort" if $sort;
1055 elsif ($type eq 'isbn') {
1056 push @{$rules->{isbn}}, $name;
1058 elsif ($type eq 'boolean') {
1059 # boolean gets special handling, if value doesn't exist for a field,
1060 # it is set to false
1061 $rules->{defaults}->{$name} = 'false';
1064 if ($marc_field =~ $field_spec_regexp) {
1068 my @subfield_groups;
1069 # Parse and separate subfields form subfield groups
1071 my $subfield_group = '';
1074 foreach my $token (split //, $2) {
1075 if ($token eq "(") {
1077 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1078 "Unmatched opening parenthesis for $marc_field"
1085 elsif ($token eq ")") {
1087 if ($subfield_group) {
1088 push @subfield_groups, $subfield_group;
1089 $subfield_group = '';
1094 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1095 "Unmatched closing parenthesis for $marc_field"
1099 elsif ($open_group) {
1100 $subfield_group .= $token;
1103 push @subfields, $token;
1108 push @subfields, '*';
1111 my $range = defined $3 ? $3 : undef;
1112 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1113 if ($field_tag < 10) {
1114 $rules->{control_fields}->{$field_tag} //= [];
1115 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1118 $rules->{data_fields}->{$field_tag} //= {};
1119 foreach my $subfield (@subfields) {
1120 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1121 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1123 foreach my $subfield_group (@subfield_groups) {
1124 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1125 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1129 elsif ($marc_field =~ $leader_regexp) {
1130 my $range = defined $1 ? $1 : undef;
1131 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1132 push @{$rules->{leader}}, @mappings;
1135 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1136 "Invalid MARC field expression: $marc_field"
1141 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1142 if ($marcflavour eq 'marc21') {
1143 # Nonfiling characters processing for sort fields
1145 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1146 # Format is: nonfiling characters indicator => field names list
1148 1 => [130, 630, 730, 740],
1149 2 => [222, 240, 242, 243, 245, 440, 830]
1152 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1155 2 => [130, 430, 530]
1158 foreach my $indicator (keys %title_fields) {
1159 foreach my $field_tag (@{$title_fields{$indicator}}) {
1160 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1161 foreach my $mapping (@{$mappings}) {
1162 if ($mapping->[0] =~ /__sort$/) {
1163 # Mark this as to be processed for nonfiling characters indicator
1164 # later on in _process_mappings
1165 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1175 =head2 _foreach_mapping
1177 $self->_foreach_mapping(
1179 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1182 return unless $marc_type eq 'marc21';
1183 print "Data comes from: " . $marc_field . "\n";
1187 This allows you to apply a function to each entry in the elasticsearch mappings
1188 table, in order to build the mappings for whatever is needed.
1190 In the provided function, the files are:
1196 The field name for elasticsearch (corresponds to the 'mapping' column in the
1201 The type for this value, e.g. 'string'.
1205 True if this value should be facetised. This only really makes sense if the
1206 field is understood by the facet processing code anyway.
1210 True if this is a field that a) needs special sort handling, and b) if it
1211 should be sorted on. False if a) but not b). Undef if not a). This allows,
1212 for example, author to be sorted on but not everything marked with "author"
1213 to be included in that sort.
1217 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1220 =item C<$marc_field>
1222 A string that describes the MARC field that contains the data to extract.
1228 sub _foreach_mapping {
1229 my ( $self, $sub ) = @_;
1231 # TODO use a caching framework here
1232 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1234 'search_marc_map.index_name' => $self->index,
1236 { join => { search_marc_to_fields => 'search_marc_map' },
1238 'search_marc_to_fields.facet',
1239 'search_marc_to_fields.suggestible',
1240 'search_marc_to_fields.sort',
1241 'search_marc_to_fields.search',
1242 'search_marc_map.marc_type',
1243 'search_marc_map.marc_field',
1256 while ( my $search_field = $search_fields->next ) {
1258 # Force lower case on indexed field names for case insensitive
1259 # field name searches
1260 lc($search_field->name),
1261 $search_field->type,
1262 $search_field->get_column('facet'),
1263 $search_field->get_column('suggestible'),
1264 $search_field->get_column('sort'),
1265 $search_field->get_column('search'),
1266 $search_field->get_column('marc_type'),
1267 $search_field->get_column('marc_field'),
1272 =head2 process_error
1274 die process_error($@);
1276 This parses an Elasticsearch error message and produces a human-readable
1277 result from it. This result is probably missing all the useful information
1278 that you might want in diagnosing an issue, so the warning is also logged.
1280 Note that currently the resulting message is not internationalised. This
1281 will happen eventually by some method or other.
1286 my ($self, $msg) = @_;
1288 warn $msg; # simple logging
1290 # This is super-primitive
1291 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1293 return "Unable to perform your search. Please try again.\n";
1296 =head2 _read_configuration
1298 my $conf = _read_configuration();
1300 Reads the I<configuration file> and returns a hash structure with the
1301 configuration information. It raises an exception if mandatory entries
1304 The hashref structure has the following form:
1307 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1308 'index_name' => 'koha_instance',
1311 This is configured by the following in the C<config> block in koha-conf.xml:
1314 <server>127.0.0.1:9200</server>
1315 <server>anotherserver:9200</server>
1316 <index_name>koha_instance</index_name>
1321 sub _read_configuration {
1325 my $conf = C4::Context->config('elasticsearch');
1326 unless ( defined $conf ) {
1327 Koha::Exceptions::Config::MissingEntry->throw(
1328 "Missing <elasticsearch> entry in koha-conf.xml"
1332 unless ( exists $conf->{server} ) {
1333 Koha::Exceptions::Config::MissingEntry->throw(
1334 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1338 unless ( exists $conf->{index_name} ) {
1339 Koha::Exceptions::Config::MissingEntry->throw(
1340 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1344 while ( my ( $var, $val ) = each %$conf ) {
1345 if ( $var eq 'server' ) {
1346 if ( ref($val) eq 'ARRAY' ) {
1347 $configuration->{nodes} = $val;
1350 $configuration->{nodes} = [$val];
1353 $configuration->{$var} = $val;
1357 $configuration->{cxn_pool} //= 'Static';
1359 return $configuration;
1362 =head2 get_facetable_fields
1364 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1366 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1370 sub get_facetable_fields {
1373 # These should correspond to the ES field names, as opposed to the CCL
1374 # things that zebra uses.
1375 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1376 my @faceted_fields = Koha::SearchFields->search(
1377 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1379 my @not_faceted_fields = Koha::SearchFields->search(
1380 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1382 # This could certainly be improved
1383 return ( @faceted_fields, @not_faceted_fields );
1386 =head2 clear_search_fields_cache
1388 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1390 Clear cached values for ES search fields
1394 sub clear_search_fields_cache {
1396 my $cache = Koha::Caches->get_instance();
1397 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1398 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1399 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1400 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1412 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1414 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1416 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>