Continuing work on the Z39.50 search tool. The daemon now forks up to 12 processes.
[koha.git] / acqui.simple / processz3950queue
1 #!/usr/bin/perl
2 use C4::Database;
3 use DBI;
4 #use strict;
5 use C4::Acquisitions;
6 use C4::Output;
7 use Net::Z3950;
# Z39.50 queue daemon.
#
# Every few seconds the parent scans z3950queue.  For each queued search it
# looks at the z3950results rows already recorded for that query and forks
# one worker per target server that has neither finished (enddate set) nor
# is currently being worked on (active set).  Each worker opens its own
# database connection, runs the search through Net::Z3950, stores the raw
# records in z3950results and exits.  At most $maxforks workers run at once.
my $dbh=C4Connect();

# No worker can have survived a daemon restart, so clear stale "active"
# flags; otherwise the affected servers would be skipped as "in progress".
my $sth=$dbh->prepare("update z3950results set active=0");
$sth->execute;
$sth->finish;
$SIG{CHLD}='reap';

my $maxforks=12;        # upper bound on simultaneously running workers
my $reapcounter=0;
my $forkcounter=0;      # workers currently believed alive (decremented by reap)
my $pid=$$;
my $lastrun=0;

# Search type (as stored in z3950queue.type) => value used in the "@attr"
# prefix of the query string handed to Net::Z3950.
my %attr_for=(
    isbn    => '1=7',
    title   => '1=4',
    author  => '1=1003',
    lccn    => '1=9',
    keyword => '1=1016',
);

while (1) {
    if ((time-$lastrun)>5) {
        my $sth=$dbh->prepare("select id,term,type,servers from z3950queue order by id");
        $sth->execute;
        while (my ($id, $term, $type, $servers) = $sth->fetchrow) {
            # At capacity: leave this entry for a later pass.
            next unless ($forkcounter<$maxforks);

            my $now=time();
            my $stk=$dbh->prepare("select id,server,startdate,enddate,numrecords,active from z3950results where queryid=?");
            $stk->execute($id);
            unless ($stk->rows) {
                # First time this query is seen: stamp its start time.
                $dbh->do("update z3950queue set done=-1,startdate=? where id=?", undef, $now, $id);
            }
            #  1 => finished or currently being processed: leave alone
            # -1 => a results row exists but is neither finished nor active: retry
            # (no entry => server never attempted for this query)
            my %serverdone;
            while (my ($r_id, $r_server, $r_startdate, $r_enddate, $r_numrecords, $active) = $stk->fetchrow) {
                $serverdone{$r_server}=($r_enddate>0 || $active) ? 1 : -1;
            }
            $stk->finish;

            my $attr=(defined $type && exists $attr_for{$type}) ? $attr_for{$type} : '';
            $term='"'.$term.'"';
            my $query="\@attr $attr $term";

            my $stillprocessing=0;
            foreach my $serverinfo (split(/\s+/, $servers)) {
                my $state=$serverdone{$serverinfo} || 0;
                next if ($state==1);
                # Assign to the outer flag; the original re-declared it with
                # "my" here, so the outer variable never changed.
                $stillprocessing=1;
                # Enforce the cap per worker, not only per queue entry.
                last if ($forkcounter>=$maxforks);

                my $child=fork();
                unless (defined $child) {
                    # fork() failed.  Never fall through into the worker
                    # code: that would make the daemon itself exit.
                    warn "fork failed: $!\n";
                    last;
                }
                if ($child) {
                    $forkcounter++;
                    next;
                }
                _process_server($dbh, $id, $type, $term, $query, $serverinfo, $state, $forkcounter);
                exit;   # not reached; _process_server always exits
            }
            unless ($stillprocessing) {
                # Marking the queue entry as finished is currently disabled:
                #my $sti=$dbh->prepare("select enddate from z3950queue where id=$id");
                #$sti->execute;
                #my ($enddate) = $sti->fetchrow;
                #unless ($enddate) {
                #    my $now=time;
                #    $sti=$dbh->prepare("update z3950queue set done=1,numrecords=$totalrecords,enddate=$now where id=$id");
                #    $sti->execute;
                #}
            }
        }
        $lastrun=time();
    }
    sleep 1;
}

