1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
34 use Carp qw( carp croak );
35 use Clone qw( clone );
37 use Readonly qw( Readonly );
38 use Search::Elasticsearch;
39 use Try::Tiny qw( catch try );
42 use List::Util qw( sum0 );
44 use MIME::Base64 qw( encode_base64 );
45 use Encode qw( encode );
47 use Scalar::Util qw( looks_like_number );
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
66 The name of the index to use, generally 'biblios' or 'authorities'.
70 The Elasticsearch index name with Koha instance prefix.
83 # Check for a valid index
84 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85 my $config = _read_configuration();
86 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
88 my $self = $class->SUPER::new(@_);
92 =head2 get_elasticsearch
94 my $elasticsearch_client = $self->get_elasticsearch();
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
101 sub get_elasticsearch {
103 unless (defined $self->{elasticsearch}) {
104 $self->{elasticsearch} = Search::Elasticsearch->new(
105 $self->get_elasticsearch_params()
108 return $self->{elasticsearch};
111 =head2 get_elasticsearch_params
113 my $params = $self->get_elasticsearch_params();
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
119 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120 'index_name' => 'koha_instance_index',
123 This is configured by the following in the C<config> block in koha-conf.xml:
126 <server>127.0.0.1:9200</server>
127 <server>anotherserver:9200</server>
128 <index_name>koha_instance</index_name>
133 sub get_elasticsearch_params {
138 $conf = _read_configuration();
140 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
148 =head2 get_elasticsearch_settings
150 my $settings = $self->get_elasticsearch_settings();
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
155 A hashref containing the settings is returned.
159 sub get_elasticsearch_settings {
162 # Use state to speed up repeated calls
163 state $settings = undef;
164 if (!defined $settings) {
165 my $config_file = C4::Context->config('elasticsearch_index_config');
166 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167 $settings = YAML::XS::LoadFile( $config_file );
173 =head2 get_elasticsearch_mappings
175 my $mappings = $self->get_elasticsearch_mappings();
177 This provides the mappings that get passed to Elasticsearch when an index is
182 sub get_elasticsearch_mappings {
185 # Use state to speed up repeated calls
189 if (!defined $all_mappings{$self->index}) {
190 $sort_fields{$self->index} = {};
191 # Clone the general mapping to break ties with the original hash
192 my $mappings = clone(_get_elasticsearch_field_config('general', ''));
193 my $marcflavour = lc C4::Context->preference('marcflavour');
194 $self->_foreach_mapping(
196 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
197 return if $marc_type ne $marcflavour;
198 # TODO if this gets any sort of complexity to it, it should
199 # be broken out into its own function.
201 # TODO be aware of date formats, but this requires pre-parsing
202 # as ES will simply reject anything with an invalid date.
203 my $es_type = 'text';
204 if ($type eq 'boolean') {
205 $es_type = 'boolean';
206 } elsif ($type eq 'number' || $type eq 'sum') {
207 $es_type = 'integer';
208 } elsif ($type eq 'isbn' || $type eq 'stdno') {
210 } elsif ($type eq 'year') {
212 } elsif ($type eq 'callnumber') {
213 $es_type = 'cn_sort';
217 $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221 $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224 $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226 # Sort is a bit special as it can be true, false, undef.
227 # We care about "true" or "undef",
228 # "undef" means to do the default thing, which is make it sortable.
229 if (!defined $sort || $sort) {
230 $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231 $sort_fields{$self->index}{$name} = 1;
235 $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236 $all_mappings{$self->index} = $mappings;
238 $self->sort_fields(\%{$sort_fields{$self->index}});
239 return $all_mappings{$self->index};
242 =head2 raw_elasticsearch_mappings
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
251 sub raw_elasticsearch_mappings {
252 my ( $marc_type ) = @_;
254 my $schema = Koha::Database->new()->schema();
256 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
259 while ( my $search_field = $search_fields->next ) {
261 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262 { search_field_id => $search_field->id },
264 join => 'search_marc_map',
265 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
269 while ( my $marc_to_field = $marc_to_fields->next ) {
271 my $marc_map = $marc_to_field->search_marc_map;
273 next if $marc_type && $marc_map->marc_type ne $marc_type;
275 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
283 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
285 facet => $marc_to_field->facet || '',
286 marc_type => $marc_map->marc_type,
287 marc_field => $marc_map->marc_field,
288 sort => $marc_to_field->sort,
289 suggestible => $marc_to_field->suggestible || ''
298 =head2 _get_elasticsearch_field_config
300 Get the Elasticsearch field config for the given purpose and data type.
302 $mapping = _get_elasticsearch_field_config('search', 'text');
306 sub _get_elasticsearch_field_config {
308 my ( $purpose, $type ) = @_;
310 # Use state to speed up repeated calls
311 state $settings = undef;
312 if (!defined $settings) {
313 my $config_file = C4::Context->config('elasticsearch_field_config');
314 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
315 local $YAML::XS::Boolean = 'JSON::PP';
316 $settings = YAML::XS::LoadFile( $config_file );
319 if (!defined $settings->{$purpose}) {
320 die "Field purpose $purpose not defined in field config";
323 return $settings->{$purpose};
325 if (defined $settings->{$purpose}{$type}) {
326 return $settings->{$purpose}{$type};
328 if (defined $settings->{$purpose}{'default'}) {
329 return $settings->{$purpose}{'default'};
334 =head2 _load_elasticsearch_mappings
336 Load Elasticsearch mappings in the format of mappings.yaml.
338 $indexes = _load_elasticsearch_mappings();
342 sub _load_elasticsearch_mappings {
343 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345 return YAML::XS::LoadFile( $mappings_yaml );
348 sub reset_elasticsearch_mappings {
350 my $indexes = $self->_load_elasticsearch_mappings();
352 Koha::SearchMarcMaps->delete;
353 Koha::SearchFields->delete;
355 while ( my ( $index_name, $fields ) = each %$indexes ) {
356 while ( my ( $field_name, $data ) = each %$fields ) {
358 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
361 $sf_params{staff_client} //= 1;
362 $sf_params{opac} //= 1;
364 $sf_params{name} = $field_name;
366 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
368 my $mappings = $data->{mappings};
369 for my $mapping ( @$mappings ) {
370 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371 index_name => $index_name,
372 marc_type => $mapping->{marc_type},
373 marc_field => $mapping->{marc_field}
375 $search_field->add_to_search_marc_maps($marc_field, {
376 facet => $mapping->{facet} || 0,
377 suggestible => $mapping->{suggestible} || 0,
378 sort => $mapping->{sort} // 1,
379 search => $mapping->{search} // 1
385 $self->clear_search_fields_cache();
387 # FIXME return the mappings?
390 # This overrides the accessor provided by Class::Accessor so that if
391 # sort_fields isn't set, then it'll generate it.
395 $self->_sort_fields_accessor(@_);
398 my $val = $self->_sort_fields_accessor();
401 # This will populate the accessor as a side effect
402 $self->get_elasticsearch_mappings();
403 return $self->_sort_fields_accessor();
406 =head2 _process_mappings($mappings, $data, $record_document, $meta)
408 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
410 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
411 Since we group all mappings by MARC field targets C<$mappings> will contain
412 all targets for C<$data> and thus we need to fetch the MARC field only once.
413 C<$mappings> will be applied to C<$record_document> and new field values added.
414 The method has no return value.
420 Arrayref of mappings containing arrayrefs in the format
421 [C<$target>, C<$options>] where C<$target> is the name of the target field and
422 C<$options> is a hashref containing processing directives for this particular
427 The source data from a MARC record field.
429 =item C<$record_document>
431 Hashref representing the Elasticsearch document on which mappings should be
436 A hashref containing metadata useful for enforcing per mapping rules. For
437 example for providing extra context for mapping options, or treating mapping
438 targets differently depending on type (sort, search, facet etc). Combining
439 this metadata with the mapping options and metadata allows us to mutate the
440 data per mapping, or even replace it with other data retrieved from the
443 Current properties are:
445 C<altscript>: A boolean value indicating whether an alternate script presentation is being
448 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
449 'subfield' or 'subfields_group'.
451 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
453 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
454 is 'subfields_group'.
456 C<field>: The original C<MARC::Record> object.
462 sub _process_mappings {
463 my ($_self, $mappings, $data, $record_document, $meta) = @_;
464 foreach my $mapping (@{$mappings}) {
465 my ($target, $options) = @{$mapping};
467 # Don't process sort fields for alternate scripts
468 my $sort = $target =~ /__sort$/;
469 if ($sort && $meta->{altscript}) {
473 # Copy (scalar) data since can have multiple targets
474 # with differing options for (possibly) mutating data
475 # so need a different copy for each
476 my $data_copy = $data;
477 if (defined $options->{substr}) {
478 my ($start, $length) = @{$options->{substr}};
479 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
482 # Add data to values array for callbacks processing
483 my $values = [$data_copy];
485 # Value callbacks takes subfield data (or values from previous
486 # callbacks) as argument, and returns a possibly different list of values.
487 # Note that the returned list may also be empty.
488 if (defined $options->{value_callbacks}) {
489 foreach my $callback (@{$options->{value_callbacks}}) {
490 # Pass each value to current callback which returns a list
491 # (scalar is fine too) resulting either in a list or
492 # a list of lists that will be flattened by perl.
493 # The next callback will receive the possibly expanded list of values.
494 $values = [ map { $callback->($_) } @{$values} ];
498 # Skip mapping if all values has been removed
499 next unless @{$values};
501 if (defined $options->{property}) {
502 $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
504 if (defined $options->{nonfiling_characters_indicator}) {
505 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
506 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
507 # Nonfiling chars does not make sense for multiple values
508 # Only apply on first element
509 $values->[0] = substr $values->[0], $nonfiling_chars;
512 $values = [ grep(!/^$/, @{$values}) ];
514 $record_document->{$target} //= [];
515 push @{$record_document->{$target}}, @{$values};
519 =head2 marc_records_to_documents($marc_records)
521 my $record_documents = $self->marc_records_to_documents($marc_records);
523 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
525 Returns array of hash references, representing Elasticsearch documents,
526 acceptable as body payload in C<Search::Elasticsearch> requests.
530 =item C<$marc_documents>
532 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
538 sub marc_records_to_documents {
539 my ($self, $records) = @_;
540 my $rules = $self->_get_marc_mapping_rules();
541 my $control_fields_rules = $rules->{control_fields};
542 my $data_fields_rules = $rules->{data_fields};
543 my $marcflavour = lc C4::Context->preference('marcflavour');
544 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
546 my @record_documents;
548 my %auth_match_headings;
549 if( $self->index eq 'authorities' ){
550 my @auth_types = Koha::Authority::Types->search->as_list;
551 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
554 foreach my $record (@{$records}) {
555 my $record_document = {};
557 if ( $self->index eq 'authorities' ){
558 my $authtypecode = GuessAuthTypeCode( $record );
560 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
561 my $field = $record->field( $auth_match_headings{ $authtypecode } );
562 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
563 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
566 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
570 my $mappings = $rules->{leader};
572 $self->_process_mappings($mappings, $record->leader(), $record_document, {
574 data_source => 'leader'
578 foreach my $field ($record->fields()) {
579 if ($field->is_control_field()) {
580 my $mappings = $control_fields_rules->{$field->tag()};
582 $self->_process_mappings($mappings, $field->data(), $record_document, {
584 data_source => 'control_field',
591 my $tag = $field->tag();
592 # Handle alternate scripts in MARC 21
594 if ($marcflavour eq 'marc21' && $tag eq '880') {
595 my $sub6 = $field->subfield('6');
596 if ($sub6 =~ /^(...)-\d+/) {
602 my $data_field_rules = $data_fields_rules->{$tag};
603 if ($data_field_rules) {
604 my $subfields_mappings = $data_field_rules->{subfields};
605 my $wildcard_mappings = $subfields_mappings->{'*'};
606 foreach my $subfield ($field->subfields()) {
607 my ($code, $data) = @{$subfield};
608 my $mappings = $subfields_mappings->{$code} // [];
609 if ($wildcard_mappings) {
610 $mappings = [@{$mappings}, @{$wildcard_mappings}];
613 $self->_process_mappings($mappings, $data, $record_document, {
614 altscript => $altscript,
615 data_source => 'subfield',
623 my $subfields_join_mappings = $data_field_rules->{subfields_join};
624 if ($subfields_join_mappings) {
625 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
626 my $data_field = $field->clone; #copy field to preserve for alt scripts
627 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
628 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
630 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
631 altscript => $altscript,
632 data_source => 'subfields_group',
633 codes => $subfields_group,
644 if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
645 foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
646 my $data_field_rules = $data_fields_rules->{$field->tag()};
647 if ($data_field_rules) {
648 my $subfields_mappings = $data_field_rules->{subfields};
649 my $wildcard_mappings = $subfields_mappings->{'*'};
650 foreach my $subfield ($field->subfields()) {
651 my ($code, $data) = @{$subfield};
653 push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
654 push @mappings, @$wildcard_mappings if $wildcard_mappings;
655 # Do not include "see from" into these kind of fields
656 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
658 $self->_process_mappings(\@mappings, $data, $record_document, {
659 data_source => 'subfield',
667 my $subfields_join_mappings = $data_field_rules->{subfields_join};
668 if ($subfields_join_mappings) {
669 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
670 my $data_field = $field->clone;
671 # remove empty subfields, otherwise they are printed as a space
672 $data_field->delete_subfield(match => qr/^$/);
673 my $data = $data_field->as_string( $subfields_group );
675 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
676 # Do not include "see from" into these kind of fields
677 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
678 $self->_process_mappings(\@mappings, $data, $record_document, {
679 data_source => 'subfields_group',
680 codes => $subfields_group,
691 foreach my $field (keys %{$rules->{defaults}}) {
692 unless (defined $record_document->{$field}) {
693 $record_document->{$field} = $rules->{defaults}->{$field};
696 foreach my $field (@{$rules->{sum}}) {
697 if (defined $record_document->{$field}) {
698 # TODO: validate numeric? filter?
699 # TODO: Or should only accept fields without nested values?
700 # TODO: Quick and dirty, improve if needed
701 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
704 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
705 foreach my $field (@{$rules->{isbn}}) {
706 if (defined $record_document->{$field}) {
708 foreach my $input_isbn (@{$record_document->{$field}}) {
709 my $isbn = Business::ISBN->new($input_isbn);
710 if (defined $isbn && $isbn->is_valid) {
711 my $isbn13 = $isbn->as_isbn13->as_string;
712 push @isbns, $isbn13;
714 push @isbns, $isbn13;
716 my $isbn10 = $isbn->as_isbn10;
718 $isbn10 = $isbn10->as_string;
719 push @isbns, $isbn10;
721 push @isbns, $isbn10;
724 push @isbns, $input_isbn;
727 $record_document->{$field} = \@isbns;
731 # Remove duplicate values and collapse sort fields
732 foreach my $field (keys %{$record_document}) {
733 if (ref($record_document->{$field}) eq 'ARRAY') {
734 @{$record_document->{$field}} = do {
736 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
738 if ($field =~ /__sort$/) {
739 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
740 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
745 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
746 $record->encoding('UTF-8');
748 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
749 $record_document->{'marc_format'} = 'ARRAY';
753 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
754 local $SIG{__WARN__} = sub {
755 push @warnings, $_[0];
757 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
760 # Suppress warnings if record length exceeded
761 unless (substr($record->leader(), 0, 5) eq '99999') {
762 foreach my $warning (@warnings) {
766 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
767 $record_document->{'marc_format'} = 'MARCXML';
770 $record_document->{'marc_format'} = 'base64ISO2709';
774 # Check if there is at least one available item
775 if ($self->index eq $BIBLIOS_INDEX) {
776 my $biblio = Koha::Biblios->find($record->field('001')->data);
777 my $items = $biblio->items;
779 while (my $item = $items->next) {
780 next if $item->onloan;
781 next if $item->notforloan;
782 next if $item->withdrawn;
783 next if $item->itemlost;
784 next if $item->damaged;
790 $record_document->{available} = $available ? \1 : \0;
793 push @record_documents, $record_document;
795 return \@record_documents;
798 =head2 _marc_to_array($record)
800 my @fields = _marc_to_array($record)
802 Convert a MARC::Record to an array modeled after MARC-in-JSON
803 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
809 A MARC::Record object
816 my ($self, $record) = @_;
819 leader => $record->leader(),
822 for my $field ($record->fields()) {
823 my $tag = $field->tag();
824 if ($field->is_control_field()) {
825 push @{$data->{fields}}, {$tag => $field->data()};
828 foreach my $subfield ($field->subfields()) {
829 my ($code, $contents) = @{$subfield};
830 push @{$subfields}, {$code => $contents};
832 push @{$data->{fields}}, {
834 ind1 => $field->indicator(1),
835 ind2 => $field->indicator(2),
836 subfields => $subfields
844 =head2 _array_to_marc($data)
846 my $record = _array_to_marc($data)
848 Convert an array modeled after MARC-in-JSON to a MARC::Record
854 An array modeled after MARC-in-JSON
855 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
862 my ($self, $data) = @_;
864 my $record = MARC::Record->new();
866 $record->leader($data->{leader});
867 for my $field (@{$data->{fields}}) {
868 my $tag = (keys %{$field})[0];
869 $field = $field->{$tag};
871 if (ref($field) eq 'HASH') {
873 foreach my $subfield (@{$field->{subfields}}) {
874 my $code = (keys %{$subfield})[0];
875 push @subfields, $code;
876 push @subfields, $subfield->{$code};
878 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
880 $marc_field = MARC::Field->new($tag, $field)
882 $record->append_fields($marc_field);
888 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
890 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
892 Get mappings, an internal data structure later used by
893 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
894 data for a MARC mapping.
896 The returned C<$mappings> is not to to be confused with mappings provided by
897 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
898 provided by C<_foreach_mapping> and expands it to this internal data structure.
899 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
900 is then applied to each MARC target (leader, control field data, subfield or
901 joined subfields) and integrated into the mapping rules data structure used in
902 C<marc_records_to_documents> to transform MARC records into Elasticsearch
909 Boolean indicating whether to create a facet field for this mapping.
911 =item C<$suggestible>
913 Boolean indicating whether to create a suggestion field for this mapping.
917 Boolean indicating whether to create a sort field for this mapping.
921 Boolean indicating whether to create a search field for this mapping.
923 =item C<$target_name>
925 Elasticsearch document target field name.
927 =item C<$target_type>
929 Elasticsearch document target field type.
933 An optional range as a string in the format "<START>-<END>" or "<START>",
934 where "<START>" and "<END>" are integers specifying a range that will be used
935 for extracting a substring from MARC data as Elasticsearch field target value.
937 The first character position is "0", and the range is inclusive,
938 so "0-2" means the first three characters of MARC data.
940 If only "<START>" is provided only one character at position "<START>" will
947 sub _field_mappings {
948 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
949 my %mapping_defaults = ();
952 my $substr_args = undef;
953 if (defined $range) {
954 # TODO: use value_callback instead?
955 my ($start, $end) = map(int, split /-/, $range, 2);
956 $substr_args = [$start];
957 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
959 my $default_options = {};
961 $default_options->{substr} = $substr_args;
964 # TODO: Should probably have per type value callback/hook
965 # but hard code for now
966 if ($target_type eq 'boolean') {
967 $default_options->{value_callbacks} //= [];
968 push @{$default_options->{value_callbacks}}, sub {
970 # Trim whitespace at both ends
971 $value =~ s/^\s+|\s+$//g;
972 return $value ? 'true' : 'false';
975 elsif ($target_type eq 'year') {
976 $default_options->{value_callbacks} //= [];
977 # Only accept years containing digits and "u"
978 push @{$default_options->{value_callbacks}}, sub {
980 # Replace "u" with "0" for sorting
981 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
986 my $mapping = [$target_name, $default_options];
987 push @mappings, $mapping;
991 push @suffixes, 'facet' if $facet;
992 push @suffixes, 'suggestion' if $suggestible;
993 push @suffixes, 'sort' if !defined $sort || $sort;
995 foreach my $suffix (@suffixes) {
996 my $mapping = ["${target_name}__$suffix"];
997 # TODO: Hack, fix later in less hideous manner
998 if ($suffix eq 'suggestion') {
999 push @{$mapping}, {%{$default_options}, property => 'input'};
1002 # Important! Make shallow clone, or we end up with the same hashref
1003 # shared by all mappings
1004 push @{$mapping}, {%{$default_options}};
1006 push @mappings, $mapping;
1011 =head2 _get_marc_mapping_rules
1013 my $mapping_rules = $self->_get_marc_mapping_rules()
1015 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1017 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
1018 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1019 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1021 We can then iterate through all MARC fields for each record and apply all relevant
1022 rules once per fields instead of retreiving fields multiple times for each mapping rule
1023 which is terribly slow.
1027 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1028 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1029 # memory cache which it is currently not. The performance gain of caching
1030 # would probably be marginal, but to do this could be a further improvement.
1032 sub _get_marc_mapping_rules {
1034 my $marcflavour = lc C4::Context->preference('marcflavour');
1035 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1036 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1039 'control_fields' => {},
1040 'data_fields' => {},
1046 $self->_foreach_mapping(sub {
1047 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
1048 return if $marc_type ne $marcflavour;
1050 if ($type eq 'sum') {
1051 push @{$rules->{sum}}, $name;
1052 push @{$rules->{sum}}, $name."__sort" if $sort;
1054 elsif ($type eq 'isbn') {
1055 push @{$rules->{isbn}}, $name;
1057 elsif ($type eq 'boolean') {
1058 # boolean gets special handling, if value doesn't exist for a field,
1059 # it is set to false
1060 $rules->{defaults}->{$name} = 'false';
1063 if ($marc_field =~ $field_spec_regexp) {
1067 my @subfield_groups;
1068 # Parse and separate subfields form subfield groups
1070 my $subfield_group = '';
1073 foreach my $token (split //, $2) {
1074 if ($token eq "(") {
1076 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1077 "Unmatched opening parenthesis for $marc_field"
1084 elsif ($token eq ")") {
1086 if ($subfield_group) {
1087 push @subfield_groups, $subfield_group;
1088 $subfield_group = '';
1093 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1094 "Unmatched closing parenthesis for $marc_field"
1098 elsif ($open_group) {
1099 $subfield_group .= $token;
1102 push @subfields, $token;
1107 push @subfields, '*';
1110 my $range = defined $3 ? $3 : undef;
1111 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1112 if ($field_tag < 10) {
1113 $rules->{control_fields}->{$field_tag} //= [];
1114 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1117 $rules->{data_fields}->{$field_tag} //= {};
1118 foreach my $subfield (@subfields) {
1119 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1120 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1122 foreach my $subfield_group (@subfield_groups) {
1123 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1124 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1128 elsif ($marc_field =~ $leader_regexp) {
1129 my $range = defined $1 ? $1 : undef;
1130 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1131 push @{$rules->{leader}}, @mappings;
1134 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1135 "Invalid MARC field expression: $marc_field"
1140 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1141 if ($marcflavour eq 'marc21') {
1142 # Nonfiling characters processing for sort fields
1144 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1145 # Format is: nonfiling characters indicator => field names list
1147 1 => [130, 630, 730, 740],
1148 2 => [222, 240, 242, 243, 245, 440, 830]
1151 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1154 2 => [130, 430, 530]
1157 foreach my $indicator (keys %title_fields) {
1158 foreach my $field_tag (@{$title_fields{$indicator}}) {
1159 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1160 foreach my $mapping (@{$mappings}) {
1161 if ($mapping->[0] =~ /__sort$/) {
1162 # Mark this as to be processed for nonfiling characters indicator
1163 # later on in _process_mappings
1164 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1174 =head2 _foreach_mapping
1176 $self->_foreach_mapping(
1178 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1181 return unless $marc_type eq 'marc21';
1182 print "Data comes from: " . $marc_field . "\n";
1186 This allows you to apply a function to each entry in the elasticsearch mappings
1187 table, in order to build the mappings for whatever is needed.
1189 In the provided function, the files are:
1195 The field name for elasticsearch (corresponds to the 'mapping' column in the
1200 The type for this value, e.g. 'string'.
1204 True if this value should be facetised. This only really makes sense if the
1205 field is understood by the facet processing code anyway.
1209 True if this is a field that a) needs special sort handling, and b) if it
1210 should be sorted on. False if a) but not b). Undef if not a). This allows,
1211 for example, author to be sorted on but not everything marked with "author"
1212 to be included in that sort.
1216 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1219 =item C<$marc_field>
1221 A string that describes the MARC field that contains the data to extract.
1227 sub _foreach_mapping {
1228 my ( $self, $sub ) = @_;
1230 # TODO use a caching framework here
1231 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1233 'search_marc_map.index_name' => $self->index,
1235 { join => { search_marc_to_fields => 'search_marc_map' },
1237 'search_marc_to_fields.facet',
1238 'search_marc_to_fields.suggestible',
1239 'search_marc_to_fields.sort',
1240 'search_marc_to_fields.search',
1241 'search_marc_map.marc_type',
1242 'search_marc_map.marc_field',
1255 while ( my $search_field = $search_fields->next ) {
1257 # Force lower case on indexed field names for case insensitive
1258 # field name searches
1259 lc($search_field->name),
1260 $search_field->type,
1261 $search_field->get_column('facet'),
1262 $search_field->get_column('suggestible'),
1263 $search_field->get_column('sort'),
1264 $search_field->get_column('search'),
1265 $search_field->get_column('marc_type'),
1266 $search_field->get_column('marc_field'),
1271 =head2 process_error
1273 die process_error($@);
1275 This parses an Elasticsearch error message and produces a human-readable
1276 result from it. This result is probably missing all the useful information
1277 that you might want in diagnosing an issue, so the warning is also logged.
1279 Note that currently the resulting message is not internationalised. This
1280 will happen eventually by some method or other.
1285 my ($self, $msg) = @_;
1287 warn $msg; # simple logging
1289 # This is super-primitive
1290 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1292 return "Unable to perform your search. Please try again.\n";
1295 =head2 _read_configuration
1297 my $conf = _read_configuration();
1299 Reads the I<configuration file> and returns a hash structure with the
1300 configuration information. It raises an exception if mandatory entries
1303 The hashref structure has the following form:
1306 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1307 'index_name' => 'koha_instance',
1310 This is configured by the following in the C<config> block in koha-conf.xml:
1313 <server>127.0.0.1:9200</server>
1314 <server>anotherserver:9200</server>
1315 <index_name>koha_instance</index_name>
1320 sub _read_configuration {
1324 my $conf = C4::Context->config('elasticsearch');
1325 unless ( defined $conf ) {
1326 Koha::Exceptions::Config::MissingEntry->throw(
1327 "Missing <elasticsearch> entry in koha-conf.xml"
1331 unless ( exists $conf->{server} ) {
1332 Koha::Exceptions::Config::MissingEntry->throw(
1333 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1337 unless ( exists $conf->{index_name} ) {
1338 Koha::Exceptions::Config::MissingEntry->throw(
1339 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1343 while ( my ( $var, $val ) = each %$conf ) {
1344 if ( $var eq 'server' ) {
1345 if ( ref($val) eq 'ARRAY' ) {
1346 $configuration->{nodes} = $val;
1349 $configuration->{nodes} = [$val];
1352 $configuration->{$var} = $val;
1356 $configuration->{cxn_pool} //= 'Static';
1358 return $configuration;
1361 =head2 get_facetable_fields
1363 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1365 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1369 sub get_facetable_fields {
1372 # These should correspond to the ES field names, as opposed to the CCL
1373 # things that zebra uses.
1374 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1375 my @faceted_fields = Koha::SearchFields->search(
1376 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1378 my @not_faceted_fields = Koha::SearchFields->search(
1379 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1381 # This could certainly be improved
1382 return ( @faceted_fields, @not_faceted_fields );
1385 =head2 clear_search_fields_cache
1387 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1389 Clear cached values for ES search fields
1393 sub clear_search_fields_cache {
1395 my $cache = Koha::Caches->get_instance();
1396 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1397 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1398 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1399 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1411 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1413 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1415 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>