Bug 25265: [20.05.x] Prevent double reindex of the same item in batchmod
[koha.git] / Koha / SearchEngine / Elasticsearch / Indexer.pm
1 package Koha::SearchEngine::Elasticsearch::Indexer;
2
3 # Copyright 2013 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use Carp;
21 use Modern::Perl;
22 use Try::Tiny;
23 use List::Util qw(any);
24 use base qw(Koha::SearchEngine::Elasticsearch);
25 use Data::Dumper;
26
27 use Koha::Exceptions;
28 use Koha::SearchEngine::Zebra::Indexer;
29 use C4::Biblio;
30 use C4::Context;
31
32 =head1 NAME
33
34 Koha::SearchEngine::Elasticsearch::Indexer - handles adding new records to the index
35
36 =head1 SYNOPSIS
37
38     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new(
39         { index => Koha::SearchEngine::BIBLIOS_INDEX } );
40     $indexer->drop_index();
41     $indexer->update_index(\@biblionumbers, \@records);
42
43
44 =head1 CONSTANTS
45
46 =over 4
47
48 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_OK>
49
50 Represents an index state where index is created and in a working state.
51
52 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_REINDEX_REQUIRED>
53
54 Not currently used, but could be useful later, for example if can detect when new field or mapping added.
55
56 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_RECREATE_REQUIRED>
57
58 Representings an index state where index needs to be recreated and is not in a working state.
59
60 =back
61
62 =cut
63
64 use constant {
65     INDEX_STATUS_OK => 0,
66     INDEX_STATUS_REINDEX_REQUIRED => 1,
67     INDEX_STATUS_RECREATE_REQUIRED => 2,
68 };
69
70 =head1 FUNCTIONS
71
72 =head2 update_index($biblionums, $records)
73
74     try {
75         $self->update_index($biblionums, $records);
76     } catch {
77         die("Something went wrong trying to update index:" .  $_[0]);
78     }
79
80 Converts C<MARC::Records> C<$records> to Elasticsearch documents and performs
81 an update request for these records on the Elasticsearch index.
82
83 =over 4
84
85 =item C<$biblionums>
86
87 Arrayref of biblio numbers for the C<$records>, the order must be the same as
88 and match up with C<$records>.
89
90 =item C<$records>
91
92 Arrayref of C<MARC::Record>s.
93
94 =back
95
96 =cut
97
98 sub update_index {
99     my ($self, $biblionums, $records) = @_;
100
101     my $documents = $self->marc_records_to_documents($records);
102     my @body;
103
104     for (my $i = 0; $i < scalar @$biblionums; $i++) {
105         my $id = $biblionums->[$i];
106         my $document = $documents->[$i];
107         push @body, {
108             index => {
109                 _id => "$id"
110             }
111         };
112         push @body, $document;
113     }
114     my $response;
115     if (@body) {
116         my $elasticsearch = $self->get_elasticsearch();
117         $response = $elasticsearch->bulk(
118             index => $self->index_name,
119             type => 'data', # is just hard coded in Indexer.pm?
120             body => \@body
121         );
122     }
123     return $response;
124 }
125
126 =head2 set_index_status_ok
127
128 Convenience method for setting index status to C<INDEX_STATUS_OK>.
129
130 =cut
131
132 sub set_index_status_ok {
133     my ($self) = @_;
134     $self->index_status(INDEX_STATUS_OK);
135 }
136
137 =head2 is_index_status_ok
138
139 Convenience method for checking if index status is C<INDEX_STATUS_OK>.
140
141 =cut
142
143 sub is_index_status_ok {
144     my ($self) = @_;
145     return $self->index_status == INDEX_STATUS_OK;
146 }
147
148 =head2 set_index_status_reindex_required
149
150 Convenience method for setting index status to C<INDEX_REINDEX_REQUIRED>.
151
152 =cut
153
154 sub set_index_status_reindex_required {
155     my ($self) = @_;
156     $self->index_status(INDEX_STATUS_REINDEX_REQUIRED);
157 }
158
159 =head2 is_index_status_reindex_required
160
161 Convenience method for checking if index status is C<INDEX_STATUS_REINDEX_REQUIRED>.
162
163 =cut
164
165 sub is_index_status_reindex_required {
166     my ($self) = @_;
167     return $self->index_status == INDEX_STATUS_REINDEX_REQUIRED;
168 }
169
170 =head2 set_index_status_recreate_required
171
172 Convenience method for setting index status to C<INDEX_STATUS_RECREATE_REQUIRED>.
173
174 =cut
175
176 sub set_index_status_recreate_required {
177     my ($self) = @_;
178     $self->index_status(INDEX_STATUS_RECREATE_REQUIRED);
179 }
180
181 =head2 is_index_status_recreate_required
182
183 Convenience method for checking if index status is C<INDEX_STATUS_RECREATE_REQUIRED>.
184
185 =cut
186
187 sub is_index_status_recreate_required {
188     my ($self) = @_;
189     return $self->index_status == INDEX_STATUS_RECREATE_REQUIRED;
190 }
191
192 =head2 index_status($status)
193
194 Will either set the current index status to C<$status> and return C<$status>,
195 or return the current index status if called with no arguments.
196
197 =over 4
198
199 =item C<$status>
200
201 Optional argument. If passed will set current index status to C<$status> if C<$status> is
202 a valid status. See L</CONSTANTS>.
203
204 =back
205
206 =cut
207
208 sub index_status {
209     my ($self, $status) = @_;
210     my $key = 'ElasticsearchIndexStatus_' . $self->index;
211
212     if (defined $status) {
213         unless (any { $status == $_ } (
214                 INDEX_STATUS_OK,
215                 INDEX_STATUS_REINDEX_REQUIRED,
216                 INDEX_STATUS_RECREATE_REQUIRED,
217             )
218         ) {
219             Koha::Exceptions::Exception->throw("Invalid index status: $status");
220         }
221         C4::Context->set_preference($key, $status);
222         return $status;
223     }
224     else {
225         return C4::Context->preference($key);
226     }
227 }
228
229 =head2 update_mappings
230
231 Generate Elasticsearch mappings from mappings stored in database and
232 perform a request to update Elasticsearch index mappings. Will throw an
233 error and set index status to C<INDEX_STATUS_RECREATE_REQUIRED> if update
234 failes.
235
236 =cut
237
238 sub update_mappings {
239     my ($self) = @_;
240     my $elasticsearch = $self->get_elasticsearch();
241     my $mappings = $self->get_elasticsearch_mappings();
242
243     foreach my $type (keys %{$mappings}) {
244         try {
245             my $response = $elasticsearch->indices->put_mapping(
246                 index => $self->index_name,
247                 type => $type,
248                 body => {
249                     $type => $mappings->{$type}
250                 }
251             );
252         } catch {
253             $self->set_index_status_recreate_required();
254             my $reason = $_[0]->{vars}->{body}->{error}->{reason};
255             my $index_name = $self->index_name;
256             Koha::Exceptions::Exception->throw(
257                 error => "Unable to update mappings for index \"$index_name\". Reason was: \"$reason\". Index needs to be recreated and reindexed",
258             );
259         };
260     }
261     $self->set_index_status_ok();
262 }
263
264 =head2 update_index_background($biblionums, $records)
265
266 This has exactly the same API as C<update_index> however it'll
267 return immediately. It'll start a background process that does the adding.
268
269 If it fails to add to Elasticsearch then it'll add to a queue that will cause
270 it to be updated by a regular index cron job in the future.
271
272 =cut
273
274 # TODO implement in the future - I don't know the best way of doing this yet.
275 # If fork: make sure process group is changed so apache doesn't wait for us.
276
277 sub update_index_background {
278     my $self = shift;
279     $self->update_index(@_);
280 }
281
282 =head2 index_records
283
284 This function takes an array of record numbers and fetches the records to send to update_index
285 for actual indexing.
286
287 If $records parameter is provided the records will be used as-is, this is only utilized for authorities
288 at the moment.
289
290 The other variables are used for parity with Zebra indexing calls. Currently the calls are passed through
291 to Zebra as well.
292
293 =cut
294
295 sub index_records {
296     my ( $self, $record_numbers, $op, $server, $records ) = @_;
297     $record_numbers = [$record_numbers] if ref $record_numbers ne 'ARRAY' && defined $record_numbers;
298     $records = [$records] if ref $records ne 'ARRAY' && defined $records;
299     if ( $op eq 'specialUpdate' ) {
300         my $index_record_numbers;
301         unless ($records) {
302             foreach my $record_number ( @$record_numbers ){
303                 my $record = _get_record( $record_number, $server );
304                 if( $record ){
305                     push @$records, $record;
306                     push @$index_record_numbers, $record_number;
307                 }
308             }
309         }
310         $self->update_index_background( $index_record_numbers, $records ) if $index_record_numbers && $records;
311     }
312     elsif ( $op eq 'recordDelete' ) {
313         $self->delete_index_background( $record_numbers );
314     }
315     #FIXME Current behaviour is to index Zebra when using ES, at some point we should stop
316     Koha::SearchEngine::Zebra::Indexer::index_records( $self, $record_numbers, $op, $server, undef );
317 }
318
319 sub _get_record {
320     my ( $id, $server ) = @_;
321     return $server eq 'biblioserver'
322         ? C4::Biblio::GetMarcBiblio({ biblionumber => $id, embed_items  => 1 })
323         : C4::AuthoritiesMarc::GetAuthority($id);
324 }
325
326 =head2 delete_index($biblionums)
327
328 C<$biblionums> is an arrayref of biblionumbers to delete from the index.
329
330 =cut
331
332 sub delete_index {
333     my ($self, $biblionums) = @_;
334
335     my $elasticsearch = $self->get_elasticsearch();
336     my @body = map { { delete => { _id => "$_" } } } @{$biblionums};
337     my $result = $elasticsearch->bulk(
338         index => $self->index_name,
339         type => 'data',
340         body => \@body,
341     );
342     if ($result->{errors}) {
343         croak "An Elasticsearch error occurred during bulk delete";
344     }
345 }
346
347 =head2 delete_index_background($biblionums)
348
349 Identical to L</delete_index($biblionums)>
350
351 =cut
352
353 # TODO: Should be made async
354 sub delete_index_background {
355     my $self = shift;
356     $self->delete_index(@_);
357 }
358
359 =head2 drop_index
360
361 Drops the index from the Elasticsearch server.
362
363 =cut
364
365 sub drop_index {
366     my ($self) = @_;
367     if ($self->index_exists) {
368         my $elasticsearch = $self->get_elasticsearch();
369         $elasticsearch->indices->delete(index => $self->index_name);
370         $self->set_index_status_recreate_required();
371     }
372 }
373
374 =head2 create_index
375
376 Creates the index (including mappings) on the Elasticsearch server.
377
378 =cut
379
380 sub create_index {
381     my ($self) = @_;
382     my $settings = $self->get_elasticsearch_settings();
383     my $elasticsearch = $self->get_elasticsearch();
384     $elasticsearch->indices->create(
385         index => $self->index_name,
386         body => {
387             settings => $settings
388         }
389     );
390     $self->update_mappings();
391 }
392
393 =head2 index_exists
394
395 Checks if index has been created on the Elasticsearch server. Returns C<1> or the
396 empty string to indicate whether index exists or not.
397
398 =cut
399
400 sub index_exists {
401     my ($self) = @_;
402     my $elasticsearch = $self->get_elasticsearch();
403     return $elasticsearch->indices->exists(
404         index => $self->index_name,
405     );
406 }
407
408 1;
409
410 __END__
411
412 =head1 AUTHOR
413
414 =over 4
415
416 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
417
418 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
419
420 =back