Bug 24807: (follow-up) Add support for spaces as unknown characters
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
281
282             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
283                 {
284                     facet   => $marc_to_field->facet || '',
285                     marc_type => $marc_map->marc_type,
286                     marc_field => $marc_map->marc_field,
287                     sort        => $marc_to_field->sort,
288                     suggestible => $marc_to_field->suggestible || ''
289                 });
290
291         }
292     }
293
294     return $mappings;
295 }
296
297 =head2 _get_elasticsearch_field_config
298
299 Get the Elasticsearch field config for the given purpose and data type.
300
301 $mapping = _get_elasticsearch_field_config('search', 'text');
302
303 =cut
304
305 sub _get_elasticsearch_field_config {
306
307     my ( $purpose, $type ) = @_;
308
309     # Use state to speed up repeated calls
310     state $settings = undef;
311     if (!defined $settings) {
312         my $config_file = C4::Context->config('elasticsearch_field_config');
313         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314         $settings = LoadFile( $config_file );
315     }
316
317     if (!defined $settings->{$purpose}) {
318         die "Field purpose $purpose not defined in field config";
319     }
320     if ($type eq '') {
321         return $settings->{$purpose};
322     }
323     if (defined $settings->{$purpose}{$type}) {
324         return $settings->{$purpose}{$type};
325     }
326     if (defined $settings->{$purpose}{'default'}) {
327         return $settings->{$purpose}{'default'};
328     }
329     return;
330 }
331
332 =head2 _load_elasticsearch_mappings
333
334 Load Elasticsearch mappings in the format of mappings.yaml.
335
336 $indexes = _load_elasticsearch_mappings();
337
338 =cut
339
340 sub _load_elasticsearch_mappings {
341     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
342     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
343     return LoadFile( $mappings_yaml );
344 }
345
346 sub reset_elasticsearch_mappings {
347     my ( $self ) = @_;
348     my $indexes = $self->_load_elasticsearch_mappings();
349
350     Koha::SearchMarcMaps->delete;
351     Koha::SearchFields->delete;
352
353     while ( my ( $index_name, $fields ) = each %$indexes ) {
354         while ( my ( $field_name, $data ) = each %$fields ) {
355
356             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
357
358             # Set default values
359             $sf_params{staff_client} //= 1;
360             $sf_params{opac} //= 1;
361
362             $sf_params{name} = $field_name;
363
364             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
365
366             my $mappings = $data->{mappings};
367             for my $mapping ( @$mappings ) {
368                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
369                     index_name => $index_name,
370                     marc_type => $mapping->{marc_type},
371                     marc_field => $mapping->{marc_field}
372                 });
373                 $search_field->add_to_search_marc_maps($marc_field, {
374                     facet => $mapping->{facet} || 0,
375                     suggestible => $mapping->{suggestible} || 0,
376                     sort => $mapping->{sort},
377                     search => $mapping->{search} // 1
378                 });
379             }
380         }
381     }
382
383     $self->clear_search_fields_cache();
384
385     # FIXME return the mappings?
386 }
387
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
390 sub sort_fields {
391     my $self = shift;
392     if (@_) {
393         $self->_sort_fields_accessor(@_);
394         return;
395     }
396     my $val = $self->_sort_fields_accessor();
397     return $val if $val;
398
399     # This will populate the accessor as a side effect
400     $self->get_elasticsearch_mappings();
401     return $self->_sort_fields_accessor();
402 }
403
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
405
406     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
407
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
413
414 =over 4
415
416 =item C<$mappings>
417
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
421 mapping.
422
423 =item C<$data>
424
425 The source data from a MARC record field.
426
427 =item C<$record_document>
428
429 Hashref representing the Elasticsearch document on which mappings should be
430 applied.
431
432 =item C<$meta>
433
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
439 metadata context.
440
441 Current properties are:
442
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
444 processed.
445
446 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
448
449 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
450
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
453
454 C<field>: The original C<MARC::Record> object.
455
456 =back
457
458 =cut
459
460 sub _process_mappings {
461     my ($_self, $mappings, $data, $record_document, $meta) = @_;
462     foreach my $mapping (@{$mappings}) {
463         my ($target, $options) = @{$mapping};
464
465         # Don't process sort fields for alternate scripts
466         my $sort = $target =~ /__sort$/;
467         if ($sort && $meta->{altscript}) {
468             next;
469         }
470
471         # Copy (scalar) data since can have multiple targets
472         # with differing options for (possibly) mutating data
473         # so need a different copy for each
474         my $data_copy = $data;
475         if (defined $options->{substr}) {
476             my ($start, $length) = @{$options->{substr}};
477             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
478         }
479
480         # Add data to values array for callbacks processing
481         my $values = [$data_copy];
482
483         # Value callbacks takes subfield data (or values from previous
484         # callbacks) as argument, and returns a possibly different list of values.
485         # Note that the returned list may also be empty.
486         if (defined $options->{value_callbacks}) {
487             foreach my $callback (@{$options->{value_callbacks}}) {
488                 # Pass each value to current callback which returns a list
489                 # (scalar is fine too) resulting either in a list or
490                 # a list of lists that will be flattened by perl.
491                 # The next callback will receive the possibly expanded list of values.
492                 $values = [ map { $callback->($_) } @{$values} ];
493             }
494         }
495
496         # Skip mapping if all values has been removed
497         next unless @{$values};
498
499         if (defined $options->{property}) {
500             $values = [ map { { $options->{property} => $_ } } @{$values} ];
501         }
502         if (defined $options->{nonfiling_characters_indicator}) {
503             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
504             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
505             # Nonfiling chars does not make sense for multiple values
506             # Only apply on first element
507             $values->[0] = substr $values->[0], $nonfiling_chars;
508         }
509
510         $record_document->{$target} //= [];
511         push @{$record_document->{$target}}, @{$values};
512     }
513 }
514
515 =head2 marc_records_to_documents($marc_records)
516
517     my $record_documents = $self->marc_records_to_documents($marc_records);
518
519 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
520
521 Returns array of hash references, representing Elasticsearch documents,
522 acceptable as body payload in C<Search::Elasticsearch> requests.
523
524 =over 4
525
526 =item C<$marc_documents>
527
528 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
529
530 =back
531
532 =cut
533
534 sub marc_records_to_documents {
535     my ($self, $records) = @_;
536     my $rules = $self->_get_marc_mapping_rules();
537     my $control_fields_rules = $rules->{control_fields};
538     my $data_fields_rules = $rules->{data_fields};
539     my $marcflavour = lc C4::Context->preference('marcflavour');
540     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
541
542     my @record_documents;
543
544     my %auth_match_headings;
545     if( $self->index eq 'authorities' ){
546         my @auth_types = Koha::Authority::Types->search();
547         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
548     }
549
550     foreach my $record (@{$records}) {
551         my $record_document = {};
552
553         if ( $self->index eq 'authorities' ){
554             my $authtypecode = GuessAuthTypeCode( $record );
555             if( $authtypecode ){
556                 my $field = $record->field( $auth_match_headings{ $authtypecode } );
557                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
558                 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
559             } else {
560                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
561             }
562         }
563
564         my $mappings = $rules->{leader};
565         if ($mappings) {
566             $self->_process_mappings($mappings, $record->leader(), $record_document, {
567                     altscript => 0,
568                     data_source => 'leader'
569                 }
570             );
571         }
572         foreach my $field ($record->fields()) {
573             if ($field->is_control_field()) {
574                 my $mappings = $control_fields_rules->{$field->tag()};
575                 if ($mappings) {
576                     $self->_process_mappings($mappings, $field->data(), $record_document, {
577                             altscript => 0,
578                             data_source => 'control_field',
579                             field => $field
580                         }
581                     );
582                 }
583             }
584             else {
585                 my $tag = $field->tag();
586                 # Handle alternate scripts in MARC 21
587                 my $altscript = 0;
588                 if ($marcflavour eq 'marc21' && $tag eq '880') {
589                     my $sub6 = $field->subfield('6');
590                     if ($sub6 =~ /^(...)-\d+/) {
591                         $tag = $1;
592                         $altscript = 1;
593                     }
594                 }
595
596                 my $data_field_rules = $data_fields_rules->{$tag};
597                 if ($data_field_rules) {
598                     my $subfields_mappings = $data_field_rules->{subfields};
599                     my $wildcard_mappings = $subfields_mappings->{'*'};
600                     foreach my $subfield ($field->subfields()) {
601                         my ($code, $data) = @{$subfield};
602                         my $mappings = $subfields_mappings->{$code} // [];
603                         if ($wildcard_mappings) {
604                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
605                         }
606                         if (@{$mappings}) {
607                             $self->_process_mappings($mappings, $data, $record_document, {
608                                     altscript => $altscript,
609                                     data_source => 'subfield',
610                                     code => $code,
611                                     field => $field
612                                 }
613                             );
614                         }
615                     }
616
617                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
618                     if ($subfields_join_mappings) {
619                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
620                             # Map each subfield to values, remove empty values, join with space
621                             my $data = join(
622                                 ' ',
623                                 grep(
624                                     $_,
625                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
626                                 )
627                             );
628                             if ($data) {
629                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
630                                         altscript => $altscript,
631                                         data_source => 'subfields_group',
632                                         codes => $subfields_group,
633                                         field => $field
634                                     }
635                                 );
636                             }
637                         }
638                     }
639                 }
640             }
641         }
642         foreach my $field (keys %{$rules->{defaults}}) {
643             unless (defined $record_document->{$field}) {
644                 $record_document->{$field} = $rules->{defaults}->{$field};
645             }
646         }
647         foreach my $field (@{$rules->{sum}}) {
648             if (defined $record_document->{$field}) {
649                 # TODO: validate numeric? filter?
650                 # TODO: Or should only accept fields without nested values?
651                 # TODO: Quick and dirty, improve if needed
652                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
653             }
654         }
655         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
656         foreach my $field (@{$rules->{isbn}}) {
657             if (defined $record_document->{$field}) {
658                 my @isbns = ();
659                 foreach my $input_isbn (@{$record_document->{$field}}) {
660                     my $isbn = Business::ISBN->new($input_isbn);
661                     if (defined $isbn && $isbn->is_valid) {
662                         my $isbn13 = $isbn->as_isbn13->as_string;
663                         push @isbns, $isbn13;
664                         $isbn13 =~ s/\-//g;
665                         push @isbns, $isbn13;
666
667                         my $isbn10 = $isbn->as_isbn10;
668                         if ($isbn10) {
669                             $isbn10 = $isbn10->as_string;
670                             push @isbns, $isbn10;
671                             $isbn10 =~ s/\-//g;
672                             push @isbns, $isbn10;
673                         }
674                     } else {
675                         push @isbns, $input_isbn;
676                     }
677                 }
678                 $record_document->{$field} = \@isbns;
679             }
680         }
681
682         # Remove duplicate values and collapse sort fields
683         foreach my $field (keys %{$record_document}) {
684             if (ref($record_document->{$field}) eq 'ARRAY') {
685                 @{$record_document->{$field}} = do {
686                     my %seen;
687                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
688                 };
689                 if ($field =~ /__sort$/) {
690                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
691                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
692                 }
693             }
694         }
695
696         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
697         $record->encoding('UTF-8');
698         if ($use_array) {
699             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
700             $record_document->{'marc_format'} = 'ARRAY';
701         } else {
702             my @warnings;
703             {
704                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
705                 local $SIG{__WARN__} = sub {
706                     push @warnings, $_[0];
707                 };
708                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
709             }
710             if (@warnings) {
711                 # Suppress warnings if record length exceeded
712                 unless (substr($record->leader(), 0, 5) eq '99999') {
713                     foreach my $warning (@warnings) {
714                         carp $warning;
715                     }
716                 }
717                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
718                 $record_document->{'marc_format'} = 'MARCXML';
719             }
720             else {
721                 $record_document->{'marc_format'} = 'base64ISO2709';
722             }
723         }
724         push @record_documents, $record_document;
725     }
726     return \@record_documents;
727 }
728
729 =head2 _marc_to_array($record)
730
731     my @fields = _marc_to_array($record)
732
733 Convert a MARC::Record to an array modeled after MARC-in-JSON
734 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
735
736 =over 4
737
738 =item C<$record>
739
740 A MARC::Record object
741
742 =back
743
744 =cut
745
746 sub _marc_to_array {
747     my ($self, $record) = @_;
748
749     my $data = {
750         leader => $record->leader(),
751         fields => []
752     };
753     for my $field ($record->fields()) {
754         my $tag = $field->tag();
755         if ($field->is_control_field()) {
756             push @{$data->{fields}}, {$tag => $field->data()};
757         } else {
758             my $subfields = ();
759             foreach my $subfield ($field->subfields()) {
760                 my ($code, $contents) = @{$subfield};
761                 push @{$subfields}, {$code => $contents};
762             }
763             push @{$data->{fields}}, {
764                 $tag => {
765                     ind1 => $field->indicator(1),
766                     ind2 => $field->indicator(2),
767                     subfields => $subfields
768                 }
769             };
770         }
771     }
772     return $data;
773 }
774
775 =head2 _array_to_marc($data)
776
777     my $record = _array_to_marc($data)
778
779 Convert an array modeled after MARC-in-JSON to a MARC::Record
780
781 =over 4
782
783 =item C<$data>
784
785 An array modeled after MARC-in-JSON
786 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
787
788 =back
789
790 =cut
791
792 sub _array_to_marc {
793     my ($self, $data) = @_;
794
795     my $record = MARC::Record->new();
796
797     $record->leader($data->{leader});
798     for my $field (@{$data->{fields}}) {
799         my $tag = (keys %{$field})[0];
800         $field = $field->{$tag};
801         my $marc_field;
802         if (ref($field) eq 'HASH') {
803             my @subfields;
804             foreach my $subfield (@{$field->{subfields}}) {
805                 my $code = (keys %{$subfield})[0];
806                 push @subfields, $code;
807                 push @subfields, $subfield->{$code};
808             }
809             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
810         } else {
811             $marc_field = MARC::Field->new($tag, $field)
812         }
813         $record->append_fields($marc_field);
814     }
815 ;
816     return $record;
817 }
818
819 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
820
821     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
822
823 Get mappings, an internal data structure later used by
824 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
825 data for a MARC mapping.
826
827 The returned C<$mappings> is not to to be confused with mappings provided by
828 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
829 provided by C<_foreach_mapping> and expands it to this internal data structure.
830 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
831 is then applied to each MARC target (leader, control field data, subfield or
832 joined subfields) and integrated into the mapping rules data structure used in
833 C<marc_records_to_documents> to transform MARC records into Elasticsearch
834 documents.
835
836 =over 4
837
838 =item C<$facet>
839
840 Boolean indicating whether to create a facet field for this mapping.
841
842 =item C<$suggestible>
843
844 Boolean indicating whether to create a suggestion field for this mapping.
845
846 =item C<$sort>
847
848 Boolean indicating whether to create a sort field for this mapping.
849
850 =item C<$search>
851
852 Boolean indicating whether to create a search field for this mapping.
853
854 =item C<$target_name>
855
856 Elasticsearch document target field name.
857
858 =item C<$target_type>
859
860 Elasticsearch document target field type.
861
862 =item C<$range>
863
864 An optional range as a string in the format "<START>-<END>" or "<START>",
865 where "<START>" and "<END>" are integers specifying a range that will be used
866 for extracting a substring from MARC data as Elasticsearch field target value.
867
868 The first character position is "0", and the range is inclusive,
869 so "0-2" means the first three characters of MARC data.
870
871 If only "<START>" is provided only one character at position "<START>" will
872 be extracted.
873
874 =back
875
876 =cut
877
878 sub _field_mappings {
879     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
880     my %mapping_defaults = ();
881     my @mappings;
882
883     my $substr_args = undef;
884     if (defined $range) {
885         # TODO: use value_callback instead?
886         my ($start, $end) = map(int, split /-/, $range, 2);
887         $substr_args = [$start];
888         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
889     }
890     my $default_options = {};
891     if ($substr_args) {
892         $default_options->{substr} = $substr_args;
893     }
894
895     # TODO: Should probably have per type value callback/hook
896     # but hard code for now
897     if ($target_type eq 'boolean') {
898         $default_options->{value_callbacks} //= [];
899         push @{$default_options->{value_callbacks}}, sub {
900             my ($value) = @_;
901             # Trim whitespace at both ends
902             $value =~ s/^\s+|\s+$//g;
903             return $value ? 'true' : 'false';
904         };
905     }
906     elsif ($target_type eq 'year') {
907         $default_options->{value_callbacks} //= [];
908         # Only accept years containing digits and "u"
909         push @{$default_options->{value_callbacks}}, sub {
910             my ($value) = @_;
911             # Replace "u" with "0" for sorting
912             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
913         };
914     }
915
916     if ($search) {
917         my $mapping = [$target_name, $default_options];
918         push @mappings, $mapping;
919     }
920
921     my @suffixes = ();
922     push @suffixes, 'facet' if $facet;
923     push @suffixes, 'suggestion' if $suggestible;
924     push @suffixes, 'sort' if !defined $sort || $sort;
925
926     foreach my $suffix (@suffixes) {
927         my $mapping = ["${target_name}__$suffix"];
928         # TODO: Hack, fix later in less hideous manner
929         if ($suffix eq 'suggestion') {
930             push @{$mapping}, {%{$default_options}, property => 'input'};
931         }
932         else {
933             # Important! Make shallow clone, or we end up with the same hashref
934             # shared by all mappings
935             push @{$mapping}, {%{$default_options}};
936         }
937         push @mappings, $mapping;
938     }
939     return @mappings;
940 };
941
942 =head2 _get_marc_mapping_rules
943
944     my $mapping_rules = $self->_get_marc_mapping_rules()
945
946 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
947
948 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
949 each call to C<MARC::Record>->field) we create an optimized structure of mapping
950 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
951
952 We can then iterate through all MARC fields for each record and apply all relevant
953 rules once per fields instead of retreiving fields multiple times for each mapping rule
954 which is terribly slow.
955
956 =cut
957
958 # TODO: This structure can be used for processing multiple MARC::Records so is currently
959 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
960 # memory cache which it is currently not. The performance gain of caching
961 # would probably be marginal, but to do this could be a further improvement.
962
963 sub _get_marc_mapping_rules {
964     my ($self) = @_;
965     my $marcflavour = lc C4::Context->preference('marcflavour');
966     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
967     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
968     my $rules = {
969         'leader' => [],
970         'control_fields' => {},
971         'data_fields' => {},
972         'sum' => [],
973         'isbn' => [],
974         'defaults' => {}
975     };
976
977     $self->_foreach_mapping(sub {
978         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
979         return if $marc_type ne $marcflavour;
980
981         if ($type eq 'sum') {
982             push @{$rules->{sum}}, $name;
983             push @{$rules->{sum}}, $name."__sort" if $sort;
984         }
985         elsif ($type eq 'isbn') {
986             push @{$rules->{isbn}}, $name;
987         }
988         elsif ($type eq 'boolean') {
989             # boolean gets special handling, if value doesn't exist for a field,
990             # it is set to false
991             $rules->{defaults}->{$name} = 'false';
992         }
993
994         if ($marc_field =~ $field_spec_regexp) {
995             my $field_tag = $1;
996
997             my @subfields;
998             my @subfield_groups;
999             # Parse and separate subfields form subfield groups
1000             if (defined $2) {
1001                 my $subfield_group = '';
1002                 my $open_group = 0;
1003
1004                 foreach my $token (split //, $2) {
1005                     if ($token eq "(") {
1006                         if ($open_group) {
1007                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1008                                 "Unmatched opening parenthesis for $marc_field"
1009                             );
1010                         }
1011                         else {
1012                             $open_group = 1;
1013                         }
1014                     }
1015                     elsif ($token eq ")") {
1016                         if ($open_group) {
1017                             if ($subfield_group) {
1018                                 push @subfield_groups, $subfield_group;
1019                                 $subfield_group = '';
1020                             }
1021                             $open_group = 0;
1022                         }
1023                         else {
1024                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1025                                 "Unmatched closing parenthesis for $marc_field"
1026                             );
1027                         }
1028                     }
1029                     elsif ($open_group) {
1030                         $subfield_group .= $token;
1031                     }
1032                     else {
1033                         push @subfields, $token;
1034                     }
1035                 }
1036             }
1037             else {
1038                 push @subfields, '*';
1039             }
1040
1041             my $range = defined $3 ? $3 : undef;
1042             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1043             if ($field_tag < 10) {
1044                 $rules->{control_fields}->{$field_tag} //= [];
1045                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1046             }
1047             else {
1048                 $rules->{data_fields}->{$field_tag} //= {};
1049                 foreach my $subfield (@subfields) {
1050                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1051                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1052                 }
1053                 foreach my $subfield_group (@subfield_groups) {
1054                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1055                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1056                 }
1057             }
1058         }
1059         elsif ($marc_field =~ $leader_regexp) {
1060             my $range = defined $1 ? $1 : undef;
1061             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1062             push @{$rules->{leader}}, @mappings;
1063         }
1064         else {
1065             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1066                 "Invalid MARC field expression: $marc_field"
1067             );
1068         }
1069     });
1070
1071     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1072     if ($marcflavour eq 'marc21') {
1073         # Nonfiling characters processing for sort fields
1074         my %title_fields;
1075         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1076             # Format is: nonfiling characters indicator => field names list
1077             %title_fields = (
1078                 1 => [130, 630, 730, 740],
1079                 2 => [222, 240, 242, 243, 245, 440, 830]
1080             );
1081         }
1082         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1083             %title_fields = (
1084                 1 => [730],
1085                 2 => [130, 430, 530]
1086             );
1087         }
1088         foreach my $indicator (keys %title_fields) {
1089             foreach my $field_tag (@{$title_fields{$indicator}}) {
1090                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1091                 foreach my $mapping (@{$mappings}) {
1092                     if ($mapping->[0] =~ /__sort$/) {
1093                         # Mark this as to be processed for nonfiling characters indicator
1094                         # later on in _process_mappings
1095                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1096                     }
1097                 }
1098             }
1099         }
1100     }
1101
1102     return $rules;
1103 }
1104
1105 =head2 _foreach_mapping
1106
1107     $self->_foreach_mapping(
1108         sub {
1109             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1110                 $marc_field )
1111               = @_;
1112             return unless $marc_type eq 'marc21';
1113             print "Data comes from: " . $marc_field . "\n";
1114         }
1115     );
1116
1117 This allows you to apply a function to each entry in the elasticsearch mappings
1118 table, in order to build the mappings for whatever is needed.
1119
1120 In the provided function, the files are:
1121
1122 =over 4
1123
1124 =item C<$name>
1125
1126 The field name for elasticsearch (corresponds to the 'mapping' column in the
1127 database.
1128
1129 =item C<$type>
1130
1131 The type for this value, e.g. 'string'.
1132
1133 =item C<$facet>
1134
1135 True if this value should be facetised. This only really makes sense if the
1136 field is understood by the facet processing code anyway.
1137
1138 =item C<$sort>
1139
1140 True if this is a field that a) needs special sort handling, and b) if it
1141 should be sorted on. False if a) but not b). Undef if not a). This allows,
1142 for example, author to be sorted on but not everything marked with "author"
1143 to be included in that sort.
1144
1145 =item C<$marc_type>
1146
1147 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1148 'unimarc', 'normarc'.
1149
1150 =item C<$marc_field>
1151
1152 A string that describes the MARC field that contains the data to extract.
1153
1154 =back
1155
1156 =cut
1157
1158 sub _foreach_mapping {
1159     my ( $self, $sub ) = @_;
1160
1161     # TODO use a caching framework here
1162     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1163         {
1164             'search_marc_map.index_name' => $self->index,
1165         },
1166         {   join => { search_marc_to_fields => 'search_marc_map' },
1167             '+select' => [
1168                 'search_marc_to_fields.facet',
1169                 'search_marc_to_fields.suggestible',
1170                 'search_marc_to_fields.sort',
1171                 'search_marc_to_fields.search',
1172                 'search_marc_map.marc_type',
1173                 'search_marc_map.marc_field',
1174             ],
1175             '+as'     => [
1176                 'facet',
1177                 'suggestible',
1178                 'sort',
1179                 'search',
1180                 'marc_type',
1181                 'marc_field',
1182             ],
1183         }
1184     );
1185
1186     while ( my $search_field = $search_fields->next ) {
1187         $sub->(
1188             # Force lower case on indexed field names for case insensitive
1189             # field name searches
1190             lc($search_field->name),
1191             $search_field->type,
1192             $search_field->get_column('facet'),
1193             $search_field->get_column('suggestible'),
1194             $search_field->get_column('sort'),
1195             $search_field->get_column('search'),
1196             $search_field->get_column('marc_type'),
1197             $search_field->get_column('marc_field'),
1198         );
1199     }
1200 }
1201
1202 =head2 process_error
1203
1204     die process_error($@);
1205
1206 This parses an Elasticsearch error message and produces a human-readable
1207 result from it. This result is probably missing all the useful information
1208 that you might want in diagnosing an issue, so the warning is also logged.
1209
1210 Note that currently the resulting message is not internationalised. This
1211 will happen eventually by some method or other.
1212
1213 =cut
1214
1215 sub process_error {
1216     my ($self, $msg) = @_;
1217
1218     warn $msg; # simple logging
1219
1220     # This is super-primitive
1221     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1222
1223     return "Unable to perform your search. Please try again.\n";
1224 }
1225
1226 =head2 _read_configuration
1227
1228     my $conf = _read_configuration();
1229
1230 Reads the I<configuration file> and returns a hash structure with the
1231 configuration information. It raises an exception if mandatory entries
1232 are missing.
1233
1234 The hashref structure has the following form:
1235
1236     {
1237         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1238         'index_name' => 'koha_instance',
1239     }
1240
1241 This is configured by the following in the C<config> block in koha-conf.xml:
1242
1243     <elasticsearch>
1244         <server>127.0.0.1:9200</server>
1245         <server>anotherserver:9200</server>
1246         <index_name>koha_instance</index_name>
1247     </elasticsearch>
1248
1249 =cut
1250
1251 sub _read_configuration {
1252
1253     my $configuration;
1254
1255     my $conf = C4::Context->config('elasticsearch');
1256     unless ( defined $conf ) {
1257         Koha::Exceptions::Config::MissingEntry->throw(
1258             "Missing <elasticsearch> entry in koha-conf.xml"
1259         );
1260     }
1261
1262     if ( $conf && $conf->{server} ) {
1263         my $nodes = $conf->{server};
1264         if ( ref($nodes) eq 'ARRAY' ) {
1265             $configuration->{nodes} = $nodes;
1266         }
1267         else {
1268             $configuration->{nodes} = [$nodes];
1269         }
1270     }
1271     else {
1272         Koha::Exceptions::Config::MissingEntry->throw(
1273             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1274         );
1275     }
1276
1277     if ( defined $conf->{index_name} ) {
1278         $configuration->{index_name} = $conf->{index_name};
1279     }
1280     else {
1281         Koha::Exceptions::Config::MissingEntry->throw(
1282             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1283         );
1284     }
1285
1286     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1287
1288     return $configuration;
1289 }
1290
1291 =head2 get_facetable_fields
1292
1293 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1294
1295 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1296
1297 =cut
1298
1299 sub get_facetable_fields {
1300     my ($self) = @_;
1301
1302     # These should correspond to the ES field names, as opposed to the CCL
1303     # things that zebra uses.
1304     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1305     my @faceted_fields = Koha::SearchFields->search(
1306         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1307     );
1308     my @not_faceted_fields = Koha::SearchFields->search(
1309         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1310     );
1311     # This could certainly be improved
1312     return ( @faceted_fields, @not_faceted_fields );
1313 }
1314
1315 =head2 clear_search_fields_cache
1316
1317 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1318
1319 Clear cached values for ES search fields
1320
1321 =cut
1322
1323 sub clear_search_fields_cache {
1324
1325     my $cache = Koha::Caches->get_instance();
1326     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1327     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1328     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1329     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1330
1331 }
1332
1333 1;
1334
1335 __END__
1336
1337 =head1 AUTHOR
1338
1339 =over 4
1340
1341 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1342
1343 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1344
1345 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1346
1347 =back
1348
1349 =cut