Bug 25957: Don't push blank field values to ES document
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
281
282             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
283                 {
284                     facet   => $marc_to_field->facet || '',
285                     marc_type => $marc_map->marc_type,
286                     marc_field => $marc_map->marc_field,
287                     sort        => $marc_to_field->sort,
288                     suggestible => $marc_to_field->suggestible || ''
289                 });
290
291         }
292     }
293
294     return $mappings;
295 }
296
297 =head2 _get_elasticsearch_field_config
298
299 Get the Elasticsearch field config for the given purpose and data type.
300
301 $mapping = _get_elasticsearch_field_config('search', 'text');
302
303 =cut
304
305 sub _get_elasticsearch_field_config {
306
307     my ( $purpose, $type ) = @_;
308
309     # Use state to speed up repeated calls
310     state $settings = undef;
311     if (!defined $settings) {
312         my $config_file = C4::Context->config('elasticsearch_field_config');
313         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314         $settings = LoadFile( $config_file );
315     }
316
317     if (!defined $settings->{$purpose}) {
318         die "Field purpose $purpose not defined in field config";
319     }
320     if ($type eq '') {
321         return $settings->{$purpose};
322     }
323     if (defined $settings->{$purpose}{$type}) {
324         return $settings->{$purpose}{$type};
325     }
326     if (defined $settings->{$purpose}{'default'}) {
327         return $settings->{$purpose}{'default'};
328     }
329     return;
330 }
331
332 =head2 _load_elasticsearch_mappings
333
334 Load Elasticsearch mappings in the format of mappings.yaml.
335
336 $indexes = _load_elasticsearch_mappings();
337
338 =cut
339
340 sub _load_elasticsearch_mappings {
341     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
342     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
343     return LoadFile( $mappings_yaml );
344 }
345
346 sub reset_elasticsearch_mappings {
347     my ( $self ) = @_;
348     my $indexes = $self->_load_elasticsearch_mappings();
349
350     Koha::SearchMarcMaps->delete;
351     Koha::SearchFields->delete;
352
353     while ( my ( $index_name, $fields ) = each %$indexes ) {
354         while ( my ( $field_name, $data ) = each %$fields ) {
355
356             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
357
358             # Set default values
359             $sf_params{staff_client} //= 1;
360             $sf_params{opac} //= 1;
361
362             $sf_params{name} = $field_name;
363
364             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
365
366             my $mappings = $data->{mappings};
367             for my $mapping ( @$mappings ) {
368                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
369                     index_name => $index_name,
370                     marc_type => $mapping->{marc_type},
371                     marc_field => $mapping->{marc_field}
372                 });
373                 $search_field->add_to_search_marc_maps($marc_field, {
374                     facet => $mapping->{facet} || 0,
375                     suggestible => $mapping->{suggestible} || 0,
376                     sort => $mapping->{sort},
377                     search => $mapping->{search} // 1
378                 });
379             }
380         }
381     }
382
383     $self->clear_search_fields_cache();
384
385     # FIXME return the mappings?
386 }
387
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
390 sub sort_fields {
391     my $self = shift;
392     if (@_) {
393         $self->_sort_fields_accessor(@_);
394         return;
395     }
396     my $val = $self->_sort_fields_accessor();
397     return $val if $val;
398
399     # This will populate the accessor as a side effect
400     $self->get_elasticsearch_mappings();
401     return $self->_sort_fields_accessor();
402 }
403
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
405
406     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
407
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
413
414 =over 4
415
416 =item C<$mappings>
417
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
421 mapping.
422
423 =item C<$data>
424
425 The source data from a MARC record field.
426
427 =item C<$record_document>
428
429 Hashref representing the Elasticsearch document on which mappings should be
430 applied.
431
432 =item C<$meta>
433
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
439 metadata context.
440
441 Current properties are:
442
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
444 processed.
445
446 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
448
449 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
450
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
453
454 C<field>: The original C<MARC::Record> object.
455
456 =back
457
458 =cut
459
460 sub _process_mappings {
461     my ($_self, $mappings, $data, $record_document, $meta) = @_;
462     foreach my $mapping (@{$mappings}) {
463         my ($target, $options) = @{$mapping};
464
465         # Don't process sort fields for alternate scripts
466         my $sort = $target =~ /__sort$/;
467         if ($sort && $meta->{altscript}) {
468             next;
469         }
470
471         # Copy (scalar) data since can have multiple targets
472         # with differing options for (possibly) mutating data
473         # so need a different copy for each
474         my $data_copy = $data;
475         if (defined $options->{substr}) {
476             my ($start, $length) = @{$options->{substr}};
477             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
478         }
479
480         # Add data to values array for callbacks processing
481         my $values = [$data_copy];
482
483         # Value callbacks takes subfield data (or values from previous
484         # callbacks) as argument, and returns a possibly different list of values.
485         # Note that the returned list may also be empty.
486         if (defined $options->{value_callbacks}) {
487             foreach my $callback (@{$options->{value_callbacks}}) {
488                 # Pass each value to current callback which returns a list
489                 # (scalar is fine too) resulting either in a list or
490                 # a list of lists that will be flattened by perl.
491                 # The next callback will receive the possibly expanded list of values.
492                 $values = [ map { $callback->($_) } @{$values} ];
493             }
494         }
495
496         # Skip mapping if all values has been removed
497         next unless @{$values};
498
499         if (defined $options->{property}) {
500             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
501         }
502         if (defined $options->{nonfiling_characters_indicator}) {
503             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
504             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
505             # Nonfiling chars does not make sense for multiple values
506             # Only apply on first element
507             $values->[0] = substr $values->[0], $nonfiling_chars;
508         }
509
510         $values = [ grep(!/^$/, @{$values}) ];
511
512         $record_document->{$target} //= [];
513         push @{$record_document->{$target}}, @{$values};
514     }
515 }
516
517 =head2 marc_records_to_documents($marc_records)
518
519     my $record_documents = $self->marc_records_to_documents($marc_records);
520
521 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
522
523 Returns array of hash references, representing Elasticsearch documents,
524 acceptable as body payload in C<Search::Elasticsearch> requests.
525
526 =over 4
527
528 =item C<$marc_documents>
529
530 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
531
532 =back
533
534 =cut
535
536 sub marc_records_to_documents {
537     my ($self, $records) = @_;
538     my $rules = $self->_get_marc_mapping_rules();
539     my $control_fields_rules = $rules->{control_fields};
540     my $data_fields_rules = $rules->{data_fields};
541     my $marcflavour = lc C4::Context->preference('marcflavour');
542     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
543
544     my @record_documents;
545
546     my %auth_match_headings;
547     if( $self->index eq 'authorities' ){
548         my @auth_types = Koha::Authority::Types->search();
549         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
550     }
551
552     foreach my $record (@{$records}) {
553         my $record_document = {};
554
555         if ( $self->index eq 'authorities' ){
556             my $authtypecode = GuessAuthTypeCode( $record );
557             if( $authtypecode ){
558                 my $field = $record->field( $auth_match_headings{ $authtypecode } );
559                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560                 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
561             } else {
562                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
563             }
564         }
565
566         my $mappings = $rules->{leader};
567         if ($mappings) {
568             $self->_process_mappings($mappings, $record->leader(), $record_document, {
569                     altscript => 0,
570                     data_source => 'leader'
571                 }
572             );
573         }
574         foreach my $field ($record->fields()) {
575             if ($field->is_control_field()) {
576                 my $mappings = $control_fields_rules->{$field->tag()};
577                 if ($mappings) {
578                     $self->_process_mappings($mappings, $field->data(), $record_document, {
579                             altscript => 0,
580                             data_source => 'control_field',
581                             field => $field
582                         }
583                     );
584                 }
585             }
586             else {
587                 my $tag = $field->tag();
588                 # Handle alternate scripts in MARC 21
589                 my $altscript = 0;
590                 if ($marcflavour eq 'marc21' && $tag eq '880') {
591                     my $sub6 = $field->subfield('6');
592                     if ($sub6 =~ /^(...)-\d+/) {
593                         $tag = $1;
594                         $altscript = 1;
595                     }
596                 }
597
598                 my $data_field_rules = $data_fields_rules->{$tag};
599                 if ($data_field_rules) {
600                     my $subfields_mappings = $data_field_rules->{subfields};
601                     my $wildcard_mappings = $subfields_mappings->{'*'};
602                     foreach my $subfield ($field->subfields()) {
603                         my ($code, $data) = @{$subfield};
604                         my $mappings = $subfields_mappings->{$code} // [];
605                         if ($wildcard_mappings) {
606                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
607                         }
608                         if (@{$mappings}) {
609                             $self->_process_mappings($mappings, $data, $record_document, {
610                                     altscript => $altscript,
611                                     data_source => 'subfield',
612                                     code => $code,
613                                     field => $field
614                                 }
615                             );
616                         }
617                     }
618
619                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
620                     if ($subfields_join_mappings) {
621                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
622                             # Map each subfield to values, remove empty values, join with space
623                             my $data = join(
624                                 ' ',
625                                 grep(
626                                     $_,
627                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
628                                 )
629                             );
630                             if ($data) {
631                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
632                                         altscript => $altscript,
633                                         data_source => 'subfields_group',
634                                         codes => $subfields_group,
635                                         field => $field
636                                     }
637                                 );
638                             }
639                         }
640                     }
641                 }
642             }
643         }
644         foreach my $field (keys %{$rules->{defaults}}) {
645             unless (defined $record_document->{$field}) {
646                 $record_document->{$field} = $rules->{defaults}->{$field};
647             }
648         }
649         foreach my $field (@{$rules->{sum}}) {
650             if (defined $record_document->{$field}) {
651                 # TODO: validate numeric? filter?
652                 # TODO: Or should only accept fields without nested values?
653                 # TODO: Quick and dirty, improve if needed
654                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
655             }
656         }
657         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
658         foreach my $field (@{$rules->{isbn}}) {
659             if (defined $record_document->{$field}) {
660                 my @isbns = ();
661                 foreach my $input_isbn (@{$record_document->{$field}}) {
662                     my $isbn = Business::ISBN->new($input_isbn);
663                     if (defined $isbn && $isbn->is_valid) {
664                         my $isbn13 = $isbn->as_isbn13->as_string;
665                         push @isbns, $isbn13;
666                         $isbn13 =~ s/\-//g;
667                         push @isbns, $isbn13;
668
669                         my $isbn10 = $isbn->as_isbn10;
670                         if ($isbn10) {
671                             $isbn10 = $isbn10->as_string;
672                             push @isbns, $isbn10;
673                             $isbn10 =~ s/\-//g;
674                             push @isbns, $isbn10;
675                         }
676                     } else {
677                         push @isbns, $input_isbn;
678                     }
679                 }
680                 $record_document->{$field} = \@isbns;
681             }
682         }
683
684         # Remove duplicate values and collapse sort fields
685         foreach my $field (keys %{$record_document}) {
686             if (ref($record_document->{$field}) eq 'ARRAY') {
687                 @{$record_document->{$field}} = do {
688                     my %seen;
689                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
690                 };
691                 if ($field =~ /__sort$/) {
692                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
693                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
694                 }
695             }
696         }
697
698         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
699         $record->encoding('UTF-8');
700         if ($use_array) {
701             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
702             $record_document->{'marc_format'} = 'ARRAY';
703         } else {
704             my @warnings;
705             {
706                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
707                 local $SIG{__WARN__} = sub {
708                     push @warnings, $_[0];
709                 };
710                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
711             }
712             if (@warnings) {
713                 # Suppress warnings if record length exceeded
714                 unless (substr($record->leader(), 0, 5) eq '99999') {
715                     foreach my $warning (@warnings) {
716                         carp $warning;
717                     }
718                 }
719                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
720                 $record_document->{'marc_format'} = 'MARCXML';
721             }
722             else {
723                 $record_document->{'marc_format'} = 'base64ISO2709';
724             }
725         }
726         push @record_documents, $record_document;
727     }
728     return \@record_documents;
729 }
730
731 =head2 _marc_to_array($record)
732
733     my @fields = _marc_to_array($record)
734
735 Convert a MARC::Record to an array modeled after MARC-in-JSON
736 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
737
738 =over 4
739
740 =item C<$record>
741
742 A MARC::Record object
743
744 =back
745
746 =cut
747
748 sub _marc_to_array {
749     my ($self, $record) = @_;
750
751     my $data = {
752         leader => $record->leader(),
753         fields => []
754     };
755     for my $field ($record->fields()) {
756         my $tag = $field->tag();
757         if ($field->is_control_field()) {
758             push @{$data->{fields}}, {$tag => $field->data()};
759         } else {
760             my $subfields = ();
761             foreach my $subfield ($field->subfields()) {
762                 my ($code, $contents) = @{$subfield};
763                 push @{$subfields}, {$code => $contents};
764             }
765             push @{$data->{fields}}, {
766                 $tag => {
767                     ind1 => $field->indicator(1),
768                     ind2 => $field->indicator(2),
769                     subfields => $subfields
770                 }
771             };
772         }
773     }
774     return $data;
775 }
776
777 =head2 _array_to_marc($data)
778
779     my $record = _array_to_marc($data)
780
781 Convert an array modeled after MARC-in-JSON to a MARC::Record
782
783 =over 4
784
785 =item C<$data>
786
787 An array modeled after MARC-in-JSON
788 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
789
790 =back
791
792 =cut
793
794 sub _array_to_marc {
795     my ($self, $data) = @_;
796
797     my $record = MARC::Record->new();
798
799     $record->leader($data->{leader});
800     for my $field (@{$data->{fields}}) {
801         my $tag = (keys %{$field})[0];
802         $field = $field->{$tag};
803         my $marc_field;
804         if (ref($field) eq 'HASH') {
805             my @subfields;
806             foreach my $subfield (@{$field->{subfields}}) {
807                 my $code = (keys %{$subfield})[0];
808                 push @subfields, $code;
809                 push @subfields, $subfield->{$code};
810             }
811             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
812         } else {
813             $marc_field = MARC::Field->new($tag, $field)
814         }
815         $record->append_fields($marc_field);
816     }
817 ;
818     return $record;
819 }
820
821 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
822
823     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
824
825 Get mappings, an internal data structure later used by
826 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
827 data for a MARC mapping.
828
829 The returned C<$mappings> is not to to be confused with mappings provided by
830 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
831 provided by C<_foreach_mapping> and expands it to this internal data structure.
832 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
833 is then applied to each MARC target (leader, control field data, subfield or
834 joined subfields) and integrated into the mapping rules data structure used in
835 C<marc_records_to_documents> to transform MARC records into Elasticsearch
836 documents.
837
838 =over 4
839
840 =item C<$facet>
841
842 Boolean indicating whether to create a facet field for this mapping.
843
844 =item C<$suggestible>
845
846 Boolean indicating whether to create a suggestion field for this mapping.
847
848 =item C<$sort>
849
850 Boolean indicating whether to create a sort field for this mapping.
851
852 =item C<$search>
853
854 Boolean indicating whether to create a search field for this mapping.
855
856 =item C<$target_name>
857
858 Elasticsearch document target field name.
859
860 =item C<$target_type>
861
862 Elasticsearch document target field type.
863
864 =item C<$range>
865
866 An optional range as a string in the format "<START>-<END>" or "<START>",
867 where "<START>" and "<END>" are integers specifying a range that will be used
868 for extracting a substring from MARC data as Elasticsearch field target value.
869
870 The first character position is "0", and the range is inclusive,
871 so "0-2" means the first three characters of MARC data.
872
873 If only "<START>" is provided only one character at position "<START>" will
874 be extracted.
875
876 =back
877
878 =cut
879
880 sub _field_mappings {
881     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
882     my %mapping_defaults = ();
883     my @mappings;
884
885     my $substr_args = undef;
886     if (defined $range) {
887         # TODO: use value_callback instead?
888         my ($start, $end) = map(int, split /-/, $range, 2);
889         $substr_args = [$start];
890         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
891     }
892     my $default_options = {};
893     if ($substr_args) {
894         $default_options->{substr} = $substr_args;
895     }
896
897     # TODO: Should probably have per type value callback/hook
898     # but hard code for now
899     if ($target_type eq 'boolean') {
900         $default_options->{value_callbacks} //= [];
901         push @{$default_options->{value_callbacks}}, sub {
902             my ($value) = @_;
903             # Trim whitespace at both ends
904             $value =~ s/^\s+|\s+$//g;
905             return $value ? 'true' : 'false';
906         };
907     }
908     elsif ($target_type eq 'year') {
909         $default_options->{value_callbacks} //= [];
910         # Only accept years containing digits and "u"
911         push @{$default_options->{value_callbacks}}, sub {
912             my ($value) = @_;
913             # Replace "u" with "0" for sorting
914             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
915         };
916     }
917
918     if ($search) {
919         my $mapping = [$target_name, $default_options];
920         push @mappings, $mapping;
921     }
922
923     my @suffixes = ();
924     push @suffixes, 'facet' if $facet;
925     push @suffixes, 'suggestion' if $suggestible;
926     push @suffixes, 'sort' if !defined $sort || $sort;
927
928     foreach my $suffix (@suffixes) {
929         my $mapping = ["${target_name}__$suffix"];
930         # TODO: Hack, fix later in less hideous manner
931         if ($suffix eq 'suggestion') {
932             push @{$mapping}, {%{$default_options}, property => 'input'};
933         }
934         else {
935             # Important! Make shallow clone, or we end up with the same hashref
936             # shared by all mappings
937             push @{$mapping}, {%{$default_options}};
938         }
939         push @mappings, $mapping;
940     }
941     return @mappings;
942 };
943
944 =head2 _get_marc_mapping_rules
945
946     my $mapping_rules = $self->_get_marc_mapping_rules()
947
948 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
949
950 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
951 each call to C<MARC::Record>->field) we create an optimized structure of mapping
952 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
953
954 We can then iterate through all MARC fields for each record and apply all relevant
955 rules once per fields instead of retreiving fields multiple times for each mapping rule
956 which is terribly slow.
957
958 =cut
959
960 # TODO: This structure can be used for processing multiple MARC::Records so is currently
961 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
962 # memory cache which it is currently not. The performance gain of caching
963 # would probably be marginal, but to do this could be a further improvement.
964
965 sub _get_marc_mapping_rules {
966     my ($self) = @_;
967     my $marcflavour = lc C4::Context->preference('marcflavour');
968     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
969     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
970     my $rules = {
971         'leader' => [],
972         'control_fields' => {},
973         'data_fields' => {},
974         'sum' => [],
975         'isbn' => [],
976         'defaults' => {}
977     };
978
979     $self->_foreach_mapping(sub {
980         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
981         return if $marc_type ne $marcflavour;
982
983         if ($type eq 'sum') {
984             push @{$rules->{sum}}, $name;
985             push @{$rules->{sum}}, $name."__sort" if $sort;
986         }
987         elsif ($type eq 'isbn') {
988             push @{$rules->{isbn}}, $name;
989         }
990         elsif ($type eq 'boolean') {
991             # boolean gets special handling, if value doesn't exist for a field,
992             # it is set to false
993             $rules->{defaults}->{$name} = 'false';
994         }
995
996         if ($marc_field =~ $field_spec_regexp) {
997             my $field_tag = $1;
998
999             my @subfields;
1000             my @subfield_groups;
1001             # Parse and separate subfields form subfield groups
1002             if (defined $2) {
1003                 my $subfield_group = '';
1004                 my $open_group = 0;
1005
1006                 foreach my $token (split //, $2) {
1007                     if ($token eq "(") {
1008                         if ($open_group) {
1009                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1010                                 "Unmatched opening parenthesis for $marc_field"
1011                             );
1012                         }
1013                         else {
1014                             $open_group = 1;
1015                         }
1016                     }
1017                     elsif ($token eq ")") {
1018                         if ($open_group) {
1019                             if ($subfield_group) {
1020                                 push @subfield_groups, $subfield_group;
1021                                 $subfield_group = '';
1022                             }
1023                             $open_group = 0;
1024                         }
1025                         else {
1026                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1027                                 "Unmatched closing parenthesis for $marc_field"
1028                             );
1029                         }
1030                     }
1031                     elsif ($open_group) {
1032                         $subfield_group .= $token;
1033                     }
1034                     else {
1035                         push @subfields, $token;
1036                     }
1037                 }
1038             }
1039             else {
1040                 push @subfields, '*';
1041             }
1042
1043             my $range = defined $3 ? $3 : undef;
1044             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1045             if ($field_tag < 10) {
1046                 $rules->{control_fields}->{$field_tag} //= [];
1047                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1048             }
1049             else {
1050                 $rules->{data_fields}->{$field_tag} //= {};
1051                 foreach my $subfield (@subfields) {
1052                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1053                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1054                 }
1055                 foreach my $subfield_group (@subfield_groups) {
1056                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1057                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1058                 }
1059             }
1060         }
1061         elsif ($marc_field =~ $leader_regexp) {
1062             my $range = defined $1 ? $1 : undef;
1063             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1064             push @{$rules->{leader}}, @mappings;
1065         }
1066         else {
1067             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1068                 "Invalid MARC field expression: $marc_field"
1069             );
1070         }
1071     });
1072
1073     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1074     if ($marcflavour eq 'marc21') {
1075         # Nonfiling characters processing for sort fields
1076         my %title_fields;
1077         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1078             # Format is: nonfiling characters indicator => field names list
1079             %title_fields = (
1080                 1 => [130, 630, 730, 740],
1081                 2 => [222, 240, 242, 243, 245, 440, 830]
1082             );
1083         }
1084         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1085             %title_fields = (
1086                 1 => [730],
1087                 2 => [130, 430, 530]
1088             );
1089         }
1090         foreach my $indicator (keys %title_fields) {
1091             foreach my $field_tag (@{$title_fields{$indicator}}) {
1092                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1093                 foreach my $mapping (@{$mappings}) {
1094                     if ($mapping->[0] =~ /__sort$/) {
1095                         # Mark this as to be processed for nonfiling characters indicator
1096                         # later on in _process_mappings
1097                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1098                     }
1099                 }
1100             }
1101         }
1102     }
1103
1104     return $rules;
1105 }
1106
1107 =head2 _foreach_mapping
1108
1109     $self->_foreach_mapping(
1110         sub {
1111             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1112                 $marc_field )
1113               = @_;
1114             return unless $marc_type eq 'marc21';
1115             print "Data comes from: " . $marc_field . "\n";
1116         }
1117     );
1118
1119 This allows you to apply a function to each entry in the elasticsearch mappings
1120 table, in order to build the mappings for whatever is needed.
1121
1122 In the provided function, the files are:
1123
1124 =over 4
1125
1126 =item C<$name>
1127
1128 The field name for elasticsearch (corresponds to the 'mapping' column in the
1129 database.
1130
1131 =item C<$type>
1132
1133 The type for this value, e.g. 'string'.
1134
1135 =item C<$facet>
1136
1137 True if this value should be facetised. This only really makes sense if the
1138 field is understood by the facet processing code anyway.
1139
1140 =item C<$sort>
1141
1142 True if this is a field that a) needs special sort handling, and b) if it
1143 should be sorted on. False if a) but not b). Undef if not a). This allows,
1144 for example, author to be sorted on but not everything marked with "author"
1145 to be included in that sort.
1146
1147 =item C<$marc_type>
1148
1149 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1150 'unimarc', 'normarc'.
1151
1152 =item C<$marc_field>
1153
1154 A string that describes the MARC field that contains the data to extract.
1155
1156 =back
1157
1158 =cut
1159
1160 sub _foreach_mapping {
1161     my ( $self, $sub ) = @_;
1162
1163     # TODO use a caching framework here
1164     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1165         {
1166             'search_marc_map.index_name' => $self->index,
1167         },
1168         {   join => { search_marc_to_fields => 'search_marc_map' },
1169             '+select' => [
1170                 'search_marc_to_fields.facet',
1171                 'search_marc_to_fields.suggestible',
1172                 'search_marc_to_fields.sort',
1173                 'search_marc_to_fields.search',
1174                 'search_marc_map.marc_type',
1175                 'search_marc_map.marc_field',
1176             ],
1177             '+as'     => [
1178                 'facet',
1179                 'suggestible',
1180                 'sort',
1181                 'search',
1182                 'marc_type',
1183                 'marc_field',
1184             ],
1185         }
1186     );
1187
1188     while ( my $search_field = $search_fields->next ) {
1189         $sub->(
1190             # Force lower case on indexed field names for case insensitive
1191             # field name searches
1192             lc($search_field->name),
1193             $search_field->type,
1194             $search_field->get_column('facet'),
1195             $search_field->get_column('suggestible'),
1196             $search_field->get_column('sort'),
1197             $search_field->get_column('search'),
1198             $search_field->get_column('marc_type'),
1199             $search_field->get_column('marc_field'),
1200         );
1201     }
1202 }
1203
1204 =head2 process_error
1205
1206     die process_error($@);
1207
1208 This parses an Elasticsearch error message and produces a human-readable
1209 result from it. This result is probably missing all the useful information
1210 that you might want in diagnosing an issue, so the warning is also logged.
1211
1212 Note that currently the resulting message is not internationalised. This
1213 will happen eventually by some method or other.
1214
1215 =cut
1216
1217 sub process_error {
1218     my ($self, $msg) = @_;
1219
1220     warn $msg; # simple logging
1221
1222     # This is super-primitive
1223     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1224
1225     return "Unable to perform your search. Please try again.\n";
1226 }
1227
1228 =head2 _read_configuration
1229
1230     my $conf = _read_configuration();
1231
1232 Reads the I<configuration file> and returns a hash structure with the
1233 configuration information. It raises an exception if mandatory entries
1234 are missing.
1235
1236 The hashref structure has the following form:
1237
1238     {
1239         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1240         'index_name' => 'koha_instance',
1241     }
1242
1243 This is configured by the following in the C<config> block in koha-conf.xml:
1244
1245     <elasticsearch>
1246         <server>127.0.0.1:9200</server>
1247         <server>anotherserver:9200</server>
1248         <index_name>koha_instance</index_name>
1249     </elasticsearch>
1250
1251 =cut
1252
1253 sub _read_configuration {
1254
1255     my $configuration;
1256
1257     my $conf = C4::Context->config('elasticsearch');
1258     unless ( defined $conf ) {
1259         Koha::Exceptions::Config::MissingEntry->throw(
1260             "Missing <elasticsearch> entry in koha-conf.xml"
1261         );
1262     }
1263
1264     if ( $conf && $conf->{server} ) {
1265         my $nodes = $conf->{server};
1266         if ( ref($nodes) eq 'ARRAY' ) {
1267             $configuration->{nodes} = $nodes;
1268         }
1269         else {
1270             $configuration->{nodes} = [$nodes];
1271         }
1272     }
1273     else {
1274         Koha::Exceptions::Config::MissingEntry->throw(
1275             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1276         );
1277     }
1278
1279     if ( defined $conf->{index_name} ) {
1280         $configuration->{index_name} = $conf->{index_name};
1281     }
1282     else {
1283         Koha::Exceptions::Config::MissingEntry->throw(
1284             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1285         );
1286     }
1287
1288     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1289
1290     return $configuration;
1291 }
1292
1293 =head2 get_facetable_fields
1294
1295 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1296
1297 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1298
1299 =cut
1300
1301 sub get_facetable_fields {
1302     my ($self) = @_;
1303
1304     # These should correspond to the ES field names, as opposed to the CCL
1305     # things that zebra uses.
1306     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1307     my @faceted_fields = Koha::SearchFields->search(
1308         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1309     );
1310     my @not_faceted_fields = Koha::SearchFields->search(
1311         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1312     );
1313     # This could certainly be improved
1314     return ( @faceted_fields, @not_faceted_fields );
1315 }
1316
1317 =head2 clear_search_fields_cache
1318
1319 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1320
1321 Clear cached values for ES search fields
1322
1323 =cut
1324
1325 sub clear_search_fields_cache {
1326
1327     my $cache = Koha::Caches->get_instance();
1328     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1329     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1330     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1331     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1332
1333 }
1334
1335 1;
1336
1337 __END__
1338
1339 =head1 AUTHOR
1340
1341 =over 4
1342
1343 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1344
1345 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1346
1347 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1348
1349 =back
1350
1351 =cut