Bug 11078: (follow-up) tidy code
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $do_not_clear_zebraqueue;
43 my $length;
44 my $where;
45 my $offset;
46 my $run_as_root;
47 my $run_user = (getpwuid($<))[0];
48 my $wait_for_lock = 0;
49 my $use_flock;
50
51 my $verbose_logging = 0;
52 my $zebraidx_log_opt = " -v none,fatal,warn ";
53 my $result = GetOptions(
54     'daemon'        => \$daemon_mode,
55     'sleep:i'       => \$daemon_sleep,
56     'd:s'           => \$directory,
57     'r|reset'       => \$reset,
58     's'             => \$skip_export,
59     'k'             => \$keep_export,
60     'I|skip-index'  => \$skip_index,
61     'nosanitize'    => \$nosanitize,
62     'b'             => \$biblios,
63     'noxml'         => \$noxml,
64     'w'             => \$noshadow,
65     'a'             => \$authorities,
66     'h|help'        => \$want_help,
67     'x'             => \$as_xml,
68     'y'             => \$do_not_clear_zebraqueue,
69     'z'             => \$process_zebraqueue,
70     'where:s'       => \$where,
71     'length:i'      => \$length,
72     'offset:i'      => \$offset,
73     'v+'            => \$verbose_logging,
74     'run-as-root'   => \$run_as_root,
75     'wait-for-lock' => \$wait_for_lock,
76 );
77
78 if (not $result or $want_help) {
79     print_usage();
80     exit 0;
81 }
82
83 if( not defined $run_as_root and $run_user eq 'root') {
84     my $msg = "Warning: You are running this script as the user 'root'.\n";
85     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
86     $msg   .= "Please do '$0 --help' to see usage.\n";
87     die $msg;
88 }
89
90 if ( !$as_xml and $nosanitize ) {
91     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
92     $msg   .= "Please do '$0 --help' to see usage.\n";
93     die $msg;
94 }
95
96 if ($process_zebraqueue and ($skip_export or $reset)) {
97     my $msg = "Cannot specify -r or -s if -z is specified\n";
98     $msg   .= "Please do '$0 --help' to see usage.\n";
99     die $msg;
100 }
101
102 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
103     my $msg = "Cannot specify both -y and -z\n";
104     $msg   .= "Please do '$0 --help' to see usage.\n";
105     die $msg;
106 }
107
108 if ($reset) {
109     $noshadow = 1;
110 }
111
112 if ($noshadow) {
113     $noshadow = ' -n ';
114 }
115
116 if ($daemon_mode) {
117     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
118     if ($skip_export or $keep_export or $skip_index or
119           $where or $length or $offset) {
120         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
121         $msg   .= "Please do '$0 --help' to see usage.\n";
122         die $msg;
123     }
124     $authorities = 1;
125     $biblios = 1;
126     $process_zebraqueue = 1;
127 }
128
129 if (not $biblios and not $authorities) {
130     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
131     $msg   .= "Please do '$0 --help' to see usage.\n";
132     die $msg;
133 }
134
135
136 #  -v is for verbose, which seems backwards here because of how logging is set
137 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
138 if ($verbose_logging >= 2) {
139     $zebraidx_log_opt = '-v none,fatal,warn,all';
140 }
141
142 my $use_tempdir = 0;
143 unless ($directory) {
144     $use_tempdir = 1;
145     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
146 }
147
148
149 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
150 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
151
152 my $kohadir = C4::Context->config('intranetdir');
153 my $bib_index_mode = C4::Context->config('zebra_bib_index_mode') || 'grs1';
154 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') || 'dom';
155
156 my $dbh = C4::Context->dbh;
157 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
158 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
159
160 # Protect again simultaneous update of the zebra index by using a lock file.
161 # Create our own lock directory if its missing.  This shouild be created
162 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
163 # does not exist and cannot be created, we fall back on /tmp - which will
164 # always work.
165
166 my ($lockfile, $LockFH);
167 foreach (
168     C4::Context->config("zebra_lockdir"),
169     '/var/lock/zebra_' . C4::Context->config('database'),
170     '/tmp/zebra_' . C4::Context->config('database')
171 ) {
172     #we try three possibilities (we really want to lock :)
173     next if !$_;
174     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
175     last if defined $LockFH;
176 }
177 if( !defined $LockFH ) {
178     print "WARNING: Could not create lock file $lockfile: $!\n";
179     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
180     print "Verify file permissions for it too.\n";
181     $use_flock = 0; # we disable file locking now and will continue
182                     # without it
183                     # note that this mimics old behavior (before we used
184                     # the lockfile)
185 };
186
187 if ( $verbose_logging ) {
188     print "Zebra configuration information\n";
189     print "================================\n";
190     print "Zebra biblio directory      = $biblioserverdir\n";
191     print "Zebra authorities directory = $authorityserverdir\n";
192     print "Koha directory              = $kohadir\n";
193     print "Lockfile                    = $lockfile\n" if $lockfile;
194     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
195     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
196     print "================================\n";
197 }
198
199 my $tester = XML::LibXML->new();
200
201 # The main work is done here by calling do_one_pass().  We have added locking
202 # avoid race conditions between full rebuilds and incremental updates either from
203 # daemon mode or periodic invocation from cron.  The race can lead to an updated
204 # record being overwritten by a rebuild if the update is applied after the export
205 # by the rebuild and before the rebuild finishes (more likely to affect large
206 # catalogs).
207 #
208 # We have chosen to exit immediately by default if we cannot obtain the lock
209 # to prevent the potential for a infinite backlog from cron invocations, but an
210 # option (wait-for-lock) is provided to let the program wait for the lock.
211 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
212 if ($daemon_mode) {
213     while (1) {
214         # For incremental updates, skip the update if the updates are locked
215         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
216             do_one_pass() if ( zebraqueue_not_empty() );
217             _flock($LockFH, LOCK_UN);
218         }
219         sleep $daemon_sleep;
220     }
221 } else {
222     # all one-off invocations
223     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
224     if (_flock($LockFH, $lock_mode)) {
225         do_one_pass();
226         _flock($LockFH, LOCK_UN);
227     } else {
228         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
229     }
230 }
231
232
233 if ( $verbose_logging ) {
234     print "====================\n";
235     print "CLEANING\n";
236     print "====================\n";
237 }
238 if ($keep_export) {
239     print "NOTHING cleaned : the export $directory has been kept.\n";
240     print "You can re-run this script with the -s ";
241     if ($use_tempdir) {
242         print " and -d $directory parameters";
243     } else {
244         print "parameter";
245     }
246     print "\n";
247     print "if you just want to rebuild zebra after changing the record.abs\n";
248     print "or another zebra config file\n";
249 } else {
250     unless ($use_tempdir) {
251         # if we're using a temporary directory
252         # created by File::Temp, it will be removed
253         # automatically.
254         rmtree($directory, 0, 1);
255         print "directory $directory deleted\n";
256     }
257 }
258
259 sub do_one_pass {
260     if ($authorities) {
261         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
262     } else {
263         print "skipping authorities\n" if ( $verbose_logging );
264     }
265
266     if ($biblios) {
267         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
268     } else {
269         print "skipping biblios\n" if ( $verbose_logging );
270     }
271 }
272
273 # Check the zebra update queue and return true if there are records to process
274 # This routine will handle each of -ab, -a, or -b, but in practice we force
275 # -ab when in daemon mode.
276 sub zebraqueue_not_empty {
277     my $where_str;
278
279     if ($authorities && $biblios) {
280         $where_str = 'done = 0;';
281     } elsif ($biblios) {
282         $where_str = 'server = "biblioserver" AND done = 0;';
283     } else {
284         $where_str = 'server = "authorityserver" AND done = 0;';
285     }
286     my $query =
287         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
288
289     $query->execute;
290     my $count = $query->fetchrow_arrayref->[0];
291     print "queued records: $count\n" if $verbose_logging > 0;
292     return $count > 0;
293 }
294
295 # This checks to see if the zebra directories exist under the provided path.
296 # If they don't, then zebra is likely to spit the dummy. This returns true
297 # if the directories had to be created, false otherwise.
298 sub check_zebra_dirs {
299     my ($base) = shift() . '/';
300     my $needed_repairing = 0;
301     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
302     foreach my $dir (@dirs) {
303         my $bdir = $base . $dir;
304         if (! -d $bdir) {
305             $needed_repairing = 1;
306             mkdir $bdir || die "Unable to create '$bdir': $!\n";
307             print "$0: needed to create '$bdir'\n";
308         }
309     }
310     return $needed_repairing;
311 }   # ----------  end of subroutine check_zebra_dirs  ----------
312
313 sub index_records {
314     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
315
316     my $num_records_exported = 0;
317     my $records_deleted;
318     my $need_reset = check_zebra_dirs($server_dir);
319     if ($need_reset) {
320         print "$0: found broken zebra server directories: forcing a rebuild\n";
321         $reset = 1;
322     }
323     if ($skip_export && $verbose_logging) {
324         print "====================\n";
325         print "SKIPPING $record_type export\n";
326         print "====================\n";
327     } else {
328         if ( $verbose_logging ) {
329             print "====================\n";
330             print "exporting $record_type\n";
331             print "====================\n";
332         }
333         mkdir "$directory" unless (-d $directory);
334         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
335         if ($process_zebraqueue) {
336             my $entries = select_zebraqueue_records($record_type, 'deleted');
337             mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
338             $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
339             mark_zebraqueue_batch_done($entries);
340             $entries = select_zebraqueue_records($record_type, 'updated');
341             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
342             $num_records_exported = export_marc_records_from_list($record_type,
343                                                                   $entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
344             mark_zebraqueue_batch_done($entries);
345         } else {
346             my $sth = select_all_records($record_type);
347             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
348             unless ($do_not_clear_zebraqueue) {
349                 mark_all_zebraqueue_done($record_type);
350             }
351         }
352     }
353
354     #
355     # and reindexing everything
356     #
357     if ($skip_index) {
358         if ($verbose_logging) {
359             print "====================\n";
360             print "SKIPPING $record_type indexing\n";
361             print "====================\n";
362         }
363     } else {
364         if ( $verbose_logging ) {
365             print "====================\n";
366             print "REINDEXING zebra\n";
367             print "====================\n";
368         }
369         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
370         if ($process_zebraqueue) {
371             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
372                 if %$records_deleted;
373             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
374                 if $num_records_exported;
375         } else {
376             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
377                 if ($num_records_exported or $skip_export);
378         }
379     }
380 }
381
382
383 sub select_zebraqueue_records {
384     my ($record_type, $update_type) = @_;
385
386     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
387     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
388
389     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
390                              FROM zebraqueue
391                              WHERE server = ?
392                              AND   operation = ?
393                              AND   done = 0
394                              ORDER BY id DESC");
395     $sth->execute($server, $op);
396     my $entries = $sth->fetchall_arrayref({});
397 }
398
399 sub mark_all_zebraqueue_done {
400     my ($record_type) = @_;
401
402     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
403
404     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
405                              WHERE server = ?
406                              AND done = 0");
407     $sth->execute($server);
408 }
409
410 sub mark_zebraqueue_batch_done {
411     my ($entries) = @_;
412
413     $dbh->{AutoCommit} = 0;
414     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
415     $dbh->commit();
416     foreach my $id (map { $_->{id} } @$entries) {
417         $sth->execute($id);
418     }
419     $dbh->{AutoCommit} = 1;
420 }
421
422 sub select_all_records {
423     my $record_type = shift;
424     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
425 }
426
427 sub select_all_authorities {
428     my $strsth=qq{SELECT authid FROM auth_header};
429     $strsth.=qq{ WHERE $where } if ($where);
430     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
431     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
432     my $sth = $dbh->prepare($strsth);
433     $sth->execute();
434     return $sth;
435 }
436
437 sub select_all_biblios {
438     my $strsth = qq{ SELECT biblionumber FROM biblioitems };
439     $strsth.=qq{ WHERE $where } if ($where);
440     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
441     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
442     my $sth = $dbh->prepare($strsth);
443     $sth->execute();
444     return $sth;
445 }
446
447 sub include_xml_wrapper {
448     my $as_xml = shift;
449     my $record_type = shift;
450
451     return 0 unless $as_xml;
452     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
453     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
454     return 0;
455
456 }
457
458 sub export_marc_records_from_sth {
459     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
460
461     my $num_exported = 0;
462     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
463     if (include_xml_wrapper($as_xml, $record_type)) {
464         # include XML declaration and root element
465         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
466     }
467     my $i = 0;
468     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
469     while (my ($record_number) = $sth->fetchrow_array) {
470         print "." if ( $verbose_logging );
471         print "\r$i" unless ($i++ %100 or !$verbose_logging);
472         if ( $nosanitize ) {
473             my $marcxml = $record_type eq 'biblio'
474                           ? GetXmlBiblio( $record_number )
475                           : GetAuthorityXML( $record_number );
476             if ($record_type eq 'biblio'){
477                 my @items = GetItemsInfo($record_number);
478                 if (@items){
479                     my $record = MARC::Record->new;
480                     $record->encoding('UTF-8');
481                     my @itemsrecord;
482                     foreach my $item (@items){
483                         my $record = Item2Marc($item, $record_number);
484                         push @itemsrecord, $record->field($itemtag);
485                     }
486                     $record->insert_fields_ordered(@itemsrecord);
487                     my $itemsxml = $record->as_xml_record();
488                     $marcxml =
489                         substr($marcxml, 0, length($marcxml)-10) .
490                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
491                 }
492             }
493             # extra test to ensure that result is valid XML; otherwise
494             # Zebra won't parse it in DOM mode
495             eval {
496                 my $doc = $tester->parse_string($marcxml);
497             };
498             if ($@) {
499                 warn "Error exporting record $record_number ($record_type): $@\n";
500                 next;
501             }
502             if ( $marcxml ) {
503                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
504                 print {$fh} $marcxml;
505                 $num_exported++;
506             }
507             next;
508         }
509         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
510         if (defined $marc) {
511             eval {
512                 my $rec;
513                 if ($as_xml) {
514                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
515                     eval {
516                         my $doc = $tester->parse_string($rec);
517                     };
518                     if ($@) {
519                         die "invalid XML: $@";
520                     }
521                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
522                 } else {
523                     $rec = $marc->as_usmarc();
524                 }
525                 print {$fh} $rec;
526                 $num_exported++;
527             };
528             if ($@) {
529                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
530                 warn "... specific error is $@" if $verbose_logging;
531             }
532         }
533     }
534     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
535     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
536     close $fh;
537     return $num_exported;
538 }
539
540 sub export_marc_records_from_list {
541     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
542
543     my $num_exported = 0;
544     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
545     if (include_xml_wrapper($as_xml, $record_type)) {
546         # include XML declaration and root element
547         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
548     }
549     my $i = 0;
550
551     # Skip any deleted records. We check for this anyway, but this reduces error spam
552     my %found = %$records_deleted;
553     foreach my $record_number ( map { $_->{biblio_auth_number} }
554                                 grep { !$found{ $_->{biblio_auth_number} }++ }
555                                 @$entries ) {
556         print "." if ( $verbose_logging );
557         print "\r$i" unless ($i++ %100 or !$verbose_logging);
558         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
559         if (defined $marc) {
560             eval {
561                 my $rec;
562                 if ($as_xml) {
563                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
564                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
565                 } else {
566                     $rec = $marc->as_usmarc();
567                 }
568                 print {$fh} $rec;
569                 $num_exported++;
570             };
571             if ($@) {
572               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
573             }
574         }
575     }
576     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
577     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
578     close $fh;
579     return $num_exported;
580 }
581
582 sub generate_deleted_marc_records {
583     my ($record_type, $entries, $directory, $as_xml) = @_;
584
585     my $records_deleted = {};
586     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
587     if (include_xml_wrapper($as_xml, $record_type)) {
588         # include XML declaration and root element
589         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
590     }
591     my $i = 0;
592     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
593         print "\r$i" unless ($i++ %100 or !$verbose_logging);
594         print "." if ( $verbose_logging );
595
596         my $marc = MARC::Record->new();
597         if ($record_type eq 'biblio') {
598             fix_biblio_ids($marc, $record_number, $record_number);
599         } else {
600             fix_authority_id($marc, $record_number);
601         }
602         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
603             fix_unimarc_100($marc);
604         }
605
606         my $rec;
607         if ($as_xml) {
608             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
609             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
610         } else {
611             $rec = $marc->as_usmarc();
612         }
613         print {$fh} $rec;
614
615         $records_deleted->{$record_number} = 1;
616     }
617     print "\nRecords exported: $i\n" if ( $verbose_logging );
618     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
619     close $fh;
620     return $records_deleted;
621
622
623 }
624
625 sub get_corrected_marc_record {
626     my ($record_type, $record_number, $noxml) = @_;
627
628     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
629
630     if (defined $marc) {
631         fix_leader($marc);
632         if ($record_type eq 'authority') {
633             fix_authority_id($marc, $record_number);
634         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
635             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
636             $marc = $normalizer->process($marc);
637         }
638         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
639             fix_unimarc_100($marc);
640         }
641     }
642
643     return $marc;
644 }
645
646 sub get_raw_marc_record {
647     my ($record_type, $record_number, $noxml) = @_;
648
649     my $marc;
650     if ($record_type eq 'biblio') {
651         if ($noxml) {
652             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
653             $fetch_sth->execute($record_number);
654             if (my ($blob) = $fetch_sth->fetchrow_array) {
655                 $marc = MARC::Record->new_from_usmarc($blob);
656                 unless ($marc) {
657                     warn "error creating MARC::Record from $blob";
658                 }
659             }
660             # failure to find a bib is not a problem -
661             # a delete could have been done before
662             # trying to process a record update
663
664             $fetch_sth->finish();
665             return unless $marc;
666         } else {
667             eval { $marc = GetMarcBiblio($record_number, 1); };
668             if ($@ || !$marc) {
669                 # here we do warn since catching an exception
670                 # means that the bib was found but failed
671                 # to be parsed
672                 warn "error retrieving biblio $record_number";
673                 return;
674             }
675         }
676     } else {
677         eval { $marc = GetAuthority($record_number); };
678         if ($@) {
679             warn "error retrieving authority $record_number";
680             return;
681         }
682     }
683     return $marc;
684 }
685
686 sub fix_leader {
687     # FIXME - this routine is suspect
688     # It blanks the Leader/00-05 and Leader/12-16 to
689     # force them to be recalculated correct when
690     # the $marc->as_usmarc() or $marc->as_xml() is called.
691     # But why is this necessary?  It would be a serious bug
692     # in MARC::Record (definitely) and MARC::File::XML (arguably)
693     # if they are emitting incorrect leader values.
694     my $marc = shift;
695
696     my $leader = $marc->leader;
697     substr($leader,  0, 5) = '     ';
698     substr($leader, 10, 7) = '22     ';
699     $marc->leader(substr($leader, 0, 24));
700 }
701
702 sub fix_biblio_ids {
703     # FIXME - it is essential to ensure that the biblionumber is present,
704     #         otherwise, Zebra will choke on the record.  However, this
705     #         logic belongs in the relevant C4::Biblio APIs.
706     my $marc = shift;
707     my $biblionumber = shift;
708     my $biblioitemnumber;
709     if (@_) {
710         $biblioitemnumber = shift;
711     } else {
712         my $sth = $dbh->prepare(
713             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
714         $sth->execute($biblionumber);
715         ($biblioitemnumber) = $sth->fetchrow_array;
716         $sth->finish;
717         unless ($biblioitemnumber) {
718             warn "failed to get biblioitemnumber for biblio $biblionumber";
719             return 0;
720         }
721     }
722
723     # FIXME - this is cheating on two levels
724     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
725     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
726     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
727     #
728     # On the other hand, this better for now than what rebuild_zebra.pl used to
729     # do, which was duplicate the code for inserting the biblionumber
730     # and biblioitemnumber
731     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
732
733     return 1;
734 }
735
736 sub fix_authority_id {
737     # FIXME - as with fix_biblio_ids, the authid must be present
738     #         for Zebra's sake.  However, this really belongs
739     #         in C4::AuthoritiesMarc.
740     my ($marc, $authid) = @_;
741     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
742         $marc->delete_field($marc->field('001'));
743         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
744     }
745 }
746
747 sub fix_unimarc_100 {
748     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
749     my $marc = shift;
750
751     my $string;
752     if ( length($marc->subfield( 100, "a" )) == 36 ) {
753         $string = $marc->subfield( 100, "a" );
754         my $f100 = $marc->field(100);
755         $marc->delete_field($f100);
756     }
757     else {
758         $string = POSIX::strftime( "%Y%m%d", localtime );
759         $string =~ s/\-//g;
760         $string = sprintf( "%-*s", 35, $string );
761     }
762     substr( $string, 22, 6, "frey50" );
763     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
764         $marc->delete_field($marc->field(100));
765         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
766     }
767 }
768
769 sub do_indexing {
770     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
771
772     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
773     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
774     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
775     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
776
777     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
778     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
779     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
780
781 }
782
783 sub _flock {
784     # test if flock is present; if so, use it; if not, return true
785     # op refers to the official flock operations including LOCK_EX,
786     # LOCK_UN, etc.
787     # combining LOCK_EX with LOCK_NB returns immediately
788     my ($fh, $op)= @_;
789     if( !defined($use_flock) ) {
790         #check if flock is present; if not, you will have a fatal error
791         my $lock_acquired = eval { flock($fh, $op) };
792         # assuming that $fh and $op are fine(..), an undef $lock_acquired
793         # means no flock
794         $use_flock = defined($lock_acquired) ? 1 : 0;
795         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
796         return 1 if !$use_flock;
797         return $lock_acquired;
798     } else {
799         return 1 if !$use_flock;
800         return flock($fh, $op);
801     }
802 }
803
804 sub _create_lockfile { #returns undef on failure
805     my $dir= shift;
806     unless (-d $dir) {
807         eval { mkpath($dir, 0, oct(755)) };
808         return if $@;
809     }
810     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
811     return ( $fh, $dir.'/'.LOCK_FILENAME );
812 }
813
814 sub print_usage {
815     print <<_USAGE_;
816 $0: reindex MARC bibs and/or authorities in Zebra.
817
818 Use this batch job to reindex all biblio or authority
819 records in your Koha database.
820
821 Parameters:
822
823     -b                      index bibliographic records
824
825     -a                      index authority records
826
827     -daemon                 Run in daemon mode.  The program will loop checking
828                             for entries on the zebraqueue table, processing
829                             them incrementally if present, and then sleep
830                             for a few seconds before repeating the process
831                             Checking the zebraqueue table is done with a cheap
832                             SQL query.  This allows for near realtime update of
833                             the zebra search index with low system overhead.
834                             Use -sleep to control the checking interval.
835
836                             Daemon mode implies -z, -a, -b.  The program will
837                             refuse to start if options are present that do not
838                             make sense while running as an incremental update
839                             daemon (e.g. -r or -offset).
840
841     -sleep 10               Seconds to sleep between checks of the zebraqueue
842                             table in daemon mode.  The default is 5 seconds.
843
844     -z                      select only updated and deleted
845                             records marked in the zebraqueue
846                             table.  Cannot be used with -r
847                             or -s.
848
849     -r                      clear Zebra index before
850                             adding records to index. Implies -w.
851
852     -d                      Temporary directory for indexing.
853                             If not specified, one is automatically
854                             created.  The export directory
855                             is automatically deleted unless
856                             you supply the -k switch.
857
858     -k                      Do not delete export directory.
859
860     -s                      Skip export.  Used if you have
861                             already exported the records
862                             in a previous run.
863
864     -noxml                  index from ISO MARC blob
865                             instead of MARC XML.  This
866                             option is recommended only
867                             for advanced user.
868
869     -x                      export and index as xml instead of is02709 (biblios only).
870                             use this if you might have records > 99,999 chars,
871
872     -nosanitize             export biblio/authority records directly from DB marcxml
873                             field without sanitizing records. It speed up
874                             dump process but could fail if DB contains badly
875                             encoded records. Works only with -x,
876
877     -w                      skip shadow indexing for this batch
878
879     -y                      do NOT clear zebraqueue after indexing; normally,
880                             after doing batch indexing, zebraqueue should be
881                             marked done for the affected record type(s) so that
882                             a running zebraqueue_daemon doesn't try to reindex
883                             the same records - specify -y to override this.
884                             Cannot be used with -z.
885
886     -v                      increase the amount of logging.  Normally only
887                             warnings and errors from the indexing are shown.
888                             Use log level 2 (-v -v) to include all Zebra logs.
889
890     --length   1234         how many biblio you want to export
891     --offset 1243           offset you want to start to
892                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
893                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
894     --where                 let you specify a WHERE query, like itemtype='BOOK'
895                             or something like that
896
897     --run-as-root           explicitily allow script to run as 'root' user
898
899     --wait-for-lock         when not running in daemon mode, the default
900                             behavior is to abort a rebuild if the rebuild
901                             lock is busy.  This option will cause the program
902                             to wait for the lock to free and then continue
903                             processing the rebuild request,
904
905     --help or -h            show this message.
906 _USAGE_
907 }