Bug 26903: [20.05.x] Pass record ids and records through to update_index when passed...
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
281
282             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
283                 {
284                     facet   => $marc_to_field->facet || '',
285                     marc_type => $marc_map->marc_type,
286                     marc_field => $marc_map->marc_field,
287                     sort        => $marc_to_field->sort,
288                     suggestible => $marc_to_field->suggestible || ''
289                 });
290
291         }
292     }
293
294     return $mappings;
295 }
296
297 =head2 _get_elasticsearch_field_config
298
299 Get the Elasticsearch field config for the given purpose and data type.
300
301 $mapping = _get_elasticsearch_field_config('search', 'text');
302
303 =cut
304
305 sub _get_elasticsearch_field_config {
306
307     my ( $purpose, $type ) = @_;
308
309     # Use state to speed up repeated calls
310     state $settings = undef;
311     if (!defined $settings) {
312         my $config_file = C4::Context->config('elasticsearch_field_config');
313         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314         $settings = LoadFile( $config_file );
315     }
316
317     if (!defined $settings->{$purpose}) {
318         die "Field purpose $purpose not defined in field config";
319     }
320     if ($type eq '') {
321         return $settings->{$purpose};
322     }
323     if (defined $settings->{$purpose}{$type}) {
324         return $settings->{$purpose}{$type};
325     }
326     if (defined $settings->{$purpose}{'default'}) {
327         return $settings->{$purpose}{'default'};
328     }
329     return;
330 }
331
332 =head2 _load_elasticsearch_mappings
333
334 Load Elasticsearch mappings in the format of mappings.yaml.
335
336 $indexes = _load_elasticsearch_mappings();
337
338 =cut
339
340 sub _load_elasticsearch_mappings {
341     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
342     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
343     return LoadFile( $mappings_yaml );
344 }
345
346 sub reset_elasticsearch_mappings {
347     my ( $self ) = @_;
348     my $indexes = $self->_load_elasticsearch_mappings();
349
350     Koha::SearchMarcMaps->delete;
351     Koha::SearchFields->delete;
352
353     while ( my ( $index_name, $fields ) = each %$indexes ) {
354         while ( my ( $field_name, $data ) = each %$fields ) {
355
356             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
357
358             # Set default values
359             $sf_params{staff_client} //= 1;
360             $sf_params{opac} //= 1;
361
362             $sf_params{name} = $field_name;
363
364             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
365
366             my $mappings = $data->{mappings};
367             for my $mapping ( @$mappings ) {
368                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
369                     index_name => $index_name,
370                     marc_type => $mapping->{marc_type},
371                     marc_field => $mapping->{marc_field}
372                 });
373                 $search_field->add_to_search_marc_maps($marc_field, {
374                     facet => $mapping->{facet} || 0,
375                     suggestible => $mapping->{suggestible} || 0,
376                     sort => $mapping->{sort},
377                     search => $mapping->{search} // 1
378                 });
379             }
380         }
381     }
382
383     $self->clear_search_fields_cache();
384
385     # FIXME return the mappings?
386 }
387
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
390 sub sort_fields {
391     my $self = shift;
392     if (@_) {
393         $self->_sort_fields_accessor(@_);
394         return;
395     }
396     my $val = $self->_sort_fields_accessor();
397     return $val if $val;
398
399     # This will populate the accessor as a side effect
400     $self->get_elasticsearch_mappings();
401     return $self->_sort_fields_accessor();
402 }
403
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
405
406     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
407
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
413
414 =over 4
415
416 =item C<$mappings>
417
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
421 mapping.
422
423 =item C<$data>
424
425 The source data from a MARC record field.
426
427 =item C<$record_document>
428
429 Hashref representing the Elasticsearch document on which mappings should be
430 applied.
431
432 =item C<$meta>
433
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
439 metadata context.
440
441 Current properties are:
442
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
444 processed.
445
446 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
448
449 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
450
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
453
454 C<field>: The original C<MARC::Record> object.
455
456 =back
457
458 =cut
459
460 sub _process_mappings {
461     my ($_self, $mappings, $data, $record_document, $meta) = @_;
462     foreach my $mapping (@{$mappings}) {
463         my ($target, $options) = @{$mapping};
464
465         # Don't process sort fields for alternate scripts
466         my $sort = $target =~ /__sort$/;
467         if ($sort && $meta->{altscript}) {
468             next;
469         }
470
471         # Copy (scalar) data since can have multiple targets
472         # with differing options for (possibly) mutating data
473         # so need a different copy for each
474         my $data_copy = $data;
475         if (defined $options->{substr}) {
476             my ($start, $length) = @{$options->{substr}};
477             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
478         }
479
480         # Add data to values array for callbacks processing
481         my $values = [$data_copy];
482
483         # Value callbacks takes subfield data (or values from previous
484         # callbacks) as argument, and returns a possibly different list of values.
485         # Note that the returned list may also be empty.
486         if (defined $options->{value_callbacks}) {
487             foreach my $callback (@{$options->{value_callbacks}}) {
488                 # Pass each value to current callback which returns a list
489                 # (scalar is fine too) resulting either in a list or
490                 # a list of lists that will be flattened by perl.
491                 # The next callback will receive the possibly expanded list of values.
492                 $values = [ map { $callback->($_) } @{$values} ];
493             }
494         }
495
496         # Skip mapping if all values has been removed
497         next unless @{$values};
498
499         if (defined $options->{property}) {
500             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
501         }
502         if (defined $options->{nonfiling_characters_indicator}) {
503             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
504             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
505             # Nonfiling chars does not make sense for multiple values
506             # Only apply on first element
507             $values->[0] = substr $values->[0], $nonfiling_chars;
508         }
509
510         $values = [ grep(!/^$/, @{$values}) ];
511
512         $record_document->{$target} //= [];
513         push @{$record_document->{$target}}, @{$values};
514     }
515 }
516
517 =head2 marc_records_to_documents($marc_records)
518
519     my $record_documents = $self->marc_records_to_documents($marc_records);
520
521 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
522
523 Returns array of hash references, representing Elasticsearch documents,
524 acceptable as body payload in C<Search::Elasticsearch> requests.
525
526 =over 4
527
528 =item C<$marc_documents>
529
530 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
531
532 =back
533
534 =cut
535
536 sub marc_records_to_documents {
537     my ($self, $records) = @_;
538     my $rules = $self->_get_marc_mapping_rules();
539     my $control_fields_rules = $rules->{control_fields};
540     my $data_fields_rules = $rules->{data_fields};
541     my $marcflavour = lc C4::Context->preference('marcflavour');
542     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
543
544     my @record_documents;
545
546     my %auth_match_headings;
547     if( $self->index eq 'authorities' ){
548         my @auth_types = Koha::Authority::Types->search();
549         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
550     }
551
552     foreach my $record (@{$records}) {
553         my $record_document = {};
554
555         if ( $self->index eq 'authorities' ){
556             my $authtypecode = GuessAuthTypeCode( $record );
557             if( $authtypecode ){
558                 my $field = $record->field( $auth_match_headings{ $authtypecode } );
559                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560                 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
561             } else {
562                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
563             }
564         }
565
566         my $mappings = $rules->{leader};
567         if ($mappings) {
568             $self->_process_mappings($mappings, $record->leader(), $record_document, {
569                     altscript => 0,
570                     data_source => 'leader'
571                 }
572             );
573         }
574         foreach my $field ($record->fields()) {
575             if ($field->is_control_field()) {
576                 my $mappings = $control_fields_rules->{$field->tag()};
577                 if ($mappings) {
578                     $self->_process_mappings($mappings, $field->data(), $record_document, {
579                             altscript => 0,
580                             data_source => 'control_field',
581                             field => $field
582                         }
583                     );
584                 }
585             }
586             else {
587                 my $tag = $field->tag();
588                 # Handle alternate scripts in MARC 21
589                 my $altscript = 0;
590                 if ($marcflavour eq 'marc21' && $tag eq '880') {
591                     my $sub6 = $field->subfield('6');
592                     if ($sub6 =~ /^(...)-\d+/) {
593                         $tag = $1;
594                         $altscript = 1;
595                     }
596                 }
597
598                 my $data_field_rules = $data_fields_rules->{$tag};
599                 if ($data_field_rules) {
600                     my $subfields_mappings = $data_field_rules->{subfields};
601                     my $wildcard_mappings = $subfields_mappings->{'*'};
602                     foreach my $subfield ($field->subfields()) {
603                         my ($code, $data) = @{$subfield};
604                         my $mappings = $subfields_mappings->{$code} // [];
605                         if ($wildcard_mappings) {
606                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
607                         }
608                         if (@{$mappings}) {
609                             $self->_process_mappings($mappings, $data, $record_document, {
610                                     altscript => $altscript,
611                                     data_source => 'subfield',
612                                     code => $code,
613                                     field => $field
614                                 }
615                             );
616                         }
617                     }
618
619                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
620                     if ($subfields_join_mappings) {
621                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
622                             my $data_field = $field->clone; #copy field to preserve for alt scripts
623                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
624                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
625                             if ($data) {
626                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
627                                         altscript => $altscript,
628                                         data_source => 'subfields_group',
629                                         codes => $subfields_group,
630                                         field => $field
631                                     }
632                                 );
633                             }
634                         }
635                     }
636                 }
637             }
638         }
639         foreach my $field (keys %{$rules->{defaults}}) {
640             unless (defined $record_document->{$field}) {
641                 $record_document->{$field} = $rules->{defaults}->{$field};
642             }
643         }
644         foreach my $field (@{$rules->{sum}}) {
645             if (defined $record_document->{$field}) {
646                 # TODO: validate numeric? filter?
647                 # TODO: Or should only accept fields without nested values?
648                 # TODO: Quick and dirty, improve if needed
649                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
650             }
651         }
652         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
653         foreach my $field (@{$rules->{isbn}}) {
654             if (defined $record_document->{$field}) {
655                 my @isbns = ();
656                 foreach my $input_isbn (@{$record_document->{$field}}) {
657                     my $isbn = Business::ISBN->new($input_isbn);
658                     if (defined $isbn && $isbn->is_valid) {
659                         my $isbn13 = $isbn->as_isbn13->as_string;
660                         push @isbns, $isbn13;
661                         $isbn13 =~ s/\-//g;
662                         push @isbns, $isbn13;
663
664                         my $isbn10 = $isbn->as_isbn10;
665                         if ($isbn10) {
666                             $isbn10 = $isbn10->as_string;
667                             push @isbns, $isbn10;
668                             $isbn10 =~ s/\-//g;
669                             push @isbns, $isbn10;
670                         }
671                     } else {
672                         push @isbns, $input_isbn;
673                     }
674                 }
675                 $record_document->{$field} = \@isbns;
676             }
677         }
678
679         # Remove duplicate values and collapse sort fields
680         foreach my $field (keys %{$record_document}) {
681             if (ref($record_document->{$field}) eq 'ARRAY') {
682                 @{$record_document->{$field}} = do {
683                     my %seen;
684                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
685                 };
686                 if ($field =~ /__sort$/) {
687                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
688                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
689                 }
690             }
691         }
692
693         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
694         $record->encoding('UTF-8');
695         if ($use_array) {
696             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
697             $record_document->{'marc_format'} = 'ARRAY';
698         } else {
699             my @warnings;
700             {
701                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
702                 local $SIG{__WARN__} = sub {
703                     push @warnings, $_[0];
704                 };
705                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
706             }
707             if (@warnings) {
708                 # Suppress warnings if record length exceeded
709                 unless (substr($record->leader(), 0, 5) eq '99999') {
710                     foreach my $warning (@warnings) {
711                         carp $warning;
712                     }
713                 }
714                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
715                 $record_document->{'marc_format'} = 'MARCXML';
716             }
717             else {
718                 $record_document->{'marc_format'} = 'base64ISO2709';
719             }
720         }
721         push @record_documents, $record_document;
722     }
723     return \@record_documents;
724 }
725
726 =head2 _marc_to_array($record)
727
728     my @fields = _marc_to_array($record)
729
730 Convert a MARC::Record to an array modeled after MARC-in-JSON
731 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
732
733 =over 4
734
735 =item C<$record>
736
737 A MARC::Record object
738
739 =back
740
741 =cut
742
743 sub _marc_to_array {
744     my ($self, $record) = @_;
745
746     my $data = {
747         leader => $record->leader(),
748         fields => []
749     };
750     for my $field ($record->fields()) {
751         my $tag = $field->tag();
752         if ($field->is_control_field()) {
753             push @{$data->{fields}}, {$tag => $field->data()};
754         } else {
755             my $subfields = ();
756             foreach my $subfield ($field->subfields()) {
757                 my ($code, $contents) = @{$subfield};
758                 push @{$subfields}, {$code => $contents};
759             }
760             push @{$data->{fields}}, {
761                 $tag => {
762                     ind1 => $field->indicator(1),
763                     ind2 => $field->indicator(2),
764                     subfields => $subfields
765                 }
766             };
767         }
768     }
769     return $data;
770 }
771
772 =head2 _array_to_marc($data)
773
774     my $record = _array_to_marc($data)
775
776 Convert an array modeled after MARC-in-JSON to a MARC::Record
777
778 =over 4
779
780 =item C<$data>
781
782 An array modeled after MARC-in-JSON
783 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
784
785 =back
786
787 =cut
788
789 sub _array_to_marc {
790     my ($self, $data) = @_;
791
792     my $record = MARC::Record->new();
793
794     $record->leader($data->{leader});
795     for my $field (@{$data->{fields}}) {
796         my $tag = (keys %{$field})[0];
797         $field = $field->{$tag};
798         my $marc_field;
799         if (ref($field) eq 'HASH') {
800             my @subfields;
801             foreach my $subfield (@{$field->{subfields}}) {
802                 my $code = (keys %{$subfield})[0];
803                 push @subfields, $code;
804                 push @subfields, $subfield->{$code};
805             }
806             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
807         } else {
808             $marc_field = MARC::Field->new($tag, $field)
809         }
810         $record->append_fields($marc_field);
811     }
812 ;
813     return $record;
814 }
815
816 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
817
818     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
819
820 Get mappings, an internal data structure later used by
821 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
822 data for a MARC mapping.
823
824 The returned C<$mappings> is not to to be confused with mappings provided by
825 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
826 provided by C<_foreach_mapping> and expands it to this internal data structure.
827 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
828 is then applied to each MARC target (leader, control field data, subfield or
829 joined subfields) and integrated into the mapping rules data structure used in
830 C<marc_records_to_documents> to transform MARC records into Elasticsearch
831 documents.
832
833 =over 4
834
835 =item C<$facet>
836
837 Boolean indicating whether to create a facet field for this mapping.
838
839 =item C<$suggestible>
840
841 Boolean indicating whether to create a suggestion field for this mapping.
842
843 =item C<$sort>
844
845 Boolean indicating whether to create a sort field for this mapping.
846
847 =item C<$search>
848
849 Boolean indicating whether to create a search field for this mapping.
850
851 =item C<$target_name>
852
853 Elasticsearch document target field name.
854
855 =item C<$target_type>
856
857 Elasticsearch document target field type.
858
859 =item C<$range>
860
861 An optional range as a string in the format "<START>-<END>" or "<START>",
862 where "<START>" and "<END>" are integers specifying a range that will be used
863 for extracting a substring from MARC data as Elasticsearch field target value.
864
865 The first character position is "0", and the range is inclusive,
866 so "0-2" means the first three characters of MARC data.
867
868 If only "<START>" is provided only one character at position "<START>" will
869 be extracted.
870
871 =back
872
873 =cut
874
875 sub _field_mappings {
876     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
877     my %mapping_defaults = ();
878     my @mappings;
879
880     my $substr_args = undef;
881     if (defined $range) {
882         # TODO: use value_callback instead?
883         my ($start, $end) = map(int, split /-/, $range, 2);
884         $substr_args = [$start];
885         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
886     }
887     my $default_options = {};
888     if ($substr_args) {
889         $default_options->{substr} = $substr_args;
890     }
891
892     # TODO: Should probably have per type value callback/hook
893     # but hard code for now
894     if ($target_type eq 'boolean') {
895         $default_options->{value_callbacks} //= [];
896         push @{$default_options->{value_callbacks}}, sub {
897             my ($value) = @_;
898             # Trim whitespace at both ends
899             $value =~ s/^\s+|\s+$//g;
900             return $value ? 'true' : 'false';
901         };
902     }
903     elsif ($target_type eq 'year') {
904         $default_options->{value_callbacks} //= [];
905         # Only accept years containing digits and "u"
906         push @{$default_options->{value_callbacks}}, sub {
907             my ($value) = @_;
908             # Replace "u" with "0" for sorting
909             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
910         };
911     }
912
913     if ($search) {
914         my $mapping = [$target_name, $default_options];
915         push @mappings, $mapping;
916     }
917
918     my @suffixes = ();
919     push @suffixes, 'facet' if $facet;
920     push @suffixes, 'suggestion' if $suggestible;
921     push @suffixes, 'sort' if !defined $sort || $sort;
922
923     foreach my $suffix (@suffixes) {
924         my $mapping = ["${target_name}__$suffix"];
925         # TODO: Hack, fix later in less hideous manner
926         if ($suffix eq 'suggestion') {
927             push @{$mapping}, {%{$default_options}, property => 'input'};
928         }
929         else {
930             # Important! Make shallow clone, or we end up with the same hashref
931             # shared by all mappings
932             push @{$mapping}, {%{$default_options}};
933         }
934         push @mappings, $mapping;
935     }
936     return @mappings;
937 };
938
939 =head2 _get_marc_mapping_rules
940
941     my $mapping_rules = $self->_get_marc_mapping_rules()
942
943 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
944
945 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
946 each call to C<MARC::Record>->field) we create an optimized structure of mapping
947 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
948
949 We can then iterate through all MARC fields for each record and apply all relevant
950 rules once per fields instead of retreiving fields multiple times for each mapping rule
951 which is terribly slow.
952
953 =cut
954
955 # TODO: This structure can be used for processing multiple MARC::Records so is currently
956 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
957 # memory cache which it is currently not. The performance gain of caching
958 # would probably be marginal, but to do this could be a further improvement.
959
960 sub _get_marc_mapping_rules {
961     my ($self) = @_;
962     my $marcflavour = lc C4::Context->preference('marcflavour');
963     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
964     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
965     my $rules = {
966         'leader' => [],
967         'control_fields' => {},
968         'data_fields' => {},
969         'sum' => [],
970         'isbn' => [],
971         'defaults' => {}
972     };
973
974     $self->_foreach_mapping(sub {
975         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
976         return if $marc_type ne $marcflavour;
977
978         if ($type eq 'sum') {
979             push @{$rules->{sum}}, $name;
980             push @{$rules->{sum}}, $name."__sort" if $sort;
981         }
982         elsif ($type eq 'isbn') {
983             push @{$rules->{isbn}}, $name;
984         }
985         elsif ($type eq 'boolean') {
986             # boolean gets special handling, if value doesn't exist for a field,
987             # it is set to false
988             $rules->{defaults}->{$name} = 'false';
989         }
990
991         if ($marc_field =~ $field_spec_regexp) {
992             my $field_tag = $1;
993
994             my @subfields;
995             my @subfield_groups;
996             # Parse and separate subfields form subfield groups
997             if (defined $2) {
998                 my $subfield_group = '';
999                 my $open_group = 0;
1000
1001                 foreach my $token (split //, $2) {
1002                     if ($token eq "(") {
1003                         if ($open_group) {
1004                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1005                                 "Unmatched opening parenthesis for $marc_field"
1006                             );
1007                         }
1008                         else {
1009                             $open_group = 1;
1010                         }
1011                     }
1012                     elsif ($token eq ")") {
1013                         if ($open_group) {
1014                             if ($subfield_group) {
1015                                 push @subfield_groups, $subfield_group;
1016                                 $subfield_group = '';
1017                             }
1018                             $open_group = 0;
1019                         }
1020                         else {
1021                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1022                                 "Unmatched closing parenthesis for $marc_field"
1023                             );
1024                         }
1025                     }
1026                     elsif ($open_group) {
1027                         $subfield_group .= $token;
1028                     }
1029                     else {
1030                         push @subfields, $token;
1031                     }
1032                 }
1033             }
1034             else {
1035                 push @subfields, '*';
1036             }
1037
1038             my $range = defined $3 ? $3 : undef;
1039             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1040             if ($field_tag < 10) {
1041                 $rules->{control_fields}->{$field_tag} //= [];
1042                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1043             }
1044             else {
1045                 $rules->{data_fields}->{$field_tag} //= {};
1046                 foreach my $subfield (@subfields) {
1047                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1048                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1049                 }
1050                 foreach my $subfield_group (@subfield_groups) {
1051                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1052                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1053                 }
1054             }
1055         }
1056         elsif ($marc_field =~ $leader_regexp) {
1057             my $range = defined $1 ? $1 : undef;
1058             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1059             push @{$rules->{leader}}, @mappings;
1060         }
1061         else {
1062             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1063                 "Invalid MARC field expression: $marc_field"
1064             );
1065         }
1066     });
1067
1068     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1069     if ($marcflavour eq 'marc21') {
1070         # Nonfiling characters processing for sort fields
1071         my %title_fields;
1072         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1073             # Format is: nonfiling characters indicator => field names list
1074             %title_fields = (
1075                 1 => [130, 630, 730, 740],
1076                 2 => [222, 240, 242, 243, 245, 440, 830]
1077             );
1078         }
1079         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1080             %title_fields = (
1081                 1 => [730],
1082                 2 => [130, 430, 530]
1083             );
1084         }
1085         foreach my $indicator (keys %title_fields) {
1086             foreach my $field_tag (@{$title_fields{$indicator}}) {
1087                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1088                 foreach my $mapping (@{$mappings}) {
1089                     if ($mapping->[0] =~ /__sort$/) {
1090                         # Mark this as to be processed for nonfiling characters indicator
1091                         # later on in _process_mappings
1092                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1093                     }
1094                 }
1095             }
1096         }
1097     }
1098
1099     return $rules;
1100 }
1101
1102 =head2 _foreach_mapping
1103
1104     $self->_foreach_mapping(
1105         sub {
1106             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1107                 $marc_field )
1108               = @_;
1109             return unless $marc_type eq 'marc21';
1110             print "Data comes from: " . $marc_field . "\n";
1111         }
1112     );
1113
1114 This allows you to apply a function to each entry in the elasticsearch mappings
1115 table, in order to build the mappings for whatever is needed.
1116
1117 In the provided function, the files are:
1118
1119 =over 4
1120
1121 =item C<$name>
1122
1123 The field name for elasticsearch (corresponds to the 'mapping' column in the
1124 database.
1125
1126 =item C<$type>
1127
1128 The type for this value, e.g. 'string'.
1129
1130 =item C<$facet>
1131
1132 True if this value should be facetised. This only really makes sense if the
1133 field is understood by the facet processing code anyway.
1134
1135 =item C<$sort>
1136
1137 True if this is a field that a) needs special sort handling, and b) if it
1138 should be sorted on. False if a) but not b). Undef if not a). This allows,
1139 for example, author to be sorted on but not everything marked with "author"
1140 to be included in that sort.
1141
1142 =item C<$marc_type>
1143
1144 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1145 'unimarc', 'normarc'.
1146
1147 =item C<$marc_field>
1148
1149 A string that describes the MARC field that contains the data to extract.
1150
1151 =back
1152
1153 =cut
1154
1155 sub _foreach_mapping {
1156     my ( $self, $sub ) = @_;
1157
1158     # TODO use a caching framework here
1159     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1160         {
1161             'search_marc_map.index_name' => $self->index,
1162         },
1163         {   join => { search_marc_to_fields => 'search_marc_map' },
1164             '+select' => [
1165                 'search_marc_to_fields.facet',
1166                 'search_marc_to_fields.suggestible',
1167                 'search_marc_to_fields.sort',
1168                 'search_marc_to_fields.search',
1169                 'search_marc_map.marc_type',
1170                 'search_marc_map.marc_field',
1171             ],
1172             '+as'     => [
1173                 'facet',
1174                 'suggestible',
1175                 'sort',
1176                 'search',
1177                 'marc_type',
1178                 'marc_field',
1179             ],
1180         }
1181     );
1182
1183     while ( my $search_field = $search_fields->next ) {
1184         $sub->(
1185             # Force lower case on indexed field names for case insensitive
1186             # field name searches
1187             lc($search_field->name),
1188             $search_field->type,
1189             $search_field->get_column('facet'),
1190             $search_field->get_column('suggestible'),
1191             $search_field->get_column('sort'),
1192             $search_field->get_column('search'),
1193             $search_field->get_column('marc_type'),
1194             $search_field->get_column('marc_field'),
1195         );
1196     }
1197 }
1198
1199 =head2 process_error
1200
1201     die process_error($@);
1202
1203 This parses an Elasticsearch error message and produces a human-readable
1204 result from it. This result is probably missing all the useful information
1205 that you might want in diagnosing an issue, so the warning is also logged.
1206
1207 Note that currently the resulting message is not internationalised. This
1208 will happen eventually by some method or other.
1209
1210 =cut
1211
1212 sub process_error {
1213     my ($self, $msg) = @_;
1214
1215     warn $msg; # simple logging
1216
1217     # This is super-primitive
1218     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1219
1220     return "Unable to perform your search. Please try again.\n";
1221 }
1222
1223 =head2 _read_configuration
1224
1225     my $conf = _read_configuration();
1226
1227 Reads the I<configuration file> and returns a hash structure with the
1228 configuration information. It raises an exception if mandatory entries
1229 are missing.
1230
1231 The hashref structure has the following form:
1232
1233     {
1234         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1235         'index_name' => 'koha_instance',
1236     }
1237
1238 This is configured by the following in the C<config> block in koha-conf.xml:
1239
1240     <elasticsearch>
1241         <server>127.0.0.1:9200</server>
1242         <server>anotherserver:9200</server>
1243         <index_name>koha_instance</index_name>
1244     </elasticsearch>
1245
1246 =cut
1247
1248 sub _read_configuration {
1249
1250     my $configuration;
1251
1252     my $conf = C4::Context->config('elasticsearch');
1253     unless ( defined $conf ) {
1254         Koha::Exceptions::Config::MissingEntry->throw(
1255             "Missing <elasticsearch> entry in koha-conf.xml"
1256         );
1257     }
1258
1259     if ( $conf && $conf->{server} ) {
1260         my $nodes = $conf->{server};
1261         if ( ref($nodes) eq 'ARRAY' ) {
1262             $configuration->{nodes} = $nodes;
1263         }
1264         else {
1265             $configuration->{nodes} = [$nodes];
1266         }
1267     }
1268     else {
1269         Koha::Exceptions::Config::MissingEntry->throw(
1270             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1271         );
1272     }
1273
1274     if ( defined $conf->{index_name} ) {
1275         $configuration->{index_name} = $conf->{index_name};
1276     }
1277     else {
1278         Koha::Exceptions::Config::MissingEntry->throw(
1279             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1280         );
1281     }
1282
1283     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1284
1285     return $configuration;
1286 }
1287
1288 =head2 get_facetable_fields
1289
1290 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1291
1292 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1293
1294 =cut
1295
1296 sub get_facetable_fields {
1297     my ($self) = @_;
1298
1299     # These should correspond to the ES field names, as opposed to the CCL
1300     # things that zebra uses.
1301     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1302     my @faceted_fields = Koha::SearchFields->search(
1303         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1304     );
1305     my @not_faceted_fields = Koha::SearchFields->search(
1306         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1307     );
1308     # This could certainly be improved
1309     return ( @faceted_fields, @not_faceted_fields );
1310 }
1311
1312 =head2 clear_search_fields_cache
1313
1314 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1315
1316 Clear cached values for ES search fields
1317
1318 =cut
1319
1320 sub clear_search_fields_cache {
1321
1322     my $cache = Koha::Caches->get_instance();
1323     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1324     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1325     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1326     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1327
1328 }
1329
1330 1;
1331
1332 __END__
1333
1334 =head1 AUTHOR
1335
1336 =over 4
1337
1338 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1339
1340 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1341
1342 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1343
1344 =back
1345
1346 =cut