Bug 19893: Alternative optimized indexing for Elasticsearch
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use Search::Elasticsearch;
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     croak('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 sub get_elasticsearch {
77     my $self = shift @_;
78     unless (defined $self->{elasticsearch}) {
79         my $conf = $self->get_elasticsearch_params();
80         $self->{elasticsearch} = Search::Elasticsearch->new(
81             client => "5_0::Direct",
82             nodes => $conf->{nodes},
83             cxn_pool => 'Sniff'
84         );
85     }
86     return $self->{elasticsearch};
87 }
88
89 =head2 get_elasticsearch_params
90
91     my $params = $self->get_elasticsearch_params();
92
93 This provides a hashref that contains the parameters for connecting to the
94 ElasicSearch servers, in the form:
95
96     {
97         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
98         'index_name' => 'koha_instance_index',
99     }
100
101 This is configured by the following in the C<config> block in koha-conf.xml:
102
103     <elasticsearch>
104         <server>127.0.0.1:9200</server>
105         <server>anotherserver:9200</server>
106         <index_name>koha_instance</index_name>
107     </elasticsearch>
108
109 =cut
110
111 sub get_elasticsearch_params {
112     my ($self) = @_;
113
114     # Copy the hash so that we're not modifying the original
115     my $conf = C4::Context->config('elasticsearch');
116     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
117     my $es = { %{ $conf } };
118
119     # Helpfully, the multiple server lines end up in an array for us anyway
120     # if there are multiple ones, but not if there's only one.
121     my $server = $es->{server};
122     delete $es->{server};
123     if ( ref($server) eq 'ARRAY' ) {
124
125         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
126         $es->{nodes} = $server;
127     }
128     elsif ($server) {
129         $es->{nodes} = [$server];
130     }
131     else {
132         die "No elasticsearch servers were specified in koha-conf.xml.\n";
133     }
134     die "No elasticserver index_name was specified in koha-conf.xml.\n"
135       if ( !$es->{index_name} );
136     # Append the name of this particular index to our namespace
137     $es->{index_name} .= '_' . $self->index;
138
139     $es->{key_prefix} = 'es_';
140     return $es;
141 }
142
143 =head2 get_elasticsearch_settings
144
145     my $settings = $self->get_elasticsearch_settings();
146
147 This provides the settings provided to elasticsearch when an index is created.
148 These can do things like define tokenisation methods.
149
150 A hashref containing the settings is returned.
151
152 =cut
153
154 sub get_elasticsearch_settings {
155     my ($self) = @_;
156
157     # Use state to speed up repeated calls
158     state $settings = undef;
159     if (!defined $settings) {
160         my $config_file = C4::Context->config('elasticsearch_index_config');
161         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
162         $settings = LoadFile( $config_file );
163     }
164
165     return $settings;
166 }
167
168 =head2 get_elasticsearch_mappings
169
170     my $mappings = $self->get_elasticsearch_mappings();
171
172 This provides the mappings that get passed to elasticsearch when an index is
173 created.
174
175 =cut
176
177 sub get_elasticsearch_mappings {
178     my ($self) = @_;
179
180     # Use state to speed up repeated calls
181     state %all_mappings;
182     state %sort_fields;
183
184     if (!defined $all_mappings{$self->index}) {
185         $sort_fields{$self->index} = {};
186         my $mappings = {
187             data => scalar _get_elasticsearch_mapping('general', '')
188         };
189         my $marcflavour = lc C4::Context->preference('marcflavour');
190         $self->_foreach_mapping(
191             sub {
192                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
193                 return if $marc_type ne $marcflavour;
194                 # TODO if this gets any sort of complexity to it, it should
195                 # be broken out into its own function.
196
197                 # TODO be aware of date formats, but this requires pre-parsing
198                 # as ES will simply reject anything with an invalid date.
199                 my $es_type = 'text';
200                 if ($type eq 'boolean') {
201                     $es_type = 'boolean';
202                 } elsif ($type eq 'number' || $type eq 'sum') {
203                     $es_type = 'integer';
204                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
205                     $es_type = 'stdno';
206                 }
207
208                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
209
210                 if ($facet) {
211                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
212                 }
213                 if ($suggestible) {
214                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
215                 }
216                 # Sort is a bit special as it can be true, false, undef.
217                 # We care about "true" or "undef",
218                 # "undef" means to do the default thing, which is make it sortable.
219                 if (!defined $sort || $sort) {
220                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
221                     $sort_fields{$self->index}{$name} = 1;
222                 }
223             }
224         );
225         $all_mappings{$self->index} = $mappings;
226     }
227     $self->sort_fields(\%{$sort_fields{$self->index}});
228
229     return $all_mappings{$self->index};
230 }
231
232 =head2 _get_elasticsearch_mapping
233
234 Get the ES mappings for the given purpose and data type
235
236 $mapping = _get_elasticsearch_mapping('search', 'text');
237
238 =cut
239
240 sub _get_elasticsearch_mapping {
241
242     my ( $purpose, $type ) = @_;
243
244     # Use state to speed up repeated calls
245     state $settings = undef;
246     if (!defined $settings) {
247         my $config_file = C4::Context->config('elasticsearch_field_config');
248         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
249         $settings = LoadFile( $config_file );
250     }
251
252     if (!defined $settings->{$purpose}) {
253         die "Field purpose $purpose not defined in field config";
254     }
255     if ($type eq '') {
256         return $settings->{$purpose};
257     }
258     if (defined $settings->{$purpose}{$type}) {
259         return $settings->{$purpose}{$type};
260     }
261     if (defined $settings->{$purpose}{'default'}) {
262         return $settings->{$purpose}{'default'};
263     }
264     return;
265 }
266
267 sub reset_elasticsearch_mappings {
268     my ( $reset_fields ) = @_;
269     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
270     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
271     my $indexes = LoadFile( $mappings_yaml );
272
273     while ( my ( $index_name, $fields ) = each %$indexes ) {
274         while ( my ( $field_name, $data ) = each %$fields ) {
275             my $field_type = $data->{type};
276             my $field_label = $data->{label};
277             my $mappings = $data->{mappings};
278             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
279             for my $mapping ( @$mappings ) {
280                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
281                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
282             }
283         }
284     }
285 }
286
287 # This overrides the accessor provided by Class::Accessor so that if
288 # sort_fields isn't set, then it'll generate it.
289 sub sort_fields {
290     my $self = shift;
291     if (@_) {
292         $self->_sort_fields_accessor(@_);
293         return;
294     }
295     my $val = $self->_sort_fields_accessor();
296     return $val if $val;
297
298     # This will populate the accessor as a side effect
299     $self->get_elasticsearch_mappings();
300     return $self->_sort_fields_accessor();
301 }
302
303 sub marc_records_to_documents {
304     my ($self, $records) = @_;
305     my $rules = $self->get_marc_mapping_rules();
306     my $control_fields_rules = $rules->{control_fields};
307     my $data_fields_rules = $rules->{data_fields};
308     my $marcflavour = lc C4::Context->preference('marcflavour');
309     my $serialization_format = C4::Context->preference('ElasticsearchMARCSerializationFormat');
310
311     my @record_documents;
312
313     sub _process_mappings {
314         my ($mappings, $data, $record_document) = @_;
315         foreach my $mapping (@{$mappings}) {
316             my ($target, $options) = @{$mapping};
317             # Copy (scalar) data since can have multiple targets
318             # with differing options for (possibly) mutating data
319             # so need a different copy for each
320             my $_data = $data;
321             $record_document->{$target} //= [];
322             if (defined $options->{substr}) {
323                 my ($start, $length) = @{$options->{substr}};
324                 $_data = length($data) > $start ? substr $data, $start, $length : '';
325             }
326             if (defined $options->{value_callbacks}) {
327                 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
328             }
329             if (defined $options->{property}) {
330                 $_data = {
331                     $options->{property} => $_data
332                 }
333             }
334             push @{$record_document->{$target}}, $_data;
335         }
336     }
337     foreach my $record (@{$records}) {
338         my $record_document = {};
339         my $mappings = $rules->{leader};
340         if ($mappings) {
341             _process_mappings($mappings, $record->leader(), $record_document);
342         }
343         foreach my $field ($record->fields()) {
344             if($field->is_control_field()) {
345                 my $mappings = $control_fields_rules->{$field->tag()};
346                 if ($mappings) {
347                     _process_mappings($mappings, $field->data(), $record_document);
348                 }
349             }
350             else {
351                 my $subfields_mappings = $data_fields_rules->{$field->tag()};
352                 if ($subfields_mappings) {
353                     my $wildcard_mappings = $subfields_mappings->{'*'};
354                     foreach my $subfield ($field->subfields()) {
355                         my ($code, $data) = @{$subfield};
356                         my $mappings = $subfields_mappings->{$code} // [];
357                         if ($wildcard_mappings) {
358                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
359                         }
360                         if (@{$mappings}) {
361                             _process_mappings($mappings, $data, $record_document);
362                         }
363                     }
364                 }
365             }
366         }
367         foreach my $field (keys %{$rules->{defaults}}) {
368             unless (defined $record_document->{$field}) {
369                 $record_document->{$field} = $rules->{defaults}->{$field};
370             }
371         }
372         foreach my $field (@{$rules->{sum}}) {
373             if (defined $record_document->{$field}) {
374                 # TODO: validate numeric? filter?
375                 # TODO: Or should only accept fields without nested values?
376                 # TODO: Quick and dirty, improve if needed
377                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
378             }
379         }
380         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
381         $record->encoding('UTF-8');
382         if ($serialization_format eq 'base64ISO2709') {
383             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
384         }
385         else {
386             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
387         }
388         my $id = $record->subfield('999', 'c');
389         push @record_documents, [$id, $record_document];
390     }
391     return \@record_documents;
392 }
393
394 # Provides the rules for marc to Elasticsearch JSON document conversion.
395 sub get_marc_mapping_rules {
396     my ($self) = @_;
397
398     my $marcflavour = lc C4::Context->preference('marcflavour');
399     my @rules;
400
401     sub _field_mappings {
402         my ($facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
403         my %mapping_defaults = ();
404         my @mappings;
405
406         my $substr_args = undef;
407         if ($range) {
408             # TODO: use value_callback instead?
409             my ($start, $end) = map(int, split /-/, $range, 2);
410             $substr_args = [$start];
411             push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
412         }
413         my $default_options = {};
414         if ($substr_args) {
415             $default_options->{substr} = $substr_args;
416         }
417
418         # TODO: Should probably have per type value callback/hook
419         # but hard code for now
420         if ($target_type eq 'boolean') {
421             $default_options->{value_callbacks} //= [];
422             push @{$default_options->{value_callbacks}}, sub {
423                 my ($value) = @_;
424                 # Trim whitespace at both ends
425                 $value =~ s/^\s+|\s+$//g;
426                 return $value ? 'true' : 'false';
427             };
428         }
429
430         my $mapping = [$target_name, $default_options];
431         push @mappings, $mapping;
432
433         my @suffixes = ();
434         push @suffixes, 'facet' if $facet;
435         push @suffixes, 'suggestion' if $suggestible;
436         push @suffixes, 'sort' if !defined $sort || $sort;
437
438         foreach my $suffix (@suffixes) {
439             my $mapping = ["${target_name}__$suffix"];
440             # Hack, fix later in less hideous manner
441             if ($suffix eq 'suggestion') {
442                 push @{$mapping}, {%{$default_options}, property => 'input'};
443             }
444             else {
445                 push @{$mapping}, $default_options;
446             }
447             push @mappings, $mapping;
448         }
449         return @mappings;
450     };
451     my $field_spec_regexp = qr/^([0-9]{3})([0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
452     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
453     my $rules = {
454         'leader' => [],
455         'control_fields' => {},
456         'data_fields' => {},
457         'sum' => [],
458         'defaults' => {}
459     };
460
461     $self->_foreach_mapping(sub {
462         my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
463         return if $marc_type ne $marcflavour;
464
465         if ($type eq 'sum') {
466             push @{$rules->{sum}}, $name;
467         }
468         elsif($type eq 'boolean') {
469             # boolean gets special handling, if value doesn't exist for a field,
470             # it is set to false
471             $rules->{defaults}->{$name} = 'false';
472         }
473
474         if ($marc_field =~ $field_spec_regexp) {
475             my $field_tag = $1;
476             my $subfields = defined $2 ? $2 : '*';
477             my $range = defined $3 ? $3 : undef;
478             if ($field_tag < 10) {
479                 $rules->{control_fields}->{$field_tag} //= [];
480                 my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
481                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
482             }
483             else {
484                 $rules->{data_fields}->{$field_tag} //= {};
485                 foreach my $subfield (split //, $subfields) {
486                     $rules->{data_fields}->{$field_tag}->{$subfield} //= [];
487                     my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
488                     push @{$rules->{data_fields}->{$field_tag}->{$subfield}}, @mappings;
489                 }
490             }
491         }
492         elsif ($marc_field =~ $leader_regexp) {
493             my $range = defined $1 ? $1 : undef;
494             my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
495             push @{$rules->{leader}}, @mappings;
496         }
497         else {
498             die("Invalid marc field: $marc_field");
499         }
500     });
501     return $rules;
502 }
503
504 =head2 _foreach_mapping
505
506     $self->_foreach_mapping(
507         sub {
508             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
509                 $marc_field )
510               = @_;
511             return unless $marc_type eq 'marc21';
512             print "Data comes from: " . $marc_field . "\n";
513         }
514     );
515
516 This allows you to apply a function to each entry in the elasticsearch mappings
517 table, in order to build the mappings for whatever is needed.
518
519 In the provided function, the files are:
520
521 =over 4
522
523 =item C<$name>
524
525 The field name for elasticsearch (corresponds to the 'mapping' column in the
526 database.
527
528 =item C<$type>
529
530 The type for this value, e.g. 'string'.
531
532 =item C<$facet>
533
534 True if this value should be facetised. This only really makes sense if the
535 field is understood by the facet processing code anyway.
536
537 =item C<$sort>
538
539 True if this is a field that a) needs special sort handling, and b) if it
540 should be sorted on. False if a) but not b). Undef if not a). This allows,
541 for example, author to be sorted on but not everything marked with "author"
542 to be included in that sort.
543
544 =item C<$marc_type>
545
546 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
547 'unimarc', 'normarc'.
548
549 =item C<$marc_field>
550
551 A string that describes the MARC field that contains the data to extract.
552 These are of a form suited to Catmandu's MARC fixers.
553
554 =back
555
556 =cut
557
558 sub _foreach_mapping {
559     my ( $self, $sub ) = @_;
560
561     # TODO use a caching framework here
562     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
563         {
564             'search_marc_map.index_name' => $self->index,
565         },
566         {   join => { search_marc_to_fields => 'search_marc_map' },
567             '+select' => [
568                 'search_marc_to_fields.facet',
569                 'search_marc_to_fields.suggestible',
570                 'search_marc_to_fields.sort',
571                 'search_marc_map.marc_type',
572                 'search_marc_map.marc_field',
573             ],
574             '+as'     => [
575                 'facet',
576                 'suggestible',
577                 'sort',
578                 'marc_type',
579                 'marc_field',
580             ],
581         }
582     );
583
584     while ( my $search_field = $search_fields->next ) {
585         $sub->(
586             $search_field->name,
587             $search_field->type,
588             $search_field->get_column('facet'),
589             $search_field->get_column('suggestible'),
590             $search_field->get_column('sort'),
591             $search_field->get_column('marc_type'),
592             $search_field->get_column('marc_field'),
593         );
594     }
595 }
596
597 =head2 process_error
598
599     die process_error($@);
600
601 This parses an Elasticsearch error message and produces a human-readable
602 result from it. This result is probably missing all the useful information
603 that you might want in diagnosing an issue, so the warning is also logged.
604
605 Note that currently the resulting message is not internationalised. This
606 will happen eventually by some method or other.
607
608 =cut
609
610 sub process_error {
611     my ($self, $msg) = @_;
612
613     warn $msg; # simple logging
614
615     # This is super-primitive
616     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
617
618     return "Unable to perform your search. Please try again.\n";
619 }
620
621 =head2 _read_configuration
622
623     my $conf = _read_configuration();
624
625 Reads the I<configuration file> and returns a hash structure with the
626 configuration information. It raises an exception if mandatory entries
627 are missing.
628
629 The hashref structure has the following form:
630
631     {
632         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
633         'index_name' => 'koha_instance',
634     }
635
636 This is configured by the following in the C<config> block in koha-conf.xml:
637
638     <elasticsearch>
639         <server>127.0.0.1:9200</server>
640         <server>anotherserver:9200</server>
641         <index_name>koha_instance</index_name>
642     </elasticsearch>
643
644 =cut
645
646 sub _read_configuration {
647
648     my $configuration;
649
650     my $conf = C4::Context->config('elasticsearch');
651     Koha::Exceptions::Config::MissingEntry->throw(
652         "Missing 'elasticsearch' block in config file")
653       unless defined $conf;
654
655     if ( $conf && $conf->{server} ) {
656         my $nodes = $conf->{server};
657         if ( ref($nodes) eq 'ARRAY' ) {
658             $configuration->{nodes} = $nodes;
659         }
660         else {
661             $configuration->{nodes} = [$nodes];
662         }
663     }
664     else {
665         Koha::Exceptions::Config::MissingEntry->throw(
666             "Missing 'server' entry in config file for elasticsearch");
667     }
668
669     if ( defined $conf->{index_name} ) {
670         $configuration->{index_name} = $conf->{index_name};
671     }
672     else {
673         Koha::Exceptions::Config::MissingEntry->throw(
674             "Missing 'index_name' entry in config file for elasticsearch");
675     }
676
677     return $configuration;
678 }
679
680 1;
681
682 __END__
683
684 =head1 AUTHOR
685
686 =over 4
687
688 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
689
690 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
691
692 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
693
694 =back
695
696 =cut