Bug 22771: (QA follow-up) Fix POD
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
30
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
39
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
46
47 __PACKAGE__->mk_ro_accessors(qw( index ));
48 __PACKAGE__->mk_accessors(qw( sort_fields ));
49
50 # Constants to refer to the standard index names
51 Readonly our $BIBLIOS_INDEX     => 'biblios';
52 Readonly our $AUTHORITIES_INDEX => 'authorities';
53
54 =head1 NAME
55
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
57
58 =head1 ACCESSORS
59
60 =over 4
61
62 =item index
63
64 The name of the index to use, generally 'biblios' or 'authorities'.
65
66 =back
67
68 =head1 FUNCTIONS
69
70 =cut
71
72 sub new {
73     my $class = shift @_;
74     my $self = $class->SUPER::new(@_);
75     # Check for a valid index
76     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
77     return $self;
78 }
79
80 =head2 get_elasticsearch
81
82     my $elasticsearch_client = $self->get_elasticsearch();
83
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
85 instance level and will be reused if method is called multiple times.
86
87 =cut
88
89 sub get_elasticsearch {
90     my $self = shift @_;
91     unless (defined $self->{elasticsearch}) {
92         my $conf = $self->get_elasticsearch_params();
93         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
94     }
95     return $self->{elasticsearch};
96 }
97
98 =head2 get_elasticsearch_params
99
100     my $params = $self->get_elasticsearch_params();
101
102 This provides a hashref that contains the parameters for connecting to the
103 ElasicSearch servers, in the form:
104
105     {
106         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107         'index_name' => 'koha_instance_index',
108     }
109
110 This is configured by the following in the C<config> block in koha-conf.xml:
111
112     <elasticsearch>
113         <server>127.0.0.1:9200</server>
114         <server>anotherserver:9200</server>
115         <index_name>koha_instance</index_name>
116     </elasticsearch>
117
118 =cut
119
120 sub get_elasticsearch_params {
121     my ($self) = @_;
122
123     # Copy the hash so that we're not modifying the original
124     my $conf = C4::Context->config('elasticsearch');
125     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
126     my $es = { %{ $conf } };
127
128     # Helpfully, the multiple server lines end up in an array for us anyway
129     # if there are multiple ones, but not if there's only one.
130     my $server = $es->{server};
131     delete $es->{server};
132     if ( ref($server) eq 'ARRAY' ) {
133
134         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135         $es->{nodes} = $server;
136     }
137     elsif ($server) {
138         $es->{nodes} = [$server];
139     }
140     else {
141         die "No elasticsearch servers were specified in koha-conf.xml.\n";
142     }
143     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
144       if ( !$es->{index_name} );
145     # Append the name of this particular index to our namespace
146     $es->{index_name} .= '_' . $self->index;
147
148     $es->{key_prefix} = 'es_';
149     $es->{cxn_pool} //= 'Static';
150     $es->{request_timeout} //= 60;
151
152     return $es;
153 }
154
155 =head2 get_elasticsearch_settings
156
157     my $settings = $self->get_elasticsearch_settings();
158
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
161
162 A hashref containing the settings is returned.
163
164 =cut
165
166 sub get_elasticsearch_settings {
167     my ($self) = @_;
168
169     # Use state to speed up repeated calls
170     state $settings = undef;
171     if (!defined $settings) {
172         my $config_file = C4::Context->config('elasticsearch_index_config');
173         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174         $settings = LoadFile( $config_file );
175     }
176
177     return $settings;
178 }
179
180 =head2 get_elasticsearch_mappings
181
182     my $mappings = $self->get_elasticsearch_mappings();
183
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
186
187 =cut
188
189 sub get_elasticsearch_mappings {
190     my ($self) = @_;
191
192     # Use state to speed up repeated calls
193     state %all_mappings;
194     state %sort_fields;
195
196     if (!defined $all_mappings{$self->index}) {
197         $sort_fields{$self->index} = {};
198         # Clone the general mapping to break ties with the original hash
199         my $mappings = {
200             data => clone(_get_elasticsearch_field_config('general', ''))
201         };
202         my $marcflavour = lc C4::Context->preference('marcflavour');
203         $self->_foreach_mapping(
204             sub {
205                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
206                 return if $marc_type ne $marcflavour;
207                 # TODO if this gets any sort of complexity to it, it should
208                 # be broken out into its own function.
209
210                 # TODO be aware of date formats, but this requires pre-parsing
211                 # as ES will simply reject anything with an invalid date.
212                 my $es_type = 'text';
213                 if ($type eq 'boolean') {
214                     $es_type = 'boolean';
215                 } elsif ($type eq 'number' || $type eq 'sum') {
216                     $es_type = 'integer';
217                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
218                     $es_type = 'stdno';
219                 }
220
221                 if ($search) {
222                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223                 }
224
225                 if ($facet) {
226                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
227                 }
228                 if ($suggestible) {
229                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
230                 }
231                 # Sort is a bit special as it can be true, false, undef.
232                 # We care about "true" or "undef",
233                 # "undef" means to do the default thing, which is make it sortable.
234                 if (!defined $sort || $sort) {
235                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236                     $sort_fields{$self->index}{$name} = 1;
237                 }
238             }
239         );
240         $all_mappings{$self->index} = $mappings;
241     }
242     $self->sort_fields(\%{$sort_fields{$self->index}});
243
244     return $all_mappings{$self->index};
245 }
246
247 =head2 _get_elasticsearch_field_config
248
249 Get the Elasticsearch field config for the given purpose and data type.
250
251 $mapping = _get_elasticsearch_field_config('search', 'text');
252
253 =cut
254
255 sub _get_elasticsearch_field_config {
256
257     my ( $purpose, $type ) = @_;
258
259     # Use state to speed up repeated calls
260     state $settings = undef;
261     if (!defined $settings) {
262         my $config_file = C4::Context->config('elasticsearch_field_config');
263         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
264         $settings = LoadFile( $config_file );
265     }
266
267     if (!defined $settings->{$purpose}) {
268         die "Field purpose $purpose not defined in field config";
269     }
270     if ($type eq '') {
271         return $settings->{$purpose};
272     }
273     if (defined $settings->{$purpose}{$type}) {
274         return $settings->{$purpose}{$type};
275     }
276     if (defined $settings->{$purpose}{'default'}) {
277         return $settings->{$purpose}{'default'};
278     }
279     return;
280 }
281
282 =head2 _load_elasticsearch_mappings
283
284 Load Elasticsearch mappings in the format of mappings.yaml.
285
286 $indexes = _load_elasticsearch_mappings();
287
288 =cut
289
290 sub _load_elasticsearch_mappings {
291     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
292     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
293     return LoadFile( $mappings_yaml );
294 }
295
296 sub reset_elasticsearch_mappings {
297     my ( $self ) = @_;
298     my $indexes = $self->_load_elasticsearch_mappings();
299
300     Koha::SearchMarcMaps->delete;
301     Koha::SearchFields->delete;
302
303     while ( my ( $index_name, $fields ) = each %$indexes ) {
304         while ( my ( $field_name, $data ) = each %$fields ) {
305
306             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
307
308             # Set default values
309             $sf_params{staff_client} //= 1;
310             $sf_params{opac} //= 1;
311
312             $sf_params{name} = $field_name;
313
314             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
315
316             my $mappings = $data->{mappings};
317             for my $mapping ( @$mappings ) {
318                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
319                     index_name => $index_name,
320                     marc_type => $mapping->{marc_type},
321                     marc_field => $mapping->{marc_field}
322                 });
323                 $search_field->add_to_search_marc_maps($marc_field, {
324                     facet => $mapping->{facet} || 0,
325                     suggestible => $mapping->{suggestible} || 0,
326                     sort => $mapping->{sort},
327                     search => $mapping->{search} // 1
328                 });
329             }
330         }
331     }
332 }
333
334 # This overrides the accessor provided by Class::Accessor so that if
335 # sort_fields isn't set, then it'll generate it.
336 sub sort_fields {
337     my $self = shift;
338     if (@_) {
339         $self->_sort_fields_accessor(@_);
340         return;
341     }
342     my $val = $self->_sort_fields_accessor();
343     return $val if $val;
344
345     # This will populate the accessor as a side effect
346     $self->get_elasticsearch_mappings();
347     return $self->_sort_fields_accessor();
348 }
349
350 =head2 _process_mappings($mappings, $data, $record_document, $meta)
351
352     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
353
354 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
355 Since we group all mappings by MARC field targets C<$mappings> will contain
356 all targets for C<$data> and thus we need to fetch the MARC field only once.
357 C<$mappings> will be applied to C<$record_document> and new field values added.
358 The method has no return value.
359
360 =over 4
361
362 =item C<$mappings>
363
364 Arrayref of mappings containing arrayrefs in the format
365 [C<$target>, C<$options>] where C<$target> is the name of the target field and
366 C<$options> is a hashref containing processing directives for this particular
367 mapping.
368
369 =item C<$data>
370
371 The source data from a MARC record field.
372
373 =item C<$record_document>
374
375 Hashref representing the Elasticsearch document on which mappings should be
376 applied.
377
378 =item C<$meta>
379
380 A hashref containing metadata useful for enforcing per mapping rules. For
381 example for providing extra context for mapping options, or treating mapping
382 targets differently depending on type (sort, search, facet etc). Combining
383 this metadata with the mapping options and metadata allows us to mutate the
384 data per mapping, or even replace it with other data retrieved from the
385 metadata context.
386
387 Current properties are:
388
389 C<altscript>: A boolean value indicating whether an alternate script presentation is being
390 processed.
391
392 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
393 'subfield' or 'subfields_group'.
394
395 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
396
397 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
398 is 'subfields_group'.
399
400 C<field>: The original C<MARC::Record> object.
401
402 =back
403
404 =cut
405
406 sub _process_mappings {
407     my ($_self, $mappings, $data, $record_document, $meta) = @_;
408     foreach my $mapping (@{$mappings}) {
409         my ($target, $options) = @{$mapping};
410
411         # Don't process sort fields for alternate scripts
412         my $sort = $target =~ /__sort$/;
413         if ($sort && $meta->{altscript}) {
414             next;
415         }
416
417         # Copy (scalar) data since can have multiple targets
418         # with differing options for (possibly) mutating data
419         # so need a different copy for each
420         my $_data = $data;
421         $record_document->{$target} //= [];
422         if (defined $options->{substr}) {
423             my ($start, $length) = @{$options->{substr}};
424             $_data = length($data) > $start ? substr $data, $start, $length : '';
425         }
426         if (defined $options->{value_callbacks}) {
427             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
428         }
429         if (defined $options->{property}) {
430             $_data = {
431                 $options->{property} => $_data
432             }
433         }
434         if (defined $options->{nonfiling_characters_indicator}) {
435             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
436             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
437             if ($nonfiling_chars) {
438                 $_data = substr $_data, $nonfiling_chars;
439             }
440         }
441         push @{$record_document->{$target}}, $_data;
442     }
443 }
444
445 =head2 marc_records_to_documents($marc_records)
446
447     my $record_documents = $self->marc_records_to_documents($marc_records);
448
449 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
450
451 Returns array of hash references, representing Elasticsearch documents,
452 acceptable as body payload in C<Search::Elasticsearch> requests.
453
454 =over 4
455
456 =item C<$marc_documents>
457
458 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
459
460 =back
461
462 =cut
463
464 sub marc_records_to_documents {
465     my ($self, $records) = @_;
466     my $rules = $self->_get_marc_mapping_rules();
467     my $control_fields_rules = $rules->{control_fields};
468     my $data_fields_rules = $rules->{data_fields};
469     my $marcflavour = lc C4::Context->preference('marcflavour');
470     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
471
472     my @record_documents;
473
474     foreach my $record (@{$records}) {
475         my $record_document = {};
476         my $mappings = $rules->{leader};
477         if ($mappings) {
478             $self->_process_mappings($mappings, $record->leader(), $record_document, {
479                     altscript => 0,
480                     data_source => 'leader'
481                 }
482             );
483         }
484         foreach my $field ($record->fields()) {
485             if ($field->is_control_field()) {
486                 my $mappings = $control_fields_rules->{$field->tag()};
487                 if ($mappings) {
488                     $self->_process_mappings($mappings, $field->data(), $record_document, {
489                             altscript => 0,
490                             data_source => 'control_field',
491                             field => $field
492                         }
493                     );
494                 }
495             }
496             else {
497                 my $tag = $field->tag();
498                 # Handle alternate scripts in MARC 21
499                 my $altscript = 0;
500                 if ($marcflavour eq 'marc21' && $tag eq '880') {
501                     my $sub6 = $field->subfield('6');
502                     if ($sub6 =~ /^(...)-\d+/) {
503                         $tag = $1;
504                         $altscript = 1;
505                     }
506                 }
507
508                 my $data_field_rules = $data_fields_rules->{$tag};
509                 if ($data_field_rules) {
510                     my $subfields_mappings = $data_field_rules->{subfields};
511                     my $wildcard_mappings = $subfields_mappings->{'*'};
512                     foreach my $subfield ($field->subfields()) {
513                         my ($code, $data) = @{$subfield};
514                         my $mappings = $subfields_mappings->{$code} // [];
515                         if ($wildcard_mappings) {
516                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
517                         }
518                         if (@{$mappings}) {
519                             $self->_process_mappings($mappings, $data, $record_document, {
520                                     altscript => $altscript,
521                                     data_source => 'subfield',
522                                     code => $code,
523                                     field => $field
524                                 }
525                             );
526                         }
527                         if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
528                             # Used by the authority linker the match-heading field requires a specific syntax
529                             # that is specified in C4/Heading
530                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
531                             next unless $heading;
532                             push @{$record_document->{'match-heading'}}, $heading->search_form;
533                         }
534                     }
535
536                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
537                     if ($subfields_join_mappings) {
538                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
539                             # Map each subfield to values, remove empty values, join with space
540                             my $data = join(
541                                 ' ',
542                                 grep(
543                                     $_,
544                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
545                                 )
546                             );
547                             if ($data) {
548                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
549                                         altscript => $altscript,
550                                         data_source => 'subfields_group',
551                                         codes => $subfields_group,
552                                         field => $field
553                                     }
554                                 );
555                             }
556                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
557                                 # Used by the authority linker the match-heading field requires a specific syntax
558                                 # that is specified in C4/Heading
559                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
560                                 next unless $heading;
561                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
562                             }
563                         }
564                     }
565                 }
566             }
567         }
568         foreach my $field (keys %{$rules->{defaults}}) {
569             unless (defined $record_document->{$field}) {
570                 $record_document->{$field} = $rules->{defaults}->{$field};
571             }
572         }
573         foreach my $field (@{$rules->{sum}}) {
574             if (defined $record_document->{$field}) {
575                 # TODO: validate numeric? filter?
576                 # TODO: Or should only accept fields without nested values?
577                 # TODO: Quick and dirty, improve if needed
578                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
579             }
580         }
581         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
582         foreach my $field (@{$rules->{isbn}}) {
583             if (defined $record_document->{$field}) {
584                 my @isbns = ();
585                 foreach my $input_isbn (@{$record_document->{$field}}) {
586                     my $isbn = Business::ISBN->new($input_isbn);
587                     if (defined $isbn && $isbn->is_valid) {
588                         my $isbn13 = $isbn->as_isbn13->as_string;
589                         push @isbns, $isbn13;
590                         $isbn13 =~ s/\-//g;
591                         push @isbns, $isbn13;
592
593                         my $isbn10 = $isbn->as_isbn10;
594                         if ($isbn10) {
595                             $isbn10 = $isbn10->as_string;
596                             push @isbns, $isbn10;
597                             $isbn10 =~ s/\-//g;
598                             push @isbns, $isbn10;
599                         }
600                     } else {
601                         push @isbns, $input_isbn;
602                     }
603                 }
604                 $record_document->{$field} = \@isbns;
605             }
606         }
607
608         # Remove duplicate values and collapse sort fields
609         foreach my $field (keys %{$record_document}) {
610             if (ref($record_document->{$field}) eq 'ARRAY') {
611                 @{$record_document->{$field}} = do {
612                     my %seen;
613                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
614                 };
615                 if ($field =~ /__sort$/) {
616                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
617                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
618                 }
619             }
620         }
621
622         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
623         $record->encoding('UTF-8');
624         if ($use_array) {
625             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
626             $record_document->{'marc_format'} = 'ARRAY';
627         } else {
628             my @warnings;
629             {
630                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
631                 local $SIG{__WARN__} = sub {
632                     push @warnings, $_[0];
633                 };
634                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
635             }
636             if (@warnings) {
637                 # Suppress warnings if record length exceeded
638                 unless (substr($record->leader(), 0, 5) eq '99999') {
639                     foreach my $warning (@warnings) {
640                         carp $warning;
641                     }
642                 }
643                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
644                 $record_document->{'marc_format'} = 'MARCXML';
645             }
646             else {
647                 $record_document->{'marc_format'} = 'base64ISO2709';
648             }
649         }
650         push @record_documents, $record_document;
651     }
652     return \@record_documents;
653 }
654
655 =head2 _marc_to_array($record)
656
657     my @fields = _marc_to_array($record)
658
659 Convert a MARC::Record to an array modeled after MARC-in-JSON
660 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
661
662 =over 4
663
664 =item C<$record>
665
666 A MARC::Record object
667
668 =back
669
670 =cut
671
672 sub _marc_to_array {
673     my ($self, $record) = @_;
674
675     my $data = {
676         leader => $record->leader(),
677         fields => []
678     };
679     for my $field ($record->fields()) {
680         my $tag = $field->tag();
681         if ($field->is_control_field()) {
682             push @{$data->{fields}}, {$tag => $field->data()};
683         } else {
684             my $subfields = ();
685             foreach my $subfield ($field->subfields()) {
686                 my ($code, $contents) = @{$subfield};
687                 push @{$subfields}, {$code => $contents};
688             }
689             push @{$data->{fields}}, {
690                 $tag => {
691                     ind1 => $field->indicator(1),
692                     ind2 => $field->indicator(2),
693                     subfields => $subfields
694                 }
695             };
696         }
697     }
698     return $data;
699 }
700
701 =head2 _array_to_marc($data)
702
703     my $record = _array_to_marc($data)
704
705 Convert an array modeled after MARC-in-JSON to a MARC::Record
706
707 =over 4
708
709 =item C<$data>
710
711 An array modeled after MARC-in-JSON
712 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
713
714 =back
715
716 =cut
717
718 sub _array_to_marc {
719     my ($self, $data) = @_;
720
721     my $record = MARC::Record->new();
722
723     $record->leader($data->{leader});
724     for my $field (@{$data->{fields}}) {
725         my $tag = (keys %{$field})[0];
726         $field = $field->{$tag};
727         my $marc_field;
728         if (ref($field) eq 'HASH') {
729             my @subfields;
730             foreach my $subfield (@{$field->{subfields}}) {
731                 my $code = (keys %{$subfield})[0];
732                 push @subfields, $code;
733                 push @subfields, $subfield->{$code};
734             }
735             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
736         } else {
737             $marc_field = MARC::Field->new($tag, $field)
738         }
739         $record->append_fields($marc_field);
740     }
741 ;
742     return $record;
743 }
744
745 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
746
747     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
748
749 Get mappings, an internal data structure later used by
750 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
751 data for a MARC mapping.
752
753 The returned C<$mappings> is not to to be confused with mappings provided by
754 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
755 provided by C<_foreach_mapping> and expands it to this internal data structure.
756 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
757 is then applied to each MARC target (leader, control field data, subfield or
758 joined subfields) and integrated into the mapping rules data structure used in
759 C<marc_records_to_documents> to transform MARC records into Elasticsearch
760 documents.
761
762 =over 4
763
764 =item C<$facet>
765
766 Boolean indicating whether to create a facet field for this mapping.
767
768 =item C<$suggestible>
769
770 Boolean indicating whether to create a suggestion field for this mapping.
771
772 =item C<$sort>
773
774 Boolean indicating whether to create a sort field for this mapping.
775
776 =item C<$search>
777
778 Boolean indicating whether to create a search field for this mapping.
779
780 =item C<$target_name>
781
782 Elasticsearch document target field name.
783
784 =item C<$target_type>
785
786 Elasticsearch document target field type.
787
788 =item C<$range>
789
790 An optional range as a string in the format "<START>-<END>" or "<START>",
791 where "<START>" and "<END>" are integers specifying a range that will be used
792 for extracting a substring from MARC data as Elasticsearch field target value.
793
794 The first character position is "0", and the range is inclusive,
795 so "0-2" means the first three characters of MARC data.
796
797 If only "<START>" is provided only one character at position "<START>" will
798 be extracted.
799
800 =back
801
802 =cut
803
804 sub _field_mappings {
805     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
806     my %mapping_defaults = ();
807     my @mappings;
808
809     my $substr_args = undef;
810     if (defined $range) {
811         # TODO: use value_callback instead?
812         my ($start, $end) = map(int, split /-/, $range, 2);
813         $substr_args = [$start];
814         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
815     }
816     my $default_options = {};
817     if ($substr_args) {
818         $default_options->{substr} = $substr_args;
819     }
820
821     # TODO: Should probably have per type value callback/hook
822     # but hard code for now
823     if ($target_type eq 'boolean') {
824         $default_options->{value_callbacks} //= [];
825         push @{$default_options->{value_callbacks}}, sub {
826             my ($value) = @_;
827             # Trim whitespace at both ends
828             $value =~ s/^\s+|\s+$//g;
829             return $value ? 'true' : 'false';
830         };
831     }
832
833     if ($search) {
834         my $mapping = [$target_name, $default_options];
835         push @mappings, $mapping;
836     }
837
838     my @suffixes = ();
839     push @suffixes, 'facet' if $facet;
840     push @suffixes, 'suggestion' if $suggestible;
841     push @suffixes, 'sort' if !defined $sort || $sort;
842
843     foreach my $suffix (@suffixes) {
844         my $mapping = ["${target_name}__$suffix"];
845         # TODO: Hack, fix later in less hideous manner
846         if ($suffix eq 'suggestion') {
847             push @{$mapping}, {%{$default_options}, property => 'input'};
848         }
849         else {
850             # Important! Make shallow clone, or we end up with the same hashref
851             # shared by all mappings
852             push @{$mapping}, {%{$default_options}};
853         }
854         push @mappings, $mapping;
855     }
856     return @mappings;
857 };
858
859 =head2 _get_marc_mapping_rules
860
861     my $mapping_rules = $self->_get_marc_mapping_rules()
862
863 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
864
865 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
866 each call to C<MARC::Record>->field) we create an optimized structure of mapping
867 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
868
869 We can then iterate through all MARC fields for each record and apply all relevant
870 rules once per fields instead of retreiving fields multiple times for each mapping rule
871 which is terribly slow.
872
873 =cut
874
875 # TODO: This structure can be used for processing multiple MARC::Records so is currently
876 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
877 # memory cache which it is currently not. The performance gain of caching
878 # would probably be marginal, but to do this could be a further improvement.
879
880 sub _get_marc_mapping_rules {
881     my ($self) = @_;
882     my $marcflavour = lc C4::Context->preference('marcflavour');
883     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
884     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
885     my $rules = {
886         'leader' => [],
887         'control_fields' => {},
888         'data_fields' => {},
889         'sum' => [],
890         'isbn' => [],
891         'defaults' => {}
892     };
893
894     $self->_foreach_mapping(sub {
895         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
896         return if $marc_type ne $marcflavour;
897
898         if ($type eq 'sum') {
899             push @{$rules->{sum}}, $name;
900             push @{$rules->{sum}}, $name."__sort" if $sort;
901         }
902         elsif ($type eq 'isbn') {
903             push @{$rules->{isbn}}, $name;
904         }
905         elsif ($type eq 'boolean') {
906             # boolean gets special handling, if value doesn't exist for a field,
907             # it is set to false
908             $rules->{defaults}->{$name} = 'false';
909         }
910
911         if ($marc_field =~ $field_spec_regexp) {
912             my $field_tag = $1;
913
914             my @subfields;
915             my @subfield_groups;
916             # Parse and separate subfields form subfield groups
917             if (defined $2) {
918                 my $subfield_group = '';
919                 my $open_group = 0;
920
921                 foreach my $token (split //, $2) {
922                     if ($token eq "(") {
923                         if ($open_group) {
924                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
925                                 "Unmatched opening parenthesis for $marc_field"
926                             );
927                         }
928                         else {
929                             $open_group = 1;
930                         }
931                     }
932                     elsif ($token eq ")") {
933                         if ($open_group) {
934                             if ($subfield_group) {
935                                 push @subfield_groups, $subfield_group;
936                                 $subfield_group = '';
937                             }
938                             $open_group = 0;
939                         }
940                         else {
941                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
942                                 "Unmatched closing parenthesis for $marc_field"
943                             );
944                         }
945                     }
946                     elsif ($open_group) {
947                         $subfield_group .= $token;
948                     }
949                     else {
950                         push @subfields, $token;
951                     }
952                 }
953             }
954             else {
955                 push @subfields, '*';
956             }
957
958             my $range = defined $3 ? $3 : undef;
959             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
960             if ($field_tag < 10) {
961                 $rules->{control_fields}->{$field_tag} //= [];
962                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
963             }
964             else {
965                 $rules->{data_fields}->{$field_tag} //= {};
966                 foreach my $subfield (@subfields) {
967                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
968                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
969                 }
970                 foreach my $subfield_group (@subfield_groups) {
971                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
972                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
973                 }
974             }
975         }
976         elsif ($marc_field =~ $leader_regexp) {
977             my $range = defined $1 ? $1 : undef;
978             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
979             push @{$rules->{leader}}, @mappings;
980         }
981         else {
982             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
983                 "Invalid MARC field expression: $marc_field"
984             );
985         }
986     });
987
988     # Marc-flavour specific rule tweaks, could/should also provide hook for this
989     if ($marcflavour eq 'marc21') {
990         # Nonfiling characters processing for sort fields
991         my %title_fields;
992         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
993             # Format is: nonfiling characters indicator => field names list
994             %title_fields = (
995                 1 => [130, 630, 730, 740],
996                 2 => [222, 240, 242, 243, 245, 440, 830]
997             );
998         }
999         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1000             %title_fields = (
1001                 1 => [730],
1002                 2 => [130, 430, 530]
1003             );
1004         }
1005         foreach my $indicator (keys %title_fields) {
1006             foreach my $field_tag (@{$title_fields{$indicator}}) {
1007                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1008                 foreach my $mapping (@{$mappings}) {
1009                     if ($mapping->[0] =~ /__sort$/) {
1010                         # Mark this as to be processed for nonfiling characters indicator
1011                         # later on in _process_mappings
1012                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1013                     }
1014                 }
1015             }
1016         }
1017     }
1018
1019     return $rules;
1020 }
1021
1022 =head2 _foreach_mapping
1023
1024     $self->_foreach_mapping(
1025         sub {
1026             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1027                 $marc_field )
1028               = @_;
1029             return unless $marc_type eq 'marc21';
1030             print "Data comes from: " . $marc_field . "\n";
1031         }
1032     );
1033
1034 This allows you to apply a function to each entry in the elasticsearch mappings
1035 table, in order to build the mappings for whatever is needed.
1036
1037 In the provided function, the files are:
1038
1039 =over 4
1040
1041 =item C<$name>
1042
1043 The field name for elasticsearch (corresponds to the 'mapping' column in the
1044 database.
1045
1046 =item C<$type>
1047
1048 The type for this value, e.g. 'string'.
1049
1050 =item C<$facet>
1051
1052 True if this value should be facetised. This only really makes sense if the
1053 field is understood by the facet processing code anyway.
1054
1055 =item C<$sort>
1056
1057 True if this is a field that a) needs special sort handling, and b) if it
1058 should be sorted on. False if a) but not b). Undef if not a). This allows,
1059 for example, author to be sorted on but not everything marked with "author"
1060 to be included in that sort.
1061
1062 =item C<$marc_type>
1063
1064 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1065 'unimarc', 'normarc'.
1066
1067 =item C<$marc_field>
1068
1069 A string that describes the MARC field that contains the data to extract.
1070 These are of a form suited to Catmandu's MARC fixers.
1071
1072 =back
1073
1074 =cut
1075
1076 sub _foreach_mapping {
1077     my ( $self, $sub ) = @_;
1078
1079     # TODO use a caching framework here
1080     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1081         {
1082             'search_marc_map.index_name' => $self->index,
1083         },
1084         {   join => { search_marc_to_fields => 'search_marc_map' },
1085             '+select' => [
1086                 'search_marc_to_fields.facet',
1087                 'search_marc_to_fields.suggestible',
1088                 'search_marc_to_fields.sort',
1089                 'search_marc_to_fields.search',
1090                 'search_marc_map.marc_type',
1091                 'search_marc_map.marc_field',
1092             ],
1093             '+as'     => [
1094                 'facet',
1095                 'suggestible',
1096                 'sort',
1097                 'search',
1098                 'marc_type',
1099                 'marc_field',
1100             ],
1101         }
1102     );
1103
1104     while ( my $search_field = $search_fields->next ) {
1105         $sub->(
1106             # Force lower case on indexed field names for case insensitive
1107             # field name searches
1108             lc($search_field->name),
1109             $search_field->type,
1110             $search_field->get_column('facet'),
1111             $search_field->get_column('suggestible'),
1112             $search_field->get_column('sort'),
1113             $search_field->get_column('search'),
1114             $search_field->get_column('marc_type'),
1115             $search_field->get_column('marc_field'),
1116         );
1117     }
1118 }
1119
1120 =head2 process_error
1121
1122     die process_error($@);
1123
1124 This parses an Elasticsearch error message and produces a human-readable
1125 result from it. This result is probably missing all the useful information
1126 that you might want in diagnosing an issue, so the warning is also logged.
1127
1128 Note that currently the resulting message is not internationalised. This
1129 will happen eventually by some method or other.
1130
1131 =cut
1132
1133 sub process_error {
1134     my ($self, $msg) = @_;
1135
1136     warn $msg; # simple logging
1137
1138     # This is super-primitive
1139     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1140
1141     return "Unable to perform your search. Please try again.\n";
1142 }
1143
1144 =head2 _read_configuration
1145
1146     my $conf = _read_configuration();
1147
1148 Reads the I<configuration file> and returns a hash structure with the
1149 configuration information. It raises an exception if mandatory entries
1150 are missing.
1151
1152 The hashref structure has the following form:
1153
1154     {
1155         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1156         'index_name' => 'koha_instance',
1157     }
1158
1159 This is configured by the following in the C<config> block in koha-conf.xml:
1160
1161     <elasticsearch>
1162         <server>127.0.0.1:9200</server>
1163         <server>anotherserver:9200</server>
1164         <index_name>koha_instance</index_name>
1165     </elasticsearch>
1166
1167 =cut
1168
1169 sub _read_configuration {
1170
1171     my $configuration;
1172
1173     my $conf = C4::Context->config('elasticsearch');
1174     Koha::Exceptions::Config::MissingEntry->throw(
1175         "Missing 'elasticsearch' block in config file")
1176       unless defined $conf;
1177
1178     if ( $conf && $conf->{server} ) {
1179         my $nodes = $conf->{server};
1180         if ( ref($nodes) eq 'ARRAY' ) {
1181             $configuration->{nodes} = $nodes;
1182         }
1183         else {
1184             $configuration->{nodes} = [$nodes];
1185         }
1186     }
1187     else {
1188         Koha::Exceptions::Config::MissingEntry->throw(
1189             "Missing 'server' entry in config file for elasticsearch");
1190     }
1191
1192     if ( defined $conf->{index_name} ) {
1193         $configuration->{index_name} = $conf->{index_name};
1194     }
1195     else {
1196         Koha::Exceptions::Config::MissingEntry->throw(
1197             "Missing 'index_name' entry in config file for elasticsearch");
1198     }
1199
1200     return $configuration;
1201 }
1202
1203 =head2 get_facetable_fields
1204
1205 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1206
1207 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1208
1209 =cut
1210
1211 sub get_facetable_fields {
1212     my ($self) = @_;
1213
1214     # These should correspond to the ES field names, as opposed to the CCL
1215     # things that zebra uses.
1216     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1217     my @faceted_fields = Koha::SearchFields->search(
1218         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1219     );
1220     my @not_faceted_fields = Koha::SearchFields->search(
1221         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1222     );
1223     # This could certainly be improved
1224     return ( @faceted_fields, @not_faceted_fields );
1225 }
1226
1227 1;
1228
1229 __END__
1230
1231 =head1 AUTHOR
1232
1233 =over 4
1234
1235 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1236
1237 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1238
1239 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1240
1241 =back
1242
1243 =cut