Bug 20244: Improve Elasticsearch ISBN indexing
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new(
90             client => "5_0::Direct",
91             nodes => $conf->{nodes},
92             cxn_pool => 'Sniff',
93             request_timeout => 60
94         );
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticserver index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         my $mappings = {
197             data => scalar _get_elasticsearch_mapping('general', '')
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $all_mappings{$self->index} = $mappings;
236     }
237     $self->sort_fields(\%{$sort_fields{$self->index}});
238
239     return $all_mappings{$self->index};
240 }
241
242 =head2 _get_elasticsearch_mapping
243
244 Get the Elasticsearch mappings for the given purpose and data type.
245
246 $mapping = _get_elasticsearch_mapping('search', 'text');
247
248 =cut
249
250 sub _get_elasticsearch_mapping {
251
252     my ( $purpose, $type ) = @_;
253
254     # Use state to speed up repeated calls
255     state $settings = undef;
256     if (!defined $settings) {
257         my $config_file = C4::Context->config('elasticsearch_field_config');
258         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
259         $settings = LoadFile( $config_file );
260     }
261
262     if (!defined $settings->{$purpose}) {
263         die "Field purpose $purpose not defined in field config";
264     }
265     if ($type eq '') {
266         return $settings->{$purpose};
267     }
268     if (defined $settings->{$purpose}{$type}) {
269         return $settings->{$purpose}{$type};
270     }
271     if (defined $settings->{$purpose}{'default'}) {
272         return $settings->{$purpose}{'default'};
273     }
274     return;
275 }
276
277 sub reset_elasticsearch_mappings {
278     my ( $reset_fields ) = @_;
279     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
280     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
281     my $indexes = LoadFile( $mappings_yaml );
282
283     while ( my ( $index_name, $fields ) = each %$indexes ) {
284         while ( my ( $field_name, $data ) = each %$fields ) {
285             my $field_type = $data->{type};
286             my $field_label = $data->{label};
287             my $mappings = $data->{mappings};
288             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
289             for my $mapping ( @$mappings ) {
290                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
291                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
292             }
293         }
294     }
295 }
296
297 # This overrides the accessor provided by Class::Accessor so that if
298 # sort_fields isn't set, then it'll generate it.
299 sub sort_fields {
300     my $self = shift;
301     if (@_) {
302         $self->_sort_fields_accessor(@_);
303         return;
304     }
305     my $val = $self->_sort_fields_accessor();
306     return $val if $val;
307
308     # This will populate the accessor as a side effect
309     $self->get_elasticsearch_mappings();
310     return $self->_sort_fields_accessor();
311 }
312
313 =head2 _process_mappings($mappings, $data, $record_document)
314
315     $self->_process_mappings($mappings, $marc_field_data, $record_document)
316
317 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
318 Since we group all mappings by MARC field targets C<$mappings> will contain
319 all targets for C<$data> and thus we need to fetch the MARC field only once.
320 C<$mappings> will be applied to C<$record_document> and new field values added.
321 The method has no return value.
322
323 =over 4
324
325 =item C<$mappings>
326
327 Arrayref of mappings containing arrayrefs in the format
328 [C<$taget>, C<$options>] where C<$target> is the name of the target field and
329 C<$options> is a hashref containing processing directives for this particular
330 mapping.
331
332 =item C<$data>
333
334 The source data from a MARC record field.
335
336 =item C<$record_document>
337
338 Hashref representing the Elasticsearch document on which mappings should be
339 applied.
340
341 =back
342
343 =cut
344
345 sub _process_mappings {
346     my ($_self, $mappings, $data, $record_document) = @_;
347     foreach my $mapping (@{$mappings}) {
348         my ($target, $options) = @{$mapping};
349         # Copy (scalar) data since can have multiple targets
350         # with differing options for (possibly) mutating data
351         # so need a different copy for each
352         my $_data = $data;
353         $record_document->{$target} //= [];
354         if (defined $options->{substr}) {
355             my ($start, $length) = @{$options->{substr}};
356             $_data = length($data) > $start ? substr $data, $start, $length : '';
357         }
358         if (defined $options->{value_callbacks}) {
359             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
360         }
361         if (defined $options->{property}) {
362             $_data = {
363                 $options->{property} => $_data
364             }
365         }
366         push @{$record_document->{$target}}, $_data;
367     }
368 }
369
370 =head2 marc_records_to_documents($marc_records)
371
372     my @record_documents = $self->marc_records_to_documents($marc_records);
373
374 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
375
376 Returns array of hash references, representing Elasticsearch documents,
377 acceptable as body payload in C<Search::Elasticsearch> requests.
378
379 =over 4
380
381 =item C<$marc_documents>
382
383 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
384
385 =back
386
387 =cut
388
389 sub marc_records_to_documents {
390     my ($self, $records) = @_;
391     my $rules = $self->_get_marc_mapping_rules();
392     my $control_fields_rules = $rules->{control_fields};
393     my $data_fields_rules = $rules->{data_fields};
394     my $marcflavour = lc C4::Context->preference('marcflavour');
395
396     my @record_documents;
397
398     foreach my $record (@{$records}) {
399         my $record_document = {};
400         my $mappings = $rules->{leader};
401         if ($mappings) {
402             $self->_process_mappings($mappings, $record->leader(), $record_document);
403         }
404         foreach my $field ($record->fields()) {
405             if($field->is_control_field()) {
406                 my $mappings = $control_fields_rules->{$field->tag()};
407                 if ($mappings) {
408                     $self->_process_mappings($mappings, $field->data(), $record_document);
409                 }
410             }
411             else {
412                 my $data_field_rules = $data_fields_rules->{$field->tag()};
413
414                 if ($data_field_rules) {
415                     my $subfields_mappings = $data_field_rules->{subfields};
416                     my $wildcard_mappings = $subfields_mappings->{'*'};
417                     foreach my $subfield ($field->subfields()) {
418                         my ($code, $data) = @{$subfield};
419                         my $mappings = $subfields_mappings->{$code} // [];
420                         if ($wildcard_mappings) {
421                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
422                         }
423                         if (@{$mappings}) {
424                             $self->_process_mappings($mappings, $data, $record_document);
425                         }
426                     }
427
428                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
429                     if ($subfields_join_mappings) {
430                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
431                             # Map each subfield to values, remove empty values, join with space
432                             my $data = join(
433                                 ' ',
434                                 grep(
435                                     $_,
436                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
437                                 )
438                             );
439                             if ($data) {
440                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document);
441                             }
442                         }
443                     }
444                 }
445             }
446         }
447         foreach my $field (keys %{$rules->{defaults}}) {
448             unless (defined $record_document->{$field}) {
449                 $record_document->{$field} = $rules->{defaults}->{$field};
450             }
451         }
452         foreach my $field (@{$rules->{sum}}) {
453             if (defined $record_document->{$field}) {
454                 # TODO: validate numeric? filter?
455                 # TODO: Or should only accept fields without nested values?
456                 # TODO: Quick and dirty, improve if needed
457                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
458             }
459         }
460         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
461         foreach my $field (@{$rules->{isbn}}) {
462             if (defined $record_document->{$field}) {
463                 my @isbns = ();
464                 foreach my $input_isbn (@{$record_document->{$field}}) {
465                     my $isbn = Business::ISBN->new($input_isbn);
466                     if (defined $isbn && $isbn->is_valid) {
467                         my $isbn13 = $isbn->as_isbn13->as_string;
468                         push @isbns, $isbn13;
469                         $isbn13 =~ s/\-//g;
470                         push @isbns, $isbn13;
471
472                         my $isbn10 = $isbn->as_isbn10;
473                         if ($isbn10) {
474                             $isbn10 = $isbn10->as_string;
475                             push @isbns, $isbn10;
476                             $isbn10 =~ s/\-//g;
477                             push @isbns, $isbn10;
478                         }
479                     } else {
480                         push @isbns, $input_isbn;
481                     }
482                 }
483                 $record_document->{$field} = \@isbns;
484             }
485         }
486
487         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
488         $record->encoding('UTF-8');
489         my @warnings;
490         {
491             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
492             local $SIG{__WARN__} = sub {
493                 push @warnings, $_[0];
494             };
495             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
496         }
497         if (@warnings) {
498             # Suppress warnings if record length exceeded
499             unless (substr($record->leader(), 0, 5) eq '99999') {
500                 foreach my $warning (@warnings) {
501                     carp $warning;
502                 }
503             }
504             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
505             $record_document->{'marc_format'} = 'MARCXML';
506         }
507         else {
508             $record_document->{'marc_format'} = 'base64ISO2709';
509         }
510         my $id = $record->subfield('999', 'c');
511         push @record_documents, [$id, $record_document];
512     }
513     return \@record_documents;
514 }
515
516 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
517
518     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
519
520 Get mappings, an internal data structure later used by
521 L<_process_mappings($mappings, $data, $record_document)> to process MARC target
522 data for a MARC mapping.
523
524 The returned C<$mappings> is not to to be confused with mappings provided by
525 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
526 provided by C<_foreach_mapping> and expands it to this internal data structure.
527 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
528 is then applied to each MARC target (leader, control field data, subfield or
529 joined subfields) and integrated into the mapping rules data structure used in
530 C<marc_records_to_documents> to transform MARC records into Elasticsearch
531 documents.
532
533 =over 4
534
535 =item C<$facet>
536
537 Boolean indicating whether to create a facet field for this mapping.
538
539 =item C<$suggestible>
540
541 Boolean indicating whether to create a suggestion field for this mapping.
542
543 =item C<$sort>
544
545 Boolean indicating whether to create a sort field for this mapping.
546
547 =item C<$target_name>
548
549 Elasticsearch document target field name.
550
551 =item C<$target_type>
552
553 Elasticsearch document target field type.
554
555 =item C<$range>
556
557 An optional range as a string in the format "<START>-<END>" or "<START>",
558 where "<START>" and "<END>" are integers specifying a range that will be used
559 for extracting a substring from MARC data as Elasticsearch field target value.
560
561 The first character position is "1", and the range is inclusive,
562 so "1-3" means the first three characters of MARC data.
563
564 If only "<START>" is provided only one character at position "<START>" will
565 be extracted.
566
567 =back
568
569 =cut
570
571 sub _field_mappings {
572     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
573     my %mapping_defaults = ();
574     my @mappings;
575
576     my $substr_args = undef;
577     if ($range) {
578         # TODO: use value_callback instead?
579         my ($start, $end) = map(int, split /-/, $range, 2);
580         $substr_args = [$start];
581         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
582     }
583     my $default_options = {};
584     if ($substr_args) {
585         $default_options->{substr} = $substr_args;
586     }
587
588     # TODO: Should probably have per type value callback/hook
589     # but hard code for now
590     if ($target_type eq 'boolean') {
591         $default_options->{value_callbacks} //= [];
592         push @{$default_options->{value_callbacks}}, sub {
593             my ($value) = @_;
594             # Trim whitespace at both ends
595             $value =~ s/^\s+|\s+$//g;
596             return $value ? 'true' : 'false';
597         };
598     }
599
600     my $mapping = [$target_name, $default_options];
601     push @mappings, $mapping;
602
603     my @suffixes = ();
604     push @suffixes, 'facet' if $facet;
605     push @suffixes, 'suggestion' if $suggestible;
606     push @suffixes, 'sort' if !defined $sort || $sort;
607
608     foreach my $suffix (@suffixes) {
609         my $mapping = ["${target_name}__$suffix"];
610         # TODO: Hack, fix later in less hideous manner
611         if ($suffix eq 'suggestion') {
612             push @{$mapping}, {%{$default_options}, property => 'input'};
613         }
614         else {
615             push @{$mapping}, $default_options;
616         }
617         push @mappings, $mapping;
618     }
619     return @mappings;
620 };
621
622 =head2 _get_marc_mapping_rules
623
624     my $mapping_rules = $self->_get_marc_mapping_rules()
625
626 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
627
628 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
629 each call to C<MARC::Record>->field) we create an optimized structure of mapping
630 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
631
632 We can then iterate through all MARC fields for each record and apply all relevant
633 rules once per fields instead of retreiving fields multiple times for each mapping rule
634 which is terribly slow.
635
636 =cut
637
638 # TODO: This structure can be used for processing multiple MARC::Records so is currently
639 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
640 # memory cache which it is currently not. The performance gain of caching
641 # would probably be marginal, but to do this could be a further improvement.
642
643 sub _get_marc_mapping_rules {
644     my ($self) = @_;
645     my $marcflavour = lc C4::Context->preference('marcflavour');
646     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
647     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
648     my $rules = {
649         'leader' => [],
650         'control_fields' => {},
651         'data_fields' => {},
652         'sum' => [],
653         'isbn' => [],
654         'defaults' => {}
655     };
656
657     $self->_foreach_mapping(sub {
658         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
659         return if $marc_type ne $marcflavour;
660
661         if ($type eq 'sum') {
662             push @{$rules->{sum}}, $name;
663         }
664         elsif ($type eq 'isbn') {
665             push @{$rules->{isbn}}, $name;
666         }
667         elsif ($type eq 'boolean') {
668             # boolean gets special handling, if value doesn't exist for a field,
669             # it is set to false
670             $rules->{defaults}->{$name} = 'false';
671         }
672
673         if ($marc_field =~ $field_spec_regexp) {
674             my $field_tag = $1;
675
676             my @subfields;
677             my @subfield_groups;
678             # Parse and separate subfields form subfield groups
679             if (defined $2) {
680                 my $subfield_group = '';
681                 my $open_group = 0;
682
683                 foreach my $token (split //, $2) {
684                     if ($token eq "(") {
685                         if ($open_group) {
686                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
687                                 "Unmatched opening parenthesis for $marc_field"
688                             );
689                         }
690                         else {
691                             $open_group = 1;
692                         }
693                     }
694                     elsif ($token eq ")") {
695                         if ($open_group) {
696                             if ($subfield_group) {
697                                 push @subfield_groups, $subfield_group;
698                                 $subfield_group = '';
699                             }
700                             $open_group = 0;
701                         }
702                         else {
703                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
704                                 "Unmatched closing parenthesis for $marc_field"
705                             );
706                         }
707                     }
708                     elsif ($open_group) {
709                         $subfield_group .= $token;
710                     }
711                     else {
712                         push @subfields, $token;
713                     }
714                 }
715             }
716             else {
717                 push @subfields, '*';
718             }
719
720             my $range = defined $3 ? $3 : undef;
721             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
722
723             if ($field_tag < 10) {
724                 $rules->{control_fields}->{$field_tag} //= [];
725                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
726             }
727             else {
728                 $rules->{data_fields}->{$field_tag} //= {};
729                 foreach my $subfield (@subfields) {
730                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
731                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
732                 }
733                 foreach my $subfield_group (@subfield_groups) {
734                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
735                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
736                 }
737             }
738         }
739         elsif ($marc_field =~ $leader_regexp) {
740             my $range = defined $1 ? $1 : undef;
741             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
742             push @{$rules->{leader}}, @mappings;
743         }
744         else {
745             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
746                 "Invalid MARC field expression: $marc_field"
747             );
748         }
749     });
750     return $rules;
751 }
752
753 =head2 _foreach_mapping
754
755     $self->_foreach_mapping(
756         sub {
757             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
758                 $marc_field )
759               = @_;
760             return unless $marc_type eq 'marc21';
761             print "Data comes from: " . $marc_field . "\n";
762         }
763     );
764
765 This allows you to apply a function to each entry in the elasticsearch mappings
766 table, in order to build the mappings for whatever is needed.
767
768 In the provided function, the files are:
769
770 =over 4
771
772 =item C<$name>
773
774 The field name for elasticsearch (corresponds to the 'mapping' column in the
775 database.
776
777 =item C<$type>
778
779 The type for this value, e.g. 'string'.
780
781 =item C<$facet>
782
783 True if this value should be facetised. This only really makes sense if the
784 field is understood by the facet processing code anyway.
785
786 =item C<$sort>
787
788 True if this is a field that a) needs special sort handling, and b) if it
789 should be sorted on. False if a) but not b). Undef if not a). This allows,
790 for example, author to be sorted on but not everything marked with "author"
791 to be included in that sort.
792
793 =item C<$marc_type>
794
795 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
796 'unimarc', 'normarc'.
797
798 =item C<$marc_field>
799
800 A string that describes the MARC field that contains the data to extract.
801 These are of a form suited to Catmandu's MARC fixers.
802
803 =back
804
805 =cut
806
807 sub _foreach_mapping {
808     my ( $self, $sub ) = @_;
809
810     # TODO use a caching framework here
811     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
812         {
813             'search_marc_map.index_name' => $self->index,
814         },
815         {   join => { search_marc_to_fields => 'search_marc_map' },
816             '+select' => [
817                 'search_marc_to_fields.facet',
818                 'search_marc_to_fields.suggestible',
819                 'search_marc_to_fields.sort',
820                 'search_marc_map.marc_type',
821                 'search_marc_map.marc_field',
822             ],
823             '+as'     => [
824                 'facet',
825                 'suggestible',
826                 'sort',
827                 'marc_type',
828                 'marc_field',
829             ],
830         }
831     );
832
833     while ( my $search_field = $search_fields->next ) {
834         $sub->(
835             $search_field->name,
836             $search_field->type,
837             $search_field->get_column('facet'),
838             $search_field->get_column('suggestible'),
839             $search_field->get_column('sort'),
840             $search_field->get_column('marc_type'),
841             $search_field->get_column('marc_field'),
842         );
843     }
844 }
845
846 =head2 process_error
847
848     die process_error($@);
849
850 This parses an Elasticsearch error message and produces a human-readable
851 result from it. This result is probably missing all the useful information
852 that you might want in diagnosing an issue, so the warning is also logged.
853
854 Note that currently the resulting message is not internationalised. This
855 will happen eventually by some method or other.
856
857 =cut
858
859 sub process_error {
860     my ($self, $msg) = @_;
861
862     warn $msg; # simple logging
863
864     # This is super-primitive
865     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
866
867     return "Unable to perform your search. Please try again.\n";
868 }
869
870 =head2 _read_configuration
871
872     my $conf = _read_configuration();
873
874 Reads the I<configuration file> and returns a hash structure with the
875 configuration information. It raises an exception if mandatory entries
876 are missing.
877
878 The hashref structure has the following form:
879
880     {
881         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
882         'index_name' => 'koha_instance',
883     }
884
885 This is configured by the following in the C<config> block in koha-conf.xml:
886
887     <elasticsearch>
888         <server>127.0.0.1:9200</server>
889         <server>anotherserver:9200</server>
890         <index_name>koha_instance</index_name>
891     </elasticsearch>
892
893 =cut
894
895 sub _read_configuration {
896
897     my $configuration;
898
899     my $conf = C4::Context->config('elasticsearch');
900     Koha::Exceptions::Config::MissingEntry->throw(
901         "Missing 'elasticsearch' block in config file")
902       unless defined $conf;
903
904     if ( $conf && $conf->{server} ) {
905         my $nodes = $conf->{server};
906         if ( ref($nodes) eq 'ARRAY' ) {
907             $configuration->{nodes} = $nodes;
908         }
909         else {
910             $configuration->{nodes} = [$nodes];
911         }
912     }
913     else {
914         Koha::Exceptions::Config::MissingEntry->throw(
915             "Missing 'server' entry in config file for elasticsearch");
916     }
917
918     if ( defined $conf->{index_name} ) {
919         $configuration->{index_name} = $conf->{index_name};
920     }
921     else {
922         Koha::Exceptions::Config::MissingEntry->throw(
923             "Missing 'index_name' entry in config file for elasticsearch");
924     }
925
926     return $configuration;
927 }
928
929 1;
930
931 __END__
932
933 =head1 AUTHOR
934
935 =over 4
936
937 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
938
939 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
940
941 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
942
943 =back
944
945 =cut