Bug 32482: (follow-up) Add markup comments
[koha.git] / Koha / MetaSearcher.pm
1 package Koha::MetaSearcher;
2
3 # Copyright 2014 ByWater Solutions
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use Modern::Perl;
21
22 use base 'Class::Accessor';
23
24 use C4::Biblio qw(TransformMarcToKoha);
25 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag );
26 use C4::Search qw( new_record_from_zebra );
27 use DBIx::Class::ResultClass::HashRefInflator;
28 use IO::Select;
29 use Koha::Caches;
30 use Koha::Database;
31 use Koha::MetadataRecord;
32 use MARC::File::XML;
33 use Storable qw( fd_retrieve store_fd );
34 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC );
35 use UUID;
36 use ZOOM;
37
38 use sort 'stable';
39
40 __PACKAGE__->mk_accessors( qw( fetch offset on_error resultset ) );
41
42 sub new {
43     my ( $class, $options ) = @_;
44
45     my ( $uuid, $uuidstring );
46     UUID::generate($uuid);
47     UUID::unparse( $uuid, $uuidstring );
48
49     return bless {
50         offset => 0,
51         fetch => 100,
52         on_error => sub {},
53         results => [],
54         resultset => $uuidstring,
55         %{ $options || {} }
56     }, $class;
57 }
58
59 sub handle_hit {
60     my ( $self, $index, $server, $marcrecord ) = @_;
61
62     my @kohafields = ('biblio.title','biblio.subtitle','biblio.seriestitle','biblio.author',
63             'biblioitems.isbn','biblioitems.issn','biblioitems.lccn','biblioitems.editionstatement',
64             'biblio.copyrightdate','biblioitems.publicationyear');
65     my $metadata =  C4::Biblio::TransformMarcToKoha({ kohafields => \@kohafields, record => $marcrecord});
66     $metadata->{edition} = delete $metadata->{editionstatement};
67     $metadata->{date} = delete $metadata->{copyrightdate};
68     $metadata->{date} //= delete $metadata->{publicationyear};
69
70     push @{ $self->{results} }, {
71         server => $server,
72         index => $index,
73         record => $marcrecord,
74         metadata => $metadata,
75     };
76 }
77
78 sub search {
79     my ( $self, $server_ids, $query ) = @_;
80
81     my $resultset_expiry = 300;
82
83     my $cache = Koha::Caches->get_instance();
84     my $schema = Koha::Database->new->schema;
85     my $stats = {
86         num_fetched => {
87             map { $_ => 0 } @$server_ids
88         },
89         num_hits => {
90             map { $_ => 0 } @$server_ids
91         },
92         total_fetched => 0,
93         total_hits => 0,
94     };
95     my $start = clock_gettime( CLOCK_MONOTONIC );
96     my $select = IO::Select->new;
97
98     my @cached_sets;
99     my @servers;
100
101     foreach my $server_id ( @$server_ids ) {
102         if ( $server_id =~ /^\d+$/ ) {
103             # Z39.50 server
104             my $server = $schema->resultset('Z3950server')->find(
105                 { id => $server_id },
106                 { result_class => 'DBIx::Class::ResultClass::HashRefInflator' },
107             );
108             $server->{type} = 'z3950';
109
110             push @servers, $server;
111         } elsif ( $server_id =~ /(\w+)(?::(\w+))?/ ) {
112             # Special server
113             push @servers, {
114                 type => $1,
115                 extra => $2,
116                 id => $server_id,
117                 host => $server_id,
118                 servername => $server_id,
119             };
120         }
121     }
122
123     # HashRefInflator is used so that the information will survive into the fork
124     foreach my $server ( @servers ) {
125         if ( $cache ) {
126             my $set = $cache->get_from_cache( 'z3950-resultset-' . $self->resultset . '-' . $server->{id} );
127             if ( ref($set) eq 'HASH' ) {
128                 $set->{server} = $server;
129                 push @cached_sets, $set;
130                 next;
131             }
132         }
133
134         $select->add( $self->_start_worker( $server, $query ) );
135     }
136
137     # Handle these while the servers are searching
138     foreach my $set ( @cached_sets ) {
139         $self->_handle_hits( $stats, $set );
140     }
141
142     while ( $select->count ) {
143         foreach my $readfh ( $select->can_read() ) {
144             my $result = fd_retrieve( $readfh );
145
146             $select->remove( $readfh );
147             close $readfh;
148             wait;
149
150             next if ( ref $result ne 'HASH' );
151
152             if ( $result->{error} ) {
153                 $self->{on_error}->( $result->{server}, $result->{error} );
154                 next;
155             }
156
157             $self->_handle_hits( $stats, $result );
158
159             if ( $cache ) {
160                 $cache->set_in_cache( 'z3950-resultset-' . $self->resultset . '-' . $result->{server}->{id}, {
161                     hits => $result->{hits},
162                     num_fetched => $result->{num_fetched},
163                     num_hits => $result->{num_hits},
164                 }, { expiry => $resultset_expiry } );
165             }
166         }
167     }
168
169     $stats->{time} = clock_gettime( CLOCK_MONOTONIC ) - $start;
170
171     return $stats;
172 }
173
174 sub _start_worker {
175     my ( $self, $server, $query ) = @_;
176     pipe my $readfh, my $writefh;
177
178     # Accessing the cache or Koha database after the fork is risky, so get any resources we need
179     # here.
180     my $pid;
181     my $marcflavour = C4::Context->preference('marcflavour');
182
183     if ( ( $pid = fork ) ) {
184         # Parent process
185         close $writefh;
186
187         return $readfh;
188     } elsif ( !defined $pid ) {
189         # Error
190
191         $self->{on_error}->( $server, 'Failed to fork' );
192         return;
193     }
194
195     close $readfh;
196     my $connection;
197     my ( $num_hits, $num_fetched, $hits, $results );
198
199     eval {
200         if ( $server->{type} eq 'z3950' ) {
201             my $zoptions = ZOOM::Options->new();
202             $zoptions->option( 'elementSetName', 'F' );
203             $zoptions->option( 'databaseName',   $server->{db} );
204             $zoptions->option( 'user', $server->{userid} ) if $server->{userid};
205             $zoptions->option( 'password', $server->{password} ) if $server->{password};
206             $zoptions->option( 'preferredRecordSyntax', $server->{syntax} );
207             $zoptions->option( 'timeout', $server->{timeout} ) if $server->{timeout};
208
209             $connection = ZOOM::Connection->create($zoptions);
210
211             $connection->connect( $server->{host}, $server->{port} );
212             $results = $connection->search_pqf( $query ); # Starts the search
213         } elsif ( $server->{type} eq 'koha' ) {
214             $connection = C4::Context->Zconn( $server->{extra} );
215             $results = $connection->search_pqf( $query ); # Starts the search
216         } elsif ( $server->{type} eq 'batch' )  {
217             $server->{encoding} = 'utf-8';
218         }
219     };
220     if ($@) {
221         store_fd {
222             error => $connection ? $connection->exception() : $@,
223             server => $server,
224         }, $writefh;
225         exit;
226     }
227
228     if ( $server->{type} eq 'batch' ) {
229         # TODO: actually handle PQF
230         $query =~ s/@\w+ (?:\d+=\d+ )?//g;
231         $query =~ s/"//g;
232
233         my $schema = Koha::Database->new->schema;
234         $schema->storage->debug(1);
235         my $match_condition = [ map +{ -like => '%' . $_ . '%' }, split( /\s+/, $query ) ];
236         $hits = [ $schema->resultset('ImportRecord')->search(
237             {
238                 import_batch_id => $server->{extra},
239                 -or => [
240                     { 'import_biblios.title' => $match_condition },
241                     { 'import_biblios.author' => $match_condition },
242                     { 'import_biblios.isbn' => $match_condition },
243                     { 'import_biblios.issn' => $match_condition },
244                 ],
245             },
246             {
247                 join => [ qw( import_biblios ) ],
248                 rows => $self->{fetch},
249             }
250         )->get_column( 'marc' )->all ];
251
252         $num_hits = $num_fetched = scalar @$hits;
253     } else {
254         $num_hits = $results->size;
255         $num_fetched = ( $self->{offset} + $self->{fetch} ) < $num_hits ? $self->{fetch} : $num_hits;
256
257         $hits = [ map { $_->raw() } @{ $results->records( $self->{offset}, $num_fetched, 1 ) } ];
258     }
259
260     if ( !@$hits && $connection && $connection->exception() ) {
261         store_fd {
262             error => $connection->exception(),
263             server => $server,
264         }, $writefh;
265         exit;
266     }
267
268     if ( $server->{type} eq 'koha' ) {
269         $hits = [ map { C4::Search::new_record_from_zebra( $server->{extra}, $_ ) } @$hits ];
270     } else {
271         $hits = [ map { $self->_import_record( $_, $marcflavour, $server->{encoding} ? $server->{encoding} : "iso-5426" ) } @$hits ];
272     }
273
274     store_fd {
275         hits => $hits,
276         num_fetched => $num_fetched,
277         num_hits => $num_hits,
278         server => $server,
279     }, $writefh;
280
281     exit;
282 }
283
284 sub _import_record {
285     my ( $self, $raw, $marcflavour, $encoding ) = @_;
286
287     my ( $marcrecord ) = MarcToUTF8Record( $raw, $marcflavour, $encoding ); #ignores charset return values
288
289     SetUTF8Flag($marcrecord);
290     return $marcrecord;
291 }
292
293 sub _handle_hits {
294     my ( $self, $stats, $set ) = @_;
295
296     my $server = $set->{server};
297
298     my $num_hits = $stats->{num_hits}->{ $server->{id} } = $set->{num_hits};
299     my $num_fetched = $stats->{num_fetched}->{ $server->{id} } = $set->{num_fetched};
300
301     $stats->{total_hits} += $num_hits;
302     $stats->{total_fetched} += $num_fetched;
303
304     foreach my $j ( 0..$#{ $set->{hits} } ) {
305         $self->handle_hit( $self->{offset} + $j, $server, $set->{hits}->[$j] );
306     }
307 }
308
309 sub sort {
310     my ( $self, $key, $direction ) = @_;
311
312     my $empty_flip = -1; # Determines the flip of ordering for records with empty sort keys.
313
314     foreach my $hit ( @{ $self->{results} } ) {
315         ( $hit->{sort_key} = $hit->{metadata}->{$key} || '' ) =~ s/\W//g;
316     }
317
318     $self->{results} = [ sort {
319         # Sort empty records at the end
320         return -$empty_flip unless $a->{sort_key};
321         return $empty_flip unless $b->{sort_key};
322
323         $direction * ( $a->{sort_key} cmp $b->{sort_key} );
324     } @{ $self->{results} } ];
325 }
326
327 sub results {
328     my ( $self, $offset, $length ) = @_;
329
330     my @subset;
331
332     foreach my $i ( $offset..( $offset + $length - 1 ) ) {
333         push @subset, $self->{results}->[$i] if $self->{results}->[$i];
334     }
335
336     return @subset;
337 }
338
339 1;