Bug 22258: Elasticsearch: Add array as an alternative MARC format
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use Clone qw(clone);
31 use JSON;
32 use Modern::Perl;
33 use Readonly;
34 use Search::Elasticsearch;
35 use Try::Tiny;
36 use YAML::Syck;
37
38 use List::Util qw( sum0 reduce );
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42 use Business::ISBN;
43
44 __PACKAGE__->mk_ro_accessors(qw( index ));
45 __PACKAGE__->mk_accessors(qw( sort_fields ));
46
47 # Constants to refer to the standard index names
48 Readonly our $BIBLIOS_INDEX     => 'biblios';
49 Readonly our $AUTHORITIES_INDEX => 'authorities';
50
51 =head1 NAME
52
53 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
54
55 =head1 ACCESSORS
56
57 =over 4
58
59 =item index
60
61 The name of the index to use, generally 'biblios' or 'authorities'.
62
63 =back
64
65 =head1 FUNCTIONS
66
67 =cut
68
69 sub new {
70     my $class = shift @_;
71     my $self = $class->SUPER::new(@_);
72     # Check for a valid index
73     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
74     return $self;
75 }
76
77 =head2 get_elasticsearch
78
79     my $elasticsearch_client = $self->get_elasticsearch();
80
81 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
82 instance level and will be reused if method is called multiple times.
83
84 =cut
85
86 sub get_elasticsearch {
87     my $self = shift @_;
88     unless (defined $self->{elasticsearch}) {
89         my $conf = $self->get_elasticsearch_params();
90         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
91     }
92     return $self->{elasticsearch};
93 }
94
95 =head2 get_elasticsearch_params
96
97     my $params = $self->get_elasticsearch_params();
98
99 This provides a hashref that contains the parameters for connecting to the
100 ElasicSearch servers, in the form:
101
102     {
103         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
104         'index_name' => 'koha_instance_index',
105     }
106
107 This is configured by the following in the C<config> block in koha-conf.xml:
108
109     <elasticsearch>
110         <server>127.0.0.1:9200</server>
111         <server>anotherserver:9200</server>
112         <index_name>koha_instance</index_name>
113     </elasticsearch>
114
115 =cut
116
117 sub get_elasticsearch_params {
118     my ($self) = @_;
119
120     # Copy the hash so that we're not modifying the original
121     my $conf = C4::Context->config('elasticsearch');
122     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
123     my $es = { %{ $conf } };
124
125     # Helpfully, the multiple server lines end up in an array for us anyway
126     # if there are multiple ones, but not if there's only one.
127     my $server = $es->{server};
128     delete $es->{server};
129     if ( ref($server) eq 'ARRAY' ) {
130
131         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
132         $es->{nodes} = $server;
133     }
134     elsif ($server) {
135         $es->{nodes} = [$server];
136     }
137     else {
138         die "No elasticsearch servers were specified in koha-conf.xml.\n";
139     }
140     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
141       if ( !$es->{index_name} );
142     # Append the name of this particular index to our namespace
143     $es->{index_name} .= '_' . $self->index;
144
145     $es->{key_prefix} = 'es_';
146     $es->{client} //= '5_0::Direct';
147     $es->{cxn_pool} //= 'Static';
148     $es->{request_timeout} //= 60;
149
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         # Clone the general mapping to break ties with the original hash
197         my $mappings = {
198             data => clone(_get_elasticsearch_field_config('general', ''))
199         };
200         my $marcflavour = lc C4::Context->preference('marcflavour');
201         $self->_foreach_mapping(
202             sub {
203                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
204
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 _get_elasticsearch_field_config
245
246 Get the Elasticsearch field config for the given purpose and data type.
247
248 $mapping = _get_elasticsearch_field_config('search', 'text');
249
250 =cut
251
252 sub _get_elasticsearch_field_config {
253
254     my ( $purpose, $type ) = @_;
255
256     # Use state to speed up repeated calls
257     state $settings = undef;
258     if (!defined $settings) {
259         my $config_file = C4::Context->config('elasticsearch_field_config');
260         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
261         $settings = LoadFile( $config_file );
262     }
263
264     if (!defined $settings->{$purpose}) {
265         die "Field purpose $purpose not defined in field config";
266     }
267     if ($type eq '') {
268         return $settings->{$purpose};
269     }
270     if (defined $settings->{$purpose}{$type}) {
271         return $settings->{$purpose}{$type};
272     }
273     if (defined $settings->{$purpose}{'default'}) {
274         return $settings->{$purpose}{'default'};
275     }
276     return;
277 }
278
279 sub reset_elasticsearch_mappings {
280     my ( $reset_fields ) = @_;
281     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
282     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
283     my $indexes = LoadFile( $mappings_yaml );
284
285     while ( my ( $index_name, $fields ) = each %$indexes ) {
286         while ( my ( $field_name, $data ) = each %$fields ) {
287             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight facet_order /;
288             $sf_params{name} = $field_name;
289
290             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
291
292             my $mappings = $data->{mappings};
293             for my $mapping ( @$mappings ) {
294                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
295                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
296             }
297         }
298     }
299 }
300
301 # This overrides the accessor provided by Class::Accessor so that if
302 # sort_fields isn't set, then it'll generate it.
303 sub sort_fields {
304     my $self = shift;
305     if (@_) {
306         $self->_sort_fields_accessor(@_);
307         return;
308     }
309     my $val = $self->_sort_fields_accessor();
310     return $val if $val;
311
312     # This will populate the accessor as a side effect
313     $self->get_elasticsearch_mappings();
314     return $self->_sort_fields_accessor();
315 }
316
317 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
318
319     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
320
321 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
322 Since we group all mappings by MARC field targets C<$mappings> will contain
323 all targets for C<$data> and thus we need to fetch the MARC field only once.
324 C<$mappings> will be applied to C<$record_document> and new field values added.
325 The method has no return value.
326
327 =over 4
328
329 =item C<$mappings>
330
331 Arrayref of mappings containing arrayrefs in the format
332 [C<$target>, C<$options>] where C<$target> is the name of the target field and
333 C<$options> is a hashref containing processing directives for this particular
334 mapping.
335
336 =item C<$data>
337
338 The source data from a MARC record field.
339
340 =item C<$record_document>
341
342 Hashref representing the Elasticsearch document on which mappings should be
343 applied.
344
345 =item C<$altscript>
346
347 A boolean value indicating whether an alternate script presentation is being
348 processed.
349
350 =back
351
352 =cut
353
354 sub _process_mappings {
355     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
356     foreach my $mapping (@{$mappings}) {
357         my ($target, $options) = @{$mapping};
358
359         # Don't process sort fields for alternate scripts
360         my $sort = $target =~ /__sort$/;
361         if ($sort && $altscript) {
362             next;
363         }
364
365         # Copy (scalar) data since can have multiple targets
366         # with differing options for (possibly) mutating data
367         # so need a different copy for each
368         my $_data = $data;
369         $record_document->{$target} //= [];
370         if (defined $options->{substr}) {
371             my ($start, $length) = @{$options->{substr}};
372             $_data = length($data) > $start ? substr $data, $start, $length : '';
373         }
374         if (defined $options->{value_callbacks}) {
375             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
376         }
377         if (defined $options->{property}) {
378             $_data = {
379                 $options->{property} => $_data
380             }
381         }
382         push @{$record_document->{$target}}, $_data;
383     }
384 }
385
386 =head2 marc_records_to_documents($marc_records)
387
388     my @record_documents = $self->marc_records_to_documents($marc_records);
389
390 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
391
392 Returns array of hash references, representing Elasticsearch documents,
393 acceptable as body payload in C<Search::Elasticsearch> requests.
394
395 =over 4
396
397 =item C<$marc_documents>
398
399 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
400
401 =back
402
403 =cut
404
405 sub marc_records_to_documents {
406     my ($self, $records) = @_;
407     my $rules = $self->_get_marc_mapping_rules();
408     my $control_fields_rules = $rules->{control_fields};
409     my $data_fields_rules = $rules->{data_fields};
410     my $marcflavour = lc C4::Context->preference('marcflavour');
411     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
412
413     my @record_documents;
414
415     foreach my $record (@{$records}) {
416         my $record_document = {};
417         my $mappings = $rules->{leader};
418         if ($mappings) {
419             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
420         }
421         foreach my $field ($record->fields()) {
422             if ($field->is_control_field()) {
423                 my $mappings = $control_fields_rules->{$field->tag()};
424                 if ($mappings) {
425                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
426                 }
427             }
428             else {
429                 my $tag = $field->tag();
430                 # Handle alternate scripts in MARC 21
431                 my $altscript = 0;
432                 if ($marcflavour eq 'marc21' && $tag eq '880') {
433                     my $sub6 = $field->subfield('6');
434                     if ($sub6 =~ /^(...)-\d+/) {
435                         $tag = $1;
436                         $altscript = 1;
437                     }
438                 }
439
440                 my $data_field_rules = $data_fields_rules->{$tag};
441
442                 if ($data_field_rules) {
443                     my $subfields_mappings = $data_field_rules->{subfields};
444                     my $wildcard_mappings = $subfields_mappings->{'*'};
445                     foreach my $subfield ($field->subfields()) {
446                         my ($code, $data) = @{$subfield};
447                         my $mappings = $subfields_mappings->{$code} // [];
448                         if ($wildcard_mappings) {
449                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
450                         }
451                         if (@{$mappings}) {
452                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
453                         }
454                     }
455
456                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
457                     if ($subfields_join_mappings) {
458                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
459                             # Map each subfield to values, remove empty values, join with space
460                             my $data = join(
461                                 ' ',
462                                 grep(
463                                     $_,
464                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
465                                 )
466                             );
467                             if ($data) {
468                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
469                             }
470                         }
471                     }
472                 }
473             }
474         }
475         foreach my $field (keys %{$rules->{defaults}}) {
476             unless (defined $record_document->{$field}) {
477                 $record_document->{$field} = $rules->{defaults}->{$field};
478             }
479         }
480         foreach my $field (@{$rules->{sum}}) {
481             if (defined $record_document->{$field}) {
482                 # TODO: validate numeric? filter?
483                 # TODO: Or should only accept fields without nested values?
484                 # TODO: Quick and dirty, improve if needed
485                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
486             }
487         }
488         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
489         foreach my $field (@{$rules->{isbn}}) {
490             if (defined $record_document->{$field}) {
491                 my @isbns = ();
492                 foreach my $input_isbn (@{$record_document->{$field}}) {
493                     my $isbn = Business::ISBN->new($input_isbn);
494                     if (defined $isbn && $isbn->is_valid) {
495                         my $isbn13 = $isbn->as_isbn13->as_string;
496                         push @isbns, $isbn13;
497                         $isbn13 =~ s/\-//g;
498                         push @isbns, $isbn13;
499
500                         my $isbn10 = $isbn->as_isbn10;
501                         if ($isbn10) {
502                             $isbn10 = $isbn10->as_string;
503                             push @isbns, $isbn10;
504                             $isbn10 =~ s/\-//g;
505                             push @isbns, $isbn10;
506                         }
507                     } else {
508                         push @isbns, $input_isbn;
509                     }
510                 }
511                 $record_document->{$field} = \@isbns;
512             }
513         }
514
515         # Remove duplicate values and collapse sort fields
516         foreach my $field (keys %{$record_document}) {
517             if (ref($record_document->{$field}) eq 'ARRAY') {
518                 @{$record_document->{$field}} = do {
519                     my %seen;
520                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
521                 };
522                 if ($field =~ /__sort$/) {
523                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
524                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
525                 }
526             }
527         }
528
529         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
530         $record->encoding('UTF-8');
531         if ($use_array) {
532             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
533             $record_document->{'marc_format'} = 'ARRAY';
534         } else {
535             my @warnings;
536             {
537                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
538                 local $SIG{__WARN__} = sub {
539                     push @warnings, $_[0];
540                 };
541                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
542             }
543             if (@warnings) {
544                 # Suppress warnings if record length exceeded
545                 unless (substr($record->leader(), 0, 5) eq '99999') {
546                     foreach my $warning (@warnings) {
547                         carp $warning;
548                     }
549                 }
550                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
551                 $record_document->{'marc_format'} = 'MARCXML';
552             }
553             else {
554                 $record_document->{'marc_format'} = 'base64ISO2709';
555             }
556         }
557         my $id = $record->subfield('999', 'c');
558         push @record_documents, [$id, $record_document];
559     }
560     return \@record_documents;
561 }
562
563 =head2 _marc_to_array($record)
564
565     my @fields = _marc_to_array($record)
566
567 Convert a MARC::Record to an array modeled after MARC-in-JSON
568 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
569
570 =over 4
571
572 =item C<$record>
573
574 A MARC::Record object
575
576 =back
577
578 =cut
579
580 sub _marc_to_array {
581     my ($self, $record) = @_;
582
583     my $data = {
584         leader => $record->leader(),
585         fields => []
586     };
587     for my $field ($record->fields()) {
588         my $tag = $field->tag();
589         if ($field->is_control_field()) {
590             push @{$data->{fields}}, {$tag => $field->data()};
591         } else {
592             my $subfields = ();
593             foreach my $subfield ($field->subfields()) {
594                 my ($code, $contents) = @{$subfield};
595                 push @{$subfields}, {$code => $contents};
596             }
597             push @{$data->{fields}}, {
598                 $tag => {
599                     ind1 => $field->indicator(1),
600                     ind2 => $field->indicator(2),
601                     subfields => $subfields
602                 }
603             };
604         }
605     }
606     return $data;
607 }
608
609 =head2 _array_to_marc($data)
610
611     my $record = _array_to_marc($data)
612
613 Convert an array modeled after MARC-in-JSON to a MARC::Record
614
615 =over 4
616
617 =item C<$data>
618
619 An array modeled after MARC-in-JSON
620 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
621
622 =back
623
624 =cut
625
626 sub _array_to_marc {
627     my ($self, $data) = @_;
628
629     my $record = MARC::Record->new();
630
631     $record->leader($data->{leader});
632     for my $field (@{$data->{fields}}) {
633         my $tag = (keys %{$field})[0];
634         $field = $field->{$tag};
635         my $marc_field;
636         if (ref($field) eq 'HASH') {
637             my @subfields;
638             foreach my $subfield (@{$field->{subfields}}) {
639                 my $code = (keys %{$subfield})[0];
640                 push @subfields, $code;
641                 push @subfields, $subfield->{$code};
642             }
643             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
644         } else {
645             $marc_field = MARC::Field->new($tag, $field)
646         }
647         $record->append_fields($marc_field);
648     }
649 ;
650     return $record;
651 }
652
653 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
654
655     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
656
657 Get mappings, an internal data structure later used by
658 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
659 data for a MARC mapping.
660
661 The returned C<$mappings> is not to to be confused with mappings provided by
662 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
663 provided by C<_foreach_mapping> and expands it to this internal data structure.
664 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
665 is then applied to each MARC target (leader, control field data, subfield or
666 joined subfields) and integrated into the mapping rules data structure used in
667 C<marc_records_to_documents> to transform MARC records into Elasticsearch
668 documents.
669
670 =over 4
671
672 =item C<$facet>
673
674 Boolean indicating whether to create a facet field for this mapping.
675
676 =item C<$suggestible>
677
678 Boolean indicating whether to create a suggestion field for this mapping.
679
680 =item C<$sort>
681
682 Boolean indicating whether to create a sort field for this mapping.
683
684 =item C<$target_name>
685
686 Elasticsearch document target field name.
687
688 =item C<$target_type>
689
690 Elasticsearch document target field type.
691
692 =item C<$range>
693
694 An optional range as a string in the format "<START>-<END>" or "<START>",
695 where "<START>" and "<END>" are integers specifying a range that will be used
696 for extracting a substring from MARC data as Elasticsearch field target value.
697
698 The first character position is "0", and the range is inclusive,
699 so "0-2" means the first three characters of MARC data.
700
701 If only "<START>" is provided only one character at position "<START>" will
702 be extracted.
703
704 =back
705
706 =cut
707
708 sub _field_mappings {
709     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
710     my %mapping_defaults = ();
711     my @mappings;
712
713     my $substr_args = undef;
714     if (defined $range) {
715         # TODO: use value_callback instead?
716         my ($start, $end) = map(int, split /-/, $range, 2);
717         $substr_args = [$start];
718         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
719     }
720     my $default_options = {};
721     if ($substr_args) {
722         $default_options->{substr} = $substr_args;
723     }
724
725     # TODO: Should probably have per type value callback/hook
726     # but hard code for now
727     if ($target_type eq 'boolean') {
728         $default_options->{value_callbacks} //= [];
729         push @{$default_options->{value_callbacks}}, sub {
730             my ($value) = @_;
731             # Trim whitespace at both ends
732             $value =~ s/^\s+|\s+$//g;
733             return $value ? 'true' : 'false';
734         };
735     }
736
737     my $mapping = [$target_name, $default_options];
738     push @mappings, $mapping;
739
740     my @suffixes = ();
741     push @suffixes, 'facet' if $facet;
742     push @suffixes, 'suggestion' if $suggestible;
743     push @suffixes, 'sort' if !defined $sort || $sort;
744
745     foreach my $suffix (@suffixes) {
746         my $mapping = ["${target_name}__$suffix"];
747         # TODO: Hack, fix later in less hideous manner
748         if ($suffix eq 'suggestion') {
749             push @{$mapping}, {%{$default_options}, property => 'input'};
750         }
751         else {
752             push @{$mapping}, $default_options;
753         }
754         push @mappings, $mapping;
755     }
756     return @mappings;
757 };
758
759 =head2 _get_marc_mapping_rules
760
761     my $mapping_rules = $self->_get_marc_mapping_rules()
762
763 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
764
765 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
766 each call to C<MARC::Record>->field) we create an optimized structure of mapping
767 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
768
769 We can then iterate through all MARC fields for each record and apply all relevant
770 rules once per fields instead of retreiving fields multiple times for each mapping rule
771 which is terribly slow.
772
773 =cut
774
775 # TODO: This structure can be used for processing multiple MARC::Records so is currently
776 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
777 # memory cache which it is currently not. The performance gain of caching
778 # would probably be marginal, but to do this could be a further improvement.
779
780 sub _get_marc_mapping_rules {
781     my ($self) = @_;
782     my $marcflavour = lc C4::Context->preference('marcflavour');
783     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
784     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
785     my $rules = {
786         'leader' => [],
787         'control_fields' => {},
788         'data_fields' => {},
789         'sum' => [],
790         'isbn' => [],
791         'defaults' => {}
792     };
793
794     $self->_foreach_mapping(sub {
795         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
796         return if $marc_type ne $marcflavour;
797
798         if ($type eq 'sum') {
799             push @{$rules->{sum}}, $name;
800         }
801         elsif ($type eq 'isbn') {
802             push @{$rules->{isbn}}, $name;
803         }
804         elsif ($type eq 'boolean') {
805             # boolean gets special handling, if value doesn't exist for a field,
806             # it is set to false
807             $rules->{defaults}->{$name} = 'false';
808         }
809
810         if ($marc_field =~ $field_spec_regexp) {
811             my $field_tag = $1;
812
813             my @subfields;
814             my @subfield_groups;
815             # Parse and separate subfields form subfield groups
816             if (defined $2) {
817                 my $subfield_group = '';
818                 my $open_group = 0;
819
820                 foreach my $token (split //, $2) {
821                     if ($token eq "(") {
822                         if ($open_group) {
823                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
824                                 "Unmatched opening parenthesis for $marc_field"
825                             );
826                         }
827                         else {
828                             $open_group = 1;
829                         }
830                     }
831                     elsif ($token eq ")") {
832                         if ($open_group) {
833                             if ($subfield_group) {
834                                 push @subfield_groups, $subfield_group;
835                                 $subfield_group = '';
836                             }
837                             $open_group = 0;
838                         }
839                         else {
840                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
841                                 "Unmatched closing parenthesis for $marc_field"
842                             );
843                         }
844                     }
845                     elsif ($open_group) {
846                         $subfield_group .= $token;
847                     }
848                     else {
849                         push @subfields, $token;
850                     }
851                 }
852             }
853             else {
854                 push @subfields, '*';
855             }
856
857             my $range = defined $3 ? $3 : undef;
858             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
859
860             if ($field_tag < 10) {
861                 $rules->{control_fields}->{$field_tag} //= [];
862                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
863             }
864             else {
865                 $rules->{data_fields}->{$field_tag} //= {};
866                 foreach my $subfield (@subfields) {
867                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
868                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
869                 }
870                 foreach my $subfield_group (@subfield_groups) {
871                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
872                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
873                 }
874             }
875         }
876         elsif ($marc_field =~ $leader_regexp) {
877             my $range = defined $1 ? $1 : undef;
878             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
879             push @{$rules->{leader}}, @mappings;
880         }
881         else {
882             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
883                 "Invalid MARC field expression: $marc_field"
884             );
885         }
886     });
887     return $rules;
888 }
889
890 =head2 _foreach_mapping
891
892     $self->_foreach_mapping(
893         sub {
894             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
895                 $marc_field )
896               = @_;
897             return unless $marc_type eq 'marc21';
898             print "Data comes from: " . $marc_field . "\n";
899         }
900     );
901
902 This allows you to apply a function to each entry in the elasticsearch mappings
903 table, in order to build the mappings for whatever is needed.
904
905 In the provided function, the files are:
906
907 =over 4
908
909 =item C<$name>
910
911 The field name for elasticsearch (corresponds to the 'mapping' column in the
912 database.
913
914 =item C<$type>
915
916 The type for this value, e.g. 'string'.
917
918 =item C<$facet>
919
920 True if this value should be facetised. This only really makes sense if the
921 field is understood by the facet processing code anyway.
922
923 =item C<$sort>
924
925 True if this is a field that a) needs special sort handling, and b) if it
926 should be sorted on. False if a) but not b). Undef if not a). This allows,
927 for example, author to be sorted on but not everything marked with "author"
928 to be included in that sort.
929
930 =item C<$marc_type>
931
932 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
933 'unimarc', 'normarc'.
934
935 =item C<$marc_field>
936
937 A string that describes the MARC field that contains the data to extract.
938 These are of a form suited to Catmandu's MARC fixers.
939
940 =back
941
942 =cut
943
944 sub _foreach_mapping {
945     my ( $self, $sub ) = @_;
946
947     # TODO use a caching framework here
948     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
949         {
950             'search_marc_map.index_name' => $self->index,
951         },
952         {   join => { search_marc_to_fields => 'search_marc_map' },
953             '+select' => [
954                 'search_marc_to_fields.facet',
955                 'search_marc_to_fields.suggestible',
956                 'search_marc_to_fields.sort',
957                 'search_marc_map.marc_type',
958                 'search_marc_map.marc_field',
959             ],
960             '+as'     => [
961                 'facet',
962                 'suggestible',
963                 'sort',
964                 'marc_type',
965                 'marc_field',
966             ],
967         }
968     );
969
970     while ( my $search_field = $search_fields->next ) {
971         $sub->(
972             # Force lower case on indexed field names for case insensitive
973             # field name searches
974             lc($search_field->name),
975             $search_field->type,
976             $search_field->get_column('facet'),
977             $search_field->get_column('suggestible'),
978             $search_field->get_column('sort'),
979             $search_field->get_column('marc_type'),
980             $search_field->get_column('marc_field'),
981         );
982     }
983 }
984
985 =head2 process_error
986
987     die process_error($@);
988
989 This parses an Elasticsearch error message and produces a human-readable
990 result from it. This result is probably missing all the useful information
991 that you might want in diagnosing an issue, so the warning is also logged.
992
993 Note that currently the resulting message is not internationalised. This
994 will happen eventually by some method or other.
995
996 =cut
997
998 sub process_error {
999     my ($self, $msg) = @_;
1000
1001     warn $msg; # simple logging
1002
1003     # This is super-primitive
1004     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1005
1006     return "Unable to perform your search. Please try again.\n";
1007 }
1008
1009 =head2 _read_configuration
1010
1011     my $conf = _read_configuration();
1012
1013 Reads the I<configuration file> and returns a hash structure with the
1014 configuration information. It raises an exception if mandatory entries
1015 are missing.
1016
1017 The hashref structure has the following form:
1018
1019     {
1020         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1021         'index_name' => 'koha_instance',
1022     }
1023
1024 This is configured by the following in the C<config> block in koha-conf.xml:
1025
1026     <elasticsearch>
1027         <server>127.0.0.1:9200</server>
1028         <server>anotherserver:9200</server>
1029         <index_name>koha_instance</index_name>
1030     </elasticsearch>
1031
1032 =cut
1033
1034 sub _read_configuration {
1035
1036     my $configuration;
1037
1038     my $conf = C4::Context->config('elasticsearch');
1039     Koha::Exceptions::Config::MissingEntry->throw(
1040         "Missing 'elasticsearch' block in config file")
1041       unless defined $conf;
1042
1043     if ( $conf && $conf->{server} ) {
1044         my $nodes = $conf->{server};
1045         if ( ref($nodes) eq 'ARRAY' ) {
1046             $configuration->{nodes} = $nodes;
1047         }
1048         else {
1049             $configuration->{nodes} = [$nodes];
1050         }
1051     }
1052     else {
1053         Koha::Exceptions::Config::MissingEntry->throw(
1054             "Missing 'server' entry in config file for elasticsearch");
1055     }
1056
1057     if ( defined $conf->{index_name} ) {
1058         $configuration->{index_name} = $conf->{index_name};
1059     }
1060     else {
1061         Koha::Exceptions::Config::MissingEntry->throw(
1062             "Missing 'index_name' entry in config file for elasticsearch");
1063     }
1064
1065     return $configuration;
1066 }
1067
1068 =head2 get_facetable_fields
1069
1070 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1071
1072 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1073
1074 =cut
1075
1076 sub get_facetable_fields {
1077     my ($self) = @_;
1078
1079     # These should correspond to the ES field names, as opposed to the CCL
1080     # things that zebra uses.
1081     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1082     my @faceted_fields = Koha::SearchFields->search(
1083         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1084     );
1085     my @not_faceted_fields = Koha::SearchFields->search(
1086         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1087     );
1088     # This could certainly be improved
1089     return ( @faceted_fields, @not_faceted_fields );
1090 }
1091
1092 1;
1093
1094 __END__
1095
1096 =head1 AUTHOR
1097
1098 =over 4
1099
1100 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1101
1102 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1103
1104 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1105
1106 =back
1107
1108 =cut