Bug 20244: Pod fixes
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new(
90             client => "5_0::Direct",
91             nodes => $conf->{nodes},
92             cxn_pool => 'Sniff',
93             request_timeout => 60
94         );
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticserver index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         my $mappings = {
197             data => scalar _get_elasticsearch_mapping('general', '')
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $all_mappings{$self->index} = $mappings;
236     }
237     $self->sort_fields(\%{$sort_fields{$self->index}});
238
239     return $all_mappings{$self->index};
240 }
241
242 =head2 _get_elasticsearch_mapping
243
244 Get the Elasticsearch mappings for the given purpose and data type.
245
246 $mapping = _get_elasticsearch_mapping('search', 'text');
247
248 =cut
249
250 sub _get_elasticsearch_mapping {
251
252     my ( $purpose, $type ) = @_;
253
254     # Use state to speed up repeated calls
255     state $settings = undef;
256     if (!defined $settings) {
257         my $config_file = C4::Context->config('elasticsearch_field_config');
258         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
259         $settings = LoadFile( $config_file );
260     }
261
262     if (!defined $settings->{$purpose}) {
263         die "Field purpose $purpose not defined in field config";
264     }
265     if ($type eq '') {
266         return $settings->{$purpose};
267     }
268     if (defined $settings->{$purpose}{$type}) {
269         return $settings->{$purpose}{$type};
270     }
271     if (defined $settings->{$purpose}{'default'}) {
272         return $settings->{$purpose}{'default'};
273     }
274     return;
275 }
276
277 sub reset_elasticsearch_mappings {
278     my ( $reset_fields ) = @_;
279     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
280     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
281     my $indexes = LoadFile( $mappings_yaml );
282
283     while ( my ( $index_name, $fields ) = each %$indexes ) {
284         while ( my ( $field_name, $data ) = each %$fields ) {
285             my $field_type = $data->{type};
286             my $field_label = $data->{label};
287             my $mappings = $data->{mappings};
288             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
289             for my $mapping ( @$mappings ) {
290                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
291                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
292             }
293         }
294     }
295 }
296
297 # This overrides the accessor provided by Class::Accessor so that if
298 # sort_fields isn't set, then it'll generate it.
299 sub sort_fields {
300     my $self = shift;
301     if (@_) {
302         $self->_sort_fields_accessor(@_);
303         return;
304     }
305     my $val = $self->_sort_fields_accessor();
306     return $val if $val;
307
308     # This will populate the accessor as a side effect
309     $self->get_elasticsearch_mappings();
310     return $self->_sort_fields_accessor();
311 }
312
313 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
314
315     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
316
317 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
318 Since we group all mappings by MARC field targets C<$mappings> will contain
319 all targets for C<$data> and thus we need to fetch the MARC field only once.
320 C<$mappings> will be applied to C<$record_document> and new field values added.
321 The method has no return value.
322
323 =over 4
324
325 =item C<$mappings>
326
327 Arrayref of mappings containing arrayrefs in the format
328 [C<$target>, C<$options>] where C<$target> is the name of the target field and
329 C<$options> is a hashref containing processing directives for this particular
330 mapping.
331
332 =item C<$data>
333
334 The source data from a MARC record field.
335
336 =item C<$record_document>
337
338 Hashref representing the Elasticsearch document on which mappings should be
339 applied.
340
341 =item C<$altscript>
342
343 A boolean value indicating whether an alternate script presentation is being
344 processed.
345
346 =back
347
348 =cut
349
350 sub _process_mappings {
351     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
352     foreach my $mapping (@{$mappings}) {
353         my ($target, $options) = @{$mapping};
354
355         # Don't process sort fields for alternate scripts
356         my $sort = $target =~ /__sort$/;
357         if ($sort && $altscript) {
358             next;
359         }
360
361         # Copy (scalar) data since can have multiple targets
362         # with differing options for (possibly) mutating data
363         # so need a different copy for each
364         my $_data = $data;
365         $record_document->{$target} //= [];
366         if (defined $options->{substr}) {
367             my ($start, $length) = @{$options->{substr}};
368             $_data = length($data) > $start ? substr $data, $start, $length : '';
369         }
370         if (defined $options->{value_callbacks}) {
371             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
372         }
373         if (defined $options->{property}) {
374             $_data = {
375                 $options->{property} => $_data
376             }
377         }
378         # For sort fields, index only a single field with concatenated values
379         if ($sort && @{$record_document->{$target}}) {
380             @{$record_document->{$target}}[0] .= " $_data";
381         } else {
382             push @{$record_document->{$target}}, $_data;
383         }
384     }
385 }
386
387 =head2 marc_records_to_documents($marc_records)
388
389     my @record_documents = $self->marc_records_to_documents($marc_records);
390
391 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
392
393 Returns array of hash references, representing Elasticsearch documents,
394 acceptable as body payload in C<Search::Elasticsearch> requests.
395
396 =over 4
397
398 =item C<$marc_documents>
399
400 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
401
402 =back
403
404 =cut
405
406 sub marc_records_to_documents {
407     my ($self, $records) = @_;
408     my $rules = $self->_get_marc_mapping_rules();
409     my $control_fields_rules = $rules->{control_fields};
410     my $data_fields_rules = $rules->{data_fields};
411     my $marcflavour = lc C4::Context->preference('marcflavour');
412
413     my @record_documents;
414
415     foreach my $record (@{$records}) {
416         my $record_document = {};
417         my $mappings = $rules->{leader};
418         if ($mappings) {
419             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
420         }
421         foreach my $field ($record->fields()) {
422             if ($field->is_control_field()) {
423                 my $mappings = $control_fields_rules->{$field->tag()};
424                 if ($mappings) {
425                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
426                 }
427             }
428             else {
429                 my $tag = $field->tag();
430                 # Handle alternate scripts in MARC 21
431                 my $altscript = 0;
432                 if ($marcflavour eq 'marc21' && $tag eq '880') {
433                     my $sub6 = $field->subfield('6');
434                     if ($sub6 =~ /^(...)-\d+/) {
435                         $tag = $1;
436                         $altscript = 1;
437                     }
438                 }
439
440                 my $data_field_rules = $data_fields_rules->{$tag};
441
442                 if ($data_field_rules) {
443                     my $subfields_mappings = $data_field_rules->{subfields};
444                     my $wildcard_mappings = $subfields_mappings->{'*'};
445                     foreach my $subfield ($field->subfields()) {
446                         my ($code, $data) = @{$subfield};
447                         my $mappings = $subfields_mappings->{$code} // [];
448                         if ($wildcard_mappings) {
449                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
450                         }
451                         if (@{$mappings}) {
452                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
453                         }
454                     }
455
456                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
457                     if ($subfields_join_mappings) {
458                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
459                             # Map each subfield to values, remove empty values, join with space
460                             my $data = join(
461                                 ' ',
462                                 grep(
463                                     $_,
464                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
465                                 )
466                             );
467                             if ($data) {
468                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
469                             }
470                         }
471                     }
472                 }
473             }
474         }
475         foreach my $field (keys %{$rules->{defaults}}) {
476             unless (defined $record_document->{$field}) {
477                 $record_document->{$field} = $rules->{defaults}->{$field};
478             }
479         }
480         foreach my $field (@{$rules->{sum}}) {
481             if (defined $record_document->{$field}) {
482                 # TODO: validate numeric? filter?
483                 # TODO: Or should only accept fields without nested values?
484                 # TODO: Quick and dirty, improve if needed
485                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
486             }
487         }
488         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
489         foreach my $field (@{$rules->{isbn}}) {
490             if (defined $record_document->{$field}) {
491                 my @isbns = ();
492                 foreach my $input_isbn (@{$record_document->{$field}}) {
493                     my $isbn = Business::ISBN->new($input_isbn);
494                     if (defined $isbn && $isbn->is_valid) {
495                         my $isbn13 = $isbn->as_isbn13->as_string;
496                         push @isbns, $isbn13;
497                         $isbn13 =~ s/\-//g;
498                         push @isbns, $isbn13;
499
500                         my $isbn10 = $isbn->as_isbn10;
501                         if ($isbn10) {
502                             $isbn10 = $isbn10->as_string;
503                             push @isbns, $isbn10;
504                             $isbn10 =~ s/\-//g;
505                             push @isbns, $isbn10;
506                         }
507                     } else {
508                         push @isbns, $input_isbn;
509                     }
510                 }
511                 $record_document->{$field} = \@isbns;
512             }
513         }
514
515         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
516         $record->encoding('UTF-8');
517         my @warnings;
518         {
519             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
520             local $SIG{__WARN__} = sub {
521                 push @warnings, $_[0];
522             };
523             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
524         }
525         if (@warnings) {
526             # Suppress warnings if record length exceeded
527             unless (substr($record->leader(), 0, 5) eq '99999') {
528                 foreach my $warning (@warnings) {
529                     carp $warning;
530                 }
531             }
532             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
533             $record_document->{'marc_format'} = 'MARCXML';
534         }
535         else {
536             $record_document->{'marc_format'} = 'base64ISO2709';
537         }
538         my $id = $record->subfield('999', 'c');
539         push @record_documents, [$id, $record_document];
540     }
541     return \@record_documents;
542 }
543
544 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
545
546     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
547
548 Get mappings, an internal data structure later used by
549 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
550 data for a MARC mapping.
551
552 The returned C<$mappings> is not to to be confused with mappings provided by
553 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
554 provided by C<_foreach_mapping> and expands it to this internal data structure.
555 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
556 is then applied to each MARC target (leader, control field data, subfield or
557 joined subfields) and integrated into the mapping rules data structure used in
558 C<marc_records_to_documents> to transform MARC records into Elasticsearch
559 documents.
560
561 =over 4
562
563 =item C<$facet>
564
565 Boolean indicating whether to create a facet field for this mapping.
566
567 =item C<$suggestible>
568
569 Boolean indicating whether to create a suggestion field for this mapping.
570
571 =item C<$sort>
572
573 Boolean indicating whether to create a sort field for this mapping.
574
575 =item C<$target_name>
576
577 Elasticsearch document target field name.
578
579 =item C<$target_type>
580
581 Elasticsearch document target field type.
582
583 =item C<$range>
584
585 An optional range as a string in the format "<START>-<END>" or "<START>",
586 where "<START>" and "<END>" are integers specifying a range that will be used
587 for extracting a substring from MARC data as Elasticsearch field target value.
588
589 The first character position is "1", and the range is inclusive,
590 so "1-3" means the first three characters of MARC data.
591
592 If only "<START>" is provided only one character at position "<START>" will
593 be extracted.
594
595 =back
596
597 =cut
598
599 sub _field_mappings {
600     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
601     my %mapping_defaults = ();
602     my @mappings;
603
604     my $substr_args = undef;
605     if ($range) {
606         # TODO: use value_callback instead?
607         my ($start, $end) = map(int, split /-/, $range, 2);
608         $substr_args = [$start];
609         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
610     }
611     my $default_options = {};
612     if ($substr_args) {
613         $default_options->{substr} = $substr_args;
614     }
615
616     # TODO: Should probably have per type value callback/hook
617     # but hard code for now
618     if ($target_type eq 'boolean') {
619         $default_options->{value_callbacks} //= [];
620         push @{$default_options->{value_callbacks}}, sub {
621             my ($value) = @_;
622             # Trim whitespace at both ends
623             $value =~ s/^\s+|\s+$//g;
624             return $value ? 'true' : 'false';
625         };
626     }
627
628     my $mapping = [$target_name, $default_options];
629     push @mappings, $mapping;
630
631     my @suffixes = ();
632     push @suffixes, 'facet' if $facet;
633     push @suffixes, 'suggestion' if $suggestible;
634     push @suffixes, 'sort' if !defined $sort || $sort;
635
636     foreach my $suffix (@suffixes) {
637         my $mapping = ["${target_name}__$suffix"];
638         # TODO: Hack, fix later in less hideous manner
639         if ($suffix eq 'suggestion') {
640             push @{$mapping}, {%{$default_options}, property => 'input'};
641         }
642         else {
643             push @{$mapping}, $default_options;
644         }
645         push @mappings, $mapping;
646     }
647     return @mappings;
648 };
649
650 =head2 _get_marc_mapping_rules
651
652     my $mapping_rules = $self->_get_marc_mapping_rules()
653
654 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
655
656 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
657 each call to C<MARC::Record>->field) we create an optimized structure of mapping
658 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
659
660 We can then iterate through all MARC fields for each record and apply all relevant
661 rules once per fields instead of retreiving fields multiple times for each mapping rule
662 which is terribly slow.
663
664 =cut
665
666 # TODO: This structure can be used for processing multiple MARC::Records so is currently
667 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
668 # memory cache which it is currently not. The performance gain of caching
669 # would probably be marginal, but to do this could be a further improvement.
670
671 sub _get_marc_mapping_rules {
672     my ($self) = @_;
673     my $marcflavour = lc C4::Context->preference('marcflavour');
674     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
675     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
676     my $rules = {
677         'leader' => [],
678         'control_fields' => {},
679         'data_fields' => {},
680         'sum' => [],
681         'isbn' => [],
682         'defaults' => {}
683     };
684
685     $self->_foreach_mapping(sub {
686         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
687         return if $marc_type ne $marcflavour;
688
689         if ($type eq 'sum') {
690             push @{$rules->{sum}}, $name;
691         }
692         elsif ($type eq 'isbn') {
693             push @{$rules->{isbn}}, $name;
694         }
695         elsif ($type eq 'boolean') {
696             # boolean gets special handling, if value doesn't exist for a field,
697             # it is set to false
698             $rules->{defaults}->{$name} = 'false';
699         }
700
701         if ($marc_field =~ $field_spec_regexp) {
702             my $field_tag = $1;
703
704             my @subfields;
705             my @subfield_groups;
706             # Parse and separate subfields form subfield groups
707             if (defined $2) {
708                 my $subfield_group = '';
709                 my $open_group = 0;
710
711                 foreach my $token (split //, $2) {
712                     if ($token eq "(") {
713                         if ($open_group) {
714                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
715                                 "Unmatched opening parenthesis for $marc_field"
716                             );
717                         }
718                         else {
719                             $open_group = 1;
720                         }
721                     }
722                     elsif ($token eq ")") {
723                         if ($open_group) {
724                             if ($subfield_group) {
725                                 push @subfield_groups, $subfield_group;
726                                 $subfield_group = '';
727                             }
728                             $open_group = 0;
729                         }
730                         else {
731                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
732                                 "Unmatched closing parenthesis for $marc_field"
733                             );
734                         }
735                     }
736                     elsif ($open_group) {
737                         $subfield_group .= $token;
738                     }
739                     else {
740                         push @subfields, $token;
741                     }
742                 }
743             }
744             else {
745                 push @subfields, '*';
746             }
747
748             my $range = defined $3 ? $3 : undef;
749             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
750
751             if ($field_tag < 10) {
752                 $rules->{control_fields}->{$field_tag} //= [];
753                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
754             }
755             else {
756                 $rules->{data_fields}->{$field_tag} //= {};
757                 foreach my $subfield (@subfields) {
758                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
759                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
760                 }
761                 foreach my $subfield_group (@subfield_groups) {
762                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
763                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
764                 }
765             }
766         }
767         elsif ($marc_field =~ $leader_regexp) {
768             my $range = defined $1 ? $1 : undef;
769             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
770             push @{$rules->{leader}}, @mappings;
771         }
772         else {
773             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
774                 "Invalid MARC field expression: $marc_field"
775             );
776         }
777     });
778     return $rules;
779 }
780
781 =head2 _foreach_mapping
782
783     $self->_foreach_mapping(
784         sub {
785             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
786                 $marc_field )
787               = @_;
788             return unless $marc_type eq 'marc21';
789             print "Data comes from: " . $marc_field . "\n";
790         }
791     );
792
793 This allows you to apply a function to each entry in the elasticsearch mappings
794 table, in order to build the mappings for whatever is needed.
795
796 In the provided function, the files are:
797
798 =over 4
799
800 =item C<$name>
801
802 The field name for elasticsearch (corresponds to the 'mapping' column in the
803 database.
804
805 =item C<$type>
806
807 The type for this value, e.g. 'string'.
808
809 =item C<$facet>
810
811 True if this value should be facetised. This only really makes sense if the
812 field is understood by the facet processing code anyway.
813
814 =item C<$sort>
815
816 True if this is a field that a) needs special sort handling, and b) if it
817 should be sorted on. False if a) but not b). Undef if not a). This allows,
818 for example, author to be sorted on but not everything marked with "author"
819 to be included in that sort.
820
821 =item C<$marc_type>
822
823 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
824 'unimarc', 'normarc'.
825
826 =item C<$marc_field>
827
828 A string that describes the MARC field that contains the data to extract.
829 These are of a form suited to Catmandu's MARC fixers.
830
831 =back
832
833 =cut
834
835 sub _foreach_mapping {
836     my ( $self, $sub ) = @_;
837
838     # TODO use a caching framework here
839     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
840         {
841             'search_marc_map.index_name' => $self->index,
842         },
843         {   join => { search_marc_to_fields => 'search_marc_map' },
844             '+select' => [
845                 'search_marc_to_fields.facet',
846                 'search_marc_to_fields.suggestible',
847                 'search_marc_to_fields.sort',
848                 'search_marc_map.marc_type',
849                 'search_marc_map.marc_field',
850             ],
851             '+as'     => [
852                 'facet',
853                 'suggestible',
854                 'sort',
855                 'marc_type',
856                 'marc_field',
857             ],
858         }
859     );
860
861     while ( my $search_field = $search_fields->next ) {
862         $sub->(
863             $search_field->name,
864             $search_field->type,
865             $search_field->get_column('facet'),
866             $search_field->get_column('suggestible'),
867             $search_field->get_column('sort'),
868             $search_field->get_column('marc_type'),
869             $search_field->get_column('marc_field'),
870         );
871     }
872 }
873
874 =head2 process_error
875
876     die process_error($@);
877
878 This parses an Elasticsearch error message and produces a human-readable
879 result from it. This result is probably missing all the useful information
880 that you might want in diagnosing an issue, so the warning is also logged.
881
882 Note that currently the resulting message is not internationalised. This
883 will happen eventually by some method or other.
884
885 =cut
886
887 sub process_error {
888     my ($self, $msg) = @_;
889
890     warn $msg; # simple logging
891
892     # This is super-primitive
893     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
894
895     return "Unable to perform your search. Please try again.\n";
896 }
897
898 =head2 _read_configuration
899
900     my $conf = _read_configuration();
901
902 Reads the I<configuration file> and returns a hash structure with the
903 configuration information. It raises an exception if mandatory entries
904 are missing.
905
906 The hashref structure has the following form:
907
908     {
909         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
910         'index_name' => 'koha_instance',
911     }
912
913 This is configured by the following in the C<config> block in koha-conf.xml:
914
915     <elasticsearch>
916         <server>127.0.0.1:9200</server>
917         <server>anotherserver:9200</server>
918         <index_name>koha_instance</index_name>
919     </elasticsearch>
920
921 =cut
922
923 sub _read_configuration {
924
925     my $configuration;
926
927     my $conf = C4::Context->config('elasticsearch');
928     Koha::Exceptions::Config::MissingEntry->throw(
929         "Missing 'elasticsearch' block in config file")
930       unless defined $conf;
931
932     if ( $conf && $conf->{server} ) {
933         my $nodes = $conf->{server};
934         if ( ref($nodes) eq 'ARRAY' ) {
935             $configuration->{nodes} = $nodes;
936         }
937         else {
938             $configuration->{nodes} = [$nodes];
939         }
940     }
941     else {
942         Koha::Exceptions::Config::MissingEntry->throw(
943             "Missing 'server' entry in config file for elasticsearch");
944     }
945
946     if ( defined $conf->{index_name} ) {
947         $configuration->{index_name} = $conf->{index_name};
948     }
949     else {
950         Koha::Exceptions::Config::MissingEntry->throw(
951             "Missing 'index_name' entry in config file for elasticsearch");
952     }
953
954     return $configuration;
955 }
956
957 1;
958
959 __END__
960
961 =head1 AUTHOR
962
963 =over 4
964
965 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
966
967 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
968
969 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
970
971 =back
972
973 =cut