MT 2116: Addons to the CSV export
[koha.git] / misc / bin / zebraqueue_daemon.pl
1 #!/usr/bin/perl
2
3 # daemon to watch the zebraqueue and update zebra as needed
4
5 use strict;
6 BEGIN {
7     # find Koha's Perl modules
8     # test carefully before changing this
9     use FindBin;
10     eval { require "$FindBin::Bin/kohalib.pl" };
11 }
12
13 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
14 use Unix::Syslog qw(:macros);
15
16 use C4::Context;
17 use C4::Biblio;
18 use C4::Search;
19 use C4::AuthoritiesMarc;
20 use XML::Simple;
21 use POSIX;
22 use utf8;
23
24
25 # wait periods governing connection attempts
26 my $min_connection_wait =    1; # start off at 1 second
27 my $max_connection_wait = 1024; # max about 17 minutes
28
29 # keep separate wait period for bib and authority Zebra databases
30 my %zoom_connection_waits = (); 
31
32 my $db_connection_wait = $min_connection_wait;
33
34 # ZOOM and Z39.50 errors that are potentially
35 # resolvable by connecting again and retrying
36 # the operation
37 my %retriable_zoom_errors = (
38     10000 => 'ZOOM_ERROR_CONNECT',
39     10001 => 'ZOOM_ERROR_MEMORY',
40     10002 => 'ZOOM_ERROR_ENCODE',
41     10003 => 'ZOOM_ERROR_DECODE',
42     10004 => 'ZOOM_ERROR_CONNECTION_LOST',
43     10005 => 'ZOOM_ERROR_INIT',
44     10006 => 'ZOOM_ERROR_INTERNAL',
45     10007 => 'ZOOM_ERROR_TIMEOUT',
46 );
47
48 # structure to store updates that have
49 # failed and are to be retrieved.  The
50 # structure is a hashref of hashrefs, 
51 # e.g.,
52 #
53 # $postoned_updates->{$server}->{$record_number} = 1;
54 #
55 # If an operation is attempted and fails because
56 # of a retriable error (see above), the daemon
57 # will try several times to recover as follows:
58 #
59 # 1. close and reopen the connection to the
60 #    Zebra server, unless the error was a timeout,
61 #    in which case
62 # 2. retry the operation
63 #
64 # If, after trying this five times, the operation still
65 # fails, the daemon will mark the record number as
66 # postponed, and try to process other entries in 
67 # zebraqueue.  When an update is postponed, the 
68 # error will be reported to syslog. 
69 #
70 # If more than 100 postponed updates are 
71 # accumulated, the daemon will assume that 
72 # something is seriously wrong, complain loudly,
73 # and abort.  If running under the daemon(1) command, 
74 # this means that the daemon will respawn.
75 #
76 my $num_postponed_updates = 0;
77 my $postponed_updates = {};
78
79 my $max_operation_attempts =   5;
80 my $max_postponed_updates  = 100;
81
82 # Zebra connection timeout
83 my $zconn_timeout            =  30;
84 my $zconn_timeout_multiplier = 1.5;
85 my $max_zconn_timeout        = 120;
86
87 my $ident = "Koha Zebraqueue ";
88
89 my $debug = 0;
90 Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
91
92 Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
93
94 sub handler_start {
95
96     # Starts session. Only ever called once only really used to set an alias
97     # for the POE kernel
98     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
99
100     my $time = localtime(time);
101     Unix::Syslog::syslog LOG_INFO, "$time POE Session ", $session->ID, " has started.\n";
102
103     # check status
104 #    $kernel->yield('status_check');
105     $kernel->yield('sleep');
106 }
107
108 sub handler_sleep {
109
110     # can be used to slow down loop execution if needed
111     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
112     use Time::HiRes qw (sleep);
113     Time::HiRes::sleep(0.5);
114     #sleep 1;
115     $kernel->yield('status_check');
116 }
117
118 sub handler_check {
119     # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
120     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
121     my $dbh = get_db_connection();
122     my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
123     $sth->execute;
124     my $data = $sth->fetchrow_hashref();
125     if ($data->{'opcount'} > 0) {
126         Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n";
127         $sth->finish();
128         $dbh->commit(); # needed so that we get current state of zebraqueue next time
129                         # we enter handler_check
130         $kernel->yield('do_ops');
131     }
132     else {
133         $sth->finish();
134         $dbh->commit(); # needed so that we get current state of zebraqueue next time
135                         # we enter handler_check
136         $kernel->yield('sleep');
137     }
138 }
139
140 sub zebraop {
141     # execute operations waiting in the zebraqueue
142     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
143     my $dbh = get_db_connection();
144     my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0 ORDER BY id DESC");
145     $readsth->execute();
146     Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
147
148     my $completed_updates = {};
149     ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
150         warn "Inside while loop" if $debug;
151
152         my $id = $data->{'id'};
153         my $op = $data->{'operation'};
154         $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
155                                                   # either delete_record or recordDelete
156         my $record_number = $data->{'biblio_auth_number'};
157         my $server = $data->{'server'};
158
159         next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
160         next ZEBRAQUEUE if exists $completed_updates->{$server}->{$record_number}->{$op};
161
162         my $ok = 0;
163         my $record;
164         if ($op eq 'recordDelete') {
165             $ok = process_delete($dbh, $server, $record_number);
166         }
167         else {
168             $ok = process_update($dbh, $server, $record_number, $id);
169         }
170         if ($ok == 1) {
171             mark_done($dbh, $record_number, $op, $server);
172             $completed_updates->{$server}->{$record_number}->{$op} = 1;
173             if ($op eq 'recordDelete') {
174                 $completed_updates->{$server}->{$record_number}->{'specialUpdate'} = 1;
175             }
176         }                            
177     }
178     $readsth->finish();
179     $dbh->commit();
180     $kernel->yield('sleep');
181 }
182
183 sub process_delete {
184     my $dbh = shift;
185     my $server = shift;
186     my $record_number = shift;
187
188     my $record;
189     my $ok = 0;
190     eval {
191         warn "Searching for record to delete" if $debug;
192         # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
193         my $Zconn =  get_zebra_connection($server);
194         my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
195         $results->option(elementSetName => 'marcxml');
196         $record = $results->record(0)->raw();
197     };
198     if ($@) {
199         # this doesn't exist, so no need to wail on zebra to delete it
200         if ($@->code() eq 13) {
201             $ok = 1;
202         } else {
203             # caught a ZOOM::Exception
204             my $message = _format_zoom_error_message($@);
205             postpone_update($server, $record_number, $message);
206         }
207     } else {
208         # then, delete the record
209         warn "Deleting record" if $debug;
210         $ok = zebrado($record, 'recordDelete', $server, $record_number);
211     }
212     return $ok;
213 }
214
215 sub process_update {
216     my $dbh = shift;
217     my $server = shift;
218     my $record_number = shift;
219     my $id = shift;
220
221     my $record;
222     my $ok = 0;
223
224     warn "Updating record" if $debug;
225     # get the XML
226     my $marcxml;
227     if ($server eq "biblioserver") {
228         my $marc = GetMarcBiblio($record_number);
229         $marcxml = $marc->as_xml_record() if $marc;
230     } 
231     elsif ($server eq "authorityserver") {
232         $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number);
233     }
234     # check it's XML, just in case
235     eval {
236         my $hashed = XMLin($marcxml);
237     }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
238     ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
239     if ($@) {
240         Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
241         mark_done_by_id($dbh, $id, $server);
242         $ok = 0;
243     } else {
244         # ok, we have everything, do the operation in zebra !
245         $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number);
246     }
247     return $ok;
248 }
249
250 sub mark_done_by_id {
251     my $dbh = shift;
252     my $id = shift;
253     my $server = shift;
254     my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ? AND server = ? AND done = 0");
255     $delsth->execute($id, $server);
256 }
257
258 sub mark_done {
259     my $dbh = shift;
260     my $record_number = shift;
261     my $op = shift;
262     my $server = shift;
263
264     my $delsth;
265     if ($op eq 'recordDelete') {
266         # if it's a deletion, we can delete every request on this biblio : in case the user
267         # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
268         # that are pending and can't succeed, as we don't have the XML anymore
269         # so, delete everything for this biblionumber
270         $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done = 1 
271                                         WHERE biblio_auth_number = ? 
272                                         AND server = ?
273                                         AND done = 0");
274         $delsth->execute($record_number, $server);
275     } else {
276         # if it's not a deletion, delete every pending specialUpdate for this biblionumber
277         # in case the user add biblio, then X items, before this script runs
278         # this avoid indexing X+1 times where just 1 is enough.
279         $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 
280                                  WHERE biblio_auth_number = ? 
281                                  AND operation = 'specialUpdate'
282                                  AND server = ?
283                                  AND done = 0");
284         $delsth->execute($record_number, $server);
285     }
286 }
287
288 sub zebrado {
289     ###Accepts a $server variable thus we can use it to update  biblios, authorities or other zebra dbs
290     my ($record, $op, $server, $record_number) = @_;
291
292     unless ($record) {
293         my $message = "error updating index for $server $record $record_number: no source record";
294         postpone_update($server, $record_number, $message);
295         return 0;
296     }
297
298     my $attempts = 0;
299     my $ok = 0;
300     ATTEMPT: while ($attempts < $max_operation_attempts) {
301         $attempts++;
302         warn "Attempt $attempts for $op for $server $record_number" if $debug;
303         my $Zconn = get_zebra_connection($server);
304
305         my $Zpackage = $Zconn->package();
306         $Zpackage->option(action => $op);
307         $Zpackage->option(record => $record);
308
309         eval { $Zpackage->send("update") };
310         if ($@ && $@->isa("ZOOM::Exception")) {
311             my $message = _format_zoom_error_message($@);
312             my $error = $@->code();
313             if (exists $retriable_zoom_errors{$error}) {
314                 warn "reattempting operation $op for $server $record_number" if $debug;
315                 warn "last Zebra error was $message" if $debug;
316                 $Zpackage->destroy();
317
318                 if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
319                     # bump up connection timeout
320                     $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier);
321                     $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
322                     Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n";
323                     warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
324                 }
325                 next ATTEMPT;
326             } else {
327                 postpone_update($server, $record_number, $message);
328             }
329         }
330         # FIXME - would be more efficient to send a ES commit
331         # after a batch of records, rather than commiting after
332         # each one - Zebra handles updates relatively slowly.
333         eval { $Zpackage->send('commit'); };
334         if ($@) {
335             # operation succeeded, but commit
336             # did not - we have a problem
337             my $message = _format_zoom_error_message($@);
338             postpone_update($server, $record_number, $message);
339         } else {
340             $ok = 1;
341             last ATTEMPT;
342         }
343     }
344
345     unless ($ok) {
346         my $message = "Made $attempts attempts to index $server record $record_number without success";
347         postpone_update($server, $record_number, $message);
348     }
349
350     return $ok; 
351 }
352
353 sub postpone_update {
354     my ($server, $record_number, $message) = @_;
355     warn $message if $debug;
356     $message .= "\n" unless $message =~ /\n$/;
357     Unix::Syslog::syslog LOG_ERR, $message;
358     $postponed_updates->{$server}->{$record_number} = 1;
359
360     $num_postponed_updates++;
361     if ($num_postponed_updates > $max_postponed_updates) {
362         warn "exiting, over $max_postponed_updates postponed indexing updates";
363         Unix::Syslog::syslog LOG_ERR, "exiting, over $max_postponed_updates postponed indexing updates";
364         Unix::Syslog::closelog;
365         exit;
366     }
367 }
368
369 sub handler_stop {
370     my $heap = $_[HEAP];
371     my $time = localtime(time);
372     Unix::Syslog::syslog LOG_INFO, "$time Session ", $_[SESSION]->ID, " has stopped.\n";
373     delete $heap->{session};
374 }
375
376 # get a DB connection
377 sub get_db_connection {
378     my $dbh;
379
380     $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
381     while (1) {
382         eval {
383             # note that C4::Context caches the
384             # DB handle; C4::Context->dbh() will
385             # check that handle first before returning
386             # it.  If the connection is bad, it
387             # then tries (once) to create a new one.
388             $dbh = C4::Context->dbh();
389         };
390
391         unless ($@) {
392             # C4::Context->dbh dies if it cannot
393             # establish a connection
394             $db_connection_wait = $min_connection_wait;
395             $dbh->{AutoCommit} = 0; # do this to reduce number of
396                                     # commits to zebraqueue
397             return $dbh;
398         }
399
400         # connection failed
401         my $error = "failed to connect to DB: $DBI::errstr";
402         warn $error if $debug;
403         Unix::Syslog::syslog LOG_ERR, $error;
404         sleep $db_connection_wait;
405         $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
406     }
407 }
408
409 # get a Zebra connection
410 sub get_zebra_connection {
411     my $server = shift;
412
413     # start connection retry wait queue if necessary
414     $zoom_connection_waits{$server} = $min_connection_wait unless exists  $zoom_connection_waits{$server};
415
416     # try to connect to Zebra forever until we succeed
417     while (1) {
418         # what follows assumes that C4::Context->Zconn 
419         # makes only one attempt to create a new connection;
420         my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
421         $Zconn->option('timeout' => $zconn_timeout);
422
423         # it is important to note that if the existing connection
424         # stored by C4::Context has an error (any type of error)
425         # from the last transaction, C4::Context->Zconn closes
426         # it and establishes a new one.  Therefore, the
427         # following check will succeed if we have a new, good 
428         # connection or we're using a previously established
429         # connection that has experienced no errors.
430         if ($Zconn->errcode() == 0) {
431             $zoom_connection_waits{$server} = $min_connection_wait;
432             return $Zconn;
433         }
434
435         # connection failed
436         my $error = _format_zoom_error_message($Zconn);
437         warn $error if $debug;
438         Unix::Syslog::syslog LOG_ERR, $error;
439         sleep $zoom_connection_waits{$server};
440         $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
441     }
442 }
443
444 # given a ZOOM::Exception or
445 # ZOOM::Connection object, generate
446 # a human-reaable error message
447 sub _format_zoom_error_message {
448     my $err = shift;
449
450     my $message = "";
451     if (ref($err) eq 'ZOOM::Connection') {
452         $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
453     } elsif (ref($err) eq 'ZOOM::Exception') {
454         $message = $err->message() . " (" . $err->diagset . " " .  $err->code() . ") " . $err->addinfo();
455     }
456     return $message; 
457 }
458
459 POE::Session->create(
460     inline_states => {
461         _start       => \&handler_start,
462         sleep        => \&handler_sleep,
463         status_check => \&handler_check,
464         do_ops       => \&zebraop,
465         _stop        => \&handler_stop,
466     },
467 );
468
469 # start the kernel
470 $poe_kernel->run();
471
472 Unix::Syslog::closelog;
473
474 exit;