Fix translations for Koha 20.11.06
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282
283             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284                 {
285                     facet   => $marc_to_field->facet || '',
286                     marc_type => $marc_map->marc_type,
287                     marc_field => $marc_map->marc_field,
288                     sort        => $marc_to_field->sort,
289                     suggestible => $marc_to_field->suggestible || ''
290                 });
291
292         }
293     }
294
295     return $mappings;
296 }
297
298 =head2 _get_elasticsearch_field_config
299
300 Get the Elasticsearch field config for the given purpose and data type.
301
302 $mapping = _get_elasticsearch_field_config('search', 'text');
303
304 =cut
305
306 sub _get_elasticsearch_field_config {
307
308     my ( $purpose, $type ) = @_;
309
310     # Use state to speed up repeated calls
311     state $settings = undef;
312     if (!defined $settings) {
313         my $config_file = C4::Context->config('elasticsearch_field_config');
314         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
315         $settings = LoadFile( $config_file );
316     }
317
318     if (!defined $settings->{$purpose}) {
319         die "Field purpose $purpose not defined in field config";
320     }
321     if ($type eq '') {
322         return $settings->{$purpose};
323     }
324     if (defined $settings->{$purpose}{$type}) {
325         return $settings->{$purpose}{$type};
326     }
327     if (defined $settings->{$purpose}{'default'}) {
328         return $settings->{$purpose}{'default'};
329     }
330     return;
331 }
332
333 =head2 _load_elasticsearch_mappings
334
335 Load Elasticsearch mappings in the format of mappings.yaml.
336
337 $indexes = _load_elasticsearch_mappings();
338
339 =cut
340
341 sub _load_elasticsearch_mappings {
342     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
343     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
344     return LoadFile( $mappings_yaml );
345 }
346
347 sub reset_elasticsearch_mappings {
348     my ( $self ) = @_;
349     my $indexes = $self->_load_elasticsearch_mappings();
350
351     Koha::SearchMarcMaps->delete;
352     Koha::SearchFields->delete;
353
354     while ( my ( $index_name, $fields ) = each %$indexes ) {
355         while ( my ( $field_name, $data ) = each %$fields ) {
356
357             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
358
359             # Set default values
360             $sf_params{staff_client} //= 1;
361             $sf_params{opac} //= 1;
362
363             $sf_params{name} = $field_name;
364
365             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
366
367             my $mappings = $data->{mappings};
368             for my $mapping ( @$mappings ) {
369                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
370                     index_name => $index_name,
371                     marc_type => $mapping->{marc_type},
372                     marc_field => $mapping->{marc_field}
373                 });
374                 $search_field->add_to_search_marc_maps($marc_field, {
375                     facet => $mapping->{facet} || 0,
376                     suggestible => $mapping->{suggestible} || 0,
377                     sort => $mapping->{sort} // 1,
378                     search => $mapping->{search} // 1
379                 });
380             }
381         }
382     }
383
384     $self->clear_search_fields_cache();
385
386     # FIXME return the mappings?
387 }
388
389 # This overrides the accessor provided by Class::Accessor so that if
390 # sort_fields isn't set, then it'll generate it.
391 sub sort_fields {
392     my $self = shift;
393     if (@_) {
394         $self->_sort_fields_accessor(@_);
395         return;
396     }
397     my $val = $self->_sort_fields_accessor();
398     return $val if $val;
399
400     # This will populate the accessor as a side effect
401     $self->get_elasticsearch_mappings();
402     return $self->_sort_fields_accessor();
403 }
404
405 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406
407     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408
409 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
410 Since we group all mappings by MARC field targets C<$mappings> will contain
411 all targets for C<$data> and thus we need to fetch the MARC field only once.
412 C<$mappings> will be applied to C<$record_document> and new field values added.
413 The method has no return value.
414
415 =over 4
416
417 =item C<$mappings>
418
419 Arrayref of mappings containing arrayrefs in the format
420 [C<$target>, C<$options>] where C<$target> is the name of the target field and
421 C<$options> is a hashref containing processing directives for this particular
422 mapping.
423
424 =item C<$data>
425
426 The source data from a MARC record field.
427
428 =item C<$record_document>
429
430 Hashref representing the Elasticsearch document on which mappings should be
431 applied.
432
433 =item C<$meta>
434
435 A hashref containing metadata useful for enforcing per mapping rules. For
436 example for providing extra context for mapping options, or treating mapping
437 targets differently depending on type (sort, search, facet etc). Combining
438 this metadata with the mapping options and metadata allows us to mutate the
439 data per mapping, or even replace it with other data retrieved from the
440 metadata context.
441
442 Current properties are:
443
444 C<altscript>: A boolean value indicating whether an alternate script presentation is being
445 processed.
446
447 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
448 'subfield' or 'subfields_group'.
449
450 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
451
452 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
453 is 'subfields_group'.
454
455 C<field>: The original C<MARC::Record> object.
456
457 =back
458
459 =cut
460
461 sub _process_mappings {
462     my ($_self, $mappings, $data, $record_document, $meta) = @_;
463     foreach my $mapping (@{$mappings}) {
464         my ($target, $options) = @{$mapping};
465
466         # Don't process sort fields for alternate scripts
467         my $sort = $target =~ /__sort$/;
468         if ($sort && $meta->{altscript}) {
469             next;
470         }
471
472         # Copy (scalar) data since can have multiple targets
473         # with differing options for (possibly) mutating data
474         # so need a different copy for each
475         my $data_copy = $data;
476         if (defined $options->{substr}) {
477             my ($start, $length) = @{$options->{substr}};
478             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
479         }
480
481         # Add data to values array for callbacks processing
482         my $values = [$data_copy];
483
484         # Value callbacks takes subfield data (or values from previous
485         # callbacks) as argument, and returns a possibly different list of values.
486         # Note that the returned list may also be empty.
487         if (defined $options->{value_callbacks}) {
488             foreach my $callback (@{$options->{value_callbacks}}) {
489                 # Pass each value to current callback which returns a list
490                 # (scalar is fine too) resulting either in a list or
491                 # a list of lists that will be flattened by perl.
492                 # The next callback will receive the possibly expanded list of values.
493                 $values = [ map { $callback->($_) } @{$values} ];
494             }
495         }
496
497         # Skip mapping if all values has been removed
498         next unless @{$values};
499
500         if (defined $options->{property}) {
501             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
502         }
503         if (defined $options->{nonfiling_characters_indicator}) {
504             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
505             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
506             # Nonfiling chars does not make sense for multiple values
507             # Only apply on first element
508             $values->[0] = substr $values->[0], $nonfiling_chars;
509         }
510
511         $values = [ grep(!/^$/, @{$values}) ];
512
513         $record_document->{$target} //= [];
514         push @{$record_document->{$target}}, @{$values};
515     }
516 }
517
518 =head2 marc_records_to_documents($marc_records)
519
520     my $record_documents = $self->marc_records_to_documents($marc_records);
521
522 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
523
524 Returns array of hash references, representing Elasticsearch documents,
525 acceptable as body payload in C<Search::Elasticsearch> requests.
526
527 =over 4
528
529 =item C<$marc_documents>
530
531 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
532
533 =back
534
535 =cut
536
537 sub marc_records_to_documents {
538     my ($self, $records) = @_;
539     my $rules = $self->_get_marc_mapping_rules();
540     my $control_fields_rules = $rules->{control_fields};
541     my $data_fields_rules = $rules->{data_fields};
542     my $marcflavour = lc C4::Context->preference('marcflavour');
543     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
544
545     my @record_documents;
546
547     my %auth_match_headings;
548     if( $self->index eq 'authorities' ){
549         my @auth_types = Koha::Authority::Types->search();
550         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
551     }
552
553     foreach my $record (@{$records}) {
554         my $record_document = {};
555
556         if ( $self->index eq 'authorities' ){
557             my $authtypecode = GuessAuthTypeCode( $record );
558             if( $authtypecode ){
559                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
560                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
561                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
562                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
563                 }
564             } else {
565                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
566             }
567         }
568
569         my $mappings = $rules->{leader};
570         if ($mappings) {
571             $self->_process_mappings($mappings, $record->leader(), $record_document, {
572                     altscript => 0,
573                     data_source => 'leader'
574                 }
575             );
576         }
577         foreach my $field ($record->fields()) {
578             if ($field->is_control_field()) {
579                 my $mappings = $control_fields_rules->{$field->tag()};
580                 if ($mappings) {
581                     $self->_process_mappings($mappings, $field->data(), $record_document, {
582                             altscript => 0,
583                             data_source => 'control_field',
584                             field => $field
585                         }
586                     );
587                 }
588             }
589             else {
590                 my $tag = $field->tag();
591                 # Handle alternate scripts in MARC 21
592                 my $altscript = 0;
593                 if ($marcflavour eq 'marc21' && $tag eq '880') {
594                     my $sub6 = $field->subfield('6');
595                     if ($sub6 =~ /^(...)-\d+/) {
596                         $tag = $1;
597                         $altscript = 1;
598                     }
599                 }
600
601                 my $data_field_rules = $data_fields_rules->{$tag};
602                 if ($data_field_rules) {
603                     my $subfields_mappings = $data_field_rules->{subfields};
604                     my $wildcard_mappings = $subfields_mappings->{'*'};
605                     foreach my $subfield ($field->subfields()) {
606                         my ($code, $data) = @{$subfield};
607                         my $mappings = $subfields_mappings->{$code} // [];
608                         if ($wildcard_mappings) {
609                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
610                         }
611                         if (@{$mappings}) {
612                             $self->_process_mappings($mappings, $data, $record_document, {
613                                     altscript => $altscript,
614                                     data_source => 'subfield',
615                                     code => $code,
616                                     field => $field
617                                 }
618                             );
619                         }
620                     }
621
622                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
623                     if ($subfields_join_mappings) {
624                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
625                             my $data_field = $field->clone; #copy field to preserve for alt scripts
626                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
627                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
628                             if ($data) {
629                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
630                                         altscript => $altscript,
631                                         data_source => 'subfields_group',
632                                         codes => $subfields_group,
633                                         field => $field
634                                     }
635                                 );
636                             }
637                         }
638                     }
639                 }
640             }
641         }
642         foreach my $field (keys %{$rules->{defaults}}) {
643             unless (defined $record_document->{$field}) {
644                 $record_document->{$field} = $rules->{defaults}->{$field};
645             }
646         }
647         foreach my $field (@{$rules->{sum}}) {
648             if (defined $record_document->{$field}) {
649                 # TODO: validate numeric? filter?
650                 # TODO: Or should only accept fields without nested values?
651                 # TODO: Quick and dirty, improve if needed
652                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
653             }
654         }
655         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
656         foreach my $field (@{$rules->{isbn}}) {
657             if (defined $record_document->{$field}) {
658                 my @isbns = ();
659                 foreach my $input_isbn (@{$record_document->{$field}}) {
660                     my $isbn = Business::ISBN->new($input_isbn);
661                     if (defined $isbn && $isbn->is_valid) {
662                         my $isbn13 = $isbn->as_isbn13->as_string;
663                         push @isbns, $isbn13;
664                         $isbn13 =~ s/\-//g;
665                         push @isbns, $isbn13;
666
667                         my $isbn10 = $isbn->as_isbn10;
668                         if ($isbn10) {
669                             $isbn10 = $isbn10->as_string;
670                             push @isbns, $isbn10;
671                             $isbn10 =~ s/\-//g;
672                             push @isbns, $isbn10;
673                         }
674                     } else {
675                         push @isbns, $input_isbn;
676                     }
677                 }
678                 $record_document->{$field} = \@isbns;
679             }
680         }
681
682         # Remove duplicate values and collapse sort fields
683         foreach my $field (keys %{$record_document}) {
684             if (ref($record_document->{$field}) eq 'ARRAY') {
685                 @{$record_document->{$field}} = do {
686                     my %seen;
687                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
688                 };
689                 if ($field =~ /__sort$/) {
690                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
691                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
692                 }
693             }
694         }
695
696         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
697         $record->encoding('UTF-8');
698         if ($use_array) {
699             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
700             $record_document->{'marc_format'} = 'ARRAY';
701         } else {
702             my @warnings;
703             {
704                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
705                 local $SIG{__WARN__} = sub {
706                     push @warnings, $_[0];
707                 };
708                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
709             }
710             if (@warnings) {
711                 # Suppress warnings if record length exceeded
712                 unless (substr($record->leader(), 0, 5) eq '99999') {
713                     foreach my $warning (@warnings) {
714                         carp $warning;
715                     }
716                 }
717                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
718                 $record_document->{'marc_format'} = 'MARCXML';
719             }
720             else {
721                 $record_document->{'marc_format'} = 'base64ISO2709';
722             }
723         }
724         push @record_documents, $record_document;
725     }
726     return \@record_documents;
727 }
728
729 =head2 _marc_to_array($record)
730
731     my @fields = _marc_to_array($record)
732
733 Convert a MARC::Record to an array modeled after MARC-in-JSON
734 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
735
736 =over 4
737
738 =item C<$record>
739
740 A MARC::Record object
741
742 =back
743
744 =cut
745
746 sub _marc_to_array {
747     my ($self, $record) = @_;
748
749     my $data = {
750         leader => $record->leader(),
751         fields => []
752     };
753     for my $field ($record->fields()) {
754         my $tag = $field->tag();
755         if ($field->is_control_field()) {
756             push @{$data->{fields}}, {$tag => $field->data()};
757         } else {
758             my $subfields = ();
759             foreach my $subfield ($field->subfields()) {
760                 my ($code, $contents) = @{$subfield};
761                 push @{$subfields}, {$code => $contents};
762             }
763             push @{$data->{fields}}, {
764                 $tag => {
765                     ind1 => $field->indicator(1),
766                     ind2 => $field->indicator(2),
767                     subfields => $subfields
768                 }
769             };
770         }
771     }
772     return $data;
773 }
774
775 =head2 _array_to_marc($data)
776
777     my $record = _array_to_marc($data)
778
779 Convert an array modeled after MARC-in-JSON to a MARC::Record
780
781 =over 4
782
783 =item C<$data>
784
785 An array modeled after MARC-in-JSON
786 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
787
788 =back
789
790 =cut
791
792 sub _array_to_marc {
793     my ($self, $data) = @_;
794
795     my $record = MARC::Record->new();
796
797     $record->leader($data->{leader});
798     for my $field (@{$data->{fields}}) {
799         my $tag = (keys %{$field})[0];
800         $field = $field->{$tag};
801         my $marc_field;
802         if (ref($field) eq 'HASH') {
803             my @subfields;
804             foreach my $subfield (@{$field->{subfields}}) {
805                 my $code = (keys %{$subfield})[0];
806                 push @subfields, $code;
807                 push @subfields, $subfield->{$code};
808             }
809             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
810         } else {
811             $marc_field = MARC::Field->new($tag, $field)
812         }
813         $record->append_fields($marc_field);
814     }
815 ;
816     return $record;
817 }
818
819 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
820
821     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
822
823 Get mappings, an internal data structure later used by
824 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
825 data for a MARC mapping.
826
827 The returned C<$mappings> is not to to be confused with mappings provided by
828 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
829 provided by C<_foreach_mapping> and expands it to this internal data structure.
830 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
831 is then applied to each MARC target (leader, control field data, subfield or
832 joined subfields) and integrated into the mapping rules data structure used in
833 C<marc_records_to_documents> to transform MARC records into Elasticsearch
834 documents.
835
836 =over 4
837
838 =item C<$facet>
839
840 Boolean indicating whether to create a facet field for this mapping.
841
842 =item C<$suggestible>
843
844 Boolean indicating whether to create a suggestion field for this mapping.
845
846 =item C<$sort>
847
848 Boolean indicating whether to create a sort field for this mapping.
849
850 =item C<$search>
851
852 Boolean indicating whether to create a search field for this mapping.
853
854 =item C<$target_name>
855
856 Elasticsearch document target field name.
857
858 =item C<$target_type>
859
860 Elasticsearch document target field type.
861
862 =item C<$range>
863
864 An optional range as a string in the format "<START>-<END>" or "<START>",
865 where "<START>" and "<END>" are integers specifying a range that will be used
866 for extracting a substring from MARC data as Elasticsearch field target value.
867
868 The first character position is "0", and the range is inclusive,
869 so "0-2" means the first three characters of MARC data.
870
871 If only "<START>" is provided only one character at position "<START>" will
872 be extracted.
873
874 =back
875
876 =cut
877
878 sub _field_mappings {
879     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
880     my %mapping_defaults = ();
881     my @mappings;
882
883     my $substr_args = undef;
884     if (defined $range) {
885         # TODO: use value_callback instead?
886         my ($start, $end) = map(int, split /-/, $range, 2);
887         $substr_args = [$start];
888         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
889     }
890     my $default_options = {};
891     if ($substr_args) {
892         $default_options->{substr} = $substr_args;
893     }
894
895     # TODO: Should probably have per type value callback/hook
896     # but hard code for now
897     if ($target_type eq 'boolean') {
898         $default_options->{value_callbacks} //= [];
899         push @{$default_options->{value_callbacks}}, sub {
900             my ($value) = @_;
901             # Trim whitespace at both ends
902             $value =~ s/^\s+|\s+$//g;
903             return $value ? 'true' : 'false';
904         };
905     }
906     elsif ($target_type eq 'year') {
907         $default_options->{value_callbacks} //= [];
908         # Only accept years containing digits and "u"
909         push @{$default_options->{value_callbacks}}, sub {
910             my ($value) = @_;
911             # Replace "u" with "0" for sorting
912             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
913         };
914     }
915
916     if ($search) {
917         my $mapping = [$target_name, $default_options];
918         push @mappings, $mapping;
919     }
920
921     my @suffixes = ();
922     push @suffixes, 'facet' if $facet;
923     push @suffixes, 'suggestion' if $suggestible;
924     push @suffixes, 'sort' if !defined $sort || $sort;
925
926     foreach my $suffix (@suffixes) {
927         my $mapping = ["${target_name}__$suffix"];
928         # TODO: Hack, fix later in less hideous manner
929         if ($suffix eq 'suggestion') {
930             push @{$mapping}, {%{$default_options}, property => 'input'};
931         }
932         else {
933             # Important! Make shallow clone, or we end up with the same hashref
934             # shared by all mappings
935             push @{$mapping}, {%{$default_options}};
936         }
937         push @mappings, $mapping;
938     }
939     return @mappings;
940 };
941
942 =head2 _get_marc_mapping_rules
943
944     my $mapping_rules = $self->_get_marc_mapping_rules()
945
946 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
947
948 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
949 each call to C<MARC::Record>->field) we create an optimized structure of mapping
950 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
951
952 We can then iterate through all MARC fields for each record and apply all relevant
953 rules once per fields instead of retreiving fields multiple times for each mapping rule
954 which is terribly slow.
955
956 =cut
957
958 # TODO: This structure can be used for processing multiple MARC::Records so is currently
959 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
960 # memory cache which it is currently not. The performance gain of caching
961 # would probably be marginal, but to do this could be a further improvement.
962
963 sub _get_marc_mapping_rules {
964     my ($self) = @_;
965     my $marcflavour = lc C4::Context->preference('marcflavour');
966     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
967     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
968     my $rules = {
969         'leader' => [],
970         'control_fields' => {},
971         'data_fields' => {},
972         'sum' => [],
973         'isbn' => [],
974         'defaults' => {}
975     };
976
977     $self->_foreach_mapping(sub {
978         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
979         return if $marc_type ne $marcflavour;
980
981         if ($type eq 'sum') {
982             push @{$rules->{sum}}, $name;
983             push @{$rules->{sum}}, $name."__sort" if $sort;
984         }
985         elsif ($type eq 'isbn') {
986             push @{$rules->{isbn}}, $name;
987         }
988         elsif ($type eq 'boolean') {
989             # boolean gets special handling, if value doesn't exist for a field,
990             # it is set to false
991             $rules->{defaults}->{$name} = 'false';
992         }
993
994         if ($marc_field =~ $field_spec_regexp) {
995             my $field_tag = $1;
996
997             my @subfields;
998             my @subfield_groups;
999             # Parse and separate subfields form subfield groups
1000             if (defined $2) {
1001                 my $subfield_group = '';
1002                 my $open_group = 0;
1003
1004                 foreach my $token (split //, $2) {
1005                     if ($token eq "(") {
1006                         if ($open_group) {
1007                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1008                                 "Unmatched opening parenthesis for $marc_field"
1009                             );
1010                         }
1011                         else {
1012                             $open_group = 1;
1013                         }
1014                     }
1015                     elsif ($token eq ")") {
1016                         if ($open_group) {
1017                             if ($subfield_group) {
1018                                 push @subfield_groups, $subfield_group;
1019                                 $subfield_group = '';
1020                             }
1021                             $open_group = 0;
1022                         }
1023                         else {
1024                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1025                                 "Unmatched closing parenthesis for $marc_field"
1026                             );
1027                         }
1028                     }
1029                     elsif ($open_group) {
1030                         $subfield_group .= $token;
1031                     }
1032                     else {
1033                         push @subfields, $token;
1034                     }
1035                 }
1036             }
1037             else {
1038                 push @subfields, '*';
1039             }
1040
1041             my $range = defined $3 ? $3 : undef;
1042             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1043             if ($field_tag < 10) {
1044                 $rules->{control_fields}->{$field_tag} //= [];
1045                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1046             }
1047             else {
1048                 $rules->{data_fields}->{$field_tag} //= {};
1049                 foreach my $subfield (@subfields) {
1050                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1051                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1052                 }
1053                 foreach my $subfield_group (@subfield_groups) {
1054                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1055                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1056                 }
1057             }
1058         }
1059         elsif ($marc_field =~ $leader_regexp) {
1060             my $range = defined $1 ? $1 : undef;
1061             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1062             push @{$rules->{leader}}, @mappings;
1063         }
1064         else {
1065             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1066                 "Invalid MARC field expression: $marc_field"
1067             );
1068         }
1069     });
1070
1071     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1072     if ($marcflavour eq 'marc21') {
1073         # Nonfiling characters processing for sort fields
1074         my %title_fields;
1075         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1076             # Format is: nonfiling characters indicator => field names list
1077             %title_fields = (
1078                 1 => [130, 630, 730, 740],
1079                 2 => [222, 240, 242, 243, 245, 440, 830]
1080             );
1081         }
1082         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1083             %title_fields = (
1084                 1 => [730],
1085                 2 => [130, 430, 530]
1086             );
1087         }
1088         foreach my $indicator (keys %title_fields) {
1089             foreach my $field_tag (@{$title_fields{$indicator}}) {
1090                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1091                 foreach my $mapping (@{$mappings}) {
1092                     if ($mapping->[0] =~ /__sort$/) {
1093                         # Mark this as to be processed for nonfiling characters indicator
1094                         # later on in _process_mappings
1095                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1096                     }
1097                 }
1098             }
1099         }
1100     }
1101
1102     return $rules;
1103 }
1104
1105 =head2 _foreach_mapping
1106
1107     $self->_foreach_mapping(
1108         sub {
1109             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1110                 $marc_field )
1111               = @_;
1112             return unless $marc_type eq 'marc21';
1113             print "Data comes from: " . $marc_field . "\n";
1114         }
1115     );
1116
1117 This allows you to apply a function to each entry in the elasticsearch mappings
1118 table, in order to build the mappings for whatever is needed.
1119
1120 In the provided function, the files are:
1121
1122 =over 4
1123
1124 =item C<$name>
1125
1126 The field name for elasticsearch (corresponds to the 'mapping' column in the
1127 database.
1128
1129 =item C<$type>
1130
1131 The type for this value, e.g. 'string'.
1132
1133 =item C<$facet>
1134
1135 True if this value should be facetised. This only really makes sense if the
1136 field is understood by the facet processing code anyway.
1137
1138 =item C<$sort>
1139
1140 True if this is a field that a) needs special sort handling, and b) if it
1141 should be sorted on. False if a) but not b). Undef if not a). This allows,
1142 for example, author to be sorted on but not everything marked with "author"
1143 to be included in that sort.
1144
1145 =item C<$marc_type>
1146
1147 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1148 'unimarc', 'normarc'.
1149
1150 =item C<$marc_field>
1151
1152 A string that describes the MARC field that contains the data to extract.
1153
1154 =back
1155
1156 =cut
1157
1158 sub _foreach_mapping {
1159     my ( $self, $sub ) = @_;
1160
1161     # TODO use a caching framework here
1162     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1163         {
1164             'search_marc_map.index_name' => $self->index,
1165         },
1166         {   join => { search_marc_to_fields => 'search_marc_map' },
1167             '+select' => [
1168                 'search_marc_to_fields.facet',
1169                 'search_marc_to_fields.suggestible',
1170                 'search_marc_to_fields.sort',
1171                 'search_marc_to_fields.search',
1172                 'search_marc_map.marc_type',
1173                 'search_marc_map.marc_field',
1174             ],
1175             '+as'     => [
1176                 'facet',
1177                 'suggestible',
1178                 'sort',
1179                 'search',
1180                 'marc_type',
1181                 'marc_field',
1182             ],
1183         }
1184     );
1185
1186     while ( my $search_field = $search_fields->next ) {
1187         $sub->(
1188             # Force lower case on indexed field names for case insensitive
1189             # field name searches
1190             lc($search_field->name),
1191             $search_field->type,
1192             $search_field->get_column('facet'),
1193             $search_field->get_column('suggestible'),
1194             $search_field->get_column('sort'),
1195             $search_field->get_column('search'),
1196             $search_field->get_column('marc_type'),
1197             $search_field->get_column('marc_field'),
1198         );
1199     }
1200 }
1201
1202 =head2 process_error
1203
1204     die process_error($@);
1205
1206 This parses an Elasticsearch error message and produces a human-readable
1207 result from it. This result is probably missing all the useful information
1208 that you might want in diagnosing an issue, so the warning is also logged.
1209
1210 Note that currently the resulting message is not internationalised. This
1211 will happen eventually by some method or other.
1212
1213 =cut
1214
1215 sub process_error {
1216     my ($self, $msg) = @_;
1217
1218     warn $msg; # simple logging
1219
1220     # This is super-primitive
1221     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1222
1223     return "Unable to perform your search. Please try again.\n";
1224 }
1225
1226 =head2 _read_configuration
1227
1228     my $conf = _read_configuration();
1229
1230 Reads the I<configuration file> and returns a hash structure with the
1231 configuration information. It raises an exception if mandatory entries
1232 are missing.
1233
1234 The hashref structure has the following form:
1235
1236     {
1237         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1238         'index_name' => 'koha_instance',
1239     }
1240
1241 This is configured by the following in the C<config> block in koha-conf.xml:
1242
1243     <elasticsearch>
1244         <server>127.0.0.1:9200</server>
1245         <server>anotherserver:9200</server>
1246         <index_name>koha_instance</index_name>
1247     </elasticsearch>
1248
1249 =cut
1250
1251 sub _read_configuration {
1252
1253     my $configuration;
1254
1255     my $conf = C4::Context->config('elasticsearch');
1256     unless ( defined $conf ) {
1257         Koha::Exceptions::Config::MissingEntry->throw(
1258             "Missing <elasticsearch> entry in koha-conf.xml"
1259         );
1260     }
1261
1262     if ( $conf && $conf->{server} ) {
1263         my $nodes = $conf->{server};
1264         if ( ref($nodes) eq 'ARRAY' ) {
1265             $configuration->{nodes} = $nodes;
1266         }
1267         else {
1268             $configuration->{nodes} = [$nodes];
1269         }
1270     }
1271     else {
1272         Koha::Exceptions::Config::MissingEntry->throw(
1273             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1274         );
1275     }
1276
1277     if ( defined $conf->{index_name} ) {
1278         $configuration->{index_name} = $conf->{index_name};
1279     }
1280     else {
1281         Koha::Exceptions::Config::MissingEntry->throw(
1282             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1283         );
1284     }
1285
1286     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1287
1288     $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1289
1290     return $configuration;
1291 }
1292
1293 =head2 get_facetable_fields
1294
1295 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1296
1297 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1298
1299 =cut
1300
1301 sub get_facetable_fields {
1302     my ($self) = @_;
1303
1304     # These should correspond to the ES field names, as opposed to the CCL
1305     # things that zebra uses.
1306     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1307     my @faceted_fields = Koha::SearchFields->search(
1308         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1309     );
1310     my @not_faceted_fields = Koha::SearchFields->search(
1311         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1312     );
1313     # This could certainly be improved
1314     return ( @faceted_fields, @not_faceted_fields );
1315 }
1316
1317 =head2 clear_search_fields_cache
1318
1319 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1320
1321 Clear cached values for ES search fields
1322
1323 =cut
1324
1325 sub clear_search_fields_cache {
1326
1327     my $cache = Koha::Caches->get_instance();
1328     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1329     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1330     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1331     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1332
1333 }
1334
1335 1;
1336
1337 __END__
1338
1339 =head1 AUTHOR
1340
1341 =over 4
1342
1343 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1344
1345 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1346
1347 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1348
1349 =back
1350
1351 =cut