Bug 15099: Move admin/categorie.pl to admin/categories.pl
[koha.git] / Koha / MetaSearcher.pm
1 package Koha::MetaSearcher;
2
3 # Copyright 2014 ByWater Solutions
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use Modern::Perl;
21
22 use base 'Class::Accessor';
23
24 use C4::Charset qw( MarcToUTF8Record );
25 use C4::Search qw(); # Purely for new_record_from_zebra
26 use DBIx::Class::ResultClass::HashRefInflator;
27 use IO::Select;
28 use Koha::Cache;
29 use Koha::Database;
30 use Koha::MetadataRecord;
31 use MARC::File::XML;
32 use Storable qw( store_fd fd_retrieve );
33 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC );
34 use UUID;
35 use ZOOM;
36
37 use sort 'stable';
38
39 __PACKAGE__->mk_accessors( qw( fetch offset on_error resultset ) );
40
41 sub new {
42     my ( $class, $options ) = @_;
43
44     my ( $uuid, $uuidstring );
45     UUID::generate($uuid);
46     UUID::unparse( $uuid, $uuidstring );
47
48     return bless {
49         offset => 0,
50         fetch => 100,
51         on_error => sub {},
52         results => [],
53         resultset => $uuidstring,
54         %{ $options || {} }
55     }, $class;
56 }
57
58 sub handle_hit {
59     my ( $self, $index, $server, $marcrecord ) = @_;
60
61     my $record = Koha::MetadataRecord->new( { schema => 'marc', record => $marcrecord } );
62
63     my %fetch = (
64         title => 'biblio.title',
65         seriestitle => 'biblio.seriestitle',
66         author => 'biblio.author',
67         isbn =>'biblioitems.isbn',
68         issn =>'biblioitems.issn',
69         lccn =>'biblioitems.lccn', #LC control number (not call number)
70         edition =>'biblioitems.editionstatement',
71         date => 'biblio.copyrightdate', #MARC21
72         date2 => 'biblioitems.publicationyear', #UNIMARC
73     );
74
75     my $metadata = {};
76     while ( my ( $key, $kohafield ) = each %fetch ) {
77         $metadata->{$key} = $record->getKohaField($kohafield);
78     }
79     $metadata->{date} //= $metadata->{date2};
80
81     push @{ $self->{results} }, {
82         server => $server,
83         index => $index,
84         record => $marcrecord,
85         metadata => $metadata,
86     };
87 }
88
89 sub search {
90     my ( $self, $server_ids, $query ) = @_;
91
92     my $resultset_expiry = 300;
93
94     my $cache;
95     eval { $cache = Koha::Cache->new(); };
96     my $schema = Koha::Database->new->schema;
97     my $stats = {
98         num_fetched => {
99             map { $_ => 0 } @$server_ids
100         },
101         num_hits => {
102             map { $_ => 0 } @$server_ids
103         },
104         total_fetched => 0,
105         total_hits => 0,
106     };
107     my $start = clock_gettime( CLOCK_MONOTONIC );
108     my $select = IO::Select->new;
109     my @worker_fhs;
110
111     my @cached_sets;
112     my @servers;
113
114     foreach my $server_id ( @$server_ids ) {
115         if ( $server_id =~ /^\d+$/ ) {
116             # Z39.50 server
117             my $server = $schema->resultset('Z3950server')->find(
118                 { id => $server_id },
119                 { result_class => 'DBIx::Class::ResultClass::HashRefInflator' },
120             );
121             $server->{type} = 'z3950';
122
123             push @servers, $server;
124         } elsif ( $server_id =~ /(\w+)(?::(\w+))?/ ) {
125             # Special server
126             push @servers, {
127                 type => $1,
128                 extra => $2,
129                 id => $server_id,
130                 host => $server_id,
131                 name => $server_id,
132             };
133         }
134     }
135
136     # HashRefInflator is used so that the information will survive into the fork
137     foreach my $server ( @servers ) {
138         if ( $cache ) {
139             my $set = $cache->get_from_cache( 'z3950-resultset-' . $self->resultset . '-' . $server->{id} );
140             if ( ref($set) eq 'HASH' ) {
141                 $set->{server} = $server;
142                 push @cached_sets, $set;
143                 next;
144             }
145         }
146
147         $select->add( $self->_start_worker( $server, $query ) );
148     }
149
150     # Handle these while the servers are searching
151     foreach my $set ( @cached_sets ) {
152         $self->_handle_hits( $stats, $set );
153     }
154
155     while ( $select->count ) {
156         foreach my $readfh ( $select->can_read() ) {
157             my $result = fd_retrieve( $readfh );
158
159             $select->remove( $readfh );
160             close $readfh;
161             wait;
162
163             next if ( ref $result ne 'HASH' );
164
165             if ( $result->{error} ) {
166                 $self->{on_error}->( $result->{server}, $result->{error} );
167                 next;
168             }
169
170             $self->_handle_hits( $stats, $result );
171
172             if ( $cache ) {
173                 $cache->set_in_cache( 'z3950-resultset-' . $self->resultset . '-' . $result->{server}->{id}, {
174                     hits => $result->{hits},
175                     num_fetched => $result->{num_fetched},
176                     num_hits => $result->{num_hits},
177                 }, $resultset_expiry );
178             }
179         }
180     }
181
182     $stats->{time} = clock_gettime( CLOCK_MONOTONIC ) - $start;
183
184     return $stats;
185 }
186
187 sub _start_worker {
188     my ( $self, $server, $query ) = @_;
189     pipe my $readfh, my $writefh;
190
191     # Accessing the cache or Koha database after the fork is risky, so get any resources we need
192     # here.
193     my $pid;
194     my $marcflavour = C4::Context->preference('marcflavour');
195
196     if ( ( $pid = fork ) ) {
197         # Parent process
198         close $writefh;
199
200         return $readfh;
201     } elsif ( !defined $pid ) {
202         # Error
203
204         $self->{on_error}->( $server, 'Failed to fork' );
205         return;
206     }
207
208     close $readfh;
209     my $connection;
210     my ( $num_hits, $num_fetched, $hits, $results );
211
212     eval {
213         if ( $server->{type} eq 'z3950' ) {
214             my $zoptions = ZOOM::Options->new();
215             $zoptions->option( 'elementSetName', 'F' );
216             $zoptions->option( 'databaseName',   $server->{db} );
217             $zoptions->option( 'user', $server->{userid} ) if $server->{userid};
218             $zoptions->option( 'password', $server->{password} ) if $server->{password};
219             $zoptions->option( 'preferredRecordSyntax', $server->{syntax} );
220             $zoptions->option( 'timeout', $server->{timeout} ) if $server->{timeout};
221
222             $connection = ZOOM::Connection->create($zoptions);
223
224             $connection->connect( $server->{host}, $server->{port} );
225             $results = $connection->search_pqf( $query ); # Starts the search
226         } elsif ( $server->{type} eq 'koha' ) {
227             $connection = C4::Context->Zconn( $server->{extra} );
228             $results = $connection->search_pqf( $query ); # Starts the search
229         } elsif ( $server->{type} eq 'batch' )  {
230             $server->{encoding} = 'utf-8';
231         }
232     };
233     if ($@) {
234         store_fd {
235             error => $connection ? $connection->exception() : $@,
236             server => $server,
237         }, $writefh;
238         exit;
239     }
240
241     if ( $server->{type} eq 'batch' ) {
242         # TODO: actually handle PQF
243         $query =~ s/@\w+ (?:\d+=\d+ )?//g;
244         $query =~ s/"//g;
245
246         my $schema = Koha::Database->new->schema;
247         $schema->storage->debug(1);
248         my $match_condition = [ map +{ -like => '%' . $_ . '%' }, split( /\s+/, $query ) ];
249         $hits = [ $schema->resultset('ImportRecord')->search(
250             {
251                 import_batch_id => $server->{extra},
252                 -or => [
253                     { 'import_biblios.title' => $match_condition },
254                     { 'import_biblios.author' => $match_condition },
255                     { 'import_biblios.isbn' => $match_condition },
256                     { 'import_biblios.issn' => $match_condition },
257                 ],
258             },
259             {
260                 join => [ qw( import_biblios ) ],
261                 rows => $self->{fetch},
262             }
263         )->get_column( 'marc' )->all ];
264
265         $num_hits = $num_fetched = scalar @$hits;
266     } else {
267         $num_hits = $results->size;
268         $num_fetched = ( $self->{offset} + $self->{fetch} ) < $num_hits ? $self->{fetch} : $num_hits;
269
270         $hits = [ map { $_->raw() } @{ $results->records( $self->{offset}, $num_fetched, 1 ) } ];
271     }
272
273     if ( !@$hits && $connection && $connection->exception() ) {
274         store_fd {
275             error => $connection->exception(),
276             server => $server,
277         }, $writefh;
278         exit;
279     }
280
281     if ( $server->{type} eq 'koha' ) {
282         $hits = [ map { C4::Search::new_record_from_zebra( $server->{extra}, $_ ) } @$hits ];
283     } else {
284         $hits = [ map { $self->_import_record( $_, $marcflavour, $server->{encoding} ? $server->{encoding} : "iso-5426" ) } @$hits ];
285     }
286
287     store_fd {
288         hits => $hits,
289         num_fetched => $num_fetched,
290         num_hits => $num_hits,
291         server => $server,
292     }, $writefh;
293
294     exit;
295 }
296
297 sub _import_record {
298     my ( $self, $raw, $marcflavour, $encoding ) = @_;
299
300     my ( $marcrecord ) = MarcToUTF8Record( $raw, $marcflavour, $encoding ); #ignores charset return values
301
302     return $marcrecord;
303 }
304
305 sub _handle_hits {
306     my ( $self, $stats, $set ) = @_;
307
308     my $server = $set->{server};
309
310     my $num_hits = $stats->{num_hits}->{ $server->{id} } = $set->{num_hits};
311     my $num_fetched = $stats->{num_fetched}->{ $server->{id} } = $set->{num_fetched};
312
313     $stats->{total_hits} += $num_hits;
314     $stats->{total_fetched} += $num_fetched;
315
316     foreach my $j ( 0..$#{ $set->{hits} } ) {
317         $self->handle_hit( $self->{offset} + $j, $server, $set->{hits}->[$j] );
318     }
319 }
320
321 sub sort {
322     my ( $self, $key, $direction ) = @_;
323
324     my $empty_flip = -1; # Determines the flip of ordering for records with empty sort keys.
325
326     foreach my $hit ( @{ $self->{results} } ) {
327         ( $hit->{sort_key} = $hit->{metadata}->{$key} || '' ) =~ s/\W//g;
328     }
329
330     $self->{results} = [ sort {
331         # Sort empty records at the end
332         return -$empty_flip unless $a->{sort_key};
333         return $empty_flip unless $b->{sort_key};
334
335         $direction * ( $a->{sort_key} cmp $b->{sort_key} );
336     } @{ $self->{results} } ];
337 }
338
339 sub results {
340     my ( $self, $offset, $length ) = @_;
341
342     my @subset;
343
344     foreach my $i ( $offset..( $offset + $length - 1 ) ) {
345         push @subset, $self->{results}->[$i] if $self->{results}->[$i];
346     }
347
348     return @subset;
349 }
350
351 1;