Bug 18235: (RM follow-up) fix mappings reset
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
90     }
91     return $self->{elasticsearch};
92 }
93
94 =head2 get_elasticsearch_params
95
96     my $params = $self->get_elasticsearch_params();
97
98 This provides a hashref that contains the parameters for connecting to the
99 ElasicSearch servers, in the form:
100
101     {
102         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
103         'index_name' => 'koha_instance_index',
104     }
105
106 This is configured by the following in the C<config> block in koha-conf.xml:
107
108     <elasticsearch>
109         <server>127.0.0.1:9200</server>
110         <server>anotherserver:9200</server>
111         <index_name>koha_instance</index_name>
112     </elasticsearch>
113
114 =cut
115
116 sub get_elasticsearch_params {
117     my ($self) = @_;
118
119     # Copy the hash so that we're not modifying the original
120     my $conf = C4::Context->config('elasticsearch');
121     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
122     my $es = { %{ $conf } };
123
124     # Helpfully, the multiple server lines end up in an array for us anyway
125     # if there are multiple ones, but not if there's only one.
126     my $server = $es->{server};
127     delete $es->{server};
128     if ( ref($server) eq 'ARRAY' ) {
129
130         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
131         $es->{nodes} = $server;
132     }
133     elsif ($server) {
134         $es->{nodes} = [$server];
135     }
136     else {
137         die "No elasticsearch servers were specified in koha-conf.xml.\n";
138     }
139     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
140       if ( !$es->{index_name} );
141     # Append the name of this particular index to our namespace
142     $es->{index_name} .= '_' . $self->index;
143
144     $es->{key_prefix} = 'es_';
145     $es->{client} //= '5_0::Direct';
146     $es->{cxn_pool} //= 'Sniff';
147     $es->{request_timeout} //= 60;
148
149     return $es;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         my $mappings = {
196             data => scalar _get_elasticsearch_mapping('general', '')
197         };
198         my $marcflavour = lc C4::Context->preference('marcflavour');
199         $self->_foreach_mapping(
200             sub {
201                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
202                 return if $marc_type ne $marcflavour;
203                 # TODO if this gets any sort of complexity to it, it should
204                 # be broken out into its own function.
205
206                 # TODO be aware of date formats, but this requires pre-parsing
207                 # as ES will simply reject anything with an invalid date.
208                 my $es_type = 'text';
209                 if ($type eq 'boolean') {
210                     $es_type = 'boolean';
211                 } elsif ($type eq 'number' || $type eq 'sum') {
212                     $es_type = 'integer';
213                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
214                     $es_type = 'stdno';
215                 }
216
217                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
218
219                 if ($facet) {
220                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
221                 }
222                 if ($suggestible) {
223                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
224                 }
225                 # Sort is a bit special as it can be true, false, undef.
226                 # We care about "true" or "undef",
227                 # "undef" means to do the default thing, which is make it sortable.
228                 if (!defined $sort || $sort) {
229                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
230                     $sort_fields{$self->index}{$name} = 1;
231                 }
232             }
233         );
234         $all_mappings{$self->index} = $mappings;
235     }
236     $self->sort_fields(\%{$sort_fields{$self->index}});
237
238     return $all_mappings{$self->index};
239 }
240
241 =head2 _get_elasticsearch_mapping
242
243 Get the Elasticsearch mappings for the given purpose and data type.
244
245 $mapping = _get_elasticsearch_mapping('search', 'text');
246
247 =cut
248
249 sub _get_elasticsearch_mapping {
250
251     my ( $purpose, $type ) = @_;
252
253     # Use state to speed up repeated calls
254     state $settings = undef;
255     if (!defined $settings) {
256         my $config_file = C4::Context->config('elasticsearch_field_config');
257         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
258         $settings = LoadFile( $config_file );
259     }
260
261     if (!defined $settings->{$purpose}) {
262         die "Field purpose $purpose not defined in field config";
263     }
264     if ($type eq '') {
265         return $settings->{$purpose};
266     }
267     if (defined $settings->{$purpose}{$type}) {
268         return $settings->{$purpose}{$type};
269     }
270     if (defined $settings->{$purpose}{'default'}) {
271         return $settings->{$purpose}{'default'};
272     }
273     return;
274 }
275
276 sub reset_elasticsearch_mappings {
277     my ( $reset_fields ) = @_;
278     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
279     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
280     my $indexes = LoadFile( $mappings_yaml );
281
282     while ( my ( $index_name, $fields ) = each %$indexes ) {
283         while ( my ( $field_name, $data ) = each %$fields ) {
284             my $field_type = $data->{type};
285             my $field_label = $data->{label};
286             my $mappings = $data->{mappings};
287             my $facet_order = $data->{facet_order};
288             my $search_field = Koha::SearchFields->find_or_create({ 
289                 name  => $field_name,
290                 label => $field_label,
291                 type  => $field_type,
292             },
293             {
294                 key => 'name'
295             });
296             $search_field->update(
297                 {
298                     facet_order => $facet_order
299                 }
300             );
301             for my $mapping ( @$mappings ) {
302                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
303                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
304             }
305         }
306     }
307 }
308
309 # This overrides the accessor provided by Class::Accessor so that if
310 # sort_fields isn't set, then it'll generate it.
311 sub sort_fields {
312     my $self = shift;
313     if (@_) {
314         $self->_sort_fields_accessor(@_);
315         return;
316     }
317     my $val = $self->_sort_fields_accessor();
318     return $val if $val;
319
320     # This will populate the accessor as a side effect
321     $self->get_elasticsearch_mappings();
322     return $self->_sort_fields_accessor();
323 }
324
325 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
326
327     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
328
329 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
330 Since we group all mappings by MARC field targets C<$mappings> will contain
331 all targets for C<$data> and thus we need to fetch the MARC field only once.
332 C<$mappings> will be applied to C<$record_document> and new field values added.
333 The method has no return value.
334
335 =over 4
336
337 =item C<$mappings>
338
339 Arrayref of mappings containing arrayrefs in the format
340 [C<$target>, C<$options>] where C<$target> is the name of the target field and
341 C<$options> is a hashref containing processing directives for this particular
342 mapping.
343
344 =item C<$data>
345
346 The source data from a MARC record field.
347
348 =item C<$record_document>
349
350 Hashref representing the Elasticsearch document on which mappings should be
351 applied.
352
353 =item C<$altscript>
354
355 A boolean value indicating whether an alternate script presentation is being
356 processed.
357
358 =back
359
360 =cut
361
362 sub _process_mappings {
363     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
364     foreach my $mapping (@{$mappings}) {
365         my ($target, $options) = @{$mapping};
366
367         # Don't process sort fields for alternate scripts
368         my $sort = $target =~ /__sort$/;
369         if ($sort && $altscript) {
370             next;
371         }
372
373         # Copy (scalar) data since can have multiple targets
374         # with differing options for (possibly) mutating data
375         # so need a different copy for each
376         my $_data = $data;
377         $record_document->{$target} //= [];
378         if (defined $options->{substr}) {
379             my ($start, $length) = @{$options->{substr}};
380             $_data = length($data) > $start ? substr $data, $start, $length : '';
381         }
382         if (defined $options->{value_callbacks}) {
383             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
384         }
385         if (defined $options->{property}) {
386             $_data = {
387                 $options->{property} => $_data
388             }
389         }
390         push @{$record_document->{$target}}, $_data;
391     }
392 }
393
394 =head2 marc_records_to_documents($marc_records)
395
396     my @record_documents = $self->marc_records_to_documents($marc_records);
397
398 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
399
400 Returns array of hash references, representing Elasticsearch documents,
401 acceptable as body payload in C<Search::Elasticsearch> requests.
402
403 =over 4
404
405 =item C<$marc_documents>
406
407 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
408
409 =back
410
411 =cut
412
413 sub marc_records_to_documents {
414     my ($self, $records) = @_;
415     my $rules = $self->_get_marc_mapping_rules();
416     my $control_fields_rules = $rules->{control_fields};
417     my $data_fields_rules = $rules->{data_fields};
418     my $marcflavour = lc C4::Context->preference('marcflavour');
419
420     my @record_documents;
421
422     foreach my $record (@{$records}) {
423         my $record_document = {};
424         my $mappings = $rules->{leader};
425         if ($mappings) {
426             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
427         }
428         foreach my $field ($record->fields()) {
429             if ($field->is_control_field()) {
430                 my $mappings = $control_fields_rules->{$field->tag()};
431                 if ($mappings) {
432                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
433                 }
434             }
435             else {
436                 my $tag = $field->tag();
437                 # Handle alternate scripts in MARC 21
438                 my $altscript = 0;
439                 if ($marcflavour eq 'marc21' && $tag eq '880') {
440                     my $sub6 = $field->subfield('6');
441                     if ($sub6 =~ /^(...)-\d+/) {
442                         $tag = $1;
443                         $altscript = 1;
444                     }
445                 }
446
447                 my $data_field_rules = $data_fields_rules->{$tag};
448
449                 if ($data_field_rules) {
450                     my $subfields_mappings = $data_field_rules->{subfields};
451                     my $wildcard_mappings = $subfields_mappings->{'*'};
452                     foreach my $subfield ($field->subfields()) {
453                         my ($code, $data) = @{$subfield};
454                         my $mappings = $subfields_mappings->{$code} // [];
455                         if ($wildcard_mappings) {
456                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
457                         }
458                         if (@{$mappings}) {
459                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
460                         }
461                     }
462
463                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
464                     if ($subfields_join_mappings) {
465                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
466                             # Map each subfield to values, remove empty values, join with space
467                             my $data = join(
468                                 ' ',
469                                 grep(
470                                     $_,
471                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
472                                 )
473                             );
474                             if ($data) {
475                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
476                             }
477                         }
478                     }
479                 }
480             }
481         }
482         foreach my $field (keys %{$rules->{defaults}}) {
483             unless (defined $record_document->{$field}) {
484                 $record_document->{$field} = $rules->{defaults}->{$field};
485             }
486         }
487         foreach my $field (@{$rules->{sum}}) {
488             if (defined $record_document->{$field}) {
489                 # TODO: validate numeric? filter?
490                 # TODO: Or should only accept fields without nested values?
491                 # TODO: Quick and dirty, improve if needed
492                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
493             }
494         }
495         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
496         foreach my $field (@{$rules->{isbn}}) {
497             if (defined $record_document->{$field}) {
498                 my @isbns = ();
499                 foreach my $input_isbn (@{$record_document->{$field}}) {
500                     my $isbn = Business::ISBN->new($input_isbn);
501                     if (defined $isbn && $isbn->is_valid) {
502                         my $isbn13 = $isbn->as_isbn13->as_string;
503                         push @isbns, $isbn13;
504                         $isbn13 =~ s/\-//g;
505                         push @isbns, $isbn13;
506
507                         my $isbn10 = $isbn->as_isbn10;
508                         if ($isbn10) {
509                             $isbn10 = $isbn10->as_string;
510                             push @isbns, $isbn10;
511                             $isbn10 =~ s/\-//g;
512                             push @isbns, $isbn10;
513                         }
514                     } else {
515                         push @isbns, $input_isbn;
516                     }
517                 }
518                 $record_document->{$field} = \@isbns;
519             }
520         }
521
522         # Remove duplicate values and collapse sort fields
523         foreach my $field (keys %{$record_document}) {
524             if (ref($record_document->{$field}) eq 'ARRAY') {
525                 @{$record_document->{$field}} = do {
526                     my %seen;
527                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
528                 };
529                 if ($field =~ /__sort$/) {
530                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
531                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
532                 }
533             }
534         }
535
536         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
537         $record->encoding('UTF-8');
538         my @warnings;
539         {
540             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
541             local $SIG{__WARN__} = sub {
542                 push @warnings, $_[0];
543             };
544             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
545         }
546         if (@warnings) {
547             # Suppress warnings if record length exceeded
548             unless (substr($record->leader(), 0, 5) eq '99999') {
549                 foreach my $warning (@warnings) {
550                     carp $warning;
551                 }
552             }
553             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
554             $record_document->{'marc_format'} = 'MARCXML';
555         }
556         else {
557             $record_document->{'marc_format'} = 'base64ISO2709';
558         }
559         my $id = $record->subfield('999', 'c');
560         push @record_documents, [$id, $record_document];
561     }
562     return \@record_documents;
563 }
564
565 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
566
567     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
568
569 Get mappings, an internal data structure later used by
570 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
571 data for a MARC mapping.
572
573 The returned C<$mappings> is not to to be confused with mappings provided by
574 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
575 provided by C<_foreach_mapping> and expands it to this internal data structure.
576 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
577 is then applied to each MARC target (leader, control field data, subfield or
578 joined subfields) and integrated into the mapping rules data structure used in
579 C<marc_records_to_documents> to transform MARC records into Elasticsearch
580 documents.
581
582 =over 4
583
584 =item C<$facet>
585
586 Boolean indicating whether to create a facet field for this mapping.
587
588 =item C<$suggestible>
589
590 Boolean indicating whether to create a suggestion field for this mapping.
591
592 =item C<$sort>
593
594 Boolean indicating whether to create a sort field for this mapping.
595
596 =item C<$target_name>
597
598 Elasticsearch document target field name.
599
600 =item C<$target_type>
601
602 Elasticsearch document target field type.
603
604 =item C<$range>
605
606 An optional range as a string in the format "<START>-<END>" or "<START>",
607 where "<START>" and "<END>" are integers specifying a range that will be used
608 for extracting a substring from MARC data as Elasticsearch field target value.
609
610 The first character position is "1", and the range is inclusive,
611 so "1-3" means the first three characters of MARC data.
612
613 If only "<START>" is provided only one character at position "<START>" will
614 be extracted.
615
616 =back
617
618 =cut
619
620 sub _field_mappings {
621     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
622     my %mapping_defaults = ();
623     my @mappings;
624
625     my $substr_args = undef;
626     if ($range) {
627         # TODO: use value_callback instead?
628         my ($start, $end) = map(int, split /-/, $range, 2);
629         $substr_args = [$start];
630         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
631     }
632     my $default_options = {};
633     if ($substr_args) {
634         $default_options->{substr} = $substr_args;
635     }
636
637     # TODO: Should probably have per type value callback/hook
638     # but hard code for now
639     if ($target_type eq 'boolean') {
640         $default_options->{value_callbacks} //= [];
641         push @{$default_options->{value_callbacks}}, sub {
642             my ($value) = @_;
643             # Trim whitespace at both ends
644             $value =~ s/^\s+|\s+$//g;
645             return $value ? 'true' : 'false';
646         };
647     }
648
649     my $mapping = [$target_name, $default_options];
650     push @mappings, $mapping;
651
652     my @suffixes = ();
653     push @suffixes, 'facet' if $facet;
654     push @suffixes, 'suggestion' if $suggestible;
655     push @suffixes, 'sort' if !defined $sort || $sort;
656
657     foreach my $suffix (@suffixes) {
658         my $mapping = ["${target_name}__$suffix"];
659         # TODO: Hack, fix later in less hideous manner
660         if ($suffix eq 'suggestion') {
661             push @{$mapping}, {%{$default_options}, property => 'input'};
662         }
663         else {
664             push @{$mapping}, $default_options;
665         }
666         push @mappings, $mapping;
667     }
668     return @mappings;
669 };
670
671 =head2 _get_marc_mapping_rules
672
673     my $mapping_rules = $self->_get_marc_mapping_rules()
674
675 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
676
677 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
678 each call to C<MARC::Record>->field) we create an optimized structure of mapping
679 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
680
681 We can then iterate through all MARC fields for each record and apply all relevant
682 rules once per fields instead of retreiving fields multiple times for each mapping rule
683 which is terribly slow.
684
685 =cut
686
687 # TODO: This structure can be used for processing multiple MARC::Records so is currently
688 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
689 # memory cache which it is currently not. The performance gain of caching
690 # would probably be marginal, but to do this could be a further improvement.
691
692 sub _get_marc_mapping_rules {
693     my ($self) = @_;
694     my $marcflavour = lc C4::Context->preference('marcflavour');
695     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
696     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
697     my $rules = {
698         'leader' => [],
699         'control_fields' => {},
700         'data_fields' => {},
701         'sum' => [],
702         'isbn' => [],
703         'defaults' => {}
704     };
705
706     $self->_foreach_mapping(sub {
707         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
708         return if $marc_type ne $marcflavour;
709
710         if ($type eq 'sum') {
711             push @{$rules->{sum}}, $name;
712         }
713         elsif ($type eq 'isbn') {
714             push @{$rules->{isbn}}, $name;
715         }
716         elsif ($type eq 'boolean') {
717             # boolean gets special handling, if value doesn't exist for a field,
718             # it is set to false
719             $rules->{defaults}->{$name} = 'false';
720         }
721
722         if ($marc_field =~ $field_spec_regexp) {
723             my $field_tag = $1;
724
725             my @subfields;
726             my @subfield_groups;
727             # Parse and separate subfields form subfield groups
728             if (defined $2) {
729                 my $subfield_group = '';
730                 my $open_group = 0;
731
732                 foreach my $token (split //, $2) {
733                     if ($token eq "(") {
734                         if ($open_group) {
735                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
736                                 "Unmatched opening parenthesis for $marc_field"
737                             );
738                         }
739                         else {
740                             $open_group = 1;
741                         }
742                     }
743                     elsif ($token eq ")") {
744                         if ($open_group) {
745                             if ($subfield_group) {
746                                 push @subfield_groups, $subfield_group;
747                                 $subfield_group = '';
748                             }
749                             $open_group = 0;
750                         }
751                         else {
752                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
753                                 "Unmatched closing parenthesis for $marc_field"
754                             );
755                         }
756                     }
757                     elsif ($open_group) {
758                         $subfield_group .= $token;
759                     }
760                     else {
761                         push @subfields, $token;
762                     }
763                 }
764             }
765             else {
766                 push @subfields, '*';
767             }
768
769             my $range = defined $3 ? $3 : undef;
770             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
771
772             if ($field_tag < 10) {
773                 $rules->{control_fields}->{$field_tag} //= [];
774                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
775             }
776             else {
777                 $rules->{data_fields}->{$field_tag} //= {};
778                 foreach my $subfield (@subfields) {
779                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
780                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
781                 }
782                 foreach my $subfield_group (@subfield_groups) {
783                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
784                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
785                 }
786             }
787         }
788         elsif ($marc_field =~ $leader_regexp) {
789             my $range = defined $1 ? $1 : undef;
790             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
791             push @{$rules->{leader}}, @mappings;
792         }
793         else {
794             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
795                 "Invalid MARC field expression: $marc_field"
796             );
797         }
798     });
799     return $rules;
800 }
801
802 =head2 _foreach_mapping
803
804     $self->_foreach_mapping(
805         sub {
806             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
807                 $marc_field )
808               = @_;
809             return unless $marc_type eq 'marc21';
810             print "Data comes from: " . $marc_field . "\n";
811         }
812     );
813
814 This allows you to apply a function to each entry in the elasticsearch mappings
815 table, in order to build the mappings for whatever is needed.
816
817 In the provided function, the files are:
818
819 =over 4
820
821 =item C<$name>
822
823 The field name for elasticsearch (corresponds to the 'mapping' column in the
824 database.
825
826 =item C<$type>
827
828 The type for this value, e.g. 'string'.
829
830 =item C<$facet>
831
832 True if this value should be facetised. This only really makes sense if the
833 field is understood by the facet processing code anyway.
834
835 =item C<$sort>
836
837 True if this is a field that a) needs special sort handling, and b) if it
838 should be sorted on. False if a) but not b). Undef if not a). This allows,
839 for example, author to be sorted on but not everything marked with "author"
840 to be included in that sort.
841
842 =item C<$marc_type>
843
844 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
845 'unimarc', 'normarc'.
846
847 =item C<$marc_field>
848
849 A string that describes the MARC field that contains the data to extract.
850 These are of a form suited to Catmandu's MARC fixers.
851
852 =back
853
854 =cut
855
856 sub _foreach_mapping {
857     my ( $self, $sub ) = @_;
858
859     # TODO use a caching framework here
860     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
861         {
862             'search_marc_map.index_name' => $self->index,
863         },
864         {   join => { search_marc_to_fields => 'search_marc_map' },
865             '+select' => [
866                 'search_marc_to_fields.facet',
867                 'search_marc_to_fields.suggestible',
868                 'search_marc_to_fields.sort',
869                 'search_marc_map.marc_type',
870                 'search_marc_map.marc_field',
871             ],
872             '+as'     => [
873                 'facet',
874                 'suggestible',
875                 'sort',
876                 'marc_type',
877                 'marc_field',
878             ],
879         }
880     );
881
882     while ( my $search_field = $search_fields->next ) {
883         $sub->(
884             # Force lower case on indexed field names for case insensitive
885             # field name searches
886             lc($search_field->name),
887             $search_field->type,
888             $search_field->get_column('facet'),
889             $search_field->get_column('suggestible'),
890             $search_field->get_column('sort'),
891             $search_field->get_column('marc_type'),
892             $search_field->get_column('marc_field'),
893         );
894     }
895 }
896
897 =head2 process_error
898
899     die process_error($@);
900
901 This parses an Elasticsearch error message and produces a human-readable
902 result from it. This result is probably missing all the useful information
903 that you might want in diagnosing an issue, so the warning is also logged.
904
905 Note that currently the resulting message is not internationalised. This
906 will happen eventually by some method or other.
907
908 =cut
909
910 sub process_error {
911     my ($self, $msg) = @_;
912
913     warn $msg; # simple logging
914
915     # This is super-primitive
916     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
917
918     return "Unable to perform your search. Please try again.\n";
919 }
920
921 =head2 _read_configuration
922
923     my $conf = _read_configuration();
924
925 Reads the I<configuration file> and returns a hash structure with the
926 configuration information. It raises an exception if mandatory entries
927 are missing.
928
929 The hashref structure has the following form:
930
931     {
932         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
933         'index_name' => 'koha_instance',
934     }
935
936 This is configured by the following in the C<config> block in koha-conf.xml:
937
938     <elasticsearch>
939         <server>127.0.0.1:9200</server>
940         <server>anotherserver:9200</server>
941         <index_name>koha_instance</index_name>
942     </elasticsearch>
943
944 =cut
945
946 sub _read_configuration {
947
948     my $configuration;
949
950     my $conf = C4::Context->config('elasticsearch');
951     Koha::Exceptions::Config::MissingEntry->throw(
952         "Missing 'elasticsearch' block in config file")
953       unless defined $conf;
954
955     if ( $conf && $conf->{server} ) {
956         my $nodes = $conf->{server};
957         if ( ref($nodes) eq 'ARRAY' ) {
958             $configuration->{nodes} = $nodes;
959         }
960         else {
961             $configuration->{nodes} = [$nodes];
962         }
963     }
964     else {
965         Koha::Exceptions::Config::MissingEntry->throw(
966             "Missing 'server' entry in config file for elasticsearch");
967     }
968
969     if ( defined $conf->{index_name} ) {
970         $configuration->{index_name} = $conf->{index_name};
971     }
972     else {
973         Koha::Exceptions::Config::MissingEntry->throw(
974             "Missing 'index_name' entry in config file for elasticsearch");
975     }
976
977     return $configuration;
978 }
979
980 =head2 get_facetable_fields
981
982 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
983
984 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
985
986 =cut
987
988 sub get_facetable_fields {
989     my ($self) = @_;
990
991     # These should correspond to the ES field names, as opposed to the CCL
992     # things that zebra uses.
993     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch );
994     my @faceted_fields = Koha::SearchFields->search(
995         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
996     );
997     my @not_faceted_fields = Koha::SearchFields->search(
998         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
999     );
1000     # This could certainly be improved
1001     return ( @faceted_fields, @not_faceted_fields );
1002 }
1003
1004 1;
1005
1006 __END__
1007
1008 =head1 AUTHOR
1009
1010 =over 4
1011
1012 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1013
1014 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1015
1016 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1017
1018 =back
1019
1020 =cut