Bug 20589: Add field boosting and use query_string fields parameter
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use Clone qw(clone);
31 use JSON;
32 use Modern::Perl;
33 use Readonly;
34 use Search::Elasticsearch;
35 use Try::Tiny;
36 use YAML::Syck;
37
38 use List::Util qw( sum0 reduce );
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42 use Business::ISBN;
43
44 __PACKAGE__->mk_ro_accessors(qw( index ));
45 __PACKAGE__->mk_accessors(qw( sort_fields ));
46
47 # Constants to refer to the standard index names
48 Readonly our $BIBLIOS_INDEX     => 'biblios';
49 Readonly our $AUTHORITIES_INDEX => 'authorities';
50
51 =head1 NAME
52
53 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
54
55 =head1 ACCESSORS
56
57 =over 4
58
59 =item index
60
61 The name of the index to use, generally 'biblios' or 'authorities'.
62
63 =back
64
65 =head1 FUNCTIONS
66
67 =cut
68
69 sub new {
70     my $class = shift @_;
71     my $self = $class->SUPER::new(@_);
72     # Check for a valid index
73     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
74     return $self;
75 }
76
77 =head2 get_elasticsearch
78
79     my $elasticsearch_client = $self->get_elasticsearch();
80
81 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
82 instance level and will be reused if method is called multiple times.
83
84 =cut
85
86 sub get_elasticsearch {
87     my $self = shift @_;
88     unless (defined $self->{elasticsearch}) {
89         my $conf = $self->get_elasticsearch_params();
90         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
91     }
92     return $self->{elasticsearch};
93 }
94
95 =head2 get_elasticsearch_params
96
97     my $params = $self->get_elasticsearch_params();
98
99 This provides a hashref that contains the parameters for connecting to the
100 ElasicSearch servers, in the form:
101
102     {
103         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
104         'index_name' => 'koha_instance_index',
105     }
106
107 This is configured by the following in the C<config> block in koha-conf.xml:
108
109     <elasticsearch>
110         <server>127.0.0.1:9200</server>
111         <server>anotherserver:9200</server>
112         <index_name>koha_instance</index_name>
113     </elasticsearch>
114
115 =cut
116
117 sub get_elasticsearch_params {
118     my ($self) = @_;
119
120     # Copy the hash so that we're not modifying the original
121     my $conf = C4::Context->config('elasticsearch');
122     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
123     my $es = { %{ $conf } };
124
125     # Helpfully, the multiple server lines end up in an array for us anyway
126     # if there are multiple ones, but not if there's only one.
127     my $server = $es->{server};
128     delete $es->{server};
129     if ( ref($server) eq 'ARRAY' ) {
130
131         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
132         $es->{nodes} = $server;
133     }
134     elsif ($server) {
135         $es->{nodes} = [$server];
136     }
137     else {
138         die "No elasticsearch servers were specified in koha-conf.xml.\n";
139     }
140     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
141       if ( !$es->{index_name} );
142     # Append the name of this particular index to our namespace
143     $es->{index_name} .= '_' . $self->index;
144
145     $es->{key_prefix} = 'es_';
146     $es->{client} //= '5_0::Direct';
147     $es->{cxn_pool} //= 'Static';
148     $es->{request_timeout} //= 60;
149
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         # Clone the general mapping to break ties with the original hash
197         my $mappings = {
198             data => clone(_get_elasticsearch_field_config('general', ''))
199         };
200         my $marcflavour = lc C4::Context->preference('marcflavour');
201         $self->_foreach_mapping(
202             sub {
203                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
204                 return if $marc_type ne $marcflavour;
205                 # TODO if this gets any sort of complexity to it, it should
206                 # be broken out into its own function.
207
208                 # TODO be aware of date formats, but this requires pre-parsing
209                 # as ES will simply reject anything with an invalid date.
210                 my $es_type = 'text';
211                 if ($type eq 'boolean') {
212                     $es_type = 'boolean';
213                 } elsif ($type eq 'number' || $type eq 'sum') {
214                     $es_type = 'integer';
215                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
216                     $es_type = 'stdno';
217                 }
218
219                 if ($search) {
220                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221                 }
222
223                 if ($facet) {
224                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
225                 }
226                 if ($suggestible) {
227                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
228                 }
229                 # Sort is a bit special as it can be true, false, undef.
230                 # We care about "true" or "undef",
231                 # "undef" means to do the default thing, which is make it sortable.
232                 if (!defined $sort || $sort) {
233                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
234                     $sort_fields{$self->index}{$name} = 1;
235                 }
236             }
237         );
238         $all_mappings{$self->index} = $mappings;
239     }
240     $self->sort_fields(\%{$sort_fields{$self->index}});
241
242     return $all_mappings{$self->index};
243 }
244
245 =head2 _get_elasticsearch_field_config
246
247 Get the Elasticsearch field config for the given purpose and data type.
248
249 $mapping = _get_elasticsearch_field_config('search', 'text');
250
251 =cut
252
253 sub _get_elasticsearch_field_config {
254
255     my ( $purpose, $type ) = @_;
256
257     # Use state to speed up repeated calls
258     state $settings = undef;
259     if (!defined $settings) {
260         my $config_file = C4::Context->config('elasticsearch_field_config');
261         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
262         $settings = LoadFile( $config_file );
263     }
264
265     if (!defined $settings->{$purpose}) {
266         die "Field purpose $purpose not defined in field config";
267     }
268     if ($type eq '') {
269         return $settings->{$purpose};
270     }
271     if (defined $settings->{$purpose}{$type}) {
272         return $settings->{$purpose}{$type};
273     }
274     if (defined $settings->{$purpose}{'default'}) {
275         return $settings->{$purpose}{'default'};
276     }
277     return;
278 }
279
280 sub reset_elasticsearch_mappings {
281     my ( $reset_fields ) = @_;
282     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
283     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
284     my $indexes = LoadFile( $mappings_yaml );
285
286     while ( my ( $index_name, $fields ) = each %$indexes ) {
287         while ( my ( $field_name, $data ) = each %$fields ) {
288
289             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
290
291             # Set default values
292             $sf_params{staff_client} //= 1;
293             $sf_params{opac} //= 1;
294
295             $sf_params{name} = $field_name;
296
297             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
298
299             my $mappings = $data->{mappings};
300             for my $mapping ( @$mappings ) {
301                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
302                     index_name => $index_name,
303                     marc_type => $mapping->{marc_type},
304                     marc_field => $mapping->{marc_field}
305                 });
306                 $search_field->add_to_search_marc_maps($marc_field, {
307                     facet => $mapping->{facet} || 0,
308                     suggestible => $mapping->{suggestible} || 0,
309                     sort => $mapping->{sort},
310                     search => $mapping->{search} || 1
311                 });
312             }
313         }
314     }
315 }
316
317 # This overrides the accessor provided by Class::Accessor so that if
318 # sort_fields isn't set, then it'll generate it.
319 sub sort_fields {
320     my $self = shift;
321     if (@_) {
322         $self->_sort_fields_accessor(@_);
323         return;
324     }
325     my $val = $self->_sort_fields_accessor();
326     return $val if $val;
327
328     # This will populate the accessor as a side effect
329     $self->get_elasticsearch_mappings();
330     return $self->_sort_fields_accessor();
331 }
332
333 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
334
335     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
336
337 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
338 Since we group all mappings by MARC field targets C<$mappings> will contain
339 all targets for C<$data> and thus we need to fetch the MARC field only once.
340 C<$mappings> will be applied to C<$record_document> and new field values added.
341 The method has no return value.
342
343 =over 4
344
345 =item C<$mappings>
346
347 Arrayref of mappings containing arrayrefs in the format
348 [C<$target>, C<$options>] where C<$target> is the name of the target field and
349 C<$options> is a hashref containing processing directives for this particular
350 mapping.
351
352 =item C<$data>
353
354 The source data from a MARC record field.
355
356 =item C<$record_document>
357
358 Hashref representing the Elasticsearch document on which mappings should be
359 applied.
360
361 =item C<$altscript>
362
363 A boolean value indicating whether an alternate script presentation is being
364 processed.
365
366 =back
367
368 =cut
369
370 sub _process_mappings {
371     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
372     foreach my $mapping (@{$mappings}) {
373         my ($target, $options) = @{$mapping};
374
375         # Don't process sort fields for alternate scripts
376         my $sort = $target =~ /__sort$/;
377         if ($sort && $altscript) {
378             next;
379         }
380
381         # Copy (scalar) data since can have multiple targets
382         # with differing options for (possibly) mutating data
383         # so need a different copy for each
384         my $_data = $data;
385         $record_document->{$target} //= [];
386         if (defined $options->{substr}) {
387             my ($start, $length) = @{$options->{substr}};
388             $_data = length($data) > $start ? substr $data, $start, $length : '';
389         }
390         if (defined $options->{value_callbacks}) {
391             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
392         }
393         if (defined $options->{property}) {
394             $_data = {
395                 $options->{property} => $_data
396             }
397         }
398         push @{$record_document->{$target}}, $_data;
399     }
400 }
401
402 =head2 marc_records_to_documents($marc_records)
403
404     my @record_documents = $self->marc_records_to_documents($marc_records);
405
406 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
407
408 Returns array of hash references, representing Elasticsearch documents,
409 acceptable as body payload in C<Search::Elasticsearch> requests.
410
411 =over 4
412
413 =item C<$marc_documents>
414
415 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
416
417 =back
418
419 =cut
420
421 sub marc_records_to_documents {
422     my ($self, $records) = @_;
423     my $rules = $self->_get_marc_mapping_rules();
424     my $control_fields_rules = $rules->{control_fields};
425     my $data_fields_rules = $rules->{data_fields};
426     my $marcflavour = lc C4::Context->preference('marcflavour');
427     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
428
429     my @record_documents;
430
431     foreach my $record (@{$records}) {
432         my $record_document = {};
433         my $mappings = $rules->{leader};
434         if ($mappings) {
435             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
436         }
437         foreach my $field ($record->fields()) {
438             if ($field->is_control_field()) {
439                 my $mappings = $control_fields_rules->{$field->tag()};
440                 if ($mappings) {
441                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
442                 }
443             }
444             else {
445                 my $tag = $field->tag();
446                 # Handle alternate scripts in MARC 21
447                 my $altscript = 0;
448                 if ($marcflavour eq 'marc21' && $tag eq '880') {
449                     my $sub6 = $field->subfield('6');
450                     if ($sub6 =~ /^(...)-\d+/) {
451                         $tag = $1;
452                         $altscript = 1;
453                     }
454                 }
455
456                 my $data_field_rules = $data_fields_rules->{$tag};
457
458                 if ($data_field_rules) {
459                     my $subfields_mappings = $data_field_rules->{subfields};
460                     my $wildcard_mappings = $subfields_mappings->{'*'};
461                     foreach my $subfield ($field->subfields()) {
462                         my ($code, $data) = @{$subfield};
463                         my $mappings = $subfields_mappings->{$code} // [];
464                         if ($wildcard_mappings) {
465                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
466                         }
467                         if (@{$mappings}) {
468                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
469                         }
470                     }
471
472                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
473                     if ($subfields_join_mappings) {
474                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
475                             # Map each subfield to values, remove empty values, join with space
476                             my $data = join(
477                                 ' ',
478                                 grep(
479                                     $_,
480                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
481                                 )
482                             );
483                             if ($data) {
484                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
485                             }
486                         }
487                     }
488                 }
489             }
490         }
491         foreach my $field (keys %{$rules->{defaults}}) {
492             unless (defined $record_document->{$field}) {
493                 $record_document->{$field} = $rules->{defaults}->{$field};
494             }
495         }
496         foreach my $field (@{$rules->{sum}}) {
497             if (defined $record_document->{$field}) {
498                 # TODO: validate numeric? filter?
499                 # TODO: Or should only accept fields without nested values?
500                 # TODO: Quick and dirty, improve if needed
501                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
502             }
503         }
504         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
505         foreach my $field (@{$rules->{isbn}}) {
506             if (defined $record_document->{$field}) {
507                 my @isbns = ();
508                 foreach my $input_isbn (@{$record_document->{$field}}) {
509                     my $isbn = Business::ISBN->new($input_isbn);
510                     if (defined $isbn && $isbn->is_valid) {
511                         my $isbn13 = $isbn->as_isbn13->as_string;
512                         push @isbns, $isbn13;
513                         $isbn13 =~ s/\-//g;
514                         push @isbns, $isbn13;
515
516                         my $isbn10 = $isbn->as_isbn10;
517                         if ($isbn10) {
518                             $isbn10 = $isbn10->as_string;
519                             push @isbns, $isbn10;
520                             $isbn10 =~ s/\-//g;
521                             push @isbns, $isbn10;
522                         }
523                     } else {
524                         push @isbns, $input_isbn;
525                     }
526                 }
527                 $record_document->{$field} = \@isbns;
528             }
529         }
530
531         # Remove duplicate values and collapse sort fields
532         foreach my $field (keys %{$record_document}) {
533             if (ref($record_document->{$field}) eq 'ARRAY') {
534                 @{$record_document->{$field}} = do {
535                     my %seen;
536                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
537                 };
538                 if ($field =~ /__sort$/) {
539                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
540                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
541                 }
542             }
543         }
544
545         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
546         $record->encoding('UTF-8');
547         if ($use_array) {
548             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
549             $record_document->{'marc_format'} = 'ARRAY';
550         } else {
551             my @warnings;
552             {
553                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
554                 local $SIG{__WARN__} = sub {
555                     push @warnings, $_[0];
556                 };
557                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
558             }
559             if (@warnings) {
560                 # Suppress warnings if record length exceeded
561                 unless (substr($record->leader(), 0, 5) eq '99999') {
562                     foreach my $warning (@warnings) {
563                         carp $warning;
564                     }
565                 }
566                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
567                 $record_document->{'marc_format'} = 'MARCXML';
568             }
569             else {
570                 $record_document->{'marc_format'} = 'base64ISO2709';
571             }
572         }
573         my $id = $record->subfield('999', 'c');
574         push @record_documents, [$id, $record_document];
575     }
576     return \@record_documents;
577 }
578
579 =head2 _marc_to_array($record)
580
581     my @fields = _marc_to_array($record)
582
583 Convert a MARC::Record to an array modeled after MARC-in-JSON
584 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
585
586 =over 4
587
588 =item C<$record>
589
590 A MARC::Record object
591
592 =back
593
594 =cut
595
596 sub _marc_to_array {
597     my ($self, $record) = @_;
598
599     my $data = {
600         leader => $record->leader(),
601         fields => []
602     };
603     for my $field ($record->fields()) {
604         my $tag = $field->tag();
605         if ($field->is_control_field()) {
606             push @{$data->{fields}}, {$tag => $field->data()};
607         } else {
608             my $subfields = ();
609             foreach my $subfield ($field->subfields()) {
610                 my ($code, $contents) = @{$subfield};
611                 push @{$subfields}, {$code => $contents};
612             }
613             push @{$data->{fields}}, {
614                 $tag => {
615                     ind1 => $field->indicator(1),
616                     ind2 => $field->indicator(2),
617                     subfields => $subfields
618                 }
619             };
620         }
621     }
622     return $data;
623 }
624
625 =head2 _array_to_marc($data)
626
627     my $record = _array_to_marc($data)
628
629 Convert an array modeled after MARC-in-JSON to a MARC::Record
630
631 =over 4
632
633 =item C<$data>
634
635 An array modeled after MARC-in-JSON
636 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
637
638 =back
639
640 =cut
641
642 sub _array_to_marc {
643     my ($self, $data) = @_;
644
645     my $record = MARC::Record->new();
646
647     $record->leader($data->{leader});
648     for my $field (@{$data->{fields}}) {
649         my $tag = (keys %{$field})[0];
650         $field = $field->{$tag};
651         my $marc_field;
652         if (ref($field) eq 'HASH') {
653             my @subfields;
654             foreach my $subfield (@{$field->{subfields}}) {
655                 my $code = (keys %{$subfield})[0];
656                 push @subfields, $code;
657                 push @subfields, $subfield->{$code};
658             }
659             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
660         } else {
661             $marc_field = MARC::Field->new($tag, $field)
662         }
663         $record->append_fields($marc_field);
664     }
665 ;
666     return $record;
667 }
668
669 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
670
671     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
672
673 Get mappings, an internal data structure later used by
674 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
675 data for a MARC mapping.
676
677 The returned C<$mappings> is not to to be confused with mappings provided by
678 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
679 provided by C<_foreach_mapping> and expands it to this internal data structure.
680 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
681 is then applied to each MARC target (leader, control field data, subfield or
682 joined subfields) and integrated into the mapping rules data structure used in
683 C<marc_records_to_documents> to transform MARC records into Elasticsearch
684 documents.
685
686 =over 4
687
688 =item C<$facet>
689
690 Boolean indicating whether to create a facet field for this mapping.
691
692 =item C<$suggestible>
693
694 Boolean indicating whether to create a suggestion field for this mapping.
695
696 =item C<$sort>
697
698 Boolean indicating whether to create a sort field for this mapping.
699
700 =item C<$search>
701
702 Boolean indicating whether to create a search field for this mapping.
703
704 =item C<$target_name>
705
706 Elasticsearch document target field name.
707
708 =item C<$target_type>
709
710 Elasticsearch document target field type.
711
712 =item C<$range>
713
714 An optional range as a string in the format "<START>-<END>" or "<START>",
715 where "<START>" and "<END>" are integers specifying a range that will be used
716 for extracting a substring from MARC data as Elasticsearch field target value.
717
718 The first character position is "0", and the range is inclusive,
719 so "0-2" means the first three characters of MARC data.
720
721 If only "<START>" is provided only one character at position "<START>" will
722 be extracted.
723
724 =back
725
726 =cut
727
728 sub _field_mappings {
729     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
730     my %mapping_defaults = ();
731     my @mappings;
732
733     my $substr_args = undef;
734     if (defined $range) {
735         # TODO: use value_callback instead?
736         my ($start, $end) = map(int, split /-/, $range, 2);
737         $substr_args = [$start];
738         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
739     }
740     my $default_options = {};
741     if ($substr_args) {
742         $default_options->{substr} = $substr_args;
743     }
744
745     # TODO: Should probably have per type value callback/hook
746     # but hard code for now
747     if ($target_type eq 'boolean') {
748         $default_options->{value_callbacks} //= [];
749         push @{$default_options->{value_callbacks}}, sub {
750             my ($value) = @_;
751             # Trim whitespace at both ends
752             $value =~ s/^\s+|\s+$//g;
753             return $value ? 'true' : 'false';
754         };
755     }
756
757     if ($search) {
758         my $mapping = [$target_name, $default_options];
759         push @mappings, $mapping;
760     }
761
762     my @suffixes = ();
763     push @suffixes, 'facet' if $facet;
764     push @suffixes, 'suggestion' if $suggestible;
765     push @suffixes, 'sort' if !defined $sort || $sort;
766
767     foreach my $suffix (@suffixes) {
768         my $mapping = ["${target_name}__$suffix"];
769         # TODO: Hack, fix later in less hideous manner
770         if ($suffix eq 'suggestion') {
771             push @{$mapping}, {%{$default_options}, property => 'input'};
772         }
773         else {
774             push @{$mapping}, $default_options;
775         }
776         push @mappings, $mapping;
777     }
778     return @mappings;
779 };
780
781 =head2 _get_marc_mapping_rules
782
783     my $mapping_rules = $self->_get_marc_mapping_rules()
784
785 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
786
787 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
788 each call to C<MARC::Record>->field) we create an optimized structure of mapping
789 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
790
791 We can then iterate through all MARC fields for each record and apply all relevant
792 rules once per fields instead of retreiving fields multiple times for each mapping rule
793 which is terribly slow.
794
795 =cut
796
797 # TODO: This structure can be used for processing multiple MARC::Records so is currently
798 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
799 # memory cache which it is currently not. The performance gain of caching
800 # would probably be marginal, but to do this could be a further improvement.
801
802 sub _get_marc_mapping_rules {
803     my ($self) = @_;
804     my $marcflavour = lc C4::Context->preference('marcflavour');
805     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
806     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
807     my $rules = {
808         'leader' => [],
809         'control_fields' => {},
810         'data_fields' => {},
811         'sum' => [],
812         'isbn' => [],
813         'defaults' => {}
814     };
815
816     $self->_foreach_mapping(sub {
817         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
818         return if $marc_type ne $marcflavour;
819
820         if ($type eq 'sum') {
821             push @{$rules->{sum}}, $name;
822         }
823         elsif ($type eq 'isbn') {
824             push @{$rules->{isbn}}, $name;
825         }
826         elsif ($type eq 'boolean') {
827             # boolean gets special handling, if value doesn't exist for a field,
828             # it is set to false
829             $rules->{defaults}->{$name} = 'false';
830         }
831
832         if ($marc_field =~ $field_spec_regexp) {
833             my $field_tag = $1;
834
835             my @subfields;
836             my @subfield_groups;
837             # Parse and separate subfields form subfield groups
838             if (defined $2) {
839                 my $subfield_group = '';
840                 my $open_group = 0;
841
842                 foreach my $token (split //, $2) {
843                     if ($token eq "(") {
844                         if ($open_group) {
845                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
846                                 "Unmatched opening parenthesis for $marc_field"
847                             );
848                         }
849                         else {
850                             $open_group = 1;
851                         }
852                     }
853                     elsif ($token eq ")") {
854                         if ($open_group) {
855                             if ($subfield_group) {
856                                 push @subfield_groups, $subfield_group;
857                                 $subfield_group = '';
858                             }
859                             $open_group = 0;
860                         }
861                         else {
862                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
863                                 "Unmatched closing parenthesis for $marc_field"
864                             );
865                         }
866                     }
867                     elsif ($open_group) {
868                         $subfield_group .= $token;
869                     }
870                     else {
871                         push @subfields, $token;
872                     }
873                 }
874             }
875             else {
876                 push @subfields, '*';
877             }
878
879             my $range = defined $3 ? $3 : undef;
880             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
881
882             if ($field_tag < 10) {
883                 $rules->{control_fields}->{$field_tag} //= [];
884                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
885             }
886             else {
887                 $rules->{data_fields}->{$field_tag} //= {};
888                 foreach my $subfield (@subfields) {
889                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
890                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
891                 }
892                 foreach my $subfield_group (@subfield_groups) {
893                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
894                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
895                 }
896             }
897         }
898         elsif ($marc_field =~ $leader_regexp) {
899             my $range = defined $1 ? $1 : undef;
900             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
901             push @{$rules->{leader}}, @mappings;
902         }
903         else {
904             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
905                 "Invalid MARC field expression: $marc_field"
906             );
907         }
908     });
909     return $rules;
910 }
911
912 =head2 _foreach_mapping
913
914     $self->_foreach_mapping(
915         sub {
916             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
917                 $marc_field )
918               = @_;
919             return unless $marc_type eq 'marc21';
920             print "Data comes from: " . $marc_field . "\n";
921         }
922     );
923
924 This allows you to apply a function to each entry in the elasticsearch mappings
925 table, in order to build the mappings for whatever is needed.
926
927 In the provided function, the files are:
928
929 =over 4
930
931 =item C<$name>
932
933 The field name for elasticsearch (corresponds to the 'mapping' column in the
934 database.
935
936 =item C<$type>
937
938 The type for this value, e.g. 'string'.
939
940 =item C<$facet>
941
942 True if this value should be facetised. This only really makes sense if the
943 field is understood by the facet processing code anyway.
944
945 =item C<$sort>
946
947 True if this is a field that a) needs special sort handling, and b) if it
948 should be sorted on. False if a) but not b). Undef if not a). This allows,
949 for example, author to be sorted on but not everything marked with "author"
950 to be included in that sort.
951
952 =item C<$marc_type>
953
954 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
955 'unimarc', 'normarc'.
956
957 =item C<$marc_field>
958
959 A string that describes the MARC field that contains the data to extract.
960 These are of a form suited to Catmandu's MARC fixers.
961
962 =back
963
964 =cut
965
966 sub _foreach_mapping {
967     my ( $self, $sub ) = @_;
968
969     # TODO use a caching framework here
970     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
971         {
972             'search_marc_map.index_name' => $self->index,
973         },
974         {   join => { search_marc_to_fields => 'search_marc_map' },
975             '+select' => [
976                 'search_marc_to_fields.facet',
977                 'search_marc_to_fields.suggestible',
978                 'search_marc_to_fields.sort',
979                 'search_marc_to_fields.search',
980                 'search_marc_map.marc_type',
981                 'search_marc_map.marc_field',
982             ],
983             '+as'     => [
984                 'facet',
985                 'suggestible',
986                 'sort',
987                 'search',
988                 'marc_type',
989                 'marc_field',
990             ],
991         }
992     );
993
994     while ( my $search_field = $search_fields->next ) {
995         $sub->(
996             # Force lower case on indexed field names for case insensitive
997             # field name searches
998             lc($search_field->name),
999             $search_field->type,
1000             $search_field->get_column('facet'),
1001             $search_field->get_column('suggestible'),
1002             $search_field->get_column('sort'),
1003             $search_field->get_column('search'),
1004             $search_field->get_column('marc_type'),
1005             $search_field->get_column('marc_field'),
1006         );
1007     }
1008 }
1009
1010 =head2 process_error
1011
1012     die process_error($@);
1013
1014 This parses an Elasticsearch error message and produces a human-readable
1015 result from it. This result is probably missing all the useful information
1016 that you might want in diagnosing an issue, so the warning is also logged.
1017
1018 Note that currently the resulting message is not internationalised. This
1019 will happen eventually by some method or other.
1020
1021 =cut
1022
1023 sub process_error {
1024     my ($self, $msg) = @_;
1025
1026     warn $msg; # simple logging
1027
1028     # This is super-primitive
1029     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1030
1031     return "Unable to perform your search. Please try again.\n";
1032 }
1033
1034 =head2 _read_configuration
1035
1036     my $conf = _read_configuration();
1037
1038 Reads the I<configuration file> and returns a hash structure with the
1039 configuration information. It raises an exception if mandatory entries
1040 are missing.
1041
1042 The hashref structure has the following form:
1043
1044     {
1045         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1046         'index_name' => 'koha_instance',
1047     }
1048
1049 This is configured by the following in the C<config> block in koha-conf.xml:
1050
1051     <elasticsearch>
1052         <server>127.0.0.1:9200</server>
1053         <server>anotherserver:9200</server>
1054         <index_name>koha_instance</index_name>
1055     </elasticsearch>
1056
1057 =cut
1058
1059 sub _read_configuration {
1060
1061     my $configuration;
1062
1063     my $conf = C4::Context->config('elasticsearch');
1064     Koha::Exceptions::Config::MissingEntry->throw(
1065         "Missing 'elasticsearch' block in config file")
1066       unless defined $conf;
1067
1068     if ( $conf && $conf->{server} ) {
1069         my $nodes = $conf->{server};
1070         if ( ref($nodes) eq 'ARRAY' ) {
1071             $configuration->{nodes} = $nodes;
1072         }
1073         else {
1074             $configuration->{nodes} = [$nodes];
1075         }
1076     }
1077     else {
1078         Koha::Exceptions::Config::MissingEntry->throw(
1079             "Missing 'server' entry in config file for elasticsearch");
1080     }
1081
1082     if ( defined $conf->{index_name} ) {
1083         $configuration->{index_name} = $conf->{index_name};
1084     }
1085     else {
1086         Koha::Exceptions::Config::MissingEntry->throw(
1087             "Missing 'index_name' entry in config file for elasticsearch");
1088     }
1089
1090     return $configuration;
1091 }
1092
1093 =head2 get_facetable_fields
1094
1095 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1096
1097 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1098
1099 =cut
1100
1101 sub get_facetable_fields {
1102     my ($self) = @_;
1103
1104     # These should correspond to the ES field names, as opposed to the CCL
1105     # things that zebra uses.
1106     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1107     my @faceted_fields = Koha::SearchFields->search(
1108         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1109     );
1110     my @not_faceted_fields = Koha::SearchFields->search(
1111         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1112     );
1113     # This could certainly be improved
1114     return ( @faceted_fields, @not_faceted_fields );
1115 }
1116
1117 1;
1118
1119 __END__
1120
1121 =head1 AUTHOR
1122
1123 =over 4
1124
1125 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1126
1127 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1128
1129 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1130
1131 =back
1132
1133 =cut