Bug 24823: (QA follow-up) Fix deletion and POD
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31
32 use Carp;
33 use Clone qw(clone);
34 use JSON;
35 use Modern::Perl;
36 use Readonly;
37 use Search::Elasticsearch;
38 use Try::Tiny;
39 use YAML::Syck;
40
41 use List::Util qw( sum0 reduce );
42 use MARC::File::XML;
43 use MIME::Base64;
44 use Encode qw(encode);
45 use Business::ISBN;
46 use Scalar::Util qw(looks_like_number);
47
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =item index_name
68
69 The Elasticsearch index name with Koha instance prefix.
70
71 =back
72
73
74 =head1 FUNCTIONS
75
76 =cut
77
78 sub new {
79     my $class = shift @_;
80     my ($params) = @_;
81
82     # Check for a valid index
83     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
84     my $config = _read_configuration();
85     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
86
87     my $self = $class->SUPER::new(@_);
88     return $self;
89 }
90
91 =head2 get_elasticsearch
92
93     my $elasticsearch_client = $self->get_elasticsearch();
94
95 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
96 instance level and will be reused if method is called multiple times.
97
98 =cut
99
100 sub get_elasticsearch {
101     my $self = shift @_;
102     unless (defined $self->{elasticsearch}) {
103         $self->{elasticsearch} = Search::Elasticsearch->new(
104             $self->get_elasticsearch_params()
105         );
106     }
107     return $self->{elasticsearch};
108 }
109
110 =head2 get_elasticsearch_params
111
112     my $params = $self->get_elasticsearch_params();
113
114 This provides a hashref that contains the parameters for connecting to the
115 ElasicSearch servers, in the form:
116
117     {
118         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119         'index_name' => 'koha_instance_index',
120     }
121
122 This is configured by the following in the C<config> block in koha-conf.xml:
123
124     <elasticsearch>
125         <server>127.0.0.1:9200</server>
126         <server>anotherserver:9200</server>
127         <index_name>koha_instance</index_name>
128     </elasticsearch>
129
130 =cut
131
132 sub get_elasticsearch_params {
133     my ($self) = @_;
134
135     my $conf;
136     try {
137         $conf = _read_configuration();
138     } catch {
139         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
140             croak($_->message);
141         }
142     };
143     # Extract relevant parts of configuration
144     my $params = {
145         nodes => $conf->{nodes}
146     };
147     $params->{cxn_pool} //= 'Static';
148
149     return $params;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         # Clone the general mapping to break ties with the original hash
196         my $mappings = {
197             data => clone(_get_elasticsearch_field_config('general', ''))
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 if ($search) {
219                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
220                 }
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 raw_elasticsearch_mappings
245
246 Return elasticsearch mapping as it is in database.
247 marc_type: marc21|unimarc|normarc
248
249 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
250
251 =cut
252
253 sub raw_elasticsearch_mappings {
254     my ( $marc_type ) = @_;
255
256     my $schema = Koha::Database->new()->schema();
257
258     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
259
260     my $mappings = {};
261     while ( my $search_field = $search_fields->next ) {
262
263         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
264             { search_field_id => $search_field->id },
265             {
266                 join     => 'search_marc_map',
267                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
268             }
269         );
270
271         while ( my $marc_to_field = $marc_to_fields->next ) {
272
273             my $marc_map = $marc_to_field->search_marc_map;
274
275             next if $marc_type && $marc_map->marc_type ne $marc_type;
276
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
283
284             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
285                 {
286                     facet   => $marc_to_field->facet || '',
287                     marc_type => $marc_map->marc_type,
288                     marc_field => $marc_map->marc_field,
289                     sort        => $marc_to_field->sort,
290                     suggestible => $marc_to_field->suggestible || ''
291                 });
292
293         }
294     }
295
296     return $mappings;
297 }
298
299 =head2 _get_elasticsearch_field_config
300
301 Get the Elasticsearch field config for the given purpose and data type.
302
303 $mapping = _get_elasticsearch_field_config('search', 'text');
304
305 =cut
306
307 sub _get_elasticsearch_field_config {
308
309     my ( $purpose, $type ) = @_;
310
311     # Use state to speed up repeated calls
312     state $settings = undef;
313     if (!defined $settings) {
314         my $config_file = C4::Context->config('elasticsearch_field_config');
315         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
316         $settings = LoadFile( $config_file );
317     }
318
319     if (!defined $settings->{$purpose}) {
320         die "Field purpose $purpose not defined in field config";
321     }
322     if ($type eq '') {
323         return $settings->{$purpose};
324     }
325     if (defined $settings->{$purpose}{$type}) {
326         return $settings->{$purpose}{$type};
327     }
328     if (defined $settings->{$purpose}{'default'}) {
329         return $settings->{$purpose}{'default'};
330     }
331     return;
332 }
333
334 =head2 _load_elasticsearch_mappings
335
336 Load Elasticsearch mappings in the format of mappings.yaml.
337
338 $indexes = _load_elasticsearch_mappings();
339
340 =cut
341
342 sub _load_elasticsearch_mappings {
343     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345     return LoadFile( $mappings_yaml );
346 }
347
348 sub reset_elasticsearch_mappings {
349     my ( $self ) = @_;
350     my $indexes = $self->_load_elasticsearch_mappings();
351
352     Koha::SearchMarcMaps->delete;
353     Koha::SearchFields->delete;
354
355     while ( my ( $index_name, $fields ) = each %$indexes ) {
356         while ( my ( $field_name, $data ) = each %$fields ) {
357
358             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
359
360             # Set default values
361             $sf_params{staff_client} //= 1;
362             $sf_params{opac} //= 1;
363
364             $sf_params{name} = $field_name;
365
366             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
367
368             my $mappings = $data->{mappings};
369             for my $mapping ( @$mappings ) {
370                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371                     index_name => $index_name,
372                     marc_type => $mapping->{marc_type},
373                     marc_field => $mapping->{marc_field}
374                 });
375                 $search_field->add_to_search_marc_maps($marc_field, {
376                     facet => $mapping->{facet} || 0,
377                     suggestible => $mapping->{suggestible} || 0,
378                     sort => $mapping->{sort},
379                     search => $mapping->{search} // 1
380                 });
381             }
382         }
383     }
384
385     my $cache = Koha::Caches->get_instance();
386     $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
387     $cache->clear_from_cache('elasticsearch_search_fields_opac');
388
389     # FIXME return the mappings?
390 }
391
392 # This overrides the accessor provided by Class::Accessor so that if
393 # sort_fields isn't set, then it'll generate it.
394 sub sort_fields {
395     my $self = shift;
396     if (@_) {
397         $self->_sort_fields_accessor(@_);
398         return;
399     }
400     my $val = $self->_sort_fields_accessor();
401     return $val if $val;
402
403     # This will populate the accessor as a side effect
404     $self->get_elasticsearch_mappings();
405     return $self->_sort_fields_accessor();
406 }
407
408 =head2 _process_mappings($mappings, $data, $record_document, $meta)
409
410     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
411
412 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
413 Since we group all mappings by MARC field targets C<$mappings> will contain
414 all targets for C<$data> and thus we need to fetch the MARC field only once.
415 C<$mappings> will be applied to C<$record_document> and new field values added.
416 The method has no return value.
417
418 =over 4
419
420 =item C<$mappings>
421
422 Arrayref of mappings containing arrayrefs in the format
423 [C<$target>, C<$options>] where C<$target> is the name of the target field and
424 C<$options> is a hashref containing processing directives for this particular
425 mapping.
426
427 =item C<$data>
428
429 The source data from a MARC record field.
430
431 =item C<$record_document>
432
433 Hashref representing the Elasticsearch document on which mappings should be
434 applied.
435
436 =item C<$meta>
437
438 A hashref containing metadata useful for enforcing per mapping rules. For
439 example for providing extra context for mapping options, or treating mapping
440 targets differently depending on type (sort, search, facet etc). Combining
441 this metadata with the mapping options and metadata allows us to mutate the
442 data per mapping, or even replace it with other data retrieved from the
443 metadata context.
444
445 Current properties are:
446
447 C<altscript>: A boolean value indicating whether an alternate script presentation is being
448 processed.
449
450 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
451 'subfield' or 'subfields_group'.
452
453 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
454
455 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
456 is 'subfields_group'.
457
458 C<field>: The original C<MARC::Record> object.
459
460 =back
461
462 =cut
463
464 sub _process_mappings {
465     my ($_self, $mappings, $data, $record_document, $meta) = @_;
466     foreach my $mapping (@{$mappings}) {
467         my ($target, $options) = @{$mapping};
468
469         # Don't process sort fields for alternate scripts
470         my $sort = $target =~ /__sort$/;
471         if ($sort && $meta->{altscript}) {
472             next;
473         }
474
475         # Copy (scalar) data since can have multiple targets
476         # with differing options for (possibly) mutating data
477         # so need a different copy for each
478         my $_data = $data;
479         $record_document->{$target} //= [];
480         if (defined $options->{substr}) {
481             my ($start, $length) = @{$options->{substr}};
482             $_data = length($data) > $start ? substr $data, $start, $length : '';
483         }
484         if (defined $options->{value_callbacks}) {
485             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
486         }
487         if (defined $options->{property}) {
488             $_data = {
489                 $options->{property} => $_data
490             }
491         }
492         if (defined $options->{nonfiling_characters_indicator}) {
493             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
494             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
495             if ($nonfiling_chars) {
496                 $_data = substr $_data, $nonfiling_chars;
497             }
498         }
499         push @{$record_document->{$target}}, $_data;
500     }
501 }
502
503 =head2 marc_records_to_documents($marc_records)
504
505     my $record_documents = $self->marc_records_to_documents($marc_records);
506
507 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
508
509 Returns array of hash references, representing Elasticsearch documents,
510 acceptable as body payload in C<Search::Elasticsearch> requests.
511
512 =over 4
513
514 =item C<$marc_documents>
515
516 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
517
518 =back
519
520 =cut
521
522 sub marc_records_to_documents {
523     my ($self, $records) = @_;
524     my $rules = $self->_get_marc_mapping_rules();
525     my $control_fields_rules = $rules->{control_fields};
526     my $data_fields_rules = $rules->{data_fields};
527     my $marcflavour = lc C4::Context->preference('marcflavour');
528     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
529
530     my @record_documents;
531
532     foreach my $record (@{$records}) {
533         my $record_document = {};
534         my $mappings = $rules->{leader};
535         if ($mappings) {
536             $self->_process_mappings($mappings, $record->leader(), $record_document, {
537                     altscript => 0,
538                     data_source => 'leader'
539                 }
540             );
541         }
542         foreach my $field ($record->fields()) {
543             if ($field->is_control_field()) {
544                 my $mappings = $control_fields_rules->{$field->tag()};
545                 if ($mappings) {
546                     $self->_process_mappings($mappings, $field->data(), $record_document, {
547                             altscript => 0,
548                             data_source => 'control_field',
549                             field => $field
550                         }
551                     );
552                 }
553             }
554             else {
555                 my $tag = $field->tag();
556                 # Handle alternate scripts in MARC 21
557                 my $altscript = 0;
558                 if ($marcflavour eq 'marc21' && $tag eq '880') {
559                     my $sub6 = $field->subfield('6');
560                     if ($sub6 =~ /^(...)-\d+/) {
561                         $tag = $1;
562                         $altscript = 1;
563                     }
564                 }
565
566                 my $data_field_rules = $data_fields_rules->{$tag};
567                 if ($data_field_rules) {
568                     my $subfields_mappings = $data_field_rules->{subfields};
569                     my $wildcard_mappings = $subfields_mappings->{'*'};
570                     foreach my $subfield ($field->subfields()) {
571                         my ($code, $data) = @{$subfield};
572                         my $mappings = $subfields_mappings->{$code} // [];
573                         if ($wildcard_mappings) {
574                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
575                         }
576                         if (@{$mappings}) {
577                             $self->_process_mappings($mappings, $data, $record_document, {
578                                     altscript => $altscript,
579                                     data_source => 'subfield',
580                                     code => $code,
581                                     field => $field
582                                 }
583                             );
584                         }
585                         if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
586                             # Used by the authority linker the match-heading field requires a specific syntax
587                             # that is specified in C4/Heading
588                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
589                             next unless $heading;
590                             push @{$record_document->{'match-heading'}}, $heading->search_form;
591                         }
592                     }
593
594                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
595                     if ($subfields_join_mappings) {
596                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
597                             # Map each subfield to values, remove empty values, join with space
598                             my $data = join(
599                                 ' ',
600                                 grep(
601                                     $_,
602                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
603                                 )
604                             );
605                             if ($data) {
606                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
607                                         altscript => $altscript,
608                                         data_source => 'subfields_group',
609                                         codes => $subfields_group,
610                                         field => $field
611                                     }
612                                 );
613                             }
614                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
615                                 # Used by the authority linker the match-heading field requires a specific syntax
616                                 # that is specified in C4/Heading
617                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
618                                 next unless $heading;
619                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
620                             }
621                         }
622                     }
623                 }
624             }
625         }
626         foreach my $field (keys %{$rules->{defaults}}) {
627             unless (defined $record_document->{$field}) {
628                 $record_document->{$field} = $rules->{defaults}->{$field};
629             }
630         }
631         foreach my $field (@{$rules->{sum}}) {
632             if (defined $record_document->{$field}) {
633                 # TODO: validate numeric? filter?
634                 # TODO: Or should only accept fields without nested values?
635                 # TODO: Quick and dirty, improve if needed
636                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
637             }
638         }
639         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
640         foreach my $field (@{$rules->{isbn}}) {
641             if (defined $record_document->{$field}) {
642                 my @isbns = ();
643                 foreach my $input_isbn (@{$record_document->{$field}}) {
644                     my $isbn = Business::ISBN->new($input_isbn);
645                     if (defined $isbn && $isbn->is_valid) {
646                         my $isbn13 = $isbn->as_isbn13->as_string;
647                         push @isbns, $isbn13;
648                         $isbn13 =~ s/\-//g;
649                         push @isbns, $isbn13;
650
651                         my $isbn10 = $isbn->as_isbn10;
652                         if ($isbn10) {
653                             $isbn10 = $isbn10->as_string;
654                             push @isbns, $isbn10;
655                             $isbn10 =~ s/\-//g;
656                             push @isbns, $isbn10;
657                         }
658                     } else {
659                         push @isbns, $input_isbn;
660                     }
661                 }
662                 $record_document->{$field} = \@isbns;
663             }
664         }
665
666         # Remove duplicate values and collapse sort fields
667         foreach my $field (keys %{$record_document}) {
668             if (ref($record_document->{$field}) eq 'ARRAY') {
669                 @{$record_document->{$field}} = do {
670                     my %seen;
671                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
672                 };
673                 if ($field =~ /__sort$/) {
674                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
675                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
676                 }
677             }
678         }
679
680         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
681         $record->encoding('UTF-8');
682         if ($use_array) {
683             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
684             $record_document->{'marc_format'} = 'ARRAY';
685         } else {
686             my @warnings;
687             {
688                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
689                 local $SIG{__WARN__} = sub {
690                     push @warnings, $_[0];
691                 };
692                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
693             }
694             if (@warnings) {
695                 # Suppress warnings if record length exceeded
696                 unless (substr($record->leader(), 0, 5) eq '99999') {
697                     foreach my $warning (@warnings) {
698                         carp $warning;
699                     }
700                 }
701                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
702                 $record_document->{'marc_format'} = 'MARCXML';
703             }
704             else {
705                 $record_document->{'marc_format'} = 'base64ISO2709';
706             }
707         }
708         push @record_documents, $record_document;
709     }
710     return \@record_documents;
711 }
712
713 =head2 _marc_to_array($record)
714
715     my @fields = _marc_to_array($record)
716
717 Convert a MARC::Record to an array modeled after MARC-in-JSON
718 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
719
720 =over 4
721
722 =item C<$record>
723
724 A MARC::Record object
725
726 =back
727
728 =cut
729
730 sub _marc_to_array {
731     my ($self, $record) = @_;
732
733     my $data = {
734         leader => $record->leader(),
735         fields => []
736     };
737     for my $field ($record->fields()) {
738         my $tag = $field->tag();
739         if ($field->is_control_field()) {
740             push @{$data->{fields}}, {$tag => $field->data()};
741         } else {
742             my $subfields = ();
743             foreach my $subfield ($field->subfields()) {
744                 my ($code, $contents) = @{$subfield};
745                 push @{$subfields}, {$code => $contents};
746             }
747             push @{$data->{fields}}, {
748                 $tag => {
749                     ind1 => $field->indicator(1),
750                     ind2 => $field->indicator(2),
751                     subfields => $subfields
752                 }
753             };
754         }
755     }
756     return $data;
757 }
758
759 =head2 _array_to_marc($data)
760
761     my $record = _array_to_marc($data)
762
763 Convert an array modeled after MARC-in-JSON to a MARC::Record
764
765 =over 4
766
767 =item C<$data>
768
769 An array modeled after MARC-in-JSON
770 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
771
772 =back
773
774 =cut
775
776 sub _array_to_marc {
777     my ($self, $data) = @_;
778
779     my $record = MARC::Record->new();
780
781     $record->leader($data->{leader});
782     for my $field (@{$data->{fields}}) {
783         my $tag = (keys %{$field})[0];
784         $field = $field->{$tag};
785         my $marc_field;
786         if (ref($field) eq 'HASH') {
787             my @subfields;
788             foreach my $subfield (@{$field->{subfields}}) {
789                 my $code = (keys %{$subfield})[0];
790                 push @subfields, $code;
791                 push @subfields, $subfield->{$code};
792             }
793             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
794         } else {
795             $marc_field = MARC::Field->new($tag, $field)
796         }
797         $record->append_fields($marc_field);
798     }
799 ;
800     return $record;
801 }
802
803 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
804
805     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
806
807 Get mappings, an internal data structure later used by
808 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
809 data for a MARC mapping.
810
811 The returned C<$mappings> is not to to be confused with mappings provided by
812 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
813 provided by C<_foreach_mapping> and expands it to this internal data structure.
814 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
815 is then applied to each MARC target (leader, control field data, subfield or
816 joined subfields) and integrated into the mapping rules data structure used in
817 C<marc_records_to_documents> to transform MARC records into Elasticsearch
818 documents.
819
820 =over 4
821
822 =item C<$facet>
823
824 Boolean indicating whether to create a facet field for this mapping.
825
826 =item C<$suggestible>
827
828 Boolean indicating whether to create a suggestion field for this mapping.
829
830 =item C<$sort>
831
832 Boolean indicating whether to create a sort field for this mapping.
833
834 =item C<$search>
835
836 Boolean indicating whether to create a search field for this mapping.
837
838 =item C<$target_name>
839
840 Elasticsearch document target field name.
841
842 =item C<$target_type>
843
844 Elasticsearch document target field type.
845
846 =item C<$range>
847
848 An optional range as a string in the format "<START>-<END>" or "<START>",
849 where "<START>" and "<END>" are integers specifying a range that will be used
850 for extracting a substring from MARC data as Elasticsearch field target value.
851
852 The first character position is "0", and the range is inclusive,
853 so "0-2" means the first three characters of MARC data.
854
855 If only "<START>" is provided only one character at position "<START>" will
856 be extracted.
857
858 =back
859
860 =cut
861
862 sub _field_mappings {
863     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
864     my %mapping_defaults = ();
865     my @mappings;
866
867     my $substr_args = undef;
868     if (defined $range) {
869         # TODO: use value_callback instead?
870         my ($start, $end) = map(int, split /-/, $range, 2);
871         $substr_args = [$start];
872         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
873     }
874     my $default_options = {};
875     if ($substr_args) {
876         $default_options->{substr} = $substr_args;
877     }
878
879     # TODO: Should probably have per type value callback/hook
880     # but hard code for now
881     if ($target_type eq 'boolean') {
882         $default_options->{value_callbacks} //= [];
883         push @{$default_options->{value_callbacks}}, sub {
884             my ($value) = @_;
885             # Trim whitespace at both ends
886             $value =~ s/^\s+|\s+$//g;
887             return $value ? 'true' : 'false';
888         };
889     }
890
891     if ($search) {
892         my $mapping = [$target_name, $default_options];
893         push @mappings, $mapping;
894     }
895
896     my @suffixes = ();
897     push @suffixes, 'facet' if $facet;
898     push @suffixes, 'suggestion' if $suggestible;
899     push @suffixes, 'sort' if !defined $sort || $sort;
900
901     foreach my $suffix (@suffixes) {
902         my $mapping = ["${target_name}__$suffix"];
903         # TODO: Hack, fix later in less hideous manner
904         if ($suffix eq 'suggestion') {
905             push @{$mapping}, {%{$default_options}, property => 'input'};
906         }
907         else {
908             # Important! Make shallow clone, or we end up with the same hashref
909             # shared by all mappings
910             push @{$mapping}, {%{$default_options}};
911         }
912         push @mappings, $mapping;
913     }
914     return @mappings;
915 };
916
917 =head2 _get_marc_mapping_rules
918
919     my $mapping_rules = $self->_get_marc_mapping_rules()
920
921 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
922
923 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
924 each call to C<MARC::Record>->field) we create an optimized structure of mapping
925 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
926
927 We can then iterate through all MARC fields for each record and apply all relevant
928 rules once per fields instead of retreiving fields multiple times for each mapping rule
929 which is terribly slow.
930
931 =cut
932
933 # TODO: This structure can be used for processing multiple MARC::Records so is currently
934 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
935 # memory cache which it is currently not. The performance gain of caching
936 # would probably be marginal, but to do this could be a further improvement.
937
938 sub _get_marc_mapping_rules {
939     my ($self) = @_;
940     my $marcflavour = lc C4::Context->preference('marcflavour');
941     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
942     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
943     my $rules = {
944         'leader' => [],
945         'control_fields' => {},
946         'data_fields' => {},
947         'sum' => [],
948         'isbn' => [],
949         'defaults' => {}
950     };
951
952     $self->_foreach_mapping(sub {
953         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
954         return if $marc_type ne $marcflavour;
955
956         if ($type eq 'sum') {
957             push @{$rules->{sum}}, $name;
958             push @{$rules->{sum}}, $name."__sort" if $sort;
959         }
960         elsif ($type eq 'isbn') {
961             push @{$rules->{isbn}}, $name;
962         }
963         elsif ($type eq 'boolean') {
964             # boolean gets special handling, if value doesn't exist for a field,
965             # it is set to false
966             $rules->{defaults}->{$name} = 'false';
967         }
968
969         if ($marc_field =~ $field_spec_regexp) {
970             my $field_tag = $1;
971
972             my @subfields;
973             my @subfield_groups;
974             # Parse and separate subfields form subfield groups
975             if (defined $2) {
976                 my $subfield_group = '';
977                 my $open_group = 0;
978
979                 foreach my $token (split //, $2) {
980                     if ($token eq "(") {
981                         if ($open_group) {
982                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
983                                 "Unmatched opening parenthesis for $marc_field"
984                             );
985                         }
986                         else {
987                             $open_group = 1;
988                         }
989                     }
990                     elsif ($token eq ")") {
991                         if ($open_group) {
992                             if ($subfield_group) {
993                                 push @subfield_groups, $subfield_group;
994                                 $subfield_group = '';
995                             }
996                             $open_group = 0;
997                         }
998                         else {
999                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1000                                 "Unmatched closing parenthesis for $marc_field"
1001                             );
1002                         }
1003                     }
1004                     elsif ($open_group) {
1005                         $subfield_group .= $token;
1006                     }
1007                     else {
1008                         push @subfields, $token;
1009                     }
1010                 }
1011             }
1012             else {
1013                 push @subfields, '*';
1014             }
1015
1016             my $range = defined $3 ? $3 : undef;
1017             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1018             if ($field_tag < 10) {
1019                 $rules->{control_fields}->{$field_tag} //= [];
1020                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1021             }
1022             else {
1023                 $rules->{data_fields}->{$field_tag} //= {};
1024                 foreach my $subfield (@subfields) {
1025                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1026                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1027                 }
1028                 foreach my $subfield_group (@subfield_groups) {
1029                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1030                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1031                 }
1032             }
1033         }
1034         elsif ($marc_field =~ $leader_regexp) {
1035             my $range = defined $1 ? $1 : undef;
1036             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1037             push @{$rules->{leader}}, @mappings;
1038         }
1039         else {
1040             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1041                 "Invalid MARC field expression: $marc_field"
1042             );
1043         }
1044     });
1045
1046     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1047     if ($marcflavour eq 'marc21') {
1048         # Nonfiling characters processing for sort fields
1049         my %title_fields;
1050         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1051             # Format is: nonfiling characters indicator => field names list
1052             %title_fields = (
1053                 1 => [130, 630, 730, 740],
1054                 2 => [222, 240, 242, 243, 245, 440, 830]
1055             );
1056         }
1057         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1058             %title_fields = (
1059                 1 => [730],
1060                 2 => [130, 430, 530]
1061             );
1062         }
1063         foreach my $indicator (keys %title_fields) {
1064             foreach my $field_tag (@{$title_fields{$indicator}}) {
1065                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1066                 foreach my $mapping (@{$mappings}) {
1067                     if ($mapping->[0] =~ /__sort$/) {
1068                         # Mark this as to be processed for nonfiling characters indicator
1069                         # later on in _process_mappings
1070                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1071                     }
1072                 }
1073             }
1074         }
1075     }
1076
1077     return $rules;
1078 }
1079
1080 =head2 _foreach_mapping
1081
1082     $self->_foreach_mapping(
1083         sub {
1084             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1085                 $marc_field )
1086               = @_;
1087             return unless $marc_type eq 'marc21';
1088             print "Data comes from: " . $marc_field . "\n";
1089         }
1090     );
1091
1092 This allows you to apply a function to each entry in the elasticsearch mappings
1093 table, in order to build the mappings for whatever is needed.
1094
1095 In the provided function, the files are:
1096
1097 =over 4
1098
1099 =item C<$name>
1100
1101 The field name for elasticsearch (corresponds to the 'mapping' column in the
1102 database.
1103
1104 =item C<$type>
1105
1106 The type for this value, e.g. 'string'.
1107
1108 =item C<$facet>
1109
1110 True if this value should be facetised. This only really makes sense if the
1111 field is understood by the facet processing code anyway.
1112
1113 =item C<$sort>
1114
1115 True if this is a field that a) needs special sort handling, and b) if it
1116 should be sorted on. False if a) but not b). Undef if not a). This allows,
1117 for example, author to be sorted on but not everything marked with "author"
1118 to be included in that sort.
1119
1120 =item C<$marc_type>
1121
1122 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1123 'unimarc', 'normarc'.
1124
1125 =item C<$marc_field>
1126
1127 A string that describes the MARC field that contains the data to extract.
1128
1129 =back
1130
1131 =cut
1132
1133 sub _foreach_mapping {
1134     my ( $self, $sub ) = @_;
1135
1136     # TODO use a caching framework here
1137     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1138         {
1139             'search_marc_map.index_name' => $self->index,
1140         },
1141         {   join => { search_marc_to_fields => 'search_marc_map' },
1142             '+select' => [
1143                 'search_marc_to_fields.facet',
1144                 'search_marc_to_fields.suggestible',
1145                 'search_marc_to_fields.sort',
1146                 'search_marc_to_fields.search',
1147                 'search_marc_map.marc_type',
1148                 'search_marc_map.marc_field',
1149             ],
1150             '+as'     => [
1151                 'facet',
1152                 'suggestible',
1153                 'sort',
1154                 'search',
1155                 'marc_type',
1156                 'marc_field',
1157             ],
1158         }
1159     );
1160
1161     while ( my $search_field = $search_fields->next ) {
1162         $sub->(
1163             # Force lower case on indexed field names for case insensitive
1164             # field name searches
1165             lc($search_field->name),
1166             $search_field->type,
1167             $search_field->get_column('facet'),
1168             $search_field->get_column('suggestible'),
1169             $search_field->get_column('sort'),
1170             $search_field->get_column('search'),
1171             $search_field->get_column('marc_type'),
1172             $search_field->get_column('marc_field'),
1173         );
1174     }
1175 }
1176
1177 =head2 process_error
1178
1179     die process_error($@);
1180
1181 This parses an Elasticsearch error message and produces a human-readable
1182 result from it. This result is probably missing all the useful information
1183 that you might want in diagnosing an issue, so the warning is also logged.
1184
1185 Note that currently the resulting message is not internationalised. This
1186 will happen eventually by some method or other.
1187
1188 =cut
1189
1190 sub process_error {
1191     my ($self, $msg) = @_;
1192
1193     warn $msg; # simple logging
1194
1195     # This is super-primitive
1196     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1197
1198     return "Unable to perform your search. Please try again.\n";
1199 }
1200
1201 =head2 _read_configuration
1202
1203     my $conf = _read_configuration();
1204
1205 Reads the I<configuration file> and returns a hash structure with the
1206 configuration information. It raises an exception if mandatory entries
1207 are missing.
1208
1209 The hashref structure has the following form:
1210
1211     {
1212         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1213         'index_name' => 'koha_instance',
1214     }
1215
1216 This is configured by the following in the C<config> block in koha-conf.xml:
1217
1218     <elasticsearch>
1219         <server>127.0.0.1:9200</server>
1220         <server>anotherserver:9200</server>
1221         <index_name>koha_instance</index_name>
1222     </elasticsearch>
1223
1224 =cut
1225
1226 sub _read_configuration {
1227
1228     my $configuration;
1229
1230     my $conf = C4::Context->config('elasticsearch');
1231     unless ( defined $conf ) {
1232         Koha::Exceptions::Config::MissingEntry->throw(
1233             "Missing <elasticsearch> entry in koha-conf.xml"
1234         );
1235     }
1236
1237     if ( $conf && $conf->{server} ) {
1238         my $nodes = $conf->{server};
1239         if ( ref($nodes) eq 'ARRAY' ) {
1240             $configuration->{nodes} = $nodes;
1241         }
1242         else {
1243             $configuration->{nodes} = [$nodes];
1244         }
1245     }
1246     else {
1247         Koha::Exceptions::Config::MissingEntry->throw(
1248             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1249         );
1250     }
1251
1252     if ( defined $conf->{index_name} ) {
1253         $configuration->{index_name} = $conf->{index_name};
1254     }
1255     else {
1256         Koha::Exceptions::Config::MissingEntry->throw(
1257             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1258         );
1259     }
1260
1261     return $configuration;
1262 }
1263
1264 =head2 get_facetable_fields
1265
1266 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1267
1268 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1269
1270 =cut
1271
1272 sub get_facetable_fields {
1273     my ($self) = @_;
1274
1275     # These should correspond to the ES field names, as opposed to the CCL
1276     # things that zebra uses.
1277     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1278     my @faceted_fields = Koha::SearchFields->search(
1279         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1280     );
1281     my @not_faceted_fields = Koha::SearchFields->search(
1282         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1283     );
1284     # This could certainly be improved
1285     return ( @faceted_fields, @not_faceted_fields );
1286 }
1287
1288 1;
1289
1290 __END__
1291
1292 =head1 AUTHOR
1293
1294 =over 4
1295
1296 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1297
1298 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1299
1300 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1301
1302 =back
1303
1304 =cut