Bug 23204: Fix ordering of fields in exported file
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
30
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
39
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
46
47 __PACKAGE__->mk_ro_accessors(qw( index ));
48 __PACKAGE__->mk_accessors(qw( sort_fields ));
49
50 # Constants to refer to the standard index names
51 Readonly our $BIBLIOS_INDEX     => 'biblios';
52 Readonly our $AUTHORITIES_INDEX => 'authorities';
53
54 =head1 NAME
55
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
57
58 =head1 ACCESSORS
59
60 =over 4
61
62 =item index
63
64 The name of the index to use, generally 'biblios' or 'authorities'.
65
66 =back
67
68 =head1 FUNCTIONS
69
70 =cut
71
72 sub new {
73     my $class = shift @_;
74     my $self = $class->SUPER::new(@_);
75     # Check for a valid index
76     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
77     return $self;
78 }
79
80 =head2 get_elasticsearch
81
82     my $elasticsearch_client = $self->get_elasticsearch();
83
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
85 instance level and will be reused if method is called multiple times.
86
87 =cut
88
89 sub get_elasticsearch {
90     my $self = shift @_;
91     unless (defined $self->{elasticsearch}) {
92         my $conf = $self->get_elasticsearch_params();
93         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
94     }
95     return $self->{elasticsearch};
96 }
97
98 =head2 get_elasticsearch_params
99
100     my $params = $self->get_elasticsearch_params();
101
102 This provides a hashref that contains the parameters for connecting to the
103 ElasicSearch servers, in the form:
104
105     {
106         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107         'index_name' => 'koha_instance_index',
108     }
109
110 This is configured by the following in the C<config> block in koha-conf.xml:
111
112     <elasticsearch>
113         <server>127.0.0.1:9200</server>
114         <server>anotherserver:9200</server>
115         <index_name>koha_instance</index_name>
116     </elasticsearch>
117
118 =cut
119
120 sub get_elasticsearch_params {
121     my ($self) = @_;
122
123     # Copy the hash so that we're not modifying the original
124     my $conf = C4::Context->config('elasticsearch');
125     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
126     my $es = { %{ $conf } };
127
128     # Helpfully, the multiple server lines end up in an array for us anyway
129     # if there are multiple ones, but not if there's only one.
130     my $server = $es->{server};
131     delete $es->{server};
132     if ( ref($server) eq 'ARRAY' ) {
133
134         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135         $es->{nodes} = $server;
136     }
137     elsif ($server) {
138         $es->{nodes} = [$server];
139     }
140     else {
141         die "No elasticsearch servers were specified in koha-conf.xml.\n";
142     }
143     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
144       if ( !$es->{index_name} );
145     # Append the name of this particular index to our namespace
146     $es->{index_name} .= '_' . $self->index;
147
148     $es->{key_prefix} = 'es_';
149     $es->{cxn_pool} //= 'Static';
150     $es->{request_timeout} //= 60;
151
152     return $es;
153 }
154
155 =head2 get_elasticsearch_settings
156
157     my $settings = $self->get_elasticsearch_settings();
158
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
161
162 A hashref containing the settings is returned.
163
164 =cut
165
166 sub get_elasticsearch_settings {
167     my ($self) = @_;
168
169     # Use state to speed up repeated calls
170     state $settings = undef;
171     if (!defined $settings) {
172         my $config_file = C4::Context->config('elasticsearch_index_config');
173         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174         $settings = LoadFile( $config_file );
175     }
176
177     return $settings;
178 }
179
180 =head2 get_elasticsearch_mappings
181
182     my $mappings = $self->get_elasticsearch_mappings();
183
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
186
187 =cut
188
189 sub get_elasticsearch_mappings {
190     my ($self) = @_;
191
192     # Use state to speed up repeated calls
193     state %all_mappings;
194     state %sort_fields;
195
196     if (!defined $all_mappings{$self->index}) {
197         $sort_fields{$self->index} = {};
198         # Clone the general mapping to break ties with the original hash
199         my $mappings = {
200             data => clone(_get_elasticsearch_field_config('general', ''))
201         };
202         my $marcflavour = lc C4::Context->preference('marcflavour');
203         $self->_foreach_mapping(
204             sub {
205                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
206                 return if $marc_type ne $marcflavour;
207                 # TODO if this gets any sort of complexity to it, it should
208                 # be broken out into its own function.
209
210                 # TODO be aware of date formats, but this requires pre-parsing
211                 # as ES will simply reject anything with an invalid date.
212                 my $es_type = 'text';
213                 if ($type eq 'boolean') {
214                     $es_type = 'boolean';
215                 } elsif ($type eq 'number' || $type eq 'sum') {
216                     $es_type = 'integer';
217                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
218                     $es_type = 'stdno';
219                 }
220
221                 if ($search) {
222                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223                 }
224
225                 if ($facet) {
226                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
227                 }
228                 if ($suggestible) {
229                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
230                 }
231                 # Sort is a bit special as it can be true, false, undef.
232                 # We care about "true" or "undef",
233                 # "undef" means to do the default thing, which is make it sortable.
234                 if (!defined $sort || $sort) {
235                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236                     $sort_fields{$self->index}{$name} = 1;
237                 }
238             }
239         );
240         $all_mappings{$self->index} = $mappings;
241     }
242     $self->sort_fields(\%{$sort_fields{$self->index}});
243
244     return $all_mappings{$self->index};
245 }
246
247 =head2 raw_elasticsearch_mappings
248
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc|normarc
251
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
253
254 =cut
255
256 sub raw_elasticsearch_mappings {
257     my ( $marc_type ) = @_;
258
259     my $schema = Koha::Database->new()->schema();
260
261     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
262
263     my $mappings = {};
264     while ( my $search_field = $search_fields->next ) {
265
266         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267             { search_field_id => $search_field->id },
268             {
269                 join     => 'search_marc_map',
270                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
271             }
272         );
273
274         while ( my $marc_to_field = $marc_to_fields->next ) {
275
276             my $marc_map = $marc_to_field->search_marc_map;
277
278             next if $marc_type && $marc_map->marc_type ne $marc_type;
279
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
284
285             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
286                 {
287                     facet   => $marc_to_field->facet || '',
288                     marc_type => $marc_map->marc_type,
289                     marc_field => $marc_map->marc_field,
290                     sort        => $marc_to_field->sort,
291                     suggestible => $marc_to_field->suggestible || ''
292                 });
293
294         }
295     }
296
297     return $mappings;
298 }
299
300 =head2 _get_elasticsearch_field_config
301
302 Get the Elasticsearch field config for the given purpose and data type.
303
304 $mapping = _get_elasticsearch_field_config('search', 'text');
305
306 =cut
307
308 sub _get_elasticsearch_field_config {
309
310     my ( $purpose, $type ) = @_;
311
312     # Use state to speed up repeated calls
313     state $settings = undef;
314     if (!defined $settings) {
315         my $config_file = C4::Context->config('elasticsearch_field_config');
316         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
317         $settings = LoadFile( $config_file );
318     }
319
320     if (!defined $settings->{$purpose}) {
321         die "Field purpose $purpose not defined in field config";
322     }
323     if ($type eq '') {
324         return $settings->{$purpose};
325     }
326     if (defined $settings->{$purpose}{$type}) {
327         return $settings->{$purpose}{$type};
328     }
329     if (defined $settings->{$purpose}{'default'}) {
330         return $settings->{$purpose}{'default'};
331     }
332     return;
333 }
334
335 =head2 _load_elasticsearch_mappings
336
337 Load Elasticsearch mappings in the format of mappings.yaml.
338
339 $indexes = _load_elasticsearch_mappings();
340
341 =cut
342
343 sub _load_elasticsearch_mappings {
344     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
345     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
346     return LoadFile( $mappings_yaml );
347 }
348
349 sub reset_elasticsearch_mappings {
350     my ( $self ) = @_;
351     my $indexes = $self->_load_elasticsearch_mappings();
352
353     Koha::SearchMarcMaps->delete;
354     Koha::SearchFields->delete;
355
356     while ( my ( $index_name, $fields ) = each %$indexes ) {
357         while ( my ( $field_name, $data ) = each %$fields ) {
358
359             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
360
361             # Set default values
362             $sf_params{staff_client} //= 1;
363             $sf_params{opac} //= 1;
364
365             $sf_params{name} = $field_name;
366
367             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
368
369             my $mappings = $data->{mappings};
370             for my $mapping ( @$mappings ) {
371                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
372                     index_name => $index_name,
373                     marc_type => $mapping->{marc_type},
374                     marc_field => $mapping->{marc_field}
375                 });
376                 $search_field->add_to_search_marc_maps($marc_field, {
377                     facet => $mapping->{facet} || 0,
378                     suggestible => $mapping->{suggestible} || 0,
379                     sort => $mapping->{sort},
380                     search => $mapping->{search} // 1
381                 });
382             }
383         }
384     }
385 }
386
387 # This overrides the accessor provided by Class::Accessor so that if
388 # sort_fields isn't set, then it'll generate it.
389 sub sort_fields {
390     my $self = shift;
391     if (@_) {
392         $self->_sort_fields_accessor(@_);
393         return;
394     }
395     my $val = $self->_sort_fields_accessor();
396     return $val if $val;
397
398     # This will populate the accessor as a side effect
399     $self->get_elasticsearch_mappings();
400     return $self->_sort_fields_accessor();
401 }
402
403 =head2 _process_mappings($mappings, $data, $record_document, $meta)
404
405     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
406
407 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
408 Since we group all mappings by MARC field targets C<$mappings> will contain
409 all targets for C<$data> and thus we need to fetch the MARC field only once.
410 C<$mappings> will be applied to C<$record_document> and new field values added.
411 The method has no return value.
412
413 =over 4
414
415 =item C<$mappings>
416
417 Arrayref of mappings containing arrayrefs in the format
418 [C<$target>, C<$options>] where C<$target> is the name of the target field and
419 C<$options> is a hashref containing processing directives for this particular
420 mapping.
421
422 =item C<$data>
423
424 The source data from a MARC record field.
425
426 =item C<$record_document>
427
428 Hashref representing the Elasticsearch document on which mappings should be
429 applied.
430
431 =item C<$meta>
432
433 A hashref containing metadata useful for enforcing per mapping rules. For
434 example for providing extra context for mapping options, or treating mapping
435 targets differently depending on type (sort, search, facet etc). Combining
436 this metadata with the mapping options and metadata allows us to mutate the
437 data per mapping, or even replace it with other data retrieved from the
438 metadata context.
439
440 Current properties are:
441
442 C<altscript>: A boolean value indicating whether an alternate script presentation is being
443 processed.
444
445 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
446 'subfield' or 'subfields_group'.
447
448 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
449
450 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
451 is 'subfields_group'.
452
453 C<field>: The original C<MARC::Record> object.
454
455 =back
456
457 =cut
458
459 sub _process_mappings {
460     my ($_self, $mappings, $data, $record_document, $meta) = @_;
461     foreach my $mapping (@{$mappings}) {
462         my ($target, $options) = @{$mapping};
463
464         # Don't process sort fields for alternate scripts
465         my $sort = $target =~ /__sort$/;
466         if ($sort && $meta->{altscript}) {
467             next;
468         }
469
470         # Copy (scalar) data since can have multiple targets
471         # with differing options for (possibly) mutating data
472         # so need a different copy for each
473         my $_data = $data;
474         $record_document->{$target} //= [];
475         if (defined $options->{substr}) {
476             my ($start, $length) = @{$options->{substr}};
477             $_data = length($data) > $start ? substr $data, $start, $length : '';
478         }
479         if (defined $options->{value_callbacks}) {
480             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
481         }
482         if (defined $options->{property}) {
483             $_data = {
484                 $options->{property} => $_data
485             }
486         }
487         if (defined $options->{nonfiling_characters_indicator}) {
488             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
489             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
490             if ($nonfiling_chars) {
491                 $_data = substr $_data, $nonfiling_chars;
492             }
493         }
494         push @{$record_document->{$target}}, $_data;
495     }
496 }
497
498 =head2 marc_records_to_documents($marc_records)
499
500     my $record_documents = $self->marc_records_to_documents($marc_records);
501
502 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
503
504 Returns array of hash references, representing Elasticsearch documents,
505 acceptable as body payload in C<Search::Elasticsearch> requests.
506
507 =over 4
508
509 =item C<$marc_documents>
510
511 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
512
513 =back
514
515 =cut
516
517 sub marc_records_to_documents {
518     my ($self, $records) = @_;
519     my $rules = $self->_get_marc_mapping_rules();
520     my $control_fields_rules = $rules->{control_fields};
521     my $data_fields_rules = $rules->{data_fields};
522     my $marcflavour = lc C4::Context->preference('marcflavour');
523     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
524
525     my @record_documents;
526
527     foreach my $record (@{$records}) {
528         my $record_document = {};
529         my $mappings = $rules->{leader};
530         if ($mappings) {
531             $self->_process_mappings($mappings, $record->leader(), $record_document, {
532                     altscript => 0,
533                     data_source => 'leader'
534                 }
535             );
536         }
537         foreach my $field ($record->fields()) {
538             if ($field->is_control_field()) {
539                 my $mappings = $control_fields_rules->{$field->tag()};
540                 if ($mappings) {
541                     $self->_process_mappings($mappings, $field->data(), $record_document, {
542                             altscript => 0,
543                             data_source => 'control_field',
544                             field => $field
545                         }
546                     );
547                 }
548             }
549             else {
550                 my $tag = $field->tag();
551                 # Handle alternate scripts in MARC 21
552                 my $altscript = 0;
553                 if ($marcflavour eq 'marc21' && $tag eq '880') {
554                     my $sub6 = $field->subfield('6');
555                     if ($sub6 =~ /^(...)-\d+/) {
556                         $tag = $1;
557                         $altscript = 1;
558                     }
559                 }
560
561                 my $data_field_rules = $data_fields_rules->{$tag};
562                 if ($data_field_rules) {
563                     my $subfields_mappings = $data_field_rules->{subfields};
564                     my $wildcard_mappings = $subfields_mappings->{'*'};
565                     foreach my $subfield ($field->subfields()) {
566                         my ($code, $data) = @{$subfield};
567                         my $mappings = $subfields_mappings->{$code} // [];
568                         if ($wildcard_mappings) {
569                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
570                         }
571                         if (@{$mappings}) {
572                             $self->_process_mappings($mappings, $data, $record_document, {
573                                     altscript => $altscript,
574                                     data_source => 'subfield',
575                                     code => $code,
576                                     field => $field
577                                 }
578                             );
579                         }
580                         if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
581                             # Used by the authority linker the match-heading field requires a specific syntax
582                             # that is specified in C4/Heading
583                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
584                             next unless $heading;
585                             push @{$record_document->{'match-heading'}}, $heading->search_form;
586                         }
587                     }
588
589                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
590                     if ($subfields_join_mappings) {
591                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
592                             # Map each subfield to values, remove empty values, join with space
593                             my $data = join(
594                                 ' ',
595                                 grep(
596                                     $_,
597                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
598                                 )
599                             );
600                             if ($data) {
601                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
602                                         altscript => $altscript,
603                                         data_source => 'subfields_group',
604                                         codes => $subfields_group,
605                                         field => $field
606                                     }
607                                 );
608                             }
609                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
610                                 # Used by the authority linker the match-heading field requires a specific syntax
611                                 # that is specified in C4/Heading
612                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
613                                 next unless $heading;
614                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
615                             }
616                         }
617                     }
618                 }
619             }
620         }
621         foreach my $field (keys %{$rules->{defaults}}) {
622             unless (defined $record_document->{$field}) {
623                 $record_document->{$field} = $rules->{defaults}->{$field};
624             }
625         }
626         foreach my $field (@{$rules->{sum}}) {
627             if (defined $record_document->{$field}) {
628                 # TODO: validate numeric? filter?
629                 # TODO: Or should only accept fields without nested values?
630                 # TODO: Quick and dirty, improve if needed
631                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
632             }
633         }
634         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
635         foreach my $field (@{$rules->{isbn}}) {
636             if (defined $record_document->{$field}) {
637                 my @isbns = ();
638                 foreach my $input_isbn (@{$record_document->{$field}}) {
639                     my $isbn = Business::ISBN->new($input_isbn);
640                     if (defined $isbn && $isbn->is_valid) {
641                         my $isbn13 = $isbn->as_isbn13->as_string;
642                         push @isbns, $isbn13;
643                         $isbn13 =~ s/\-//g;
644                         push @isbns, $isbn13;
645
646                         my $isbn10 = $isbn->as_isbn10;
647                         if ($isbn10) {
648                             $isbn10 = $isbn10->as_string;
649                             push @isbns, $isbn10;
650                             $isbn10 =~ s/\-//g;
651                             push @isbns, $isbn10;
652                         }
653                     } else {
654                         push @isbns, $input_isbn;
655                     }
656                 }
657                 $record_document->{$field} = \@isbns;
658             }
659         }
660
661         # Remove duplicate values and collapse sort fields
662         foreach my $field (keys %{$record_document}) {
663             if (ref($record_document->{$field}) eq 'ARRAY') {
664                 @{$record_document->{$field}} = do {
665                     my %seen;
666                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
667                 };
668                 if ($field =~ /__sort$/) {
669                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
670                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
671                 }
672             }
673         }
674
675         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
676         $record->encoding('UTF-8');
677         if ($use_array) {
678             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
679             $record_document->{'marc_format'} = 'ARRAY';
680         } else {
681             my @warnings;
682             {
683                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
684                 local $SIG{__WARN__} = sub {
685                     push @warnings, $_[0];
686                 };
687                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
688             }
689             if (@warnings) {
690                 # Suppress warnings if record length exceeded
691                 unless (substr($record->leader(), 0, 5) eq '99999') {
692                     foreach my $warning (@warnings) {
693                         carp $warning;
694                     }
695                 }
696                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
697                 $record_document->{'marc_format'} = 'MARCXML';
698             }
699             else {
700                 $record_document->{'marc_format'} = 'base64ISO2709';
701             }
702         }
703         push @record_documents, $record_document;
704     }
705     return \@record_documents;
706 }
707
708 =head2 _marc_to_array($record)
709
710     my @fields = _marc_to_array($record)
711
712 Convert a MARC::Record to an array modeled after MARC-in-JSON
713 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
714
715 =over 4
716
717 =item C<$record>
718
719 A MARC::Record object
720
721 =back
722
723 =cut
724
725 sub _marc_to_array {
726     my ($self, $record) = @_;
727
728     my $data = {
729         leader => $record->leader(),
730         fields => []
731     };
732     for my $field ($record->fields()) {
733         my $tag = $field->tag();
734         if ($field->is_control_field()) {
735             push @{$data->{fields}}, {$tag => $field->data()};
736         } else {
737             my $subfields = ();
738             foreach my $subfield ($field->subfields()) {
739                 my ($code, $contents) = @{$subfield};
740                 push @{$subfields}, {$code => $contents};
741             }
742             push @{$data->{fields}}, {
743                 $tag => {
744                     ind1 => $field->indicator(1),
745                     ind2 => $field->indicator(2),
746                     subfields => $subfields
747                 }
748             };
749         }
750     }
751     return $data;
752 }
753
754 =head2 _array_to_marc($data)
755
756     my $record = _array_to_marc($data)
757
758 Convert an array modeled after MARC-in-JSON to a MARC::Record
759
760 =over 4
761
762 =item C<$data>
763
764 An array modeled after MARC-in-JSON
765 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
766
767 =back
768
769 =cut
770
771 sub _array_to_marc {
772     my ($self, $data) = @_;
773
774     my $record = MARC::Record->new();
775
776     $record->leader($data->{leader});
777     for my $field (@{$data->{fields}}) {
778         my $tag = (keys %{$field})[0];
779         $field = $field->{$tag};
780         my $marc_field;
781         if (ref($field) eq 'HASH') {
782             my @subfields;
783             foreach my $subfield (@{$field->{subfields}}) {
784                 my $code = (keys %{$subfield})[0];
785                 push @subfields, $code;
786                 push @subfields, $subfield->{$code};
787             }
788             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
789         } else {
790             $marc_field = MARC::Field->new($tag, $field)
791         }
792         $record->append_fields($marc_field);
793     }
794 ;
795     return $record;
796 }
797
798 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
799
800     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
801
802 Get mappings, an internal data structure later used by
803 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
804 data for a MARC mapping.
805
806 The returned C<$mappings> is not to to be confused with mappings provided by
807 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
808 provided by C<_foreach_mapping> and expands it to this internal data structure.
809 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
810 is then applied to each MARC target (leader, control field data, subfield or
811 joined subfields) and integrated into the mapping rules data structure used in
812 C<marc_records_to_documents> to transform MARC records into Elasticsearch
813 documents.
814
815 =over 4
816
817 =item C<$facet>
818
819 Boolean indicating whether to create a facet field for this mapping.
820
821 =item C<$suggestible>
822
823 Boolean indicating whether to create a suggestion field for this mapping.
824
825 =item C<$sort>
826
827 Boolean indicating whether to create a sort field for this mapping.
828
829 =item C<$search>
830
831 Boolean indicating whether to create a search field for this mapping.
832
833 =item C<$target_name>
834
835 Elasticsearch document target field name.
836
837 =item C<$target_type>
838
839 Elasticsearch document target field type.
840
841 =item C<$range>
842
843 An optional range as a string in the format "<START>-<END>" or "<START>",
844 where "<START>" and "<END>" are integers specifying a range that will be used
845 for extracting a substring from MARC data as Elasticsearch field target value.
846
847 The first character position is "0", and the range is inclusive,
848 so "0-2" means the first three characters of MARC data.
849
850 If only "<START>" is provided only one character at position "<START>" will
851 be extracted.
852
853 =back
854
855 =cut
856
857 sub _field_mappings {
858     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
859     my %mapping_defaults = ();
860     my @mappings;
861
862     my $substr_args = undef;
863     if (defined $range) {
864         # TODO: use value_callback instead?
865         my ($start, $end) = map(int, split /-/, $range, 2);
866         $substr_args = [$start];
867         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
868     }
869     my $default_options = {};
870     if ($substr_args) {
871         $default_options->{substr} = $substr_args;
872     }
873
874     # TODO: Should probably have per type value callback/hook
875     # but hard code for now
876     if ($target_type eq 'boolean') {
877         $default_options->{value_callbacks} //= [];
878         push @{$default_options->{value_callbacks}}, sub {
879             my ($value) = @_;
880             # Trim whitespace at both ends
881             $value =~ s/^\s+|\s+$//g;
882             return $value ? 'true' : 'false';
883         };
884     }
885
886     if ($search) {
887         my $mapping = [$target_name, $default_options];
888         push @mappings, $mapping;
889     }
890
891     my @suffixes = ();
892     push @suffixes, 'facet' if $facet;
893     push @suffixes, 'suggestion' if $suggestible;
894     push @suffixes, 'sort' if !defined $sort || $sort;
895
896     foreach my $suffix (@suffixes) {
897         my $mapping = ["${target_name}__$suffix"];
898         # TODO: Hack, fix later in less hideous manner
899         if ($suffix eq 'suggestion') {
900             push @{$mapping}, {%{$default_options}, property => 'input'};
901         }
902         else {
903             # Important! Make shallow clone, or we end up with the same hashref
904             # shared by all mappings
905             push @{$mapping}, {%{$default_options}};
906         }
907         push @mappings, $mapping;
908     }
909     return @mappings;
910 };
911
912 =head2 _get_marc_mapping_rules
913
914     my $mapping_rules = $self->_get_marc_mapping_rules()
915
916 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
917
918 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
919 each call to C<MARC::Record>->field) we create an optimized structure of mapping
920 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
921
922 We can then iterate through all MARC fields for each record and apply all relevant
923 rules once per fields instead of retreiving fields multiple times for each mapping rule
924 which is terribly slow.
925
926 =cut
927
928 # TODO: This structure can be used for processing multiple MARC::Records so is currently
929 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
930 # memory cache which it is currently not. The performance gain of caching
931 # would probably be marginal, but to do this could be a further improvement.
932
933 sub _get_marc_mapping_rules {
934     my ($self) = @_;
935     my $marcflavour = lc C4::Context->preference('marcflavour');
936     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
937     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
938     my $rules = {
939         'leader' => [],
940         'control_fields' => {},
941         'data_fields' => {},
942         'sum' => [],
943         'isbn' => [],
944         'defaults' => {}
945     };
946
947     $self->_foreach_mapping(sub {
948         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
949         return if $marc_type ne $marcflavour;
950
951         if ($type eq 'sum') {
952             push @{$rules->{sum}}, $name;
953             push @{$rules->{sum}}, $name."__sort" if $sort;
954         }
955         elsif ($type eq 'isbn') {
956             push @{$rules->{isbn}}, $name;
957         }
958         elsif ($type eq 'boolean') {
959             # boolean gets special handling, if value doesn't exist for a field,
960             # it is set to false
961             $rules->{defaults}->{$name} = 'false';
962         }
963
964         if ($marc_field =~ $field_spec_regexp) {
965             my $field_tag = $1;
966
967             my @subfields;
968             my @subfield_groups;
969             # Parse and separate subfields form subfield groups
970             if (defined $2) {
971                 my $subfield_group = '';
972                 my $open_group = 0;
973
974                 foreach my $token (split //, $2) {
975                     if ($token eq "(") {
976                         if ($open_group) {
977                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
978                                 "Unmatched opening parenthesis for $marc_field"
979                             );
980                         }
981                         else {
982                             $open_group = 1;
983                         }
984                     }
985                     elsif ($token eq ")") {
986                         if ($open_group) {
987                             if ($subfield_group) {
988                                 push @subfield_groups, $subfield_group;
989                                 $subfield_group = '';
990                             }
991                             $open_group = 0;
992                         }
993                         else {
994                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
995                                 "Unmatched closing parenthesis for $marc_field"
996                             );
997                         }
998                     }
999                     elsif ($open_group) {
1000                         $subfield_group .= $token;
1001                     }
1002                     else {
1003                         push @subfields, $token;
1004                     }
1005                 }
1006             }
1007             else {
1008                 push @subfields, '*';
1009             }
1010
1011             my $range = defined $3 ? $3 : undef;
1012             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1013             if ($field_tag < 10) {
1014                 $rules->{control_fields}->{$field_tag} //= [];
1015                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1016             }
1017             else {
1018                 $rules->{data_fields}->{$field_tag} //= {};
1019                 foreach my $subfield (@subfields) {
1020                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1021                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1022                 }
1023                 foreach my $subfield_group (@subfield_groups) {
1024                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1025                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1026                 }
1027             }
1028         }
1029         elsif ($marc_field =~ $leader_regexp) {
1030             my $range = defined $1 ? $1 : undef;
1031             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1032             push @{$rules->{leader}}, @mappings;
1033         }
1034         else {
1035             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1036                 "Invalid MARC field expression: $marc_field"
1037             );
1038         }
1039     });
1040
1041     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1042     if ($marcflavour eq 'marc21') {
1043         # Nonfiling characters processing for sort fields
1044         my %title_fields;
1045         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1046             # Format is: nonfiling characters indicator => field names list
1047             %title_fields = (
1048                 1 => [130, 630, 730, 740],
1049                 2 => [222, 240, 242, 243, 245, 440, 830]
1050             );
1051         }
1052         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1053             %title_fields = (
1054                 1 => [730],
1055                 2 => [130, 430, 530]
1056             );
1057         }
1058         foreach my $indicator (keys %title_fields) {
1059             foreach my $field_tag (@{$title_fields{$indicator}}) {
1060                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1061                 foreach my $mapping (@{$mappings}) {
1062                     if ($mapping->[0] =~ /__sort$/) {
1063                         # Mark this as to be processed for nonfiling characters indicator
1064                         # later on in _process_mappings
1065                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1066                     }
1067                 }
1068             }
1069         }
1070     }
1071
1072     return $rules;
1073 }
1074
1075 =head2 _foreach_mapping
1076
1077     $self->_foreach_mapping(
1078         sub {
1079             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1080                 $marc_field )
1081               = @_;
1082             return unless $marc_type eq 'marc21';
1083             print "Data comes from: " . $marc_field . "\n";
1084         }
1085     );
1086
1087 This allows you to apply a function to each entry in the elasticsearch mappings
1088 table, in order to build the mappings for whatever is needed.
1089
1090 In the provided function, the files are:
1091
1092 =over 4
1093
1094 =item C<$name>
1095
1096 The field name for elasticsearch (corresponds to the 'mapping' column in the
1097 database.
1098
1099 =item C<$type>
1100
1101 The type for this value, e.g. 'string'.
1102
1103 =item C<$facet>
1104
1105 True if this value should be facetised. This only really makes sense if the
1106 field is understood by the facet processing code anyway.
1107
1108 =item C<$sort>
1109
1110 True if this is a field that a) needs special sort handling, and b) if it
1111 should be sorted on. False if a) but not b). Undef if not a). This allows,
1112 for example, author to be sorted on but not everything marked with "author"
1113 to be included in that sort.
1114
1115 =item C<$marc_type>
1116
1117 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1118 'unimarc', 'normarc'.
1119
1120 =item C<$marc_field>
1121
1122 A string that describes the MARC field that contains the data to extract.
1123 These are of a form suited to Catmandu's MARC fixers.
1124
1125 =back
1126
1127 =cut
1128
1129 sub _foreach_mapping {
1130     my ( $self, $sub ) = @_;
1131
1132     # TODO use a caching framework here
1133     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1134         {
1135             'search_marc_map.index_name' => $self->index,
1136         },
1137         {   join => { search_marc_to_fields => 'search_marc_map' },
1138             '+select' => [
1139                 'search_marc_to_fields.facet',
1140                 'search_marc_to_fields.suggestible',
1141                 'search_marc_to_fields.sort',
1142                 'search_marc_to_fields.search',
1143                 'search_marc_map.marc_type',
1144                 'search_marc_map.marc_field',
1145             ],
1146             '+as'     => [
1147                 'facet',
1148                 'suggestible',
1149                 'sort',
1150                 'search',
1151                 'marc_type',
1152                 'marc_field',
1153             ],
1154         }
1155     );
1156
1157     while ( my $search_field = $search_fields->next ) {
1158         $sub->(
1159             # Force lower case on indexed field names for case insensitive
1160             # field name searches
1161             lc($search_field->name),
1162             $search_field->type,
1163             $search_field->get_column('facet'),
1164             $search_field->get_column('suggestible'),
1165             $search_field->get_column('sort'),
1166             $search_field->get_column('search'),
1167             $search_field->get_column('marc_type'),
1168             $search_field->get_column('marc_field'),
1169         );
1170     }
1171 }
1172
1173 =head2 process_error
1174
1175     die process_error($@);
1176
1177 This parses an Elasticsearch error message and produces a human-readable
1178 result from it. This result is probably missing all the useful information
1179 that you might want in diagnosing an issue, so the warning is also logged.
1180
1181 Note that currently the resulting message is not internationalised. This
1182 will happen eventually by some method or other.
1183
1184 =cut
1185
1186 sub process_error {
1187     my ($self, $msg) = @_;
1188
1189     warn $msg; # simple logging
1190
1191     # This is super-primitive
1192     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1193
1194     return "Unable to perform your search. Please try again.\n";
1195 }
1196
1197 =head2 _read_configuration
1198
1199     my $conf = _read_configuration();
1200
1201 Reads the I<configuration file> and returns a hash structure with the
1202 configuration information. It raises an exception if mandatory entries
1203 are missing.
1204
1205 The hashref structure has the following form:
1206
1207     {
1208         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1209         'index_name' => 'koha_instance',
1210     }
1211
1212 This is configured by the following in the C<config> block in koha-conf.xml:
1213
1214     <elasticsearch>
1215         <server>127.0.0.1:9200</server>
1216         <server>anotherserver:9200</server>
1217         <index_name>koha_instance</index_name>
1218     </elasticsearch>
1219
1220 =cut
1221
1222 sub _read_configuration {
1223
1224     my $configuration;
1225
1226     my $conf = C4::Context->config('elasticsearch');
1227     Koha::Exceptions::Config::MissingEntry->throw(
1228         "Missing 'elasticsearch' block in config file")
1229       unless defined $conf;
1230
1231     if ( $conf && $conf->{server} ) {
1232         my $nodes = $conf->{server};
1233         if ( ref($nodes) eq 'ARRAY' ) {
1234             $configuration->{nodes} = $nodes;
1235         }
1236         else {
1237             $configuration->{nodes} = [$nodes];
1238         }
1239     }
1240     else {
1241         Koha::Exceptions::Config::MissingEntry->throw(
1242             "Missing 'server' entry in config file for elasticsearch");
1243     }
1244
1245     if ( defined $conf->{index_name} ) {
1246         $configuration->{index_name} = $conf->{index_name};
1247     }
1248     else {
1249         Koha::Exceptions::Config::MissingEntry->throw(
1250             "Missing 'index_name' entry in config file for elasticsearch");
1251     }
1252
1253     return $configuration;
1254 }
1255
1256 =head2 get_facetable_fields
1257
1258 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1259
1260 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1261
1262 =cut
1263
1264 sub get_facetable_fields {
1265     my ($self) = @_;
1266
1267     # These should correspond to the ES field names, as opposed to the CCL
1268     # things that zebra uses.
1269     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1270     my @faceted_fields = Koha::SearchFields->search(
1271         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1272     );
1273     my @not_faceted_fields = Koha::SearchFields->search(
1274         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1275     );
1276     # This could certainly be improved
1277     return ( @faceted_fields, @not_faceted_fields );
1278 }
1279
1280 1;
1281
1282 __END__
1283
1284 =head1 AUTHOR
1285
1286 =over 4
1287
1288 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1289
1290 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1291
1292 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1293
1294 =back
1295
1296 =cut