Bug 19893: Restore and fix removed tests
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use Search::Elasticsearch;
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     croak('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 sub get_elasticsearch {
77     my $self = shift @_;
78     unless (defined $self->{elasticsearch}) {
79         my $conf = $self->get_elasticsearch_params();
80         $self->{elasticsearch} = Search::Elasticsearch->new(
81             client => "5_0::Direct",
82             nodes => $conf->{nodes},
83             cxn_pool => 'Sniff'
84         );
85     }
86     return $self->{elasticsearch};
87 }
88
89 =head2 get_elasticsearch_params
90
91     my $params = $self->get_elasticsearch_params();
92
93 This provides a hashref that contains the parameters for connecting to the
94 ElasicSearch servers, in the form:
95
96     {
97         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
98         'index_name' => 'koha_instance_index',
99     }
100
101 This is configured by the following in the C<config> block in koha-conf.xml:
102
103     <elasticsearch>
104         <server>127.0.0.1:9200</server>
105         <server>anotherserver:9200</server>
106         <index_name>koha_instance</index_name>
107     </elasticsearch>
108
109 =cut
110
111 sub get_elasticsearch_params {
112     my ($self) = @_;
113
114     # Copy the hash so that we're not modifying the original
115     my $conf = C4::Context->config('elasticsearch');
116     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
117     my $es = { %{ $conf } };
118
119     # Helpfully, the multiple server lines end up in an array for us anyway
120     # if there are multiple ones, but not if there's only one.
121     my $server = $es->{server};
122     delete $es->{server};
123     if ( ref($server) eq 'ARRAY' ) {
124
125         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
126         $es->{nodes} = $server;
127     }
128     elsif ($server) {
129         $es->{nodes} = [$server];
130     }
131     else {
132         die "No elasticsearch servers were specified in koha-conf.xml.\n";
133     }
134     die "No elasticserver index_name was specified in koha-conf.xml.\n"
135       if ( !$es->{index_name} );
136     # Append the name of this particular index to our namespace
137     $es->{index_name} .= '_' . $self->index;
138
139     $es->{key_prefix} = 'es_';
140     return $es;
141 }
142
143 =head2 get_elasticsearch_settings
144
145     my $settings = $self->get_elasticsearch_settings();
146
147 This provides the settings provided to elasticsearch when an index is created.
148 These can do things like define tokenisation methods.
149
150 A hashref containing the settings is returned.
151
152 =cut
153
154 sub get_elasticsearch_settings {
155     my ($self) = @_;
156
157     # Use state to speed up repeated calls
158     state $settings = undef;
159     if (!defined $settings) {
160         my $config_file = C4::Context->config('elasticsearch_index_config');
161         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
162         $settings = LoadFile( $config_file );
163     }
164
165     return $settings;
166 }
167
168 =head2 get_elasticsearch_mappings
169
170     my $mappings = $self->get_elasticsearch_mappings();
171
172 This provides the mappings that get passed to elasticsearch when an index is
173 created.
174
175 =cut
176
177 sub get_elasticsearch_mappings {
178     my ($self) = @_;
179
180     # Use state to speed up repeated calls
181     state %all_mappings;
182     state %sort_fields;
183
184     if (!defined $all_mappings{$self->index}) {
185         $sort_fields{$self->index} = {};
186         my $mappings = {
187             data => scalar _get_elasticsearch_mapping('general', '')
188         };
189         my $marcflavour = lc C4::Context->preference('marcflavour');
190         $self->_foreach_mapping(
191             sub {
192                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
193                 return if $marc_type ne $marcflavour;
194                 # TODO if this gets any sort of complexity to it, it should
195                 # be broken out into its own function.
196
197                 # TODO be aware of date formats, but this requires pre-parsing
198                 # as ES will simply reject anything with an invalid date.
199                 my $es_type = 'text';
200                 if ($type eq 'boolean') {
201                     $es_type = 'boolean';
202                 } elsif ($type eq 'number' || $type eq 'sum') {
203                     $es_type = 'integer';
204                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
205                     $es_type = 'stdno';
206                 }
207
208                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
209
210                 if ($facet) {
211                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
212                 }
213                 if ($suggestible) {
214                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
215                 }
216                 # Sort is a bit special as it can be true, false, undef.
217                 # We care about "true" or "undef",
218                 # "undef" means to do the default thing, which is make it sortable.
219                 if (!defined $sort || $sort) {
220                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
221                     $sort_fields{$self->index}{$name} = 1;
222                 }
223             }
224         );
225         $all_mappings{$self->index} = $mappings;
226     }
227     $self->sort_fields(\%{$sort_fields{$self->index}});
228
229     return $all_mappings{$self->index};
230 }
231
232 =head2 _get_elasticsearch_mapping
233
234 Get the ES mappings for the given purpose and data type
235
236 $mapping = _get_elasticsearch_mapping('search', 'text');
237
238 =cut
239
240 sub _get_elasticsearch_mapping {
241
242     my ( $purpose, $type ) = @_;
243
244     # Use state to speed up repeated calls
245     state $settings = undef;
246     if (!defined $settings) {
247         my $config_file = C4::Context->config('elasticsearch_field_config');
248         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
249         $settings = LoadFile( $config_file );
250     }
251
252     if (!defined $settings->{$purpose}) {
253         die "Field purpose $purpose not defined in field config";
254     }
255     if ($type eq '') {
256         return $settings->{$purpose};
257     }
258     if (defined $settings->{$purpose}{$type}) {
259         return $settings->{$purpose}{$type};
260     }
261     if (defined $settings->{$purpose}{'default'}) {
262         return $settings->{$purpose}{'default'};
263     }
264     return;
265 }
266
267 sub reset_elasticsearch_mappings {
268     my ( $reset_fields ) = @_;
269     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
270     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
271     my $indexes = LoadFile( $mappings_yaml );
272
273     while ( my ( $index_name, $fields ) = each %$indexes ) {
274         while ( my ( $field_name, $data ) = each %$fields ) {
275             my $field_type = $data->{type};
276             my $field_label = $data->{label};
277             my $mappings = $data->{mappings};
278             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
279             for my $mapping ( @$mappings ) {
280                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
281                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
282             }
283         }
284     }
285 }
286
287 # This overrides the accessor provided by Class::Accessor so that if
288 # sort_fields isn't set, then it'll generate it.
289 sub sort_fields {
290     my $self = shift;
291     if (@_) {
292         $self->_sort_fields_accessor(@_);
293         return;
294     }
295     my $val = $self->_sort_fields_accessor();
296     return $val if $val;
297
298     # This will populate the accessor as a side effect
299     $self->get_elasticsearch_mappings();
300     return $self->_sort_fields_accessor();
301 }
302
303 sub marc_records_to_documents {
304     my ($self, $records) = @_;
305     my $rules = $self->get_marc_mapping_rules();
306     my $control_fields_rules = $rules->{control_fields};
307     my $data_fields_rules = $rules->{data_fields};
308     my $marcflavour = lc C4::Context->preference('marcflavour');
309     my $serialization_format = C4::Context->preference('ElasticsearchMARCSerializationFormat');
310
311     my @record_documents;
312
313     sub _process_mappings {
314         my ($mappings, $data, $record_document) = @_;
315         foreach my $mapping (@{$mappings}) {
316             my ($target, $options) = @{$mapping};
317             # Copy (scalar) data since can have multiple targets
318             # with differing options for (possibly) mutating data
319             # so need a different copy for each
320             my $_data = $data;
321             $record_document->{$target} //= [];
322             if (defined $options->{substr}) {
323                 my ($start, $length) = @{$options->{substr}};
324                 $_data = length($data) > $start ? substr $data, $start, $length : '';
325             }
326             if (defined $options->{value_callbacks}) {
327                 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
328             }
329             if (defined $options->{property}) {
330                 $_data = {
331                     $options->{property} => $_data
332                 }
333             }
334             push @{$record_document->{$target}}, $_data;
335         }
336     }
337     foreach my $record (@{$records}) {
338         my $record_document = {};
339         my $mappings = $rules->{leader};
340         if ($mappings) {
341             _process_mappings($mappings, $record->leader(), $record_document);
342         }
343         foreach my $field ($record->fields()) {
344             if($field->is_control_field()) {
345                 my $mappings = $control_fields_rules->{$field->tag()};
346                 if ($mappings) {
347                     _process_mappings($mappings, $field->data(), $record_document);
348                 }
349             }
350             else {
351                 my $subfields_mappings = $data_fields_rules->{$field->tag()};
352                 if ($subfields_mappings) {
353                     my $wildcard_mappings = $subfields_mappings->{'*'};
354                     foreach my $subfield ($field->subfields()) {
355                         my ($code, $data) = @{$subfield};
356                         my $mappings = $subfields_mappings->{$code} // [];
357                         if ($wildcard_mappings) {
358                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
359                         }
360                         if (@{$mappings}) {
361                             _process_mappings($mappings, $data, $record_document);
362                         }
363                     }
364                 }
365             }
366         }
367         foreach my $field (keys %{$rules->{defaults}}) {
368             unless (defined $record_document->{$field}) {
369                 $record_document->{$field} = $rules->{defaults}->{$field};
370             }
371         }
372         foreach my $field (@{$rules->{sum}}) {
373             if (defined $record_document->{$field}) {
374                 # TODO: validate numeric? filter?
375                 # TODO: Or should only accept fields without nested values?
376                 # TODO: Quick and dirty, improve if needed
377                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
378             }
379         }
380         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
381         $record->encoding('UTF-8');
382         my @warnings;
383         {
384             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
385             local $SIG{__WARN__} = sub {
386                 push @warnings, $_[0];
387             };
388             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
389         }
390         if (@warnings) {
391             # Suppress warnings if record length exceeded
392             unless (substr($record->leader(), 0, 5) eq '99999') {
393                 foreach my $warning (@warnings) {
394                     carp($warning);
395                 }
396             }
397             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
398             $record_document->{'marc_format'} = 'MARCXML';
399         }
400         else {
401             $record_document->{'marc_format'} = 'base64ISO2709';
402         }
403         my $id = $record->subfield('999', 'c');
404         push @record_documents, [$id, $record_document];
405     }
406     return \@record_documents;
407 }
408
409 # Provides the rules for marc to Elasticsearch JSON document conversion.
410 sub get_marc_mapping_rules {
411     my ($self) = @_;
412
413     my $marcflavour = lc C4::Context->preference('marcflavour');
414     my @rules;
415
416     sub _field_mappings {
417         my ($facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
418         my %mapping_defaults = ();
419         my @mappings;
420
421         my $substr_args = undef;
422         if ($range) {
423             # TODO: use value_callback instead?
424             my ($start, $end) = map(int, split /-/, $range, 2);
425             $substr_args = [$start];
426             push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
427         }
428         my $default_options = {};
429         if ($substr_args) {
430             $default_options->{substr} = $substr_args;
431         }
432
433         # TODO: Should probably have per type value callback/hook
434         # but hard code for now
435         if ($target_type eq 'boolean') {
436             $default_options->{value_callbacks} //= [];
437             push @{$default_options->{value_callbacks}}, sub {
438                 my ($value) = @_;
439                 # Trim whitespace at both ends
440                 $value =~ s/^\s+|\s+$//g;
441                 return $value ? 'true' : 'false';
442             };
443         }
444
445         my $mapping = [$target_name, $default_options];
446         push @mappings, $mapping;
447
448         my @suffixes = ();
449         push @suffixes, 'facet' if $facet;
450         push @suffixes, 'suggestion' if $suggestible;
451         push @suffixes, 'sort' if !defined $sort || $sort;
452
453         foreach my $suffix (@suffixes) {
454             my $mapping = ["${target_name}__$suffix"];
455             # Hack, fix later in less hideous manner
456             if ($suffix eq 'suggestion') {
457                 push @{$mapping}, {%{$default_options}, property => 'input'};
458             }
459             else {
460                 push @{$mapping}, $default_options;
461             }
462             push @mappings, $mapping;
463         }
464         return @mappings;
465     };
466     my $field_spec_regexp = qr/^([0-9]{3})([0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
467     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
468     my $rules = {
469         'leader' => [],
470         'control_fields' => {},
471         'data_fields' => {},
472         'sum' => [],
473         'defaults' => {}
474     };
475
476     $self->_foreach_mapping(sub {
477         my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
478         return if $marc_type ne $marcflavour;
479
480         if ($type eq 'sum') {
481             push @{$rules->{sum}}, $name;
482         }
483         elsif($type eq 'boolean') {
484             # boolean gets special handling, if value doesn't exist for a field,
485             # it is set to false
486             $rules->{defaults}->{$name} = 'false';
487         }
488
489         if ($marc_field =~ $field_spec_regexp) {
490             my $field_tag = $1;
491             my $subfields = defined $2 ? $2 : '*';
492             my $range = defined $3 ? $3 : undef;
493             if ($field_tag < 10) {
494                 $rules->{control_fields}->{$field_tag} //= [];
495                 my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
496                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
497             }
498             else {
499                 $rules->{data_fields}->{$field_tag} //= {};
500                 foreach my $subfield (split //, $subfields) {
501                     $rules->{data_fields}->{$field_tag}->{$subfield} //= [];
502                     my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
503                     push @{$rules->{data_fields}->{$field_tag}->{$subfield}}, @mappings;
504                 }
505             }
506         }
507         elsif ($marc_field =~ $leader_regexp) {
508             my $range = defined $1 ? $1 : undef;
509             my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
510             push @{$rules->{leader}}, @mappings;
511         }
512         else {
513             die("Invalid marc field: $marc_field");
514         }
515     });
516     return $rules;
517 }
518
519 =head2 _foreach_mapping
520
521     $self->_foreach_mapping(
522         sub {
523             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
524                 $marc_field )
525               = @_;
526             return unless $marc_type eq 'marc21';
527             print "Data comes from: " . $marc_field . "\n";
528         }
529     );
530
531 This allows you to apply a function to each entry in the elasticsearch mappings
532 table, in order to build the mappings for whatever is needed.
533
534 In the provided function, the files are:
535
536 =over 4
537
538 =item C<$name>
539
540 The field name for elasticsearch (corresponds to the 'mapping' column in the
541 database.
542
543 =item C<$type>
544
545 The type for this value, e.g. 'string'.
546
547 =item C<$facet>
548
549 True if this value should be facetised. This only really makes sense if the
550 field is understood by the facet processing code anyway.
551
552 =item C<$sort>
553
554 True if this is a field that a) needs special sort handling, and b) if it
555 should be sorted on. False if a) but not b). Undef if not a). This allows,
556 for example, author to be sorted on but not everything marked with "author"
557 to be included in that sort.
558
559 =item C<$marc_type>
560
561 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
562 'unimarc', 'normarc'.
563
564 =item C<$marc_field>
565
566 A string that describes the MARC field that contains the data to extract.
567 These are of a form suited to Catmandu's MARC fixers.
568
569 =back
570
571 =cut
572
573 sub _foreach_mapping {
574     my ( $self, $sub ) = @_;
575
576     # TODO use a caching framework here
577     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
578         {
579             'search_marc_map.index_name' => $self->index,
580         },
581         {   join => { search_marc_to_fields => 'search_marc_map' },
582             '+select' => [
583                 'search_marc_to_fields.facet',
584                 'search_marc_to_fields.suggestible',
585                 'search_marc_to_fields.sort',
586                 'search_marc_map.marc_type',
587                 'search_marc_map.marc_field',
588             ],
589             '+as'     => [
590                 'facet',
591                 'suggestible',
592                 'sort',
593                 'marc_type',
594                 'marc_field',
595             ],
596         }
597     );
598
599     while ( my $search_field = $search_fields->next ) {
600         $sub->(
601             $search_field->name,
602             $search_field->type,
603             $search_field->get_column('facet'),
604             $search_field->get_column('suggestible'),
605             $search_field->get_column('sort'),
606             $search_field->get_column('marc_type'),
607             $search_field->get_column('marc_field'),
608         );
609     }
610 }
611
612 =head2 process_error
613
614     die process_error($@);
615
616 This parses an Elasticsearch error message and produces a human-readable
617 result from it. This result is probably missing all the useful information
618 that you might want in diagnosing an issue, so the warning is also logged.
619
620 Note that currently the resulting message is not internationalised. This
621 will happen eventually by some method or other.
622
623 =cut
624
625 sub process_error {
626     my ($self, $msg) = @_;
627
628     warn $msg; # simple logging
629
630     # This is super-primitive
631     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
632
633     return "Unable to perform your search. Please try again.\n";
634 }
635
636 =head2 _read_configuration
637
638     my $conf = _read_configuration();
639
640 Reads the I<configuration file> and returns a hash structure with the
641 configuration information. It raises an exception if mandatory entries
642 are missing.
643
644 The hashref structure has the following form:
645
646     {
647         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
648         'index_name' => 'koha_instance',
649     }
650
651 This is configured by the following in the C<config> block in koha-conf.xml:
652
653     <elasticsearch>
654         <server>127.0.0.1:9200</server>
655         <server>anotherserver:9200</server>
656         <index_name>koha_instance</index_name>
657     </elasticsearch>
658
659 =cut
660
661 sub _read_configuration {
662
663     my $configuration;
664
665     my $conf = C4::Context->config('elasticsearch');
666     Koha::Exceptions::Config::MissingEntry->throw(
667         "Missing 'elasticsearch' block in config file")
668       unless defined $conf;
669
670     if ( $conf && $conf->{server} ) {
671         my $nodes = $conf->{server};
672         if ( ref($nodes) eq 'ARRAY' ) {
673             $configuration->{nodes} = $nodes;
674         }
675         else {
676             $configuration->{nodes} = [$nodes];
677         }
678     }
679     else {
680         Koha::Exceptions::Config::MissingEntry->throw(
681             "Missing 'server' entry in config file for elasticsearch");
682     }
683
684     if ( defined $conf->{index_name} ) {
685         $configuration->{index_name} = $conf->{index_name};
686     }
687     else {
688         Koha::Exceptions::Config::MissingEntry->throw(
689             "Missing 'index_name' entry in config file for elasticsearch");
690     }
691
692     return $configuration;
693 }
694
695 1;
696
697 __END__
698
699 =head1 AUTHOR
700
701 =over 4
702
703 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
704
705 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
706
707 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
708
709 =back
710
711 =cut