# _process_server($dbh, $id, $type, $term, $query, $serverinfo, $state, $forkcount)
#
# Body of a forked worker: runs one search against one server and never
# returns (always calls exit).
#   $dbh        parent's database handle (only touched to neutralise it)
#   $id         z3950queue.id of the search
#   $type,$term search type and quoted term, used for the progress message
#   $query      complete query string passed to $conn->search
#   $serverinfo "name/host:port/database[/user[/password]]"
#   $state      -1 when a z3950results row already exists for this
#               server/query and must be reused, anything else to insert one
#   $forkcount  parent's worker count at fork time, for the progress message
sub _process_server {
    my ($dbh, $id, $type, $term, $query, $serverinfo, $state, $forkcount) = @_;

    # The inherited handle shares the parent's connection.  Without this the
    # implicit DESTROY at exit would disconnect it under the parent.
    $dbh->{InactiveDestroy}=1;
    my $dbi=C4Connect();

    my ($name, $server, $database, $user, $password) = split(/\//, $serverinfo, 5);
    my ($servername, $port) = $server=~/(.*)\:(\d+)/;
    print "Processing $type=$term at $name $server $database (".($forkcount+1)." forks)\n";

    my $resultsid;
    if ($state==-1) {
        ($resultsid)=$dbi->selectrow_array("select id from z3950results where server=? and queryid=?", undef, $serverinfo, $id);
    } else {
        $dbi->do("insert into z3950results (server, queryid, startdate) values (?, ?, ?)", undef, $serverinfo, $id, time());
        $resultsid=$dbi->{'mysql_insertid'};
    }
    # Must go through the worker's own handle, not the parent's.
    $dbi->do("update z3950results set active=1 where id=?", undef, $resultsid);

    my @connargs=($servername, $port, databaseName => $database);
    push(@connargs, user => $user, password => $password) if ($user);
    my $conn=eval { Net::Z3950::Connection->new(@connargs) };
    my $why=$@ || $!;
    if ($conn) {
        pe($conn);
    } else {
        print "    $server connection failed: $why\n";
    }

    my $rs;
    if ($conn) {
        $rs=$conn->search($query);
        pe($conn);
    }
    if ($rs) {
        $rs->option(preferredRecordSyntax => Net::Z3950::RecordSyntax::USMARC);
        pe($conn);
        my $numresults=$rs->size();
        pe($conn);

        my $result='';
        # Append the raw data of records $from..$to to $result, reporting
        # and skipping any record the server fails to deliver.
        my $fetch=sub {
            my ($from, $to) = @_;
            for my $i ($from..$to) {
                my $rec=$rs->record($i);
                unless ($rec) {
                    pe($conn);
                    next;
                }
                $result.=$rec->rawdata();
            }
        };

        # First batch: at most 80 records.
        my $numrecords=($numresults<80) ? $numresults : 80;
        my $scantimerstart=time();
        $fetch->(1, $numrecords);
        my $elapsed=time()-$scantimerstart;
        if ($elapsed) {
            # Records fetched per second (not total hits per second).
            my $speed=int($numrecords/$elapsed*100)/100;
            print "  SPEED: $speed  $server done $numrecords\n";
        }
        $dbi->do("update z3950results set numrecords=?,numdownloaded=?,highestseen=0,results=?,enddate=? where id=?",
                 undef, $numresults, $numrecords, $result, time(), $resultsid);

        # While more hits remain, poll highestseen every 5 seconds and pull
        # a further batch of up to 40 once it comes within 30 of what has
        # been downloaded.  Give up after 60 polls in a row with no fetch.
        my $counter=0;
        while ($counter<60 && $numrecords<$numresults) {
            $counter++;
            my ($highestseen)=$dbi->selectrow_array("select highestseen from z3950results where id=?", undef, $resultsid);
            if ($highestseen>($numrecords-30)) {
                $counter=0;
                print "   $server rescanning\n";
                my $upto=($numresults<$numrecords+40) ? $numresults : $numrecords+40;
                my $batch=$upto-$numrecords;
                my $scantimerstart=time();
                $fetch->($numrecords+1, $upto);
                my $elapsed=time()-$scantimerstart;
                $numrecords=$upto;
                if ($elapsed) {
                    my $speed=int($batch/$elapsed*100)/100;
                    print "  SPEED: $speed  $server done $numrecords\n";
                }
                $dbi->do("update z3950results set numdownloaded=?,results=? where id=?", undef, $numrecords, $result, $resultsid);
            }
            sleep 5;
        }
    }

    # Always clear the flag, even when the connection or search failed, so
    # the server is not treated as "in progress" forever.
    $dbi->do("update z3950results set active=0 where id=?", undef, $resultsid);
    $dbi->disconnect;
    print "    $server done.\n";
    exit;
}

# pe($conn)
#
# Print the pending error of a Net::Z3950 connection, if any.  Does nothing
# when called without a connection or when the error code is zero.
sub pe {
    my ($conn) = @_;
    return unless (defined $conn);
    # The code must be fetched before it is tested; the original tested an
    # unset global first and therefore never printed anything.
    my $code=$conn->errcode();
    return unless ($code);
    my $msg=$conn->errmsg();
    my $ai=$conn->addinfo();
    print << "EOF";
                        CODE:  $code
                        MSG:   $msg
                        ADDTL: $ai

EOF
}
208
# getrecord($server, $base, $query, $auth, $id)
#
# Drive the external yaz-client program through a pipe: optionally send an
# "authentication" line, open $server, select database $base, send $query
# and then ten "s" commands followed by "quit".  yaz-client is started with
# "-m yaz-$id.mrc"; its stdout/stderr are appended to yaz.out / yaz.err in
# the current directory.
#
# Returns the result of closing the pipe (true on success), or 0 when
# yaz-client could not be started.
sub getrecord {
    my ($server, $base, $query, $auth, $id) = @_;

    # NOTE(review): the command goes through the shell because of the
    # redirections, and $id is interpolated into it unescaped -- confirm
    # that callers only ever pass a trusted (numeric) id.
    my $command="yaz-client -m yaz-$id.mrc >>yaz.out 2>>yaz.err";
    my $yaz;
    unless (open($yaz, '|-', $command)) {
        warn "getrecord: cannot start yaz-client: $!\n";
        return 0;
    }

    # Unbuffer the pipe, then restore whichever handle was selected before
    # (the original unconditionally re-selected STDOUT).
    my $previous=select($yaz);
    $|=1;
    select($previous);

    my $authline=$auth ? "authentication $auth\n" : '';
    # "${authline}open" rather than "$auth\open": in a double-quoted string
    # "\o" introduces an octal escape and is a compile error without braces.
    print $yaz << "EOF";
${authline}open $server
base $base
setnames
$query
s
s
s
s
s
s
s
s
s
s
quit
EOF
    return close($yaz);
}
# SIGCHLD handler (installed by name via $SIG{CHLD}).
#
# Collects every child that has exited and lowers $forkcounter once per
# collected child.  Reaping is required so workers do not linger as zombies,
# and it has to loop because several exits can be reported by one signal;
# counting signals instead of children lets the counter drift upward until
# the daemon stops forking.
sub reap {
    use POSIX ();           # compile-time load, imports nothing
    # Keep the interrupted code's errno and child status intact.
    local ($!, $?);
    while (waitpid(-1, POSIX::WNOHANG()) > 0) {
        $forkcounter--;
    }
}
242
243
244