]> git.koha-community.org Git - koha.git/blob - Koha/SearchEngine/Elasticsearch.pm
Bug 24823: Drop Catmandu dependency
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31
32 use Carp;
33 use Clone qw(clone);
34 use JSON;
35 use Modern::Perl;
36 use Readonly;
37 use Search::Elasticsearch;
38 use Try::Tiny;
39 use YAML::Syck;
40
41 use List::Util qw( sum0 reduce );
42 use MARC::File::XML;
43 use MIME::Base64;
44 use Encode qw(encode);
45 use Business::ISBN;
46 use Scalar::Util qw(looks_like_number);
47
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =back
68
69 =head1 FUNCTIONS
70
71 =cut
72
73 sub new {
74     my $class = shift @_;
75     my $self = $class->SUPER::new(@_);
76     # Check for a valid index
77     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
78     return $self;
79 }
80
81 =head2 get_elasticsearch
82
83     my $elasticsearch_client = $self->get_elasticsearch();
84
85 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
86 instance level and will be reused if method is called multiple times.
87
88 =cut
89
90 sub get_elasticsearch {
91     my $self = shift @_;
92     unless (defined $self->{elasticsearch}) {
93         $self->{elasticsearch} = Search::Elasticsearch->new(
94             $self->get_elasticsearch_params()
95         );
96     }
97     return $self->{elasticsearch};
98 }
99
100 =head2 get_elasticsearch_params
101
102     my $params = $self->get_elasticsearch_params();
103
104 This provides a hashref that contains the parameters for connecting to the
105 ElasicSearch servers, in the form:
106
107     {
108         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
109         'index_name' => 'koha_instance_index',
110     }
111
112 This is configured by the following in the C<config> block in koha-conf.xml:
113
114     <elasticsearch>
115         <server>127.0.0.1:9200</server>
116         <server>anotherserver:9200</server>
117         <index_name>koha_instance</index_name>
118     </elasticsearch>
119
120 =cut
121
122 sub get_elasticsearch_params {
123     my ($self) = @_;
124
125     # Copy the hash so that we're not modifying the original
126     my $conf = C4::Context->config('elasticsearch');
127     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
128     my $es = { %{ $conf } };
129
130     # Helpfully, the multiple server lines end up in an array for us anyway
131     # if there are multiple ones, but not if there's only one.
132     my $server = $es->{server};
133     delete $es->{server};
134     if ( ref($server) eq 'ARRAY' ) {
135
136         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
137         $es->{nodes} = $server;
138     }
139     elsif ($server) {
140         $es->{nodes} = [$server];
141     }
142     else {
143         die "No elasticsearch servers were specified in koha-conf.xml.\n";
144     }
145     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
146       if ( !$es->{index_name} );
147     # Append the name of this particular index to our namespace
148     $es->{index_name} .= '_' . $self->index;
149
150     $es->{key_prefix} = 'es_';
151     $es->{cxn_pool} //= 'Static';
152     $es->{request_timeout} //= 60;
153
154     return $es;
155 }
156
157 =head2 get_elasticsearch_settings
158
159     my $settings = $self->get_elasticsearch_settings();
160
161 This provides the settings provided to Elasticsearch when an index is created.
162 These can do things like define tokenization methods.
163
164 A hashref containing the settings is returned.
165
166 =cut
167
168 sub get_elasticsearch_settings {
169     my ($self) = @_;
170
171     # Use state to speed up repeated calls
172     state $settings = undef;
173     if (!defined $settings) {
174         my $config_file = C4::Context->config('elasticsearch_index_config');
175         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
176         $settings = LoadFile( $config_file );
177     }
178
179     return $settings;
180 }
181
182 =head2 get_elasticsearch_mappings
183
184     my $mappings = $self->get_elasticsearch_mappings();
185
186 This provides the mappings that get passed to Elasticsearch when an index is
187 created.
188
189 =cut
190
191 sub get_elasticsearch_mappings {
192     my ($self) = @_;
193
194     # Use state to speed up repeated calls
195     state %all_mappings;
196     state %sort_fields;
197
198     if (!defined $all_mappings{$self->index}) {
199         $sort_fields{$self->index} = {};
200         # Clone the general mapping to break ties with the original hash
201         my $mappings = {
202             data => clone(_get_elasticsearch_field_config('general', ''))
203         };
204         my $marcflavour = lc C4::Context->preference('marcflavour');
205         $self->_foreach_mapping(
206             sub {
207                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
208                 return if $marc_type ne $marcflavour;
209                 # TODO if this gets any sort of complexity to it, it should
210                 # be broken out into its own function.
211
212                 # TODO be aware of date formats, but this requires pre-parsing
213                 # as ES will simply reject anything with an invalid date.
214                 my $es_type = 'text';
215                 if ($type eq 'boolean') {
216                     $es_type = 'boolean';
217                 } elsif ($type eq 'number' || $type eq 'sum') {
218                     $es_type = 'integer';
219                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
220                     $es_type = 'stdno';
221                 }
222
223                 if ($search) {
224                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
225                 }
226
227                 if ($facet) {
228                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
229                 }
230                 if ($suggestible) {
231                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
232                 }
233                 # Sort is a bit special as it can be true, false, undef.
234                 # We care about "true" or "undef",
235                 # "undef" means to do the default thing, which is make it sortable.
236                 if (!defined $sort || $sort) {
237                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
238                     $sort_fields{$self->index}{$name} = 1;
239                 }
240             }
241         );
242         $all_mappings{$self->index} = $mappings;
243     }
244     $self->sort_fields(\%{$sort_fields{$self->index}});
245
246     return $all_mappings{$self->index};
247 }
248
249 =head2 raw_elasticsearch_mappings
250
251 Return elasticsearch mapping as it is in database.
252 marc_type: marc21|unimarc|normarc
253
254 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
255
256 =cut
257
258 sub raw_elasticsearch_mappings {
259     my ( $marc_type ) = @_;
260
261     my $schema = Koha::Database->new()->schema();
262
263     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
264
265     my $mappings = {};
266     while ( my $search_field = $search_fields->next ) {
267
268         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
269             { search_field_id => $search_field->id },
270             {
271                 join     => 'search_marc_map',
272                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
273             }
274         );
275
276         while ( my $marc_to_field = $marc_to_fields->next ) {
277
278             my $marc_map = $marc_to_field->search_marc_map;
279
280             next if $marc_type && $marc_map->marc_type ne $marc_type;
281
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
284             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
285             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
286             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
287             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
288
289             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
290                 {
291                     facet   => $marc_to_field->facet || '',
292                     marc_type => $marc_map->marc_type,
293                     marc_field => $marc_map->marc_field,
294                     sort        => $marc_to_field->sort,
295                     suggestible => $marc_to_field->suggestible || ''
296                 });
297
298         }
299     }
300
301     return $mappings;
302 }
303
304 =head2 _get_elasticsearch_field_config
305
306 Get the Elasticsearch field config for the given purpose and data type.
307
308 $mapping = _get_elasticsearch_field_config('search', 'text');
309
310 =cut
311
312 sub _get_elasticsearch_field_config {
313
314     my ( $purpose, $type ) = @_;
315
316     # Use state to speed up repeated calls
317     state $settings = undef;
318     if (!defined $settings) {
319         my $config_file = C4::Context->config('elasticsearch_field_config');
320         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
321         $settings = LoadFile( $config_file );
322     }
323
324     if (!defined $settings->{$purpose}) {
325         die "Field purpose $purpose not defined in field config";
326     }
327     if ($type eq '') {
328         return $settings->{$purpose};
329     }
330     if (defined $settings->{$purpose}{$type}) {
331         return $settings->{$purpose}{$type};
332     }
333     if (defined $settings->{$purpose}{'default'}) {
334         return $settings->{$purpose}{'default'};
335     }
336     return;
337 }
338
339 =head2 _load_elasticsearch_mappings
340
341 Load Elasticsearch mappings in the format of mappings.yaml.
342
343 $indexes = _load_elasticsearch_mappings();
344
345 =cut
346
347 sub _load_elasticsearch_mappings {
348     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
349     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
350     return LoadFile( $mappings_yaml );
351 }
352
353 sub reset_elasticsearch_mappings {
354     my ( $self ) = @_;
355     my $indexes = $self->_load_elasticsearch_mappings();
356
357     Koha::SearchMarcMaps->delete;
358     Koha::SearchFields->delete;
359
360     while ( my ( $index_name, $fields ) = each %$indexes ) {
361         while ( my ( $field_name, $data ) = each %$fields ) {
362
363             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
364
365             # Set default values
366             $sf_params{staff_client} //= 1;
367             $sf_params{opac} //= 1;
368
369             $sf_params{name} = $field_name;
370
371             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
372
373             my $mappings = $data->{mappings};
374             for my $mapping ( @$mappings ) {
375                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
376                     index_name => $index_name,
377                     marc_type => $mapping->{marc_type},
378                     marc_field => $mapping->{marc_field}
379                 });
380                 $search_field->add_to_search_marc_maps($marc_field, {
381                     facet => $mapping->{facet} || 0,
382                     suggestible => $mapping->{suggestible} || 0,
383                     sort => $mapping->{sort},
384                     search => $mapping->{search} // 1
385                 });
386             }
387         }
388     }
389
390     my $cache = Koha::Caches->get_instance();
391     $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
392     $cache->clear_from_cache('elasticsearch_search_fields_opac');
393
394     # FIXME return the mappings?
395 }
396
397 # This overrides the accessor provided by Class::Accessor so that if
398 # sort_fields isn't set, then it'll generate it.
399 sub sort_fields {
400     my $self = shift;
401     if (@_) {
402         $self->_sort_fields_accessor(@_);
403         return;
404     }
405     my $val = $self->_sort_fields_accessor();
406     return $val if $val;
407
408     # This will populate the accessor as a side effect
409     $self->get_elasticsearch_mappings();
410     return $self->_sort_fields_accessor();
411 }
412
413 =head2 _process_mappings($mappings, $data, $record_document, $meta)
414
415     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
416
417 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
418 Since we group all mappings by MARC field targets C<$mappings> will contain
419 all targets for C<$data> and thus we need to fetch the MARC field only once.
420 C<$mappings> will be applied to C<$record_document> and new field values added.
421 The method has no return value.
422
423 =over 4
424
425 =item C<$mappings>
426
427 Arrayref of mappings containing arrayrefs in the format
428 [C<$target>, C<$options>] where C<$target> is the name of the target field and
429 C<$options> is a hashref containing processing directives for this particular
430 mapping.
431
432 =item C<$data>
433
434 The source data from a MARC record field.
435
436 =item C<$record_document>
437
438 Hashref representing the Elasticsearch document on which mappings should be
439 applied.
440
441 =item C<$meta>
442
443 A hashref containing metadata useful for enforcing per mapping rules. For
444 example for providing extra context for mapping options, or treating mapping
445 targets differently depending on type (sort, search, facet etc). Combining
446 this metadata with the mapping options and metadata allows us to mutate the
447 data per mapping, or even replace it with other data retrieved from the
448 metadata context.
449
450 Current properties are:
451
452 C<altscript>: A boolean value indicating whether an alternate script presentation is being
453 processed.
454
455 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
456 'subfield' or 'subfields_group'.
457
458 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
459
460 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
461 is 'subfields_group'.
462
463 C<field>: The original C<MARC::Record> object.
464
465 =back
466
467 =cut
468
469 sub _process_mappings {
470     my ($_self, $mappings, $data, $record_document, $meta) = @_;
471     foreach my $mapping (@{$mappings}) {
472         my ($target, $options) = @{$mapping};
473
474         # Don't process sort fields for alternate scripts
475         my $sort = $target =~ /__sort$/;
476         if ($sort && $meta->{altscript}) {
477             next;
478         }
479
480         # Copy (scalar) data since can have multiple targets
481         # with differing options for (possibly) mutating data
482         # so need a different copy for each
483         my $_data = $data;
484         $record_document->{$target} //= [];
485         if (defined $options->{substr}) {
486             my ($start, $length) = @{$options->{substr}};
487             $_data = length($data) > $start ? substr $data, $start, $length : '';
488         }
489         if (defined $options->{value_callbacks}) {
490             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
491         }
492         if (defined $options->{property}) {
493             $_data = {
494                 $options->{property} => $_data
495             }
496         }
497         if (defined $options->{nonfiling_characters_indicator}) {
498             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
499             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
500             if ($nonfiling_chars) {
501                 $_data = substr $_data, $nonfiling_chars;
502             }
503         }
504         push @{$record_document->{$target}}, $_data;
505     }
506 }
507
508 =head2 marc_records_to_documents($marc_records)
509
510     my $record_documents = $self->marc_records_to_documents($marc_records);
511
512 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
513
514 Returns array of hash references, representing Elasticsearch documents,
515 acceptable as body payload in C<Search::Elasticsearch> requests.
516
517 =over 4
518
519 =item C<$marc_documents>
520
521 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
522
523 =back
524
525 =cut
526
527 sub marc_records_to_documents {
528     my ($self, $records) = @_;
529     my $rules = $self->_get_marc_mapping_rules();
530     my $control_fields_rules = $rules->{control_fields};
531     my $data_fields_rules = $rules->{data_fields};
532     my $marcflavour = lc C4::Context->preference('marcflavour');
533     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
534
535     my @record_documents;
536
537     foreach my $record (@{$records}) {
538         my $record_document = {};
539         my $mappings = $rules->{leader};
540         if ($mappings) {
541             $self->_process_mappings($mappings, $record->leader(), $record_document, {
542                     altscript => 0,
543                     data_source => 'leader'
544                 }
545             );
546         }
547         foreach my $field ($record->fields()) {
548             if ($field->is_control_field()) {
549                 my $mappings = $control_fields_rules->{$field->tag()};
550                 if ($mappings) {
551                     $self->_process_mappings($mappings, $field->data(), $record_document, {
552                             altscript => 0,
553                             data_source => 'control_field',
554                             field => $field
555                         }
556                     );
557                 }
558             }
559             else {
560                 my $tag = $field->tag();
561                 # Handle alternate scripts in MARC 21
562                 my $altscript = 0;
563                 if ($marcflavour eq 'marc21' && $tag eq '880') {
564                     my $sub6 = $field->subfield('6');
565                     if ($sub6 =~ /^(...)-\d+/) {
566                         $tag = $1;
567                         $altscript = 1;
568                     }
569                 }
570
571                 my $data_field_rules = $data_fields_rules->{$tag};
572                 if ($data_field_rules) {
573                     my $subfields_mappings = $data_field_rules->{subfields};
574                     my $wildcard_mappings = $subfields_mappings->{'*'};
575                     foreach my $subfield ($field->subfields()) {
576                         my ($code, $data) = @{$subfield};
577                         my $mappings = $subfields_mappings->{$code} // [];
578                         if ($wildcard_mappings) {
579                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
580                         }
581                         if (@{$mappings}) {
582                             $self->_process_mappings($mappings, $data, $record_document, {
583                                     altscript => $altscript,
584                                     data_source => 'subfield',
585                                     code => $code,
586                                     field => $field
587                                 }
588                             );
589                         }
590                         if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
591                             # Used by the authority linker the match-heading field requires a specific syntax
592                             # that is specified in C4/Heading
593                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
594                             next unless $heading;
595                             push @{$record_document->{'match-heading'}}, $heading->search_form;
596                         }
597                     }
598
599                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
600                     if ($subfields_join_mappings) {
601                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
602                             # Map each subfield to values, remove empty values, join with space
603                             my $data = join(
604                                 ' ',
605                                 grep(
606                                     $_,
607                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
608                                 )
609                             );
610                             if ($data) {
611                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
612                                         altscript => $altscript,
613                                         data_source => 'subfields_group',
614                                         codes => $subfields_group,
615                                         field => $field
616                                     }
617                                 );
618                             }
619                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
620                                 # Used by the authority linker the match-heading field requires a specific syntax
621                                 # that is specified in C4/Heading
622                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
623                                 next unless $heading;
624                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
625                             }
626                         }
627                     }
628                 }
629             }
630         }
631         foreach my $field (keys %{$rules->{defaults}}) {
632             unless (defined $record_document->{$field}) {
633                 $record_document->{$field} = $rules->{defaults}->{$field};
634             }
635         }
636         foreach my $field (@{$rules->{sum}}) {
637             if (defined $record_document->{$field}) {
638                 # TODO: validate numeric? filter?
639                 # TODO: Or should only accept fields without nested values?
640                 # TODO: Quick and dirty, improve if needed
641                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
642             }
643         }
644         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
645         foreach my $field (@{$rules->{isbn}}) {
646             if (defined $record_document->{$field}) {
647                 my @isbns = ();
648                 foreach my $input_isbn (@{$record_document->{$field}}) {
649                     my $isbn = Business::ISBN->new($input_isbn);
650                     if (defined $isbn && $isbn->is_valid) {
651                         my $isbn13 = $isbn->as_isbn13->as_string;
652                         push @isbns, $isbn13;
653                         $isbn13 =~ s/\-//g;
654                         push @isbns, $isbn13;
655
656                         my $isbn10 = $isbn->as_isbn10;
657                         if ($isbn10) {
658                             $isbn10 = $isbn10->as_string;
659                             push @isbns, $isbn10;
660                             $isbn10 =~ s/\-//g;
661                             push @isbns, $isbn10;
662                         }
663                     } else {
664                         push @isbns, $input_isbn;
665                     }
666                 }
667                 $record_document->{$field} = \@isbns;
668             }
669         }
670
671         # Remove duplicate values and collapse sort fields
672         foreach my $field (keys %{$record_document}) {
673             if (ref($record_document->{$field}) eq 'ARRAY') {
674                 @{$record_document->{$field}} = do {
675                     my %seen;
676                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
677                 };
678                 if ($field =~ /__sort$/) {
679                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
680                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
681                 }
682             }
683         }
684
685         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
686         $record->encoding('UTF-8');
687         if ($use_array) {
688             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
689             $record_document->{'marc_format'} = 'ARRAY';
690         } else {
691             my @warnings;
692             {
693                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
694                 local $SIG{__WARN__} = sub {
695                     push @warnings, $_[0];
696                 };
697                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
698             }
699             if (@warnings) {
700                 # Suppress warnings if record length exceeded
701                 unless (substr($record->leader(), 0, 5) eq '99999') {
702                     foreach my $warning (@warnings) {
703                         carp $warning;
704                     }
705                 }
706                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
707                 $record_document->{'marc_format'} = 'MARCXML';
708             }
709             else {
710                 $record_document->{'marc_format'} = 'base64ISO2709';
711             }
712         }
713         push @record_documents, $record_document;
714     }
715     return \@record_documents;
716 }
717
718 =head2 _marc_to_array($record)
719
720     my @fields = _marc_to_array($record)
721
722 Convert a MARC::Record to an array modeled after MARC-in-JSON
723 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
724
725 =over 4
726
727 =item C<$record>
728
729 A MARC::Record object
730
731 =back
732
733 =cut
734
735 sub _marc_to_array {
736     my ($self, $record) = @_;
737
738     my $data = {
739         leader => $record->leader(),
740         fields => []
741     };
742     for my $field ($record->fields()) {
743         my $tag = $field->tag();
744         if ($field->is_control_field()) {
745             push @{$data->{fields}}, {$tag => $field->data()};
746         } else {
747             my $subfields = ();
748             foreach my $subfield ($field->subfields()) {
749                 my ($code, $contents) = @{$subfield};
750                 push @{$subfields}, {$code => $contents};
751             }
752             push @{$data->{fields}}, {
753                 $tag => {
754                     ind1 => $field->indicator(1),
755                     ind2 => $field->indicator(2),
756                     subfields => $subfields
757                 }
758             };
759         }
760     }
761     return $data;
762 }
763
764 =head2 _array_to_marc($data)
765
766     my $record = _array_to_marc($data)
767
768 Convert an array modeled after MARC-in-JSON to a MARC::Record
769
770 =over 4
771
772 =item C<$data>
773
774 An array modeled after MARC-in-JSON
775 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
776
777 =back
778
779 =cut
780
781 sub _array_to_marc {
782     my ($self, $data) = @_;
783
784     my $record = MARC::Record->new();
785
786     $record->leader($data->{leader});
787     for my $field (@{$data->{fields}}) {
788         my $tag = (keys %{$field})[0];
789         $field = $field->{$tag};
790         my $marc_field;
791         if (ref($field) eq 'HASH') {
792             my @subfields;
793             foreach my $subfield (@{$field->{subfields}}) {
794                 my $code = (keys %{$subfield})[0];
795                 push @subfields, $code;
796                 push @subfields, $subfield->{$code};
797             }
798             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
799         } else {
800             $marc_field = MARC::Field->new($tag, $field)
801         }
802         $record->append_fields($marc_field);
803     }
804 ;
805     return $record;
806 }
807
808 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
809
810     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
811
812 Get mappings, an internal data structure later used by
813 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
814 data for a MARC mapping.
815
816 The returned C<$mappings> is not to to be confused with mappings provided by
817 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
818 provided by C<_foreach_mapping> and expands it to this internal data structure.
819 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
820 is then applied to each MARC target (leader, control field data, subfield or
821 joined subfields) and integrated into the mapping rules data structure used in
822 C<marc_records_to_documents> to transform MARC records into Elasticsearch
823 documents.
824
825 =over 4
826
827 =item C<$facet>
828
829 Boolean indicating whether to create a facet field for this mapping.
830
831 =item C<$suggestible>
832
833 Boolean indicating whether to create a suggestion field for this mapping.
834
835 =item C<$sort>
836
837 Boolean indicating whether to create a sort field for this mapping.
838
839 =item C<$search>
840
841 Boolean indicating whether to create a search field for this mapping.
842
843 =item C<$target_name>
844
845 Elasticsearch document target field name.
846
847 =item C<$target_type>
848
849 Elasticsearch document target field type.
850
851 =item C<$range>
852
853 An optional range as a string in the format "<START>-<END>" or "<START>",
854 where "<START>" and "<END>" are integers specifying a range that will be used
855 for extracting a substring from MARC data as Elasticsearch field target value.
856
857 The first character position is "0", and the range is inclusive,
858 so "0-2" means the first three characters of MARC data.
859
860 If only "<START>" is provided only one character at position "<START>" will
861 be extracted.
862
863 =back
864
865 =cut
866
867 sub _field_mappings {
868     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
869     my %mapping_defaults = ();
870     my @mappings;
871
872     my $substr_args = undef;
873     if (defined $range) {
874         # TODO: use value_callback instead?
875         my ($start, $end) = map(int, split /-/, $range, 2);
876         $substr_args = [$start];
877         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
878     }
879     my $default_options = {};
880     if ($substr_args) {
881         $default_options->{substr} = $substr_args;
882     }
883
884     # TODO: Should probably have per type value callback/hook
885     # but hard code for now
886     if ($target_type eq 'boolean') {
887         $default_options->{value_callbacks} //= [];
888         push @{$default_options->{value_callbacks}}, sub {
889             my ($value) = @_;
890             # Trim whitespace at both ends
891             $value =~ s/^\s+|\s+$//g;
892             return $value ? 'true' : 'false';
893         };
894     }
895
896     if ($search) {
897         my $mapping = [$target_name, $default_options];
898         push @mappings, $mapping;
899     }
900
901     my @suffixes = ();
902     push @suffixes, 'facet' if $facet;
903     push @suffixes, 'suggestion' if $suggestible;
904     push @suffixes, 'sort' if !defined $sort || $sort;
905
906     foreach my $suffix (@suffixes) {
907         my $mapping = ["${target_name}__$suffix"];
908         # TODO: Hack, fix later in less hideous manner
909         if ($suffix eq 'suggestion') {
910             push @{$mapping}, {%{$default_options}, property => 'input'};
911         }
912         else {
913             # Important! Make shallow clone, or we end up with the same hashref
914             # shared by all mappings
915             push @{$mapping}, {%{$default_options}};
916         }
917         push @mappings, $mapping;
918     }
919     return @mappings;
920 };
921
922 =head2 _get_marc_mapping_rules
923
924     my $mapping_rules = $self->_get_marc_mapping_rules()
925
926 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
927
928 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
929 each call to C<MARC::Record>->field) we create an optimized structure of mapping
930 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
931
932 We can then iterate through all MARC fields for each record and apply all relevant
933 rules once per fields instead of retreiving fields multiple times for each mapping rule
934 which is terribly slow.
935
936 =cut
937
938 # TODO: This structure can be used for processing multiple MARC::Records so is currently
939 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
940 # memory cache which it is currently not. The performance gain of caching
941 # would probably be marginal, but to do this could be a further improvement.
942
943 sub _get_marc_mapping_rules {
944     my ($self) = @_;
945     my $marcflavour = lc C4::Context->preference('marcflavour');
946     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
947     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
948     my $rules = {
949         'leader' => [],
950         'control_fields' => {},
951         'data_fields' => {},
952         'sum' => [],
953         'isbn' => [],
954         'defaults' => {}
955     };
956
957     $self->_foreach_mapping(sub {
958         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
959         return if $marc_type ne $marcflavour;
960
961         if ($type eq 'sum') {
962             push @{$rules->{sum}}, $name;
963             push @{$rules->{sum}}, $name."__sort" if $sort;
964         }
965         elsif ($type eq 'isbn') {
966             push @{$rules->{isbn}}, $name;
967         }
968         elsif ($type eq 'boolean') {
969             # boolean gets special handling, if value doesn't exist for a field,
970             # it is set to false
971             $rules->{defaults}->{$name} = 'false';
972         }
973
974         if ($marc_field =~ $field_spec_regexp) {
975             my $field_tag = $1;
976
977             my @subfields;
978             my @subfield_groups;
979             # Parse and separate subfields form subfield groups
980             if (defined $2) {
981                 my $subfield_group = '';
982                 my $open_group = 0;
983
984                 foreach my $token (split //, $2) {
985                     if ($token eq "(") {
986                         if ($open_group) {
987                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
988                                 "Unmatched opening parenthesis for $marc_field"
989                             );
990                         }
991                         else {
992                             $open_group = 1;
993                         }
994                     }
995                     elsif ($token eq ")") {
996                         if ($open_group) {
997                             if ($subfield_group) {
998                                 push @subfield_groups, $subfield_group;
999                                 $subfield_group = '';
1000                             }
1001                             $open_group = 0;
1002                         }
1003                         else {
1004                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1005                                 "Unmatched closing parenthesis for $marc_field"
1006                             );
1007                         }
1008                     }
1009                     elsif ($open_group) {
1010                         $subfield_group .= $token;
1011                     }
1012                     else {
1013                         push @subfields, $token;
1014                     }
1015                 }
1016             }
1017             else {
1018                 push @subfields, '*';
1019             }
1020
1021             my $range = defined $3 ? $3 : undef;
1022             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1023             if ($field_tag < 10) {
1024                 $rules->{control_fields}->{$field_tag} //= [];
1025                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1026             }
1027             else {
1028                 $rules->{data_fields}->{$field_tag} //= {};
1029                 foreach my $subfield (@subfields) {
1030                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1031                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1032                 }
1033                 foreach my $subfield_group (@subfield_groups) {
1034                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1035                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1036                 }
1037             }
1038         }
1039         elsif ($marc_field =~ $leader_regexp) {
1040             my $range = defined $1 ? $1 : undef;
1041             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1042             push @{$rules->{leader}}, @mappings;
1043         }
1044         else {
1045             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1046                 "Invalid MARC field expression: $marc_field"
1047             );
1048         }
1049     });
1050
1051     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1052     if ($marcflavour eq 'marc21') {
1053         # Nonfiling characters processing for sort fields
1054         my %title_fields;
1055         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1056             # Format is: nonfiling characters indicator => field names list
1057             %title_fields = (
1058                 1 => [130, 630, 730, 740],
1059                 2 => [222, 240, 242, 243, 245, 440, 830]
1060             );
1061         }
1062         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1063             %title_fields = (
1064                 1 => [730],
1065                 2 => [130, 430, 530]
1066             );
1067         }
1068         foreach my $indicator (keys %title_fields) {
1069             foreach my $field_tag (@{$title_fields{$indicator}}) {
1070                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1071                 foreach my $mapping (@{$mappings}) {
1072                     if ($mapping->[0] =~ /__sort$/) {
1073                         # Mark this as to be processed for nonfiling characters indicator
1074                         # later on in _process_mappings
1075                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1076                     }
1077                 }
1078             }
1079         }
1080     }
1081
1082     return $rules;
1083 }
1084
1085 =head2 _foreach_mapping
1086
1087     $self->_foreach_mapping(
1088         sub {
1089             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1090                 $marc_field )
1091               = @_;
1092             return unless $marc_type eq 'marc21';
1093             print "Data comes from: " . $marc_field . "\n";
1094         }
1095     );
1096
1097 This allows you to apply a function to each entry in the elasticsearch mappings
1098 table, in order to build the mappings for whatever is needed.
1099
1100 In the provided function, the files are:
1101
1102 =over 4
1103
1104 =item C<$name>
1105
1106 The field name for elasticsearch (corresponds to the 'mapping' column in the
1107 database.
1108
1109 =item C<$type>
1110
1111 The type for this value, e.g. 'string'.
1112
1113 =item C<$facet>
1114
1115 True if this value should be facetised. This only really makes sense if the
1116 field is understood by the facet processing code anyway.
1117
1118 =item C<$sort>
1119
1120 True if this is a field that a) needs special sort handling, and b) if it
1121 should be sorted on. False if a) but not b). Undef if not a). This allows,
1122 for example, author to be sorted on but not everything marked with "author"
1123 to be included in that sort.
1124
1125 =item C<$marc_type>
1126
1127 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1128 'unimarc', 'normarc'.
1129
1130 =item C<$marc_field>
1131
1132 A string that describes the MARC field that contains the data to extract.
1133
1134 =back
1135
1136 =cut
1137
1138 sub _foreach_mapping {
1139     my ( $self, $sub ) = @_;
1140
1141     # TODO use a caching framework here
1142     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1143         {
1144             'search_marc_map.index_name' => $self->index,
1145         },
1146         {   join => { search_marc_to_fields => 'search_marc_map' },
1147             '+select' => [
1148                 'search_marc_to_fields.facet',
1149                 'search_marc_to_fields.suggestible',
1150                 'search_marc_to_fields.sort',
1151                 'search_marc_to_fields.search',
1152                 'search_marc_map.marc_type',
1153                 'search_marc_map.marc_field',
1154             ],
1155             '+as'     => [
1156                 'facet',
1157                 'suggestible',
1158                 'sort',
1159                 'search',
1160                 'marc_type',
1161                 'marc_field',
1162             ],
1163         }
1164     );
1165
1166     while ( my $search_field = $search_fields->next ) {
1167         $sub->(
1168             # Force lower case on indexed field names for case insensitive
1169             # field name searches
1170             lc($search_field->name),
1171             $search_field->type,
1172             $search_field->get_column('facet'),
1173             $search_field->get_column('suggestible'),
1174             $search_field->get_column('sort'),
1175             $search_field->get_column('search'),
1176             $search_field->get_column('marc_type'),
1177             $search_field->get_column('marc_field'),
1178         );
1179     }
1180 }
1181
1182 =head2 process_error
1183
1184     die process_error($@);
1185
1186 This parses an Elasticsearch error message and produces a human-readable
1187 result from it. This result is probably missing all the useful information
1188 that you might want in diagnosing an issue, so the warning is also logged.
1189
1190 Note that currently the resulting message is not internationalised. This
1191 will happen eventually by some method or other.
1192
1193 =cut
1194
1195 sub process_error {
1196     my ($self, $msg) = @_;
1197
1198     warn $msg; # simple logging
1199
1200     # This is super-primitive
1201     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1202
1203     return "Unable to perform your search. Please try again.\n";
1204 }
1205
1206 =head2 _read_configuration
1207
1208     my $conf = _read_configuration();
1209
1210 Reads the I<configuration file> and returns a hash structure with the
1211 configuration information. It raises an exception if mandatory entries
1212 are missing.
1213
1214 The hashref structure has the following form:
1215
1216     {
1217         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1218         'index_name' => 'koha_instance',
1219     }
1220
1221 This is configured by the following in the C<config> block in koha-conf.xml:
1222
1223     <elasticsearch>
1224         <server>127.0.0.1:9200</server>
1225         <server>anotherserver:9200</server>
1226         <index_name>koha_instance</index_name>
1227     </elasticsearch>
1228
1229 =cut
1230
1231 sub _read_configuration {
1232
1233     my $configuration;
1234
1235     my $conf = C4::Context->config('elasticsearch');
1236     Koha::Exceptions::Config::MissingEntry->throw(
1237         "Missing 'elasticsearch' block in config file")
1238       unless defined $conf;
1239
1240     if ( $conf && $conf->{server} ) {
1241         my $nodes = $conf->{server};
1242         if ( ref($nodes) eq 'ARRAY' ) {
1243             $configuration->{nodes} = $nodes;
1244         }
1245         else {
1246             $configuration->{nodes} = [$nodes];
1247         }
1248     }
1249     else {
1250         Koha::Exceptions::Config::MissingEntry->throw(
1251             "Missing 'server' entry in config file for elasticsearch");
1252     }
1253
1254     if ( defined $conf->{index_name} ) {
1255         $configuration->{index_name} = $conf->{index_name};
1256     }
1257     else {
1258         Koha::Exceptions::Config::MissingEntry->throw(
1259             "Missing 'index_name' entry in config file for elasticsearch");
1260     }
1261
1262     return $configuration;
1263 }
1264
1265 =head2 get_facetable_fields
1266
1267 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1268
1269 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1270
1271 =cut
1272
1273 sub get_facetable_fields {
1274     my ($self) = @_;
1275
1276     # These should correspond to the ES field names, as opposed to the CCL
1277     # things that zebra uses.
1278     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1279     my @faceted_fields = Koha::SearchFields->search(
1280         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1281     );
1282     my @not_faceted_fields = Koha::SearchFields->search(
1283         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1284     );
1285     # This could certainly be improved
1286     return ( @faceted_fields, @not_faceted_fields );
1287 }
1288
1289 1;
1290
1291 __END__
1292
1293 =head1 AUTHOR
1294
1295 =over 4
1296
1297 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1298
1299 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1300
1301 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1302
1303 =back
1304
1305 =cut