kohabug 2048 - improve zebraqueue_daemon performance

[1] Increase sleep interval between checks of zebraqueue
    from 0.01 seconds to 0.50.
[2] Batch up commits of changes to the zebraqueue table
[3] If the same record appears multiple times in the queue,
    handle only once.
[4] Properly postpone failures to process record deletes to
    avoid spinning.
[5] Correct how queue entries are marked done - avoid skipping
    an authority record update, e.g., if it has the same
    ID number as a bib that was updated.
[6] Added a FIXME about a possible later enhancement to
    batch up updates so that Zebra isn't told to commit
    after each record.

No documentation changes.

Signed-off-by: Joshua Ferraro <jmf@liblime.com>
This commit is contained in:
Galen Charlton 2008-05-20 10:48:00 -05:00 committed by Joshua Ferraro
parent 709619013d
commit e3f473187e

View file

@ -110,7 +110,7 @@ sub handler_sleep {
# can be used to slow down loop execution if needed # can be used to slow down loop execution if needed
my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ]; my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
use Time::HiRes qw (sleep); use Time::HiRes qw (sleep);
Time::HiRes::sleep(0.01); Time::HiRes::sleep(0.5);
#sleep 1; #sleep 1;
$kernel->yield('status_check'); $kernel->yield('status_check');
} }
@ -125,10 +125,14 @@ sub handler_check {
if ($data->{'opcount'} > 0) { if ($data->{'opcount'} > 0) {
Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n"; Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n";
$sth->finish(); $sth->finish();
$dbh->commit(); # needed so that we get current state of zebraqueue next time
# we enter handler_check
$kernel->yield('do_ops'); $kernel->yield('do_ops');
} }
else { else {
$sth->finish(); $sth->finish();
$dbh->commit(); # needed so that we get current state of zebraqueue next time
# we enter handler_check
$kernel->yield('sleep'); $kernel->yield('sleep');
} }
} }
@ -137,10 +141,11 @@ sub zebraop {
# execute operations waiting in the zebraqueue # execute operations waiting in the zebraqueue
my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ]; my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
my $dbh = get_db_connection(); my $dbh = get_db_connection();
my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0"); my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0 ORDER BY id DESC");
$readsth->execute(); $readsth->execute();
Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n"; Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
my $completed_updates = {};
ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) { ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
warn "Inside while loop" if $debug; warn "Inside while loop" if $debug;
@ -152,6 +157,7 @@ sub zebraop {
my $server = $data->{'server'}; my $server = $data->{'server'};
next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number}; next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
next ZEBRAQUEUE if exists $completed_updates->{$server}->{$record_number}->{$op};
my $ok = 0; my $ok = 0;
my $record; my $record;
@ -162,11 +168,16 @@ sub zebraop {
$ok = process_update($dbh, $server, $record_number, $id); $ok = process_update($dbh, $server, $record_number, $id);
} }
if ($ok == 1) { if ($ok == 1) {
mark_done($dbh, $record_number, $op); mark_done($dbh, $record_number, $op, $server);
$completed_updates->{$server}->{$record_number}->{$op} = 1;
if ($op eq 'recordDelete') {
$completed_updates->{$server}->{$record_number}->{'specialUpdate'} = 1;
}
} }
} }
$readsth->finish(); $readsth->finish();
$kernel->yield('status_check'); $dbh->commit();
$kernel->yield('sleep');
} }
sub process_delete { sub process_delete {
@ -190,8 +201,8 @@ sub process_delete {
$ok = 1; $ok = 1;
} else { } else {
# caught a ZOOM::Exception # caught a ZOOM::Exception
my $error = _format_zoom_error_message($@); my $message = _format_zoom_error_message($@);
warn "ERROR: $error"; postpone_update($server, $record_number, $message);
} }
} else { } else {
# then, delete the record # then, delete the record
@ -227,7 +238,7 @@ sub process_update {
## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
if ($@) { if ($@) {
Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@"; Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
mark_done_by_id($dbh, $id); mark_done_by_id($dbh, $id, $server);
$ok = 0; $ok = 0;
} else { } else {
# ok, we have everything, do the operation in zebra ! # ok, we have everything, do the operation in zebra !
@ -239,14 +250,16 @@ sub process_update {
sub mark_done_by_id { sub mark_done_by_id {
my $dbh = shift; my $dbh = shift;
my $id = shift; my $id = shift;
my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?"); my $server = shift;
$delsth->execute($id); my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ? AND server = ? AND done = 0");
$delsth->execute($id, $server);
} }
sub mark_done { sub mark_done {
my $dbh = shift; my $dbh = shift;
my $record_number = shift; my $record_number = shift;
my $op = shift; my $op = shift;
my $server = shift;
my $delsth; my $delsth;
if ($op eq 'recordDelete') { if ($op eq 'recordDelete') {
@ -254,15 +267,21 @@ sub mark_done {
# did a modif (or item deletion) just before biblio deletion, there are some specialUpdate # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
# that are pending and can't succeed, as we don't have the XML anymore # that are pending and can't succeed, as we don't have the XML anymore
# so, delete everything for this biblionumber # so, delete everything for this biblionumber
$delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ? and operation = ?"); $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done = 1
$delsth->execute($record_number, $op); WHERE biblio_auth_number = ?
AND server = ?
AND done = 0");
$delsth->execute($record_number, $server);
} else { } else {
# if it's not a deletion, delete every pending specialUpdate for this biblionumber # if it's not a deletion, delete every pending specialUpdate for this biblionumber
# in case the user add biblio, then X items, before this script runs # in case the user add biblio, then X items, before this script runs
# this avoid indexing X+1 times where just 1 is enough. # this avoid indexing X+1 times where just 1 is enough.
$delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1
WHERE biblio_auth_number = ? and operation = 'specialUpdate'"); WHERE biblio_auth_number = ?
$delsth->execute($record_number); AND operation = 'specialUpdate'
AND server = ?
AND done = 0");
$delsth->execute($record_number, $server);
} }
} }
@ -308,6 +327,9 @@ sub zebrado {
postpone_update($server, $record_number, $message); postpone_update($server, $record_number, $message);
} }
} }
# FIXME - would be more efficient to send a ES commit
# after a batch of records, rather than commiting after
# each one - Zebra handles updates relatively slowly.
eval { $Zpackage->send('commit'); }; eval { $Zpackage->send('commit'); };
if ($@) { if ($@) {
# operation succeeded, but commit # operation succeeded, but commit
@ -370,6 +392,8 @@ sub get_db_connection {
# C4::Context->dbh dies if it cannot # C4::Context->dbh dies if it cannot
# establish a connection # establish a connection
$db_connection_wait = $min_connection_wait; $db_connection_wait = $min_connection_wait;
$dbh->{AutoCommit} = 0; # do this to reduce number of
# commits to zebraqueue
return $dbh; return $dbh;
} }