Bug 26082: Call store on new items to update itemnumber
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31
32 use Carp;
33 use Clone qw(clone);
34 use JSON;
35 use Modern::Perl;
36 use Readonly;
37 use Search::Elasticsearch;
38 use Try::Tiny;
39 use YAML::Syck;
40
41 use List::Util qw( sum0 reduce );
42 use MARC::File::XML;
43 use MIME::Base64;
44 use Encode qw(encode);
45 use Business::ISBN;
46 use Scalar::Util qw(looks_like_number);
47
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =item index_name
68
69 The Elasticsearch index name with Koha instance prefix.
70
71 =back
72
73
74 =head1 FUNCTIONS
75
76 =cut
77
78 sub new {
79     my $class = shift @_;
80     my ($params) = @_;
81
82     # Check for a valid index
83     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
84     my $config = _read_configuration();
85     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
86
87     my $self = $class->SUPER::new(@_);
88     return $self;
89 }
90
91 =head2 get_elasticsearch
92
93     my $elasticsearch_client = $self->get_elasticsearch();
94
95 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
96 instance level and will be reused if method is called multiple times.
97
98 =cut
99
100 sub get_elasticsearch {
101     my $self = shift @_;
102     unless (defined $self->{elasticsearch}) {
103         $self->{elasticsearch} = Search::Elasticsearch->new(
104             $self->get_elasticsearch_params()
105         );
106     }
107     return $self->{elasticsearch};
108 }
109
110 =head2 get_elasticsearch_params
111
112     my $params = $self->get_elasticsearch_params();
113
114 This provides a hashref that contains the parameters for connecting to the
115 ElasicSearch servers, in the form:
116
117     {
118         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119         'index_name' => 'koha_instance_index',
120     }
121
122 This is configured by the following in the C<config> block in koha-conf.xml:
123
124     <elasticsearch>
125         <server>127.0.0.1:9200</server>
126         <server>anotherserver:9200</server>
127         <index_name>koha_instance</index_name>
128     </elasticsearch>
129
130 =cut
131
132 sub get_elasticsearch_params {
133     my ($self) = @_;
134
135     my $conf;
136     try {
137         $conf = _read_configuration();
138     } catch {
139         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
140             croak($_->message);
141         }
142     };
143     # Extract relevant parts of configuration
144     my $params = {
145         nodes => $conf->{nodes}
146     };
147     $params->{cxn_pool} //= 'Static';
148
149     return $params;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         # Clone the general mapping to break ties with the original hash
196         my $mappings = {
197             data => clone(_get_elasticsearch_field_config('general', ''))
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 if ($search) {
219                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
220                 }
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 raw_elasticsearch_mappings
245
246 Return elasticsearch mapping as it is in database.
247 marc_type: marc21|unimarc|normarc
248
249 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
250
251 =cut
252
253 sub raw_elasticsearch_mappings {
254     my ( $marc_type ) = @_;
255
256     my $schema = Koha::Database->new()->schema();
257
258     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
259
260     my $mappings = {};
261     while ( my $search_field = $search_fields->next ) {
262
263         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
264             { search_field_id => $search_field->id },
265             {
266                 join     => 'search_marc_map',
267                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
268             }
269         );
270
271         while ( my $marc_to_field = $marc_to_fields->next ) {
272
273             my $marc_map = $marc_to_field->search_marc_map;
274
275             next if $marc_type && $marc_map->marc_type ne $marc_type;
276
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
283
284             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
285                 {
286                     facet   => $marc_to_field->facet || '',
287                     marc_type => $marc_map->marc_type,
288                     marc_field => $marc_map->marc_field,
289                     sort        => $marc_to_field->sort,
290                     suggestible => $marc_to_field->suggestible || ''
291                 });
292
293         }
294     }
295
296     return $mappings;
297 }
298
299 =head2 _get_elasticsearch_field_config
300
301 Get the Elasticsearch field config for the given purpose and data type.
302
303 $mapping = _get_elasticsearch_field_config('search', 'text');
304
305 =cut
306
307 sub _get_elasticsearch_field_config {
308
309     my ( $purpose, $type ) = @_;
310
311     # Use state to speed up repeated calls
312     state $settings = undef;
313     if (!defined $settings) {
314         my $config_file = C4::Context->config('elasticsearch_field_config');
315         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
316         $settings = LoadFile( $config_file );
317     }
318
319     if (!defined $settings->{$purpose}) {
320         die "Field purpose $purpose not defined in field config";
321     }
322     if ($type eq '') {
323         return $settings->{$purpose};
324     }
325     if (defined $settings->{$purpose}{$type}) {
326         return $settings->{$purpose}{$type};
327     }
328     if (defined $settings->{$purpose}{'default'}) {
329         return $settings->{$purpose}{'default'};
330     }
331     return;
332 }
333
334 =head2 _load_elasticsearch_mappings
335
336 Load Elasticsearch mappings in the format of mappings.yaml.
337
338 $indexes = _load_elasticsearch_mappings();
339
340 =cut
341
342 sub _load_elasticsearch_mappings {
343     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345     return LoadFile( $mappings_yaml );
346 }
347
348 sub reset_elasticsearch_mappings {
349     my ( $self ) = @_;
350     my $indexes = $self->_load_elasticsearch_mappings();
351
352     Koha::SearchMarcMaps->delete;
353     Koha::SearchFields->delete;
354
355     while ( my ( $index_name, $fields ) = each %$indexes ) {
356         while ( my ( $field_name, $data ) = each %$fields ) {
357
358             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
359
360             # Set default values
361             $sf_params{staff_client} //= 1;
362             $sf_params{opac} //= 1;
363
364             $sf_params{name} = $field_name;
365
366             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
367
368             my $mappings = $data->{mappings};
369             for my $mapping ( @$mappings ) {
370                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371                     index_name => $index_name,
372                     marc_type => $mapping->{marc_type},
373                     marc_field => $mapping->{marc_field}
374                 });
375                 $search_field->add_to_search_marc_maps($marc_field, {
376                     facet => $mapping->{facet} || 0,
377                     suggestible => $mapping->{suggestible} || 0,
378                     sort => $mapping->{sort},
379                     search => $mapping->{search} // 1
380                 });
381             }
382         }
383     }
384
385     $self->clear_search_fields_cache();
386
387     # FIXME return the mappings?
388 }
389
390 # This overrides the accessor provided by Class::Accessor so that if
391 # sort_fields isn't set, then it'll generate it.
392 sub sort_fields {
393     my $self = shift;
394     if (@_) {
395         $self->_sort_fields_accessor(@_);
396         return;
397     }
398     my $val = $self->_sort_fields_accessor();
399     return $val if $val;
400
401     # This will populate the accessor as a side effect
402     $self->get_elasticsearch_mappings();
403     return $self->_sort_fields_accessor();
404 }
405
406 =head2 _process_mappings($mappings, $data, $record_document, $meta)
407
408     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
409
410 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
411 Since we group all mappings by MARC field targets C<$mappings> will contain
412 all targets for C<$data> and thus we need to fetch the MARC field only once.
413 C<$mappings> will be applied to C<$record_document> and new field values added.
414 The method has no return value.
415
416 =over 4
417
418 =item C<$mappings>
419
420 Arrayref of mappings containing arrayrefs in the format
421 [C<$target>, C<$options>] where C<$target> is the name of the target field and
422 C<$options> is a hashref containing processing directives for this particular
423 mapping.
424
425 =item C<$data>
426
427 The source data from a MARC record field.
428
429 =item C<$record_document>
430
431 Hashref representing the Elasticsearch document on which mappings should be
432 applied.
433
434 =item C<$meta>
435
436 A hashref containing metadata useful for enforcing per mapping rules. For
437 example for providing extra context for mapping options, or treating mapping
438 targets differently depending on type (sort, search, facet etc). Combining
439 this metadata with the mapping options and metadata allows us to mutate the
440 data per mapping, or even replace it with other data retrieved from the
441 metadata context.
442
443 Current properties are:
444
445 C<altscript>: A boolean value indicating whether an alternate script presentation is being
446 processed.
447
448 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
449 'subfield' or 'subfields_group'.
450
451 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
452
453 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
454 is 'subfields_group'.
455
456 C<field>: The original C<MARC::Record> object.
457
458 =back
459
460 =cut
461
462 sub _process_mappings {
463     my ($_self, $mappings, $data, $record_document, $meta) = @_;
464     foreach my $mapping (@{$mappings}) {
465         my ($target, $options) = @{$mapping};
466
467         # Don't process sort fields for alternate scripts
468         my $sort = $target =~ /__sort$/;
469         if ($sort && $meta->{altscript}) {
470             next;
471         }
472
473         # Copy (scalar) data since can have multiple targets
474         # with differing options for (possibly) mutating data
475         # so need a different copy for each
476         my $_data = $data;
477         $record_document->{$target} //= [];
478         if (defined $options->{substr}) {
479             my ($start, $length) = @{$options->{substr}};
480             $_data = length($data) > $start ? substr $data, $start, $length : '';
481         }
482         if (defined $options->{value_callbacks}) {
483             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
484         }
485         if (defined $options->{property}) {
486             $_data = {
487                 $options->{property} => $_data
488             }
489         }
490         if (defined $options->{nonfiling_characters_indicator}) {
491             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
492             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
493             if ($nonfiling_chars) {
494                 $_data = substr $_data, $nonfiling_chars;
495             }
496         }
497         push @{$record_document->{$target}}, $_data;
498     }
499 }
500
501 =head2 marc_records_to_documents($marc_records)
502
503     my $record_documents = $self->marc_records_to_documents($marc_records);
504
505 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
506
507 Returns array of hash references, representing Elasticsearch documents,
508 acceptable as body payload in C<Search::Elasticsearch> requests.
509
510 =over 4
511
512 =item C<$marc_documents>
513
514 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
515
516 =back
517
518 =cut
519
520 sub marc_records_to_documents {
521     my ($self, $records) = @_;
522     my $rules = $self->_get_marc_mapping_rules();
523     my $control_fields_rules = $rules->{control_fields};
524     my $data_fields_rules = $rules->{data_fields};
525     my $marcflavour = lc C4::Context->preference('marcflavour');
526     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
527
528     my @record_documents;
529
530     foreach my $record (@{$records}) {
531         my $record_document = {};
532         my $mappings = $rules->{leader};
533         if ($mappings) {
534             $self->_process_mappings($mappings, $record->leader(), $record_document, {
535                     altscript => 0,
536                     data_source => 'leader'
537                 }
538             );
539         }
540         foreach my $field ($record->fields()) {
541             if ($field->is_control_field()) {
542                 my $mappings = $control_fields_rules->{$field->tag()};
543                 if ($mappings) {
544                     $self->_process_mappings($mappings, $field->data(), $record_document, {
545                             altscript => 0,
546                             data_source => 'control_field',
547                             field => $field
548                         }
549                     );
550                 }
551             }
552             else {
553                 my $tag = $field->tag();
554                 # Handle alternate scripts in MARC 21
555                 my $altscript = 0;
556                 if ($marcflavour eq 'marc21' && $tag eq '880') {
557                     my $sub6 = $field->subfield('6');
558                     if ($sub6 =~ /^(...)-\d+/) {
559                         $tag = $1;
560                         $altscript = 1;
561                     }
562                 }
563
564                 my $data_field_rules = $data_fields_rules->{$tag};
565                 if ($data_field_rules) {
566                     my $subfields_mappings = $data_field_rules->{subfields};
567                     my $wildcard_mappings = $subfields_mappings->{'*'};
568                     foreach my $subfield ($field->subfields()) {
569                         my ($code, $data) = @{$subfield};
570                         my $mappings = $subfields_mappings->{$code} // [];
571                         if ($wildcard_mappings) {
572                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
573                         }
574                         if (@{$mappings}) {
575                             $self->_process_mappings($mappings, $data, $record_document, {
576                                     altscript => $altscript,
577                                     data_source => 'subfield',
578                                     code => $code,
579                                     field => $field
580                                 }
581                             );
582                         }
583                         if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
584                             # Used by the authority linker the match-heading field requires a specific syntax
585                             # that is specified in C4/Heading
586                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
587                             next unless $heading;
588                             push @{$record_document->{'match-heading'}}, $heading->search_form;
589                         }
590                     }
591
592                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
593                     if ($subfields_join_mappings) {
594                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
595                             # Map each subfield to values, remove empty values, join with space
596                             my $data = join(
597                                 ' ',
598                                 grep(
599                                     $_,
600                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
601                                 )
602                             );
603                             if ($data) {
604                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
605                                         altscript => $altscript,
606                                         data_source => 'subfields_group',
607                                         codes => $subfields_group,
608                                         field => $field
609                                     }
610                                 );
611                             }
612                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
613                                 # Used by the authority linker the match-heading field requires a specific syntax
614                                 # that is specified in C4/Heading
615                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
616                                 next unless $heading;
617                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
618                             }
619                         }
620                     }
621                 }
622             }
623         }
624         foreach my $field (keys %{$rules->{defaults}}) {
625             unless (defined $record_document->{$field}) {
626                 $record_document->{$field} = $rules->{defaults}->{$field};
627             }
628         }
629         foreach my $field (@{$rules->{sum}}) {
630             if (defined $record_document->{$field}) {
631                 # TODO: validate numeric? filter?
632                 # TODO: Or should only accept fields without nested values?
633                 # TODO: Quick and dirty, improve if needed
634                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
635             }
636         }
637         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
638         foreach my $field (@{$rules->{isbn}}) {
639             if (defined $record_document->{$field}) {
640                 my @isbns = ();
641                 foreach my $input_isbn (@{$record_document->{$field}}) {
642                     my $isbn = Business::ISBN->new($input_isbn);
643                     if (defined $isbn && $isbn->is_valid) {
644                         my $isbn13 = $isbn->as_isbn13->as_string;
645                         push @isbns, $isbn13;
646                         $isbn13 =~ s/\-//g;
647                         push @isbns, $isbn13;
648
649                         my $isbn10 = $isbn->as_isbn10;
650                         if ($isbn10) {
651                             $isbn10 = $isbn10->as_string;
652                             push @isbns, $isbn10;
653                             $isbn10 =~ s/\-//g;
654                             push @isbns, $isbn10;
655                         }
656                     } else {
657                         push @isbns, $input_isbn;
658                     }
659                 }
660                 $record_document->{$field} = \@isbns;
661             }
662         }
663
664         # Remove duplicate values and collapse sort fields
665         foreach my $field (keys %{$record_document}) {
666             if (ref($record_document->{$field}) eq 'ARRAY') {
667                 @{$record_document->{$field}} = do {
668                     my %seen;
669                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
670                 };
671                 if ($field =~ /__sort$/) {
672                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
673                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
674                 }
675             }
676         }
677
678         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
679         $record->encoding('UTF-8');
680         if ($use_array) {
681             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
682             $record_document->{'marc_format'} = 'ARRAY';
683         } else {
684             my @warnings;
685             {
686                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
687                 local $SIG{__WARN__} = sub {
688                     push @warnings, $_[0];
689                 };
690                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
691             }
692             if (@warnings) {
693                 # Suppress warnings if record length exceeded
694                 unless (substr($record->leader(), 0, 5) eq '99999') {
695                     foreach my $warning (@warnings) {
696                         carp $warning;
697                     }
698                 }
699                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
700                 $record_document->{'marc_format'} = 'MARCXML';
701             }
702             else {
703                 $record_document->{'marc_format'} = 'base64ISO2709';
704             }
705         }
706         push @record_documents, $record_document;
707     }
708     return \@record_documents;
709 }
710
711 =head2 _marc_to_array($record)
712
713     my @fields = _marc_to_array($record)
714
715 Convert a MARC::Record to an array modeled after MARC-in-JSON
716 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
717
718 =over 4
719
720 =item C<$record>
721
722 A MARC::Record object
723
724 =back
725
726 =cut
727
728 sub _marc_to_array {
729     my ($self, $record) = @_;
730
731     my $data = {
732         leader => $record->leader(),
733         fields => []
734     };
735     for my $field ($record->fields()) {
736         my $tag = $field->tag();
737         if ($field->is_control_field()) {
738             push @{$data->{fields}}, {$tag => $field->data()};
739         } else {
740             my $subfields = ();
741             foreach my $subfield ($field->subfields()) {
742                 my ($code, $contents) = @{$subfield};
743                 push @{$subfields}, {$code => $contents};
744             }
745             push @{$data->{fields}}, {
746                 $tag => {
747                     ind1 => $field->indicator(1),
748                     ind2 => $field->indicator(2),
749                     subfields => $subfields
750                 }
751             };
752         }
753     }
754     return $data;
755 }
756
757 =head2 _array_to_marc($data)
758
759     my $record = _array_to_marc($data)
760
761 Convert an array modeled after MARC-in-JSON to a MARC::Record
762
763 =over 4
764
765 =item C<$data>
766
767 An array modeled after MARC-in-JSON
768 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
769
770 =back
771
772 =cut
773
774 sub _array_to_marc {
775     my ($self, $data) = @_;
776
777     my $record = MARC::Record->new();
778
779     $record->leader($data->{leader});
780     for my $field (@{$data->{fields}}) {
781         my $tag = (keys %{$field})[0];
782         $field = $field->{$tag};
783         my $marc_field;
784         if (ref($field) eq 'HASH') {
785             my @subfields;
786             foreach my $subfield (@{$field->{subfields}}) {
787                 my $code = (keys %{$subfield})[0];
788                 push @subfields, $code;
789                 push @subfields, $subfield->{$code};
790             }
791             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
792         } else {
793             $marc_field = MARC::Field->new($tag, $field)
794         }
795         $record->append_fields($marc_field);
796     }
797 ;
798     return $record;
799 }
800
801 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
802
803     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
804
805 Get mappings, an internal data structure later used by
806 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
807 data for a MARC mapping.
808
809 The returned C<$mappings> is not to to be confused with mappings provided by
810 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
811 provided by C<_foreach_mapping> and expands it to this internal data structure.
812 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
813 is then applied to each MARC target (leader, control field data, subfield or
814 joined subfields) and integrated into the mapping rules data structure used in
815 C<marc_records_to_documents> to transform MARC records into Elasticsearch
816 documents.
817
818 =over 4
819
820 =item C<$facet>
821
822 Boolean indicating whether to create a facet field for this mapping.
823
824 =item C<$suggestible>
825
826 Boolean indicating whether to create a suggestion field for this mapping.
827
828 =item C<$sort>
829
830 Boolean indicating whether to create a sort field for this mapping.
831
832 =item C<$search>
833
834 Boolean indicating whether to create a search field for this mapping.
835
836 =item C<$target_name>
837
838 Elasticsearch document target field name.
839
840 =item C<$target_type>
841
842 Elasticsearch document target field type.
843
844 =item C<$range>
845
846 An optional range as a string in the format "<START>-<END>" or "<START>",
847 where "<START>" and "<END>" are integers specifying a range that will be used
848 for extracting a substring from MARC data as Elasticsearch field target value.
849
850 The first character position is "0", and the range is inclusive,
851 so "0-2" means the first three characters of MARC data.
852
853 If only "<START>" is provided only one character at position "<START>" will
854 be extracted.
855
856 =back
857
858 =cut
859
860 sub _field_mappings {
861     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
862     my %mapping_defaults = ();
863     my @mappings;
864
865     my $substr_args = undef;
866     if (defined $range) {
867         # TODO: use value_callback instead?
868         my ($start, $end) = map(int, split /-/, $range, 2);
869         $substr_args = [$start];
870         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
871     }
872     my $default_options = {};
873     if ($substr_args) {
874         $default_options->{substr} = $substr_args;
875     }
876
877     # TODO: Should probably have per type value callback/hook
878     # but hard code for now
879     if ($target_type eq 'boolean') {
880         $default_options->{value_callbacks} //= [];
881         push @{$default_options->{value_callbacks}}, sub {
882             my ($value) = @_;
883             # Trim whitespace at both ends
884             $value =~ s/^\s+|\s+$//g;
885             return $value ? 'true' : 'false';
886         };
887     }
888
889     if ($search) {
890         my $mapping = [$target_name, $default_options];
891         push @mappings, $mapping;
892     }
893
894     my @suffixes = ();
895     push @suffixes, 'facet' if $facet;
896     push @suffixes, 'suggestion' if $suggestible;
897     push @suffixes, 'sort' if !defined $sort || $sort;
898
899     foreach my $suffix (@suffixes) {
900         my $mapping = ["${target_name}__$suffix"];
901         # TODO: Hack, fix later in less hideous manner
902         if ($suffix eq 'suggestion') {
903             push @{$mapping}, {%{$default_options}, property => 'input'};
904         }
905         else {
906             # Important! Make shallow clone, or we end up with the same hashref
907             # shared by all mappings
908             push @{$mapping}, {%{$default_options}};
909         }
910         push @mappings, $mapping;
911     }
912     return @mappings;
913 };
914
915 =head2 _get_marc_mapping_rules
916
917     my $mapping_rules = $self->_get_marc_mapping_rules()
918
919 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
920
921 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
922 each call to C<MARC::Record>->field) we create an optimized structure of mapping
923 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
924
925 We can then iterate through all MARC fields for each record and apply all relevant
926 rules once per fields instead of retreiving fields multiple times for each mapping rule
927 which is terribly slow.
928
929 =cut
930
931 # TODO: This structure can be used for processing multiple MARC::Records so is currently
932 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
933 # memory cache which it is currently not. The performance gain of caching
934 # would probably be marginal, but to do this could be a further improvement.
935
936 sub _get_marc_mapping_rules {
937     my ($self) = @_;
938     my $marcflavour = lc C4::Context->preference('marcflavour');
939     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
940     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
941     my $rules = {
942         'leader' => [],
943         'control_fields' => {},
944         'data_fields' => {},
945         'sum' => [],
946         'isbn' => [],
947         'defaults' => {}
948     };
949
950     $self->_foreach_mapping(sub {
951         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
952         return if $marc_type ne $marcflavour;
953
954         if ($type eq 'sum') {
955             push @{$rules->{sum}}, $name;
956             push @{$rules->{sum}}, $name."__sort" if $sort;
957         }
958         elsif ($type eq 'isbn') {
959             push @{$rules->{isbn}}, $name;
960         }
961         elsif ($type eq 'boolean') {
962             # boolean gets special handling, if value doesn't exist for a field,
963             # it is set to false
964             $rules->{defaults}->{$name} = 'false';
965         }
966
967         if ($marc_field =~ $field_spec_regexp) {
968             my $field_tag = $1;
969
970             my @subfields;
971             my @subfield_groups;
972             # Parse and separate subfields form subfield groups
973             if (defined $2) {
974                 my $subfield_group = '';
975                 my $open_group = 0;
976
977                 foreach my $token (split //, $2) {
978                     if ($token eq "(") {
979                         if ($open_group) {
980                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
981                                 "Unmatched opening parenthesis for $marc_field"
982                             );
983                         }
984                         else {
985                             $open_group = 1;
986                         }
987                     }
988                     elsif ($token eq ")") {
989                         if ($open_group) {
990                             if ($subfield_group) {
991                                 push @subfield_groups, $subfield_group;
992                                 $subfield_group = '';
993                             }
994                             $open_group = 0;
995                         }
996                         else {
997                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
998                                 "Unmatched closing parenthesis for $marc_field"
999                             );
1000                         }
1001                     }
1002                     elsif ($open_group) {
1003                         $subfield_group .= $token;
1004                     }
1005                     else {
1006                         push @subfields, $token;
1007                     }
1008                 }
1009             }
1010             else {
1011                 push @subfields, '*';
1012             }
1013
1014             my $range = defined $3 ? $3 : undef;
1015             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1016             if ($field_tag < 10) {
1017                 $rules->{control_fields}->{$field_tag} //= [];
1018                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1019             }
1020             else {
1021                 $rules->{data_fields}->{$field_tag} //= {};
1022                 foreach my $subfield (@subfields) {
1023                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1024                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1025                 }
1026                 foreach my $subfield_group (@subfield_groups) {
1027                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1028                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1029                 }
1030             }
1031         }
1032         elsif ($marc_field =~ $leader_regexp) {
1033             my $range = defined $1 ? $1 : undef;
1034             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1035             push @{$rules->{leader}}, @mappings;
1036         }
1037         else {
1038             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1039                 "Invalid MARC field expression: $marc_field"
1040             );
1041         }
1042     });
1043
1044     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1045     if ($marcflavour eq 'marc21') {
1046         # Nonfiling characters processing for sort fields
1047         my %title_fields;
1048         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1049             # Format is: nonfiling characters indicator => field names list
1050             %title_fields = (
1051                 1 => [130, 630, 730, 740],
1052                 2 => [222, 240, 242, 243, 245, 440, 830]
1053             );
1054         }
1055         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1056             %title_fields = (
1057                 1 => [730],
1058                 2 => [130, 430, 530]
1059             );
1060         }
1061         foreach my $indicator (keys %title_fields) {
1062             foreach my $field_tag (@{$title_fields{$indicator}}) {
1063                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1064                 foreach my $mapping (@{$mappings}) {
1065                     if ($mapping->[0] =~ /__sort$/) {
1066                         # Mark this as to be processed for nonfiling characters indicator
1067                         # later on in _process_mappings
1068                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1069                     }
1070                 }
1071             }
1072         }
1073     }
1074
1075     return $rules;
1076 }
1077
1078 =head2 _foreach_mapping
1079
1080     $self->_foreach_mapping(
1081         sub {
1082             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1083                 $marc_field )
1084               = @_;
1085             return unless $marc_type eq 'marc21';
1086             print "Data comes from: " . $marc_field . "\n";
1087         }
1088     );
1089
1090 This allows you to apply a function to each entry in the elasticsearch mappings
1091 table, in order to build the mappings for whatever is needed.
1092
1093 In the provided function, the files are:
1094
1095 =over 4
1096
1097 =item C<$name>
1098
1099 The field name for elasticsearch (corresponds to the 'mapping' column in the
1100 database.
1101
1102 =item C<$type>
1103
1104 The type for this value, e.g. 'string'.
1105
1106 =item C<$facet>
1107
1108 True if this value should be facetised. This only really makes sense if the
1109 field is understood by the facet processing code anyway.
1110
1111 =item C<$sort>
1112
1113 True if this is a field that a) needs special sort handling, and b) if it
1114 should be sorted on. False if a) but not b). Undef if not a). This allows,
1115 for example, author to be sorted on but not everything marked with "author"
1116 to be included in that sort.
1117
1118 =item C<$marc_type>
1119
1120 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1121 'unimarc', 'normarc'.
1122
1123 =item C<$marc_field>
1124
1125 A string that describes the MARC field that contains the data to extract.
1126
1127 =back
1128
1129 =cut
1130
1131 sub _foreach_mapping {
1132     my ( $self, $sub ) = @_;
1133
1134     # TODO use a caching framework here
1135     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1136         {
1137             'search_marc_map.index_name' => $self->index,
1138         },
1139         {   join => { search_marc_to_fields => 'search_marc_map' },
1140             '+select' => [
1141                 'search_marc_to_fields.facet',
1142                 'search_marc_to_fields.suggestible',
1143                 'search_marc_to_fields.sort',
1144                 'search_marc_to_fields.search',
1145                 'search_marc_map.marc_type',
1146                 'search_marc_map.marc_field',
1147             ],
1148             '+as'     => [
1149                 'facet',
1150                 'suggestible',
1151                 'sort',
1152                 'search',
1153                 'marc_type',
1154                 'marc_field',
1155             ],
1156         }
1157     );
1158
1159     while ( my $search_field = $search_fields->next ) {
1160         $sub->(
1161             # Force lower case on indexed field names for case insensitive
1162             # field name searches
1163             lc($search_field->name),
1164             $search_field->type,
1165             $search_field->get_column('facet'),
1166             $search_field->get_column('suggestible'),
1167             $search_field->get_column('sort'),
1168             $search_field->get_column('search'),
1169             $search_field->get_column('marc_type'),
1170             $search_field->get_column('marc_field'),
1171         );
1172     }
1173 }
1174
1175 =head2 process_error
1176
1177     die process_error($@);
1178
1179 This parses an Elasticsearch error message and produces a human-readable
1180 result from it. This result is probably missing all the useful information
1181 that you might want in diagnosing an issue, so the warning is also logged.
1182
1183 Note that currently the resulting message is not internationalised. This
1184 will happen eventually by some method or other.
1185
1186 =cut
1187
1188 sub process_error {
1189     my ($self, $msg) = @_;
1190
1191     warn $msg; # simple logging
1192
1193     # This is super-primitive
1194     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1195
1196     return "Unable to perform your search. Please try again.\n";
1197 }
1198
1199 =head2 _read_configuration
1200
1201     my $conf = _read_configuration();
1202
1203 Reads the I<configuration file> and returns a hash structure with the
1204 configuration information. It raises an exception if mandatory entries
1205 are missing.
1206
1207 The hashref structure has the following form:
1208
1209     {
1210         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1211         'index_name' => 'koha_instance',
1212     }
1213
1214 This is configured by the following in the C<config> block in koha-conf.xml:
1215
1216     <elasticsearch>
1217         <server>127.0.0.1:9200</server>
1218         <server>anotherserver:9200</server>
1219         <index_name>koha_instance</index_name>
1220     </elasticsearch>
1221
1222 =cut
1223
1224 sub _read_configuration {
1225
1226     my $configuration;
1227
1228     my $conf = C4::Context->config('elasticsearch');
1229     unless ( defined $conf ) {
1230         Koha::Exceptions::Config::MissingEntry->throw(
1231             "Missing <elasticsearch> entry in koha-conf.xml"
1232         );
1233     }
1234
1235     if ( $conf && $conf->{server} ) {
1236         my $nodes = $conf->{server};
1237         if ( ref($nodes) eq 'ARRAY' ) {
1238             $configuration->{nodes} = $nodes;
1239         }
1240         else {
1241             $configuration->{nodes} = [$nodes];
1242         }
1243     }
1244     else {
1245         Koha::Exceptions::Config::MissingEntry->throw(
1246             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1247         );
1248     }
1249
1250     if ( defined $conf->{index_name} ) {
1251         $configuration->{index_name} = $conf->{index_name};
1252     }
1253     else {
1254         Koha::Exceptions::Config::MissingEntry->throw(
1255             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1256         );
1257     }
1258
1259     return $configuration;
1260 }
1261
1262 =head2 get_facetable_fields
1263
1264 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1265
1266 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1267
1268 =cut
1269
1270 sub get_facetable_fields {
1271     my ($self) = @_;
1272
1273     # These should correspond to the ES field names, as opposed to the CCL
1274     # things that zebra uses.
1275     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1276     my @faceted_fields = Koha::SearchFields->search(
1277         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1278     );
1279     my @not_faceted_fields = Koha::SearchFields->search(
1280         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1281     );
1282     # This could certainly be improved
1283     return ( @faceted_fields, @not_faceted_fields );
1284 }
1285
1286 =head2 clear_search_fields_cache
1287
1288 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1289
1290 Clear cached values for ES search fields
1291
1292 =cut
1293
1294 sub clear_search_fields_cache {
1295
1296     my $cache = Koha::Caches->get_instance();
1297     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1298     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1299     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1300     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1301
1302 }
1303
1304 1;
1305
1306 __END__
1307
1308 =head1 AUTHOR
1309
1310 =over 4
1311
1312 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1313
1314 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1315
1316 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1317
1318 =back
1319
1320 =cut