Bug 21974: Make Elasticsearch connection settings configurable
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
90     }
91     return $self->{elasticsearch};
92 }
93
94 =head2 get_elasticsearch_params
95
96     my $params = $self->get_elasticsearch_params();
97
98 This provides a hashref that contains the parameters for connecting to the
99 ElasicSearch servers, in the form:
100
101     {
102         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
103         'index_name' => 'koha_instance_index',
104     }
105
106 This is configured by the following in the C<config> block in koha-conf.xml:
107
108     <elasticsearch>
109         <server>127.0.0.1:9200</server>
110         <server>anotherserver:9200</server>
111         <index_name>koha_instance</index_name>
112     </elasticsearch>
113
114 =cut
115
116 sub get_elasticsearch_params {
117     my ($self) = @_;
118
119     # Copy the hash so that we're not modifying the original
120     my $conf = C4::Context->config('elasticsearch');
121     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
122     my $es = { %{ $conf } };
123
124     # Helpfully, the multiple server lines end up in an array for us anyway
125     # if there are multiple ones, but not if there's only one.
126     my $server = $es->{server};
127     delete $es->{server};
128     if ( ref($server) eq 'ARRAY' ) {
129
130         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
131         $es->{nodes} = $server;
132     }
133     elsif ($server) {
134         $es->{nodes} = [$server];
135     }
136     else {
137         die "No elasticsearch servers were specified in koha-conf.xml.\n";
138     }
139     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
140       if ( !$es->{index_name} );
141     # Append the name of this particular index to our namespace
142     $es->{index_name} .= '_' . $self->index;
143
144     $es->{key_prefix} = 'es_';
145     $es->{client} //= '5_0::Direct';
146     $es->{cxn_pool} //= 'Sniff';
147     $es->{request_timeout} //= 60;
148
149     return $es;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         my $mappings = {
196             data => scalar _get_elasticsearch_mapping('general', '')
197         };
198         my $marcflavour = lc C4::Context->preference('marcflavour');
199         $self->_foreach_mapping(
200             sub {
201                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
202                 return if $marc_type ne $marcflavour;
203                 # TODO if this gets any sort of complexity to it, it should
204                 # be broken out into its own function.
205
206                 # TODO be aware of date formats, but this requires pre-parsing
207                 # as ES will simply reject anything with an invalid date.
208                 my $es_type = 'text';
209                 if ($type eq 'boolean') {
210                     $es_type = 'boolean';
211                 } elsif ($type eq 'number' || $type eq 'sum') {
212                     $es_type = 'integer';
213                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
214                     $es_type = 'stdno';
215                 }
216
217                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
218
219                 if ($facet) {
220                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
221                 }
222                 if ($suggestible) {
223                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
224                 }
225                 # Sort is a bit special as it can be true, false, undef.
226                 # We care about "true" or "undef",
227                 # "undef" means to do the default thing, which is make it sortable.
228                 if (!defined $sort || $sort) {
229                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
230                     $sort_fields{$self->index}{$name} = 1;
231                 }
232             }
233         );
234         $all_mappings{$self->index} = $mappings;
235     }
236     $self->sort_fields(\%{$sort_fields{$self->index}});
237
238     return $all_mappings{$self->index};
239 }
240
241 =head2 _get_elasticsearch_mapping
242
243 Get the Elasticsearch mappings for the given purpose and data type.
244
245 $mapping = _get_elasticsearch_mapping('search', 'text');
246
247 =cut
248
249 sub _get_elasticsearch_mapping {
250
251     my ( $purpose, $type ) = @_;
252
253     # Use state to speed up repeated calls
254     state $settings = undef;
255     if (!defined $settings) {
256         my $config_file = C4::Context->config('elasticsearch_field_config');
257         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
258         $settings = LoadFile( $config_file );
259     }
260
261     if (!defined $settings->{$purpose}) {
262         die "Field purpose $purpose not defined in field config";
263     }
264     if ($type eq '') {
265         return $settings->{$purpose};
266     }
267     if (defined $settings->{$purpose}{$type}) {
268         return $settings->{$purpose}{$type};
269     }
270     if (defined $settings->{$purpose}{'default'}) {
271         return $settings->{$purpose}{'default'};
272     }
273     return;
274 }
275
276 sub reset_elasticsearch_mappings {
277     my ( $reset_fields ) = @_;
278     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
279     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
280     my $indexes = LoadFile( $mappings_yaml );
281
282     while ( my ( $index_name, $fields ) = each %$indexes ) {
283         while ( my ( $field_name, $data ) = each %$fields ) {
284             my $field_type = $data->{type};
285             my $field_label = $data->{label};
286             my $mappings = $data->{mappings};
287             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
288             for my $mapping ( @$mappings ) {
289                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
290                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
291             }
292         }
293     }
294 }
295
296 # This overrides the accessor provided by Class::Accessor so that if
297 # sort_fields isn't set, then it'll generate it.
298 sub sort_fields {
299     my $self = shift;
300     if (@_) {
301         $self->_sort_fields_accessor(@_);
302         return;
303     }
304     my $val = $self->_sort_fields_accessor();
305     return $val if $val;
306
307     # This will populate the accessor as a side effect
308     $self->get_elasticsearch_mappings();
309     return $self->_sort_fields_accessor();
310 }
311
312 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
313
314     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
315
316 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
317 Since we group all mappings by MARC field targets C<$mappings> will contain
318 all targets for C<$data> and thus we need to fetch the MARC field only once.
319 C<$mappings> will be applied to C<$record_document> and new field values added.
320 The method has no return value.
321
322 =over 4
323
324 =item C<$mappings>
325
326 Arrayref of mappings containing arrayrefs in the format
327 [C<$target>, C<$options>] where C<$target> is the name of the target field and
328 C<$options> is a hashref containing processing directives for this particular
329 mapping.
330
331 =item C<$data>
332
333 The source data from a MARC record field.
334
335 =item C<$record_document>
336
337 Hashref representing the Elasticsearch document on which mappings should be
338 applied.
339
340 =item C<$altscript>
341
342 A boolean value indicating whether an alternate script presentation is being
343 processed.
344
345 =back
346
347 =cut
348
349 sub _process_mappings {
350     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
351     foreach my $mapping (@{$mappings}) {
352         my ($target, $options) = @{$mapping};
353
354         # Don't process sort fields for alternate scripts
355         my $sort = $target =~ /__sort$/;
356         if ($sort && $altscript) {
357             next;
358         }
359
360         # Copy (scalar) data since can have multiple targets
361         # with differing options for (possibly) mutating data
362         # so need a different copy for each
363         my $_data = $data;
364         $record_document->{$target} //= [];
365         if (defined $options->{substr}) {
366             my ($start, $length) = @{$options->{substr}};
367             $_data = length($data) > $start ? substr $data, $start, $length : '';
368         }
369         if (defined $options->{value_callbacks}) {
370             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
371         }
372         if (defined $options->{property}) {
373             $_data = {
374                 $options->{property} => $_data
375             }
376         }
377         push @{$record_document->{$target}}, $_data;
378     }
379 }
380
381 =head2 marc_records_to_documents($marc_records)
382
383     my @record_documents = $self->marc_records_to_documents($marc_records);
384
385 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
386
387 Returns array of hash references, representing Elasticsearch documents,
388 acceptable as body payload in C<Search::Elasticsearch> requests.
389
390 =over 4
391
392 =item C<$marc_documents>
393
394 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
395
396 =back
397
398 =cut
399
400 sub marc_records_to_documents {
401     my ($self, $records) = @_;
402     my $rules = $self->_get_marc_mapping_rules();
403     my $control_fields_rules = $rules->{control_fields};
404     my $data_fields_rules = $rules->{data_fields};
405     my $marcflavour = lc C4::Context->preference('marcflavour');
406
407     my @record_documents;
408
409     foreach my $record (@{$records}) {
410         my $record_document = {};
411         my $mappings = $rules->{leader};
412         if ($mappings) {
413             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
414         }
415         foreach my $field ($record->fields()) {
416             if ($field->is_control_field()) {
417                 my $mappings = $control_fields_rules->{$field->tag()};
418                 if ($mappings) {
419                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
420                 }
421             }
422             else {
423                 my $tag = $field->tag();
424                 # Handle alternate scripts in MARC 21
425                 my $altscript = 0;
426                 if ($marcflavour eq 'marc21' && $tag eq '880') {
427                     my $sub6 = $field->subfield('6');
428                     if ($sub6 =~ /^(...)-\d+/) {
429                         $tag = $1;
430                         $altscript = 1;
431                     }
432                 }
433
434                 my $data_field_rules = $data_fields_rules->{$tag};
435
436                 if ($data_field_rules) {
437                     my $subfields_mappings = $data_field_rules->{subfields};
438                     my $wildcard_mappings = $subfields_mappings->{'*'};
439                     foreach my $subfield ($field->subfields()) {
440                         my ($code, $data) = @{$subfield};
441                         my $mappings = $subfields_mappings->{$code} // [];
442                         if ($wildcard_mappings) {
443                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
444                         }
445                         if (@{$mappings}) {
446                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
447                         }
448                     }
449
450                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
451                     if ($subfields_join_mappings) {
452                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
453                             # Map each subfield to values, remove empty values, join with space
454                             my $data = join(
455                                 ' ',
456                                 grep(
457                                     $_,
458                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
459                                 )
460                             );
461                             if ($data) {
462                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
463                             }
464                         }
465                     }
466                 }
467             }
468         }
469         foreach my $field (keys %{$rules->{defaults}}) {
470             unless (defined $record_document->{$field}) {
471                 $record_document->{$field} = $rules->{defaults}->{$field};
472             }
473         }
474         foreach my $field (@{$rules->{sum}}) {
475             if (defined $record_document->{$field}) {
476                 # TODO: validate numeric? filter?
477                 # TODO: Or should only accept fields without nested values?
478                 # TODO: Quick and dirty, improve if needed
479                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
480             }
481         }
482         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
483         foreach my $field (@{$rules->{isbn}}) {
484             if (defined $record_document->{$field}) {
485                 my @isbns = ();
486                 foreach my $input_isbn (@{$record_document->{$field}}) {
487                     my $isbn = Business::ISBN->new($input_isbn);
488                     if (defined $isbn && $isbn->is_valid) {
489                         my $isbn13 = $isbn->as_isbn13->as_string;
490                         push @isbns, $isbn13;
491                         $isbn13 =~ s/\-//g;
492                         push @isbns, $isbn13;
493
494                         my $isbn10 = $isbn->as_isbn10;
495                         if ($isbn10) {
496                             $isbn10 = $isbn10->as_string;
497                             push @isbns, $isbn10;
498                             $isbn10 =~ s/\-//g;
499                             push @isbns, $isbn10;
500                         }
501                     } else {
502                         push @isbns, $input_isbn;
503                     }
504                 }
505                 $record_document->{$field} = \@isbns;
506             }
507         }
508
509         # Remove duplicate values and collapse sort fields
510         foreach my $field (keys %{$record_document}) {
511             if (ref($record_document->{$field}) eq 'ARRAY') {
512                 @{$record_document->{$field}} = do {
513                     my %seen;
514                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
515                 };
516                 if ($field =~ /__sort$/) {
517                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
518                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
519                 }
520             }
521         }
522
523         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
524         $record->encoding('UTF-8');
525         my @warnings;
526         {
527             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
528             local $SIG{__WARN__} = sub {
529                 push @warnings, $_[0];
530             };
531             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
532         }
533         if (@warnings) {
534             # Suppress warnings if record length exceeded
535             unless (substr($record->leader(), 0, 5) eq '99999') {
536                 foreach my $warning (@warnings) {
537                     carp $warning;
538                 }
539             }
540             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
541             $record_document->{'marc_format'} = 'MARCXML';
542         }
543         else {
544             $record_document->{'marc_format'} = 'base64ISO2709';
545         }
546         my $id = $record->subfield('999', 'c');
547         push @record_documents, [$id, $record_document];
548     }
549     return \@record_documents;
550 }
551
552 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
553
554     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
555
556 Get mappings, an internal data structure later used by
557 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
558 data for a MARC mapping.
559
560 The returned C<$mappings> is not to to be confused with mappings provided by
561 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
562 provided by C<_foreach_mapping> and expands it to this internal data structure.
563 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
564 is then applied to each MARC target (leader, control field data, subfield or
565 joined subfields) and integrated into the mapping rules data structure used in
566 C<marc_records_to_documents> to transform MARC records into Elasticsearch
567 documents.
568
569 =over 4
570
571 =item C<$facet>
572
573 Boolean indicating whether to create a facet field for this mapping.
574
575 =item C<$suggestible>
576
577 Boolean indicating whether to create a suggestion field for this mapping.
578
579 =item C<$sort>
580
581 Boolean indicating whether to create a sort field for this mapping.
582
583 =item C<$target_name>
584
585 Elasticsearch document target field name.
586
587 =item C<$target_type>
588
589 Elasticsearch document target field type.
590
591 =item C<$range>
592
593 An optional range as a string in the format "<START>-<END>" or "<START>",
594 where "<START>" and "<END>" are integers specifying a range that will be used
595 for extracting a substring from MARC data as Elasticsearch field target value.
596
597 The first character position is "1", and the range is inclusive,
598 so "1-3" means the first three characters of MARC data.
599
600 If only "<START>" is provided only one character at position "<START>" will
601 be extracted.
602
603 =back
604
605 =cut
606
607 sub _field_mappings {
608     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
609     my %mapping_defaults = ();
610     my @mappings;
611
612     my $substr_args = undef;
613     if ($range) {
614         # TODO: use value_callback instead?
615         my ($start, $end) = map(int, split /-/, $range, 2);
616         $substr_args = [$start];
617         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
618     }
619     my $default_options = {};
620     if ($substr_args) {
621         $default_options->{substr} = $substr_args;
622     }
623
624     # TODO: Should probably have per type value callback/hook
625     # but hard code for now
626     if ($target_type eq 'boolean') {
627         $default_options->{value_callbacks} //= [];
628         push @{$default_options->{value_callbacks}}, sub {
629             my ($value) = @_;
630             # Trim whitespace at both ends
631             $value =~ s/^\s+|\s+$//g;
632             return $value ? 'true' : 'false';
633         };
634     }
635
636     my $mapping = [$target_name, $default_options];
637     push @mappings, $mapping;
638
639     my @suffixes = ();
640     push @suffixes, 'facet' if $facet;
641     push @suffixes, 'suggestion' if $suggestible;
642     push @suffixes, 'sort' if !defined $sort || $sort;
643
644     foreach my $suffix (@suffixes) {
645         my $mapping = ["${target_name}__$suffix"];
646         # TODO: Hack, fix later in less hideous manner
647         if ($suffix eq 'suggestion') {
648             push @{$mapping}, {%{$default_options}, property => 'input'};
649         }
650         else {
651             push @{$mapping}, $default_options;
652         }
653         push @mappings, $mapping;
654     }
655     return @mappings;
656 };
657
658 =head2 _get_marc_mapping_rules
659
660     my $mapping_rules = $self->_get_marc_mapping_rules()
661
662 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
663
664 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
665 each call to C<MARC::Record>->field) we create an optimized structure of mapping
666 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
667
668 We can then iterate through all MARC fields for each record and apply all relevant
669 rules once per fields instead of retreiving fields multiple times for each mapping rule
670 which is terribly slow.
671
672 =cut
673
674 # TODO: This structure can be used for processing multiple MARC::Records so is currently
675 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
676 # memory cache which it is currently not. The performance gain of caching
677 # would probably be marginal, but to do this could be a further improvement.
678
679 sub _get_marc_mapping_rules {
680     my ($self) = @_;
681     my $marcflavour = lc C4::Context->preference('marcflavour');
682     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
683     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
684     my $rules = {
685         'leader' => [],
686         'control_fields' => {},
687         'data_fields' => {},
688         'sum' => [],
689         'isbn' => [],
690         'defaults' => {}
691     };
692
693     $self->_foreach_mapping(sub {
694         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
695         return if $marc_type ne $marcflavour;
696
697         if ($type eq 'sum') {
698             push @{$rules->{sum}}, $name;
699         }
700         elsif ($type eq 'isbn') {
701             push @{$rules->{isbn}}, $name;
702         }
703         elsif ($type eq 'boolean') {
704             # boolean gets special handling, if value doesn't exist for a field,
705             # it is set to false
706             $rules->{defaults}->{$name} = 'false';
707         }
708
709         if ($marc_field =~ $field_spec_regexp) {
710             my $field_tag = $1;
711
712             my @subfields;
713             my @subfield_groups;
714             # Parse and separate subfields form subfield groups
715             if (defined $2) {
716                 my $subfield_group = '';
717                 my $open_group = 0;
718
719                 foreach my $token (split //, $2) {
720                     if ($token eq "(") {
721                         if ($open_group) {
722                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
723                                 "Unmatched opening parenthesis for $marc_field"
724                             );
725                         }
726                         else {
727                             $open_group = 1;
728                         }
729                     }
730                     elsif ($token eq ")") {
731                         if ($open_group) {
732                             if ($subfield_group) {
733                                 push @subfield_groups, $subfield_group;
734                                 $subfield_group = '';
735                             }
736                             $open_group = 0;
737                         }
738                         else {
739                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
740                                 "Unmatched closing parenthesis for $marc_field"
741                             );
742                         }
743                     }
744                     elsif ($open_group) {
745                         $subfield_group .= $token;
746                     }
747                     else {
748                         push @subfields, $token;
749                     }
750                 }
751             }
752             else {
753                 push @subfields, '*';
754             }
755
756             my $range = defined $3 ? $3 : undef;
757             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
758
759             if ($field_tag < 10) {
760                 $rules->{control_fields}->{$field_tag} //= [];
761                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
762             }
763             else {
764                 $rules->{data_fields}->{$field_tag} //= {};
765                 foreach my $subfield (@subfields) {
766                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
767                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
768                 }
769                 foreach my $subfield_group (@subfield_groups) {
770                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
771                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
772                 }
773             }
774         }
775         elsif ($marc_field =~ $leader_regexp) {
776             my $range = defined $1 ? $1 : undef;
777             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
778             push @{$rules->{leader}}, @mappings;
779         }
780         else {
781             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
782                 "Invalid MARC field expression: $marc_field"
783             );
784         }
785     });
786     return $rules;
787 }
788
789 =head2 _foreach_mapping
790
791     $self->_foreach_mapping(
792         sub {
793             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
794                 $marc_field )
795               = @_;
796             return unless $marc_type eq 'marc21';
797             print "Data comes from: " . $marc_field . "\n";
798         }
799     );
800
801 This allows you to apply a function to each entry in the elasticsearch mappings
802 table, in order to build the mappings for whatever is needed.
803
804 In the provided function, the files are:
805
806 =over 4
807
808 =item C<$name>
809
810 The field name for elasticsearch (corresponds to the 'mapping' column in the
811 database.
812
813 =item C<$type>
814
815 The type for this value, e.g. 'string'.
816
817 =item C<$facet>
818
819 True if this value should be facetised. This only really makes sense if the
820 field is understood by the facet processing code anyway.
821
822 =item C<$sort>
823
824 True if this is a field that a) needs special sort handling, and b) if it
825 should be sorted on. False if a) but not b). Undef if not a). This allows,
826 for example, author to be sorted on but not everything marked with "author"
827 to be included in that sort.
828
829 =item C<$marc_type>
830
831 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
832 'unimarc', 'normarc'.
833
834 =item C<$marc_field>
835
836 A string that describes the MARC field that contains the data to extract.
837 These are of a form suited to Catmandu's MARC fixers.
838
839 =back
840
841 =cut
842
843 sub _foreach_mapping {
844     my ( $self, $sub ) = @_;
845
846     # TODO use a caching framework here
847     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
848         {
849             'search_marc_map.index_name' => $self->index,
850         },
851         {   join => { search_marc_to_fields => 'search_marc_map' },
852             '+select' => [
853                 'search_marc_to_fields.facet',
854                 'search_marc_to_fields.suggestible',
855                 'search_marc_to_fields.sort',
856                 'search_marc_map.marc_type',
857                 'search_marc_map.marc_field',
858             ],
859             '+as'     => [
860                 'facet',
861                 'suggestible',
862                 'sort',
863                 'marc_type',
864                 'marc_field',
865             ],
866         }
867     );
868
869     while ( my $search_field = $search_fields->next ) {
870         $sub->(
871             # Force lower case on indexed field names for case insensitive
872             # field name searches
873             lc($search_field->name),
874             $search_field->type,
875             $search_field->get_column('facet'),
876             $search_field->get_column('suggestible'),
877             $search_field->get_column('sort'),
878             $search_field->get_column('marc_type'),
879             $search_field->get_column('marc_field'),
880         );
881     }
882 }
883
884 =head2 process_error
885
886     die process_error($@);
887
888 This parses an Elasticsearch error message and produces a human-readable
889 result from it. This result is probably missing all the useful information
890 that you might want in diagnosing an issue, so the warning is also logged.
891
892 Note that currently the resulting message is not internationalised. This
893 will happen eventually by some method or other.
894
895 =cut
896
897 sub process_error {
898     my ($self, $msg) = @_;
899
900     warn $msg; # simple logging
901
902     # This is super-primitive
903     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
904
905     return "Unable to perform your search. Please try again.\n";
906 }
907
908 =head2 _read_configuration
909
910     my $conf = _read_configuration();
911
912 Reads the I<configuration file> and returns a hash structure with the
913 configuration information. It raises an exception if mandatory entries
914 are missing.
915
916 The hashref structure has the following form:
917
918     {
919         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
920         'index_name' => 'koha_instance',
921     }
922
923 This is configured by the following in the C<config> block in koha-conf.xml:
924
925     <elasticsearch>
926         <server>127.0.0.1:9200</server>
927         <server>anotherserver:9200</server>
928         <index_name>koha_instance</index_name>
929     </elasticsearch>
930
931 =cut
932
933 sub _read_configuration {
934
935     my $configuration;
936
937     my $conf = C4::Context->config('elasticsearch');
938     Koha::Exceptions::Config::MissingEntry->throw(
939         "Missing 'elasticsearch' block in config file")
940       unless defined $conf;
941
942     if ( $conf && $conf->{server} ) {
943         my $nodes = $conf->{server};
944         if ( ref($nodes) eq 'ARRAY' ) {
945             $configuration->{nodes} = $nodes;
946         }
947         else {
948             $configuration->{nodes} = [$nodes];
949         }
950     }
951     else {
952         Koha::Exceptions::Config::MissingEntry->throw(
953             "Missing 'server' entry in config file for elasticsearch");
954     }
955
956     if ( defined $conf->{index_name} ) {
957         $configuration->{index_name} = $conf->{index_name};
958     }
959     else {
960         Koha::Exceptions::Config::MissingEntry->throw(
961             "Missing 'index_name' entry in config file for elasticsearch");
962     }
963
964     return $configuration;
965 }
966
967 1;
968
969 __END__
970
971 =head1 AUTHOR
972
973 =over 4
974
975 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
976
977 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
978
979 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
980
981 =back
982
983 =cut