Bug 22246: Fix indexing of large fields with Elasticsearch
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new(
90             client => "5_0::Direct",
91             nodes => $conf->{nodes},
92             cxn_pool => 'Sniff',
93             request_timeout => 60
94         );
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticserver index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         my $mappings = {
197             data => scalar _get_elasticsearch_mapping('general', '')
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $all_mappings{$self->index} = $mappings;
236     }
237     $self->sort_fields(\%{$sort_fields{$self->index}});
238
239     return $all_mappings{$self->index};
240 }
241
242 =head2 _get_elasticsearch_mapping
243
244 Get the Elasticsearch mappings for the given purpose and data type.
245
246 $mapping = _get_elasticsearch_mapping('search', 'text');
247
248 =cut
249
250 sub _get_elasticsearch_mapping {
251
252     my ( $purpose, $type ) = @_;
253
254     # Use state to speed up repeated calls
255     state $settings = undef;
256     if (!defined $settings) {
257         my $config_file = C4::Context->config('elasticsearch_field_config');
258         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
259         $settings = LoadFile( $config_file );
260     }
261
262     if (!defined $settings->{$purpose}) {
263         die "Field purpose $purpose not defined in field config";
264     }
265     if ($type eq '') {
266         return $settings->{$purpose};
267     }
268     if (defined $settings->{$purpose}{$type}) {
269         return $settings->{$purpose}{$type};
270     }
271     if (defined $settings->{$purpose}{'default'}) {
272         return $settings->{$purpose}{'default'};
273     }
274     return;
275 }
276
277 sub reset_elasticsearch_mappings {
278     my ( $reset_fields ) = @_;
279     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
280     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
281     my $indexes = LoadFile( $mappings_yaml );
282
283     while ( my ( $index_name, $fields ) = each %$indexes ) {
284         while ( my ( $field_name, $data ) = each %$fields ) {
285             my $field_type = $data->{type};
286             my $field_label = $data->{label};
287             my $mappings = $data->{mappings};
288             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
289             for my $mapping ( @$mappings ) {
290                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
291                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
292             }
293         }
294     }
295 }
296
297 # This overrides the accessor provided by Class::Accessor so that if
298 # sort_fields isn't set, then it'll generate it.
299 sub sort_fields {
300     my $self = shift;
301     if (@_) {
302         $self->_sort_fields_accessor(@_);
303         return;
304     }
305     my $val = $self->_sort_fields_accessor();
306     return $val if $val;
307
308     # This will populate the accessor as a side effect
309     $self->get_elasticsearch_mappings();
310     return $self->_sort_fields_accessor();
311 }
312
313 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
314
315     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
316
317 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
318 Since we group all mappings by MARC field targets C<$mappings> will contain
319 all targets for C<$data> and thus we need to fetch the MARC field only once.
320 C<$mappings> will be applied to C<$record_document> and new field values added.
321 The method has no return value.
322
323 =over 4
324
325 =item C<$mappings>
326
327 Arrayref of mappings containing arrayrefs in the format
328 [C<$target>, C<$options>] where C<$target> is the name of the target field and
329 C<$options> is a hashref containing processing directives for this particular
330 mapping.
331
332 =item C<$data>
333
334 The source data from a MARC record field.
335
336 =item C<$record_document>
337
338 Hashref representing the Elasticsearch document on which mappings should be
339 applied.
340
341 =item C<$altscript>
342
343 A boolean value indicating whether an alternate script presentation is being
344 processed.
345
346 =back
347
348 =cut
349
350 sub _process_mappings {
351     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
352     foreach my $mapping (@{$mappings}) {
353         my ($target, $options) = @{$mapping};
354
355         # Don't process sort fields for alternate scripts
356         my $sort = $target =~ /__sort$/;
357         if ($sort && $altscript) {
358             next;
359         }
360
361         # Copy (scalar) data since can have multiple targets
362         # with differing options for (possibly) mutating data
363         # so need a different copy for each
364         my $_data = $data;
365         $record_document->{$target} //= [];
366         if (defined $options->{substr}) {
367             my ($start, $length) = @{$options->{substr}};
368             $_data = length($data) > $start ? substr $data, $start, $length : '';
369         }
370         if (defined $options->{value_callbacks}) {
371             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
372         }
373         if (defined $options->{property}) {
374             $_data = {
375                 $options->{property} => $_data
376             }
377         }
378         push @{$record_document->{$target}}, $_data;
379     }
380 }
381
382 =head2 marc_records_to_documents($marc_records)
383
384     my @record_documents = $self->marc_records_to_documents($marc_records);
385
386 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
387
388 Returns array of hash references, representing Elasticsearch documents,
389 acceptable as body payload in C<Search::Elasticsearch> requests.
390
391 =over 4
392
393 =item C<$marc_documents>
394
395 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
396
397 =back
398
399 =cut
400
401 sub marc_records_to_documents {
402     my ($self, $records) = @_;
403     my $rules = $self->_get_marc_mapping_rules();
404     my $control_fields_rules = $rules->{control_fields};
405     my $data_fields_rules = $rules->{data_fields};
406     my $marcflavour = lc C4::Context->preference('marcflavour');
407
408     my @record_documents;
409
410     foreach my $record (@{$records}) {
411         my $record_document = {};
412         my $mappings = $rules->{leader};
413         if ($mappings) {
414             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
415         }
416         foreach my $field ($record->fields()) {
417             if ($field->is_control_field()) {
418                 my $mappings = $control_fields_rules->{$field->tag()};
419                 if ($mappings) {
420                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
421                 }
422             }
423             else {
424                 my $tag = $field->tag();
425                 # Handle alternate scripts in MARC 21
426                 my $altscript = 0;
427                 if ($marcflavour eq 'marc21' && $tag eq '880') {
428                     my $sub6 = $field->subfield('6');
429                     if ($sub6 =~ /^(...)-\d+/) {
430                         $tag = $1;
431                         $altscript = 1;
432                     }
433                 }
434
435                 my $data_field_rules = $data_fields_rules->{$tag};
436
437                 if ($data_field_rules) {
438                     my $subfields_mappings = $data_field_rules->{subfields};
439                     my $wildcard_mappings = $subfields_mappings->{'*'};
440                     foreach my $subfield ($field->subfields()) {
441                         my ($code, $data) = @{$subfield};
442                         my $mappings = $subfields_mappings->{$code} // [];
443                         if ($wildcard_mappings) {
444                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
445                         }
446                         if (@{$mappings}) {
447                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
448                         }
449                     }
450
451                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
452                     if ($subfields_join_mappings) {
453                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
454                             # Map each subfield to values, remove empty values, join with space
455                             my $data = join(
456                                 ' ',
457                                 grep(
458                                     $_,
459                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
460                                 )
461                             );
462                             if ($data) {
463                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
464                             }
465                         }
466                     }
467                 }
468             }
469         }
470         foreach my $field (keys %{$rules->{defaults}}) {
471             unless (defined $record_document->{$field}) {
472                 $record_document->{$field} = $rules->{defaults}->{$field};
473             }
474         }
475         foreach my $field (@{$rules->{sum}}) {
476             if (defined $record_document->{$field}) {
477                 # TODO: validate numeric? filter?
478                 # TODO: Or should only accept fields without nested values?
479                 # TODO: Quick and dirty, improve if needed
480                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
481             }
482         }
483         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
484         foreach my $field (@{$rules->{isbn}}) {
485             if (defined $record_document->{$field}) {
486                 my @isbns = ();
487                 foreach my $input_isbn (@{$record_document->{$field}}) {
488                     my $isbn = Business::ISBN->new($input_isbn);
489                     if (defined $isbn && $isbn->is_valid) {
490                         my $isbn13 = $isbn->as_isbn13->as_string;
491                         push @isbns, $isbn13;
492                         $isbn13 =~ s/\-//g;
493                         push @isbns, $isbn13;
494
495                         my $isbn10 = $isbn->as_isbn10;
496                         if ($isbn10) {
497                             $isbn10 = $isbn10->as_string;
498                             push @isbns, $isbn10;
499                             $isbn10 =~ s/\-//g;
500                             push @isbns, $isbn10;
501                         }
502                     } else {
503                         push @isbns, $input_isbn;
504                     }
505                 }
506                 $record_document->{$field} = \@isbns;
507             }
508         }
509
510         # Remove duplicate values and collapse sort fields
511         foreach my $field (keys %{$record_document}) {
512             if (ref($record_document->{$field}) eq 'ARRAY') {
513                 @{$record_document->{$field}} = do {
514                     my %seen;
515                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
516                 };
517                 if ($field =~ /__sort$/) {
518                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
519                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
520                 }
521             }
522         }
523
524         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
525         $record->encoding('UTF-8');
526         my @warnings;
527         {
528             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
529             local $SIG{__WARN__} = sub {
530                 push @warnings, $_[0];
531             };
532             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
533         }
534         if (@warnings) {
535             # Suppress warnings if record length exceeded
536             unless (substr($record->leader(), 0, 5) eq '99999') {
537                 foreach my $warning (@warnings) {
538                     carp $warning;
539                 }
540             }
541             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
542             $record_document->{'marc_format'} = 'MARCXML';
543         }
544         else {
545             $record_document->{'marc_format'} = 'base64ISO2709';
546         }
547         my $id = $record->subfield('999', 'c');
548         push @record_documents, [$id, $record_document];
549     }
550     return \@record_documents;
551 }
552
553 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
554
555     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
556
557 Get mappings, an internal data structure later used by
558 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
559 data for a MARC mapping.
560
561 The returned C<$mappings> is not to to be confused with mappings provided by
562 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
563 provided by C<_foreach_mapping> and expands it to this internal data structure.
564 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
565 is then applied to each MARC target (leader, control field data, subfield or
566 joined subfields) and integrated into the mapping rules data structure used in
567 C<marc_records_to_documents> to transform MARC records into Elasticsearch
568 documents.
569
570 =over 4
571
572 =item C<$facet>
573
574 Boolean indicating whether to create a facet field for this mapping.
575
576 =item C<$suggestible>
577
578 Boolean indicating whether to create a suggestion field for this mapping.
579
580 =item C<$sort>
581
582 Boolean indicating whether to create a sort field for this mapping.
583
584 =item C<$target_name>
585
586 Elasticsearch document target field name.
587
588 =item C<$target_type>
589
590 Elasticsearch document target field type.
591
592 =item C<$range>
593
594 An optional range as a string in the format "<START>-<END>" or "<START>",
595 where "<START>" and "<END>" are integers specifying a range that will be used
596 for extracting a substring from MARC data as Elasticsearch field target value.
597
598 The first character position is "1", and the range is inclusive,
599 so "1-3" means the first three characters of MARC data.
600
601 If only "<START>" is provided only one character at position "<START>" will
602 be extracted.
603
604 =back
605
606 =cut
607
608 sub _field_mappings {
609     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
610     my %mapping_defaults = ();
611     my @mappings;
612
613     my $substr_args = undef;
614     if ($range) {
615         # TODO: use value_callback instead?
616         my ($start, $end) = map(int, split /-/, $range, 2);
617         $substr_args = [$start];
618         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
619     }
620     my $default_options = {};
621     if ($substr_args) {
622         $default_options->{substr} = $substr_args;
623     }
624
625     # TODO: Should probably have per type value callback/hook
626     # but hard code for now
627     if ($target_type eq 'boolean') {
628         $default_options->{value_callbacks} //= [];
629         push @{$default_options->{value_callbacks}}, sub {
630             my ($value) = @_;
631             # Trim whitespace at both ends
632             $value =~ s/^\s+|\s+$//g;
633             return $value ? 'true' : 'false';
634         };
635     }
636
637     my $mapping = [$target_name, $default_options];
638     push @mappings, $mapping;
639
640     my @suffixes = ();
641     push @suffixes, 'facet' if $facet;
642     push @suffixes, 'suggestion' if $suggestible;
643     push @suffixes, 'sort' if !defined $sort || $sort;
644
645     foreach my $suffix (@suffixes) {
646         my $mapping = ["${target_name}__$suffix"];
647         # TODO: Hack, fix later in less hideous manner
648         if ($suffix eq 'suggestion') {
649             push @{$mapping}, {%{$default_options}, property => 'input'};
650         }
651         else {
652             push @{$mapping}, $default_options;
653         }
654         push @mappings, $mapping;
655     }
656     return @mappings;
657 };
658
659 =head2 _get_marc_mapping_rules
660
661     my $mapping_rules = $self->_get_marc_mapping_rules()
662
663 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
664
665 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
666 each call to C<MARC::Record>->field) we create an optimized structure of mapping
667 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
668
669 We can then iterate through all MARC fields for each record and apply all relevant
670 rules once per fields instead of retreiving fields multiple times for each mapping rule
671 which is terribly slow.
672
673 =cut
674
675 # TODO: This structure can be used for processing multiple MARC::Records so is currently
676 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
677 # memory cache which it is currently not. The performance gain of caching
678 # would probably be marginal, but to do this could be a further improvement.
679
680 sub _get_marc_mapping_rules {
681     my ($self) = @_;
682     my $marcflavour = lc C4::Context->preference('marcflavour');
683     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
684     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
685     my $rules = {
686         'leader' => [],
687         'control_fields' => {},
688         'data_fields' => {},
689         'sum' => [],
690         'isbn' => [],
691         'defaults' => {}
692     };
693
694     $self->_foreach_mapping(sub {
695         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
696         return if $marc_type ne $marcflavour;
697
698         if ($type eq 'sum') {
699             push @{$rules->{sum}}, $name;
700         }
701         elsif ($type eq 'isbn') {
702             push @{$rules->{isbn}}, $name;
703         }
704         elsif ($type eq 'boolean') {
705             # boolean gets special handling, if value doesn't exist for a field,
706             # it is set to false
707             $rules->{defaults}->{$name} = 'false';
708         }
709
710         if ($marc_field =~ $field_spec_regexp) {
711             my $field_tag = $1;
712
713             my @subfields;
714             my @subfield_groups;
715             # Parse and separate subfields form subfield groups
716             if (defined $2) {
717                 my $subfield_group = '';
718                 my $open_group = 0;
719
720                 foreach my $token (split //, $2) {
721                     if ($token eq "(") {
722                         if ($open_group) {
723                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
724                                 "Unmatched opening parenthesis for $marc_field"
725                             );
726                         }
727                         else {
728                             $open_group = 1;
729                         }
730                     }
731                     elsif ($token eq ")") {
732                         if ($open_group) {
733                             if ($subfield_group) {
734                                 push @subfield_groups, $subfield_group;
735                                 $subfield_group = '';
736                             }
737                             $open_group = 0;
738                         }
739                         else {
740                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
741                                 "Unmatched closing parenthesis for $marc_field"
742                             );
743                         }
744                     }
745                     elsif ($open_group) {
746                         $subfield_group .= $token;
747                     }
748                     else {
749                         push @subfields, $token;
750                     }
751                 }
752             }
753             else {
754                 push @subfields, '*';
755             }
756
757             my $range = defined $3 ? $3 : undef;
758             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
759
760             if ($field_tag < 10) {
761                 $rules->{control_fields}->{$field_tag} //= [];
762                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
763             }
764             else {
765                 $rules->{data_fields}->{$field_tag} //= {};
766                 foreach my $subfield (@subfields) {
767                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
768                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
769                 }
770                 foreach my $subfield_group (@subfield_groups) {
771                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
772                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
773                 }
774             }
775         }
776         elsif ($marc_field =~ $leader_regexp) {
777             my $range = defined $1 ? $1 : undef;
778             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
779             push @{$rules->{leader}}, @mappings;
780         }
781         else {
782             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
783                 "Invalid MARC field expression: $marc_field"
784             );
785         }
786     });
787     return $rules;
788 }
789
790 =head2 _foreach_mapping
791
792     $self->_foreach_mapping(
793         sub {
794             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
795                 $marc_field )
796               = @_;
797             return unless $marc_type eq 'marc21';
798             print "Data comes from: " . $marc_field . "\n";
799         }
800     );
801
802 This allows you to apply a function to each entry in the elasticsearch mappings
803 table, in order to build the mappings for whatever is needed.
804
805 In the provided function, the files are:
806
807 =over 4
808
809 =item C<$name>
810
811 The field name for elasticsearch (corresponds to the 'mapping' column in the
812 database.
813
814 =item C<$type>
815
816 The type for this value, e.g. 'string'.
817
818 =item C<$facet>
819
820 True if this value should be facetised. This only really makes sense if the
821 field is understood by the facet processing code anyway.
822
823 =item C<$sort>
824
825 True if this is a field that a) needs special sort handling, and b) if it
826 should be sorted on. False if a) but not b). Undef if not a). This allows,
827 for example, author to be sorted on but not everything marked with "author"
828 to be included in that sort.
829
830 =item C<$marc_type>
831
832 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
833 'unimarc', 'normarc'.
834
835 =item C<$marc_field>
836
837 A string that describes the MARC field that contains the data to extract.
838 These are of a form suited to Catmandu's MARC fixers.
839
840 =back
841
842 =cut
843
844 sub _foreach_mapping {
845     my ( $self, $sub ) = @_;
846
847     # TODO use a caching framework here
848     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
849         {
850             'search_marc_map.index_name' => $self->index,
851         },
852         {   join => { search_marc_to_fields => 'search_marc_map' },
853             '+select' => [
854                 'search_marc_to_fields.facet',
855                 'search_marc_to_fields.suggestible',
856                 'search_marc_to_fields.sort',
857                 'search_marc_map.marc_type',
858                 'search_marc_map.marc_field',
859             ],
860             '+as'     => [
861                 'facet',
862                 'suggestible',
863                 'sort',
864                 'marc_type',
865                 'marc_field',
866             ],
867         }
868     );
869
870     while ( my $search_field = $search_fields->next ) {
871         $sub->(
872             # Force lower case on indexed field names for case insensitive
873             # field name searches
874             lc($search_field->name),
875             $search_field->type,
876             $search_field->get_column('facet'),
877             $search_field->get_column('suggestible'),
878             $search_field->get_column('sort'),
879             $search_field->get_column('marc_type'),
880             $search_field->get_column('marc_field'),
881         );
882     }
883 }
884
885 =head2 process_error
886
887     die process_error($@);
888
889 This parses an Elasticsearch error message and produces a human-readable
890 result from it. This result is probably missing all the useful information
891 that you might want in diagnosing an issue, so the warning is also logged.
892
893 Note that currently the resulting message is not internationalised. This
894 will happen eventually by some method or other.
895
896 =cut
897
898 sub process_error {
899     my ($self, $msg) = @_;
900
901     warn $msg; # simple logging
902
903     # This is super-primitive
904     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
905
906     return "Unable to perform your search. Please try again.\n";
907 }
908
909 =head2 _read_configuration
910
911     my $conf = _read_configuration();
912
913 Reads the I<configuration file> and returns a hash structure with the
914 configuration information. It raises an exception if mandatory entries
915 are missing.
916
917 The hashref structure has the following form:
918
919     {
920         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
921         'index_name' => 'koha_instance',
922     }
923
924 This is configured by the following in the C<config> block in koha-conf.xml:
925
926     <elasticsearch>
927         <server>127.0.0.1:9200</server>
928         <server>anotherserver:9200</server>
929         <index_name>koha_instance</index_name>
930     </elasticsearch>
931
932 =cut
933
934 sub _read_configuration {
935
936     my $configuration;
937
938     my $conf = C4::Context->config('elasticsearch');
939     Koha::Exceptions::Config::MissingEntry->throw(
940         "Missing 'elasticsearch' block in config file")
941       unless defined $conf;
942
943     if ( $conf && $conf->{server} ) {
944         my $nodes = $conf->{server};
945         if ( ref($nodes) eq 'ARRAY' ) {
946             $configuration->{nodes} = $nodes;
947         }
948         else {
949             $configuration->{nodes} = [$nodes];
950         }
951     }
952     else {
953         Koha::Exceptions::Config::MissingEntry->throw(
954             "Missing 'server' entry in config file for elasticsearch");
955     }
956
957     if ( defined $conf->{index_name} ) {
958         $configuration->{index_name} = $conf->{index_name};
959     }
960     else {
961         Koha::Exceptions::Config::MissingEntry->throw(
962             "Missing 'index_name' entry in config file for elasticsearch");
963     }
964
965     return $configuration;
966 }
967
968 1;
969
970 __END__
971
972 =head1 AUTHOR
973
974 =over 4
975
976 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
977
978 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
979
980 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
981
982 =back
983
984 =cut