Bug 23671: Elasticsearch shouldn't throw exception on an uppercase subfield identifier
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29
30 use Carp;
31 use Clone qw(clone);
32 use JSON;
33 use Modern::Perl;
34 use Readonly;
35 use Search::Elasticsearch;
36 use Try::Tiny;
37 use YAML::Syck;
38
39 use List::Util qw( sum0 reduce );
40 use MARC::File::XML;
41 use MIME::Base64;
42 use Encode qw(encode);
43 use Business::ISBN;
44
45 __PACKAGE__->mk_ro_accessors(qw( index ));
46 __PACKAGE__->mk_accessors(qw( sort_fields ));
47
48 # Constants to refer to the standard index names
49 Readonly our $BIBLIOS_INDEX     => 'biblios';
50 Readonly our $AUTHORITIES_INDEX => 'authorities';
51
52 =head1 NAME
53
54 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
55
56 =head1 ACCESSORS
57
58 =over 4
59
60 =item index
61
62 The name of the index to use, generally 'biblios' or 'authorities'.
63
64 =back
65
66 =head1 FUNCTIONS
67
68 =cut
69
70 sub new {
71     my $class = shift @_;
72     my $self = $class->SUPER::new(@_);
73     # Check for a valid index
74     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
75     return $self;
76 }
77
78 =head2 get_elasticsearch
79
80     my $elasticsearch_client = $self->get_elasticsearch();
81
82 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
83 instance level and will be reused if method is called multiple times.
84
85 =cut
86
87 sub get_elasticsearch {
88     my $self = shift @_;
89     unless (defined $self->{elasticsearch}) {
90         my $conf = $self->get_elasticsearch_params();
91         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
92     }
93     return $self->{elasticsearch};
94 }
95
96 =head2 get_elasticsearch_params
97
98     my $params = $self->get_elasticsearch_params();
99
100 This provides a hashref that contains the parameters for connecting to the
101 ElasicSearch servers, in the form:
102
103     {
104         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
105         'index_name' => 'koha_instance_index',
106     }
107
108 This is configured by the following in the C<config> block in koha-conf.xml:
109
110     <elasticsearch>
111         <server>127.0.0.1:9200</server>
112         <server>anotherserver:9200</server>
113         <index_name>koha_instance</index_name>
114     </elasticsearch>
115
116 =cut
117
118 sub get_elasticsearch_params {
119     my ($self) = @_;
120
121     # Copy the hash so that we're not modifying the original
122     my $conf = C4::Context->config('elasticsearch');
123     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
124     my $es = { %{ $conf } };
125
126     # Helpfully, the multiple server lines end up in an array for us anyway
127     # if there are multiple ones, but not if there's only one.
128     my $server = $es->{server};
129     delete $es->{server};
130     if ( ref($server) eq 'ARRAY' ) {
131
132         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
133         $es->{nodes} = $server;
134     }
135     elsif ($server) {
136         $es->{nodes} = [$server];
137     }
138     else {
139         die "No elasticsearch servers were specified in koha-conf.xml.\n";
140     }
141     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
142       if ( !$es->{index_name} );
143     # Append the name of this particular index to our namespace
144     $es->{index_name} .= '_' . $self->index;
145
146     $es->{key_prefix} = 'es_';
147     $es->{client} //= '5_0::Direct';
148     $es->{cxn_pool} //= 'Static';
149     $es->{request_timeout} //= 60;
150
151     return $es;
152 }
153
154 =head2 get_elasticsearch_settings
155
156     my $settings = $self->get_elasticsearch_settings();
157
158 This provides the settings provided to Elasticsearch when an index is created.
159 These can do things like define tokenization methods.
160
161 A hashref containing the settings is returned.
162
163 =cut
164
165 sub get_elasticsearch_settings {
166     my ($self) = @_;
167
168     # Use state to speed up repeated calls
169     state $settings = undef;
170     if (!defined $settings) {
171         my $config_file = C4::Context->config('elasticsearch_index_config');
172         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
173         $settings = LoadFile( $config_file );
174     }
175
176     return $settings;
177 }
178
179 =head2 get_elasticsearch_mappings
180
181     my $mappings = $self->get_elasticsearch_mappings();
182
183 This provides the mappings that get passed to Elasticsearch when an index is
184 created.
185
186 =cut
187
188 sub get_elasticsearch_mappings {
189     my ($self) = @_;
190
191     # Use state to speed up repeated calls
192     state %all_mappings;
193     state %sort_fields;
194
195     if (!defined $all_mappings{$self->index}) {
196         $sort_fields{$self->index} = {};
197         # Clone the general mapping to break ties with the original hash
198         my $mappings = {
199             data => clone(_get_elasticsearch_field_config('general', ''))
200         };
201         my $marcflavour = lc C4::Context->preference('marcflavour');
202         $self->_foreach_mapping(
203             sub {
204                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 if ($search) {
221                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222                 }
223
224                 if ($facet) {
225                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226                 }
227                 if ($suggestible) {
228                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
229                 }
230                 # Sort is a bit special as it can be true, false, undef.
231                 # We care about "true" or "undef",
232                 # "undef" means to do the default thing, which is make it sortable.
233                 if (!defined $sort || $sort) {
234                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
235                     $sort_fields{$self->index}{$name} = 1;
236                 }
237             }
238         );
239         $all_mappings{$self->index} = $mappings;
240     }
241     $self->sort_fields(\%{$sort_fields{$self->index}});
242
243     return $all_mappings{$self->index};
244 }
245
246 =head2 _get_elasticsearch_field_config
247
248 Get the Elasticsearch field config for the given purpose and data type.
249
250 $mapping = _get_elasticsearch_field_config('search', 'text');
251
252 =cut
253
254 sub _get_elasticsearch_field_config {
255
256     my ( $purpose, $type ) = @_;
257
258     # Use state to speed up repeated calls
259     state $settings = undef;
260     if (!defined $settings) {
261         my $config_file = C4::Context->config('elasticsearch_field_config');
262         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
263         $settings = LoadFile( $config_file );
264     }
265
266     if (!defined $settings->{$purpose}) {
267         die "Field purpose $purpose not defined in field config";
268     }
269     if ($type eq '') {
270         return $settings->{$purpose};
271     }
272     if (defined $settings->{$purpose}{$type}) {
273         return $settings->{$purpose}{$type};
274     }
275     if (defined $settings->{$purpose}{'default'}) {
276         return $settings->{$purpose}{'default'};
277     }
278     return;
279 }
280
281 =head2 _load_elasticsearch_mappings
282
283 Load Elasticsearch mappings in the format of mappings.yaml.
284
285 $indexes = _load_elasticsearch_mappings();
286
287 =cut
288
289 sub _load_elasticsearch_mappings {
290     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
291     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
292     return LoadFile( $mappings_yaml );
293 }
294
295 sub reset_elasticsearch_mappings {
296     my ( $self ) = @_;
297     my $indexes = $self->_load_elasticsearch_mappings();
298
299     while ( my ( $index_name, $fields ) = each %$indexes ) {
300         while ( my ( $field_name, $data ) = each %$fields ) {
301
302             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
303
304             # Set default values
305             $sf_params{staff_client} //= 1;
306             $sf_params{opac} //= 1;
307
308             $sf_params{name} = $field_name;
309
310             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
311
312             my $mappings = $data->{mappings};
313             for my $mapping ( @$mappings ) {
314                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
315                     index_name => $index_name,
316                     marc_type => $mapping->{marc_type},
317                     marc_field => $mapping->{marc_field}
318                 });
319                 $search_field->add_to_search_marc_maps($marc_field, {
320                     facet => $mapping->{facet} || 0,
321                     suggestible => $mapping->{suggestible} || 0,
322                     sort => $mapping->{sort},
323                     search => $mapping->{search} // 1
324                 });
325             }
326         }
327     }
328 }
329
330 # This overrides the accessor provided by Class::Accessor so that if
331 # sort_fields isn't set, then it'll generate it.
332 sub sort_fields {
333     my $self = shift;
334     if (@_) {
335         $self->_sort_fields_accessor(@_);
336         return;
337     }
338     my $val = $self->_sort_fields_accessor();
339     return $val if $val;
340
341     # This will populate the accessor as a side effect
342     $self->get_elasticsearch_mappings();
343     return $self->_sort_fields_accessor();
344 }
345
346 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
347
348     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
349
350 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
351 Since we group all mappings by MARC field targets C<$mappings> will contain
352 all targets for C<$data> and thus we need to fetch the MARC field only once.
353 C<$mappings> will be applied to C<$record_document> and new field values added.
354 The method has no return value.
355
356 =over 4
357
358 =item C<$mappings>
359
360 Arrayref of mappings containing arrayrefs in the format
361 [C<$target>, C<$options>] where C<$target> is the name of the target field and
362 C<$options> is a hashref containing processing directives for this particular
363 mapping.
364
365 =item C<$data>
366
367 The source data from a MARC record field.
368
369 =item C<$record_document>
370
371 Hashref representing the Elasticsearch document on which mappings should be
372 applied.
373
374 =item C<$altscript>
375
376 A boolean value indicating whether an alternate script presentation is being
377 processed.
378
379 =back
380
381 =cut
382
383 sub _process_mappings {
384     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
385     foreach my $mapping (@{$mappings}) {
386         my ($target, $options) = @{$mapping};
387
388         # Don't process sort fields for alternate scripts
389         my $sort = $target =~ /__sort$/;
390         if ($sort && $altscript) {
391             next;
392         }
393
394         # Copy (scalar) data since can have multiple targets
395         # with differing options for (possibly) mutating data
396         # so need a different copy for each
397         my $_data = $data;
398         $record_document->{$target} //= [];
399         if (defined $options->{substr}) {
400             my ($start, $length) = @{$options->{substr}};
401             $_data = length($data) > $start ? substr $data, $start, $length : '';
402         }
403         if (defined $options->{value_callbacks}) {
404             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
405         }
406         if (defined $options->{property}) {
407             $_data = {
408                 $options->{property} => $_data
409             }
410         }
411         push @{$record_document->{$target}}, $_data;
412     }
413 }
414
415 =head2 marc_records_to_documents($marc_records)
416
417     my @record_documents = $self->marc_records_to_documents($marc_records);
418
419 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
420
421 Returns array of hash references, representing Elasticsearch documents,
422 acceptable as body payload in C<Search::Elasticsearch> requests.
423
424 =over 4
425
426 =item C<$marc_documents>
427
428 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
429
430 =back
431
432 =cut
433
434 sub marc_records_to_documents {
435     my ($self, $records) = @_;
436     my $rules = $self->_get_marc_mapping_rules();
437     my $control_fields_rules = $rules->{control_fields};
438     my $data_fields_rules = $rules->{data_fields};
439     my $marcflavour = lc C4::Context->preference('marcflavour');
440     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
441
442     my @record_documents;
443
444     foreach my $record (@{$records}) {
445         my $record_document = {};
446         my $mappings = $rules->{leader};
447         if ($mappings) {
448             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
449         }
450         foreach my $field ($record->fields()) {
451             if ($field->is_control_field()) {
452                 my $mappings = $control_fields_rules->{$field->tag()};
453                 if ($mappings) {
454                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
455                 }
456             }
457             else {
458                 my $tag = $field->tag();
459                 # Handle alternate scripts in MARC 21
460                 my $altscript = 0;
461                 if ($marcflavour eq 'marc21' && $tag eq '880') {
462                     my $sub6 = $field->subfield('6');
463                     if ($sub6 =~ /^(...)-\d+/) {
464                         $tag = $1;
465                         $altscript = 1;
466                     }
467                 }
468
469                 my $data_field_rules = $data_fields_rules->{$tag};
470
471                 if ($data_field_rules) {
472                     my $subfields_mappings = $data_field_rules->{subfields};
473                     my $wildcard_mappings = $subfields_mappings->{'*'};
474                     foreach my $subfield ($field->subfields()) {
475                         my ($code, $data) = @{$subfield};
476                         my $mappings = $subfields_mappings->{$code} // [];
477                         if ($wildcard_mappings) {
478                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
479                         }
480                         if (@{$mappings}) {
481                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
482                         }
483                     }
484
485                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
486                     if ($subfields_join_mappings) {
487                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
488                             # Map each subfield to values, remove empty values, join with space
489                             my $data = join(
490                                 ' ',
491                                 grep(
492                                     $_,
493                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
494                                 )
495                             );
496                             if ($data) {
497                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
498                             }
499                         }
500                     }
501                 }
502             }
503         }
504         foreach my $field (keys %{$rules->{defaults}}) {
505             unless (defined $record_document->{$field}) {
506                 $record_document->{$field} = $rules->{defaults}->{$field};
507             }
508         }
509         foreach my $field (@{$rules->{sum}}) {
510             if (defined $record_document->{$field}) {
511                 # TODO: validate numeric? filter?
512                 # TODO: Or should only accept fields without nested values?
513                 # TODO: Quick and dirty, improve if needed
514                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
515             }
516         }
517         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
518         foreach my $field (@{$rules->{isbn}}) {
519             if (defined $record_document->{$field}) {
520                 my @isbns = ();
521                 foreach my $input_isbn (@{$record_document->{$field}}) {
522                     my $isbn = Business::ISBN->new($input_isbn);
523                     if (defined $isbn && $isbn->is_valid) {
524                         my $isbn13 = $isbn->as_isbn13->as_string;
525                         push @isbns, $isbn13;
526                         $isbn13 =~ s/\-//g;
527                         push @isbns, $isbn13;
528
529                         my $isbn10 = $isbn->as_isbn10;
530                         if ($isbn10) {
531                             $isbn10 = $isbn10->as_string;
532                             push @isbns, $isbn10;
533                             $isbn10 =~ s/\-//g;
534                             push @isbns, $isbn10;
535                         }
536                     } else {
537                         push @isbns, $input_isbn;
538                     }
539                 }
540                 $record_document->{$field} = \@isbns;
541             }
542         }
543
544         # Remove duplicate values and collapse sort fields
545         foreach my $field (keys %{$record_document}) {
546             if (ref($record_document->{$field}) eq 'ARRAY') {
547                 @{$record_document->{$field}} = do {
548                     my %seen;
549                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
550                 };
551                 if ($field =~ /__sort$/) {
552                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
553                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
554                 }
555             }
556         }
557
558         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
559         $record->encoding('UTF-8');
560         if ($use_array) {
561             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
562             $record_document->{'marc_format'} = 'ARRAY';
563         } else {
564             my @warnings;
565             {
566                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
567                 local $SIG{__WARN__} = sub {
568                     push @warnings, $_[0];
569                 };
570                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
571             }
572             if (@warnings) {
573                 # Suppress warnings if record length exceeded
574                 unless (substr($record->leader(), 0, 5) eq '99999') {
575                     foreach my $warning (@warnings) {
576                         carp $warning;
577                     }
578                 }
579                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
580                 $record_document->{'marc_format'} = 'MARCXML';
581             }
582             else {
583                 $record_document->{'marc_format'} = 'base64ISO2709';
584             }
585         }
586         my $id = $record->subfield('999', 'c');
587         push @record_documents, [$id, $record_document];
588     }
589     return \@record_documents;
590 }
591
592 =head2 _marc_to_array($record)
593
594     my @fields = _marc_to_array($record)
595
596 Convert a MARC::Record to an array modeled after MARC-in-JSON
597 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
598
599 =over 4
600
601 =item C<$record>
602
603 A MARC::Record object
604
605 =back
606
607 =cut
608
609 sub _marc_to_array {
610     my ($self, $record) = @_;
611
612     my $data = {
613         leader => $record->leader(),
614         fields => []
615     };
616     for my $field ($record->fields()) {
617         my $tag = $field->tag();
618         if ($field->is_control_field()) {
619             push @{$data->{fields}}, {$tag => $field->data()};
620         } else {
621             my $subfields = ();
622             foreach my $subfield ($field->subfields()) {
623                 my ($code, $contents) = @{$subfield};
624                 push @{$subfields}, {$code => $contents};
625             }
626             push @{$data->{fields}}, {
627                 $tag => {
628                     ind1 => $field->indicator(1),
629                     ind2 => $field->indicator(2),
630                     subfields => $subfields
631                 }
632             };
633         }
634     }
635     return $data;
636 }
637
638 =head2 _array_to_marc($data)
639
640     my $record = _array_to_marc($data)
641
642 Convert an array modeled after MARC-in-JSON to a MARC::Record
643
644 =over 4
645
646 =item C<$data>
647
648 An array modeled after MARC-in-JSON
649 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
650
651 =back
652
653 =cut
654
655 sub _array_to_marc {
656     my ($self, $data) = @_;
657
658     my $record = MARC::Record->new();
659
660     $record->leader($data->{leader});
661     for my $field (@{$data->{fields}}) {
662         my $tag = (keys %{$field})[0];
663         $field = $field->{$tag};
664         my $marc_field;
665         if (ref($field) eq 'HASH') {
666             my @subfields;
667             foreach my $subfield (@{$field->{subfields}}) {
668                 my $code = (keys %{$subfield})[0];
669                 push @subfields, $code;
670                 push @subfields, $subfield->{$code};
671             }
672             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
673         } else {
674             $marc_field = MARC::Field->new($tag, $field)
675         }
676         $record->append_fields($marc_field);
677     }
678 ;
679     return $record;
680 }
681
682 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
683
684     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
685
686 Get mappings, an internal data structure later used by
687 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
688 data for a MARC mapping.
689
690 The returned C<$mappings> is not to to be confused with mappings provided by
691 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
692 provided by C<_foreach_mapping> and expands it to this internal data structure.
693 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
694 is then applied to each MARC target (leader, control field data, subfield or
695 joined subfields) and integrated into the mapping rules data structure used in
696 C<marc_records_to_documents> to transform MARC records into Elasticsearch
697 documents.
698
699 =over 4
700
701 =item C<$facet>
702
703 Boolean indicating whether to create a facet field for this mapping.
704
705 =item C<$suggestible>
706
707 Boolean indicating whether to create a suggestion field for this mapping.
708
709 =item C<$sort>
710
711 Boolean indicating whether to create a sort field for this mapping.
712
713 =item C<$search>
714
715 Boolean indicating whether to create a search field for this mapping.
716
717 =item C<$target_name>
718
719 Elasticsearch document target field name.
720
721 =item C<$target_type>
722
723 Elasticsearch document target field type.
724
725 =item C<$range>
726
727 An optional range as a string in the format "<START>-<END>" or "<START>",
728 where "<START>" and "<END>" are integers specifying a range that will be used
729 for extracting a substring from MARC data as Elasticsearch field target value.
730
731 The first character position is "0", and the range is inclusive,
732 so "0-2" means the first three characters of MARC data.
733
734 If only "<START>" is provided only one character at position "<START>" will
735 be extracted.
736
737 =back
738
739 =cut
740
741 sub _field_mappings {
742     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
743     my %mapping_defaults = ();
744     my @mappings;
745
746     my $substr_args = undef;
747     if (defined $range) {
748         # TODO: use value_callback instead?
749         my ($start, $end) = map(int, split /-/, $range, 2);
750         $substr_args = [$start];
751         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
752     }
753     my $default_options = {};
754     if ($substr_args) {
755         $default_options->{substr} = $substr_args;
756     }
757
758     # TODO: Should probably have per type value callback/hook
759     # but hard code for now
760     if ($target_type eq 'boolean') {
761         $default_options->{value_callbacks} //= [];
762         push @{$default_options->{value_callbacks}}, sub {
763             my ($value) = @_;
764             # Trim whitespace at both ends
765             $value =~ s/^\s+|\s+$//g;
766             return $value ? 'true' : 'false';
767         };
768     }
769
770     if ($search) {
771         my $mapping = [$target_name, $default_options];
772         push @mappings, $mapping;
773     }
774
775     my @suffixes = ();
776     push @suffixes, 'facet' if $facet;
777     push @suffixes, 'suggestion' if $suggestible;
778     push @suffixes, 'sort' if !defined $sort || $sort;
779
780     foreach my $suffix (@suffixes) {
781         my $mapping = ["${target_name}__$suffix"];
782         # TODO: Hack, fix later in less hideous manner
783         if ($suffix eq 'suggestion') {
784             push @{$mapping}, {%{$default_options}, property => 'input'};
785         }
786         else {
787             push @{$mapping}, $default_options;
788         }
789         push @mappings, $mapping;
790     }
791     return @mappings;
792 };
793
794 =head2 _get_marc_mapping_rules
795
796     my $mapping_rules = $self->_get_marc_mapping_rules()
797
798 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
799
800 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
801 each call to C<MARC::Record>->field) we create an optimized structure of mapping
802 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
803
804 We can then iterate through all MARC fields for each record and apply all relevant
805 rules once per fields instead of retreiving fields multiple times for each mapping rule
806 which is terribly slow.
807
808 =cut
809
810 # TODO: This structure can be used for processing multiple MARC::Records so is currently
811 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
812 # memory cache which it is currently not. The performance gain of caching
813 # would probably be marginal, but to do this could be a further improvement.
814
815 sub _get_marc_mapping_rules {
816     my ($self) = @_;
817     my $marcflavour = lc C4::Context->preference('marcflavour');
818     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
819     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
820     my $rules = {
821         'leader' => [],
822         'control_fields' => {},
823         'data_fields' => {},
824         'sum' => [],
825         'isbn' => [],
826         'defaults' => {}
827     };
828
829     $self->_foreach_mapping(sub {
830         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
831         return if $marc_type ne $marcflavour;
832
833         if ($type eq 'sum') {
834             push @{$rules->{sum}}, $name;
835         }
836         elsif ($type eq 'isbn') {
837             push @{$rules->{isbn}}, $name;
838         }
839         elsif ($type eq 'boolean') {
840             # boolean gets special handling, if value doesn't exist for a field,
841             # it is set to false
842             $rules->{defaults}->{$name} = 'false';
843         }
844
845         if ($marc_field =~ $field_spec_regexp) {
846             my $field_tag = $1;
847
848             my @subfields;
849             my @subfield_groups;
850             # Parse and separate subfields form subfield groups
851             if (defined $2) {
852                 my $subfield_group = '';
853                 my $open_group = 0;
854
855                 foreach my $token (split //, $2) {
856                     if ($token eq "(") {
857                         if ($open_group) {
858                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
859                                 "Unmatched opening parenthesis for $marc_field"
860                             );
861                         }
862                         else {
863                             $open_group = 1;
864                         }
865                     }
866                     elsif ($token eq ")") {
867                         if ($open_group) {
868                             if ($subfield_group) {
869                                 push @subfield_groups, $subfield_group;
870                                 $subfield_group = '';
871                             }
872                             $open_group = 0;
873                         }
874                         else {
875                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
876                                 "Unmatched closing parenthesis for $marc_field"
877                             );
878                         }
879                     }
880                     elsif ($open_group) {
881                         $subfield_group .= $token;
882                     }
883                     else {
884                         push @subfields, $token;
885                     }
886                 }
887             }
888             else {
889                 push @subfields, '*';
890             }
891
892             my $range = defined $3 ? $3 : undef;
893             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
894             if ($field_tag < 10) {
895                 $rules->{control_fields}->{$field_tag} //= [];
896                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
897             }
898             else {
899                 $rules->{data_fields}->{$field_tag} //= {};
900                 foreach my $subfield (@subfields) {
901                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
902                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
903                 }
904                 foreach my $subfield_group (@subfield_groups) {
905                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
906                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
907                 }
908             }
909         }
910         elsif ($marc_field =~ $leader_regexp) {
911             my $range = defined $1 ? $1 : undef;
912             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
913             push @{$rules->{leader}}, @mappings;
914         }
915         else {
916             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
917                 "Invalid MARC field expression: $marc_field"
918             );
919         }
920     });
921     return $rules;
922 }
923
924 =head2 _foreach_mapping
925
926     $self->_foreach_mapping(
927         sub {
928             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
929                 $marc_field )
930               = @_;
931             return unless $marc_type eq 'marc21';
932             print "Data comes from: " . $marc_field . "\n";
933         }
934     );
935
936 This allows you to apply a function to each entry in the elasticsearch mappings
937 table, in order to build the mappings for whatever is needed.
938
939 In the provided function, the files are:
940
941 =over 4
942
943 =item C<$name>
944
945 The field name for elasticsearch (corresponds to the 'mapping' column in the
946 database.
947
948 =item C<$type>
949
950 The type for this value, e.g. 'string'.
951
952 =item C<$facet>
953
954 True if this value should be facetised. This only really makes sense if the
955 field is understood by the facet processing code anyway.
956
957 =item C<$sort>
958
959 True if this is a field that a) needs special sort handling, and b) if it
960 should be sorted on. False if a) but not b). Undef if not a). This allows,
961 for example, author to be sorted on but not everything marked with "author"
962 to be included in that sort.
963
964 =item C<$marc_type>
965
966 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
967 'unimarc', 'normarc'.
968
969 =item C<$marc_field>
970
971 A string that describes the MARC field that contains the data to extract.
972 These are of a form suited to Catmandu's MARC fixers.
973
974 =back
975
976 =cut
977
978 sub _foreach_mapping {
979     my ( $self, $sub ) = @_;
980
981     # TODO use a caching framework here
982     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
983         {
984             'search_marc_map.index_name' => $self->index,
985         },
986         {   join => { search_marc_to_fields => 'search_marc_map' },
987             '+select' => [
988                 'search_marc_to_fields.facet',
989                 'search_marc_to_fields.suggestible',
990                 'search_marc_to_fields.sort',
991                 'search_marc_to_fields.search',
992                 'search_marc_map.marc_type',
993                 'search_marc_map.marc_field',
994             ],
995             '+as'     => [
996                 'facet',
997                 'suggestible',
998                 'sort',
999                 'search',
1000                 'marc_type',
1001                 'marc_field',
1002             ],
1003         }
1004     );
1005
1006     while ( my $search_field = $search_fields->next ) {
1007         $sub->(
1008             # Force lower case on indexed field names for case insensitive
1009             # field name searches
1010             lc($search_field->name),
1011             $search_field->type,
1012             $search_field->get_column('facet'),
1013             $search_field->get_column('suggestible'),
1014             $search_field->get_column('sort'),
1015             $search_field->get_column('search'),
1016             $search_field->get_column('marc_type'),
1017             $search_field->get_column('marc_field'),
1018         );
1019     }
1020 }
1021
1022 =head2 process_error
1023
1024     die process_error($@);
1025
1026 This parses an Elasticsearch error message and produces a human-readable
1027 result from it. This result is probably missing all the useful information
1028 that you might want in diagnosing an issue, so the warning is also logged.
1029
1030 Note that currently the resulting message is not internationalised. This
1031 will happen eventually by some method or other.
1032
1033 =cut
1034
1035 sub process_error {
1036     my ($self, $msg) = @_;
1037
1038     warn $msg; # simple logging
1039
1040     # This is super-primitive
1041     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1042
1043     return "Unable to perform your search. Please try again.\n";
1044 }
1045
1046 =head2 _read_configuration
1047
1048     my $conf = _read_configuration();
1049
1050 Reads the I<configuration file> and returns a hash structure with the
1051 configuration information. It raises an exception if mandatory entries
1052 are missing.
1053
1054 The hashref structure has the following form:
1055
1056     {
1057         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1058         'index_name' => 'koha_instance',
1059     }
1060
1061 This is configured by the following in the C<config> block in koha-conf.xml:
1062
1063     <elasticsearch>
1064         <server>127.0.0.1:9200</server>
1065         <server>anotherserver:9200</server>
1066         <index_name>koha_instance</index_name>
1067     </elasticsearch>
1068
1069 =cut
1070
1071 sub _read_configuration {
1072
1073     my $configuration;
1074
1075     my $conf = C4::Context->config('elasticsearch');
1076     Koha::Exceptions::Config::MissingEntry->throw(
1077         "Missing 'elasticsearch' block in config file")
1078       unless defined $conf;
1079
1080     if ( $conf && $conf->{server} ) {
1081         my $nodes = $conf->{server};
1082         if ( ref($nodes) eq 'ARRAY' ) {
1083             $configuration->{nodes} = $nodes;
1084         }
1085         else {
1086             $configuration->{nodes} = [$nodes];
1087         }
1088     }
1089     else {
1090         Koha::Exceptions::Config::MissingEntry->throw(
1091             "Missing 'server' entry in config file for elasticsearch");
1092     }
1093
1094     if ( defined $conf->{index_name} ) {
1095         $configuration->{index_name} = $conf->{index_name};
1096     }
1097     else {
1098         Koha::Exceptions::Config::MissingEntry->throw(
1099             "Missing 'index_name' entry in config file for elasticsearch");
1100     }
1101
1102     return $configuration;
1103 }
1104
1105 =head2 get_facetable_fields
1106
1107 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1108
1109 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1110
1111 =cut
1112
1113 sub get_facetable_fields {
1114     my ($self) = @_;
1115
1116     # These should correspond to the ES field names, as opposed to the CCL
1117     # things that zebra uses.
1118     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1119     my @faceted_fields = Koha::SearchFields->search(
1120         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1121     );
1122     my @not_faceted_fields = Koha::SearchFields->search(
1123         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1124     );
1125     # This could certainly be improved
1126     return ( @faceted_fields, @not_faceted_fields );
1127 }
1128
1129 1;
1130
1131 __END__
1132
1133 =head1 AUTHOR
1134
1135 =over 4
1136
1137 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1138
1139 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1140
1141 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1142
1143 =back
1144
1145 =cut