Bug 28959: Add virtualshelves.public as a boolean
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
30 use Koha::Caches;
31 use C4::Heading;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
33
34 use Carp qw( carp croak );
35 use Clone qw( clone );
36 use Modern::Perl;
37 use Readonly qw( Readonly );
38 use Search::Elasticsearch;
39 use Try::Tiny qw( catch try );
40 use YAML::XS;
41
42 use List::Util qw( sum0 );
43 use MARC::File::XML;
44 use MIME::Base64 qw( encode_base64 );
45 use Encode qw( encode );
46 use Business::ISBN;
47 use Scalar::Util qw( looks_like_number );
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = YAML::XS::LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282
283             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284                 {
285                     facet   => $marc_to_field->facet || '',
286                     marc_type => $marc_map->marc_type,
287                     marc_field => $marc_map->marc_field,
288                     sort        => $marc_to_field->sort,
289                     suggestible => $marc_to_field->suggestible || ''
290                 });
291
292         }
293     }
294
295     return $mappings;
296 }
297
298 =head2 _get_elasticsearch_field_config
299
300 Get the Elasticsearch field config for the given purpose and data type.
301
302 $mapping = _get_elasticsearch_field_config('search', 'text');
303
304 =cut
305
306 sub _get_elasticsearch_field_config {
307
308     my ( $purpose, $type ) = @_;
309
310     # Use state to speed up repeated calls
311     state $settings = undef;
312     if (!defined $settings) {
313         my $config_file = C4::Context->config('elasticsearch_field_config');
314         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
315         local $YAML::XS::Boolean = 'JSON::PP';
316         $settings = YAML::XS::LoadFile( $config_file );
317     }
318
319     if (!defined $settings->{$purpose}) {
320         die "Field purpose $purpose not defined in field config";
321     }
322     if ($type eq '') {
323         return $settings->{$purpose};
324     }
325     if (defined $settings->{$purpose}{$type}) {
326         return $settings->{$purpose}{$type};
327     }
328     if (defined $settings->{$purpose}{'default'}) {
329         return $settings->{$purpose}{'default'};
330     }
331     return;
332 }
333
334 =head2 _load_elasticsearch_mappings
335
336 Load Elasticsearch mappings in the format of mappings.yaml.
337
338 $indexes = _load_elasticsearch_mappings();
339
340 =cut
341
342 sub _load_elasticsearch_mappings {
343     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345     return YAML::XS::LoadFile( $mappings_yaml );
346 }
347
348 sub reset_elasticsearch_mappings {
349     my ( $self ) = @_;
350     my $indexes = $self->_load_elasticsearch_mappings();
351
352     Koha::SearchMarcMaps->delete;
353     Koha::SearchFields->delete;
354
355     while ( my ( $index_name, $fields ) = each %$indexes ) {
356         while ( my ( $field_name, $data ) = each %$fields ) {
357
358             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
359
360             # Set default values
361             $sf_params{staff_client} //= 1;
362             $sf_params{opac} //= 1;
363
364             $sf_params{name} = $field_name;
365
366             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
367
368             my $mappings = $data->{mappings};
369             for my $mapping ( @$mappings ) {
370                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371                     index_name => $index_name,
372                     marc_type => $mapping->{marc_type},
373                     marc_field => $mapping->{marc_field}
374                 });
375                 $search_field->add_to_search_marc_maps($marc_field, {
376                     facet => $mapping->{facet} || 0,
377                     suggestible => $mapping->{suggestible} || 0,
378                     sort => $mapping->{sort} // 1,
379                     search => $mapping->{search} // 1
380                 });
381             }
382         }
383     }
384
385     $self->clear_search_fields_cache();
386
387     # FIXME return the mappings?
388 }
389
390 # This overrides the accessor provided by Class::Accessor so that if
391 # sort_fields isn't set, then it'll generate it.
392 sub sort_fields {
393     my $self = shift;
394     if (@_) {
395         $self->_sort_fields_accessor(@_);
396         return;
397     }
398     my $val = $self->_sort_fields_accessor();
399     return $val if $val;
400
401     # This will populate the accessor as a side effect
402     $self->get_elasticsearch_mappings();
403     return $self->_sort_fields_accessor();
404 }
405
406 =head2 _process_mappings($mappings, $data, $record_document, $meta)
407
408     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
409
410 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
411 Since we group all mappings by MARC field targets C<$mappings> will contain
412 all targets for C<$data> and thus we need to fetch the MARC field only once.
413 C<$mappings> will be applied to C<$record_document> and new field values added.
414 The method has no return value.
415
416 =over 4
417
418 =item C<$mappings>
419
420 Arrayref of mappings containing arrayrefs in the format
421 [C<$target>, C<$options>] where C<$target> is the name of the target field and
422 C<$options> is a hashref containing processing directives for this particular
423 mapping.
424
425 =item C<$data>
426
427 The source data from a MARC record field.
428
429 =item C<$record_document>
430
431 Hashref representing the Elasticsearch document on which mappings should be
432 applied.
433
434 =item C<$meta>
435
436 A hashref containing metadata useful for enforcing per mapping rules. For
437 example for providing extra context for mapping options, or treating mapping
438 targets differently depending on type (sort, search, facet etc). Combining
439 this metadata with the mapping options and metadata allows us to mutate the
440 data per mapping, or even replace it with other data retrieved from the
441 metadata context.
442
443 Current properties are:
444
445 C<altscript>: A boolean value indicating whether an alternate script presentation is being
446 processed.
447
448 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
449 'subfield' or 'subfields_group'.
450
451 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
452
453 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
454 is 'subfields_group'.
455
456 C<field>: The original C<MARC::Record> object.
457
458 =back
459
460 =cut
461
462 sub _process_mappings {
463     my ($_self, $mappings, $data, $record_document, $meta) = @_;
464     foreach my $mapping (@{$mappings}) {
465         my ($target, $options) = @{$mapping};
466
467         # Don't process sort fields for alternate scripts
468         my $sort = $target =~ /__sort$/;
469         if ($sort && $meta->{altscript}) {
470             next;
471         }
472
473         # Copy (scalar) data since can have multiple targets
474         # with differing options for (possibly) mutating data
475         # so need a different copy for each
476         my $data_copy = $data;
477         if (defined $options->{substr}) {
478             my ($start, $length) = @{$options->{substr}};
479             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
480         }
481
482         # Add data to values array for callbacks processing
483         my $values = [$data_copy];
484
485         # Value callbacks takes subfield data (or values from previous
486         # callbacks) as argument, and returns a possibly different list of values.
487         # Note that the returned list may also be empty.
488         if (defined $options->{value_callbacks}) {
489             foreach my $callback (@{$options->{value_callbacks}}) {
490                 # Pass each value to current callback which returns a list
491                 # (scalar is fine too) resulting either in a list or
492                 # a list of lists that will be flattened by perl.
493                 # The next callback will receive the possibly expanded list of values.
494                 $values = [ map { $callback->($_) } @{$values} ];
495             }
496         }
497
498         # Skip mapping if all values has been removed
499         next unless @{$values};
500
501         if (defined $options->{property}) {
502             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
503         }
504         if (defined $options->{nonfiling_characters_indicator}) {
505             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
506             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
507             # Nonfiling chars does not make sense for multiple values
508             # Only apply on first element
509             $values->[0] = substr $values->[0], $nonfiling_chars;
510         }
511
512         $values = [ grep(!/^$/, @{$values}) ];
513
514         $record_document->{$target} //= [];
515         push @{$record_document->{$target}}, @{$values};
516     }
517 }
518
519 =head2 marc_records_to_documents($marc_records)
520
521     my $record_documents = $self->marc_records_to_documents($marc_records);
522
523 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
524
525 Returns array of hash references, representing Elasticsearch documents,
526 acceptable as body payload in C<Search::Elasticsearch> requests.
527
528 =over 4
529
530 =item C<$marc_documents>
531
532 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
533
534 =back
535
536 =cut
537
538 sub marc_records_to_documents {
539     my ($self, $records) = @_;
540     my $rules = $self->_get_marc_mapping_rules();
541     my $control_fields_rules = $rules->{control_fields};
542     my $data_fields_rules = $rules->{data_fields};
543     my $marcflavour = lc C4::Context->preference('marcflavour');
544     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
545
546     my @record_documents;
547
548     my %auth_match_headings;
549     if( $self->index eq 'authorities' ){
550         my @auth_types = Koha::Authority::Types->search();
551         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
552     }
553
554     foreach my $record (@{$records}) {
555         my $record_document = {};
556
557         if ( $self->index eq 'authorities' ){
558             my $authtypecode = GuessAuthTypeCode( $record );
559             if( $authtypecode ){
560                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
561                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
562                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
563                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
564                 }
565             } else {
566                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
567             }
568         }
569
570         my $mappings = $rules->{leader};
571         if ($mappings) {
572             $self->_process_mappings($mappings, $record->leader(), $record_document, {
573                     altscript => 0,
574                     data_source => 'leader'
575                 }
576             );
577         }
578         foreach my $field ($record->fields()) {
579             if ($field->is_control_field()) {
580                 my $mappings = $control_fields_rules->{$field->tag()};
581                 if ($mappings) {
582                     $self->_process_mappings($mappings, $field->data(), $record_document, {
583                             altscript => 0,
584                             data_source => 'control_field',
585                             field => $field
586                         }
587                     );
588                 }
589             }
590             else {
591                 my $tag = $field->tag();
592                 # Handle alternate scripts in MARC 21
593                 my $altscript = 0;
594                 if ($marcflavour eq 'marc21' && $tag eq '880') {
595                     my $sub6 = $field->subfield('6');
596                     if ($sub6 =~ /^(...)-\d+/) {
597                         $tag = $1;
598                         $altscript = 1;
599                     }
600                 }
601
602                 my $data_field_rules = $data_fields_rules->{$tag};
603                 if ($data_field_rules) {
604                     my $subfields_mappings = $data_field_rules->{subfields};
605                     my $wildcard_mappings = $subfields_mappings->{'*'};
606                     foreach my $subfield ($field->subfields()) {
607                         my ($code, $data) = @{$subfield};
608                         my $mappings = $subfields_mappings->{$code} // [];
609                         if ($wildcard_mappings) {
610                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
611                         }
612                         if (@{$mappings}) {
613                             $self->_process_mappings($mappings, $data, $record_document, {
614                                     altscript => $altscript,
615                                     data_source => 'subfield',
616                                     code => $code,
617                                     field => $field
618                                 }
619                             );
620                         }
621                     }
622
623                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
624                     if ($subfields_join_mappings) {
625                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
626                             my $data_field = $field->clone; #copy field to preserve for alt scripts
627                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
628                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
629                             if ($data) {
630                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
631                                         altscript => $altscript,
632                                         data_source => 'subfields_group',
633                                         codes => $subfields_group,
634                                         field => $field
635                                     }
636                                 );
637                             }
638                         }
639                     }
640                 }
641             }
642         }
643
644         if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
645             foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
646                 my $data_field_rules = $data_fields_rules->{$field->tag()};
647                 if ($data_field_rules) {
648                     my $subfields_mappings = $data_field_rules->{subfields};
649                     my $wildcard_mappings = $subfields_mappings->{'*'};
650                     foreach my $subfield ($field->subfields()) {
651                         my ($code, $data) = @{$subfield};
652                         my @mappings;
653                         push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
654                         push @mappings, @$wildcard_mappings if $wildcard_mappings;
655                         # Do not include "see from" into these kind of fields
656                         @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
657                         if (@mappings) {
658                             $self->_process_mappings(\@mappings, $data, $record_document, {
659                                     data_source => 'subfield',
660                                     code => $code,
661                                     field => $field
662                                 }
663                             );
664                         }
665                     }
666
667                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
668                     if ($subfields_join_mappings) {
669                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
670                             my $data_field = $field->clone;
671                             # remove empty subfields, otherwise they are printed as a space
672                             $data_field->delete_subfield(match => qr/^$/);
673                             my $data = $data_field->as_string( $subfields_group );
674                             if ($data) {
675                                 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
676                                 # Do not include "see from" into these kind of fields
677                                 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
678                                 $self->_process_mappings(\@mappings, $data, $record_document, {
679                                         data_source => 'subfields_group',
680                                         codes => $subfields_group,
681                                         field => $field
682                                     }
683                                 );
684                             }
685                         }
686                     }
687                 }
688             }
689         }
690
691         foreach my $field (keys %{$rules->{defaults}}) {
692             unless (defined $record_document->{$field}) {
693                 $record_document->{$field} = $rules->{defaults}->{$field};
694             }
695         }
696         foreach my $field (@{$rules->{sum}}) {
697             if (defined $record_document->{$field}) {
698                 # TODO: validate numeric? filter?
699                 # TODO: Or should only accept fields without nested values?
700                 # TODO: Quick and dirty, improve if needed
701                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
702             }
703         }
704         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
705         foreach my $field (@{$rules->{isbn}}) {
706             if (defined $record_document->{$field}) {
707                 my @isbns = ();
708                 foreach my $input_isbn (@{$record_document->{$field}}) {
709                     my $isbn = Business::ISBN->new($input_isbn);
710                     if (defined $isbn && $isbn->is_valid) {
711                         my $isbn13 = $isbn->as_isbn13->as_string;
712                         push @isbns, $isbn13;
713                         $isbn13 =~ s/\-//g;
714                         push @isbns, $isbn13;
715
716                         my $isbn10 = $isbn->as_isbn10;
717                         if ($isbn10) {
718                             $isbn10 = $isbn10->as_string;
719                             push @isbns, $isbn10;
720                             $isbn10 =~ s/\-//g;
721                             push @isbns, $isbn10;
722                         }
723                     } else {
724                         push @isbns, $input_isbn;
725                     }
726                 }
727                 $record_document->{$field} = \@isbns;
728             }
729         }
730
731         # Remove duplicate values and collapse sort fields
732         foreach my $field (keys %{$record_document}) {
733             if (ref($record_document->{$field}) eq 'ARRAY') {
734                 @{$record_document->{$field}} = do {
735                     my %seen;
736                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
737                 };
738                 if ($field =~ /__sort$/) {
739                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
740                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
741                 }
742             }
743         }
744
745         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
746         $record->encoding('UTF-8');
747         if ($use_array) {
748             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
749             $record_document->{'marc_format'} = 'ARRAY';
750         } else {
751             my @warnings;
752             {
753                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
754                 local $SIG{__WARN__} = sub {
755                     push @warnings, $_[0];
756                 };
757                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
758             }
759             if (@warnings) {
760                 # Suppress warnings if record length exceeded
761                 unless (substr($record->leader(), 0, 5) eq '99999') {
762                     foreach my $warning (@warnings) {
763                         carp $warning;
764                     }
765                 }
766                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
767                 $record_document->{'marc_format'} = 'MARCXML';
768             }
769             else {
770                 $record_document->{'marc_format'} = 'base64ISO2709';
771             }
772         }
773         push @record_documents, $record_document;
774     }
775     return \@record_documents;
776 }
777
778 =head2 _marc_to_array($record)
779
780     my @fields = _marc_to_array($record)
781
782 Convert a MARC::Record to an array modeled after MARC-in-JSON
783 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
784
785 =over 4
786
787 =item C<$record>
788
789 A MARC::Record object
790
791 =back
792
793 =cut
794
795 sub _marc_to_array {
796     my ($self, $record) = @_;
797
798     my $data = {
799         leader => $record->leader(),
800         fields => []
801     };
802     for my $field ($record->fields()) {
803         my $tag = $field->tag();
804         if ($field->is_control_field()) {
805             push @{$data->{fields}}, {$tag => $field->data()};
806         } else {
807             my $subfields = ();
808             foreach my $subfield ($field->subfields()) {
809                 my ($code, $contents) = @{$subfield};
810                 push @{$subfields}, {$code => $contents};
811             }
812             push @{$data->{fields}}, {
813                 $tag => {
814                     ind1 => $field->indicator(1),
815                     ind2 => $field->indicator(2),
816                     subfields => $subfields
817                 }
818             };
819         }
820     }
821     return $data;
822 }
823
824 =head2 _array_to_marc($data)
825
826     my $record = _array_to_marc($data)
827
828 Convert an array modeled after MARC-in-JSON to a MARC::Record
829
830 =over 4
831
832 =item C<$data>
833
834 An array modeled after MARC-in-JSON
835 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
836
837 =back
838
839 =cut
840
841 sub _array_to_marc {
842     my ($self, $data) = @_;
843
844     my $record = MARC::Record->new();
845
846     $record->leader($data->{leader});
847     for my $field (@{$data->{fields}}) {
848         my $tag = (keys %{$field})[0];
849         $field = $field->{$tag};
850         my $marc_field;
851         if (ref($field) eq 'HASH') {
852             my @subfields;
853             foreach my $subfield (@{$field->{subfields}}) {
854                 my $code = (keys %{$subfield})[0];
855                 push @subfields, $code;
856                 push @subfields, $subfield->{$code};
857             }
858             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
859         } else {
860             $marc_field = MARC::Field->new($tag, $field)
861         }
862         $record->append_fields($marc_field);
863     }
864 ;
865     return $record;
866 }
867
868 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
869
870     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
871
872 Get mappings, an internal data structure later used by
873 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
874 data for a MARC mapping.
875
876 The returned C<$mappings> is not to to be confused with mappings provided by
877 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
878 provided by C<_foreach_mapping> and expands it to this internal data structure.
879 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
880 is then applied to each MARC target (leader, control field data, subfield or
881 joined subfields) and integrated into the mapping rules data structure used in
882 C<marc_records_to_documents> to transform MARC records into Elasticsearch
883 documents.
884
885 =over 4
886
887 =item C<$facet>
888
889 Boolean indicating whether to create a facet field for this mapping.
890
891 =item C<$suggestible>
892
893 Boolean indicating whether to create a suggestion field for this mapping.
894
895 =item C<$sort>
896
897 Boolean indicating whether to create a sort field for this mapping.
898
899 =item C<$search>
900
901 Boolean indicating whether to create a search field for this mapping.
902
903 =item C<$target_name>
904
905 Elasticsearch document target field name.
906
907 =item C<$target_type>
908
909 Elasticsearch document target field type.
910
911 =item C<$range>
912
913 An optional range as a string in the format "<START>-<END>" or "<START>",
914 where "<START>" and "<END>" are integers specifying a range that will be used
915 for extracting a substring from MARC data as Elasticsearch field target value.
916
917 The first character position is "0", and the range is inclusive,
918 so "0-2" means the first three characters of MARC data.
919
920 If only "<START>" is provided only one character at position "<START>" will
921 be extracted.
922
923 =back
924
925 =cut
926
927 sub _field_mappings {
928     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
929     my %mapping_defaults = ();
930     my @mappings;
931
932     my $substr_args = undef;
933     if (defined $range) {
934         # TODO: use value_callback instead?
935         my ($start, $end) = map(int, split /-/, $range, 2);
936         $substr_args = [$start];
937         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
938     }
939     my $default_options = {};
940     if ($substr_args) {
941         $default_options->{substr} = $substr_args;
942     }
943
944     # TODO: Should probably have per type value callback/hook
945     # but hard code for now
946     if ($target_type eq 'boolean') {
947         $default_options->{value_callbacks} //= [];
948         push @{$default_options->{value_callbacks}}, sub {
949             my ($value) = @_;
950             # Trim whitespace at both ends
951             $value =~ s/^\s+|\s+$//g;
952             return $value ? 'true' : 'false';
953         };
954     }
955     elsif ($target_type eq 'year') {
956         $default_options->{value_callbacks} //= [];
957         # Only accept years containing digits and "u"
958         push @{$default_options->{value_callbacks}}, sub {
959             my ($value) = @_;
960             # Replace "u" with "0" for sorting
961             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
962         };
963     }
964
965     if ($search) {
966         my $mapping = [$target_name, $default_options];
967         push @mappings, $mapping;
968     }
969
970     my @suffixes = ();
971     push @suffixes, 'facet' if $facet;
972     push @suffixes, 'suggestion' if $suggestible;
973     push @suffixes, 'sort' if !defined $sort || $sort;
974
975     foreach my $suffix (@suffixes) {
976         my $mapping = ["${target_name}__$suffix"];
977         # TODO: Hack, fix later in less hideous manner
978         if ($suffix eq 'suggestion') {
979             push @{$mapping}, {%{$default_options}, property => 'input'};
980         }
981         else {
982             # Important! Make shallow clone, or we end up with the same hashref
983             # shared by all mappings
984             push @{$mapping}, {%{$default_options}};
985         }
986         push @mappings, $mapping;
987     }
988     return @mappings;
989 };
990
991 =head2 _get_marc_mapping_rules
992
993     my $mapping_rules = $self->_get_marc_mapping_rules()
994
995 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
996
997 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
998 each call to C<MARC::Record>->field) we create an optimized structure of mapping
999 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1000
1001 We can then iterate through all MARC fields for each record and apply all relevant
1002 rules once per fields instead of retreiving fields multiple times for each mapping rule
1003 which is terribly slow.
1004
1005 =cut
1006
1007 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1008 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1009 # memory cache which it is currently not. The performance gain of caching
1010 # would probably be marginal, but to do this could be a further improvement.
1011
1012 sub _get_marc_mapping_rules {
1013     my ($self) = @_;
1014     my $marcflavour = lc C4::Context->preference('marcflavour');
1015     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1016     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1017     my $rules = {
1018         'leader' => [],
1019         'control_fields' => {},
1020         'data_fields' => {},
1021         'sum' => [],
1022         'isbn' => [],
1023         'defaults' => {}
1024     };
1025
1026     $self->_foreach_mapping(sub {
1027         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
1028         return if $marc_type ne $marcflavour;
1029
1030         if ($type eq 'sum') {
1031             push @{$rules->{sum}}, $name;
1032             push @{$rules->{sum}}, $name."__sort" if $sort;
1033         }
1034         elsif ($type eq 'isbn') {
1035             push @{$rules->{isbn}}, $name;
1036         }
1037         elsif ($type eq 'boolean') {
1038             # boolean gets special handling, if value doesn't exist for a field,
1039             # it is set to false
1040             $rules->{defaults}->{$name} = 'false';
1041         }
1042
1043         if ($marc_field =~ $field_spec_regexp) {
1044             my $field_tag = $1;
1045
1046             my @subfields;
1047             my @subfield_groups;
1048             # Parse and separate subfields form subfield groups
1049             if (defined $2) {
1050                 my $subfield_group = '';
1051                 my $open_group = 0;
1052
1053                 foreach my $token (split //, $2) {
1054                     if ($token eq "(") {
1055                         if ($open_group) {
1056                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1057                                 "Unmatched opening parenthesis for $marc_field"
1058                             );
1059                         }
1060                         else {
1061                             $open_group = 1;
1062                         }
1063                     }
1064                     elsif ($token eq ")") {
1065                         if ($open_group) {
1066                             if ($subfield_group) {
1067                                 push @subfield_groups, $subfield_group;
1068                                 $subfield_group = '';
1069                             }
1070                             $open_group = 0;
1071                         }
1072                         else {
1073                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1074                                 "Unmatched closing parenthesis for $marc_field"
1075                             );
1076                         }
1077                     }
1078                     elsif ($open_group) {
1079                         $subfield_group .= $token;
1080                     }
1081                     else {
1082                         push @subfields, $token;
1083                     }
1084                 }
1085             }
1086             else {
1087                 push @subfields, '*';
1088             }
1089
1090             my $range = defined $3 ? $3 : undef;
1091             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1092             if ($field_tag < 10) {
1093                 $rules->{control_fields}->{$field_tag} //= [];
1094                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1095             }
1096             else {
1097                 $rules->{data_fields}->{$field_tag} //= {};
1098                 foreach my $subfield (@subfields) {
1099                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1100                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1101                 }
1102                 foreach my $subfield_group (@subfield_groups) {
1103                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1104                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1105                 }
1106             }
1107         }
1108         elsif ($marc_field =~ $leader_regexp) {
1109             my $range = defined $1 ? $1 : undef;
1110             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1111             push @{$rules->{leader}}, @mappings;
1112         }
1113         else {
1114             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1115                 "Invalid MARC field expression: $marc_field"
1116             );
1117         }
1118     });
1119
1120     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1121     if ($marcflavour eq 'marc21') {
1122         # Nonfiling characters processing for sort fields
1123         my %title_fields;
1124         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1125             # Format is: nonfiling characters indicator => field names list
1126             %title_fields = (
1127                 1 => [130, 630, 730, 740],
1128                 2 => [222, 240, 242, 243, 245, 440, 830]
1129             );
1130         }
1131         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1132             %title_fields = (
1133                 1 => [730],
1134                 2 => [130, 430, 530]
1135             );
1136         }
1137         foreach my $indicator (keys %title_fields) {
1138             foreach my $field_tag (@{$title_fields{$indicator}}) {
1139                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1140                 foreach my $mapping (@{$mappings}) {
1141                     if ($mapping->[0] =~ /__sort$/) {
1142                         # Mark this as to be processed for nonfiling characters indicator
1143                         # later on in _process_mappings
1144                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1145                     }
1146                 }
1147             }
1148         }
1149     }
1150
1151     return $rules;
1152 }
1153
1154 =head2 _foreach_mapping
1155
1156     $self->_foreach_mapping(
1157         sub {
1158             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1159                 $marc_field )
1160               = @_;
1161             return unless $marc_type eq 'marc21';
1162             print "Data comes from: " . $marc_field . "\n";
1163         }
1164     );
1165
1166 This allows you to apply a function to each entry in the elasticsearch mappings
1167 table, in order to build the mappings for whatever is needed.
1168
1169 In the provided function, the files are:
1170
1171 =over 4
1172
1173 =item C<$name>
1174
1175 The field name for elasticsearch (corresponds to the 'mapping' column in the
1176 database.
1177
1178 =item C<$type>
1179
1180 The type for this value, e.g. 'string'.
1181
1182 =item C<$facet>
1183
1184 True if this value should be facetised. This only really makes sense if the
1185 field is understood by the facet processing code anyway.
1186
1187 =item C<$sort>
1188
1189 True if this is a field that a) needs special sort handling, and b) if it
1190 should be sorted on. False if a) but not b). Undef if not a). This allows,
1191 for example, author to be sorted on but not everything marked with "author"
1192 to be included in that sort.
1193
1194 =item C<$marc_type>
1195
1196 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1197 'unimarc'.
1198
1199 =item C<$marc_field>
1200
1201 A string that describes the MARC field that contains the data to extract.
1202
1203 =back
1204
1205 =cut
1206
1207 sub _foreach_mapping {
1208     my ( $self, $sub ) = @_;
1209
1210     # TODO use a caching framework here
1211     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1212         {
1213             'search_marc_map.index_name' => $self->index,
1214         },
1215         {   join => { search_marc_to_fields => 'search_marc_map' },
1216             '+select' => [
1217                 'search_marc_to_fields.facet',
1218                 'search_marc_to_fields.suggestible',
1219                 'search_marc_to_fields.sort',
1220                 'search_marc_to_fields.search',
1221                 'search_marc_map.marc_type',
1222                 'search_marc_map.marc_field',
1223             ],
1224             '+as'     => [
1225                 'facet',
1226                 'suggestible',
1227                 'sort',
1228                 'search',
1229                 'marc_type',
1230                 'marc_field',
1231             ],
1232         }
1233     );
1234
1235     while ( my $search_field = $search_fields->next ) {
1236         $sub->(
1237             # Force lower case on indexed field names for case insensitive
1238             # field name searches
1239             lc($search_field->name),
1240             $search_field->type,
1241             $search_field->get_column('facet'),
1242             $search_field->get_column('suggestible'),
1243             $search_field->get_column('sort'),
1244             $search_field->get_column('search'),
1245             $search_field->get_column('marc_type'),
1246             $search_field->get_column('marc_field'),
1247         );
1248     }
1249 }
1250
1251 =head2 process_error
1252
1253     die process_error($@);
1254
1255 This parses an Elasticsearch error message and produces a human-readable
1256 result from it. This result is probably missing all the useful information
1257 that you might want in diagnosing an issue, so the warning is also logged.
1258
1259 Note that currently the resulting message is not internationalised. This
1260 will happen eventually by some method or other.
1261
1262 =cut
1263
1264 sub process_error {
1265     my ($self, $msg) = @_;
1266
1267     warn $msg; # simple logging
1268
1269     # This is super-primitive
1270     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1271
1272     return "Unable to perform your search. Please try again.\n";
1273 }
1274
1275 =head2 _read_configuration
1276
1277     my $conf = _read_configuration();
1278
1279 Reads the I<configuration file> and returns a hash structure with the
1280 configuration information. It raises an exception if mandatory entries
1281 are missing.
1282
1283 The hashref structure has the following form:
1284
1285     {
1286         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1287         'index_name' => 'koha_instance',
1288     }
1289
1290 This is configured by the following in the C<config> block in koha-conf.xml:
1291
1292     <elasticsearch>
1293         <server>127.0.0.1:9200</server>
1294         <server>anotherserver:9200</server>
1295         <index_name>koha_instance</index_name>
1296     </elasticsearch>
1297
1298 =cut
1299
1300 sub _read_configuration {
1301
1302     my $configuration;
1303
1304     my $conf = C4::Context->config('elasticsearch');
1305     unless ( defined $conf ) {
1306         Koha::Exceptions::Config::MissingEntry->throw(
1307             "Missing <elasticsearch> entry in koha-conf.xml"
1308         );
1309     }
1310
1311     if ( $conf && $conf->{server} ) {
1312         my $nodes = $conf->{server};
1313         if ( ref($nodes) eq 'ARRAY' ) {
1314             $configuration->{nodes} = $nodes;
1315         }
1316         else {
1317             $configuration->{nodes} = [$nodes];
1318         }
1319     }
1320     else {
1321         Koha::Exceptions::Config::MissingEntry->throw(
1322             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1323         );
1324     }
1325
1326     if ( defined $conf->{index_name} ) {
1327         $configuration->{index_name} = $conf->{index_name};
1328     }
1329     else {
1330         Koha::Exceptions::Config::MissingEntry->throw(
1331             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1332         );
1333     }
1334
1335     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1336
1337     $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1338
1339     return $configuration;
1340 }
1341
1342 =head2 get_facetable_fields
1343
1344 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1345
1346 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1347
1348 =cut
1349
1350 sub get_facetable_fields {
1351     my ($self) = @_;
1352
1353     # These should correspond to the ES field names, as opposed to the CCL
1354     # things that zebra uses.
1355     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1356     my @faceted_fields = Koha::SearchFields->search(
1357         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1358     );
1359     my @not_faceted_fields = Koha::SearchFields->search(
1360         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1361     );
1362     # This could certainly be improved
1363     return ( @faceted_fields, @not_faceted_fields );
1364 }
1365
1366 =head2 clear_search_fields_cache
1367
1368 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1369
1370 Clear cached values for ES search fields
1371
1372 =cut
1373
1374 sub clear_search_fields_cache {
1375
1376     my $cache = Koha::Caches->get_instance();
1377     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1378     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1379     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1380     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1381
1382 }
1383
1384 1;
1385
1386 __END__
1387
1388 =head1 AUTHOR
1389
1390 =over 4
1391
1392 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1393
1394 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1395
1396 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1397
1398 =back
1399
1400 =cut