From c82afab40a13068b7edd1d31dc737315f1ca9bd9 Mon Sep 17 00:00:00 2001 From: Galen Charlton Date: Tue, 4 Mar 2008 18:26:02 -0600 Subject: [PATCH] zebraqueue - major changes Signed-off-by: Joshua Ferraro --- misc/bin/zebraqueue_daemon.pl | 450 ++++++++++++++++++++++++---------- 1 file changed, 323 insertions(+), 127 deletions(-) diff --git a/misc/bin/zebraqueue_daemon.pl b/misc/bin/zebraqueue_daemon.pl index b0ec12528e..ea788d5ff5 100755 --- a/misc/bin/zebraqueue_daemon.pl +++ b/misc/bin/zebraqueue_daemon.pl @@ -9,6 +9,7 @@ BEGIN { use FindBin; eval { require "$FindBin::Bin/kohalib.pl" }; } + use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW); use Unix::Syslog qw(:macros); @@ -17,12 +18,75 @@ use C4::Biblio; use C4::Search; use C4::AuthoritiesMarc; use XML::Simple; +use POSIX; use utf8; -my $dbh = C4::Context->dbh; + +# wait periods governing connection attempts +my $min_connection_wait = 1; # start off at 1 second +my $max_connection_wait = 1024; # max about 17 minutes + +# keep separate wait period for bib and authority Zebra databases +my %zoom_connection_waits = (); + +my $db_connection_wait = $min_connection_wait; + +# ZOOM and Z39.50 errors that are potentially +# resolvable by connecting again and retrying +# the operation +my %retriable_zoom_errors = ( + 10000 => 'ZOOM_ERROR_CONNECT', + 10001 => 'ZOOM_ERROR_MEMORY', + 10002 => 'ZOOM_ERROR_ENCODE', + 10003 => 'ZOOM_ERROR_DECODE', + 10004 => 'ZOOM_ERROR_CONNECTION_LOST', + 10005 => 'ZOOM_ERROR_INIT', + 10006 => 'ZOOM_ERROR_INTERNAL', + 10007 => 'ZOOM_ERROR_TIMEOUT', +); + +# structure to store updates that have +# failed and are to be retrieved. The +# structure is a hashref of hashrefs, +# e.g., +# +# $postoned_updates->{$server}->{$record_number} = 1; +# +# If an operation is attempted and fails because +# of a retriable error (see above), the daemon +# will try several times to recover as follows: +# +# 1. close and reopen the connection to the +# Zebra server, unless the error was a timeout, +# in which case +# 2. retry the operation +# +# If, after trying this five times, the operation still +# fails, the daemon will mark the record number as +# postponed, and try to process other entries in +# zebraqueue. When an update is postponed, the +# error will be reported to syslog. +# +# If more than 100 postponed updates are +# accumulated, the daemon will assume that +# something is seriously wrong, complain loudly, +# and abort. If running under the daemon(1) command, +# this means that the daemon will respawn. +# +my $num_postponed_updates = 0; +my $postponed_updates = {}; + +my $max_operation_attempts = 5; +my $max_postponed_updates = 100; + +# Zebra connection timeout +my $zconn_timeout = 30; +my $zconn_timeout_multiplier = 1.5; +my $max_zconn_timeout = 120; + my $ident = "Koha Zebraqueue "; -my $debug = 1; +my $debug = 0; Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0; Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n"; @@ -54,7 +118,7 @@ sub handler_sleep { sub handler_check { # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ]; - my $dbh = C4::Context->dbh; + my $dbh = get_db_connection(); my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0"); $sth->execute; my $data = $sth->fetchrow_hashref(); @@ -72,154 +136,205 @@ sub handler_check { sub zebraop { # execute operations waiting in the zebraqueue my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ]; - my $dbh = C4::Context->dbh; + my $dbh = get_db_connection(); my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0"); $readsth->execute(); Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n"; - while (my $data = $readsth->fetchrow_hashref()) { - warn "Inside while loop" if $debug; - eval { - my $ok = 0; - my $record; - if ($data->{'operation'} =~ /delete/i ) { - eval { - warn "Searching for record to delete" if $debug; - # 1st read the record in zebra, we have to get it from zebra as its no longer in the db - my $Zconn = C4::Context->Zconn($data->{'server'}, 0, 1, '', 'xml'); - my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$data->{'biblio_auth_number'}); - $results->option(elementSetName => 'marcxml'); - $record = $results->record(0)->raw(); - }; - if ($@) { - # this doesn't exist, so no need to wail on zebra to delete it - if ($@->code() eq 13) { - $ok = 1; - } else { - # caught a ZOOM::Exception - my $error = - $@->message() . " (" - . $@->code() . ") " - . $@->addinfo() . " " - . $@->diagset(); - warn "ERROR: $error"; - } - } else { - # then, delete the record - warn "Deleting record" if $debug; - $ok = zebrado($record, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'}); - } - } - else { - # it is an update - warn "Updating record" if $debug; - # get the XML - my $marcxml; - if ($data->{'server'} eq "biblioserver") { - my $marc = GetMarcBiblio($data->{'biblio_auth_number'}); - $marcxml = $marc->as_xml_record() if $marc; - } - elsif ($data->{'server'} eq "authorityserver") { - $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($data->{'biblio_auth_number'}); - } - # check it's XML, just in case - eval { - my $hashed = XMLin($marcxml); - }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe - ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA - if ($@) { - Unix::Syslog::syslog LOG_ERR, "$@"; - my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?"); - $delsth->execute($data->{'id'}); - next; - } - # ok, we have everything, do the operation in zebra ! - $ok = zebrado($marcxml, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'}); - } - if ($ok == 1) { - $dbh = C4::Context->dbh; - my $delsth; - # if it's a deletion, we can delete every request on this biblio : in case the user - # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate - # that are pending and can't succeed, as we don't have the XML anymore - # so, delete everything for this biblionumber - if ($data->{'operation'} =~ /delete/i) { - $delsth = $dbh->prepare("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ?"); - $delsth->execute($data->{'biblio_auth_number'}); - # if it's not a deletion, delete every pending specialUpdate for this biblionumber - # in case the user add biblio, then X items, before this script runs - # this avoid indexing X+1 times where just 1 is enough. - } else { - $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 - WHERE biblio_auth_number = ? and operation = 'specialUpdate'"); - $delsth->execute($data->{'biblio_auth_number'}); - } - } - }; - if ($@) { - Unix::Syslog::syslog LOG_ERR, "$@"; + ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) { + warn "Inside while loop" if $debug; + + my $id = $data->{'id'}; + my $op = $data->{'operation'}; + $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded + # either delete_record or recordDelete + my $record_number = $data->{'biblio_auth_number'}; + my $server = $data->{'server'}; + + next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number}; + + my $ok = 0; + my $record; + if ($op eq 'recordDelete') { + $ok = process_delete($dbh, $server, $record_number); } + else { + $ok = process_update($dbh, $server, $record_number, $id); + } + if ($ok == 1) { + mark_done($dbh, $record_number, $op); + } } $readsth->finish(); $kernel->yield('status_check'); } +sub process_delete { + my $dbh = shift; + my $server = shift; + my $record_number = shift; + + my $record; + my $ok = 0; + eval { + warn "Searching for record to delete" if $debug; + # 1st read the record in zebra, we have to get it from zebra as its no longer in the db + my $Zconn = get_zebra_connection($server); + my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number); + $results->option(elementSetName => 'marcxml'); + $record = $results->record(0)->raw(); + }; + if ($@) { + # this doesn't exist, so no need to wail on zebra to delete it + if ($@->code() eq 13) { + $ok = 1; + } else { + # caught a ZOOM::Exception + my $error = _format_zoom_error_message($@); + warn "ERROR: $error"; + } + } else { + # then, delete the record + warn "Deleting record" if $debug; + $ok = zebrado($record, 'recordDelete', $server, $record_number); + } + return $ok; +} + +sub process_update { + my $dbh = shift; + my $server = shift; + my $record_number = shift; + my $id = shift; + + my $record; + my $ok = 0; + + warn "Updating record" if $debug; + # get the XML + my $marcxml; + if ($server eq "biblioserver") { + my $marc = GetMarcBiblio($record_number); + $marcxml = $marc->as_xml_record() if $marc; + } + elsif ($server eq "authorityserver") { + $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number); + } + # check it's XML, just in case + eval { + my $hashed = XMLin($marcxml); + }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe + ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA + if ($@) { + Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@"; + mark_done_by_id($dbh, $id); + $ok = 0; + } else { + # ok, we have everything, do the operation in zebra ! + $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number); + } + return $ok; +} + +sub mark_done_by_id { + my $dbh = shift; + my $id = shift; + my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?"); + $delsth->execute($id); +} + +sub mark_done { + my $dbh = shift; + my $record_number = shift; + my $op = shift; + + my $delsth; + if ($op eq 'recordDelete') { + # if it's a deletion, we can delete every request on this biblio : in case the user + # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate + # that are pending and can't succeed, as we don't have the XML anymore + # so, delete everything for this biblionumber + $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ? and operation = ?"); + $delsth->execute($record_number, $op); + } else { + # if it's not a deletion, delete every pending specialUpdate for this biblionumber + # in case the user add biblio, then X items, before this script runs + # this avoid indexing X+1 times where just 1 is enough. + $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 + WHERE biblio_auth_number = ? and operation = 'specialUpdate'"); + $delsth->execute($record_number); + } +} + sub zebrado { ###Accepts a $server variable thus we can use it to update biblios, authorities or other zebra dbs - my ($record, $op, $server, $biblionumber) = @_; + my ($record, $op, $server, $record_number) = @_; - warn "In zebrado" if $debug; - my @port; - - my $tried = 0; - my $recon = 0; - my $reconnect = 0; -# $record=Encode::encode("UTF-8",$record); - my $shadow = $server."shadow"; - $op = 'recordDelete' if $op eq 'delete_record'; + unless ($record) { + my $message = "error updating index for $server $record $record_number: no source record"; + postpone_update($server, $record_number, $message); + return 0; + } + + my $attempts = 0; + my $ok = 0; + ATTEMPT: while ($attempts < $max_operation_attempts) { + $attempts++; + warn "Attempt $attempts for $op for $server $record_number" if $debug; + my $Zconn = get_zebra_connection($server); -reconnect: - warn "At reconnect" if $debug; - my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml'); - if ($record) { - warn "Record found" if $debug; my $Zpackage = $Zconn->package(); $Zpackage->option(action => $op); $Zpackage->option(record => $record); -# $Zpackage->option(recordIdOpaque => $biblionumber) if $biblionumber; -retry: - warn "At Retry" if $debug; + eval { $Zpackage->send("update") }; if ($@ && $@->isa("ZOOM::Exception")) { - print "Oops! ", $@->message(), "\n"; - return $@->code(); + my $message = _format_zoom_error_message($@); + my $error = $@->code(); + if (exists $retriable_zoom_errors{$error}) { + warn "reattempting operation $op for $server $record_number" if $debug; + warn "last Zebra error was $message" if $debug; + $Zpackage->destroy(); + + if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) { + # bump up connection timeout + $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier); + $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout; + Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n"; + warn "increased Zebra connection timeout to $zconn_timeout" if $debug; + } + next ATTEMPT; + } else { + postpone_update($server, $record_number, $message); + } } - my($error, $errmsg, $addinfo, $diagset) = $Zconn->error_x(); - if ($error == 10007 && $tried < 3) {## timeout --another 30 looonng seconds for this update - sleep 1; ## wait a sec! - $tried++; - goto "retry"; - } elsif ($error == 2 && $tried < 2) {## timeout --temporary zebra error !whatever that means - sleep 2; ## wait two seconds! - $tried++; - goto "retry"; - } elsif ($error==10004 && $recon == 0) {##Lost connection -reconnect - sleep 1; ## wait a sec! - $recon = 1; - $Zpackage->destroy(); - $Zconn->destroy(); - goto "reconnect"; - } elsif ($error) { - $Zpackage->destroy(); - $Zconn->destroy(); - return 0; + eval { $Zpackage->send('commit'); }; + if ($@) { + # operation succeeded, but commit + # did not - we have a problem + my $message = _format_zoom_error_message($@); + postpone_update($server, $record_number, $message); + } else { + $ok = 1; + last ATTEMPT; } - $Zpackage->send('commit'); - return 1; } - return 0; + + unless ($ok) { + my $message = "Made $attempts attempts to index $server record $record_number without success"; + postpone_update($server, $record_number, $message); + } + + return $ok; } +sub postpone_update { + my ($server, $record_number, $message) = @_; + warn $message if $debug; + $message .= "\n" unless $message =~ /\n$/; + Unix::Syslog::syslog LOG_ERR, $message; + $postponed_updates->{$server}->{$record_number} = 1; +} sub handler_stop { my $heap = $_[HEAP]; @@ -228,6 +343,87 @@ sub handler_stop { delete $heap->{session}; } +# get a DB connection +sub get_db_connection { + my $dbh; + + $db_connection_wait = $min_connection_wait unless defined $db_connection_wait; + while (1) { + eval { + # note that C4::Context caches the + # DB handle; C4::Context->dbh() will + # check that handle first before returning + # it. If the connection is bad, it + # then tries (once) to create a new one. + $dbh = C4::Context->dbh(); + }; + + unless ($@) { + # C4::Context->dbh dies if it cannot + # establish a connection + $db_connection_wait = $min_connection_wait; + return $dbh; + } + + # connection failed + my $error = "failed to connect to DB: $DBI::errstr"; + warn $error if $debug; + Unix::Syslog::syslog LOG_ERR, $error; + sleep $db_connection_wait; + $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait; + } +} + +# get a Zebra connection +sub get_zebra_connection { + my $server = shift; + + # start connection retry wait queue if necessary + $zoom_connection_waits{$server} = $min_connection_wait unless exists $zoom_connection_waits{$server}; + + # try to connect to Zebra forever until we succeed + while (1) { + # what follows assumes that C4::Context->Zconn + # makes only one attempt to create a new connection; + my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml'); + $Zconn->option('timeout' => $zconn_timeout); + + # it is important to note that if the existing connection + # stored by C4::Context has an error (any type of error) + # from the last transaction, C4::Context->Zconn closes + # it and establishes a new one. Therefore, the + # following check will succeed if we have a new, good + # connection or we're using a previously established + # connection that has experienced no errors. + if ($Zconn->errcode() == 0) { + $zoom_connection_waits{$server} = $min_connection_wait; + return $Zconn; + } + + # connection failed + my $error = _format_zoom_error_message($Zconn); + warn $error if $debug; + Unix::Syslog::syslog LOG_ERR, $error; + sleep $zoom_connection_waits{$server}; + $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait; + } +} + +# given a ZOOM::Exception or +# ZOOM::Connection object, generate +# a human-reaable error message +sub _format_zoom_error_message { + my $err = shift; + + my $message = ""; + if (ref($err) eq 'ZOOM::Connection') { + $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo(); + } elsif (ref($err) eq 'ZOOM::Exception') { + $message = $err->message() . " (" . $err->diagset . " " . $err->code() . ") " . $err->addinfo(); + } + return $message; +} + POE::Session->create( inline_states => { _start => \&handler_start,