Bug 23630: Do not remove field 999 in Elasticsearch indexing
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29
30 use Carp;
31 use Clone qw(clone);
32 use JSON;
33 use Modern::Perl;
34 use Readonly;
35 use Search::Elasticsearch;
36 use Try::Tiny;
37 use YAML::Syck;
38
39 use List::Util qw( sum0 reduce );
40 use MARC::File::XML;
41 use MIME::Base64;
42 use Encode qw(encode);
43 use Business::ISBN;
44
45 __PACKAGE__->mk_ro_accessors(qw( index ));
46 __PACKAGE__->mk_accessors(qw( sort_fields ));
47
48 # Constants to refer to the standard index names
49 Readonly our $BIBLIOS_INDEX     => 'biblios';
50 Readonly our $AUTHORITIES_INDEX => 'authorities';
51
52 =head1 NAME
53
54 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
55
56 =head1 ACCESSORS
57
58 =over 4
59
60 =item index
61
62 The name of the index to use, generally 'biblios' or 'authorities'.
63
64 =back
65
66 =head1 FUNCTIONS
67
68 =cut
69
70 sub new {
71     my $class = shift @_;
72     my $self = $class->SUPER::new(@_);
73     # Check for a valid index
74     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
75     return $self;
76 }
77
78 =head2 get_elasticsearch
79
80     my $elasticsearch_client = $self->get_elasticsearch();
81
82 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
83 instance level and will be reused if method is called multiple times.
84
85 =cut
86
87 sub get_elasticsearch {
88     my $self = shift @_;
89     unless (defined $self->{elasticsearch}) {
90         my $conf = $self->get_elasticsearch_params();
91         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
92     }
93     return $self->{elasticsearch};
94 }
95
96 =head2 get_elasticsearch_params
97
98     my $params = $self->get_elasticsearch_params();
99
100 This provides a hashref that contains the parameters for connecting to the
101 ElasicSearch servers, in the form:
102
103     {
104         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
105         'index_name' => 'koha_instance_index',
106     }
107
108 This is configured by the following in the C<config> block in koha-conf.xml:
109
110     <elasticsearch>
111         <server>127.0.0.1:9200</server>
112         <server>anotherserver:9200</server>
113         <index_name>koha_instance</index_name>
114     </elasticsearch>
115
116 =cut
117
118 sub get_elasticsearch_params {
119     my ($self) = @_;
120
121     # Copy the hash so that we're not modifying the original
122     my $conf = C4::Context->config('elasticsearch');
123     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
124     my $es = { %{ $conf } };
125
126     # Helpfully, the multiple server lines end up in an array for us anyway
127     # if there are multiple ones, but not if there's only one.
128     my $server = $es->{server};
129     delete $es->{server};
130     if ( ref($server) eq 'ARRAY' ) {
131
132         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
133         $es->{nodes} = $server;
134     }
135     elsif ($server) {
136         $es->{nodes} = [$server];
137     }
138     else {
139         die "No elasticsearch servers were specified in koha-conf.xml.\n";
140     }
141     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
142       if ( !$es->{index_name} );
143     # Append the name of this particular index to our namespace
144     $es->{index_name} .= '_' . $self->index;
145
146     $es->{key_prefix} = 'es_';
147     $es->{client} //= '5_0::Direct';
148     $es->{cxn_pool} //= 'Static';
149     $es->{request_timeout} //= 60;
150
151     return $es;
152 }
153
154 =head2 get_elasticsearch_settings
155
156     my $settings = $self->get_elasticsearch_settings();
157
158 This provides the settings provided to Elasticsearch when an index is created.
159 These can do things like define tokenization methods.
160
161 A hashref containing the settings is returned.
162
163 =cut
164
165 sub get_elasticsearch_settings {
166     my ($self) = @_;
167
168     # Use state to speed up repeated calls
169     state $settings = undef;
170     if (!defined $settings) {
171         my $config_file = C4::Context->config('elasticsearch_index_config');
172         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
173         $settings = LoadFile( $config_file );
174     }
175
176     return $settings;
177 }
178
179 =head2 get_elasticsearch_mappings
180
181     my $mappings = $self->get_elasticsearch_mappings();
182
183 This provides the mappings that get passed to Elasticsearch when an index is
184 created.
185
186 =cut
187
188 sub get_elasticsearch_mappings {
189     my ($self) = @_;
190
191     # Use state to speed up repeated calls
192     state %all_mappings;
193     state %sort_fields;
194
195     if (!defined $all_mappings{$self->index}) {
196         $sort_fields{$self->index} = {};
197         # Clone the general mapping to break ties with the original hash
198         my $mappings = {
199             data => clone(_get_elasticsearch_field_config('general', ''))
200         };
201         my $marcflavour = lc C4::Context->preference('marcflavour');
202         $self->_foreach_mapping(
203             sub {
204                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 if ($search) {
221                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222                 }
223
224                 if ($facet) {
225                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226                 }
227                 if ($suggestible) {
228                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
229                 }
230                 # Sort is a bit special as it can be true, false, undef.
231                 # We care about "true" or "undef",
232                 # "undef" means to do the default thing, which is make it sortable.
233                 if (!defined $sort || $sort) {
234                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
235                     $sort_fields{$self->index}{$name} = 1;
236                 }
237             }
238         );
239         $all_mappings{$self->index} = $mappings;
240     }
241     $self->sort_fields(\%{$sort_fields{$self->index}});
242
243     return $all_mappings{$self->index};
244 }
245
246 =head2 _get_elasticsearch_field_config
247
248 Get the Elasticsearch field config for the given purpose and data type.
249
250 $mapping = _get_elasticsearch_field_config('search', 'text');
251
252 =cut
253
254 sub _get_elasticsearch_field_config {
255
256     my ( $purpose, $type ) = @_;
257
258     # Use state to speed up repeated calls
259     state $settings = undef;
260     if (!defined $settings) {
261         my $config_file = C4::Context->config('elasticsearch_field_config');
262         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
263         $settings = LoadFile( $config_file );
264     }
265
266     if (!defined $settings->{$purpose}) {
267         die "Field purpose $purpose not defined in field config";
268     }
269     if ($type eq '') {
270         return $settings->{$purpose};
271     }
272     if (defined $settings->{$purpose}{$type}) {
273         return $settings->{$purpose}{$type};
274     }
275     if (defined $settings->{$purpose}{'default'}) {
276         return $settings->{$purpose}{'default'};
277     }
278     return;
279 }
280
281 =head2 _load_elasticsearch_mappings
282
283 Load Elasticsearch mappings in the format of mappings.yaml.
284
285 $indexes = _load_elasticsearch_mappings();
286
287 =cut
288
289 sub _load_elasticsearch_mappings {
290     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
291     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
292     return LoadFile( $mappings_yaml );
293 }
294
295 sub reset_elasticsearch_mappings {
296     my ( $self ) = @_;
297     my $indexes = $self->_load_elasticsearch_mappings();
298
299     while ( my ( $index_name, $fields ) = each %$indexes ) {
300         while ( my ( $field_name, $data ) = each %$fields ) {
301
302             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
303
304             # Set default values
305             $sf_params{staff_client} //= 1;
306             $sf_params{opac} //= 1;
307
308             $sf_params{name} = $field_name;
309
310             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
311
312             my $mappings = $data->{mappings};
313             for my $mapping ( @$mappings ) {
314                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
315                     index_name => $index_name,
316                     marc_type => $mapping->{marc_type},
317                     marc_field => $mapping->{marc_field}
318                 });
319                 $search_field->add_to_search_marc_maps($marc_field, {
320                     facet => $mapping->{facet} || 0,
321                     suggestible => $mapping->{suggestible} || 0,
322                     sort => $mapping->{sort},
323                     search => $mapping->{search} // 1
324                 });
325             }
326         }
327     }
328 }
329
330 # This overrides the accessor provided by Class::Accessor so that if
331 # sort_fields isn't set, then it'll generate it.
332 sub sort_fields {
333     my $self = shift;
334     if (@_) {
335         $self->_sort_fields_accessor(@_);
336         return;
337     }
338     my $val = $self->_sort_fields_accessor();
339     return $val if $val;
340
341     # This will populate the accessor as a side effect
342     $self->get_elasticsearch_mappings();
343     return $self->_sort_fields_accessor();
344 }
345
346 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
347
348     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
349
350 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
351 Since we group all mappings by MARC field targets C<$mappings> will contain
352 all targets for C<$data> and thus we need to fetch the MARC field only once.
353 C<$mappings> will be applied to C<$record_document> and new field values added.
354 The method has no return value.
355
356 =over 4
357
358 =item C<$mappings>
359
360 Arrayref of mappings containing arrayrefs in the format
361 [C<$target>, C<$options>] where C<$target> is the name of the target field and
362 C<$options> is a hashref containing processing directives for this particular
363 mapping.
364
365 =item C<$data>
366
367 The source data from a MARC record field.
368
369 =item C<$record_document>
370
371 Hashref representing the Elasticsearch document on which mappings should be
372 applied.
373
374 =item C<$altscript>
375
376 A boolean value indicating whether an alternate script presentation is being
377 processed.
378
379 =back
380
381 =cut
382
383 sub _process_mappings {
384     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
385     foreach my $mapping (@{$mappings}) {
386         my ($target, $options) = @{$mapping};
387
388         # Don't process sort fields for alternate scripts
389         my $sort = $target =~ /__sort$/;
390         if ($sort && $altscript) {
391             next;
392         }
393
394         # Copy (scalar) data since can have multiple targets
395         # with differing options for (possibly) mutating data
396         # so need a different copy for each
397         my $_data = $data;
398         $record_document->{$target} //= [];
399         if (defined $options->{substr}) {
400             my ($start, $length) = @{$options->{substr}};
401             $_data = length($data) > $start ? substr $data, $start, $length : '';
402         }
403         if (defined $options->{value_callbacks}) {
404             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
405         }
406         if (defined $options->{property}) {
407             $_data = {
408                 $options->{property} => $_data
409             }
410         }
411         push @{$record_document->{$target}}, $_data;
412     }
413 }
414
415 =head2 marc_records_to_documents($marc_records)
416
417     my $record_documents = $self->marc_records_to_documents($marc_records);
418
419 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
420
421 Returns array of hash references, representing Elasticsearch documents,
422 acceptable as body payload in C<Search::Elasticsearch> requests.
423
424 =over 4
425
426 =item C<$marc_documents>
427
428 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
429
430 =back
431
432 =cut
433
434 sub marc_records_to_documents {
435     my ($self, $records) = @_;
436     my $rules = $self->_get_marc_mapping_rules();
437     my $control_fields_rules = $rules->{control_fields};
438     my $data_fields_rules = $rules->{data_fields};
439     my $marcflavour = lc C4::Context->preference('marcflavour');
440     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
441
442     my @record_documents;
443
444     foreach my $record (@{$records}) {
445         my $record_document = {};
446         my $mappings = $rules->{leader};
447         if ($mappings) {
448             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
449         }
450         foreach my $field ($record->fields()) {
451             if ($field->is_control_field()) {
452                 my $mappings = $control_fields_rules->{$field->tag()};
453                 if ($mappings) {
454                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
455                 }
456             }
457             else {
458                 my $tag = $field->tag();
459                 # Handle alternate scripts in MARC 21
460                 my $altscript = 0;
461                 if ($marcflavour eq 'marc21' && $tag eq '880') {
462                     my $sub6 = $field->subfield('6');
463                     if ($sub6 =~ /^(...)-\d+/) {
464                         $tag = $1;
465                         $altscript = 1;
466                     }
467                 }
468
469                 my $data_field_rules = $data_fields_rules->{$tag};
470
471                 if ($data_field_rules) {
472                     my $subfields_mappings = $data_field_rules->{subfields};
473                     my $wildcard_mappings = $subfields_mappings->{'*'};
474                     foreach my $subfield ($field->subfields()) {
475                         my ($code, $data) = @{$subfield};
476                         my $mappings = $subfields_mappings->{$code} // [];
477                         if ($wildcard_mappings) {
478                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
479                         }
480                         if (@{$mappings}) {
481                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
482                         }
483                     }
484
485                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
486                     if ($subfields_join_mappings) {
487                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
488                             # Map each subfield to values, remove empty values, join with space
489                             my $data = join(
490                                 ' ',
491                                 grep(
492                                     $_,
493                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
494                                 )
495                             );
496                             if ($data) {
497                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
498                             }
499                         }
500                     }
501                 }
502             }
503         }
504         foreach my $field (keys %{$rules->{defaults}}) {
505             unless (defined $record_document->{$field}) {
506                 $record_document->{$field} = $rules->{defaults}->{$field};
507             }
508         }
509         foreach my $field (@{$rules->{sum}}) {
510             if (defined $record_document->{$field}) {
511                 # TODO: validate numeric? filter?
512                 # TODO: Or should only accept fields without nested values?
513                 # TODO: Quick and dirty, improve if needed
514                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
515             }
516         }
517         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
518         foreach my $field (@{$rules->{isbn}}) {
519             if (defined $record_document->{$field}) {
520                 my @isbns = ();
521                 foreach my $input_isbn (@{$record_document->{$field}}) {
522                     my $isbn = Business::ISBN->new($input_isbn);
523                     if (defined $isbn && $isbn->is_valid) {
524                         my $isbn13 = $isbn->as_isbn13->as_string;
525                         push @isbns, $isbn13;
526                         $isbn13 =~ s/\-//g;
527                         push @isbns, $isbn13;
528
529                         my $isbn10 = $isbn->as_isbn10;
530                         if ($isbn10) {
531                             $isbn10 = $isbn10->as_string;
532                             push @isbns, $isbn10;
533                             $isbn10 =~ s/\-//g;
534                             push @isbns, $isbn10;
535                         }
536                     } else {
537                         push @isbns, $input_isbn;
538                     }
539                 }
540                 $record_document->{$field} = \@isbns;
541             }
542         }
543
544         # Remove duplicate values and collapse sort fields
545         foreach my $field (keys %{$record_document}) {
546             if (ref($record_document->{$field}) eq 'ARRAY') {
547                 @{$record_document->{$field}} = do {
548                     my %seen;
549                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
550                 };
551                 if ($field =~ /__sort$/) {
552                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
553                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
554                 }
555             }
556         }
557
558         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
559         $record->encoding('UTF-8');
560         if ($use_array) {
561             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
562             $record_document->{'marc_format'} = 'ARRAY';
563         } else {
564             my @warnings;
565             {
566                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
567                 local $SIG{__WARN__} = sub {
568                     push @warnings, $_[0];
569                 };
570                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
571             }
572             if (@warnings) {
573                 # Suppress warnings if record length exceeded
574                 unless (substr($record->leader(), 0, 5) eq '99999') {
575                     foreach my $warning (@warnings) {
576                         carp $warning;
577                     }
578                 }
579                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
580                 $record_document->{'marc_format'} = 'MARCXML';
581             }
582             else {
583                 $record_document->{'marc_format'} = 'base64ISO2709';
584             }
585         }
586         push @record_documents, $record_document;
587     }
588     return \@record_documents;
589 }
590
591 =head2 _marc_to_array($record)
592
593     my @fields = _marc_to_array($record)
594
595 Convert a MARC::Record to an array modeled after MARC-in-JSON
596 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
597
598 =over 4
599
600 =item C<$record>
601
602 A MARC::Record object
603
604 =back
605
606 =cut
607
608 sub _marc_to_array {
609     my ($self, $record) = @_;
610
611     my $data = {
612         leader => $record->leader(),
613         fields => []
614     };
615     for my $field ($record->fields()) {
616         my $tag = $field->tag();
617         if ($field->is_control_field()) {
618             push @{$data->{fields}}, {$tag => $field->data()};
619         } else {
620             my $subfields = ();
621             foreach my $subfield ($field->subfields()) {
622                 my ($code, $contents) = @{$subfield};
623                 push @{$subfields}, {$code => $contents};
624             }
625             push @{$data->{fields}}, {
626                 $tag => {
627                     ind1 => $field->indicator(1),
628                     ind2 => $field->indicator(2),
629                     subfields => $subfields
630                 }
631             };
632         }
633     }
634     return $data;
635 }
636
637 =head2 _array_to_marc($data)
638
639     my $record = _array_to_marc($data)
640
641 Convert an array modeled after MARC-in-JSON to a MARC::Record
642
643 =over 4
644
645 =item C<$data>
646
647 An array modeled after MARC-in-JSON
648 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
649
650 =back
651
652 =cut
653
654 sub _array_to_marc {
655     my ($self, $data) = @_;
656
657     my $record = MARC::Record->new();
658
659     $record->leader($data->{leader});
660     for my $field (@{$data->{fields}}) {
661         my $tag = (keys %{$field})[0];
662         $field = $field->{$tag};
663         my $marc_field;
664         if (ref($field) eq 'HASH') {
665             my @subfields;
666             foreach my $subfield (@{$field->{subfields}}) {
667                 my $code = (keys %{$subfield})[0];
668                 push @subfields, $code;
669                 push @subfields, $subfield->{$code};
670             }
671             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
672         } else {
673             $marc_field = MARC::Field->new($tag, $field)
674         }
675         $record->append_fields($marc_field);
676     }
677 ;
678     return $record;
679 }
680
681 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
682
683     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
684
685 Get mappings, an internal data structure later used by
686 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
687 data for a MARC mapping.
688
689 The returned C<$mappings> is not to to be confused with mappings provided by
690 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
691 provided by C<_foreach_mapping> and expands it to this internal data structure.
692 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
693 is then applied to each MARC target (leader, control field data, subfield or
694 joined subfields) and integrated into the mapping rules data structure used in
695 C<marc_records_to_documents> to transform MARC records into Elasticsearch
696 documents.
697
698 =over 4
699
700 =item C<$facet>
701
702 Boolean indicating whether to create a facet field for this mapping.
703
704 =item C<$suggestible>
705
706 Boolean indicating whether to create a suggestion field for this mapping.
707
708 =item C<$sort>
709
710 Boolean indicating whether to create a sort field for this mapping.
711
712 =item C<$search>
713
714 Boolean indicating whether to create a search field for this mapping.
715
716 =item C<$target_name>
717
718 Elasticsearch document target field name.
719
720 =item C<$target_type>
721
722 Elasticsearch document target field type.
723
724 =item C<$range>
725
726 An optional range as a string in the format "<START>-<END>" or "<START>",
727 where "<START>" and "<END>" are integers specifying a range that will be used
728 for extracting a substring from MARC data as Elasticsearch field target value.
729
730 The first character position is "0", and the range is inclusive,
731 so "0-2" means the first three characters of MARC data.
732
733 If only "<START>" is provided only one character at position "<START>" will
734 be extracted.
735
736 =back
737
738 =cut
739
740 sub _field_mappings {
741     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
742     my %mapping_defaults = ();
743     my @mappings;
744
745     my $substr_args = undef;
746     if (defined $range) {
747         # TODO: use value_callback instead?
748         my ($start, $end) = map(int, split /-/, $range, 2);
749         $substr_args = [$start];
750         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
751     }
752     my $default_options = {};
753     if ($substr_args) {
754         $default_options->{substr} = $substr_args;
755     }
756
757     # TODO: Should probably have per type value callback/hook
758     # but hard code for now
759     if ($target_type eq 'boolean') {
760         $default_options->{value_callbacks} //= [];
761         push @{$default_options->{value_callbacks}}, sub {
762             my ($value) = @_;
763             # Trim whitespace at both ends
764             $value =~ s/^\s+|\s+$//g;
765             return $value ? 'true' : 'false';
766         };
767     }
768
769     if ($search) {
770         my $mapping = [$target_name, $default_options];
771         push @mappings, $mapping;
772     }
773
774     my @suffixes = ();
775     push @suffixes, 'facet' if $facet;
776     push @suffixes, 'suggestion' if $suggestible;
777     push @suffixes, 'sort' if !defined $sort || $sort;
778
779     foreach my $suffix (@suffixes) {
780         my $mapping = ["${target_name}__$suffix"];
781         # TODO: Hack, fix later in less hideous manner
782         if ($suffix eq 'suggestion') {
783             push @{$mapping}, {%{$default_options}, property => 'input'};
784         }
785         else {
786             push @{$mapping}, $default_options;
787         }
788         push @mappings, $mapping;
789     }
790     return @mappings;
791 };
792
793 =head2 _get_marc_mapping_rules
794
795     my $mapping_rules = $self->_get_marc_mapping_rules()
796
797 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
798
799 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
800 each call to C<MARC::Record>->field) we create an optimized structure of mapping
801 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
802
803 We can then iterate through all MARC fields for each record and apply all relevant
804 rules once per fields instead of retreiving fields multiple times for each mapping rule
805 which is terribly slow.
806
807 =cut
808
809 # TODO: This structure can be used for processing multiple MARC::Records so is currently
810 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
811 # memory cache which it is currently not. The performance gain of caching
812 # would probably be marginal, but to do this could be a further improvement.
813
814 sub _get_marc_mapping_rules {
815     my ($self) = @_;
816     my $marcflavour = lc C4::Context->preference('marcflavour');
817     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
818     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
819     my $rules = {
820         'leader' => [],
821         'control_fields' => {},
822         'data_fields' => {},
823         'sum' => [],
824         'isbn' => [],
825         'defaults' => {}
826     };
827
828     $self->_foreach_mapping(sub {
829         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
830         return if $marc_type ne $marcflavour;
831
832         if ($type eq 'sum') {
833             push @{$rules->{sum}}, $name;
834         }
835         elsif ($type eq 'isbn') {
836             push @{$rules->{isbn}}, $name;
837         }
838         elsif ($type eq 'boolean') {
839             # boolean gets special handling, if value doesn't exist for a field,
840             # it is set to false
841             $rules->{defaults}->{$name} = 'false';
842         }
843
844         if ($marc_field =~ $field_spec_regexp) {
845             my $field_tag = $1;
846
847             my @subfields;
848             my @subfield_groups;
849             # Parse and separate subfields form subfield groups
850             if (defined $2) {
851                 my $subfield_group = '';
852                 my $open_group = 0;
853
854                 foreach my $token (split //, $2) {
855                     if ($token eq "(") {
856                         if ($open_group) {
857                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
858                                 "Unmatched opening parenthesis for $marc_field"
859                             );
860                         }
861                         else {
862                             $open_group = 1;
863                         }
864                     }
865                     elsif ($token eq ")") {
866                         if ($open_group) {
867                             if ($subfield_group) {
868                                 push @subfield_groups, $subfield_group;
869                                 $subfield_group = '';
870                             }
871                             $open_group = 0;
872                         }
873                         else {
874                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
875                                 "Unmatched closing parenthesis for $marc_field"
876                             );
877                         }
878                     }
879                     elsif ($open_group) {
880                         $subfield_group .= $token;
881                     }
882                     else {
883                         push @subfields, $token;
884                     }
885                 }
886             }
887             else {
888                 push @subfields, '*';
889             }
890
891             my $range = defined $3 ? $3 : undef;
892             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
893             if ($field_tag < 10) {
894                 $rules->{control_fields}->{$field_tag} //= [];
895                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
896             }
897             else {
898                 $rules->{data_fields}->{$field_tag} //= {};
899                 foreach my $subfield (@subfields) {
900                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
901                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
902                 }
903                 foreach my $subfield_group (@subfield_groups) {
904                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
905                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
906                 }
907             }
908         }
909         elsif ($marc_field =~ $leader_regexp) {
910             my $range = defined $1 ? $1 : undef;
911             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
912             push @{$rules->{leader}}, @mappings;
913         }
914         else {
915             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
916                 "Invalid MARC field expression: $marc_field"
917             );
918         }
919     });
920     return $rules;
921 }
922
923 =head2 _foreach_mapping
924
925     $self->_foreach_mapping(
926         sub {
927             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
928                 $marc_field )
929               = @_;
930             return unless $marc_type eq 'marc21';
931             print "Data comes from: " . $marc_field . "\n";
932         }
933     );
934
935 This allows you to apply a function to each entry in the elasticsearch mappings
936 table, in order to build the mappings for whatever is needed.
937
938 In the provided function, the files are:
939
940 =over 4
941
942 =item C<$name>
943
944 The field name for elasticsearch (corresponds to the 'mapping' column in the
945 database.
946
947 =item C<$type>
948
949 The type for this value, e.g. 'string'.
950
951 =item C<$facet>
952
953 True if this value should be facetised. This only really makes sense if the
954 field is understood by the facet processing code anyway.
955
956 =item C<$sort>
957
958 True if this is a field that a) needs special sort handling, and b) if it
959 should be sorted on. False if a) but not b). Undef if not a). This allows,
960 for example, author to be sorted on but not everything marked with "author"
961 to be included in that sort.
962
963 =item C<$marc_type>
964
965 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
966 'unimarc', 'normarc'.
967
968 =item C<$marc_field>
969
970 A string that describes the MARC field that contains the data to extract.
971 These are of a form suited to Catmandu's MARC fixers.
972
973 =back
974
975 =cut
976
977 sub _foreach_mapping {
978     my ( $self, $sub ) = @_;
979
980     # TODO use a caching framework here
981     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
982         {
983             'search_marc_map.index_name' => $self->index,
984         },
985         {   join => { search_marc_to_fields => 'search_marc_map' },
986             '+select' => [
987                 'search_marc_to_fields.facet',
988                 'search_marc_to_fields.suggestible',
989                 'search_marc_to_fields.sort',
990                 'search_marc_to_fields.search',
991                 'search_marc_map.marc_type',
992                 'search_marc_map.marc_field',
993             ],
994             '+as'     => [
995                 'facet',
996                 'suggestible',
997                 'sort',
998                 'search',
999                 'marc_type',
1000                 'marc_field',
1001             ],
1002         }
1003     );
1004
1005     while ( my $search_field = $search_fields->next ) {
1006         $sub->(
1007             # Force lower case on indexed field names for case insensitive
1008             # field name searches
1009             lc($search_field->name),
1010             $search_field->type,
1011             $search_field->get_column('facet'),
1012             $search_field->get_column('suggestible'),
1013             $search_field->get_column('sort'),
1014             $search_field->get_column('search'),
1015             $search_field->get_column('marc_type'),
1016             $search_field->get_column('marc_field'),
1017         );
1018     }
1019 }
1020
1021 =head2 process_error
1022
1023     die process_error($@);
1024
1025 This parses an Elasticsearch error message and produces a human-readable
1026 result from it. This result is probably missing all the useful information
1027 that you might want in diagnosing an issue, so the warning is also logged.
1028
1029 Note that currently the resulting message is not internationalised. This
1030 will happen eventually by some method or other.
1031
1032 =cut
1033
1034 sub process_error {
1035     my ($self, $msg) = @_;
1036
1037     warn $msg; # simple logging
1038
1039     # This is super-primitive
1040     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1041
1042     return "Unable to perform your search. Please try again.\n";
1043 }
1044
1045 =head2 _read_configuration
1046
1047     my $conf = _read_configuration();
1048
1049 Reads the I<configuration file> and returns a hash structure with the
1050 configuration information. It raises an exception if mandatory entries
1051 are missing.
1052
1053 The hashref structure has the following form:
1054
1055     {
1056         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1057         'index_name' => 'koha_instance',
1058     }
1059
1060 This is configured by the following in the C<config> block in koha-conf.xml:
1061
1062     <elasticsearch>
1063         <server>127.0.0.1:9200</server>
1064         <server>anotherserver:9200</server>
1065         <index_name>koha_instance</index_name>
1066     </elasticsearch>
1067
1068 =cut
1069
1070 sub _read_configuration {
1071
1072     my $configuration;
1073
1074     my $conf = C4::Context->config('elasticsearch');
1075     Koha::Exceptions::Config::MissingEntry->throw(
1076         "Missing 'elasticsearch' block in config file")
1077       unless defined $conf;
1078
1079     if ( $conf && $conf->{server} ) {
1080         my $nodes = $conf->{server};
1081         if ( ref($nodes) eq 'ARRAY' ) {
1082             $configuration->{nodes} = $nodes;
1083         }
1084         else {
1085             $configuration->{nodes} = [$nodes];
1086         }
1087     }
1088     else {
1089         Koha::Exceptions::Config::MissingEntry->throw(
1090             "Missing 'server' entry in config file for elasticsearch");
1091     }
1092
1093     if ( defined $conf->{index_name} ) {
1094         $configuration->{index_name} = $conf->{index_name};
1095     }
1096     else {
1097         Koha::Exceptions::Config::MissingEntry->throw(
1098             "Missing 'index_name' entry in config file for elasticsearch");
1099     }
1100
1101     return $configuration;
1102 }
1103
1104 =head2 get_facetable_fields
1105
1106 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1107
1108 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1109
1110 =cut
1111
1112 sub get_facetable_fields {
1113     my ($self) = @_;
1114
1115     # These should correspond to the ES field names, as opposed to the CCL
1116     # things that zebra uses.
1117     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1118     my @faceted_fields = Koha::SearchFields->search(
1119         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1120     );
1121     my @not_faceted_fields = Koha::SearchFields->search(
1122         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1123     );
1124     # This could certainly be improved
1125     return ( @faceted_fields, @not_faceted_fields );
1126 }
1127
1128 1;
1129
1130 __END__
1131
1132 =head1 AUTHOR
1133
1134 =over 4
1135
1136 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1137
1138 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1139
1140 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1141
1142 =back
1143
1144 =cut