Bug 25342: Force ES id as string
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31
32 use Carp;
33 use Clone qw(clone);
34 use JSON;
35 use Modern::Perl;
36 use Readonly;
37 use Search::Elasticsearch;
38 use Try::Tiny;
39 use YAML::Syck;
40
41 use List::Util qw( sum0 reduce );
42 use MARC::File::XML;
43 use MIME::Base64;
44 use Encode qw(encode);
45 use Business::ISBN;
46 use Scalar::Util qw(looks_like_number);
47
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =back
68
69 =head1 FUNCTIONS
70
71 =cut
72
73 sub new {
74     my $class = shift @_;
75     my $self = $class->SUPER::new(@_);
76     # Check for a valid index
77     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
78     return $self;
79 }
80
81 =head2 get_elasticsearch
82
83     my $elasticsearch_client = $self->get_elasticsearch();
84
85 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
86 instance level and will be reused if method is called multiple times.
87
88 =cut
89
90 sub get_elasticsearch {
91     my $self = shift @_;
92     unless (defined $self->{elasticsearch}) {
93         my $conf = $self->get_elasticsearch_params();
94         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     $es->{cxn_pool} //= 'Static';
151     $es->{request_timeout} //= 60;
152
153     return $es;
154 }
155
156 =head2 get_elasticsearch_settings
157
158     my $settings = $self->get_elasticsearch_settings();
159
160 This provides the settings provided to Elasticsearch when an index is created.
161 These can do things like define tokenization methods.
162
163 A hashref containing the settings is returned.
164
165 =cut
166
167 sub get_elasticsearch_settings {
168     my ($self) = @_;
169
170     # Use state to speed up repeated calls
171     state $settings = undef;
172     if (!defined $settings) {
173         my $config_file = C4::Context->config('elasticsearch_index_config');
174         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
175         $settings = LoadFile( $config_file );
176     }
177
178     return $settings;
179 }
180
181 =head2 get_elasticsearch_mappings
182
183     my $mappings = $self->get_elasticsearch_mappings();
184
185 This provides the mappings that get passed to Elasticsearch when an index is
186 created.
187
188 =cut
189
190 sub get_elasticsearch_mappings {
191     my ($self) = @_;
192
193     # Use state to speed up repeated calls
194     state %all_mappings;
195     state %sort_fields;
196
197     if (!defined $all_mappings{$self->index}) {
198         $sort_fields{$self->index} = {};
199         # Clone the general mapping to break ties with the original hash
200         my $mappings = {
201             data => clone(_get_elasticsearch_field_config('general', ''))
202         };
203         my $marcflavour = lc C4::Context->preference('marcflavour');
204         $self->_foreach_mapping(
205             sub {
206                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
207                 return if $marc_type ne $marcflavour;
208                 # TODO if this gets any sort of complexity to it, it should
209                 # be broken out into its own function.
210
211                 # TODO be aware of date formats, but this requires pre-parsing
212                 # as ES will simply reject anything with an invalid date.
213                 my $es_type = 'text';
214                 if ($type eq 'boolean') {
215                     $es_type = 'boolean';
216                 } elsif ($type eq 'number' || $type eq 'sum') {
217                     $es_type = 'integer';
218                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
219                     $es_type = 'stdno';
220                 }
221
222                 if ($search) {
223                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
224                 }
225
226                 if ($facet) {
227                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
228                 }
229                 if ($suggestible) {
230                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
231                 }
232                 # Sort is a bit special as it can be true, false, undef.
233                 # We care about "true" or "undef",
234                 # "undef" means to do the default thing, which is make it sortable.
235                 if (!defined $sort || $sort) {
236                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
237                     $sort_fields{$self->index}{$name} = 1;
238                 }
239             }
240         );
241         $all_mappings{$self->index} = $mappings;
242     }
243     $self->sort_fields(\%{$sort_fields{$self->index}});
244
245     return $all_mappings{$self->index};
246 }
247
248 =head2 raw_elasticsearch_mappings
249
250 Return elasticsearch mapping as it is in database.
251 marc_type: marc21|unimarc|normarc
252
253 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
254
255 =cut
256
257 sub raw_elasticsearch_mappings {
258     my ( $marc_type ) = @_;
259
260     my $schema = Koha::Database->new()->schema();
261
262     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
263
264     my $mappings = {};
265     while ( my $search_field = $search_fields->next ) {
266
267         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
268             { search_field_id => $search_field->id },
269             {
270                 join     => 'search_marc_map',
271                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
272             }
273         );
274
275         while ( my $marc_to_field = $marc_to_fields->next ) {
276
277             my $marc_map = $marc_to_field->search_marc_map;
278
279             next if $marc_type && $marc_map->marc_type ne $marc_type;
280
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
287
288             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
289                 {
290                     facet   => $marc_to_field->facet || '',
291                     marc_type => $marc_map->marc_type,
292                     marc_field => $marc_map->marc_field,
293                     sort        => $marc_to_field->sort,
294                     suggestible => $marc_to_field->suggestible || ''
295                 });
296
297         }
298     }
299
300     return $mappings;
301 }
302
303 =head2 _get_elasticsearch_field_config
304
305 Get the Elasticsearch field config for the given purpose and data type.
306
307 $mapping = _get_elasticsearch_field_config('search', 'text');
308
309 =cut
310
311 sub _get_elasticsearch_field_config {
312
313     my ( $purpose, $type ) = @_;
314
315     # Use state to speed up repeated calls
316     state $settings = undef;
317     if (!defined $settings) {
318         my $config_file = C4::Context->config('elasticsearch_field_config');
319         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
320         $settings = LoadFile( $config_file );
321     }
322
323     if (!defined $settings->{$purpose}) {
324         die "Field purpose $purpose not defined in field config";
325     }
326     if ($type eq '') {
327         return $settings->{$purpose};
328     }
329     if (defined $settings->{$purpose}{$type}) {
330         return $settings->{$purpose}{$type};
331     }
332     if (defined $settings->{$purpose}{'default'}) {
333         return $settings->{$purpose}{'default'};
334     }
335     return;
336 }
337
338 =head2 _load_elasticsearch_mappings
339
340 Load Elasticsearch mappings in the format of mappings.yaml.
341
342 $indexes = _load_elasticsearch_mappings();
343
344 =cut
345
346 sub _load_elasticsearch_mappings {
347     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
348     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
349     return LoadFile( $mappings_yaml );
350 }
351
352 sub reset_elasticsearch_mappings {
353     my ( $self ) = @_;
354     my $indexes = $self->_load_elasticsearch_mappings();
355
356     Koha::SearchMarcMaps->delete;
357     Koha::SearchFields->delete;
358
359     while ( my ( $index_name, $fields ) = each %$indexes ) {
360         while ( my ( $field_name, $data ) = each %$fields ) {
361
362             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
363
364             # Set default values
365             $sf_params{staff_client} //= 1;
366             $sf_params{opac} //= 1;
367
368             $sf_params{name} = $field_name;
369
370             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
371
372             my $mappings = $data->{mappings};
373             for my $mapping ( @$mappings ) {
374                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
375                     index_name => $index_name,
376                     marc_type => $mapping->{marc_type},
377                     marc_field => $mapping->{marc_field}
378                 });
379                 $search_field->add_to_search_marc_maps($marc_field, {
380                     facet => $mapping->{facet} || 0,
381                     suggestible => $mapping->{suggestible} || 0,
382                     sort => $mapping->{sort},
383                     search => $mapping->{search} // 1
384                 });
385             }
386         }
387     }
388
389     my $cache = Koha::Caches->get_instance();
390     $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
391     $cache->clear_from_cache('elasticsearch_search_fields_opac');
392
393     # FIXME return the mappings?
394 }
395
396 # This overrides the accessor provided by Class::Accessor so that if
397 # sort_fields isn't set, then it'll generate it.
398 sub sort_fields {
399     my $self = shift;
400     if (@_) {
401         $self->_sort_fields_accessor(@_);
402         return;
403     }
404     my $val = $self->_sort_fields_accessor();
405     return $val if $val;
406
407     # This will populate the accessor as a side effect
408     $self->get_elasticsearch_mappings();
409     return $self->_sort_fields_accessor();
410 }
411
412 =head2 _process_mappings($mappings, $data, $record_document, $meta)
413
414     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
415
416 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
417 Since we group all mappings by MARC field targets C<$mappings> will contain
418 all targets for C<$data> and thus we need to fetch the MARC field only once.
419 C<$mappings> will be applied to C<$record_document> and new field values added.
420 The method has no return value.
421
422 =over 4
423
424 =item C<$mappings>
425
426 Arrayref of mappings containing arrayrefs in the format
427 [C<$target>, C<$options>] where C<$target> is the name of the target field and
428 C<$options> is a hashref containing processing directives for this particular
429 mapping.
430
431 =item C<$data>
432
433 The source data from a MARC record field.
434
435 =item C<$record_document>
436
437 Hashref representing the Elasticsearch document on which mappings should be
438 applied.
439
440 =item C<$meta>
441
442 A hashref containing metadata useful for enforcing per mapping rules. For
443 example for providing extra context for mapping options, or treating mapping
444 targets differently depending on type (sort, search, facet etc). Combining
445 this metadata with the mapping options and metadata allows us to mutate the
446 data per mapping, or even replace it with other data retrieved from the
447 metadata context.
448
449 Current properties are:
450
451 C<altscript>: A boolean value indicating whether an alternate script presentation is being
452 processed.
453
454 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
455 'subfield' or 'subfields_group'.
456
457 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
458
459 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
460 is 'subfields_group'.
461
462 C<field>: The original C<MARC::Record> object.
463
464 =back
465
466 =cut
467
468 sub _process_mappings {
469     my ($_self, $mappings, $data, $record_document, $meta) = @_;
470     foreach my $mapping (@{$mappings}) {
471         my ($target, $options) = @{$mapping};
472
473         # Don't process sort fields for alternate scripts
474         my $sort = $target =~ /__sort$/;
475         if ($sort && $meta->{altscript}) {
476             next;
477         }
478
479         # Copy (scalar) data since can have multiple targets
480         # with differing options for (possibly) mutating data
481         # so need a different copy for each
482         my $_data = $data;
483         $record_document->{$target} //= [];
484         if (defined $options->{substr}) {
485             my ($start, $length) = @{$options->{substr}};
486             $_data = length($data) > $start ? substr $data, $start, $length : '';
487         }
488         if (defined $options->{value_callbacks}) {
489             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
490         }
491         if (defined $options->{property}) {
492             $_data = {
493                 $options->{property} => $_data
494             }
495         }
496         if (defined $options->{nonfiling_characters_indicator}) {
497             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
498             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
499             if ($nonfiling_chars) {
500                 $_data = substr $_data, $nonfiling_chars;
501             }
502         }
503         push @{$record_document->{$target}}, $_data;
504     }
505 }
506
507 =head2 marc_records_to_documents($marc_records)
508
509     my $record_documents = $self->marc_records_to_documents($marc_records);
510
511 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
512
513 Returns array of hash references, representing Elasticsearch documents,
514 acceptable as body payload in C<Search::Elasticsearch> requests.
515
516 =over 4
517
518 =item C<$marc_documents>
519
520 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
521
522 =back
523
524 =cut
525
526 sub marc_records_to_documents {
527     my ($self, $records) = @_;
528     my $rules = $self->_get_marc_mapping_rules();
529     my $control_fields_rules = $rules->{control_fields};
530     my $data_fields_rules = $rules->{data_fields};
531     my $marcflavour = lc C4::Context->preference('marcflavour');
532     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
533
534     my @record_documents;
535
536     foreach my $record (@{$records}) {
537         my $record_document = {};
538         my $mappings = $rules->{leader};
539         if ($mappings) {
540             $self->_process_mappings($mappings, $record->leader(), $record_document, {
541                     altscript => 0,
542                     data_source => 'leader'
543                 }
544             );
545         }
546         foreach my $field ($record->fields()) {
547             if ($field->is_control_field()) {
548                 my $mappings = $control_fields_rules->{$field->tag()};
549                 if ($mappings) {
550                     $self->_process_mappings($mappings, $field->data(), $record_document, {
551                             altscript => 0,
552                             data_source => 'control_field',
553                             field => $field
554                         }
555                     );
556                 }
557             }
558             else {
559                 my $tag = $field->tag();
560                 # Handle alternate scripts in MARC 21
561                 my $altscript = 0;
562                 if ($marcflavour eq 'marc21' && $tag eq '880') {
563                     my $sub6 = $field->subfield('6');
564                     if ($sub6 =~ /^(...)-\d+/) {
565                         $tag = $1;
566                         $altscript = 1;
567                     }
568                 }
569
570                 my $data_field_rules = $data_fields_rules->{$tag};
571                 if ($data_field_rules) {
572                     my $subfields_mappings = $data_field_rules->{subfields};
573                     my $wildcard_mappings = $subfields_mappings->{'*'};
574                     foreach my $subfield ($field->subfields()) {
575                         my ($code, $data) = @{$subfield};
576                         my $mappings = $subfields_mappings->{$code} // [];
577                         if ($wildcard_mappings) {
578                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
579                         }
580                         if (@{$mappings}) {
581                             $self->_process_mappings($mappings, $data, $record_document, {
582                                     altscript => $altscript,
583                                     data_source => 'subfield',
584                                     code => $code,
585                                     field => $field
586                                 }
587                             );
588                         }
589                         if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
590                             # Used by the authority linker the match-heading field requires a specific syntax
591                             # that is specified in C4/Heading
592                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
593                             next unless $heading;
594                             push @{$record_document->{'match-heading'}}, $heading->search_form;
595                         }
596                     }
597
598                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
599                     if ($subfields_join_mappings) {
600                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
601                             # Map each subfield to values, remove empty values, join with space
602                             my $data = join(
603                                 ' ',
604                                 grep(
605                                     $_,
606                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
607                                 )
608                             );
609                             if ($data) {
610                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
611                                         altscript => $altscript,
612                                         data_source => 'subfields_group',
613                                         codes => $subfields_group,
614                                         field => $field
615                                     }
616                                 );
617                             }
618                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
619                                 # Used by the authority linker the match-heading field requires a specific syntax
620                                 # that is specified in C4/Heading
621                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
622                                 next unless $heading;
623                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
624                             }
625                         }
626                     }
627                 }
628             }
629         }
630         foreach my $field (keys %{$rules->{defaults}}) {
631             unless (defined $record_document->{$field}) {
632                 $record_document->{$field} = $rules->{defaults}->{$field};
633             }
634         }
635         foreach my $field (@{$rules->{sum}}) {
636             if (defined $record_document->{$field}) {
637                 # TODO: validate numeric? filter?
638                 # TODO: Or should only accept fields without nested values?
639                 # TODO: Quick and dirty, improve if needed
640                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
641             }
642         }
643         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
644         foreach my $field (@{$rules->{isbn}}) {
645             if (defined $record_document->{$field}) {
646                 my @isbns = ();
647                 foreach my $input_isbn (@{$record_document->{$field}}) {
648                     my $isbn = Business::ISBN->new($input_isbn);
649                     if (defined $isbn && $isbn->is_valid) {
650                         my $isbn13 = $isbn->as_isbn13->as_string;
651                         push @isbns, $isbn13;
652                         $isbn13 =~ s/\-//g;
653                         push @isbns, $isbn13;
654
655                         my $isbn10 = $isbn->as_isbn10;
656                         if ($isbn10) {
657                             $isbn10 = $isbn10->as_string;
658                             push @isbns, $isbn10;
659                             $isbn10 =~ s/\-//g;
660                             push @isbns, $isbn10;
661                         }
662                     } else {
663                         push @isbns, $input_isbn;
664                     }
665                 }
666                 $record_document->{$field} = \@isbns;
667             }
668         }
669
670         # Remove duplicate values and collapse sort fields
671         foreach my $field (keys %{$record_document}) {
672             if (ref($record_document->{$field}) eq 'ARRAY') {
673                 @{$record_document->{$field}} = do {
674                     my %seen;
675                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
676                 };
677                 if ($field =~ /__sort$/) {
678                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
679                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
680                 }
681             }
682         }
683
684         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
685         $record->encoding('UTF-8');
686         if ($use_array) {
687             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
688             $record_document->{'marc_format'} = 'ARRAY';
689         } else {
690             my @warnings;
691             {
692                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
693                 local $SIG{__WARN__} = sub {
694                     push @warnings, $_[0];
695                 };
696                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
697             }
698             if (@warnings) {
699                 # Suppress warnings if record length exceeded
700                 unless (substr($record->leader(), 0, 5) eq '99999') {
701                     foreach my $warning (@warnings) {
702                         carp $warning;
703                     }
704                 }
705                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
706                 $record_document->{'marc_format'} = 'MARCXML';
707             }
708             else {
709                 $record_document->{'marc_format'} = 'base64ISO2709';
710             }
711         }
712         push @record_documents, $record_document;
713     }
714     return \@record_documents;
715 }
716
717 =head2 _marc_to_array($record)
718
719     my @fields = _marc_to_array($record)
720
721 Convert a MARC::Record to an array modeled after MARC-in-JSON
722 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
723
724 =over 4
725
726 =item C<$record>
727
728 A MARC::Record object
729
730 =back
731
732 =cut
733
734 sub _marc_to_array {
735     my ($self, $record) = @_;
736
737     my $data = {
738         leader => $record->leader(),
739         fields => []
740     };
741     for my $field ($record->fields()) {
742         my $tag = $field->tag();
743         if ($field->is_control_field()) {
744             push @{$data->{fields}}, {$tag => $field->data()};
745         } else {
746             my $subfields = ();
747             foreach my $subfield ($field->subfields()) {
748                 my ($code, $contents) = @{$subfield};
749                 push @{$subfields}, {$code => $contents};
750             }
751             push @{$data->{fields}}, {
752                 $tag => {
753                     ind1 => $field->indicator(1),
754                     ind2 => $field->indicator(2),
755                     subfields => $subfields
756                 }
757             };
758         }
759     }
760     return $data;
761 }
762
763 =head2 _array_to_marc($data)
764
765     my $record = _array_to_marc($data)
766
767 Convert an array modeled after MARC-in-JSON to a MARC::Record
768
769 =over 4
770
771 =item C<$data>
772
773 An array modeled after MARC-in-JSON
774 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
775
776 =back
777
778 =cut
779
780 sub _array_to_marc {
781     my ($self, $data) = @_;
782
783     my $record = MARC::Record->new();
784
785     $record->leader($data->{leader});
786     for my $field (@{$data->{fields}}) {
787         my $tag = (keys %{$field})[0];
788         $field = $field->{$tag};
789         my $marc_field;
790         if (ref($field) eq 'HASH') {
791             my @subfields;
792             foreach my $subfield (@{$field->{subfields}}) {
793                 my $code = (keys %{$subfield})[0];
794                 push @subfields, $code;
795                 push @subfields, $subfield->{$code};
796             }
797             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
798         } else {
799             $marc_field = MARC::Field->new($tag, $field)
800         }
801         $record->append_fields($marc_field);
802     }
803 ;
804     return $record;
805 }
806
807 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
808
809     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
810
811 Get mappings, an internal data structure later used by
812 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
813 data for a MARC mapping.
814
815 The returned C<$mappings> is not to to be confused with mappings provided by
816 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
817 provided by C<_foreach_mapping> and expands it to this internal data structure.
818 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
819 is then applied to each MARC target (leader, control field data, subfield or
820 joined subfields) and integrated into the mapping rules data structure used in
821 C<marc_records_to_documents> to transform MARC records into Elasticsearch
822 documents.
823
824 =over 4
825
826 =item C<$facet>
827
828 Boolean indicating whether to create a facet field for this mapping.
829
830 =item C<$suggestible>
831
832 Boolean indicating whether to create a suggestion field for this mapping.
833
834 =item C<$sort>
835
836 Boolean indicating whether to create a sort field for this mapping.
837
838 =item C<$search>
839
840 Boolean indicating whether to create a search field for this mapping.
841
842 =item C<$target_name>
843
844 Elasticsearch document target field name.
845
846 =item C<$target_type>
847
848 Elasticsearch document target field type.
849
850 =item C<$range>
851
852 An optional range as a string in the format "<START>-<END>" or "<START>",
853 where "<START>" and "<END>" are integers specifying a range that will be used
854 for extracting a substring from MARC data as Elasticsearch field target value.
855
856 The first character position is "0", and the range is inclusive,
857 so "0-2" means the first three characters of MARC data.
858
859 If only "<START>" is provided only one character at position "<START>" will
860 be extracted.
861
862 =back
863
864 =cut
865
866 sub _field_mappings {
867     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
868     my %mapping_defaults = ();
869     my @mappings;
870
871     my $substr_args = undef;
872     if (defined $range) {
873         # TODO: use value_callback instead?
874         my ($start, $end) = map(int, split /-/, $range, 2);
875         $substr_args = [$start];
876         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
877     }
878     my $default_options = {};
879     if ($substr_args) {
880         $default_options->{substr} = $substr_args;
881     }
882
883     # TODO: Should probably have per type value callback/hook
884     # but hard code for now
885     if ($target_type eq 'boolean') {
886         $default_options->{value_callbacks} //= [];
887         push @{$default_options->{value_callbacks}}, sub {
888             my ($value) = @_;
889             # Trim whitespace at both ends
890             $value =~ s/^\s+|\s+$//g;
891             return $value ? 'true' : 'false';
892         };
893     }
894
895     if ($search) {
896         my $mapping = [$target_name, $default_options];
897         push @mappings, $mapping;
898     }
899
900     my @suffixes = ();
901     push @suffixes, 'facet' if $facet;
902     push @suffixes, 'suggestion' if $suggestible;
903     push @suffixes, 'sort' if !defined $sort || $sort;
904
905     foreach my $suffix (@suffixes) {
906         my $mapping = ["${target_name}__$suffix"];
907         # TODO: Hack, fix later in less hideous manner
908         if ($suffix eq 'suggestion') {
909             push @{$mapping}, {%{$default_options}, property => 'input'};
910         }
911         else {
912             # Important! Make shallow clone, or we end up with the same hashref
913             # shared by all mappings
914             push @{$mapping}, {%{$default_options}};
915         }
916         push @mappings, $mapping;
917     }
918     return @mappings;
919 };
920
921 =head2 _get_marc_mapping_rules
922
923     my $mapping_rules = $self->_get_marc_mapping_rules()
924
925 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
926
927 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
928 each call to C<MARC::Record>->field) we create an optimized structure of mapping
929 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
930
931 We can then iterate through all MARC fields for each record and apply all relevant
932 rules once per fields instead of retreiving fields multiple times for each mapping rule
933 which is terribly slow.
934
935 =cut
936
937 # TODO: This structure can be used for processing multiple MARC::Records so is currently
938 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
939 # memory cache which it is currently not. The performance gain of caching
940 # would probably be marginal, but to do this could be a further improvement.
941
942 sub _get_marc_mapping_rules {
943     my ($self) = @_;
944     my $marcflavour = lc C4::Context->preference('marcflavour');
945     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
946     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
947     my $rules = {
948         'leader' => [],
949         'control_fields' => {},
950         'data_fields' => {},
951         'sum' => [],
952         'isbn' => [],
953         'defaults' => {}
954     };
955
956     $self->_foreach_mapping(sub {
957         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
958         return if $marc_type ne $marcflavour;
959
960         if ($type eq 'sum') {
961             push @{$rules->{sum}}, $name;
962             push @{$rules->{sum}}, $name."__sort" if $sort;
963         }
964         elsif ($type eq 'isbn') {
965             push @{$rules->{isbn}}, $name;
966         }
967         elsif ($type eq 'boolean') {
968             # boolean gets special handling, if value doesn't exist for a field,
969             # it is set to false
970             $rules->{defaults}->{$name} = 'false';
971         }
972
973         if ($marc_field =~ $field_spec_regexp) {
974             my $field_tag = $1;
975
976             my @subfields;
977             my @subfield_groups;
978             # Parse and separate subfields form subfield groups
979             if (defined $2) {
980                 my $subfield_group = '';
981                 my $open_group = 0;
982
983                 foreach my $token (split //, $2) {
984                     if ($token eq "(") {
985                         if ($open_group) {
986                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
987                                 "Unmatched opening parenthesis for $marc_field"
988                             );
989                         }
990                         else {
991                             $open_group = 1;
992                         }
993                     }
994                     elsif ($token eq ")") {
995                         if ($open_group) {
996                             if ($subfield_group) {
997                                 push @subfield_groups, $subfield_group;
998                                 $subfield_group = '';
999                             }
1000                             $open_group = 0;
1001                         }
1002                         else {
1003                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1004                                 "Unmatched closing parenthesis for $marc_field"
1005                             );
1006                         }
1007                     }
1008                     elsif ($open_group) {
1009                         $subfield_group .= $token;
1010                     }
1011                     else {
1012                         push @subfields, $token;
1013                     }
1014                 }
1015             }
1016             else {
1017                 push @subfields, '*';
1018             }
1019
1020             my $range = defined $3 ? $3 : undef;
1021             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1022             if ($field_tag < 10) {
1023                 $rules->{control_fields}->{$field_tag} //= [];
1024                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1025             }
1026             else {
1027                 $rules->{data_fields}->{$field_tag} //= {};
1028                 foreach my $subfield (@subfields) {
1029                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1030                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1031                 }
1032                 foreach my $subfield_group (@subfield_groups) {
1033                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1034                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1035                 }
1036             }
1037         }
1038         elsif ($marc_field =~ $leader_regexp) {
1039             my $range = defined $1 ? $1 : undef;
1040             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1041             push @{$rules->{leader}}, @mappings;
1042         }
1043         else {
1044             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1045                 "Invalid MARC field expression: $marc_field"
1046             );
1047         }
1048     });
1049
1050     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1051     if ($marcflavour eq 'marc21') {
1052         # Nonfiling characters processing for sort fields
1053         my %title_fields;
1054         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1055             # Format is: nonfiling characters indicator => field names list
1056             %title_fields = (
1057                 1 => [130, 630, 730, 740],
1058                 2 => [222, 240, 242, 243, 245, 440, 830]
1059             );
1060         }
1061         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1062             %title_fields = (
1063                 1 => [730],
1064                 2 => [130, 430, 530]
1065             );
1066         }
1067         foreach my $indicator (keys %title_fields) {
1068             foreach my $field_tag (@{$title_fields{$indicator}}) {
1069                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1070                 foreach my $mapping (@{$mappings}) {
1071                     if ($mapping->[0] =~ /__sort$/) {
1072                         # Mark this as to be processed for nonfiling characters indicator
1073                         # later on in _process_mappings
1074                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1075                     }
1076                 }
1077             }
1078         }
1079     }
1080
1081     return $rules;
1082 }
1083
1084 =head2 _foreach_mapping
1085
1086     $self->_foreach_mapping(
1087         sub {
1088             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1089                 $marc_field )
1090               = @_;
1091             return unless $marc_type eq 'marc21';
1092             print "Data comes from: " . $marc_field . "\n";
1093         }
1094     );
1095
1096 This allows you to apply a function to each entry in the elasticsearch mappings
1097 table, in order to build the mappings for whatever is needed.
1098
1099 In the provided function, the files are:
1100
1101 =over 4
1102
1103 =item C<$name>
1104
1105 The field name for elasticsearch (corresponds to the 'mapping' column in the
1106 database.
1107
1108 =item C<$type>
1109
1110 The type for this value, e.g. 'string'.
1111
1112 =item C<$facet>
1113
1114 True if this value should be facetised. This only really makes sense if the
1115 field is understood by the facet processing code anyway.
1116
1117 =item C<$sort>
1118
1119 True if this is a field that a) needs special sort handling, and b) if it
1120 should be sorted on. False if a) but not b). Undef if not a). This allows,
1121 for example, author to be sorted on but not everything marked with "author"
1122 to be included in that sort.
1123
1124 =item C<$marc_type>
1125
1126 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1127 'unimarc', 'normarc'.
1128
1129 =item C<$marc_field>
1130
1131 A string that describes the MARC field that contains the data to extract.
1132 These are of a form suited to Catmandu's MARC fixers.
1133
1134 =back
1135
1136 =cut
1137
1138 sub _foreach_mapping {
1139     my ( $self, $sub ) = @_;
1140
1141     # TODO use a caching framework here
1142     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1143         {
1144             'search_marc_map.index_name' => $self->index,
1145         },
1146         {   join => { search_marc_to_fields => 'search_marc_map' },
1147             '+select' => [
1148                 'search_marc_to_fields.facet',
1149                 'search_marc_to_fields.suggestible',
1150                 'search_marc_to_fields.sort',
1151                 'search_marc_to_fields.search',
1152                 'search_marc_map.marc_type',
1153                 'search_marc_map.marc_field',
1154             ],
1155             '+as'     => [
1156                 'facet',
1157                 'suggestible',
1158                 'sort',
1159                 'search',
1160                 'marc_type',
1161                 'marc_field',
1162             ],
1163         }
1164     );
1165
1166     while ( my $search_field = $search_fields->next ) {
1167         $sub->(
1168             # Force lower case on indexed field names for case insensitive
1169             # field name searches
1170             lc($search_field->name),
1171             $search_field->type,
1172             $search_field->get_column('facet'),
1173             $search_field->get_column('suggestible'),
1174             $search_field->get_column('sort'),
1175             $search_field->get_column('search'),
1176             $search_field->get_column('marc_type'),
1177             $search_field->get_column('marc_field'),
1178         );
1179     }
1180 }
1181
1182 =head2 process_error
1183
1184     die process_error($@);
1185
1186 This parses an Elasticsearch error message and produces a human-readable
1187 result from it. This result is probably missing all the useful information
1188 that you might want in diagnosing an issue, so the warning is also logged.
1189
1190 Note that currently the resulting message is not internationalised. This
1191 will happen eventually by some method or other.
1192
1193 =cut
1194
1195 sub process_error {
1196     my ($self, $msg) = @_;
1197
1198     warn $msg; # simple logging
1199
1200     # This is super-primitive
1201     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1202
1203     return "Unable to perform your search. Please try again.\n";
1204 }
1205
1206 =head2 _read_configuration
1207
1208     my $conf = _read_configuration();
1209
1210 Reads the I<configuration file> and returns a hash structure with the
1211 configuration information. It raises an exception if mandatory entries
1212 are missing.
1213
1214 The hashref structure has the following form:
1215
1216     {
1217         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1218         'index_name' => 'koha_instance',
1219     }
1220
1221 This is configured by the following in the C<config> block in koha-conf.xml:
1222
1223     <elasticsearch>
1224         <server>127.0.0.1:9200</server>
1225         <server>anotherserver:9200</server>
1226         <index_name>koha_instance</index_name>
1227     </elasticsearch>
1228
1229 =cut
1230
1231 sub _read_configuration {
1232
1233     my $configuration;
1234
1235     my $conf = C4::Context->config('elasticsearch');
1236     Koha::Exceptions::Config::MissingEntry->throw(
1237         "Missing 'elasticsearch' block in config file")
1238       unless defined $conf;
1239
1240     if ( $conf && $conf->{server} ) {
1241         my $nodes = $conf->{server};
1242         if ( ref($nodes) eq 'ARRAY' ) {
1243             $configuration->{nodes} = $nodes;
1244         }
1245         else {
1246             $configuration->{nodes} = [$nodes];
1247         }
1248     }
1249     else {
1250         Koha::Exceptions::Config::MissingEntry->throw(
1251             "Missing 'server' entry in config file for elasticsearch");
1252     }
1253
1254     if ( defined $conf->{index_name} ) {
1255         $configuration->{index_name} = $conf->{index_name};
1256     }
1257     else {
1258         Koha::Exceptions::Config::MissingEntry->throw(
1259             "Missing 'index_name' entry in config file for elasticsearch");
1260     }
1261
1262     return $configuration;
1263 }
1264
1265 =head2 get_facetable_fields
1266
1267 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1268
1269 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1270
1271 =cut
1272
1273 sub get_facetable_fields {
1274     my ($self) = @_;
1275
1276     # These should correspond to the ES field names, as opposed to the CCL
1277     # things that zebra uses.
1278     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1279     my @faceted_fields = Koha::SearchFields->search(
1280         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1281     );
1282     my @not_faceted_fields = Koha::SearchFields->search(
1283         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1284     );
1285     # This could certainly be improved
1286     return ( @faceted_fields, @not_faceted_fields );
1287 }
1288
1289 1;
1290
1291 __END__
1292
1293 =head1 AUTHOR
1294
1295 =over 4
1296
1297 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1298
1299 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1300
1301 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1302
1303 =back
1304
1305 =cut