Bug 17885: Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings throws...
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29
30 use Carp;
31 use Clone qw(clone);
32 use JSON;
33 use Modern::Perl;
34 use Readonly;
35 use Search::Elasticsearch;
36 use Try::Tiny;
37 use YAML::Syck;
38
39 use List::Util qw( sum0 reduce );
40 use MARC::File::XML;
41 use MIME::Base64;
42 use Encode qw(encode);
43 use Business::ISBN;
44
45 __PACKAGE__->mk_ro_accessors(qw( index ));
46 __PACKAGE__->mk_accessors(qw( sort_fields ));
47
48 # Constants to refer to the standard index names
49 Readonly our $BIBLIOS_INDEX     => 'biblios';
50 Readonly our $AUTHORITIES_INDEX => 'authorities';
51
52 =head1 NAME
53
54 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
55
56 =head1 ACCESSORS
57
58 =over 4
59
60 =item index
61
62 The name of the index to use, generally 'biblios' or 'authorities'.
63
64 =back
65
66 =head1 FUNCTIONS
67
68 =cut
69
70 sub new {
71     my $class = shift @_;
72     my $self = $class->SUPER::new(@_);
73     # Check for a valid index
74     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
75     return $self;
76 }
77
78 =head2 get_elasticsearch
79
80     my $elasticsearch_client = $self->get_elasticsearch();
81
82 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
83 instance level and will be reused if method is called multiple times.
84
85 =cut
86
87 sub get_elasticsearch {
88     my $self = shift @_;
89     unless (defined $self->{elasticsearch}) {
90         my $conf = $self->get_elasticsearch_params();
91         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
92     }
93     return $self->{elasticsearch};
94 }
95
96 =head2 get_elasticsearch_params
97
98     my $params = $self->get_elasticsearch_params();
99
100 This provides a hashref that contains the parameters for connecting to the
101 ElasicSearch servers, in the form:
102
103     {
104         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
105         'index_name' => 'koha_instance_index',
106     }
107
108 This is configured by the following in the C<config> block in koha-conf.xml:
109
110     <elasticsearch>
111         <server>127.0.0.1:9200</server>
112         <server>anotherserver:9200</server>
113         <index_name>koha_instance</index_name>
114     </elasticsearch>
115
116 =cut
117
118 sub get_elasticsearch_params {
119     my ($self) = @_;
120
121     # Copy the hash so that we're not modifying the original
122     my $conf = C4::Context->config('elasticsearch');
123     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
124     my $es = { %{ $conf } };
125
126     # Helpfully, the multiple server lines end up in an array for us anyway
127     # if there are multiple ones, but not if there's only one.
128     my $server = $es->{server};
129     delete $es->{server};
130     if ( ref($server) eq 'ARRAY' ) {
131
132         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
133         $es->{nodes} = $server;
134     }
135     elsif ($server) {
136         $es->{nodes} = [$server];
137     }
138     else {
139         die "No elasticsearch servers were specified in koha-conf.xml.\n";
140     }
141     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
142       if ( !$es->{index_name} );
143     # Append the name of this particular index to our namespace
144     $es->{index_name} .= '_' . $self->index;
145
146     $es->{key_prefix} = 'es_';
147     $es->{client} //= '5_0::Direct';
148     $es->{cxn_pool} //= 'Static';
149     $es->{request_timeout} //= 60;
150
151     return $es;
152 }
153
154 =head2 get_elasticsearch_settings
155
156     my $settings = $self->get_elasticsearch_settings();
157
158 This provides the settings provided to Elasticsearch when an index is created.
159 These can do things like define tokenization methods.
160
161 A hashref containing the settings is returned.
162
163 =cut
164
165 sub get_elasticsearch_settings {
166     my ($self) = @_;
167
168     # Use state to speed up repeated calls
169     state $settings = undef;
170     if (!defined $settings) {
171         my $config_file = C4::Context->config('elasticsearch_index_config');
172         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
173         $settings = LoadFile( $config_file );
174     }
175
176     return $settings;
177 }
178
179 =head2 get_elasticsearch_mappings
180
181     my $mappings = $self->get_elasticsearch_mappings();
182
183 This provides the mappings that get passed to Elasticsearch when an index is
184 created.
185
186 =cut
187
188 sub get_elasticsearch_mappings {
189     my ($self) = @_;
190
191     # Use state to speed up repeated calls
192     state %all_mappings;
193     state %sort_fields;
194
195     if (!defined $all_mappings{$self->index}) {
196         $sort_fields{$self->index} = {};
197         # Clone the general mapping to break ties with the original hash
198         my $mappings = {
199             data => clone(_get_elasticsearch_field_config('general', ''))
200         };
201         my $marcflavour = lc C4::Context->preference('marcflavour');
202         $self->_foreach_mapping(
203             sub {
204                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 if ($search) {
221                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222                 }
223
224                 if ($facet) {
225                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226                 }
227                 if ($suggestible) {
228                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
229                 }
230                 # Sort is a bit special as it can be true, false, undef.
231                 # We care about "true" or "undef",
232                 # "undef" means to do the default thing, which is make it sortable.
233                 if (!defined $sort || $sort) {
234                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
235                     $sort_fields{$self->index}{$name} = 1;
236                 }
237             }
238         );
239         $all_mappings{$self->index} = $mappings;
240     }
241     $self->sort_fields(\%{$sort_fields{$self->index}});
242
243     return $all_mappings{$self->index};
244 }
245
246 =head2 _get_elasticsearch_field_config
247
248 Get the Elasticsearch field config for the given purpose and data type.
249
250 $mapping = _get_elasticsearch_field_config('search', 'text');
251
252 =cut
253
254 sub _get_elasticsearch_field_config {
255
256     my ( $purpose, $type ) = @_;
257
258     # Use state to speed up repeated calls
259     state $settings = undef;
260     if (!defined $settings) {
261         my $config_file = C4::Context->config('elasticsearch_field_config');
262         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
263         $settings = LoadFile( $config_file );
264     }
265
266     if (!defined $settings->{$purpose}) {
267         die "Field purpose $purpose not defined in field config";
268     }
269     if ($type eq '') {
270         return $settings->{$purpose};
271     }
272     if (defined $settings->{$purpose}{$type}) {
273         return $settings->{$purpose}{$type};
274     }
275     if (defined $settings->{$purpose}{'default'}) {
276         return $settings->{$purpose}{'default'};
277     }
278     return;
279 }
280
281 =head2 _load_elasticsearch_mappings
282
283 Load Elasticsearch mappings in the format of mappings.yaml.
284
285 $indexes = _load_elasticsearch_mappings();
286
287 =cut
288
289 sub _load_elasticsearch_mappings {
290     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
291     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
292     return LoadFile( $mappings_yaml );
293 }
294
295 sub reset_elasticsearch_mappings {
296     my ( $self ) = @_;
297     my $indexes = $self->_load_elasticsearch_mappings();
298
299     Koha::SearchMarcMaps->delete;
300     Koha::SearchFields->delete;
301
302     while ( my ( $index_name, $fields ) = each %$indexes ) {
303         while ( my ( $field_name, $data ) = each %$fields ) {
304
305             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
306
307             # Set default values
308             $sf_params{staff_client} //= 1;
309             $sf_params{opac} //= 1;
310
311             $sf_params{name} = $field_name;
312
313             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
314
315             my $mappings = $data->{mappings};
316             for my $mapping ( @$mappings ) {
317                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
318                     index_name => $index_name,
319                     marc_type => $mapping->{marc_type},
320                     marc_field => $mapping->{marc_field}
321                 });
322                 $search_field->add_to_search_marc_maps($marc_field, {
323                     facet => $mapping->{facet} || 0,
324                     suggestible => $mapping->{suggestible} || 0,
325                     sort => $mapping->{sort},
326                     search => $mapping->{search} // 1
327                 });
328             }
329         }
330     }
331 }
332
333 # This overrides the accessor provided by Class::Accessor so that if
334 # sort_fields isn't set, then it'll generate it.
335 sub sort_fields {
336     my $self = shift;
337     if (@_) {
338         $self->_sort_fields_accessor(@_);
339         return;
340     }
341     my $val = $self->_sort_fields_accessor();
342     return $val if $val;
343
344     # This will populate the accessor as a side effect
345     $self->get_elasticsearch_mappings();
346     return $self->_sort_fields_accessor();
347 }
348
349 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
350
351     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
352
353 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
354 Since we group all mappings by MARC field targets C<$mappings> will contain
355 all targets for C<$data> and thus we need to fetch the MARC field only once.
356 C<$mappings> will be applied to C<$record_document> and new field values added.
357 The method has no return value.
358
359 =over 4
360
361 =item C<$mappings>
362
363 Arrayref of mappings containing arrayrefs in the format
364 [C<$target>, C<$options>] where C<$target> is the name of the target field and
365 C<$options> is a hashref containing processing directives for this particular
366 mapping.
367
368 =item C<$data>
369
370 The source data from a MARC record field.
371
372 =item C<$record_document>
373
374 Hashref representing the Elasticsearch document on which mappings should be
375 applied.
376
377 =item C<$altscript>
378
379 A boolean value indicating whether an alternate script presentation is being
380 processed.
381
382 =back
383
384 =cut
385
386 sub _process_mappings {
387     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
388     foreach my $mapping (@{$mappings}) {
389         my ($target, $options) = @{$mapping};
390
391         # Don't process sort fields for alternate scripts
392         my $sort = $target =~ /__sort$/;
393         if ($sort && $altscript) {
394             next;
395         }
396
397         # Copy (scalar) data since can have multiple targets
398         # with differing options for (possibly) mutating data
399         # so need a different copy for each
400         my $_data = $data;
401         $record_document->{$target} //= [];
402         if (defined $options->{substr}) {
403             my ($start, $length) = @{$options->{substr}};
404             $_data = length($data) > $start ? substr $data, $start, $length : '';
405         }
406         if (defined $options->{value_callbacks}) {
407             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
408         }
409         if (defined $options->{property}) {
410             $_data = {
411                 $options->{property} => $_data
412             }
413         }
414         push @{$record_document->{$target}}, $_data;
415     }
416 }
417
418 =head2 marc_records_to_documents($marc_records)
419
420     my $record_documents = $self->marc_records_to_documents($marc_records);
421
422 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
423
424 Returns array of hash references, representing Elasticsearch documents,
425 acceptable as body payload in C<Search::Elasticsearch> requests.
426
427 =over 4
428
429 =item C<$marc_documents>
430
431 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
432
433 =back
434
435 =cut
436
437 sub marc_records_to_documents {
438     my ($self, $records) = @_;
439     my $rules = $self->_get_marc_mapping_rules();
440     my $control_fields_rules = $rules->{control_fields};
441     my $data_fields_rules = $rules->{data_fields};
442     my $marcflavour = lc C4::Context->preference('marcflavour');
443     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
444
445     my @record_documents;
446
447     foreach my $record (@{$records}) {
448         my $record_document = {};
449         my $mappings = $rules->{leader};
450         if ($mappings) {
451             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
452         }
453         foreach my $field ($record->fields()) {
454             if ($field->is_control_field()) {
455                 my $mappings = $control_fields_rules->{$field->tag()};
456                 if ($mappings) {
457                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
458                 }
459             }
460             else {
461                 my $tag = $field->tag();
462                 # Handle alternate scripts in MARC 21
463                 my $altscript = 0;
464                 if ($marcflavour eq 'marc21' && $tag eq '880') {
465                     my $sub6 = $field->subfield('6');
466                     if ($sub6 =~ /^(...)-\d+/) {
467                         $tag = $1;
468                         $altscript = 1;
469                     }
470                 }
471
472                 my $data_field_rules = $data_fields_rules->{$tag};
473
474                 if ($data_field_rules) {
475                     my $subfields_mappings = $data_field_rules->{subfields};
476                     my $wildcard_mappings = $subfields_mappings->{'*'};
477                     foreach my $subfield ($field->subfields()) {
478                         my ($code, $data) = @{$subfield};
479                         my $mappings = $subfields_mappings->{$code} // [];
480                         if ($wildcard_mappings) {
481                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
482                         }
483                         if (@{$mappings}) {
484                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
485                         }
486                     }
487
488                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
489                     if ($subfields_join_mappings) {
490                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
491                             # Map each subfield to values, remove empty values, join with space
492                             my $data = join(
493                                 ' ',
494                                 grep(
495                                     $_,
496                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
497                                 )
498                             );
499                             if ($data) {
500                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
501                             }
502                         }
503                     }
504                 }
505             }
506         }
507         foreach my $field (keys %{$rules->{defaults}}) {
508             unless (defined $record_document->{$field}) {
509                 $record_document->{$field} = $rules->{defaults}->{$field};
510             }
511         }
512         foreach my $field (@{$rules->{sum}}) {
513             if (defined $record_document->{$field}) {
514                 # TODO: validate numeric? filter?
515                 # TODO: Or should only accept fields without nested values?
516                 # TODO: Quick and dirty, improve if needed
517                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
518             }
519         }
520         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
521         foreach my $field (@{$rules->{isbn}}) {
522             if (defined $record_document->{$field}) {
523                 my @isbns = ();
524                 foreach my $input_isbn (@{$record_document->{$field}}) {
525                     my $isbn = Business::ISBN->new($input_isbn);
526                     if (defined $isbn && $isbn->is_valid) {
527                         my $isbn13 = $isbn->as_isbn13->as_string;
528                         push @isbns, $isbn13;
529                         $isbn13 =~ s/\-//g;
530                         push @isbns, $isbn13;
531
532                         my $isbn10 = $isbn->as_isbn10;
533                         if ($isbn10) {
534                             $isbn10 = $isbn10->as_string;
535                             push @isbns, $isbn10;
536                             $isbn10 =~ s/\-//g;
537                             push @isbns, $isbn10;
538                         }
539                     } else {
540                         push @isbns, $input_isbn;
541                     }
542                 }
543                 $record_document->{$field} = \@isbns;
544             }
545         }
546
547         # Remove duplicate values and collapse sort fields
548         foreach my $field (keys %{$record_document}) {
549             if (ref($record_document->{$field}) eq 'ARRAY') {
550                 @{$record_document->{$field}} = do {
551                     my %seen;
552                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
553                 };
554                 if ($field =~ /__sort$/) {
555                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
556                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
557                 }
558             }
559         }
560
561         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
562         $record->encoding('UTF-8');
563         if ($use_array) {
564             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
565             $record_document->{'marc_format'} = 'ARRAY';
566         } else {
567             my @warnings;
568             {
569                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
570                 local $SIG{__WARN__} = sub {
571                     push @warnings, $_[0];
572                 };
573                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
574             }
575             if (@warnings) {
576                 # Suppress warnings if record length exceeded
577                 unless (substr($record->leader(), 0, 5) eq '99999') {
578                     foreach my $warning (@warnings) {
579                         carp $warning;
580                     }
581                 }
582                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
583                 $record_document->{'marc_format'} = 'MARCXML';
584             }
585             else {
586                 $record_document->{'marc_format'} = 'base64ISO2709';
587             }
588         }
589         push @record_documents, $record_document;
590     }
591     return \@record_documents;
592 }
593
594 =head2 _marc_to_array($record)
595
596     my @fields = _marc_to_array($record)
597
598 Convert a MARC::Record to an array modeled after MARC-in-JSON
599 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
600
601 =over 4
602
603 =item C<$record>
604
605 A MARC::Record object
606
607 =back
608
609 =cut
610
611 sub _marc_to_array {
612     my ($self, $record) = @_;
613
614     my $data = {
615         leader => $record->leader(),
616         fields => []
617     };
618     for my $field ($record->fields()) {
619         my $tag = $field->tag();
620         if ($field->is_control_field()) {
621             push @{$data->{fields}}, {$tag => $field->data()};
622         } else {
623             my $subfields = ();
624             foreach my $subfield ($field->subfields()) {
625                 my ($code, $contents) = @{$subfield};
626                 push @{$subfields}, {$code => $contents};
627             }
628             push @{$data->{fields}}, {
629                 $tag => {
630                     ind1 => $field->indicator(1),
631                     ind2 => $field->indicator(2),
632                     subfields => $subfields
633                 }
634             };
635         }
636     }
637     return $data;
638 }
639
640 =head2 _array_to_marc($data)
641
642     my $record = _array_to_marc($data)
643
644 Convert an array modeled after MARC-in-JSON to a MARC::Record
645
646 =over 4
647
648 =item C<$data>
649
650 An array modeled after MARC-in-JSON
651 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
652
653 =back
654
655 =cut
656
657 sub _array_to_marc {
658     my ($self, $data) = @_;
659
660     my $record = MARC::Record->new();
661
662     $record->leader($data->{leader});
663     for my $field (@{$data->{fields}}) {
664         my $tag = (keys %{$field})[0];
665         $field = $field->{$tag};
666         my $marc_field;
667         if (ref($field) eq 'HASH') {
668             my @subfields;
669             foreach my $subfield (@{$field->{subfields}}) {
670                 my $code = (keys %{$subfield})[0];
671                 push @subfields, $code;
672                 push @subfields, $subfield->{$code};
673             }
674             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
675         } else {
676             $marc_field = MARC::Field->new($tag, $field)
677         }
678         $record->append_fields($marc_field);
679     }
680 ;
681     return $record;
682 }
683
684 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
685
686     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
687
688 Get mappings, an internal data structure later used by
689 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
690 data for a MARC mapping.
691
692 The returned C<$mappings> is not to to be confused with mappings provided by
693 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
694 provided by C<_foreach_mapping> and expands it to this internal data structure.
695 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
696 is then applied to each MARC target (leader, control field data, subfield or
697 joined subfields) and integrated into the mapping rules data structure used in
698 C<marc_records_to_documents> to transform MARC records into Elasticsearch
699 documents.
700
701 =over 4
702
703 =item C<$facet>
704
705 Boolean indicating whether to create a facet field for this mapping.
706
707 =item C<$suggestible>
708
709 Boolean indicating whether to create a suggestion field for this mapping.
710
711 =item C<$sort>
712
713 Boolean indicating whether to create a sort field for this mapping.
714
715 =item C<$search>
716
717 Boolean indicating whether to create a search field for this mapping.
718
719 =item C<$target_name>
720
721 Elasticsearch document target field name.
722
723 =item C<$target_type>
724
725 Elasticsearch document target field type.
726
727 =item C<$range>
728
729 An optional range as a string in the format "<START>-<END>" or "<START>",
730 where "<START>" and "<END>" are integers specifying a range that will be used
731 for extracting a substring from MARC data as Elasticsearch field target value.
732
733 The first character position is "0", and the range is inclusive,
734 so "0-2" means the first three characters of MARC data.
735
736 If only "<START>" is provided only one character at position "<START>" will
737 be extracted.
738
739 =back
740
741 =cut
742
743 sub _field_mappings {
744     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
745     my %mapping_defaults = ();
746     my @mappings;
747
748     my $substr_args = undef;
749     if (defined $range) {
750         # TODO: use value_callback instead?
751         my ($start, $end) = map(int, split /-/, $range, 2);
752         $substr_args = [$start];
753         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
754     }
755     my $default_options = {};
756     if ($substr_args) {
757         $default_options->{substr} = $substr_args;
758     }
759
760     # TODO: Should probably have per type value callback/hook
761     # but hard code for now
762     if ($target_type eq 'boolean') {
763         $default_options->{value_callbacks} //= [];
764         push @{$default_options->{value_callbacks}}, sub {
765             my ($value) = @_;
766             # Trim whitespace at both ends
767             $value =~ s/^\s+|\s+$//g;
768             return $value ? 'true' : 'false';
769         };
770     }
771
772     if ($search) {
773         my $mapping = [$target_name, $default_options];
774         push @mappings, $mapping;
775     }
776
777     my @suffixes = ();
778     push @suffixes, 'facet' if $facet;
779     push @suffixes, 'suggestion' if $suggestible;
780     push @suffixes, 'sort' if !defined $sort || $sort;
781
782     foreach my $suffix (@suffixes) {
783         my $mapping = ["${target_name}__$suffix"];
784         # TODO: Hack, fix later in less hideous manner
785         if ($suffix eq 'suggestion') {
786             push @{$mapping}, {%{$default_options}, property => 'input'};
787         }
788         else {
789             push @{$mapping}, $default_options;
790         }
791         push @mappings, $mapping;
792     }
793     return @mappings;
794 };
795
796 =head2 _get_marc_mapping_rules
797
798     my $mapping_rules = $self->_get_marc_mapping_rules()
799
800 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
801
802 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
803 each call to C<MARC::Record>->field) we create an optimized structure of mapping
804 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
805
806 We can then iterate through all MARC fields for each record and apply all relevant
807 rules once per fields instead of retreiving fields multiple times for each mapping rule
808 which is terribly slow.
809
810 =cut
811
812 # TODO: This structure can be used for processing multiple MARC::Records so is currently
813 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
814 # memory cache which it is currently not. The performance gain of caching
815 # would probably be marginal, but to do this could be a further improvement.
816
817 sub _get_marc_mapping_rules {
818     my ($self) = @_;
819     my $marcflavour = lc C4::Context->preference('marcflavour');
820     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
821     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
822     my $rules = {
823         'leader' => [],
824         'control_fields' => {},
825         'data_fields' => {},
826         'sum' => [],
827         'isbn' => [],
828         'defaults' => {}
829     };
830
831     $self->_foreach_mapping(sub {
832         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
833         return if $marc_type ne $marcflavour;
834
835         if ($type eq 'sum') {
836             push @{$rules->{sum}}, $name;
837             push @{$rules->{sum}}, $name."__sort" if $sort;
838         }
839         elsif ($type eq 'isbn') {
840             push @{$rules->{isbn}}, $name;
841         }
842         elsif ($type eq 'boolean') {
843             # boolean gets special handling, if value doesn't exist for a field,
844             # it is set to false
845             $rules->{defaults}->{$name} = 'false';
846         }
847
848         if ($marc_field =~ $field_spec_regexp) {
849             my $field_tag = $1;
850
851             my @subfields;
852             my @subfield_groups;
853             # Parse and separate subfields form subfield groups
854             if (defined $2) {
855                 my $subfield_group = '';
856                 my $open_group = 0;
857
858                 foreach my $token (split //, $2) {
859                     if ($token eq "(") {
860                         if ($open_group) {
861                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
862                                 "Unmatched opening parenthesis for $marc_field"
863                             );
864                         }
865                         else {
866                             $open_group = 1;
867                         }
868                     }
869                     elsif ($token eq ")") {
870                         if ($open_group) {
871                             if ($subfield_group) {
872                                 push @subfield_groups, $subfield_group;
873                                 $subfield_group = '';
874                             }
875                             $open_group = 0;
876                         }
877                         else {
878                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
879                                 "Unmatched closing parenthesis for $marc_field"
880                             );
881                         }
882                     }
883                     elsif ($open_group) {
884                         $subfield_group .= $token;
885                     }
886                     else {
887                         push @subfields, $token;
888                     }
889                 }
890             }
891             else {
892                 push @subfields, '*';
893             }
894
895             my $range = defined $3 ? $3 : undef;
896             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
897             if ($field_tag < 10) {
898                 $rules->{control_fields}->{$field_tag} //= [];
899                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
900             }
901             else {
902                 $rules->{data_fields}->{$field_tag} //= {};
903                 foreach my $subfield (@subfields) {
904                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
905                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
906                 }
907                 foreach my $subfield_group (@subfield_groups) {
908                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
909                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
910                 }
911             }
912         }
913         elsif ($marc_field =~ $leader_regexp) {
914             my $range = defined $1 ? $1 : undef;
915             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
916             push @{$rules->{leader}}, @mappings;
917         }
918         else {
919             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
920                 "Invalid MARC field expression: $marc_field"
921             );
922         }
923     });
924     return $rules;
925 }
926
927 =head2 _foreach_mapping
928
929     $self->_foreach_mapping(
930         sub {
931             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
932                 $marc_field )
933               = @_;
934             return unless $marc_type eq 'marc21';
935             print "Data comes from: " . $marc_field . "\n";
936         }
937     );
938
939 This allows you to apply a function to each entry in the elasticsearch mappings
940 table, in order to build the mappings for whatever is needed.
941
942 In the provided function, the files are:
943
944 =over 4
945
946 =item C<$name>
947
948 The field name for elasticsearch (corresponds to the 'mapping' column in the
949 database.
950
951 =item C<$type>
952
953 The type for this value, e.g. 'string'.
954
955 =item C<$facet>
956
957 True if this value should be facetised. This only really makes sense if the
958 field is understood by the facet processing code anyway.
959
960 =item C<$sort>
961
962 True if this is a field that a) needs special sort handling, and b) if it
963 should be sorted on. False if a) but not b). Undef if not a). This allows,
964 for example, author to be sorted on but not everything marked with "author"
965 to be included in that sort.
966
967 =item C<$marc_type>
968
969 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
970 'unimarc', 'normarc'.
971
972 =item C<$marc_field>
973
974 A string that describes the MARC field that contains the data to extract.
975 These are of a form suited to Catmandu's MARC fixers.
976
977 =back
978
979 =cut
980
981 sub _foreach_mapping {
982     my ( $self, $sub ) = @_;
983
984     # TODO use a caching framework here
985     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
986         {
987             'search_marc_map.index_name' => $self->index,
988         },
989         {   join => { search_marc_to_fields => 'search_marc_map' },
990             '+select' => [
991                 'search_marc_to_fields.facet',
992                 'search_marc_to_fields.suggestible',
993                 'search_marc_to_fields.sort',
994                 'search_marc_to_fields.search',
995                 'search_marc_map.marc_type',
996                 'search_marc_map.marc_field',
997             ],
998             '+as'     => [
999                 'facet',
1000                 'suggestible',
1001                 'sort',
1002                 'search',
1003                 'marc_type',
1004                 'marc_field',
1005             ],
1006         }
1007     );
1008
1009     while ( my $search_field = $search_fields->next ) {
1010         $sub->(
1011             # Force lower case on indexed field names for case insensitive
1012             # field name searches
1013             lc($search_field->name),
1014             $search_field->type,
1015             $search_field->get_column('facet'),
1016             $search_field->get_column('suggestible'),
1017             $search_field->get_column('sort'),
1018             $search_field->get_column('search'),
1019             $search_field->get_column('marc_type'),
1020             $search_field->get_column('marc_field'),
1021         );
1022     }
1023 }
1024
1025 =head2 process_error
1026
1027     die process_error($@);
1028
1029 This parses an Elasticsearch error message and produces a human-readable
1030 result from it. This result is probably missing all the useful information
1031 that you might want in diagnosing an issue, so the warning is also logged.
1032
1033 Note that currently the resulting message is not internationalised. This
1034 will happen eventually by some method or other.
1035
1036 =cut
1037
1038 sub process_error {
1039     my ($self, $msg) = @_;
1040
1041     warn $msg; # simple logging
1042
1043     # This is super-primitive
1044     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1045
1046     return "Unable to perform your search. Please try again.\n";
1047 }
1048
1049 =head2 _read_configuration
1050
1051     my $conf = _read_configuration();
1052
1053 Reads the I<configuration file> and returns a hash structure with the
1054 configuration information. It raises an exception if mandatory entries
1055 are missing.
1056
1057 The hashref structure has the following form:
1058
1059     {
1060         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1061         'index_name' => 'koha_instance',
1062     }
1063
1064 This is configured by the following in the C<config> block in koha-conf.xml:
1065
1066     <elasticsearch>
1067         <server>127.0.0.1:9200</server>
1068         <server>anotherserver:9200</server>
1069         <index_name>koha_instance</index_name>
1070     </elasticsearch>
1071
1072 =cut
1073
1074 sub _read_configuration {
1075
1076     my $configuration;
1077
1078     my $conf = C4::Context->config('elasticsearch');
1079     Koha::Exceptions::Config::MissingEntry->throw(
1080         "Missing 'elasticsearch' block in config file")
1081       unless defined $conf;
1082
1083     if ( $conf && $conf->{server} ) {
1084         my $nodes = $conf->{server};
1085         if ( ref($nodes) eq 'ARRAY' ) {
1086             $configuration->{nodes} = $nodes;
1087         }
1088         else {
1089             $configuration->{nodes} = [$nodes];
1090         }
1091     }
1092     else {
1093         Koha::Exceptions::Config::MissingEntry->throw(
1094             "Missing 'server' entry in config file for elasticsearch");
1095     }
1096
1097     if ( defined $conf->{index_name} ) {
1098         $configuration->{index_name} = $conf->{index_name};
1099     }
1100     else {
1101         Koha::Exceptions::Config::MissingEntry->throw(
1102             "Missing 'index_name' entry in config file for elasticsearch");
1103     }
1104
1105     return $configuration;
1106 }
1107
1108 =head2 get_facetable_fields
1109
1110 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1111
1112 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1113
1114 =cut
1115
1116 sub get_facetable_fields {
1117     my ($self) = @_;
1118
1119     # These should correspond to the ES field names, as opposed to the CCL
1120     # things that zebra uses.
1121     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1122     my @faceted_fields = Koha::SearchFields->search(
1123         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1124     );
1125     my @not_faceted_fields = Koha::SearchFields->search(
1126         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1127     );
1128     # This could certainly be improved
1129     return ( @faceted_fields, @not_faceted_fields );
1130 }
1131
1132 1;
1133
1134 __END__
1135
1136 =head1 AUTHOR
1137
1138 =over 4
1139
1140 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1141
1142 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1143
1144 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1145
1146 =back
1147
1148 =cut