Bug 36574: Canceled/invalid ISBN not indexed by ES for MARC 21
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
30 use Koha::Caches;
31 use C4::Heading;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
33 use C4::Biblio;
34
35 use Carp qw( carp croak );
36 use Clone qw( clone );
37 use Modern::Perl;
38 use Readonly qw( Readonly );
39 use Search::Elasticsearch;
40 use Try::Tiny qw( catch try );
41 use YAML::XS;
42
43 use List::Util qw( sum0 );
44 use MARC::File::XML;
45 use MIME::Base64 qw( encode_base64 );
46 use Encode qw( encode );
47 use Business::ISBN;
48 use Scalar::Util qw( looks_like_number );
49
50 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
51 __PACKAGE__->mk_accessors(qw( sort_fields ));
52
53 # Constants to refer to the standard index names
54 Readonly our $BIBLIOS_INDEX     => 'biblios';
55 Readonly our $AUTHORITIES_INDEX => 'authorities';
56
57 =head1 NAME
58
59 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
60
61 =head1 ACCESSORS
62
63 =over 4
64
65 =item index
66
67 The name of the index to use, generally 'biblios' or 'authorities'.
68
69 =item index_name
70
71 The Elasticsearch index name with Koha instance prefix.
72
73 =back
74
75
76 =head1 FUNCTIONS
77
78 =cut
79
80 sub new {
81     my $class = shift @_;
82     my ($params) = @_;
83
84     # Check for a valid index
85     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
86     my $config = _read_configuration();
87     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
88
89     my $self = $class->SUPER::new(@_);
90     return $self;
91 }
92
93 =head2 get_elasticsearch
94
95     my $elasticsearch_client = $self->get_elasticsearch();
96
97 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
98 instance level and will be reused if method is called multiple times.
99
100 =cut
101
102 sub get_elasticsearch {
103     my $self = shift @_;
104     unless (defined $self->{elasticsearch}) {
105         $self->{elasticsearch} = Search::Elasticsearch->new(
106             $self->get_elasticsearch_params()
107         );
108     }
109     return $self->{elasticsearch};
110 }
111
112 =head2 get_elasticsearch_params
113
114     my $params = $self->get_elasticsearch_params();
115
116 This provides a hashref that contains the parameters for connecting to the
117 ElasicSearch servers, in the form:
118
119     {
120         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
121         'index_name' => 'koha_instance_index',
122     }
123
124 This is configured by the following in the C<config> block in koha-conf.xml:
125
126     <elasticsearch>
127         <server>127.0.0.1:9200</server>
128         <server>anotherserver:9200</server>
129         <index_name>koha_instance</index_name>
130     </elasticsearch>
131
132 =cut
133
134 sub get_elasticsearch_params {
135     my ($self) = @_;
136
137     my $conf;
138     try {
139         $conf = _read_configuration();
140     } catch {
141         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
142             croak($_->message);
143         }
144     };
145
146     return $conf
147 }
148
149 =head2 get_elasticsearch_settings
150
151     my $settings = $self->get_elasticsearch_settings();
152
153 This provides the settings provided to Elasticsearch when an index is created.
154 These can do things like define tokenization methods.
155
156 A hashref containing the settings is returned.
157
158 =cut
159
160 sub get_elasticsearch_settings {
161     my ($self) = @_;
162
163     # Use state to speed up repeated calls
164     state $settings = undef;
165     if (!defined $settings) {
166         my $config_file = C4::Context->config('elasticsearch_index_config');
167         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
168         $settings = YAML::XS::LoadFile( $config_file );
169     }
170
171     return $settings;
172 }
173
174 =head2 get_elasticsearch_mappings
175
176     my $mappings = $self->get_elasticsearch_mappings();
177
178 This provides the mappings that get passed to Elasticsearch when an index is
179 created.
180
181 =cut
182
183 sub get_elasticsearch_mappings {
184     my ($self) = @_;
185
186     # Use state to speed up repeated calls
187     state %all_mappings;
188     state %sort_fields;
189
190     if (!defined $all_mappings{$self->index}) {
191         $sort_fields{$self->index} = {};
192         # Clone the general mapping to break ties with the original hash
193         my $mappings = clone(_get_elasticsearch_field_config('general', ''));
194         my $marcflavour = lc C4::Context->preference('marcflavour');
195         $self->_foreach_mapping(
196             sub {
197                 my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type ) = @_;
198                 return if $marc_type ne $marcflavour;
199                 # TODO if this gets any sort of complexity to it, it should
200                 # be broken out into its own function.
201
202                 # TODO be aware of date formats, but this requires pre-parsing
203                 # as ES will simply reject anything with an invalid date.
204                 my $es_type = 'text';
205                 if ($type eq 'boolean') {
206                     $es_type = 'boolean';
207                 } elsif ($type eq 'number' || $type eq 'sum') {
208                     $es_type = 'integer';
209                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
210                     $es_type = 'stdno';
211                 } elsif ($type eq 'year') {
212                     $es_type = 'year';
213                 } elsif ($type eq 'callnumber') {
214                     $es_type = 'cn_sort';
215                 }
216
217                 if ($search) {
218                     $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
219                 }
220
221                 if ($facet) {
222                     $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
223                 }
224                 if ($suggestible) {
225                     $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226                 }
227                 # Sort should be defined in mappings as 1 (Yes) or 0 (No)
228                 # Previously, we also supported ~ (Undef) in the file
229                 # "undef" means to do the default thing, which is make it sortable.
230                 # This is preserved in order to not cause breakages for existing installs
231                 if (!defined $sort || $sort) {
232                     $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         if( $self->index eq 'authorities' ){
238             $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text');
239             $mappings->{properties}{ 'subject-heading-thesaurus' } = _get_elasticsearch_field_config('search', 'text');
240         }
241         $all_mappings{$self->index} = $mappings;
242     }
243     $self->sort_fields(\%{$sort_fields{$self->index}});
244     return $all_mappings{$self->index};
245 }
246
247 =head2 raw_elasticsearch_mappings
248
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc
251
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
253
254 =cut
255
256 sub raw_elasticsearch_mappings {
257     my ( $marc_type ) = @_;
258
259     my $schema = Koha::Database->new()->schema();
260
261     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
262
263     my $mappings = {};
264     while ( my $search_field = $search_fields->next ) {
265
266         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267             { search_field_id => $search_field->id },
268             {
269                 join     => 'search_marc_map',
270                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
271             }
272         );
273
274         while ( my $marc_to_field = $marc_to_fields->next ) {
275
276             my $marc_map = $marc_to_field->search_marc_map;
277
278             next if $marc_type && $marc_map->marc_type ne $marc_type;
279
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
287
288             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
289                 {
290                     facet   => $marc_to_field->facet || '',
291                     marc_type => $marc_map->marc_type,
292                     marc_field => $marc_map->marc_field,
293                     sort        => $marc_to_field->sort,
294                     suggestible => $marc_to_field->suggestible || '',
295                     filter => $marc_to_field->filter || ''
296                 });
297
298         }
299     }
300
301     return $mappings;
302 }
303
304 =head2 _get_elasticsearch_field_config
305
306 Get the Elasticsearch field config for the given purpose and data type.
307
308 $mapping = _get_elasticsearch_field_config('search', 'text');
309
310 =cut
311
312 sub _get_elasticsearch_field_config {
313
314     my ( $purpose, $type ) = @_;
315
316     # Use state to speed up repeated calls
317     state $settings = undef;
318     if (!defined $settings) {
319         my $config_file = C4::Context->config('elasticsearch_field_config');
320         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
321         local $YAML::XS::Boolean = 'JSON::PP';
322         $settings = YAML::XS::LoadFile( $config_file );
323     }
324
325     if (!defined $settings->{$purpose}) {
326         die "Field purpose $purpose not defined in field config";
327     }
328     if ($type eq '') {
329         return $settings->{$purpose};
330     }
331     if (defined $settings->{$purpose}{$type}) {
332         return $settings->{$purpose}{$type};
333     }
334     if (defined $settings->{$purpose}{'default'}) {
335         return $settings->{$purpose}{'default'};
336     }
337     return;
338 }
339
340 =head2 _load_elasticsearch_mappings
341
342 Load Elasticsearch mappings in the format of mappings.yaml.
343
344 $indexes = _load_elasticsearch_mappings();
345
346 =cut
347
348 sub _load_elasticsearch_mappings {
349     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
350     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
351     return YAML::XS::LoadFile( $mappings_yaml );
352 }
353
354 sub reset_elasticsearch_mappings {
355     my ( $self ) = @_;
356     my $indexes = $self->_load_elasticsearch_mappings();
357
358     Koha::SearchMarcMaps->delete;
359     Koha::SearchFields->delete;
360
361     while ( my ( $index_name, $fields ) = each %$indexes ) {
362         while ( my ( $field_name, $data ) = each %$fields ) {
363
364             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
365
366             # Set default values
367             $sf_params{staff_client} //= 1;
368             $sf_params{opac} //= 1;
369
370             $sf_params{name} = $field_name;
371
372             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
373
374             my $mappings = $data->{mappings};
375             for my $mapping ( @$mappings ) {
376                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
377                     index_name => $index_name,
378                     marc_type => $mapping->{marc_type},
379                     marc_field => $mapping->{marc_field}
380                 });
381                 $search_field->add_to_search_marc_maps($marc_field, {
382                     facet => $mapping->{facet} || 0,
383                     suggestible => $mapping->{suggestible} || 0,
384                     sort => $mapping->{sort} // 1,
385                     search => $mapping->{search} // 1,
386                     filter => $mapping->{filter} // ''
387                 });
388             }
389         }
390     }
391
392     $self->clear_search_fields_cache();
393
394     # FIXME return the mappings?
395 }
396
397 # This overrides the accessor provided by Class::Accessor so that if
398 # sort_fields isn't set, then it'll generate it.
399 sub sort_fields {
400     my $self = shift;
401     if (@_) {
402         $self->_sort_fields_accessor(@_);
403         return;
404     }
405     my $val = $self->_sort_fields_accessor();
406     return $val if $val;
407
408     # This will populate the accessor as a side effect
409     $self->get_elasticsearch_mappings();
410     return $self->_sort_fields_accessor();
411 }
412
413 =head2 _process_mappings($mappings, $data, $record_document, $meta)
414
415     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
416
417 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
418 Since we group all mappings by MARC field targets C<$mappings> will contain
419 all targets for C<$data> and thus we need to fetch the MARC field only once.
420 C<$mappings> will be applied to C<$record_document> and new field values added.
421 The method has no return value.
422
423 =over 4
424
425 =item C<$mappings>
426
427 Arrayref of mappings containing arrayrefs in the format
428 [C<$target>, C<$options>] where C<$target> is the name of the target field and
429 C<$options> is a hashref containing processing directives for this particular
430 mapping.
431
432 =item C<$data>
433
434 The source data from a MARC record field.
435
436 =item C<$record_document>
437
438 Hashref representing the Elasticsearch document on which mappings should be
439 applied.
440
441 =item C<$meta>
442
443 A hashref containing metadata useful for enforcing per mapping rules. For
444 example for providing extra context for mapping options, or treating mapping
445 targets differently depending on type (sort, search, facet etc). Combining
446 this metadata with the mapping options and metadata allows us to mutate the
447 data per mapping, or even replace it with other data retrieved from the
448 metadata context.
449
450 Current properties are:
451
452 C<altscript>: A boolean value indicating whether an alternate script presentation is being
453 processed.
454
455 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
456 'subfield' or 'subfields_group'.
457
458 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
459
460 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
461 is 'subfields_group'.
462
463 C<field>: The original C<MARC::Record> object.
464
465 =back
466
467 =cut
468
469 sub _process_mappings {
470     my ($_self, $mappings, $data, $record_document, $meta) = @_;
471     foreach my $mapping (@{$mappings}) {
472         my ($target, $options) = @{$mapping};
473
474         # Don't process sort fields for alternate scripts
475         my $sort = $target =~ /__sort$/;
476         if ($sort && $meta->{altscript}) {
477             next;
478         }
479
480         # Copy (scalar) data since can have multiple targets
481         # with differing options for (possibly) mutating data
482         # so need a different copy for each
483         my $data_copy = $data;
484         if (defined $options->{substr}) {
485             my ($start, $length) = @{$options->{substr}};
486             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
487         }
488
489         # Add data to values array for callbacks processing
490         my $values = [$data_copy];
491
492         # Value callbacks takes subfield data (or values from previous
493         # callbacks) as argument, and returns a possibly different list of values.
494         # Note that the returned list may also be empty.
495         if (defined $options->{value_callbacks}) {
496             foreach my $callback (@{$options->{value_callbacks}}) {
497                 # Pass each value to current callback which returns a list
498                 # (scalar is fine too) resulting either in a list or
499                 # a list of lists that will be flattened by perl.
500                 # The next callback will receive the possibly expanded list of values.
501                 $values = [ map { $callback->($_) } @{$values} ];
502             }
503         }
504
505         # Skip mapping if all values has been removed
506         next unless @{$values};
507
508         if (defined $options->{property}) {
509             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
510         }
511         if (defined $options->{nonfiling_characters_indicator}) {
512             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
513             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
514             # Nonfiling chars does not make sense for multiple values
515             # Only apply on first element
516             if ( $nonfiling_chars > 0 ) {
517                 if ($sort) {
518                     $values->[0] = substr $values->[0], $nonfiling_chars;
519                 } else {
520                     push @{$values}, substr $values->[0], $nonfiling_chars;
521                 }
522             }
523         }
524
525         $values = [ grep(!/^$/, @{$values}) ];
526
527         $record_document->{$target} //= [];
528         push @{$record_document->{$target}}, @{$values};
529     }
530 }
531
532 =head2 marc_records_to_documents($marc_records)
533
534     my $record_documents = $self->marc_records_to_documents($marc_records);
535
536 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
537
538 Returns array of hash references, representing Elasticsearch documents,
539 acceptable as body payload in C<Search::Elasticsearch> requests.
540
541 =over 4
542
543 =item C<$marc_documents>
544
545 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
546
547 =back
548
549 =cut
550
551 sub marc_records_to_documents {
552     my ($self, $records) = @_;
553     my $rules = $self->_get_marc_mapping_rules();
554     my $control_fields_rules = $rules->{control_fields};
555     my $data_fields_rules = $rules->{data_fields};
556     my $marcflavour = lc C4::Context->preference('marcflavour');
557     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
558
559     my @record_documents;
560
561     my %auth_match_headings;
562     if( $self->index eq 'authorities' ){
563         my @auth_types = Koha::Authority::Types->search->as_list;
564         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
565     }
566
567     foreach my $record (@{$records}) {
568         my $record_document = {};
569
570         if ( $self->index eq 'authorities' ){
571             my $authtypecode = GuessAuthTypeCode( $record );
572             if( $authtypecode ){
573                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
574                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
575                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
576                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
577                 }
578             } else {
579                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
580             }
581         }
582
583         my $mappings = $rules->{leader};
584         if ($mappings) {
585             $self->_process_mappings($mappings, $record->leader(), $record_document, {
586                     altscript => 0,
587                     data_source => 'leader'
588                 }
589             );
590         }
591         foreach my $field ($record->fields()) {
592             if ($field->is_control_field()) {
593                 my $mappings = $control_fields_rules->{$field->tag()};
594                 if ($mappings) {
595                     $self->_process_mappings($mappings, $field->data(), $record_document, {
596                             altscript => 0,
597                             data_source => 'control_field',
598                             field => $field
599                         }
600                     );
601                 }
602             }
603             else {
604                 my $tag = $field->tag();
605                 # Handle alternate scripts in MARC 21
606                 my $altscript = 0;
607                 if ($marcflavour eq 'marc21' && $tag eq '880') {
608                     my $sub6 = $field->subfield('6');
609                     if ($sub6 =~ /^(...)-\d+/) {
610                         $tag = $1;
611                         $altscript = 1;
612                     }
613                 }
614
615                 my $data_field_rules = $data_fields_rules->{$tag};
616                 if ($data_field_rules) {
617                     my $subfields_mappings = $data_field_rules->{subfields};
618                     my $wildcard_mappings = $subfields_mappings->{'*'};
619                     foreach my $subfield ($field->subfields()) {
620                         my ($code, $data) = @{$subfield};
621                         my $mappings = $subfields_mappings->{$code} // [];
622                         if ($wildcard_mappings) {
623                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
624                         }
625                         if (@{$mappings}) {
626                             $self->_process_mappings($mappings, $data, $record_document, {
627                                     altscript => $altscript,
628                                     data_source => 'subfield',
629                                     code => $code,
630                                     field => $field
631                                 }
632                             );
633                         }
634                     }
635
636                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
637                     if ($subfields_join_mappings) {
638                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
639                             my $data_field = $field->clone; #copy field to preserve for alt scripts
640                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
641                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
642                             if ($data) {
643                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
644                                         altscript => $altscript,
645                                         data_source => 'subfields_group',
646                                         codes => $subfields_group,
647                                         field => $field
648                                     }
649                                 );
650                             }
651                         }
652                     }
653                 }
654             }
655         }
656
657         if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
658             foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
659                 my $data_field_rules = $data_fields_rules->{$field->tag()};
660                 if ($data_field_rules) {
661                     my $subfields_mappings = $data_field_rules->{subfields};
662                     my $wildcard_mappings = $subfields_mappings->{'*'};
663                     foreach my $subfield ($field->subfields()) {
664                         my ($code, $data) = @{$subfield};
665                         my @mappings;
666                         push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
667                         push @mappings, @$wildcard_mappings if $wildcard_mappings;
668                         # Do not include "see from" into these kind of fields
669                         @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
670                         if (@mappings) {
671                             $self->_process_mappings(\@mappings, $data, $record_document, {
672                                     data_source => 'subfield',
673                                     code => $code,
674                                     field => $field
675                                 }
676                             );
677                         }
678                     }
679
680                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
681                     if ($subfields_join_mappings) {
682                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
683                             my $data_field = $field->clone;
684                             # remove empty subfields, otherwise they are printed as a space
685                             $data_field->delete_subfield(match => qr/^$/);
686                             my $data = $data_field->as_string( $subfields_group );
687                             if ($data) {
688                                 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
689                                 # Do not include "see from" into these kind of fields
690                                 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
691                                 $self->_process_mappings(\@mappings, $data, $record_document, {
692                                         data_source => 'subfields_group',
693                                         codes => $subfields_group,
694                                         field => $field
695                                     }
696                                 );
697                             }
698                         }
699                     }
700                 }
701             }
702         }
703
704         foreach my $field (keys %{$rules->{defaults}}) {
705             unless (defined $record_document->{$field}) {
706                 $record_document->{$field} = $rules->{defaults}->{$field};
707             }
708         }
709         foreach my $field (@{$rules->{sum}}) {
710             if (defined $record_document->{$field}) {
711                 # TODO: validate numeric? filter?
712                 # TODO: Or should only accept fields without nested values?
713                 # TODO: Quick and dirty, improve if needed
714                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
715             }
716         }
717         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
718         foreach my $field (@{$rules->{isbn}}) {
719             if (defined $record_document->{$field}) {
720                 my @isbns = ();
721                 foreach my $input_isbn (@{$record_document->{$field}}) {
722                     my $isbn = Business::ISBN->new($input_isbn);
723                     if (defined $isbn && $isbn->is_valid) {
724                         my $isbn13 = $isbn->as_isbn13->as_string;
725                         push @isbns, $isbn13;
726                         $isbn13 =~ s/\-//g;
727                         push @isbns, $isbn13;
728
729                         my $isbn10 = $isbn->as_isbn10;
730                         if ($isbn10) {
731                             $isbn10 = $isbn10->as_string;
732                             push @isbns, $isbn10;
733                             $isbn10 =~ s/\-//g;
734                             push @isbns, $isbn10;
735                         }
736                     } else {
737                         push @isbns, $input_isbn;
738                     }
739                 }
740                 $record_document->{$field} = \@isbns;
741             }
742         }
743
744         # Remove duplicate values and collapse sort fields
745         foreach my $field (keys %{$record_document}) {
746             if (ref($record_document->{$field}) eq 'ARRAY') {
747                 @{$record_document->{$field}} = do {
748                     my %seen;
749                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
750                 };
751                 if ($field =~ /__sort$/) {
752                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
753                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
754                 }
755             }
756         }
757
758         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
759         $record->encoding('UTF-8');
760         if ($use_array) {
761             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
762             $record_document->{'marc_format'} = 'ARRAY';
763         } else {
764             my @warnings;
765             {
766                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
767                 local $SIG{__WARN__} = sub {
768                     push @warnings, $_[0];
769                 };
770                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
771             }
772             if (@warnings) {
773                 # Suppress warnings if record length exceeded
774                 unless (substr($record->leader(), 0, 5) eq '99999') {
775                     foreach my $warning (@warnings) {
776                         carp $warning;
777                     }
778                 }
779                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
780                 $record_document->{'marc_format'} = 'MARCXML';
781             }
782             else {
783                 $record_document->{'marc_format'} = 'base64ISO2709';
784             }
785         }
786
787         # Check if there is at least one available item
788         if ($self->index eq $BIBLIOS_INDEX) {
789             my ($tag, $code) = C4::Biblio::GetMarcFromKohaField('biblio.biblionumber');
790             my $field = $record->field($tag);
791             if ($field) {
792                 my $biblionumber = $field->is_control_field ? $field->data : $field->subfield($code);
793                 my $avail_items = Koha::Items->search({
794                     biblionumber => $biblionumber,
795                     onloan       => undef,
796                     itemlost     => 0,
797                 })->count;
798
799                 $record_document->{available} = $avail_items ? \1 : \0;
800             }
801         }
802
803         push @record_documents, $record_document;
804     }
805     return \@record_documents;
806 }
807
808 =head2 _marc_to_array($record)
809
810     my @fields = _marc_to_array($record)
811
812 Convert a MARC::Record to an array modeled after MARC-in-JSON
813 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
814
815 =over 4
816
817 =item C<$record>
818
819 A MARC::Record object
820
821 =back
822
823 =cut
824
825 sub _marc_to_array {
826     my ($self, $record) = @_;
827
828     my $data = {
829         leader => $record->leader(),
830         fields => []
831     };
832     for my $field ($record->fields()) {
833         my $tag = $field->tag();
834         if ($field->is_control_field()) {
835             push @{$data->{fields}}, {$tag => $field->data()};
836         } else {
837             my $subfields = ();
838             foreach my $subfield ($field->subfields()) {
839                 my ($code, $contents) = @{$subfield};
840                 push @{$subfields}, {$code => $contents};
841             }
842             push @{$data->{fields}}, {
843                 $tag => {
844                     ind1 => $field->indicator(1),
845                     ind2 => $field->indicator(2),
846                     subfields => $subfields
847                 }
848             };
849         }
850     }
851     return $data;
852 }
853
854 =head2 _array_to_marc($data)
855
856     my $record = _array_to_marc($data)
857
858 Convert an array modeled after MARC-in-JSON to a MARC::Record
859
860 =over 4
861
862 =item C<$data>
863
864 An array modeled after MARC-in-JSON
865 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
866
867 =back
868
869 =cut
870
871 sub _array_to_marc {
872     my ($self, $data) = @_;
873
874     my $record = MARC::Record->new();
875
876     $record->leader($data->{leader});
877     for my $field (@{$data->{fields}}) {
878         my $tag = (keys %{$field})[0];
879         $field = $field->{$tag};
880         my $marc_field;
881         if (ref($field) eq 'HASH') {
882             my @subfields;
883             foreach my $subfield (@{$field->{subfields}}) {
884                 my $code = (keys %{$subfield})[0];
885                 push @subfields, $code;
886                 push @subfields, $subfield->{$code};
887             }
888             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
889         } else {
890             $marc_field = MARC::Field->new($tag, $field)
891         }
892         $record->append_fields($marc_field);
893     }
894 ;
895     return $record;
896 }
897
898 =head2 _field_mappings($facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range)
899
900     my @mappings = _field_mappings( $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range )
901
902 Get mappings, an internal data structure later used by
903 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
904 data for a MARC mapping.
905
906 The returned C<$mappings> is not to to be confused with mappings provided by
907 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
908 provided by C<_foreach_mapping> and expands it to this internal data structure.
909 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
910 is then applied to each MARC target (leader, control field data, subfield or
911 joined subfields) and integrated into the mapping rules data structure used in
912 C<marc_records_to_documents> to transform MARC records into Elasticsearch
913 documents.
914
915 =over 4
916
917 =item C<$facet>
918
919 Boolean indicating whether to create a facet field for this mapping.
920
921 =item C<$suggestible>
922
923 Boolean indicating whether to create a suggestion field for this mapping.
924
925 =item C<$sort>
926
927 Boolean indicating whether to create a sort field for this mapping.
928
929 =item C<$search>
930
931 Boolean indicating whether to create a search field for this mapping.
932
933 =item C<$target_name>
934
935 Elasticsearch document target field name.
936
937 =item C<$target_type>
938
939 Elasticsearch document target field type.
940
941 =item C<$range>
942
943 An optional range as a string in the format "<START>-<END>" or "<START>",
944 where "<START>" and "<END>" are integers specifying a range that will be used
945 for extracting a substring from MARC data as Elasticsearch field target value.
946
947 The first character position is "0", and the range is inclusive,
948 so "0-2" means the first three characters of MARC data.
949
950 If only "<START>" is provided only one character at position "<START>" will
951 be extracted.
952
953 =back
954
955 =cut
956
957 sub _field_mappings {
958     my ( $_self, $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range ) = @_;
959     my %mapping_defaults = ();
960     my @mappings;
961
962     my $substr_args = undef;
963     if (defined $range) {
964         # TODO: use value_callback instead?
965         my ($start, $end) = map(int, split /-/, $range, 2);
966         $substr_args = [$start];
967         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
968     }
969     my $default_options = {};
970     if ($substr_args) {
971         $default_options->{substr} = $substr_args;
972     }
973
974     # TODO: Should probably have per type value callback/hook
975     # but hard code for now
976     if ($target_type eq 'boolean') {
977         $default_options->{value_callbacks} //= [];
978         push @{$default_options->{value_callbacks}}, sub {
979             my ($value) = @_;
980             # Trim whitespace at both ends
981             $value =~ s/^\s+|\s+$//g;
982             return $value ? 'true' : 'false';
983         };
984     }
985     elsif ($target_type eq 'year') {
986         $default_options->{value_callbacks} //= [];
987         # Only accept years containing digits and "u"
988         push @{$default_options->{value_callbacks}}, sub {
989             my ($value) = @_;
990             # Replace "u" with "0" for sorting
991             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
992         };
993     }
994
995     if ( defined $filter && $filter eq 'punctuation' ) {
996         $default_options->{value_callbacks} //= [];
997         push @{ $default_options->{value_callbacks} }, sub {
998             my ($value) = @_;
999
1000             # Trim punctuation marks from field
1001             $value =~
1002                 s/[\x00-\x1F,\x21-\x2F,\x3A-\x40,\x5B-\x60,\x7B-\x89,\x8B,\x8D,\x8F,\x90-\x99,\x9B,\x9D,\xA0-\xBF,\xD7,\xF7]//g;
1003             return $value;
1004         };
1005     }
1006
1007     if ($search) {
1008         my $mapping = [$target_name, $default_options];
1009         push @mappings, $mapping;
1010     }
1011
1012     my @suffixes = ();
1013     push @suffixes, 'facet' if $facet;
1014     push @suffixes, 'suggestion' if $suggestible;
1015     push @suffixes, 'sort' if !defined $sort || $sort;
1016
1017     foreach my $suffix (@suffixes) {
1018         my $mapping = ["${target_name}__$suffix"];
1019         # TODO: Hack, fix later in less hideous manner
1020         if ($suffix eq 'suggestion') {
1021             push @{$mapping}, {%{$default_options}, property => 'input'};
1022         }
1023         else {
1024             # Important! Make shallow clone, or we end up with the same hashref
1025             # shared by all mappings
1026             push @{$mapping}, {%{$default_options}};
1027         }
1028         push @mappings, $mapping;
1029     }
1030     return @mappings;
1031 };
1032
1033 =head2 _get_marc_mapping_rules
1034
1035     my $mapping_rules = $self->_get_marc_mapping_rules()
1036
1037 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1038
1039 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
1040 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1041 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1042
1043 We can then iterate through all MARC fields for each record and apply all relevant
1044 rules once per fields instead of retreiving fields multiple times for each mapping rule
1045 which is terribly slow.
1046
1047 =cut
1048
1049 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1050 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1051 # memory cache which it is currently not. The performance gain of caching
1052 # would probably be marginal, but to do this could be a further improvement.
1053
1054 sub _get_marc_mapping_rules {
1055     my ($self) = @_;
1056     my $marcflavour = lc C4::Context->preference('marcflavour');
1057     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1058     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1059     my $rules = {
1060         'leader' => [],
1061         'control_fields' => {},
1062         'data_fields' => {},
1063         'sum' => [],
1064         'isbn' => [],
1065         'defaults' => {}
1066     };
1067
1068     $self->_foreach_mapping(sub {
1069         my ($name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type, $marc_field) = @_;
1070         return if $marc_type ne $marcflavour;
1071
1072         if ($type eq 'sum') {
1073             push @{$rules->{sum}}, $name;
1074             push @{$rules->{sum}}, $name."__sort" if $sort;
1075         }
1076         elsif ($type eq 'isbn') {
1077             push @{$rules->{isbn}}, $name;
1078         }
1079         elsif ($type eq 'boolean') {
1080             # boolean gets special handling, if value doesn't exist for a field,
1081             # it is set to false
1082             $rules->{defaults}->{$name} = 'false';
1083         }
1084
1085         if ($marc_field =~ $field_spec_regexp) {
1086             my $field_tag = $1;
1087
1088             my @subfields;
1089             my @subfield_groups;
1090             # Parse and separate subfields form subfield groups
1091             if (defined $2) {
1092                 my $subfield_group = '';
1093                 my $open_group = 0;
1094
1095                 foreach my $token (split //, $2) {
1096                     if ($token eq "(") {
1097                         if ($open_group) {
1098                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1099                                 "Unmatched opening parenthesis for $marc_field"
1100                             );
1101                         }
1102                         else {
1103                             $open_group = 1;
1104                         }
1105                     }
1106                     elsif ($token eq ")") {
1107                         if ($open_group) {
1108                             if ($subfield_group) {
1109                                 push @subfield_groups, $subfield_group;
1110                                 $subfield_group = '';
1111                             }
1112                             $open_group = 0;
1113                         }
1114                         else {
1115                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1116                                 "Unmatched closing parenthesis for $marc_field"
1117                             );
1118                         }
1119                     }
1120                     elsif ($open_group) {
1121                         $subfield_group .= $token;
1122                     }
1123                     else {
1124                         push @subfields, $token;
1125                     }
1126                 }
1127             }
1128             else {
1129                 push @subfields, '*';
1130             }
1131
1132             my $range = defined $3 ? $3 : undef;
1133             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1134             if ($field_tag < 10) {
1135                 $rules->{control_fields}->{$field_tag} //= [];
1136                 push @{$rules->{control_fields}->{$field_tag}}, @{clone(\@mappings)};
1137             }
1138             else {
1139                 $rules->{data_fields}->{$field_tag} //= {};
1140                 foreach my $subfield (@subfields) {
1141                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1142                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @{clone(\@mappings)};
1143                 }
1144                 foreach my $subfield_group (@subfield_groups) {
1145                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1146                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @{clone(\@mappings)};
1147                 }
1148             }
1149         }
1150         elsif ($marc_field =~ $leader_regexp) {
1151             my $range = defined $1 ? $1 : undef;
1152             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1153             push @{$rules->{leader}}, @{clone(\@mappings)};
1154         }
1155         else {
1156             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1157                 "Invalid MARC field expression: $marc_field"
1158             );
1159         }
1160     });
1161
1162     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1163     if ($marcflavour eq 'marc21') {
1164         # Nonfiling characters processing for sort fields
1165         my %title_fields;
1166         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1167             # Format is: nonfiling characters indicator => field names list
1168             %title_fields = (
1169                 1 => [130, 630, 730, 740],
1170                 2 => [222, 240, 242, 243, 245, 440, 830]
1171             );
1172         }
1173         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1174             %title_fields = (
1175                 1 => [730],
1176                 2 => [130, 430, 530]
1177             );
1178         }
1179         foreach my $indicator (keys %title_fields) {
1180             foreach my $field_tag (@{$title_fields{$indicator}}) {
1181                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1182                 foreach my $mapping ( @{$mappings} ) {
1183                     # Mark this as to be processed for nonfiling characters indicator
1184                     # later on in _process_mappings
1185                     $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1186                 }
1187             }
1188         }
1189     }
1190
1191     if( $self->index eq 'authorities' ){
1192         push @{$rules->{control_fields}->{'008'}}, ['subject-heading-thesaurus', { 'substr' => [ 11, 1 ] } ];
1193         push @{$rules->{data_fields}->{'040'}->{subfields}->{f}}, ['subject-heading-thesaurus', { } ];
1194     }
1195
1196     return $rules;
1197 }
1198
1199 =head2 _foreach_mapping
1200
1201     $self->_foreach_mapping(
1202         sub {
1203             my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type,
1204                 $marc_field )
1205               = @_;
1206             return unless $marc_type eq 'marc21';
1207             print "Data comes from: " . $marc_field . "\n";
1208         }
1209     );
1210
1211 This allows you to apply a function to each entry in the elasticsearch mappings
1212 table, in order to build the mappings for whatever is needed.
1213
1214 In the provided function, the files are:
1215
1216 =over 4
1217
1218 =item C<$name>
1219
1220 The field name for elasticsearch (corresponds to the 'mapping' column in the
1221 database.
1222
1223 =item C<$type>
1224
1225 The type for this value, e.g. 'string'.
1226
1227 =item C<$facet>
1228
1229 True if this value should be facetised. This only really makes sense if the
1230 field is understood by the facet processing code anyway.
1231
1232 =item C<$sort>
1233
1234 True if this is a field that a) needs special sort handling, and b) if it
1235 should be sorted on. False if a) but not b). Undef if not a). This allows,
1236 for example, author to be sorted on but not everything marked with "author"
1237 to be included in that sort.
1238
1239 =item C<$search>
1240
1241 True if this value should be searchable.
1242
1243 =item C<$filter>
1244
1245 Contains a string that represents a filter defined in the indexing code. Currently supports
1246 the option 'punctuation'
1247
1248 =item C<$marc_type>
1249
1250 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1251 'unimarc'.
1252
1253 =item C<$marc_field>
1254
1255 A string that describes the MARC field that contains the data to extract.
1256
1257 =back
1258
1259 =cut
1260
1261 sub _foreach_mapping {
1262     my ( $self, $sub ) = @_;
1263
1264     # TODO use a caching framework here
1265     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1266         {
1267             'search_marc_map.index_name' => $self->index,
1268         },
1269         {   join => { search_marc_to_fields => 'search_marc_map' },
1270             '+select' => [
1271                 'search_marc_to_fields.facet',
1272                 'search_marc_to_fields.suggestible',
1273                 'search_marc_to_fields.sort',
1274                 'search_marc_to_fields.search',
1275                 'search_marc_to_fields.filter',
1276                 'search_marc_map.marc_type',
1277                 'search_marc_map.marc_field',
1278             ],
1279             '+as'     => [
1280                 'facet',
1281                 'suggestible',
1282                 'sort',
1283                 'search',
1284                 'filter',
1285                 'marc_type',
1286                 'marc_field',
1287             ],
1288         }
1289     );
1290
1291     while ( my $search_field = $search_fields->next ) {
1292         $sub->(
1293             # Force lower case on indexed field names for case insensitive
1294             # field name searches
1295             lc($search_field->name),
1296             $search_field->type,
1297             $search_field->get_column('facet'),
1298             $search_field->get_column('suggestible'),
1299             $search_field->get_column('sort'),
1300             $search_field->get_column('search'),
1301             $search_field->get_column('filter'),
1302             $search_field->get_column('marc_type'),
1303             $search_field->get_column('marc_field'),
1304         );
1305     }
1306 }
1307
1308 =head2 process_error
1309
1310     die process_error($@);
1311
1312 This parses an Elasticsearch error message and produces a human-readable
1313 result from it. This result is probably missing all the useful information
1314 that you might want in diagnosing an issue, so the warning is also logged.
1315
1316 Note that currently the resulting message is not internationalised. This
1317 will happen eventually by some method or other.
1318
1319 =cut
1320
1321 sub process_error {
1322     my ($self, $msg) = @_;
1323
1324     warn $msg; # simple logging
1325
1326     # This is super-primitive
1327     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1328
1329     return "Unable to perform your search. Please try again.\n";
1330 }
1331
1332 =head2 _read_configuration
1333
1334     my $conf = _read_configuration();
1335
1336 Reads the I<configuration file> and returns a hash structure with the
1337 configuration information. It raises an exception if mandatory entries
1338 are missing.
1339
1340 The hashref structure has the following form:
1341
1342     {
1343         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1344         'index_name' => 'koha_instance',
1345     }
1346
1347 This is configured by the following in the C<config> block in koha-conf.xml:
1348
1349     <elasticsearch>
1350         <server>127.0.0.1:9200</server>
1351         <server>anotherserver:9200</server>
1352         <index_name>koha_instance</index_name>
1353     </elasticsearch>
1354
1355 =cut
1356
1357 sub _read_configuration {
1358
1359     my $configuration;
1360
1361     my $conf = C4::Context->config('elasticsearch');
1362     unless ( defined $conf ) {
1363         Koha::Exceptions::Config::MissingEntry->throw(
1364             "Missing <elasticsearch> entry in koha-conf.xml"
1365         );
1366     }
1367
1368     unless ( exists $conf->{server} ) {
1369         Koha::Exceptions::Config::MissingEntry->throw(
1370             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1371         );
1372     }
1373
1374     unless ( exists $conf->{index_name} ) {
1375         Koha::Exceptions::Config::MissingEntry->throw(
1376             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1377         );
1378     }
1379
1380     while ( my ( $var, $val ) = each %$conf ) {
1381         if ( $var eq 'server' ) {
1382             if ( ref($val) eq 'ARRAY' ) {
1383                 $configuration->{nodes} = $val;
1384             }
1385             else {
1386                 $configuration->{nodes} = [$val];
1387             }
1388         } else {
1389             $configuration->{$var} = $val;
1390         }
1391     }
1392
1393     $configuration->{cxn_pool} //= 'Static';
1394
1395     return $configuration;
1396 }
1397
1398 =head2 get_facetable_fields
1399
1400 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1401
1402 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1403
1404 =cut
1405
1406 sub get_facetable_fields {
1407     my ($self) = @_;
1408
1409     # These should correspond to the ES field names, as opposed to the CCL
1410     # things that zebra uses.
1411     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1412     my @faceted_fields = Koha::SearchFields->search(
1413         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1414     )->as_list;
1415     my @not_faceted_fields = Koha::SearchFields->search(
1416         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1417     )->as_list;
1418     # This could certainly be improved
1419     return ( @faceted_fields, @not_faceted_fields );
1420 }
1421
1422 =head2 clear_search_fields_cache
1423
1424 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1425
1426 Clear cached values for ES search fields
1427
1428 =cut
1429
1430 sub clear_search_fields_cache {
1431
1432     my $cache = Koha::Caches->get_instance();
1433     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1434     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1435     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1436     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1437
1438 }
1439
1440 1;
1441
1442 __END__
1443
1444 =head1 AUTHOR
1445
1446 =over 4
1447
1448 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1449
1450 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1451
1452 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1453
1454 =back
1455
1456 =cut