Bug 27153: Add filter option to Elasticsearch indexing
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::Filter::MARC::EmbedSeeFromHeadings;
28 use Koha::SearchFields;
29 use Koha::SearchMarcMaps;
30 use Koha::Caches;
31 use C4::Heading;
32 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
33 use C4::Biblio;
34
35 use Carp qw( carp croak );
36 use Clone qw( clone );
37 use Modern::Perl;
38 use Readonly qw( Readonly );
39 use Search::Elasticsearch;
40 use Try::Tiny qw( catch try );
41 use YAML::XS;
42
43 use List::Util qw( sum0 );
44 use MARC::File::XML;
45 use MIME::Base64 qw( encode_base64 );
46 use Encode qw( encode );
47 use Business::ISBN;
48 use Scalar::Util qw( looks_like_number );
49
50 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
51 __PACKAGE__->mk_accessors(qw( sort_fields ));
52
53 # Constants to refer to the standard index names
54 Readonly our $BIBLIOS_INDEX     => 'biblios';
55 Readonly our $AUTHORITIES_INDEX => 'authorities';
56
57 =head1 NAME
58
59 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
60
61 =head1 ACCESSORS
62
63 =over 4
64
65 =item index
66
67 The name of the index to use, generally 'biblios' or 'authorities'.
68
69 =item index_name
70
71 The Elasticsearch index name with Koha instance prefix.
72
73 =back
74
75
76 =head1 FUNCTIONS
77
78 =cut
79
80 sub new {
81     my $class = shift @_;
82     my ($params) = @_;
83
84     # Check for a valid index
85     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
86     my $config = _read_configuration();
87     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
88
89     my $self = $class->SUPER::new(@_);
90     return $self;
91 }
92
93 =head2 get_elasticsearch
94
95     my $elasticsearch_client = $self->get_elasticsearch();
96
97 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
98 instance level and will be reused if method is called multiple times.
99
100 =cut
101
102 sub get_elasticsearch {
103     my $self = shift @_;
104     unless (defined $self->{elasticsearch}) {
105         $self->{elasticsearch} = Search::Elasticsearch->new(
106             $self->get_elasticsearch_params()
107         );
108     }
109     return $self->{elasticsearch};
110 }
111
112 =head2 get_elasticsearch_params
113
114     my $params = $self->get_elasticsearch_params();
115
116 This provides a hashref that contains the parameters for connecting to the
117 ElasicSearch servers, in the form:
118
119     {
120         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
121         'index_name' => 'koha_instance_index',
122     }
123
124 This is configured by the following in the C<config> block in koha-conf.xml:
125
126     <elasticsearch>
127         <server>127.0.0.1:9200</server>
128         <server>anotherserver:9200</server>
129         <index_name>koha_instance</index_name>
130     </elasticsearch>
131
132 =cut
133
134 sub get_elasticsearch_params {
135     my ($self) = @_;
136
137     my $conf;
138     try {
139         $conf = _read_configuration();
140     } catch {
141         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
142             croak($_->message);
143         }
144     };
145
146     return $conf
147 }
148
149 =head2 get_elasticsearch_settings
150
151     my $settings = $self->get_elasticsearch_settings();
152
153 This provides the settings provided to Elasticsearch when an index is created.
154 These can do things like define tokenization methods.
155
156 A hashref containing the settings is returned.
157
158 =cut
159
160 sub get_elasticsearch_settings {
161     my ($self) = @_;
162
163     # Use state to speed up repeated calls
164     state $settings = undef;
165     if (!defined $settings) {
166         my $config_file = C4::Context->config('elasticsearch_index_config');
167         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
168         $settings = YAML::XS::LoadFile( $config_file );
169     }
170
171     return $settings;
172 }
173
174 =head2 get_elasticsearch_mappings
175
176     my $mappings = $self->get_elasticsearch_mappings();
177
178 This provides the mappings that get passed to Elasticsearch when an index is
179 created.
180
181 =cut
182
183 sub get_elasticsearch_mappings {
184     my ($self) = @_;
185
186     # Use state to speed up repeated calls
187     state %all_mappings;
188     state %sort_fields;
189
190     if (!defined $all_mappings{$self->index}) {
191         $sort_fields{$self->index} = {};
192         # Clone the general mapping to break ties with the original hash
193         my $mappings = clone(_get_elasticsearch_field_config('general', ''));
194         my $marcflavour = lc C4::Context->preference('marcflavour');
195         $self->_foreach_mapping(
196             sub {
197                 my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type ) = @_;
198                 return if $marc_type ne $marcflavour;
199                 # TODO if this gets any sort of complexity to it, it should
200                 # be broken out into its own function.
201
202                 # TODO be aware of date formats, but this requires pre-parsing
203                 # as ES will simply reject anything with an invalid date.
204                 my $es_type = 'text';
205                 if ($type eq 'boolean') {
206                     $es_type = 'boolean';
207                 } elsif ($type eq 'number' || $type eq 'sum') {
208                     $es_type = 'integer';
209                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
210                     $es_type = 'stdno';
211                 } elsif ($type eq 'year') {
212                     $es_type = 'year';
213                 } elsif ($type eq 'callnumber') {
214                     $es_type = 'cn_sort';
215                 }
216
217                 if ($search) {
218                     $mappings->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
219                 }
220
221                 if ($facet) {
222                     $mappings->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
223                 }
224                 if ($suggestible) {
225                     $mappings->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226                 }
227                 # Sort should be defined in mappings as 1 (Yes) or 0 (No)
228                 # Previously, we also supported ~ (Undef) in the file
229                 # "undef" means to do the default thing, which is make it sortable.
230                 # This is preserved in order to not cause breakages for existing installs
231                 if (!defined $sort || $sort) {
232                     $mappings->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         if( $self->index eq 'authorities' ){
238             $mappings->{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text');
239             $mappings->{properties}{ 'subject-heading-thesaurus' } = _get_elasticsearch_field_config('search', 'text');
240         }
241         $all_mappings{$self->index} = $mappings;
242     }
243     $self->sort_fields(\%{$sort_fields{$self->index}});
244     return $all_mappings{$self->index};
245 }
246
247 =head2 raw_elasticsearch_mappings
248
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc
251
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
253
254 =cut
255
256 sub raw_elasticsearch_mappings {
257     my ( $marc_type ) = @_;
258
259     my $schema = Koha::Database->new()->schema();
260
261     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
262
263     my $mappings = {};
264     while ( my $search_field = $search_fields->next ) {
265
266         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267             { search_field_id => $search_field->id },
268             {
269                 join     => 'search_marc_map',
270                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
271             }
272         );
273
274         while ( my $marc_to_field = $marc_to_fields->next ) {
275
276             my $marc_map = $marc_to_field->search_marc_map;
277
278             next if $marc_type && $marc_map->marc_type ne $marc_type;
279
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
286             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
287
288             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
289                 {
290                     facet   => $marc_to_field->facet || '',
291                     marc_type => $marc_map->marc_type,
292                     marc_field => $marc_map->marc_field,
293                     sort        => $marc_to_field->sort,
294                     suggestible => $marc_to_field->suggestible || '',
295                     filter => $marc_to_field->filter || ''
296                 });
297
298         }
299     }
300
301     return $mappings;
302 }
303
304 =head2 _get_elasticsearch_field_config
305
306 Get the Elasticsearch field config for the given purpose and data type.
307
308 $mapping = _get_elasticsearch_field_config('search', 'text');
309
310 =cut
311
312 sub _get_elasticsearch_field_config {
313
314     my ( $purpose, $type ) = @_;
315
316     # Use state to speed up repeated calls
317     state $settings = undef;
318     if (!defined $settings) {
319         my $config_file = C4::Context->config('elasticsearch_field_config');
320         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
321         local $YAML::XS::Boolean = 'JSON::PP';
322         $settings = YAML::XS::LoadFile( $config_file );
323     }
324
325     if (!defined $settings->{$purpose}) {
326         die "Field purpose $purpose not defined in field config";
327     }
328     if ($type eq '') {
329         return $settings->{$purpose};
330     }
331     if (defined $settings->{$purpose}{$type}) {
332         return $settings->{$purpose}{$type};
333     }
334     if (defined $settings->{$purpose}{'default'}) {
335         return $settings->{$purpose}{'default'};
336     }
337     return;
338 }
339
340 =head2 _load_elasticsearch_mappings
341
342 Load Elasticsearch mappings in the format of mappings.yaml.
343
344 $indexes = _load_elasticsearch_mappings();
345
346 =cut
347
348 sub _load_elasticsearch_mappings {
349     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
350     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
351     return YAML::XS::LoadFile( $mappings_yaml );
352 }
353
354 sub reset_elasticsearch_mappings {
355     my ( $self ) = @_;
356     my $indexes = $self->_load_elasticsearch_mappings();
357
358     Koha::SearchMarcMaps->delete;
359     Koha::SearchFields->delete;
360
361     while ( my ( $index_name, $fields ) = each %$indexes ) {
362         while ( my ( $field_name, $data ) = each %$fields ) {
363
364             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
365
366             # Set default values
367             $sf_params{staff_client} //= 1;
368             $sf_params{opac} //= 1;
369
370             $sf_params{name} = $field_name;
371
372             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
373
374             my $mappings = $data->{mappings};
375             for my $mapping ( @$mappings ) {
376                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
377                     index_name => $index_name,
378                     marc_type => $mapping->{marc_type},
379                     marc_field => $mapping->{marc_field}
380                 });
381                 $search_field->add_to_search_marc_maps($marc_field, {
382                     facet => $mapping->{facet} || 0,
383                     suggestible => $mapping->{suggestible} || 0,
384                     sort => $mapping->{sort} // 1,
385                     search => $mapping->{search} // 1,
386                     filter => $mapping->{filter} // ''
387                 });
388             }
389         }
390     }
391
392     $self->clear_search_fields_cache();
393
394     # FIXME return the mappings?
395 }
396
397 # This overrides the accessor provided by Class::Accessor so that if
398 # sort_fields isn't set, then it'll generate it.
399 sub sort_fields {
400     my $self = shift;
401     if (@_) {
402         $self->_sort_fields_accessor(@_);
403         return;
404     }
405     my $val = $self->_sort_fields_accessor();
406     return $val if $val;
407
408     # This will populate the accessor as a side effect
409     $self->get_elasticsearch_mappings();
410     return $self->_sort_fields_accessor();
411 }
412
413 =head2 _process_mappings($mappings, $data, $record_document, $meta)
414
415     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
416
417 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
418 Since we group all mappings by MARC field targets C<$mappings> will contain
419 all targets for C<$data> and thus we need to fetch the MARC field only once.
420 C<$mappings> will be applied to C<$record_document> and new field values added.
421 The method has no return value.
422
423 =over 4
424
425 =item C<$mappings>
426
427 Arrayref of mappings containing arrayrefs in the format
428 [C<$target>, C<$options>] where C<$target> is the name of the target field and
429 C<$options> is a hashref containing processing directives for this particular
430 mapping.
431
432 =item C<$data>
433
434 The source data from a MARC record field.
435
436 =item C<$record_document>
437
438 Hashref representing the Elasticsearch document on which mappings should be
439 applied.
440
441 =item C<$meta>
442
443 A hashref containing metadata useful for enforcing per mapping rules. For
444 example for providing extra context for mapping options, or treating mapping
445 targets differently depending on type (sort, search, facet etc). Combining
446 this metadata with the mapping options and metadata allows us to mutate the
447 data per mapping, or even replace it with other data retrieved from the
448 metadata context.
449
450 Current properties are:
451
452 C<altscript>: A boolean value indicating whether an alternate script presentation is being
453 processed.
454
455 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
456 'subfield' or 'subfields_group'.
457
458 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
459
460 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
461 is 'subfields_group'.
462
463 C<field>: The original C<MARC::Record> object.
464
465 =back
466
467 =cut
468
469 sub _process_mappings {
470     my ($_self, $mappings, $data, $record_document, $meta) = @_;
471     foreach my $mapping (@{$mappings}) {
472         my ($target, $options) = @{$mapping};
473
474         # Don't process sort fields for alternate scripts
475         my $sort = $target =~ /__sort$/;
476         if ($sort && $meta->{altscript}) {
477             next;
478         }
479
480         # Copy (scalar) data since can have multiple targets
481         # with differing options for (possibly) mutating data
482         # so need a different copy for each
483         my $data_copy = $data;
484         if (defined $options->{substr}) {
485             my ($start, $length) = @{$options->{substr}};
486             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
487         }
488
489         # Add data to values array for callbacks processing
490         my $values = [$data_copy];
491
492         # Value callbacks takes subfield data (or values from previous
493         # callbacks) as argument, and returns a possibly different list of values.
494         # Note that the returned list may also be empty.
495         if (defined $options->{value_callbacks}) {
496             foreach my $callback (@{$options->{value_callbacks}}) {
497                 # Pass each value to current callback which returns a list
498                 # (scalar is fine too) resulting either in a list or
499                 # a list of lists that will be flattened by perl.
500                 # The next callback will receive the possibly expanded list of values.
501                 $values = [ map { $callback->($_) } @{$values} ];
502             }
503         }
504
505         # Skip mapping if all values has been removed
506         next unless @{$values};
507
508         if (defined $options->{property}) {
509             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
510         }
511         if (defined $options->{nonfiling_characters_indicator}) {
512             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
513             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
514             # Nonfiling chars does not make sense for multiple values
515             # Only apply on first element
516             $values->[0] = substr $values->[0], $nonfiling_chars;
517         }
518
519         $values = [ grep(!/^$/, @{$values}) ];
520
521         $record_document->{$target} //= [];
522         push @{$record_document->{$target}}, @{$values};
523     }
524 }
525
526 =head2 marc_records_to_documents($marc_records)
527
528     my $record_documents = $self->marc_records_to_documents($marc_records);
529
530 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
531
532 Returns array of hash references, representing Elasticsearch documents,
533 acceptable as body payload in C<Search::Elasticsearch> requests.
534
535 =over 4
536
537 =item C<$marc_documents>
538
539 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
540
541 =back
542
543 =cut
544
545 sub marc_records_to_documents {
546     my ($self, $records) = @_;
547     my $rules = $self->_get_marc_mapping_rules();
548     my $control_fields_rules = $rules->{control_fields};
549     my $data_fields_rules = $rules->{data_fields};
550     my $marcflavour = lc C4::Context->preference('marcflavour');
551     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
552
553     my @record_documents;
554
555     my %auth_match_headings;
556     if( $self->index eq 'authorities' ){
557         my @auth_types = Koha::Authority::Types->search->as_list;
558         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
559     }
560
561     foreach my $record (@{$records}) {
562         my $record_document = {};
563
564         if ( $self->index eq 'authorities' ){
565             my $authtypecode = GuessAuthTypeCode( $record );
566             if( $authtypecode ){
567                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
568                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
569                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
570                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
571                 }
572             } else {
573                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
574             }
575         }
576
577         my $mappings = $rules->{leader};
578         if ($mappings) {
579             $self->_process_mappings($mappings, $record->leader(), $record_document, {
580                     altscript => 0,
581                     data_source => 'leader'
582                 }
583             );
584         }
585         foreach my $field ($record->fields()) {
586             if ($field->is_control_field()) {
587                 my $mappings = $control_fields_rules->{$field->tag()};
588                 if ($mappings) {
589                     $self->_process_mappings($mappings, $field->data(), $record_document, {
590                             altscript => 0,
591                             data_source => 'control_field',
592                             field => $field
593                         }
594                     );
595                 }
596             }
597             else {
598                 my $tag = $field->tag();
599                 # Handle alternate scripts in MARC 21
600                 my $altscript = 0;
601                 if ($marcflavour eq 'marc21' && $tag eq '880') {
602                     my $sub6 = $field->subfield('6');
603                     if ($sub6 =~ /^(...)-\d+/) {
604                         $tag = $1;
605                         $altscript = 1;
606                     }
607                 }
608
609                 my $data_field_rules = $data_fields_rules->{$tag};
610                 if ($data_field_rules) {
611                     my $subfields_mappings = $data_field_rules->{subfields};
612                     my $wildcard_mappings = $subfields_mappings->{'*'};
613                     foreach my $subfield ($field->subfields()) {
614                         my ($code, $data) = @{$subfield};
615                         my $mappings = $subfields_mappings->{$code} // [];
616                         if ($wildcard_mappings) {
617                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
618                         }
619                         if (@{$mappings}) {
620                             $self->_process_mappings($mappings, $data, $record_document, {
621                                     altscript => $altscript,
622                                     data_source => 'subfield',
623                                     code => $code,
624                                     field => $field
625                                 }
626                             );
627                         }
628                     }
629
630                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
631                     if ($subfields_join_mappings) {
632                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
633                             my $data_field = $field->clone; #copy field to preserve for alt scripts
634                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
635                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
636                             if ($data) {
637                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
638                                         altscript => $altscript,
639                                         data_source => 'subfields_group',
640                                         codes => $subfields_group,
641                                         field => $field
642                                     }
643                                 );
644                             }
645                         }
646                     }
647                 }
648             }
649         }
650
651         if (C4::Context->preference('IncludeSeeFromInSearches') and $self->index eq 'biblios') {
652             foreach my $field (Koha::Filter::MARC::EmbedSeeFromHeadings->new->fields($record)) {
653                 my $data_field_rules = $data_fields_rules->{$field->tag()};
654                 if ($data_field_rules) {
655                     my $subfields_mappings = $data_field_rules->{subfields};
656                     my $wildcard_mappings = $subfields_mappings->{'*'};
657                     foreach my $subfield ($field->subfields()) {
658                         my ($code, $data) = @{$subfield};
659                         my @mappings;
660                         push @mappings, @{ $subfields_mappings->{$code} } if $subfields_mappings->{$code};
661                         push @mappings, @$wildcard_mappings if $wildcard_mappings;
662                         # Do not include "see from" into these kind of fields
663                         @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
664                         if (@mappings) {
665                             $self->_process_mappings(\@mappings, $data, $record_document, {
666                                     data_source => 'subfield',
667                                     code => $code,
668                                     field => $field
669                                 }
670                             );
671                         }
672                     }
673
674                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
675                     if ($subfields_join_mappings) {
676                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
677                             my $data_field = $field->clone;
678                             # remove empty subfields, otherwise they are printed as a space
679                             $data_field->delete_subfield(match => qr/^$/);
680                             my $data = $data_field->as_string( $subfields_group );
681                             if ($data) {
682                                 my @mappings = @{ $subfields_join_mappings->{$subfields_group} };
683                                 # Do not include "see from" into these kind of fields
684                                 @mappings = grep { $_->[0] !~ /__(sort|facet|suggestion)$/ } @mappings;
685                                 $self->_process_mappings(\@mappings, $data, $record_document, {
686                                         data_source => 'subfields_group',
687                                         codes => $subfields_group,
688                                         field => $field
689                                     }
690                                 );
691                             }
692                         }
693                     }
694                 }
695             }
696         }
697
698         foreach my $field (keys %{$rules->{defaults}}) {
699             unless (defined $record_document->{$field}) {
700                 $record_document->{$field} = $rules->{defaults}->{$field};
701             }
702         }
703         foreach my $field (@{$rules->{sum}}) {
704             if (defined $record_document->{$field}) {
705                 # TODO: validate numeric? filter?
706                 # TODO: Or should only accept fields without nested values?
707                 # TODO: Quick and dirty, improve if needed
708                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
709             }
710         }
711         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
712         foreach my $field (@{$rules->{isbn}}) {
713             if (defined $record_document->{$field}) {
714                 my @isbns = ();
715                 foreach my $input_isbn (@{$record_document->{$field}}) {
716                     my $isbn = Business::ISBN->new($input_isbn);
717                     if (defined $isbn && $isbn->is_valid) {
718                         my $isbn13 = $isbn->as_isbn13->as_string;
719                         push @isbns, $isbn13;
720                         $isbn13 =~ s/\-//g;
721                         push @isbns, $isbn13;
722
723                         my $isbn10 = $isbn->as_isbn10;
724                         if ($isbn10) {
725                             $isbn10 = $isbn10->as_string;
726                             push @isbns, $isbn10;
727                             $isbn10 =~ s/\-//g;
728                             push @isbns, $isbn10;
729                         }
730                     } else {
731                         push @isbns, $input_isbn;
732                     }
733                 }
734                 $record_document->{$field} = \@isbns;
735             }
736         }
737
738         # Remove duplicate values and collapse sort fields
739         foreach my $field (keys %{$record_document}) {
740             if (ref($record_document->{$field}) eq 'ARRAY') {
741                 @{$record_document->{$field}} = do {
742                     my %seen;
743                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
744                 };
745                 if ($field =~ /__sort$/) {
746                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
747                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
748                 }
749             }
750         }
751
752         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
753         $record->encoding('UTF-8');
754         if ($use_array) {
755             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
756             $record_document->{'marc_format'} = 'ARRAY';
757         } else {
758             my @warnings;
759             {
760                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
761                 local $SIG{__WARN__} = sub {
762                     push @warnings, $_[0];
763                 };
764                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
765             }
766             if (@warnings) {
767                 # Suppress warnings if record length exceeded
768                 unless (substr($record->leader(), 0, 5) eq '99999') {
769                     foreach my $warning (@warnings) {
770                         carp $warning;
771                     }
772                 }
773                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
774                 $record_document->{'marc_format'} = 'MARCXML';
775             }
776             else {
777                 $record_document->{'marc_format'} = 'base64ISO2709';
778             }
779         }
780
781         # Check if there is at least one available item
782         if ($self->index eq $BIBLIOS_INDEX) {
783             my ($tag, $code) = C4::Biblio::GetMarcFromKohaField('biblio.biblionumber');
784             my $field = $record->field($tag);
785             if ($field) {
786                 my $biblionumber = $field->is_control_field ? $field->data : $field->subfield($code);
787                 my $avail_items = Koha::Items->search({
788                     biblionumber => $biblionumber,
789                     onloan       => undef,
790                     itemlost     => 0,
791                 })->count;
792
793                 $record_document->{available} = $avail_items ? \1 : \0;
794             }
795         }
796
797         push @record_documents, $record_document;
798     }
799     return \@record_documents;
800 }
801
802 =head2 _marc_to_array($record)
803
804     my @fields = _marc_to_array($record)
805
806 Convert a MARC::Record to an array modeled after MARC-in-JSON
807 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
808
809 =over 4
810
811 =item C<$record>
812
813 A MARC::Record object
814
815 =back
816
817 =cut
818
819 sub _marc_to_array {
820     my ($self, $record) = @_;
821
822     my $data = {
823         leader => $record->leader(),
824         fields => []
825     };
826     for my $field ($record->fields()) {
827         my $tag = $field->tag();
828         if ($field->is_control_field()) {
829             push @{$data->{fields}}, {$tag => $field->data()};
830         } else {
831             my $subfields = ();
832             foreach my $subfield ($field->subfields()) {
833                 my ($code, $contents) = @{$subfield};
834                 push @{$subfields}, {$code => $contents};
835             }
836             push @{$data->{fields}}, {
837                 $tag => {
838                     ind1 => $field->indicator(1),
839                     ind2 => $field->indicator(2),
840                     subfields => $subfields
841                 }
842             };
843         }
844     }
845     return $data;
846 }
847
848 =head2 _array_to_marc($data)
849
850     my $record = _array_to_marc($data)
851
852 Convert an array modeled after MARC-in-JSON to a MARC::Record
853
854 =over 4
855
856 =item C<$data>
857
858 An array modeled after MARC-in-JSON
859 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
860
861 =back
862
863 =cut
864
865 sub _array_to_marc {
866     my ($self, $data) = @_;
867
868     my $record = MARC::Record->new();
869
870     $record->leader($data->{leader});
871     for my $field (@{$data->{fields}}) {
872         my $tag = (keys %{$field})[0];
873         $field = $field->{$tag};
874         my $marc_field;
875         if (ref($field) eq 'HASH') {
876             my @subfields;
877             foreach my $subfield (@{$field->{subfields}}) {
878                 my $code = (keys %{$subfield})[0];
879                 push @subfields, $code;
880                 push @subfields, $subfield->{$code};
881             }
882             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
883         } else {
884             $marc_field = MARC::Field->new($tag, $field)
885         }
886         $record->append_fields($marc_field);
887     }
888 ;
889     return $record;
890 }
891
892 =head2 _field_mappings($facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range)
893
894     my @mappings = _field_mappings( $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range )
895
896 Get mappings, an internal data structure later used by
897 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
898 data for a MARC mapping.
899
900 The returned C<$mappings> is not to to be confused with mappings provided by
901 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
902 provided by C<_foreach_mapping> and expands it to this internal data structure.
903 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
904 is then applied to each MARC target (leader, control field data, subfield or
905 joined subfields) and integrated into the mapping rules data structure used in
906 C<marc_records_to_documents> to transform MARC records into Elasticsearch
907 documents.
908
909 =over 4
910
911 =item C<$facet>
912
913 Boolean indicating whether to create a facet field for this mapping.
914
915 =item C<$suggestible>
916
917 Boolean indicating whether to create a suggestion field for this mapping.
918
919 =item C<$sort>
920
921 Boolean indicating whether to create a sort field for this mapping.
922
923 =item C<$search>
924
925 Boolean indicating whether to create a search field for this mapping.
926
927 =item C<$target_name>
928
929 Elasticsearch document target field name.
930
931 =item C<$target_type>
932
933 Elasticsearch document target field type.
934
935 =item C<$range>
936
937 An optional range as a string in the format "<START>-<END>" or "<START>",
938 where "<START>" and "<END>" are integers specifying a range that will be used
939 for extracting a substring from MARC data as Elasticsearch field target value.
940
941 The first character position is "0", and the range is inclusive,
942 so "0-2" means the first three characters of MARC data.
943
944 If only "<START>" is provided only one character at position "<START>" will
945 be extracted.
946
947 =back
948
949 =cut
950
951 sub _field_mappings {
952     my ( $_self, $facet, $suggestible, $sort, $search, $filter, $target_name, $target_type, $range ) = @_;
953     my %mapping_defaults = ();
954     my @mappings;
955
956     my $substr_args = undef;
957     if (defined $range) {
958         # TODO: use value_callback instead?
959         my ($start, $end) = map(int, split /-/, $range, 2);
960         $substr_args = [$start];
961         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
962     }
963     my $default_options = {};
964     if ($substr_args) {
965         $default_options->{substr} = $substr_args;
966     }
967
968     # TODO: Should probably have per type value callback/hook
969     # but hard code for now
970     if ($target_type eq 'boolean') {
971         $default_options->{value_callbacks} //= [];
972         push @{$default_options->{value_callbacks}}, sub {
973             my ($value) = @_;
974             # Trim whitespace at both ends
975             $value =~ s/^\s+|\s+$//g;
976             return $value ? 'true' : 'false';
977         };
978     }
979     elsif ($target_type eq 'year') {
980         $default_options->{value_callbacks} //= [];
981         # Only accept years containing digits and "u"
982         push @{$default_options->{value_callbacks}}, sub {
983             my ($value) = @_;
984             # Replace "u" with "0" for sorting
985             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
986         };
987     }
988
989     if ( defined $filter && $filter eq 'punctuation' ) {
990         $default_options->{value_callbacks} //= [];
991         push @{ $default_options->{value_callbacks} }, sub {
992             my ($value) = @_;
993
994             # Trim punctuation marks from field
995             $value =~
996                 s/[\x00-\x1F,\x21-\x2F,\x3A-\x40,\x5B-\x60,\x7B-\x89,\x8B,\x8D,\x8F,\x90-\x99,\x9B,\x9D,\xA0-\xBF,\xD7,\xF7]//g;
997             return $value;
998         };
999     }
1000
1001     if ($search) {
1002         my $mapping = [$target_name, $default_options];
1003         push @mappings, $mapping;
1004     }
1005
1006     my @suffixes = ();
1007     push @suffixes, 'facet' if $facet;
1008     push @suffixes, 'suggestion' if $suggestible;
1009     push @suffixes, 'sort' if !defined $sort || $sort;
1010
1011     foreach my $suffix (@suffixes) {
1012         my $mapping = ["${target_name}__$suffix"];
1013         # TODO: Hack, fix later in less hideous manner
1014         if ($suffix eq 'suggestion') {
1015             push @{$mapping}, {%{$default_options}, property => 'input'};
1016         }
1017         else {
1018             # Important! Make shallow clone, or we end up with the same hashref
1019             # shared by all mappings
1020             push @{$mapping}, {%{$default_options}};
1021         }
1022         push @mappings, $mapping;
1023     }
1024     return @mappings;
1025 };
1026
1027 =head2 _get_marc_mapping_rules
1028
1029     my $mapping_rules = $self->_get_marc_mapping_rules()
1030
1031 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
1032
1033 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
1034 each call to C<MARC::Record>->field) we create an optimized structure of mapping
1035 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
1036
1037 We can then iterate through all MARC fields for each record and apply all relevant
1038 rules once per fields instead of retreiving fields multiple times for each mapping rule
1039 which is terribly slow.
1040
1041 =cut
1042
1043 # TODO: This structure can be used for processing multiple MARC::Records so is currently
1044 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
1045 # memory cache which it is currently not. The performance gain of caching
1046 # would probably be marginal, but to do this could be a further improvement.
1047
1048 sub _get_marc_mapping_rules {
1049     my ($self) = @_;
1050     my $marcflavour = lc C4::Context->preference('marcflavour');
1051     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
1052     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
1053     my $rules = {
1054         'leader' => [],
1055         'control_fields' => {},
1056         'data_fields' => {},
1057         'sum' => [],
1058         'isbn' => [],
1059         'defaults' => {}
1060     };
1061
1062     $self->_foreach_mapping(sub {
1063         my ($name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type, $marc_field) = @_;
1064         return if $marc_type ne $marcflavour;
1065
1066         if ($type eq 'sum') {
1067             push @{$rules->{sum}}, $name;
1068             push @{$rules->{sum}}, $name."__sort" if $sort;
1069         }
1070         elsif ($type eq 'isbn') {
1071             push @{$rules->{isbn}}, $name;
1072         }
1073         elsif ($type eq 'boolean') {
1074             # boolean gets special handling, if value doesn't exist for a field,
1075             # it is set to false
1076             $rules->{defaults}->{$name} = 'false';
1077         }
1078
1079         if ($marc_field =~ $field_spec_regexp) {
1080             my $field_tag = $1;
1081
1082             my @subfields;
1083             my @subfield_groups;
1084             # Parse and separate subfields form subfield groups
1085             if (defined $2) {
1086                 my $subfield_group = '';
1087                 my $open_group = 0;
1088
1089                 foreach my $token (split //, $2) {
1090                     if ($token eq "(") {
1091                         if ($open_group) {
1092                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1093                                 "Unmatched opening parenthesis for $marc_field"
1094                             );
1095                         }
1096                         else {
1097                             $open_group = 1;
1098                         }
1099                     }
1100                     elsif ($token eq ")") {
1101                         if ($open_group) {
1102                             if ($subfield_group) {
1103                                 push @subfield_groups, $subfield_group;
1104                                 $subfield_group = '';
1105                             }
1106                             $open_group = 0;
1107                         }
1108                         else {
1109                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1110                                 "Unmatched closing parenthesis for $marc_field"
1111                             );
1112                         }
1113                     }
1114                     elsif ($open_group) {
1115                         $subfield_group .= $token;
1116                     }
1117                     else {
1118                         push @subfields, $token;
1119                     }
1120                 }
1121             }
1122             else {
1123                 push @subfields, '*';
1124             }
1125
1126             my $range = defined $3 ? $3 : undef;
1127             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1128             if ($field_tag < 10) {
1129                 $rules->{control_fields}->{$field_tag} //= [];
1130                 push @{$rules->{control_fields}->{$field_tag}}, @{clone(\@mappings)};
1131             }
1132             else {
1133                 $rules->{data_fields}->{$field_tag} //= {};
1134                 foreach my $subfield (@subfields) {
1135                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1136                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @{clone(\@mappings)};
1137                 }
1138                 foreach my $subfield_group (@subfield_groups) {
1139                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1140                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @{clone(\@mappings)};
1141                 }
1142             }
1143         }
1144         elsif ($marc_field =~ $leader_regexp) {
1145             my $range = defined $1 ? $1 : undef;
1146             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $filter, $name, $type, $range);
1147             push @{$rules->{leader}}, @{clone(\@mappings)};
1148         }
1149         else {
1150             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1151                 "Invalid MARC field expression: $marc_field"
1152             );
1153         }
1154     });
1155
1156     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1157     if ($marcflavour eq 'marc21') {
1158         # Nonfiling characters processing for sort fields
1159         my %title_fields;
1160         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1161             # Format is: nonfiling characters indicator => field names list
1162             %title_fields = (
1163                 1 => [130, 630, 730, 740],
1164                 2 => [222, 240, 242, 243, 245, 440, 830]
1165             );
1166         }
1167         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1168             %title_fields = (
1169                 1 => [730],
1170                 2 => [130, 430, 530]
1171             );
1172         }
1173         foreach my $indicator (keys %title_fields) {
1174             foreach my $field_tag (@{$title_fields{$indicator}}) {
1175                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1176                 foreach my $mapping (@{$mappings}) {
1177                     if ($mapping->[0] =~ /__sort$/) {
1178                         # Mark this as to be processed for nonfiling characters indicator
1179                         # later on in _process_mappings
1180                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1181                     }
1182                 }
1183             }
1184         }
1185     }
1186
1187     if( $self->index eq 'authorities' ){
1188         push @{$rules->{control_fields}->{'008'}}, ['subject-heading-thesaurus', { 'substr' => [ 11, 1 ] } ];
1189         push @{$rules->{data_fields}->{'040'}->{subfields}->{f}}, ['subject-heading-thesaurus', { } ];
1190     }
1191
1192     return $rules;
1193 }
1194
1195 =head2 _foreach_mapping
1196
1197     $self->_foreach_mapping(
1198         sub {
1199             my ( $name, $type, $facet, $suggestible, $sort, $search, $filter, $marc_type,
1200                 $marc_field )
1201               = @_;
1202             return unless $marc_type eq 'marc21';
1203             print "Data comes from: " . $marc_field . "\n";
1204         }
1205     );
1206
1207 This allows you to apply a function to each entry in the elasticsearch mappings
1208 table, in order to build the mappings for whatever is needed.
1209
1210 In the provided function, the files are:
1211
1212 =over 4
1213
1214 =item C<$name>
1215
1216 The field name for elasticsearch (corresponds to the 'mapping' column in the
1217 database.
1218
1219 =item C<$type>
1220
1221 The type for this value, e.g. 'string'.
1222
1223 =item C<$facet>
1224
1225 True if this value should be facetised. This only really makes sense if the
1226 field is understood by the facet processing code anyway.
1227
1228 =item C<$sort>
1229
1230 True if this is a field that a) needs special sort handling, and b) if it
1231 should be sorted on. False if a) but not b). Undef if not a). This allows,
1232 for example, author to be sorted on but not everything marked with "author"
1233 to be included in that sort.
1234
1235 =item C<$search>
1236
1237 True if this value should be searchable.
1238
1239 =item C<$filter>
1240
1241 Contains a string that represents a filter defined in the indexing code. Currently supports
1242 the option 'punctuation'
1243
1244 =item C<$marc_type>
1245
1246 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1247 'unimarc'.
1248
1249 =item C<$marc_field>
1250
1251 A string that describes the MARC field that contains the data to extract.
1252
1253 =back
1254
1255 =cut
1256
1257 sub _foreach_mapping {
1258     my ( $self, $sub ) = @_;
1259
1260     # TODO use a caching framework here
1261     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1262         {
1263             'search_marc_map.index_name' => $self->index,
1264         },
1265         {   join => { search_marc_to_fields => 'search_marc_map' },
1266             '+select' => [
1267                 'search_marc_to_fields.facet',
1268                 'search_marc_to_fields.suggestible',
1269                 'search_marc_to_fields.sort',
1270                 'search_marc_to_fields.search',
1271                 'search_marc_to_fields.filter',
1272                 'search_marc_map.marc_type',
1273                 'search_marc_map.marc_field',
1274             ],
1275             '+as'     => [
1276                 'facet',
1277                 'suggestible',
1278                 'sort',
1279                 'search',
1280                 'filter',
1281                 'marc_type',
1282                 'marc_field',
1283             ],
1284         }
1285     );
1286
1287     while ( my $search_field = $search_fields->next ) {
1288         $sub->(
1289             # Force lower case on indexed field names for case insensitive
1290             # field name searches
1291             lc($search_field->name),
1292             $search_field->type,
1293             $search_field->get_column('facet'),
1294             $search_field->get_column('suggestible'),
1295             $search_field->get_column('sort'),
1296             $search_field->get_column('search'),
1297             $search_field->get_column('filter'),
1298             $search_field->get_column('marc_type'),
1299             $search_field->get_column('marc_field'),
1300         );
1301     }
1302 }
1303
1304 =head2 process_error
1305
1306     die process_error($@);
1307
1308 This parses an Elasticsearch error message and produces a human-readable
1309 result from it. This result is probably missing all the useful information
1310 that you might want in diagnosing an issue, so the warning is also logged.
1311
1312 Note that currently the resulting message is not internationalised. This
1313 will happen eventually by some method or other.
1314
1315 =cut
1316
1317 sub process_error {
1318     my ($self, $msg) = @_;
1319
1320     warn $msg; # simple logging
1321
1322     # This is super-primitive
1323     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException|parse_exception/;
1324
1325     return "Unable to perform your search. Please try again.\n";
1326 }
1327
1328 =head2 _read_configuration
1329
1330     my $conf = _read_configuration();
1331
1332 Reads the I<configuration file> and returns a hash structure with the
1333 configuration information. It raises an exception if mandatory entries
1334 are missing.
1335
1336 The hashref structure has the following form:
1337
1338     {
1339         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1340         'index_name' => 'koha_instance',
1341     }
1342
1343 This is configured by the following in the C<config> block in koha-conf.xml:
1344
1345     <elasticsearch>
1346         <server>127.0.0.1:9200</server>
1347         <server>anotherserver:9200</server>
1348         <index_name>koha_instance</index_name>
1349     </elasticsearch>
1350
1351 =cut
1352
1353 sub _read_configuration {
1354
1355     my $configuration;
1356
1357     my $conf = C4::Context->config('elasticsearch');
1358     unless ( defined $conf ) {
1359         Koha::Exceptions::Config::MissingEntry->throw(
1360             "Missing <elasticsearch> entry in koha-conf.xml"
1361         );
1362     }
1363
1364     unless ( exists $conf->{server} ) {
1365         Koha::Exceptions::Config::MissingEntry->throw(
1366             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1367         );
1368     }
1369
1370     unless ( exists $conf->{index_name} ) {
1371         Koha::Exceptions::Config::MissingEntry->throw(
1372             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1373         );
1374     }
1375
1376     while ( my ( $var, $val ) = each %$conf ) {
1377         if ( $var eq 'server' ) {
1378             if ( ref($val) eq 'ARRAY' ) {
1379                 $configuration->{nodes} = $val;
1380             }
1381             else {
1382                 $configuration->{nodes} = [$val];
1383             }
1384         } else {
1385             $configuration->{$var} = $val;
1386         }
1387     }
1388
1389     $configuration->{cxn_pool} //= 'Static';
1390
1391     return $configuration;
1392 }
1393
1394 =head2 get_facetable_fields
1395
1396 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1397
1398 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1399
1400 =cut
1401
1402 sub get_facetable_fields {
1403     my ($self) = @_;
1404
1405     # These should correspond to the ES field names, as opposed to the CCL
1406     # things that zebra uses.
1407     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1408     my @faceted_fields = Koha::SearchFields->search(
1409         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1410     )->as_list;
1411     my @not_faceted_fields = Koha::SearchFields->search(
1412         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1413     )->as_list;
1414     # This could certainly be improved
1415     return ( @faceted_fields, @not_faceted_fields );
1416 }
1417
1418 =head2 clear_search_fields_cache
1419
1420 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1421
1422 Clear cached values for ES search fields
1423
1424 =cut
1425
1426 sub clear_search_fields_cache {
1427
1428     my $cache = Koha::Caches->get_instance();
1429     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1430     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1431     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1432     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1433
1434 }
1435
1436 1;
1437
1438 __END__
1439
1440 =head1 AUTHOR
1441
1442 =over 4
1443
1444 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1445
1446 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1447
1448 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1449
1450 =back
1451
1452 =cut