fixing zebraqueue daemon for delete operation part two
[koha.git] / misc / bin / zebraqueue_daemon.pl
1 #!/usr/bin/perl
2
3 # daemon to watch the zebraqueue and update zebra as needed
4
5 use strict;
6 BEGIN {
7     # find Koha's Perl modules
8     # test carefully before changing this
9     use FindBin;
10     eval { require "$FindBin::Bin/kohalib.pl" };
11 }
12 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
13 use Unix::Syslog qw(:macros);
14
15 use C4::Context;
16 use C4::Biblio;
17 use C4::Search;
18 use C4::AuthoritiesMarc;
19 use XML::Simple;
20 use utf8;
21
22 my $dbh=C4::Context->dbh;
23 my $ident = "Koha Zebraqueue ";
24
25 my $debug = 1;
26 Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
27
28 Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
29
30 sub handler_start {
31
32     # Starts session. Only ever called once only really used to set an alias
33     # for the POE kernel
34     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
35
36     my $time = localtime(time);
37     Unix::Syslog::syslog LOG_INFO, "$time POE Session ", $session->ID, " has started.\n";
38
39     # check status
40 #    $kernel->yield('status_check');
41     $kernel->yield('sleep');
42 }
43
44 sub handler_sleep {
45
46     # can be used to slow down loop execution if needed
47     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
48     use Time::HiRes qw (sleep);
49     Time::HiRes::sleep(0.01);
50     #sleep 1;
51     $kernel->yield('status_check');
52 }
53
54 sub handler_check {
55     # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
56     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
57     my $dbh=C4::Context->dbh;
58     my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
59     $sth->execute;
60     my $data = $sth->fetchrow_hashref();
61     if ($data->{'opcount'} > 0){
62         Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n";
63         $sth->finish();
64         $kernel->yield('do_ops');
65     }
66     else {
67         $sth->finish();
68         $kernel->yield('sleep');
69     }
70 }
71
72 sub zebraop {
73     # execute operations waiting in the zebraqueue
74     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
75     my $dbh=C4::Context->dbh;
76     my $readsth=$dbh->prepare("SELECT id,biblio_auth_number,operation,server FROM zebraqueue WHERE done=0");
77     $readsth->execute();
78     Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
79     while (my $data = $readsth->fetchrow_hashref()){
80         warn "Inside while loop" if $debug;
81         eval {
82         my $ok = 0;
83         if ($data->{'operation'} =~ /delete/i ){
84             eval {
85
86             warn "Searching for record to delete" if $debug;
87             # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
88             my $Zconn=C4::Context->Zconn($data->{'server'}, 0, 1,'','xml');
89             my $query = $Zconn->search_pqf( '@attr 1=Local-number '.$data->{'biblio_auth_number'});
90
91             # then, delete the record
92             warn "Deleting record" if $debug;
93             $ok=zebrado($query->record(0)->render(),$data->{'operation'},$data->{'server'},$data->{'biblio_auth_number'});
94             };
95
96             if ($@) {
97                 # caught a ZOOM::Exception
98                 my $error =
99                     $@->message() . " ("
100                   . $@->code() . ") "
101                   . $@->addinfo() . " "
102                   . $@->diagset();
103                 warn "ERROR: $error";
104                 # this doesn't exist, so no need to wail on zebra to delete it
105                 if ($@->code() eq 13) {
106                     $ok = 1;
107                 }        
108             }
109             #if ($ok == 1) { 
110             #    my $delsth=$dbh->prepare("UPDATE zebraqueue SET done=1 WHERE id =?");
111             #    $delsth->execute($data->{'id'});
112             #    next;
113             #}
114
115         }
116         else {
117             # it is an update           
118             warn "Updating record" if $debug;
119             # get the XML
120             my $marcxml;
121             if ($data->{'server'} eq "biblioserver") {
122                 my $marc = GetMarcBiblio($data->{'biblio_auth_number'});
123                 $marcxml = $marc->as_xml_record() if $marc;
124             } 
125             elsif ($data->{'server'} eq "authorityserver") {
126                 $marcxml =C4::AuthoritiesMarc::GetAuthorityXML($data->{'biblio_auth_number'});
127             }
128             # check it's XML, just in case
129             eval {
130                 my $hashed=XMLin($marcxml);
131             }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
132             ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
133             if ($@){
134                  Unix::Syslog::syslog LOG_ERR, "$@";
135                 my $delsth=$dbh->prepare("UPDATE zebraqueue SET done=1 WHERE id =?");
136                 $delsth->execute($data->{'id'});
137                 next;
138             }
139             # ok, we have everything, do the operation in zebra !
140             $ok=zebrado($marcxml,$data->{'operation'},$data->{'server'},$data->{'biblio_auth_number'});
141         }
142         if ($ok == 1){
143             $dbh=C4::Context->dbh;
144             my $delsth;
145             # if it's a deletion, we can delete every request on this biblio : in case the user
146             # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
147             # that are pending and can't succeed, as we don't have the XML anymore
148             # so, delete everything for this biblionumber
149             if ($data->{'operation'} eq 'delete_record') {
150                 $delsth =$dbh->prepare("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number =?");
151                 $delsth->execute($data->{'biblio_auth_number'});
152                 # if it's not a deletion, delete every pending specialUpdate for this biblionumber
153                 # in case the user add biblio, then X items, before this script runs
154                 # this avoid indexing X+1 times where just 1 is enough.
155             } else {
156                 $delsth =$dbh->prepare("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number =? and operation='specialUpdate'");
157                 $delsth->execute($data->{'biblio_auth_number'});
158             }
159         }                            
160         };
161         if ($@){
162             Unix::Syslog::syslog LOG_ERR, "$@";
163         }
164     }
165     $readsth->finish();
166     $kernel->yield('status_check');
167 }
168
169 sub zebrado {
170     ###Accepts a $server variable thus we can use it to update  biblios, authorities or other zebra dbs
171     my ($record,$op,$server,$biblionumber)=@_;
172
173     warn "In zebrado" if $debug; 
174     my @port;
175     
176     my $tried=0;
177     my $recon=0;
178     my $reconnect=0;
179 #    $record=Encode::encode("UTF-8",$record);
180     my $shadow=$server."shadow";
181     $op = 'recordDelete' if $op eq 'delete_record';
182
183 reconnect:
184     warn "At reconnect" if $debug; 
185     my $Zconn=C4::Context->Zconn($server, 0, 1,'','xml');
186     if ($record){
187         warn "Record found" if $debug;
188         my $Zpackage = $Zconn->package();
189         $Zpackage->option(action => $op);
190         $Zpackage->option(record => $record);
191 #       $Zpackage->option(recordIdOpaque => $biblionumber) if $biblionumber;
192 retry:
193         warn "At Retry" if $debug;
194         eval { $Zpackage->send("update") };
195         if ($@ && $@->isa("ZOOM::Exception")) {
196         print "Oops!  ", $@->message(), "\n";
197         return $@->code();
198         }
199         my($error, $errmsg, $addinfo, $diagset) = $Zconn->error_x();
200         if ($error==10007 && $tried<3) {## timeout --another 30 looonng seconds for this update
201             sleep 1;    ##  wait a sec!
202             $tried++;
203             goto "retry";
204         }elsif ($error==2 && $tried<2) {## timeout --temporary zebra error !whatever that means
205             sleep 2;    ##  wait two seconds!
206             $tried++;
207             goto "retry";
208         }elsif($error==10004 && $recon==0){##Lost connection -reconnect
209             sleep 1;    ##  wait a sec!
210             $recon=1;
211             $Zpackage->destroy();
212             $Zconn->destroy();
213             goto "reconnect";
214         }elsif ($error){
215             $Zpackage->destroy();
216             $Zconn->destroy();
217             return 0;
218         }
219         $Zpackage->send('commit');
220         return 1;
221     }
222     return 0;
223 }
224
225
226 sub handler_stop {
227     my $heap = $_[HEAP];
228     my $time = localtime(time);
229     Unix::Syslog::syslog LOG_INFO, "$time Session ", $_[SESSION]->ID, " has stopped.\n";
230     delete $heap->{session};
231 }
232
233 POE::Session->create(
234     inline_states => {
235         _start       => \&handler_start,
236         sleep        => \&handler_sleep,
237         status_check => \&handler_check,
238         do_ops       => \&zebraop,
239         _stop        => \&handler_stop,
240     },
241 );
242
243 # start the kernel
244 $poe_kernel->run();
245
246 Unix::Syslog::closelog;
247
248 exit;