Bug 19482: (follow-up) Fix select and adjust export of new field
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282
283             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284                 {
285                     facet   => $marc_to_field->facet || '',
286                     marc_type => $marc_map->marc_type,
287                     marc_field => $marc_map->marc_field,
288                     sort        => $marc_to_field->sort,
289                     suggestible => $marc_to_field->suggestible || ''
290                 });
291
292         }
293     }
294
295     return $mappings;
296 }
297
298 =head2 _get_elasticsearch_field_config
299
300 Get the Elasticsearch field config for the given purpose and data type.
301
302 $mapping = _get_elasticsearch_field_config('search', 'text');
303
304 =cut
305
306 sub _get_elasticsearch_field_config {
307
308     my ( $purpose, $type ) = @_;
309
310     # Use state to speed up repeated calls
311     state $settings = undef;
312     if (!defined $settings) {
313         my $config_file = C4::Context->config('elasticsearch_field_config');
314         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
315         $settings = LoadFile( $config_file );
316     }
317
318     if (!defined $settings->{$purpose}) {
319         die "Field purpose $purpose not defined in field config";
320     }
321     if ($type eq '') {
322         return $settings->{$purpose};
323     }
324     if (defined $settings->{$purpose}{$type}) {
325         return $settings->{$purpose}{$type};
326     }
327     if (defined $settings->{$purpose}{'default'}) {
328         return $settings->{$purpose}{'default'};
329     }
330     return;
331 }
332
333 =head2 _load_elasticsearch_mappings
334
335 Load Elasticsearch mappings in the format of mappings.yaml.
336
337 $indexes = _load_elasticsearch_mappings();
338
339 =cut
340
341 sub _load_elasticsearch_mappings {
342     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
343     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
344     return LoadFile( $mappings_yaml );
345 }
346
347 sub reset_elasticsearch_mappings {
348     my ( $self ) = @_;
349     my $indexes = $self->_load_elasticsearch_mappings();
350
351     Koha::SearchMarcMaps->delete;
352     Koha::SearchFields->delete;
353
354     while ( my ( $index_name, $fields ) = each %$indexes ) {
355         while ( my ( $field_name, $data ) = each %$fields ) {
356
357             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
358
359             # Set default values
360             $sf_params{staff_client} //= 1;
361             $sf_params{opac} //= 1;
362
363             $sf_params{name} = $field_name;
364
365             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
366
367             my $mappings = $data->{mappings};
368             for my $mapping ( @$mappings ) {
369                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
370                     index_name => $index_name,
371                     marc_type => $mapping->{marc_type},
372                     marc_field => $mapping->{marc_field}
373                 });
374                 $search_field->add_to_search_marc_maps($marc_field, {
375                     facet => $mapping->{facet} || 0,
376                     suggestible => $mapping->{suggestible} || 0,
377                     sort => $mapping->{sort},
378                     search => $mapping->{search} // 1
379                 });
380             }
381         }
382     }
383
384     $self->clear_search_fields_cache();
385
386     # FIXME return the mappings?
387 }
388
389 # This overrides the accessor provided by Class::Accessor so that if
390 # sort_fields isn't set, then it'll generate it.
391 sub sort_fields {
392     my $self = shift;
393     if (@_) {
394         $self->_sort_fields_accessor(@_);
395         return;
396     }
397     my $val = $self->_sort_fields_accessor();
398     return $val if $val;
399
400     # This will populate the accessor as a side effect
401     $self->get_elasticsearch_mappings();
402     return $self->_sort_fields_accessor();
403 }
404
405 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406
407     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408
409 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
410 Since we group all mappings by MARC field targets C<$mappings> will contain
411 all targets for C<$data> and thus we need to fetch the MARC field only once.
412 C<$mappings> will be applied to C<$record_document> and new field values added.
413 The method has no return value.
414
415 =over 4
416
417 =item C<$mappings>
418
419 Arrayref of mappings containing arrayrefs in the format
420 [C<$target>, C<$options>] where C<$target> is the name of the target field and
421 C<$options> is a hashref containing processing directives for this particular
422 mapping.
423
424 =item C<$data>
425
426 The source data from a MARC record field.
427
428 =item C<$record_document>
429
430 Hashref representing the Elasticsearch document on which mappings should be
431 applied.
432
433 =item C<$meta>
434
435 A hashref containing metadata useful for enforcing per mapping rules. For
436 example for providing extra context for mapping options, or treating mapping
437 targets differently depending on type (sort, search, facet etc). Combining
438 this metadata with the mapping options and metadata allows us to mutate the
439 data per mapping, or even replace it with other data retrieved from the
440 metadata context.
441
442 Current properties are:
443
444 C<altscript>: A boolean value indicating whether an alternate script presentation is being
445 processed.
446
447 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
448 'subfield' or 'subfields_group'.
449
450 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
451
452 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
453 is 'subfields_group'.
454
455 C<field>: The original C<MARC::Record> object.
456
457 =back
458
459 =cut
460
461 sub _process_mappings {
462     my ($_self, $mappings, $data, $record_document, $meta) = @_;
463     foreach my $mapping (@{$mappings}) {
464         my ($target, $options) = @{$mapping};
465
466         # Don't process sort fields for alternate scripts
467         my $sort = $target =~ /__sort$/;
468         if ($sort && $meta->{altscript}) {
469             next;
470         }
471
472         # Copy (scalar) data since can have multiple targets
473         # with differing options for (possibly) mutating data
474         # so need a different copy for each
475         my $data_copy = $data;
476         if (defined $options->{substr}) {
477             my ($start, $length) = @{$options->{substr}};
478             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
479         }
480
481         # Add data to values array for callbacks processing
482         my $values = [$data_copy];
483
484         # Value callbacks takes subfield data (or values from previous
485         # callbacks) as argument, and returns a possibly different list of values.
486         # Note that the returned list may also be empty.
487         if (defined $options->{value_callbacks}) {
488             foreach my $callback (@{$options->{value_callbacks}}) {
489                 # Pass each value to current callback which returns a list
490                 # (scalar is fine too) resulting either in a list or
491                 # a list of lists that will be flattened by perl.
492                 # The next callback will receive the possibly expanded list of values.
493                 $values = [ map { $callback->($_) } @{$values} ];
494             }
495         }
496
497         # Skip mapping if all values has been removed
498         next unless @{$values};
499
500         if (defined $options->{property}) {
501             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
502         }
503         if (defined $options->{nonfiling_characters_indicator}) {
504             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
505             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
506             # Nonfiling chars does not make sense for multiple values
507             # Only apply on first element
508             $values->[0] = substr $values->[0], $nonfiling_chars;
509         }
510
511         $values = [ grep(!/^$/, @{$values}) ];
512
513         $record_document->{$target} //= [];
514         push @{$record_document->{$target}}, @{$values};
515     }
516 }
517
518 =head2 marc_records_to_documents($marc_records)
519
520     my $record_documents = $self->marc_records_to_documents($marc_records);
521
522 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
523
524 Returns array of hash references, representing Elasticsearch documents,
525 acceptable as body payload in C<Search::Elasticsearch> requests.
526
527 =over 4
528
529 =item C<$marc_documents>
530
531 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
532
533 =back
534
535 =cut
536
537 sub marc_records_to_documents {
538     my ($self, $records) = @_;
539     my $rules = $self->_get_marc_mapping_rules();
540     my $control_fields_rules = $rules->{control_fields};
541     my $data_fields_rules = $rules->{data_fields};
542     my $marcflavour = lc C4::Context->preference('marcflavour');
543     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
544
545     my @record_documents;
546
547     my %auth_match_headings;
548     if( $self->index eq 'authorities' ){
549         my @auth_types = Koha::Authority::Types->search();
550         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
551     }
552
553     foreach my $record (@{$records}) {
554         my $record_document = {};
555
556         if ( $self->index eq 'authorities' ){
557             my $authtypecode = GuessAuthTypeCode( $record );
558             if( $authtypecode ){
559                 my $field = $record->field( $auth_match_headings{ $authtypecode } );
560                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
561                 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
562             } else {
563                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
564             }
565         }
566
567         my $mappings = $rules->{leader};
568         if ($mappings) {
569             $self->_process_mappings($mappings, $record->leader(), $record_document, {
570                     altscript => 0,
571                     data_source => 'leader'
572                 }
573             );
574         }
575         foreach my $field ($record->fields()) {
576             if ($field->is_control_field()) {
577                 my $mappings = $control_fields_rules->{$field->tag()};
578                 if ($mappings) {
579                     $self->_process_mappings($mappings, $field->data(), $record_document, {
580                             altscript => 0,
581                             data_source => 'control_field',
582                             field => $field
583                         }
584                     );
585                 }
586             }
587             else {
588                 my $tag = $field->tag();
589                 # Handle alternate scripts in MARC 21
590                 my $altscript = 0;
591                 if ($marcflavour eq 'marc21' && $tag eq '880') {
592                     my $sub6 = $field->subfield('6');
593                     if ($sub6 =~ /^(...)-\d+/) {
594                         $tag = $1;
595                         $altscript = 1;
596                     }
597                 }
598
599                 my $data_field_rules = $data_fields_rules->{$tag};
600                 if ($data_field_rules) {
601                     my $subfields_mappings = $data_field_rules->{subfields};
602                     my $wildcard_mappings = $subfields_mappings->{'*'};
603                     foreach my $subfield ($field->subfields()) {
604                         my ($code, $data) = @{$subfield};
605                         my $mappings = $subfields_mappings->{$code} // [];
606                         if ($wildcard_mappings) {
607                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
608                         }
609                         if (@{$mappings}) {
610                             $self->_process_mappings($mappings, $data, $record_document, {
611                                     altscript => $altscript,
612                                     data_source => 'subfield',
613                                     code => $code,
614                                     field => $field
615                                 }
616                             );
617                         }
618                     }
619
620                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
621                     if ($subfields_join_mappings) {
622                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
623                             my $data_field = $field->clone; #copy field to preserve for alt scripts
624                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
625                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
626                             if ($data) {
627                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
628                                         altscript => $altscript,
629                                         data_source => 'subfields_group',
630                                         codes => $subfields_group,
631                                         field => $field
632                                     }
633                                 );
634                             }
635                         }
636                     }
637                 }
638             }
639         }
640         foreach my $field (keys %{$rules->{defaults}}) {
641             unless (defined $record_document->{$field}) {
642                 $record_document->{$field} = $rules->{defaults}->{$field};
643             }
644         }
645         foreach my $field (@{$rules->{sum}}) {
646             if (defined $record_document->{$field}) {
647                 # TODO: validate numeric? filter?
648                 # TODO: Or should only accept fields without nested values?
649                 # TODO: Quick and dirty, improve if needed
650                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
651             }
652         }
653         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
654         foreach my $field (@{$rules->{isbn}}) {
655             if (defined $record_document->{$field}) {
656                 my @isbns = ();
657                 foreach my $input_isbn (@{$record_document->{$field}}) {
658                     my $isbn = Business::ISBN->new($input_isbn);
659                     if (defined $isbn && $isbn->is_valid) {
660                         my $isbn13 = $isbn->as_isbn13->as_string;
661                         push @isbns, $isbn13;
662                         $isbn13 =~ s/\-//g;
663                         push @isbns, $isbn13;
664
665                         my $isbn10 = $isbn->as_isbn10;
666                         if ($isbn10) {
667                             $isbn10 = $isbn10->as_string;
668                             push @isbns, $isbn10;
669                             $isbn10 =~ s/\-//g;
670                             push @isbns, $isbn10;
671                         }
672                     } else {
673                         push @isbns, $input_isbn;
674                     }
675                 }
676                 $record_document->{$field} = \@isbns;
677             }
678         }
679
680         # Remove duplicate values and collapse sort fields
681         foreach my $field (keys %{$record_document}) {
682             if (ref($record_document->{$field}) eq 'ARRAY') {
683                 @{$record_document->{$field}} = do {
684                     my %seen;
685                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
686                 };
687                 if ($field =~ /__sort$/) {
688                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
689                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
690                 }
691             }
692         }
693
694         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
695         $record->encoding('UTF-8');
696         if ($use_array) {
697             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
698             $record_document->{'marc_format'} = 'ARRAY';
699         } else {
700             my @warnings;
701             {
702                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
703                 local $SIG{__WARN__} = sub {
704                     push @warnings, $_[0];
705                 };
706                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
707             }
708             if (@warnings) {
709                 # Suppress warnings if record length exceeded
710                 unless (substr($record->leader(), 0, 5) eq '99999') {
711                     foreach my $warning (@warnings) {
712                         carp $warning;
713                     }
714                 }
715                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
716                 $record_document->{'marc_format'} = 'MARCXML';
717             }
718             else {
719                 $record_document->{'marc_format'} = 'base64ISO2709';
720             }
721         }
722         push @record_documents, $record_document;
723     }
724     return \@record_documents;
725 }
726
727 =head2 _marc_to_array($record)
728
729     my @fields = _marc_to_array($record)
730
731 Convert a MARC::Record to an array modeled after MARC-in-JSON
732 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
733
734 =over 4
735
736 =item C<$record>
737
738 A MARC::Record object
739
740 =back
741
742 =cut
743
744 sub _marc_to_array {
745     my ($self, $record) = @_;
746
747     my $data = {
748         leader => $record->leader(),
749         fields => []
750     };
751     for my $field ($record->fields()) {
752         my $tag = $field->tag();
753         if ($field->is_control_field()) {
754             push @{$data->{fields}}, {$tag => $field->data()};
755         } else {
756             my $subfields = ();
757             foreach my $subfield ($field->subfields()) {
758                 my ($code, $contents) = @{$subfield};
759                 push @{$subfields}, {$code => $contents};
760             }
761             push @{$data->{fields}}, {
762                 $tag => {
763                     ind1 => $field->indicator(1),
764                     ind2 => $field->indicator(2),
765                     subfields => $subfields
766                 }
767             };
768         }
769     }
770     return $data;
771 }
772
773 =head2 _array_to_marc($data)
774
775     my $record = _array_to_marc($data)
776
777 Convert an array modeled after MARC-in-JSON to a MARC::Record
778
779 =over 4
780
781 =item C<$data>
782
783 An array modeled after MARC-in-JSON
784 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
785
786 =back
787
788 =cut
789
790 sub _array_to_marc {
791     my ($self, $data) = @_;
792
793     my $record = MARC::Record->new();
794
795     $record->leader($data->{leader});
796     for my $field (@{$data->{fields}}) {
797         my $tag = (keys %{$field})[0];
798         $field = $field->{$tag};
799         my $marc_field;
800         if (ref($field) eq 'HASH') {
801             my @subfields;
802             foreach my $subfield (@{$field->{subfields}}) {
803                 my $code = (keys %{$subfield})[0];
804                 push @subfields, $code;
805                 push @subfields, $subfield->{$code};
806             }
807             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
808         } else {
809             $marc_field = MARC::Field->new($tag, $field)
810         }
811         $record->append_fields($marc_field);
812     }
813 ;
814     return $record;
815 }
816
817 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
818
819     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
820
821 Get mappings, an internal data structure later used by
822 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
823 data for a MARC mapping.
824
825 The returned C<$mappings> is not to to be confused with mappings provided by
826 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
827 provided by C<_foreach_mapping> and expands it to this internal data structure.
828 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
829 is then applied to each MARC target (leader, control field data, subfield or
830 joined subfields) and integrated into the mapping rules data structure used in
831 C<marc_records_to_documents> to transform MARC records into Elasticsearch
832 documents.
833
834 =over 4
835
836 =item C<$facet>
837
838 Boolean indicating whether to create a facet field for this mapping.
839
840 =item C<$suggestible>
841
842 Boolean indicating whether to create a suggestion field for this mapping.
843
844 =item C<$sort>
845
846 Boolean indicating whether to create a sort field for this mapping.
847
848 =item C<$search>
849
850 Boolean indicating whether to create a search field for this mapping.
851
852 =item C<$target_name>
853
854 Elasticsearch document target field name.
855
856 =item C<$target_type>
857
858 Elasticsearch document target field type.
859
860 =item C<$range>
861
862 An optional range as a string in the format "<START>-<END>" or "<START>",
863 where "<START>" and "<END>" are integers specifying a range that will be used
864 for extracting a substring from MARC data as Elasticsearch field target value.
865
866 The first character position is "0", and the range is inclusive,
867 so "0-2" means the first three characters of MARC data.
868
869 If only "<START>" is provided only one character at position "<START>" will
870 be extracted.
871
872 =back
873
874 =cut
875
876 sub _field_mappings {
877     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
878     my %mapping_defaults = ();
879     my @mappings;
880
881     my $substr_args = undef;
882     if (defined $range) {
883         # TODO: use value_callback instead?
884         my ($start, $end) = map(int, split /-/, $range, 2);
885         $substr_args = [$start];
886         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
887     }
888     my $default_options = {};
889     if ($substr_args) {
890         $default_options->{substr} = $substr_args;
891     }
892
893     # TODO: Should probably have per type value callback/hook
894     # but hard code for now
895     if ($target_type eq 'boolean') {
896         $default_options->{value_callbacks} //= [];
897         push @{$default_options->{value_callbacks}}, sub {
898             my ($value) = @_;
899             # Trim whitespace at both ends
900             $value =~ s/^\s+|\s+$//g;
901             return $value ? 'true' : 'false';
902         };
903     }
904     elsif ($target_type eq 'year') {
905         $default_options->{value_callbacks} //= [];
906         # Only accept years containing digits and "u"
907         push @{$default_options->{value_callbacks}}, sub {
908             my ($value) = @_;
909             # Replace "u" with "0" for sorting
910             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
911         };
912     }
913
914     if ($search) {
915         my $mapping = [$target_name, $default_options];
916         push @mappings, $mapping;
917     }
918
919     my @suffixes = ();
920     push @suffixes, 'facet' if $facet;
921     push @suffixes, 'suggestion' if $suggestible;
922     push @suffixes, 'sort' if !defined $sort || $sort;
923
924     foreach my $suffix (@suffixes) {
925         my $mapping = ["${target_name}__$suffix"];
926         # TODO: Hack, fix later in less hideous manner
927         if ($suffix eq 'suggestion') {
928             push @{$mapping}, {%{$default_options}, property => 'input'};
929         }
930         else {
931             # Important! Make shallow clone, or we end up with the same hashref
932             # shared by all mappings
933             push @{$mapping}, {%{$default_options}};
934         }
935         push @mappings, $mapping;
936     }
937     return @mappings;
938 };
939
940 =head2 _get_marc_mapping_rules
941
942     my $mapping_rules = $self->_get_marc_mapping_rules()
943
944 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
945
946 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
947 each call to C<MARC::Record>->field) we create an optimized structure of mapping
948 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
949
950 We can then iterate through all MARC fields for each record and apply all relevant
951 rules once per fields instead of retreiving fields multiple times for each mapping rule
952 which is terribly slow.
953
954 =cut
955
956 # TODO: This structure can be used for processing multiple MARC::Records so is currently
957 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
958 # memory cache which it is currently not. The performance gain of caching
959 # would probably be marginal, but to do this could be a further improvement.
960
961 sub _get_marc_mapping_rules {
962     my ($self) = @_;
963     my $marcflavour = lc C4::Context->preference('marcflavour');
964     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
965     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
966     my $rules = {
967         'leader' => [],
968         'control_fields' => {},
969         'data_fields' => {},
970         'sum' => [],
971         'isbn' => [],
972         'defaults' => {}
973     };
974
975     $self->_foreach_mapping(sub {
976         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
977         return if $marc_type ne $marcflavour;
978
979         if ($type eq 'sum') {
980             push @{$rules->{sum}}, $name;
981             push @{$rules->{sum}}, $name."__sort" if $sort;
982         }
983         elsif ($type eq 'isbn') {
984             push @{$rules->{isbn}}, $name;
985         }
986         elsif ($type eq 'boolean') {
987             # boolean gets special handling, if value doesn't exist for a field,
988             # it is set to false
989             $rules->{defaults}->{$name} = 'false';
990         }
991
992         if ($marc_field =~ $field_spec_regexp) {
993             my $field_tag = $1;
994
995             my @subfields;
996             my @subfield_groups;
997             # Parse and separate subfields form subfield groups
998             if (defined $2) {
999                 my $subfield_group = '';
1000                 my $open_group = 0;
1001
1002                 foreach my $token (split //, $2) {
1003                     if ($token eq "(") {
1004                         if ($open_group) {
1005                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1006                                 "Unmatched opening parenthesis for $marc_field"
1007                             );
1008                         }
1009                         else {
1010                             $open_group = 1;
1011                         }
1012                     }
1013                     elsif ($token eq ")") {
1014                         if ($open_group) {
1015                             if ($subfield_group) {
1016                                 push @subfield_groups, $subfield_group;
1017                                 $subfield_group = '';
1018                             }
1019                             $open_group = 0;
1020                         }
1021                         else {
1022                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1023                                 "Unmatched closing parenthesis for $marc_field"
1024                             );
1025                         }
1026                     }
1027                     elsif ($open_group) {
1028                         $subfield_group .= $token;
1029                     }
1030                     else {
1031                         push @subfields, $token;
1032                     }
1033                 }
1034             }
1035             else {
1036                 push @subfields, '*';
1037             }
1038
1039             my $range = defined $3 ? $3 : undef;
1040             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1041             if ($field_tag < 10) {
1042                 $rules->{control_fields}->{$field_tag} //= [];
1043                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1044             }
1045             else {
1046                 $rules->{data_fields}->{$field_tag} //= {};
1047                 foreach my $subfield (@subfields) {
1048                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1049                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1050                 }
1051                 foreach my $subfield_group (@subfield_groups) {
1052                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1053                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1054                 }
1055             }
1056         }
1057         elsif ($marc_field =~ $leader_regexp) {
1058             my $range = defined $1 ? $1 : undef;
1059             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1060             push @{$rules->{leader}}, @mappings;
1061         }
1062         else {
1063             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1064                 "Invalid MARC field expression: $marc_field"
1065             );
1066         }
1067     });
1068
1069     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1070     if ($marcflavour eq 'marc21') {
1071         # Nonfiling characters processing for sort fields
1072         my %title_fields;
1073         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1074             # Format is: nonfiling characters indicator => field names list
1075             %title_fields = (
1076                 1 => [130, 630, 730, 740],
1077                 2 => [222, 240, 242, 243, 245, 440, 830]
1078             );
1079         }
1080         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1081             %title_fields = (
1082                 1 => [730],
1083                 2 => [130, 430, 530]
1084             );
1085         }
1086         foreach my $indicator (keys %title_fields) {
1087             foreach my $field_tag (@{$title_fields{$indicator}}) {
1088                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1089                 foreach my $mapping (@{$mappings}) {
1090                     if ($mapping->[0] =~ /__sort$/) {
1091                         # Mark this as to be processed for nonfiling characters indicator
1092                         # later on in _process_mappings
1093                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1094                     }
1095                 }
1096             }
1097         }
1098     }
1099
1100     return $rules;
1101 }
1102
1103 =head2 _foreach_mapping
1104
1105     $self->_foreach_mapping(
1106         sub {
1107             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1108                 $marc_field )
1109               = @_;
1110             return unless $marc_type eq 'marc21';
1111             print "Data comes from: " . $marc_field . "\n";
1112         }
1113     );
1114
1115 This allows you to apply a function to each entry in the elasticsearch mappings
1116 table, in order to build the mappings for whatever is needed.
1117
1118 In the provided function, the files are:
1119
1120 =over 4
1121
1122 =item C<$name>
1123
1124 The field name for elasticsearch (corresponds to the 'mapping' column in the
1125 database.
1126
1127 =item C<$type>
1128
1129 The type for this value, e.g. 'string'.
1130
1131 =item C<$facet>
1132
1133 True if this value should be facetised. This only really makes sense if the
1134 field is understood by the facet processing code anyway.
1135
1136 =item C<$sort>
1137
1138 True if this is a field that a) needs special sort handling, and b) if it
1139 should be sorted on. False if a) but not b). Undef if not a). This allows,
1140 for example, author to be sorted on but not everything marked with "author"
1141 to be included in that sort.
1142
1143 =item C<$marc_type>
1144
1145 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1146 'unimarc', 'normarc'.
1147
1148 =item C<$marc_field>
1149
1150 A string that describes the MARC field that contains the data to extract.
1151
1152 =back
1153
1154 =cut
1155
1156 sub _foreach_mapping {
1157     my ( $self, $sub ) = @_;
1158
1159     # TODO use a caching framework here
1160     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1161         {
1162             'search_marc_map.index_name' => $self->index,
1163         },
1164         {   join => { search_marc_to_fields => 'search_marc_map' },
1165             '+select' => [
1166                 'search_marc_to_fields.facet',
1167                 'search_marc_to_fields.suggestible',
1168                 'search_marc_to_fields.sort',
1169                 'search_marc_to_fields.search',
1170                 'search_marc_map.marc_type',
1171                 'search_marc_map.marc_field',
1172             ],
1173             '+as'     => [
1174                 'facet',
1175                 'suggestible',
1176                 'sort',
1177                 'search',
1178                 'marc_type',
1179                 'marc_field',
1180             ],
1181         }
1182     );
1183
1184     while ( my $search_field = $search_fields->next ) {
1185         $sub->(
1186             # Force lower case on indexed field names for case insensitive
1187             # field name searches
1188             lc($search_field->name),
1189             $search_field->type,
1190             $search_field->get_column('facet'),
1191             $search_field->get_column('suggestible'),
1192             $search_field->get_column('sort'),
1193             $search_field->get_column('search'),
1194             $search_field->get_column('marc_type'),
1195             $search_field->get_column('marc_field'),
1196         );
1197     }
1198 }
1199
1200 =head2 process_error
1201
1202     die process_error($@);
1203
1204 This parses an Elasticsearch error message and produces a human-readable
1205 result from it. This result is probably missing all the useful information
1206 that you might want in diagnosing an issue, so the warning is also logged.
1207
1208 Note that currently the resulting message is not internationalised. This
1209 will happen eventually by some method or other.
1210
1211 =cut
1212
1213 sub process_error {
1214     my ($self, $msg) = @_;
1215
1216     warn $msg; # simple logging
1217
1218     # This is super-primitive
1219     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1220
1221     return "Unable to perform your search. Please try again.\n";
1222 }
1223
1224 =head2 _read_configuration
1225
1226     my $conf = _read_configuration();
1227
1228 Reads the I<configuration file> and returns a hash structure with the
1229 configuration information. It raises an exception if mandatory entries
1230 are missing.
1231
1232 The hashref structure has the following form:
1233
1234     {
1235         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1236         'index_name' => 'koha_instance',
1237     }
1238
1239 This is configured by the following in the C<config> block in koha-conf.xml:
1240
1241     <elasticsearch>
1242         <server>127.0.0.1:9200</server>
1243         <server>anotherserver:9200</server>
1244         <index_name>koha_instance</index_name>
1245     </elasticsearch>
1246
1247 =cut
1248
1249 sub _read_configuration {
1250
1251     my $configuration;
1252
1253     my $conf = C4::Context->config('elasticsearch');
1254     unless ( defined $conf ) {
1255         Koha::Exceptions::Config::MissingEntry->throw(
1256             "Missing <elasticsearch> entry in koha-conf.xml"
1257         );
1258     }
1259
1260     if ( $conf && $conf->{server} ) {
1261         my $nodes = $conf->{server};
1262         if ( ref($nodes) eq 'ARRAY' ) {
1263             $configuration->{nodes} = $nodes;
1264         }
1265         else {
1266             $configuration->{nodes} = [$nodes];
1267         }
1268     }
1269     else {
1270         Koha::Exceptions::Config::MissingEntry->throw(
1271             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1272         );
1273     }
1274
1275     if ( defined $conf->{index_name} ) {
1276         $configuration->{index_name} = $conf->{index_name};
1277     }
1278     else {
1279         Koha::Exceptions::Config::MissingEntry->throw(
1280             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1281         );
1282     }
1283
1284     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1285
1286     $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1287
1288     return $configuration;
1289 }
1290
1291 =head2 get_facetable_fields
1292
1293 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1294
1295 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1296
1297 =cut
1298
1299 sub get_facetable_fields {
1300     my ($self) = @_;
1301
1302     # These should correspond to the ES field names, as opposed to the CCL
1303     # things that zebra uses.
1304     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1305     my @faceted_fields = Koha::SearchFields->search(
1306         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1307     );
1308     my @not_faceted_fields = Koha::SearchFields->search(
1309         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1310     );
1311     # This could certainly be improved
1312     return ( @faceted_fields, @not_faceted_fields );
1313 }
1314
1315 =head2 clear_search_fields_cache
1316
1317 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1318
1319 Clear cached values for ES search fields
1320
1321 =cut
1322
1323 sub clear_search_fields_cache {
1324
1325     my $cache = Koha::Caches->get_instance();
1326     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1327     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1328     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1329     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1330
1331 }
1332
1333 1;
1334
1335 __END__
1336
1337 =head1 AUTHOR
1338
1339 =over 4
1340
1341 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1342
1343 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1344
1345 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1346
1347 =back
1348
1349 =cut