Merge branch 'master' of git://git.koha-community.org/koha into MSWin32
[wip/koha-chris_n.git] / misc / bin / zebraqueue_daemon.pl
1 #!/usr/bin/perl
2
3 # daemon to watch the zebraqueue and update zebra as needed
4
5 use strict;
6 #use warnings; FIXME - Bug 2505
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/kohalib.pl" };
12 }
13
14 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
15 use Unix::Syslog qw(:macros);
16
17 use C4::Context;
18 use C4::Biblio;
19 use C4::Search;
20 use C4::AuthoritiesMarc;
21 use XML::Simple;
22 use POSIX;
23 use utf8;
24
25
26 # wait periods governing connection attempts
27 my $min_connection_wait =    1; # start off at 1 second
28 my $max_connection_wait = 1024; # max about 17 minutes
29
30 # keep separate wait period for bib and authority Zebra databases
31 my %zoom_connection_waits = (); 
32
33 my $db_connection_wait = $min_connection_wait;
34
35 # ZOOM and Z39.50 errors that are potentially
36 # resolvable by connecting again and retrying
37 # the operation
38 my %retriable_zoom_errors = (
39     10000 => 'ZOOM_ERROR_CONNECT',
40     10001 => 'ZOOM_ERROR_MEMORY',
41     10002 => 'ZOOM_ERROR_ENCODE',
42     10003 => 'ZOOM_ERROR_DECODE',
43     10004 => 'ZOOM_ERROR_CONNECTION_LOST',
44     10005 => 'ZOOM_ERROR_INIT',
45     10006 => 'ZOOM_ERROR_INTERNAL',
46     10007 => 'ZOOM_ERROR_TIMEOUT',
47 );
48
49 # structure to store updates that have
50 # failed and are to be retrieved.  The
51 # structure is a hashref of hashrefs, 
52 # e.g.,
53 #
54 # $postoned_updates->{$server}->{$record_number} = 1;
55 #
56 # If an operation is attempted and fails because
57 # of a retriable error (see above), the daemon
58 # will try several times to recover as follows:
59 #
60 # 1. close and reopen the connection to the
61 #    Zebra server, unless the error was a timeout,
62 #    in which case
63 # 2. retry the operation
64 #
65 # If, after trying this five times, the operation still
66 # fails, the daemon will mark the record number as
67 # postponed, and try to process other entries in 
68 # zebraqueue.  When an update is postponed, the 
69 # error will be reported to syslog. 
70 #
71 # If more than 100 postponed updates are 
72 # accumulated, the daemon will assume that 
73 # something is seriously wrong, complain loudly,
74 # and abort.  If running under the daemon(1) command, 
75 # this means that the daemon will respawn.
76 #
77 my $num_postponed_updates = 0;
78 my $postponed_updates = {};
79
80 my $max_operation_attempts =   5;
81 my $max_postponed_updates  = 100;
82
83 # Zebra connection timeout
84 my $zconn_timeout            =  30;
85 my $zconn_timeout_multiplier = 1.5;
86 my $max_zconn_timeout        = 120;
87
88 my $ident = "Koha Zebraqueue ";
89
90 my $debug = 0;
91 Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
92
93 Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
94
95 sub handler_start {
96
97     # Starts session. Only ever called once only really used to set an alias
98     # for the POE kernel
99     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
100
101     my $time = localtime(time);
102     Unix::Syslog::syslog LOG_INFO, "$time POE Session ", $session->ID, " has started.\n";
103
104     # check status
105 #    $kernel->yield('status_check');
106     $kernel->yield('sleep');
107 }
108
109 sub handler_sleep {
110
111     # can be used to slow down loop execution if needed
112     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
113     use Time::HiRes qw (sleep);
114     Time::HiRes::sleep(0.5);
115     #sleep 1;
116     $kernel->yield('status_check');
117 }
118
119 sub handler_check {
120     # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
121     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
122     my $dbh = get_db_connection();
123     my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
124     $sth->execute;
125     my $data = $sth->fetchrow_hashref();
126     if ($data->{'opcount'} > 0) {
127         Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n";
128         $sth->finish();
129         $dbh->commit(); # needed so that we get current state of zebraqueue next time
130                         # we enter handler_check
131         $kernel->yield('do_ops');
132     }
133     else {
134         $sth->finish();
135         $dbh->commit(); # needed so that we get current state of zebraqueue next time
136                         # we enter handler_check
137         $kernel->yield('sleep');
138     }
139 }
140
141 sub zebraop {
142     # execute operations waiting in the zebraqueue
143     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
144     my $dbh = get_db_connection();
145     my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0 ORDER BY id DESC");
146     $readsth->execute();
147     Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
148
149     my $completed_updates = {};
150     ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
151         warn "Inside while loop" if $debug;
152
153         my $id = $data->{'id'};
154         my $op = $data->{'operation'};
155         $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
156                                                   # either delete_record or recordDelete
157         my $record_number = $data->{'biblio_auth_number'};
158         my $server = $data->{'server'};
159
160         next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
161         next ZEBRAQUEUE if exists $completed_updates->{$server}->{$record_number}->{$op};
162
163         my $ok = 0;
164         my $record;
165         if ($op eq 'recordDelete') {
166             $ok = process_delete($dbh, $server, $record_number);
167         }
168         else {
169             $ok = process_update($dbh, $server, $record_number, $id);
170         }
171         if ($ok == 1) {
172             mark_done($dbh, $record_number, $op, $server);
173             $completed_updates->{$server}->{$record_number}->{$op} = 1;
174             if ($op eq 'recordDelete') {
175                 $completed_updates->{$server}->{$record_number}->{'specialUpdate'} = 1;
176             }
177         }                            
178     }
179     $readsth->finish();
180     $dbh->commit();
181     $kernel->yield('sleep');
182 }
183
184 sub process_delete {
185     my $dbh = shift;
186     my $server = shift;
187     my $record_number = shift;
188
189     my $record;
190     my $ok = 0;
191     eval {
192         warn "Searching for record to delete" if $debug;
193         # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
194         my $Zconn =  get_zebra_connection($server);
195         my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
196         $results->option(elementSetName => 'marcxml');
197         $record = $results->record(0)->raw();
198     };
199     if ($@) {
200         # this doesn't exist, so no need to wail on zebra to delete it
201         if ($@->code() eq 13) {
202             $ok = 1;
203         } else {
204             # caught a ZOOM::Exception
205             my $message = _format_zoom_error_message($@);
206             postpone_update($server, $record_number, $message);
207         }
208     } else {
209         # then, delete the record
210         warn "Deleting record" if $debug;
211         $ok = zebrado($record, 'recordDelete', $server, $record_number);
212     }
213     return $ok;
214 }
215
216 sub process_update {
217     my $dbh = shift;
218     my $server = shift;
219     my $record_number = shift;
220     my $id = shift;
221
222     my $record;
223     my $ok = 0;
224
225     warn "Updating record" if $debug;
226     # get the XML
227     my $marcxml;
228     if ($server eq "biblioserver") {
229         my $marc = GetMarcBiblio($record_number);
230         $marcxml = $marc->as_xml_record() if $marc;
231     } 
232     elsif ($server eq "authorityserver") {
233         $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number);
234     }
235     # check it's XML, just in case
236     eval {
237         my $hashed = XMLin($marcxml);
238     }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
239     ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
240     if ($@) {
241         Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
242         mark_done_by_id($dbh, $id, $server);
243         $ok = 0;
244     } else {
245         # ok, we have everything, do the operation in zebra !
246         $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number);
247     }
248     return $ok;
249 }
250
251 sub mark_done_by_id {
252     my $dbh = shift;
253     my $id = shift;
254     my $server = shift;
255     my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ? AND server = ? AND done = 0");
256     $delsth->execute($id, $server);
257 }
258
259 sub mark_done {
260     my $dbh = shift;
261     my $record_number = shift;
262     my $op = shift;
263     my $server = shift;
264
265     my $delsth;
266     if ($op eq 'recordDelete') {
267         # if it's a deletion, we can delete every request on this biblio : in case the user
268         # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
269         # that are pending and can't succeed, as we don't have the XML anymore
270         # so, delete everything for this biblionumber
271         $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done = 1 
272                                         WHERE biblio_auth_number = ? 
273                                         AND server = ?
274                                         AND done = 0");
275         $delsth->execute($record_number, $server);
276     } else {
277         # if it's not a deletion, delete every pending specialUpdate for this biblionumber
278         # in case the user add biblio, then X items, before this script runs
279         # this avoid indexing X+1 times where just 1 is enough.
280         $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 
281                                  WHERE biblio_auth_number = ? 
282                                  AND operation = 'specialUpdate'
283                                  AND server = ?
284                                  AND done = 0");
285         $delsth->execute($record_number, $server);
286     }
287 }
288
289 sub zebrado {
290     ###Accepts a $server variable thus we can use it to update  biblios, authorities or other zebra dbs
291     my ($record, $op, $server, $record_number) = @_;
292
293     unless ($record) {
294         my $message = "error updating index for $server $record $record_number: no source record";
295         postpone_update($server, $record_number, $message);
296         return 0;
297     }
298
299     my $attempts = 0;
300     my $ok = 0;
301     ATTEMPT: while ($attempts < $max_operation_attempts) {
302         $attempts++;
303         warn "Attempt $attempts for $op for $server $record_number" if $debug;
304         my $Zconn = get_zebra_connection($server);
305
306         my $Zpackage = $Zconn->package();
307         $Zpackage->option(action => $op);
308         $Zpackage->option(record => $record);
309
310         eval { $Zpackage->send("update") };
311         if ($@ && $@->isa("ZOOM::Exception")) {
312             my $message = _format_zoom_error_message($@);
313             my $error = $@->code();
314             if (exists $retriable_zoom_errors{$error}) {
315                 warn "reattempting operation $op for $server $record_number" if $debug;
316                 warn "last Zebra error was $message" if $debug;
317                 $Zpackage->destroy();
318
319                 if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
320                     # bump up connection timeout
321                     $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier);
322                     $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
323                     Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n";
324                     warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
325                 }
326                 next ATTEMPT;
327             } else {
328                 postpone_update($server, $record_number, $message);
329             }
330         }
331         # FIXME - would be more efficient to send a ES commit
332         # after a batch of records, rather than commiting after
333         # each one - Zebra handles updates relatively slowly.
334         eval { $Zpackage->send('commit'); };
335         if ($@) {
336             # operation succeeded, but commit
337             # did not - we have a problem
338             my $message = _format_zoom_error_message($@);
339             postpone_update($server, $record_number, $message);
340         } else {
341             $ok = 1;
342             last ATTEMPT;
343         }
344     }
345
346     unless ($ok) {
347         my $message = "Made $attempts attempts to index $server record $record_number without success";
348         postpone_update($server, $record_number, $message);
349     }
350
351     return $ok; 
352 }
353
354 sub postpone_update {
355     my ($server, $record_number, $message) = @_;
356     warn $message if $debug;
357     $message .= "\n" unless $message =~ /\n$/;
358     Unix::Syslog::syslog LOG_ERR, $message;
359     $postponed_updates->{$server}->{$record_number} = 1;
360
361     $num_postponed_updates++;
362     if ($num_postponed_updates > $max_postponed_updates) {
363         warn "exiting, over $max_postponed_updates postponed indexing updates";
364         Unix::Syslog::syslog LOG_ERR, "exiting, over $max_postponed_updates postponed indexing updates";
365         Unix::Syslog::closelog;
366         exit;
367     }
368 }
369
370 sub handler_stop {
371     my $heap = $_[HEAP];
372     my $time = localtime(time);
373     Unix::Syslog::syslog LOG_INFO, "$time Session ", $_[SESSION]->ID, " has stopped.\n";
374     delete $heap->{session};
375 }
376
377 # get a DB connection
378 sub get_db_connection {
379     my $dbh;
380
381     $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
382     while (1) {
383         eval {
384             # note that C4::Context caches the
385             # DB handle; C4::Context->dbh() will
386             # check that handle first before returning
387             # it.  If the connection is bad, it
388             # then tries (once) to create a new one.
389             $dbh = C4::Context->dbh();
390         };
391
392         unless ($@) {
393             # C4::Context->dbh dies if it cannot
394             # establish a connection
395             $db_connection_wait = $min_connection_wait;
396             $dbh->{AutoCommit} = 0; # do this to reduce number of
397                                     # commits to zebraqueue
398             return $dbh;
399         }
400
401         # connection failed
402         my $error = "failed to connect to DB: $DBI::errstr";
403         warn $error if $debug;
404         Unix::Syslog::syslog LOG_ERR, $error;
405         sleep $db_connection_wait;
406         $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
407     }
408 }
409
410 # get a Zebra connection
411 sub get_zebra_connection {
412     my $server = shift;
413
414     # start connection retry wait queue if necessary
415     $zoom_connection_waits{$server} = $min_connection_wait unless exists  $zoom_connection_waits{$server};
416
417     # try to connect to Zebra forever until we succeed
418     while (1) {
419         # what follows assumes that C4::Context->Zconn 
420         # makes only one attempt to create a new connection;
421         my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
422         $Zconn->option('timeout' => $zconn_timeout);
423
424         # it is important to note that if the existing connection
425         # stored by C4::Context has an error (any type of error)
426         # from the last transaction, C4::Context->Zconn closes
427         # it and establishes a new one.  Therefore, the
428         # following check will succeed if we have a new, good 
429         # connection or we're using a previously established
430         # connection that has experienced no errors.
431         if ($Zconn->errcode() == 0) {
432             $zoom_connection_waits{$server} = $min_connection_wait;
433             return $Zconn;
434         }
435
436         # connection failed
437         my $error = _format_zoom_error_message($Zconn);
438         warn $error if $debug;
439         Unix::Syslog::syslog LOG_ERR, $error;
440         sleep $zoom_connection_waits{$server};
441         $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
442     }
443 }
444
445 # given a ZOOM::Exception or
446 # ZOOM::Connection object, generate
447 # a human-reaable error message
448 sub _format_zoom_error_message {
449     my $err = shift;
450
451     my $message = "";
452     if (ref($err) eq 'ZOOM::Connection') {
453         $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
454     } elsif (ref($err) eq 'ZOOM::Exception') {
455         $message = $err->message() . " (" . $err->diagset . " " .  $err->code() . ") " . $err->addinfo();
456     }
457     return $message; 
458 }
459
460 POE::Session->create(
461     inline_states => {
462         _start       => \&handler_start,
463         sleep        => \&handler_sleep,
464         status_check => \&handler_check,
465         do_ops       => \&zebraop,
466         _stop        => \&handler_stop,
467     },
468 );
469
470 # start the kernel
471 $poe_kernel->run();
472
473 Unix::Syslog::closelog;
474
475 exit;