Bug 22705: Change default value of cxn_pool to 'Static'
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use Clone qw(clone);
31 use JSON;
32 use Modern::Perl;
33 use Readonly;
34 use Search::Elasticsearch;
35 use Try::Tiny;
36 use YAML::Syck;
37
38 use List::Util qw( sum0 reduce );
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42 use Business::ISBN;
43
44 __PACKAGE__->mk_ro_accessors(qw( index ));
45 __PACKAGE__->mk_accessors(qw( sort_fields ));
46
47 # Constants to refer to the standard index names
48 Readonly our $BIBLIOS_INDEX     => 'biblios';
49 Readonly our $AUTHORITIES_INDEX => 'authorities';
50
51 =head1 NAME
52
53 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
54
55 =head1 ACCESSORS
56
57 =over 4
58
59 =item index
60
61 The name of the index to use, generally 'biblios' or 'authorities'.
62
63 =back
64
65 =head1 FUNCTIONS
66
67 =cut
68
69 sub new {
70     my $class = shift @_;
71     my $self = $class->SUPER::new(@_);
72     # Check for a valid index
73     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
74     return $self;
75 }
76
77 =head2 get_elasticsearch
78
79     my $elasticsearch_client = $self->get_elasticsearch();
80
81 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
82 instance level and will be reused if method is called multiple times.
83
84 =cut
85
86 sub get_elasticsearch {
87     my $self = shift @_;
88     unless (defined $self->{elasticsearch}) {
89         my $conf = $self->get_elasticsearch_params();
90         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
91     }
92     return $self->{elasticsearch};
93 }
94
95 =head2 get_elasticsearch_params
96
97     my $params = $self->get_elasticsearch_params();
98
99 This provides a hashref that contains the parameters for connecting to the
100 ElasicSearch servers, in the form:
101
102     {
103         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
104         'index_name' => 'koha_instance_index',
105     }
106
107 This is configured by the following in the C<config> block in koha-conf.xml:
108
109     <elasticsearch>
110         <server>127.0.0.1:9200</server>
111         <server>anotherserver:9200</server>
112         <index_name>koha_instance</index_name>
113     </elasticsearch>
114
115 =cut
116
117 sub get_elasticsearch_params {
118     my ($self) = @_;
119
120     # Copy the hash so that we're not modifying the original
121     my $conf = C4::Context->config('elasticsearch');
122     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
123     my $es = { %{ $conf } };
124
125     # Helpfully, the multiple server lines end up in an array for us anyway
126     # if there are multiple ones, but not if there's only one.
127     my $server = $es->{server};
128     delete $es->{server};
129     if ( ref($server) eq 'ARRAY' ) {
130
131         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
132         $es->{nodes} = $server;
133     }
134     elsif ($server) {
135         $es->{nodes} = [$server];
136     }
137     else {
138         die "No elasticsearch servers were specified in koha-conf.xml.\n";
139     }
140     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
141       if ( !$es->{index_name} );
142     # Append the name of this particular index to our namespace
143     $es->{index_name} .= '_' . $self->index;
144
145     $es->{key_prefix} = 'es_';
146     $es->{client} //= '5_0::Direct';
147     $es->{cxn_pool} //= 'Static';
148     $es->{request_timeout} //= 60;
149
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         # Clone the general mapping to break ties with the original hash
197         my $mappings = {
198             data => clone(_get_elasticsearch_field_config('general', ''))
199         };
200         my $marcflavour = lc C4::Context->preference('marcflavour');
201         $self->_foreach_mapping(
202             sub {
203                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
204
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 _get_elasticsearch_field_config
245
246 Get the Elasticsearch field config for the given purpose and data type.
247
248 $mapping = _get_elasticsearch_field_config('search', 'text');
249
250 =cut
251
252 sub _get_elasticsearch_field_config {
253
254     my ( $purpose, $type ) = @_;
255
256     # Use state to speed up repeated calls
257     state $settings = undef;
258     if (!defined $settings) {
259         my $config_file = C4::Context->config('elasticsearch_field_config');
260         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
261         $settings = LoadFile( $config_file );
262     }
263
264     if (!defined $settings->{$purpose}) {
265         die "Field purpose $purpose not defined in field config";
266     }
267     if ($type eq '') {
268         return $settings->{$purpose};
269     }
270     if (defined $settings->{$purpose}{$type}) {
271         return $settings->{$purpose}{$type};
272     }
273     if (defined $settings->{$purpose}{'default'}) {
274         return $settings->{$purpose}{'default'};
275     }
276     return;
277 }
278
279 sub reset_elasticsearch_mappings {
280     my ( $reset_fields ) = @_;
281     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
282     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
283     my $indexes = LoadFile( $mappings_yaml );
284
285     while ( my ( $index_name, $fields ) = each %$indexes ) {
286         while ( my ( $field_name, $data ) = each %$fields ) {
287             my $field_type = $data->{type};
288             my $field_label = $data->{label};
289             my $mappings = $data->{mappings};
290             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
291             for my $mapping ( @$mappings ) {
292                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
293                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
294             }
295         }
296     }
297 }
298
299 # This overrides the accessor provided by Class::Accessor so that if
300 # sort_fields isn't set, then it'll generate it.
301 sub sort_fields {
302     my $self = shift;
303     if (@_) {
304         $self->_sort_fields_accessor(@_);
305         return;
306     }
307     my $val = $self->_sort_fields_accessor();
308     return $val if $val;
309
310     # This will populate the accessor as a side effect
311     $self->get_elasticsearch_mappings();
312     return $self->_sort_fields_accessor();
313 }
314
315 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
316
317     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
318
319 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
320 Since we group all mappings by MARC field targets C<$mappings> will contain
321 all targets for C<$data> and thus we need to fetch the MARC field only once.
322 C<$mappings> will be applied to C<$record_document> and new field values added.
323 The method has no return value.
324
325 =over 4
326
327 =item C<$mappings>
328
329 Arrayref of mappings containing arrayrefs in the format
330 [C<$target>, C<$options>] where C<$target> is the name of the target field and
331 C<$options> is a hashref containing processing directives for this particular
332 mapping.
333
334 =item C<$data>
335
336 The source data from a MARC record field.
337
338 =item C<$record_document>
339
340 Hashref representing the Elasticsearch document on which mappings should be
341 applied.
342
343 =item C<$altscript>
344
345 A boolean value indicating whether an alternate script presentation is being
346 processed.
347
348 =back
349
350 =cut
351
352 sub _process_mappings {
353     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
354     foreach my $mapping (@{$mappings}) {
355         my ($target, $options) = @{$mapping};
356
357         # Don't process sort fields for alternate scripts
358         my $sort = $target =~ /__sort$/;
359         if ($sort && $altscript) {
360             next;
361         }
362
363         # Copy (scalar) data since can have multiple targets
364         # with differing options for (possibly) mutating data
365         # so need a different copy for each
366         my $_data = $data;
367         $record_document->{$target} //= [];
368         if (defined $options->{substr}) {
369             my ($start, $length) = @{$options->{substr}};
370             $_data = length($data) > $start ? substr $data, $start, $length : '';
371         }
372         if (defined $options->{value_callbacks}) {
373             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
374         }
375         if (defined $options->{property}) {
376             $_data = {
377                 $options->{property} => $_data
378             }
379         }
380         push @{$record_document->{$target}}, $_data;
381     }
382 }
383
384 =head2 marc_records_to_documents($marc_records)
385
386     my @record_documents = $self->marc_records_to_documents($marc_records);
387
388 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
389
390 Returns array of hash references, representing Elasticsearch documents,
391 acceptable as body payload in C<Search::Elasticsearch> requests.
392
393 =over 4
394
395 =item C<$marc_documents>
396
397 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
398
399 =back
400
401 =cut
402
403 sub marc_records_to_documents {
404     my ($self, $records) = @_;
405     my $rules = $self->_get_marc_mapping_rules();
406     my $control_fields_rules = $rules->{control_fields};
407     my $data_fields_rules = $rules->{data_fields};
408     my $marcflavour = lc C4::Context->preference('marcflavour');
409
410     my @record_documents;
411
412     foreach my $record (@{$records}) {
413         my $record_document = {};
414         my $mappings = $rules->{leader};
415         if ($mappings) {
416             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
417         }
418         foreach my $field ($record->fields()) {
419             if ($field->is_control_field()) {
420                 my $mappings = $control_fields_rules->{$field->tag()};
421                 if ($mappings) {
422                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
423                 }
424             }
425             else {
426                 my $tag = $field->tag();
427                 # Handle alternate scripts in MARC 21
428                 my $altscript = 0;
429                 if ($marcflavour eq 'marc21' && $tag eq '880') {
430                     my $sub6 = $field->subfield('6');
431                     if ($sub6 =~ /^(...)-\d+/) {
432                         $tag = $1;
433                         $altscript = 1;
434                     }
435                 }
436
437                 my $data_field_rules = $data_fields_rules->{$tag};
438
439                 if ($data_field_rules) {
440                     my $subfields_mappings = $data_field_rules->{subfields};
441                     my $wildcard_mappings = $subfields_mappings->{'*'};
442                     foreach my $subfield ($field->subfields()) {
443                         my ($code, $data) = @{$subfield};
444                         my $mappings = $subfields_mappings->{$code} // [];
445                         if ($wildcard_mappings) {
446                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
447                         }
448                         if (@{$mappings}) {
449                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
450                         }
451                     }
452
453                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
454                     if ($subfields_join_mappings) {
455                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
456                             # Map each subfield to values, remove empty values, join with space
457                             my $data = join(
458                                 ' ',
459                                 grep(
460                                     $_,
461                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
462                                 )
463                             );
464                             if ($data) {
465                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
466                             }
467                         }
468                     }
469                 }
470             }
471         }
472         foreach my $field (keys %{$rules->{defaults}}) {
473             unless (defined $record_document->{$field}) {
474                 $record_document->{$field} = $rules->{defaults}->{$field};
475             }
476         }
477         foreach my $field (@{$rules->{sum}}) {
478             if (defined $record_document->{$field}) {
479                 # TODO: validate numeric? filter?
480                 # TODO: Or should only accept fields without nested values?
481                 # TODO: Quick and dirty, improve if needed
482                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
483             }
484         }
485         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
486         foreach my $field (@{$rules->{isbn}}) {
487             if (defined $record_document->{$field}) {
488                 my @isbns = ();
489                 foreach my $input_isbn (@{$record_document->{$field}}) {
490                     my $isbn = Business::ISBN->new($input_isbn);
491                     if (defined $isbn && $isbn->is_valid) {
492                         my $isbn13 = $isbn->as_isbn13->as_string;
493                         push @isbns, $isbn13;
494                         $isbn13 =~ s/\-//g;
495                         push @isbns, $isbn13;
496
497                         my $isbn10 = $isbn->as_isbn10;
498                         if ($isbn10) {
499                             $isbn10 = $isbn10->as_string;
500                             push @isbns, $isbn10;
501                             $isbn10 =~ s/\-//g;
502                             push @isbns, $isbn10;
503                         }
504                     } else {
505                         push @isbns, $input_isbn;
506                     }
507                 }
508                 $record_document->{$field} = \@isbns;
509             }
510         }
511
512         # Remove duplicate values and collapse sort fields
513         foreach my $field (keys %{$record_document}) {
514             if (ref($record_document->{$field}) eq 'ARRAY') {
515                 @{$record_document->{$field}} = do {
516                     my %seen;
517                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
518                 };
519                 if ($field =~ /__sort$/) {
520                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
521                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
522                 }
523             }
524         }
525
526         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
527         $record->encoding('UTF-8');
528         my @warnings;
529         {
530             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
531             local $SIG{__WARN__} = sub {
532                 push @warnings, $_[0];
533             };
534             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
535         }
536         if (@warnings) {
537             # Suppress warnings if record length exceeded
538             unless (substr($record->leader(), 0, 5) eq '99999') {
539                 foreach my $warning (@warnings) {
540                     carp $warning;
541                 }
542             }
543             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
544             $record_document->{'marc_format'} = 'MARCXML';
545         }
546         else {
547             $record_document->{'marc_format'} = 'base64ISO2709';
548         }
549         my $id = $record->subfield('999', 'c');
550         push @record_documents, [$id, $record_document];
551     }
552     return \@record_documents;
553 }
554
555 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
556
557     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
558
559 Get mappings, an internal data structure later used by
560 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
561 data for a MARC mapping.
562
563 The returned C<$mappings> is not to to be confused with mappings provided by
564 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
565 provided by C<_foreach_mapping> and expands it to this internal data structure.
566 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
567 is then applied to each MARC target (leader, control field data, subfield or
568 joined subfields) and integrated into the mapping rules data structure used in
569 C<marc_records_to_documents> to transform MARC records into Elasticsearch
570 documents.
571
572 =over 4
573
574 =item C<$facet>
575
576 Boolean indicating whether to create a facet field for this mapping.
577
578 =item C<$suggestible>
579
580 Boolean indicating whether to create a suggestion field for this mapping.
581
582 =item C<$sort>
583
584 Boolean indicating whether to create a sort field for this mapping.
585
586 =item C<$target_name>
587
588 Elasticsearch document target field name.
589
590 =item C<$target_type>
591
592 Elasticsearch document target field type.
593
594 =item C<$range>
595
596 An optional range as a string in the format "<START>-<END>" or "<START>",
597 where "<START>" and "<END>" are integers specifying a range that will be used
598 for extracting a substring from MARC data as Elasticsearch field target value.
599
600 The first character position is "0", and the range is inclusive,
601 so "0-2" means the first three characters of MARC data.
602
603 If only "<START>" is provided only one character at position "<START>" will
604 be extracted.
605
606 =back
607
608 =cut
609
610 sub _field_mappings {
611     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
612     my %mapping_defaults = ();
613     my @mappings;
614
615     my $substr_args = undef;
616     if (defined $range) {
617         # TODO: use value_callback instead?
618         my ($start, $end) = map(int, split /-/, $range, 2);
619         $substr_args = [$start];
620         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
621     }
622     my $default_options = {};
623     if ($substr_args) {
624         $default_options->{substr} = $substr_args;
625     }
626
627     # TODO: Should probably have per type value callback/hook
628     # but hard code for now
629     if ($target_type eq 'boolean') {
630         $default_options->{value_callbacks} //= [];
631         push @{$default_options->{value_callbacks}}, sub {
632             my ($value) = @_;
633             # Trim whitespace at both ends
634             $value =~ s/^\s+|\s+$//g;
635             return $value ? 'true' : 'false';
636         };
637     }
638
639     my $mapping = [$target_name, $default_options];
640     push @mappings, $mapping;
641
642     my @suffixes = ();
643     push @suffixes, 'facet' if $facet;
644     push @suffixes, 'suggestion' if $suggestible;
645     push @suffixes, 'sort' if !defined $sort || $sort;
646
647     foreach my $suffix (@suffixes) {
648         my $mapping = ["${target_name}__$suffix"];
649         # TODO: Hack, fix later in less hideous manner
650         if ($suffix eq 'suggestion') {
651             push @{$mapping}, {%{$default_options}, property => 'input'};
652         }
653         else {
654             push @{$mapping}, $default_options;
655         }
656         push @mappings, $mapping;
657     }
658     return @mappings;
659 };
660
661 =head2 _get_marc_mapping_rules
662
663     my $mapping_rules = $self->_get_marc_mapping_rules()
664
665 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
666
667 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
668 each call to C<MARC::Record>->field) we create an optimized structure of mapping
669 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
670
671 We can then iterate through all MARC fields for each record and apply all relevant
672 rules once per fields instead of retreiving fields multiple times for each mapping rule
673 which is terribly slow.
674
675 =cut
676
677 # TODO: This structure can be used for processing multiple MARC::Records so is currently
678 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
679 # memory cache which it is currently not. The performance gain of caching
680 # would probably be marginal, but to do this could be a further improvement.
681
682 sub _get_marc_mapping_rules {
683     my ($self) = @_;
684     my $marcflavour = lc C4::Context->preference('marcflavour');
685     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
686     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
687     my $rules = {
688         'leader' => [],
689         'control_fields' => {},
690         'data_fields' => {},
691         'sum' => [],
692         'isbn' => [],
693         'defaults' => {}
694     };
695
696     $self->_foreach_mapping(sub {
697         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
698         return if $marc_type ne $marcflavour;
699
700         if ($type eq 'sum') {
701             push @{$rules->{sum}}, $name;
702         }
703         elsif ($type eq 'isbn') {
704             push @{$rules->{isbn}}, $name;
705         }
706         elsif ($type eq 'boolean') {
707             # boolean gets special handling, if value doesn't exist for a field,
708             # it is set to false
709             $rules->{defaults}->{$name} = 'false';
710         }
711
712         if ($marc_field =~ $field_spec_regexp) {
713             my $field_tag = $1;
714
715             my @subfields;
716             my @subfield_groups;
717             # Parse and separate subfields form subfield groups
718             if (defined $2) {
719                 my $subfield_group = '';
720                 my $open_group = 0;
721
722                 foreach my $token (split //, $2) {
723                     if ($token eq "(") {
724                         if ($open_group) {
725                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
726                                 "Unmatched opening parenthesis for $marc_field"
727                             );
728                         }
729                         else {
730                             $open_group = 1;
731                         }
732                     }
733                     elsif ($token eq ")") {
734                         if ($open_group) {
735                             if ($subfield_group) {
736                                 push @subfield_groups, $subfield_group;
737                                 $subfield_group = '';
738                             }
739                             $open_group = 0;
740                         }
741                         else {
742                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
743                                 "Unmatched closing parenthesis for $marc_field"
744                             );
745                         }
746                     }
747                     elsif ($open_group) {
748                         $subfield_group .= $token;
749                     }
750                     else {
751                         push @subfields, $token;
752                     }
753                 }
754             }
755             else {
756                 push @subfields, '*';
757             }
758
759             my $range = defined $3 ? $3 : undef;
760             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
761
762             if ($field_tag < 10) {
763                 $rules->{control_fields}->{$field_tag} //= [];
764                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
765             }
766             else {
767                 $rules->{data_fields}->{$field_tag} //= {};
768                 foreach my $subfield (@subfields) {
769                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
770                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
771                 }
772                 foreach my $subfield_group (@subfield_groups) {
773                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
774                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
775                 }
776             }
777         }
778         elsif ($marc_field =~ $leader_regexp) {
779             my $range = defined $1 ? $1 : undef;
780             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
781             push @{$rules->{leader}}, @mappings;
782         }
783         else {
784             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
785                 "Invalid MARC field expression: $marc_field"
786             );
787         }
788     });
789     return $rules;
790 }
791
792 =head2 _foreach_mapping
793
794     $self->_foreach_mapping(
795         sub {
796             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
797                 $marc_field )
798               = @_;
799             return unless $marc_type eq 'marc21';
800             print "Data comes from: " . $marc_field . "\n";
801         }
802     );
803
804 This allows you to apply a function to each entry in the elasticsearch mappings
805 table, in order to build the mappings for whatever is needed.
806
807 In the provided function, the files are:
808
809 =over 4
810
811 =item C<$name>
812
813 The field name for elasticsearch (corresponds to the 'mapping' column in the
814 database.
815
816 =item C<$type>
817
818 The type for this value, e.g. 'string'.
819
820 =item C<$facet>
821
822 True if this value should be facetised. This only really makes sense if the
823 field is understood by the facet processing code anyway.
824
825 =item C<$sort>
826
827 True if this is a field that a) needs special sort handling, and b) if it
828 should be sorted on. False if a) but not b). Undef if not a). This allows,
829 for example, author to be sorted on but not everything marked with "author"
830 to be included in that sort.
831
832 =item C<$marc_type>
833
834 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
835 'unimarc', 'normarc'.
836
837 =item C<$marc_field>
838
839 A string that describes the MARC field that contains the data to extract.
840 These are of a form suited to Catmandu's MARC fixers.
841
842 =back
843
844 =cut
845
846 sub _foreach_mapping {
847     my ( $self, $sub ) = @_;
848
849     # TODO use a caching framework here
850     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
851         {
852             'search_marc_map.index_name' => $self->index,
853         },
854         {   join => { search_marc_to_fields => 'search_marc_map' },
855             '+select' => [
856                 'search_marc_to_fields.facet',
857                 'search_marc_to_fields.suggestible',
858                 'search_marc_to_fields.sort',
859                 'search_marc_map.marc_type',
860                 'search_marc_map.marc_field',
861             ],
862             '+as'     => [
863                 'facet',
864                 'suggestible',
865                 'sort',
866                 'marc_type',
867                 'marc_field',
868             ],
869         }
870     );
871
872     while ( my $search_field = $search_fields->next ) {
873         $sub->(
874             # Force lower case on indexed field names for case insensitive
875             # field name searches
876             lc($search_field->name),
877             $search_field->type,
878             $search_field->get_column('facet'),
879             $search_field->get_column('suggestible'),
880             $search_field->get_column('sort'),
881             $search_field->get_column('marc_type'),
882             $search_field->get_column('marc_field'),
883         );
884     }
885 }
886
887 =head2 process_error
888
889     die process_error($@);
890
891 This parses an Elasticsearch error message and produces a human-readable
892 result from it. This result is probably missing all the useful information
893 that you might want in diagnosing an issue, so the warning is also logged.
894
895 Note that currently the resulting message is not internationalised. This
896 will happen eventually by some method or other.
897
898 =cut
899
900 sub process_error {
901     my ($self, $msg) = @_;
902
903     warn $msg; # simple logging
904
905     # This is super-primitive
906     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
907
908     return "Unable to perform your search. Please try again.\n";
909 }
910
911 =head2 _read_configuration
912
913     my $conf = _read_configuration();
914
915 Reads the I<configuration file> and returns a hash structure with the
916 configuration information. It raises an exception if mandatory entries
917 are missing.
918
919 The hashref structure has the following form:
920
921     {
922         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
923         'index_name' => 'koha_instance',
924     }
925
926 This is configured by the following in the C<config> block in koha-conf.xml:
927
928     <elasticsearch>
929         <server>127.0.0.1:9200</server>
930         <server>anotherserver:9200</server>
931         <index_name>koha_instance</index_name>
932     </elasticsearch>
933
934 =cut
935
936 sub _read_configuration {
937
938     my $configuration;
939
940     my $conf = C4::Context->config('elasticsearch');
941     Koha::Exceptions::Config::MissingEntry->throw(
942         "Missing 'elasticsearch' block in config file")
943       unless defined $conf;
944
945     if ( $conf && $conf->{server} ) {
946         my $nodes = $conf->{server};
947         if ( ref($nodes) eq 'ARRAY' ) {
948             $configuration->{nodes} = $nodes;
949         }
950         else {
951             $configuration->{nodes} = [$nodes];
952         }
953     }
954     else {
955         Koha::Exceptions::Config::MissingEntry->throw(
956             "Missing 'server' entry in config file for elasticsearch");
957     }
958
959     if ( defined $conf->{index_name} ) {
960         $configuration->{index_name} = $conf->{index_name};
961     }
962     else {
963         Koha::Exceptions::Config::MissingEntry->throw(
964             "Missing 'index_name' entry in config file for elasticsearch");
965     }
966
967     return $configuration;
968 }
969
970 1;
971
972 __END__
973
974 =head1 AUTHOR
975
976 =over 4
977
978 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
979
980 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
981
982 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
983
984 =back
985
986 =cut