Bug 18984: Remove NORMARC support
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
32
33 use Carp qw( carp croak );
34 use Clone qw( clone );
35 use Modern::Perl;
36 use Readonly qw( Readonly );
37 use Search::Elasticsearch;
38 use Try::Tiny qw( catch try );
39 use YAML::XS;
40
41 use List::Util qw( sum0 );
42 use MARC::File::XML;
43 use MIME::Base64 qw( encode_base64 );
44 use Encode qw( encode );
45 use Business::ISBN;
46 use Scalar::Util qw( looks_like_number );
47
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =item index_name
68
69 The Elasticsearch index name with Koha instance prefix.
70
71 =back
72
73
74 =head1 FUNCTIONS
75
76 =cut
77
78 sub new {
79     my $class = shift @_;
80     my ($params) = @_;
81
82     # Check for a valid index
83     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
84     my $config = _read_configuration();
85     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
86
87     my $self = $class->SUPER::new(@_);
88     return $self;
89 }
90
91 =head2 get_elasticsearch
92
93     my $elasticsearch_client = $self->get_elasticsearch();
94
95 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
96 instance level and will be reused if method is called multiple times.
97
98 =cut
99
100 sub get_elasticsearch {
101     my $self = shift @_;
102     unless (defined $self->{elasticsearch}) {
103         $self->{elasticsearch} = Search::Elasticsearch->new(
104             $self->get_elasticsearch_params()
105         );
106     }
107     return $self->{elasticsearch};
108 }
109
110 =head2 get_elasticsearch_params
111
112     my $params = $self->get_elasticsearch_params();
113
114 This provides a hashref that contains the parameters for connecting to the
115 ElasicSearch servers, in the form:
116
117     {
118         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119         'index_name' => 'koha_instance_index',
120     }
121
122 This is configured by the following in the C<config> block in koha-conf.xml:
123
124     <elasticsearch>
125         <server>127.0.0.1:9200</server>
126         <server>anotherserver:9200</server>
127         <index_name>koha_instance</index_name>
128     </elasticsearch>
129
130 =cut
131
132 sub get_elasticsearch_params {
133     my ($self) = @_;
134
135     my $conf;
136     try {
137         $conf = _read_configuration();
138     } catch {
139         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
140             croak($_->message);
141         }
142     };
143
144     return $conf
145 }
146
147 =head2 get_elasticsearch_settings
148
149     my $settings = $self->get_elasticsearch_settings();
150
151 This provides the settings provided to Elasticsearch when an index is created.
152 These can do things like define tokenization methods.
153
154 A hashref containing the settings is returned.
155
156 =cut
157
158 sub get_elasticsearch_settings {
159     my ($self) = @_;
160
161     # Use state to speed up repeated calls
162     state $settings = undef;
163     if (!defined $settings) {
164         my $config_file = C4::Context->config('elasticsearch_index_config');
165         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
166         $settings = YAML::XS::LoadFile( $config_file );
167     }
168
169     return $settings;
170 }
171
172 =head2 get_elasticsearch_mappings
173
174     my $mappings = $self->get_elasticsearch_mappings();
175
176 This provides the mappings that get passed to Elasticsearch when an index is
177 created.
178
179 =cut
180
181 sub get_elasticsearch_mappings {
182     my ($self) = @_;
183
184     # Use state to speed up repeated calls
185     state %all_mappings;
186     state %sort_fields;
187
188     if (!defined $all_mappings{$self->index}) {
189         $sort_fields{$self->index} = {};
190         # Clone the general mapping to break ties with the original hash
191         my $mappings = {
192             data => clone(_get_elasticsearch_field_config('general', ''))
193         };
194         my $marcflavour = lc C4::Context->preference('marcflavour');
195         $self->_foreach_mapping(
196             sub {
197                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
198                 return if $marc_type ne $marcflavour;
199                 # TODO if this gets any sort of complexity to it, it should
200                 # be broken out into its own function.
201
202                 # TODO be aware of date formats, but this requires pre-parsing
203                 # as ES will simply reject anything with an invalid date.
204                 my $es_type = 'text';
205                 if ($type eq 'boolean') {
206                     $es_type = 'boolean';
207                 } elsif ($type eq 'number' || $type eq 'sum') {
208                     $es_type = 'integer';
209                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
210                     $es_type = 'stdno';
211                 } elsif ($type eq 'year') {
212                     $es_type = 'year';
213                 }
214
215                 if ($search) {
216                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
217                 }
218
219                 if ($facet) {
220                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
221                 }
222                 if ($suggestible) {
223                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
224                 }
225                 # Sort is a bit special as it can be true, false, undef.
226                 # We care about "true" or "undef",
227                 # "undef" means to do the default thing, which is make it sortable.
228                 if (!defined $sort || $sort) {
229                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
230                     $sort_fields{$self->index}{$name} = 1;
231                 }
232             }
233         );
234         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
235         $all_mappings{$self->index} = $mappings;
236     }
237     $self->sort_fields(\%{$sort_fields{$self->index}});
238     return $all_mappings{$self->index};
239 }
240
241 =head2 raw_elasticsearch_mappings
242
243 Return elasticsearch mapping as it is in database.
244 marc_type: marc21|unimarc
245
246 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
247
248 =cut
249
250 sub raw_elasticsearch_mappings {
251     my ( $marc_type ) = @_;
252
253     my $schema = Koha::Database->new()->schema();
254
255     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
256
257     my $mappings = {};
258     while ( my $search_field = $search_fields->next ) {
259
260         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
261             { search_field_id => $search_field->id },
262             {
263                 join     => 'search_marc_map',
264                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
265             }
266         );
267
268         while ( my $marc_to_field = $marc_to_fields->next ) {
269
270             my $marc_map = $marc_to_field->search_marc_map;
271
272             next if $marc_type && $marc_map->marc_type ne $marc_type;
273
274             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
281
282             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
283                 {
284                     facet   => $marc_to_field->facet || '',
285                     marc_type => $marc_map->marc_type,
286                     marc_field => $marc_map->marc_field,
287                     sort        => $marc_to_field->sort,
288                     suggestible => $marc_to_field->suggestible || ''
289                 });
290
291         }
292     }
293
294     return $mappings;
295 }
296
297 =head2 _get_elasticsearch_field_config
298
299 Get the Elasticsearch field config for the given purpose and data type.
300
301 $mapping = _get_elasticsearch_field_config('search', 'text');
302
303 =cut
304
305 sub _get_elasticsearch_field_config {
306
307     my ( $purpose, $type ) = @_;
308
309     # Use state to speed up repeated calls
310     state $settings = undef;
311     if (!defined $settings) {
312         my $config_file = C4::Context->config('elasticsearch_field_config');
313         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314         local $YAML::XS::Boolean = 'JSON::PP';
315         $settings = YAML::XS::LoadFile( $config_file );
316     }
317
318     if (!defined $settings->{$purpose}) {
319         die "Field purpose $purpose not defined in field config";
320     }
321     if ($type eq '') {
322         return $settings->{$purpose};
323     }
324     if (defined $settings->{$purpose}{$type}) {
325         return $settings->{$purpose}{$type};
326     }
327     if (defined $settings->{$purpose}{'default'}) {
328         return $settings->{$purpose}{'default'};
329     }
330     return;
331 }
332
333 =head2 _load_elasticsearch_mappings
334
335 Load Elasticsearch mappings in the format of mappings.yaml.
336
337 $indexes = _load_elasticsearch_mappings();
338
339 =cut
340
341 sub _load_elasticsearch_mappings {
342     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
343     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
344     return YAML::XS::LoadFile( $mappings_yaml );
345 }
346
347 sub reset_elasticsearch_mappings {
348     my ( $self ) = @_;
349     my $indexes = $self->_load_elasticsearch_mappings();
350
351     Koha::SearchMarcMaps->delete;
352     Koha::SearchFields->delete;
353
354     while ( my ( $index_name, $fields ) = each %$indexes ) {
355         while ( my ( $field_name, $data ) = each %$fields ) {
356
357             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
358
359             # Set default values
360             $sf_params{staff_client} //= 1;
361             $sf_params{opac} //= 1;
362
363             $sf_params{name} = $field_name;
364
365             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
366
367             my $mappings = $data->{mappings};
368             for my $mapping ( @$mappings ) {
369                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
370                     index_name => $index_name,
371                     marc_type => $mapping->{marc_type},
372                     marc_field => $mapping->{marc_field}
373                 });
374                 $search_field->add_to_search_marc_maps($marc_field, {
375                     facet => $mapping->{facet} || 0,
376                     suggestible => $mapping->{suggestible} || 0,
377                     sort => $mapping->{sort} // 1,
378                     search => $mapping->{search} // 1
379                 });
380             }
381         }
382     }
383
384     $self->clear_search_fields_cache();
385
386     # FIXME return the mappings?
387 }
388
389 # This overrides the accessor provided by Class::Accessor so that if
390 # sort_fields isn't set, then it'll generate it.
391 sub sort_fields {
392     my $self = shift;
393     if (@_) {
394         $self->_sort_fields_accessor(@_);
395         return;
396     }
397     my $val = $self->_sort_fields_accessor();
398     return $val if $val;
399
400     # This will populate the accessor as a side effect
401     $self->get_elasticsearch_mappings();
402     return $self->_sort_fields_accessor();
403 }
404
405 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406
407     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408
409 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
410 Since we group all mappings by MARC field targets C<$mappings> will contain
411 all targets for C<$data> and thus we need to fetch the MARC field only once.
412 C<$mappings> will be applied to C<$record_document> and new field values added.
413 The method has no return value.
414
415 =over 4
416
417 =item C<$mappings>
418
419 Arrayref of mappings containing arrayrefs in the format
420 [C<$target>, C<$options>] where C<$target> is the name of the target field and
421 C<$options> is a hashref containing processing directives for this particular
422 mapping.
423
424 =item C<$data>
425
426 The source data from a MARC record field.
427
428 =item C<$record_document>
429
430 Hashref representing the Elasticsearch document on which mappings should be
431 applied.
432
433 =item C<$meta>
434
435 A hashref containing metadata useful for enforcing per mapping rules. For
436 example for providing extra context for mapping options, or treating mapping
437 targets differently depending on type (sort, search, facet etc). Combining
438 this metadata with the mapping options and metadata allows us to mutate the
439 data per mapping, or even replace it with other data retrieved from the
440 metadata context.
441
442 Current properties are:
443
444 C<altscript>: A boolean value indicating whether an alternate script presentation is being
445 processed.
446
447 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
448 'subfield' or 'subfields_group'.
449
450 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
451
452 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
453 is 'subfields_group'.
454
455 C<field>: The original C<MARC::Record> object.
456
457 =back
458
459 =cut
460
461 sub _process_mappings {
462     my ($_self, $mappings, $data, $record_document, $meta) = @_;
463     foreach my $mapping (@{$mappings}) {
464         my ($target, $options) = @{$mapping};
465
466         # Don't process sort fields for alternate scripts
467         my $sort = $target =~ /__sort$/;
468         if ($sort && $meta->{altscript}) {
469             next;
470         }
471
472         # Copy (scalar) data since can have multiple targets
473         # with differing options for (possibly) mutating data
474         # so need a different copy for each
475         my $data_copy = $data;
476         if (defined $options->{substr}) {
477             my ($start, $length) = @{$options->{substr}};
478             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
479         }
480
481         # Add data to values array for callbacks processing
482         my $values = [$data_copy];
483
484         # Value callbacks takes subfield data (or values from previous
485         # callbacks) as argument, and returns a possibly different list of values.
486         # Note that the returned list may also be empty.
487         if (defined $options->{value_callbacks}) {
488             foreach my $callback (@{$options->{value_callbacks}}) {
489                 # Pass each value to current callback which returns a list
490                 # (scalar is fine too) resulting either in a list or
491                 # a list of lists that will be flattened by perl.
492                 # The next callback will receive the possibly expanded list of values.
493                 $values = [ map { $callback->($_) } @{$values} ];
494             }
495         }
496
497         # Skip mapping if all values has been removed
498         next unless @{$values};
499
500         if (defined $options->{property}) {
501             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
502         }
503         if (defined $options->{nonfiling_characters_indicator}) {
504             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
505             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
506             # Nonfiling chars does not make sense for multiple values
507             # Only apply on first element
508             $values->[0] = substr $values->[0], $nonfiling_chars;
509         }
510
511         $values = [ grep(!/^$/, @{$values}) ];
512
513         $record_document->{$target} //= [];
514         push @{$record_document->{$target}}, @{$values};
515     }
516 }
517
518 =head2 marc_records_to_documents($marc_records)
519
520     my $record_documents = $self->marc_records_to_documents($marc_records);
521
522 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
523
524 Returns array of hash references, representing Elasticsearch documents,
525 acceptable as body payload in C<Search::Elasticsearch> requests.
526
527 =over 4
528
529 =item C<$marc_documents>
530
531 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
532
533 =back
534
535 =cut
536
537 sub marc_records_to_documents {
538     my ($self, $records) = @_;
539     my $rules = $self->_get_marc_mapping_rules();
540     my $control_fields_rules = $rules->{control_fields};
541     my $data_fields_rules = $rules->{data_fields};
542     my $marcflavour = lc C4::Context->preference('marcflavour');
543     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
544
545     my @record_documents;
546
547     my %auth_match_headings;
548     if( $self->index eq 'authorities' ){
549         my @auth_types = Koha::Authority::Types->search();
550         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
551     }
552
553     foreach my $record (@{$records}) {
554         my $record_document = {};
555
556         if ( $self->index eq 'authorities' ){
557             my $authtypecode = GuessAuthTypeCode( $record );
558             if( $authtypecode ){
559                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
560                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
561                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
562                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
563                 }
564             } else {
565                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
566             }
567         }
568
569         my $mappings = $rules->{leader};
570         if ($mappings) {
571             $self->_process_mappings($mappings, $record->leader(), $record_document, {
572                     altscript => 0,
573                     data_source => 'leader'
574                 }
575             );
576         }
577         foreach my $field ($record->fields()) {
578             if ($field->is_control_field()) {
579                 my $mappings = $control_fields_rules->{$field->tag()};
580                 if ($mappings) {
581                     $self->_process_mappings($mappings, $field->data(), $record_document, {
582                             altscript => 0,
583                             data_source => 'control_field',
584                             field => $field
585                         }
586                     );
587                 }
588             }
589             else {
590                 my $tag = $field->tag();
591                 # Handle alternate scripts in MARC 21
592                 my $altscript = 0;
593                 if ($marcflavour eq 'marc21' && $tag eq '880') {
594                     my $sub6 = $field->subfield('6');
595                     if ($sub6 =~ /^(...)-\d+/) {
596                         $tag = $1;
597                         $altscript = 1;
598                     }
599                 }
600
601                 my $data_field_rules = $data_fields_rules->{$tag};
602                 if ($data_field_rules) {
603                     my $subfields_mappings = $data_field_rules->{subfields};
604                     my $wildcard_mappings = $subfields_mappings->{'*'};
605                     foreach my $subfield ($field->subfields()) {
606                         my ($code, $data) = @{$subfield};
607                         my $mappings = $subfields_mappings->{$code} // [];
608                         if ($wildcard_mappings) {
609                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
610                         }
611                         if (@{$mappings}) {
612                             $self->_process_mappings($mappings, $data, $record_document, {
613                                     altscript => $altscript,
614                                     data_source => 'subfield',
615                                     code => $code,
616                                     field => $field
617                                 }
618                             );
619                         }
620                     }
621
622                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
623                     if ($subfields_join_mappings) {
624                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
625                             my $data_field = $field->clone; #copy field to preserve for alt scripts
626                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
627                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
628                             if ($data) {
629                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
630                                         altscript => $altscript,
631                                         data_source => 'subfields_group',
632                                         codes => $subfields_group,
633                                         field => $field
634                                     }
635                                 );
636                             }
637                         }
638                     }
639                 }
640             }
641         }
642         foreach my $field (keys %{$rules->{defaults}}) {
643             unless (defined $record_document->{$field}) {
644                 $record_document->{$field} = $rules->{defaults}->{$field};
645             }
646         }
647         foreach my $field (@{$rules->{sum}}) {
648             if (defined $record_document->{$field}) {
649                 # TODO: validate numeric? filter?
650                 # TODO: Or should only accept fields without nested values?
651                 # TODO: Quick and dirty, improve if needed
652                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
653             }
654         }
655         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
656         foreach my $field (@{$rules->{isbn}}) {
657             if (defined $record_document->{$field}) {
658                 my @isbns = ();
659                 foreach my $input_isbn (@{$record_document->{$field}}) {
660                     my $isbn = Business::ISBN->new($input_isbn);
661                     if (defined $isbn && $isbn->is_valid) {
662                         my $isbn13 = $isbn->as_isbn13->as_string;
663                         push @isbns, $isbn13;
664                         $isbn13 =~ s/\-//g;
665                         push @isbns, $isbn13;
666
667                         my $isbn10 = $isbn->as_isbn10;
668                         if ($isbn10) {
669                             $isbn10 = $isbn10->as_string;
670                             push @isbns, $isbn10;
671                             $isbn10 =~ s/\-//g;
672                             push @isbns, $isbn10;
673                         }
674                     } else {
675                         push @isbns, $input_isbn;
676                     }
677                 }
678                 $record_document->{$field} = \@isbns;
679             }
680         }
681
682         # Remove duplicate values and collapse sort fields
683         foreach my $field (keys %{$record_document}) {
684             if (ref($record_document->{$field}) eq 'ARRAY') {
685                 @{$record_document->{$field}} = do {
686                     my %seen;
687                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
688                 };
689                 if ($field =~ /__sort$/) {
690                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
691                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
692                 }
693             }
694         }
695
696         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
697         $record->encoding('UTF-8');
698         if ($use_array) {
699             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
700             $record_document->{'marc_format'} = 'ARRAY';
701         } else {
702             my @warnings;
703             {
704                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
705                 local $SIG{__WARN__} = sub {
706                     push @warnings, $_[0];
707                 };
708                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
709             }
710             if (@warnings) {
711                 # Suppress warnings if record length exceeded
712                 unless (substr($record->leader(), 0, 5) eq '99999') {
713                     foreach my $warning (@warnings) {
714                         carp $warning;
715                     }
716                 }
717                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
718                 $record_document->{'marc_format'} = 'MARCXML';
719             }
720             else {
721                 $record_document->{'marc_format'} = 'base64ISO2709';
722             }
723         }
724         push @record_documents, $record_document;
725     }
726     return \@record_documents;
727 }
728
729 =head2 _marc_to_array($record)
730
731     my @fields = _marc_to_array($record)
732
733 Convert a MARC::Record to an array modeled after MARC-in-JSON
734 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
735
736 =over 4
737
738 =item C<$record>
739
740 A MARC::Record object
741
742 =back
743
744 =cut
745
746 sub _marc_to_array {
747     my ($self, $record) = @_;
748
749     my $data = {
750         leader => $record->leader(),
751         fields => []
752     };
753     for my $field ($record->fields()) {
754         my $tag = $field->tag();
755         if ($field->is_control_field()) {
756             push @{$data->{fields}}, {$tag => $field->data()};
757         } else {
758             my $subfields = ();
759             foreach my $subfield ($field->subfields()) {
760                 my ($code, $contents) = @{$subfield};
761                 push @{$subfields}, {$code => $contents};
762             }
763             push @{$data->{fields}}, {
764                 $tag => {
765                     ind1 => $field->indicator(1),
766                     ind2 => $field->indicator(2),
767                     subfields => $subfields
768                 }
769             };
770         }
771     }
772     return $data;
773 }
774
775 =head2 _array_to_marc($data)
776
777     my $record = _array_to_marc($data)
778
779 Convert an array modeled after MARC-in-JSON to a MARC::Record
780
781 =over 4
782
783 =item C<$data>
784
785 An array modeled after MARC-in-JSON
786 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
787
788 =back
789
790 =cut
791
792 sub _array_to_marc {
793     my ($self, $data) = @_;
794
795     my $record = MARC::Record->new();
796
797     $record->leader($data->{leader});
798     for my $field (@{$data->{fields}}) {
799         my $tag = (keys %{$field})[0];
800         $field = $field->{$tag};
801         my $marc_field;
802         if (ref($field) eq 'HASH') {
803             my @subfields;
804             foreach my $subfield (@{$field->{subfields}}) {
805                 my $code = (keys %{$subfield})[0];
806                 push @subfields, $code;
807                 push @subfields, $subfield->{$code};
808             }
809             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
810         } else {
811             $marc_field = MARC::Field->new($tag, $field)
812         }
813         $record->append_fields($marc_field);
814     }
815 ;
816     return $record;
817 }
818
819 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
820
821     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
822
823 Get mappings, an internal data structure later used by
824 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
825 data for a MARC mapping.
826
827 The returned C<$mappings> is not to to be confused with mappings provided by
828 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
829 provided by C<_foreach_mapping> and expands it to this internal data structure.
830 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
831 is then applied to each MARC target (leader, control field data, subfield or
832 joined subfields) and integrated into the mapping rules data structure used in
833 C<marc_records_to_documents> to transform MARC records into Elasticsearch
834 documents.
835
836 =over 4
837
838 =item C<$facet>
839
840 Boolean indicating whether to create a facet field for this mapping.
841
842 =item C<$suggestible>
843
844 Boolean indicating whether to create a suggestion field for this mapping.
845
846 =item C<$sort>
847
848 Boolean indicating whether to create a sort field for this mapping.
849
850 =item C<$search>
851
852 Boolean indicating whether to create a search field for this mapping.
853
854 =item C<$target_name>
855
856 Elasticsearch document target field name.
857
858 =item C<$target_type>
859
860 Elasticsearch document target field type.
861
862 =item C<$range>
863
864 An optional range as a string in the format "<START>-<END>" or "<START>",
865 where "<START>" and "<END>" are integers specifying a range that will be used
866 for extracting a substring from MARC data as Elasticsearch field target value.
867
868 The first character position is "0", and the range is inclusive,
869 so "0-2" means the first three characters of MARC data.
870
871 If only "<START>" is provided only one character at position "<START>" will
872 be extracted.
873
874 =back
875
876 =cut
877
878 sub _field_mappings {
879     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
880     my %mapping_defaults = ();
881     my @mappings;
882
883     my $substr_args = undef;
884     if (defined $range) {
885         # TODO: use value_callback instead?
886         my ($start, $end) = map(int, split /-/, $range, 2);
887         $substr_args = [$start];
888         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
889     }
890     my $default_options = {};
891     if ($substr_args) {
892         $default_options->{substr} = $substr_args;
893     }
894
895     # TODO: Should probably have per type value callback/hook
896     # but hard code for now
897     if ($target_type eq 'boolean') {
898         $default_options->{value_callbacks} //= [];
899         push @{$default_options->{value_callbacks}}, sub {
900             my ($value) = @_;
901             # Trim whitespace at both ends
902             $value =~ s/^\s+|\s+$//g;
903             return $value ? 'true' : 'false';
904         };
905     }
906     elsif ($target_type eq 'year') {
907         $default_options->{value_callbacks} //= [];
908         # Only accept years containing digits and "u"
909         push @{$default_options->{value_callbacks}}, sub {
910             my ($value) = @_;
911             # Replace "u" with "0" for sorting
912             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
913         };
914     }
915
916     if ($search) {
917         my $mapping = [$target_name, $default_options];
918         push @mappings, $mapping;
919     }
920
921     my @suffixes = ();
922     push @suffixes, 'facet' if $facet;
923     push @suffixes, 'suggestion' if $suggestible;
924     push @suffixes, 'sort' if !defined $sort || $sort;
925
926     foreach my $suffix (@suffixes) {
927         my $mapping = ["${target_name}__$suffix"];
928         # TODO: Hack, fix later in less hideous manner
929         if ($suffix eq 'suggestion') {
930             push @{$mapping}, {%{$default_options}, property => 'input'};
931         }
932         else {
933             # Important! Make shallow clone, or we end up with the same hashref
934             # shared by all mappings
935             push @{$mapping}, {%{$default_options}};
936         }
937         push @mappings, $mapping;
938     }
939     return @mappings;
940 };
941
942 =head2 _get_marc_mapping_rules
943
944     my $mapping_rules = $self->_get_marc_mapping_rules()
945
946 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
947
948 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
949 each call to C<MARC::Record>->field) we create an optimized structure of mapping
950 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
951
952 We can then iterate through all MARC fields for each record and apply all relevant
953 rules once per fields instead of retreiving fields multiple times for each mapping rule
954 which is terribly slow.
955
956 =cut
957
958 # TODO: This structure can be used for processing multiple MARC::Records so is currently
959 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
960 # memory cache which it is currently not. The performance gain of caching
961 # would probably be marginal, but to do this could be a further improvement.
962
963 sub _get_marc_mapping_rules {
964     my ($self) = @_;
965     my $marcflavour = lc C4::Context->preference('marcflavour');
966     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
967     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
968     my $rules = {
969         'leader' => [],
970         'control_fields' => {},
971         'data_fields' => {},
972         'sum' => [],
973         'isbn' => [],
974         'defaults' => {}
975     };
976
977     $self->_foreach_mapping(sub {
978         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
979         return if $marc_type ne $marcflavour;
980
981         if ($type eq 'sum') {
982             push @{$rules->{sum}}, $name;
983             push @{$rules->{sum}}, $name."__sort" if $sort;
984         }
985         elsif ($type eq 'isbn') {
986             push @{$rules->{isbn}}, $name;
987         }
988         elsif ($type eq 'boolean') {
989             # boolean gets special handling, if value doesn't exist for a field,
990             # it is set to false
991             $rules->{defaults}->{$name} = 'false';
992         }
993
994         if ($marc_field =~ $field_spec_regexp) {
995             my $field_tag = $1;
996
997             my @subfields;
998             my @subfield_groups;
999             # Parse and separate subfields form subfield groups
1000             if (defined $2) {
1001                 my $subfield_group = '';
1002                 my $open_group = 0;
1003
1004                 foreach my $token (split //, $2) {
1005                     if ($token eq "(") {
1006                         if ($open_group) {
1007                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1008                                 "Unmatched opening parenthesis for $marc_field"
1009                             );
1010                         }
1011                         else {
1012                             $open_group = 1;
1013                         }
1014                     }
1015                     elsif ($token eq ")") {
1016                         if ($open_group) {
1017                             if ($subfield_group) {
1018                                 push @subfield_groups, $subfield_group;
1019                                 $subfield_group = '';
1020                             }
1021                             $open_group = 0;
1022                         }
1023                         else {
1024                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1025                                 "Unmatched closing parenthesis for $marc_field"
1026                             );
1027                         }
1028                     }
1029                     elsif ($open_group) {
1030                         $subfield_group .= $token;
1031                     }
1032                     else {
1033                         push @subfields, $token;
1034                     }
1035                 }
1036             }
1037             else {
1038                 push @subfields, '*';
1039             }
1040
1041             my $range = defined $3 ? $3 : undef;
1042             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1043             if ($field_tag < 10) {
1044                 $rules->{control_fields}->{$field_tag} //= [];
1045                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1046             }
1047             else {
1048                 $rules->{data_fields}->{$field_tag} //= {};
1049                 foreach my $subfield (@subfields) {
1050                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1051                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1052                 }
1053                 foreach my $subfield_group (@subfield_groups) {
1054                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1055                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1056                 }
1057             }
1058         }
1059         elsif ($marc_field =~ $leader_regexp) {
1060             my $range = defined $1 ? $1 : undef;
1061             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1062             push @{$rules->{leader}}, @mappings;
1063         }
1064         else {
1065             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1066                 "Invalid MARC field expression: $marc_field"
1067             );
1068         }
1069     });
1070
1071     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1072     if ($marcflavour eq 'marc21') {
1073         # Nonfiling characters processing for sort fields
1074         my %title_fields;
1075         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1076             # Format is: nonfiling characters indicator => field names list
1077             %title_fields = (
1078                 1 => [130, 630, 730, 740],
1079                 2 => [222, 240, 242, 243, 245, 440, 830]
1080             );
1081         }
1082         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1083             %title_fields = (
1084                 1 => [730],
1085                 2 => [130, 430, 530]
1086             );
1087         }
1088         foreach my $indicator (keys %title_fields) {
1089             foreach my $field_tag (@{$title_fields{$indicator}}) {
1090                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1091                 foreach my $mapping (@{$mappings}) {
1092                     if ($mapping->[0] =~ /__sort$/) {
1093                         # Mark this as to be processed for nonfiling characters indicator
1094                         # later on in _process_mappings
1095                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1096                     }
1097                 }
1098             }
1099         }
1100     }
1101
1102     return $rules;
1103 }
1104
1105 =head2 _foreach_mapping
1106
1107     $self->_foreach_mapping(
1108         sub {
1109             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1110                 $marc_field )
1111               = @_;
1112             return unless $marc_type eq 'marc21';
1113             print "Data comes from: " . $marc_field . "\n";
1114         }
1115     );
1116
1117 This allows you to apply a function to each entry in the elasticsearch mappings
1118 table, in order to build the mappings for whatever is needed.
1119
1120 In the provided function, the files are:
1121
1122 =over 4
1123
1124 =item C<$name>
1125
1126 The field name for elasticsearch (corresponds to the 'mapping' column in the
1127 database.
1128
1129 =item C<$type>
1130
1131 The type for this value, e.g. 'string'.
1132
1133 =item C<$facet>
1134
1135 True if this value should be facetised. This only really makes sense if the
1136 field is understood by the facet processing code anyway.
1137
1138 =item C<$sort>
1139
1140 True if this is a field that a) needs special sort handling, and b) if it
1141 should be sorted on. False if a) but not b). Undef if not a). This allows,
1142 for example, author to be sorted on but not everything marked with "author"
1143 to be included in that sort.
1144
1145 =item C<$marc_type>
1146
1147 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1148 'unimarc'.
1149
1150 =item C<$marc_field>
1151
1152 A string that describes the MARC field that contains the data to extract.
1153
1154 =back
1155
1156 =cut
1157
1158 sub _foreach_mapping {
1159     my ( $self, $sub ) = @_;
1160
1161     # TODO use a caching framework here
1162     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1163         {
1164             'search_marc_map.index_name' => $self->index,
1165         },
1166         {   join => { search_marc_to_fields => 'search_marc_map' },
1167             '+select' => [
1168                 'search_marc_to_fields.facet',
1169                 'search_marc_to_fields.suggestible',
1170                 'search_marc_to_fields.sort',
1171                 'search_marc_to_fields.search',
1172                 'search_marc_map.marc_type',
1173                 'search_marc_map.marc_field',
1174             ],
1175             '+as'     => [
1176                 'facet',
1177                 'suggestible',
1178                 'sort',
1179                 'search',
1180                 'marc_type',
1181                 'marc_field',
1182             ],
1183         }
1184     );
1185
1186     while ( my $search_field = $search_fields->next ) {
1187         $sub->(
1188             # Force lower case on indexed field names for case insensitive
1189             # field name searches
1190             lc($search_field->name),
1191             $search_field->type,
1192             $search_field->get_column('facet'),
1193             $search_field->get_column('suggestible'),
1194             $search_field->get_column('sort'),
1195             $search_field->get_column('search'),
1196             $search_field->get_column('marc_type'),
1197             $search_field->get_column('marc_field'),
1198         );
1199     }
1200 }
1201
1202 =head2 process_error
1203
1204     die process_error($@);
1205
1206 This parses an Elasticsearch error message and produces a human-readable
1207 result from it. This result is probably missing all the useful information
1208 that you might want in diagnosing an issue, so the warning is also logged.
1209
1210 Note that currently the resulting message is not internationalised. This
1211 will happen eventually by some method or other.
1212
1213 =cut
1214
1215 sub process_error {
1216     my ($self, $msg) = @_;
1217
1218     warn $msg; # simple logging
1219
1220     # This is super-primitive
1221     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1222
1223     return "Unable to perform your search. Please try again.\n";
1224 }
1225
1226 =head2 _read_configuration
1227
1228     my $conf = _read_configuration();
1229
1230 Reads the I<configuration file> and returns a hash structure with the
1231 configuration information. It raises an exception if mandatory entries
1232 are missing.
1233
1234 The hashref structure has the following form:
1235
1236     {
1237         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1238         'index_name' => 'koha_instance',
1239     }
1240
1241 This is configured by the following in the C<config> block in koha-conf.xml:
1242
1243     <elasticsearch>
1244         <server>127.0.0.1:9200</server>
1245         <server>anotherserver:9200</server>
1246         <index_name>koha_instance</index_name>
1247     </elasticsearch>
1248
1249 =cut
1250
1251 sub _read_configuration {
1252
1253     my $configuration;
1254
1255     my $conf = C4::Context->config('elasticsearch');
1256     unless ( defined $conf ) {
1257         Koha::Exceptions::Config::MissingEntry->throw(
1258             "Missing <elasticsearch> entry in koha-conf.xml"
1259         );
1260     }
1261
1262     if ( $conf && $conf->{server} ) {
1263         my $nodes = $conf->{server};
1264         if ( ref($nodes) eq 'ARRAY' ) {
1265             $configuration->{nodes} = $nodes;
1266         }
1267         else {
1268             $configuration->{nodes} = [$nodes];
1269         }
1270     }
1271     else {
1272         Koha::Exceptions::Config::MissingEntry->throw(
1273             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1274         );
1275     }
1276
1277     if ( defined $conf->{index_name} ) {
1278         $configuration->{index_name} = $conf->{index_name};
1279     }
1280     else {
1281         Koha::Exceptions::Config::MissingEntry->throw(
1282             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1283         );
1284     }
1285
1286     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1287
1288     $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1289
1290     return $configuration;
1291 }
1292
1293 =head2 get_facetable_fields
1294
1295 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1296
1297 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1298
1299 =cut
1300
1301 sub get_facetable_fields {
1302     my ($self) = @_;
1303
1304     # These should correspond to the ES field names, as opposed to the CCL
1305     # things that zebra uses.
1306     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1307     my @faceted_fields = Koha::SearchFields->search(
1308         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1309     );
1310     my @not_faceted_fields = Koha::SearchFields->search(
1311         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1312     );
1313     # This could certainly be improved
1314     return ( @faceted_fields, @not_faceted_fields );
1315 }
1316
1317 =head2 clear_search_fields_cache
1318
1319 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1320
1321 Clear cached values for ES search fields
1322
1323 =cut
1324
1325 sub clear_search_fields_cache {
1326
1327     my $cache = Koha::Caches->get_instance();
1328     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1329     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1330     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1331     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1332
1333 }
1334
1335 1;
1336
1337 __END__
1338
1339 =head1 AUTHOR
1340
1341 =over 4
1342
1343 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1344
1345 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1346
1347 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1348
1349 =back
1350
1351 =cut