Bug 11078: Add locking to rebuild_zebra
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 # script that checks zebradir structure & create directories & mandatory files if needed
18 #
19 #
20
21 $|=1; # flushes output
22 # If the cron job starts us in an unreadable dir, we will break without
23 # this.
24 chdir $ENV{HOME} if (!(-r '.'));
25 my $daemon_mode;
26 my $daemon_sleep = 5;
27 my $directory;
28 my $nosanitize;
29 my $skip_export;
30 my $keep_export;
31 my $skip_index;
32 my $reset;
33 my $biblios;
34 my $authorities;
35 my $noxml;
36 my $noshadow;
37 my $want_help;
38 my $as_xml;
39 my $process_zebraqueue;
40 my $do_not_clear_zebraqueue;
41 my $length;
42 my $where;
43 my $offset;
44 my $run_as_root;
45 my $run_user = (getpwuid($<))[0];
46 my $wait_for_lock = 0;
47 my $use_flock;
48
49 my $verbose_logging = 0;
50 my $zebraidx_log_opt = " -v none,fatal,warn ";
51 my $result = GetOptions(
52     'daemon'        => \$daemon_mode,
53     'sleep:i'       => \$daemon_sleep,
54     'd:s'           => \$directory,
55     'r|reset'       => \$reset,
56     's'             => \$skip_export,
57     'k'             => \$keep_export,
58     'I|skip-index'  => \$skip_index,
59     'nosanitize'    => \$nosanitize,
60     'b'             => \$biblios,
61     'noxml'         => \$noxml,
62     'w'             => \$noshadow,
63     'a'             => \$authorities,
64     'h|help'        => \$want_help,
65     'x'             => \$as_xml,
66     'y'             => \$do_not_clear_zebraqueue,
67     'z'             => \$process_zebraqueue,
68     'where:s'       => \$where,
69     'length:i'      => \$length,
70     'offset:i'      => \$offset,
71     'v+'            => \$verbose_logging,
72     'run-as-root'   => \$run_as_root,
73     'wait-for-lock' => \$wait_for_lock,
74 );
75
76 if (not $result or $want_help) {
77     print_usage();
78     exit 0;
79 }
80
81 if( not defined $run_as_root and $run_user eq 'root') {
82     my $msg = "Warning: You are running this script as the user 'root'.\n";
83     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
84     $msg   .= "Please do '$0 --help' to see usage.\n";
85     die $msg;
86 }
87
88 if ( !$as_xml and $nosanitize ) {
89     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
90     $msg   .= "Please do '$0 --help' to see usage.\n";
91     die $msg;
92 }
93
94 if ($process_zebraqueue and ($skip_export or $reset)) {
95     my $msg = "Cannot specify -r or -s if -z is specified\n";
96     $msg   .= "Please do '$0 --help' to see usage.\n";
97     die $msg;
98 }
99
100 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
101     my $msg = "Cannot specify both -y and -z\n";
102     $msg   .= "Please do '$0 --help' to see usage.\n";
103     die $msg;
104 }
105
106 if ($reset) {
107     $noshadow = 1;
108 }
109
110 if ($noshadow) {
111     $noshadow = ' -n ';
112 }
113
114 if ($daemon_mode) {
115     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
116     if ($skip_export or $keep_export or $skip_index or
117           $where or $length or $offset) {
118         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
119         $msg   .= "Please do '$0 --help' to see usage.\n";
120         die $msg;
121     }
122     $authorities = 1;
123     $biblios = 1;
124     $process_zebraqueue = 1;
125 }
126
127 if (not $biblios and not $authorities) {
128     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
129     $msg   .= "Please do '$0 --help' to see usage.\n";
130     die $msg;
131 }
132
133
134 #  -v is for verbose, which seems backwards here because of how logging is set
135 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
136 if ($verbose_logging >= 2) {
137     $zebraidx_log_opt = '-v none,fatal,warn,all';
138 }
139
140 my $use_tempdir = 0;
141 unless ($directory) {
142     $use_tempdir = 1;
143     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
144 }
145
146
147 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
148 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
149
150 my $kohadir = C4::Context->config('intranetdir');
151 my $bib_index_mode = C4::Context->config('zebra_bib_index_mode') || 'grs1';
152 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') || 'dom';
153
154 my $dbh = C4::Context->dbh;
155 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
156 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
157
158 # Protect again simultaneous update of the zebra index by using a lock file.
159 # Create our own lock directory if its missing.  This shouild be created
160 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
161 # does not exist and cannot be created, we fall back on /tmp - which will
162 # always work.
163
164 my $lockdir = C4::Context->config("zebra_lockdir") // "/var/lock";
165 $lockdir .= "/rebuild";
166 unless (-d $lockdir) {
167     eval { mkpath($lockdir, 0, oct(755)) };
168     $lockdir = "/tmp" if ($@);
169 }
170 my $lockfile = $lockdir . "/rebuild..LCK";
171
172 if ( $verbose_logging ) {
173     print "Zebra configuration information\n";
174     print "================================\n";
175     print "Zebra biblio directory      = $biblioserverdir\n";
176     print "Zebra authorities directory = $authorityserverdir\n";
177     print "Koha directory              = $kohadir\n";
178     print "Lockfile                    = $lockfile\n";
179     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
180     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
181     print "================================\n";
182 }
183
184 my $tester = XML::LibXML->new();
185
186 # The main work is done here by calling do_one_pass().  We have added locking
187 # avoid race conditions between Full rebuilds and incremental updates either from
188 # daemon mode or periodic invocation from cron.  The race can lead to an updated
189 # record being overwritten by a rebuild if the update is applied after the export
190 # by the rebuild and before the rebuild finishes (more likely to effect large
191 # catalogs).
192 #
193 # We have chosen to exit immediately by default if we cannot obtain the lock
194 # to prevent the potential for a infinite backlog from cron invocations, but an
195 # option (wait-for-lock) is provided to let the program wait for the lock.
196 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
197 open my $LockFH, q{>}, $lockfile or die "$lockfile: $!";
198 if ($daemon_mode) {
199     while (1) {
200         # For incremental updates, skip the update if the updates are locked
201         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
202             do_one_pass() if ( zebraqueue_not_empty() );
203             _flock($LockFH, LOCK_UN);
204         }
205         sleep $daemon_sleep;
206     }
207 } else {
208     # all one-off invocations
209     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
210     if (_flock($LockFH, $lock_mode)) {
211         do_one_pass();
212         _flock($LockFH, LOCK_UN);
213     } else {
214         # Can't die() here because we have files to dlean up.
215         print "Aborting rebuild.  Unable to flock $lockfile: $!\n";
216     }
217 }
218
219
220 if ( $verbose_logging ) {
221     print "====================\n";
222     print "CLEANING\n";
223     print "====================\n";
224 }
225 if ($keep_export) {
226     print "NOTHING cleaned : the export $directory has been kept.\n";
227     print "You can re-run this script with the -s ";
228     if ($use_tempdir) {
229         print " and -d $directory parameters";
230     } else {
231         print "parameter";
232     }
233     print "\n";
234     print "if you just want to rebuild zebra after changing the record.abs\n";
235     print "or another zebra config file\n";
236 } else {
237     unless ($use_tempdir) {
238         # if we're using a temporary directory
239         # created by File::Temp, it will be removed
240         # automatically.
241         rmtree($directory, 0, 1);
242         print "directory $directory deleted\n";
243     }
244 }
245
246 sub do_one_pass {
247     if ($authorities) {
248         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
249     } else {
250         print "skipping authorities\n" if ( $verbose_logging );
251     }
252
253     if ($biblios) {
254         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
255     } else {
256         print "skipping biblios\n" if ( $verbose_logging );
257     }
258 }
259
260 # Check the zebra update queue and return true if there are records to process
261 # This routine will handle each of -ab, -a, or -b, but in practice we force
262 # -ab when in daemon mode.
263 sub zebraqueue_not_empty {
264     my $where_str;
265
266     if ($authorities && $biblios) {
267         $where_str = 'done = 0;';
268     } elsif ($biblios) {
269         $where_str = 'server = "biblioserver" AND done = 0;';
270     } else {
271         $where_str = 'server = "authorityserver" AND done = 0;';
272     }
273     my $query =
274         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
275
276     $query->execute;
277     my $count = $query->fetchrow_arrayref->[0];
278     print "queued records: $count\n" if $verbose_logging > 0;
279     return $count > 0;
280 }
281
282 # This checks to see if the zebra directories exist under the provided path.
283 # If they don't, then zebra is likely to spit the dummy. This returns true
284 # if the directories had to be created, false otherwise.
285 sub check_zebra_dirs {
286     my ($base) = shift() . '/';
287     my $needed_repairing = 0;
288     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
289     foreach my $dir (@dirs) {
290         my $bdir = $base . $dir;
291         if (! -d $bdir) {
292             $needed_repairing = 1;
293             mkdir $bdir || die "Unable to create '$bdir': $!\n";
294             print "$0: needed to create '$bdir'\n";
295         }
296     }
297     return $needed_repairing;
298 }   # ----------  end of subroutine check_zebra_dirs  ----------
299
300 sub index_records {
301     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
302
303     my $num_records_exported = 0;
304     my $records_deleted;
305     my $need_reset = check_zebra_dirs($server_dir);
306     if ($need_reset) {
307         print "$0: found broken zebra server directories: forcing a rebuild\n";
308         $reset = 1;
309     }
310     if ($skip_export && $verbose_logging) {
311         print "====================\n";
312         print "SKIPPING $record_type export\n";
313         print "====================\n";
314     } else {
315         if ( $verbose_logging ) {
316             print "====================\n";
317             print "exporting $record_type\n";
318             print "====================\n";
319         }
320         mkdir "$directory" unless (-d $directory);
321         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
322         if ($process_zebraqueue) {
323             my $entries = select_zebraqueue_records($record_type, 'deleted');
324             mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
325             $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
326             mark_zebraqueue_batch_done($entries);
327             $entries = select_zebraqueue_records($record_type, 'updated');
328             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
329             $num_records_exported = export_marc_records_from_list($record_type,
330                                                                   $entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
331             mark_zebraqueue_batch_done($entries);
332         } else {
333             my $sth = select_all_records($record_type);
334             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
335             unless ($do_not_clear_zebraqueue) {
336                 mark_all_zebraqueue_done($record_type);
337             }
338         }
339     }
340
341     #
342     # and reindexing everything
343     #
344     if ($skip_index) {
345         if ($verbose_logging) {
346             print "====================\n";
347             print "SKIPPING $record_type indexing\n";
348             print "====================\n";
349         }
350     } else {
351         if ( $verbose_logging ) {
352             print "====================\n";
353             print "REINDEXING zebra\n";
354             print "====================\n";
355         }
356         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
357         if ($process_zebraqueue) {
358             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
359                 if %$records_deleted;
360             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
361                 if $num_records_exported;
362         } else {
363             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
364                 if ($num_records_exported or $skip_export);
365         }
366     }
367 }
368
369
370 sub select_zebraqueue_records {
371     my ($record_type, $update_type) = @_;
372
373     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
374     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
375
376     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
377                              FROM zebraqueue
378                              WHERE server = ?
379                              AND   operation = ?
380                              AND   done = 0
381                              ORDER BY id DESC");
382     $sth->execute($server, $op);
383     my $entries = $sth->fetchall_arrayref({});
384 }
385
386 sub mark_all_zebraqueue_done {
387     my ($record_type) = @_;
388
389     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
390
391     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
392                              WHERE server = ?
393                              AND done = 0");
394     $sth->execute($server);
395 }
396
397 sub mark_zebraqueue_batch_done {
398     my ($entries) = @_;
399
400     $dbh->{AutoCommit} = 0;
401     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
402     $dbh->commit();
403     foreach my $id (map { $_->{id} } @$entries) {
404         $sth->execute($id);
405     }
406     $dbh->{AutoCommit} = 1;
407 }
408
409 sub select_all_records {
410     my $record_type = shift;
411     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
412 }
413
414 sub select_all_authorities {
415     my $strsth=qq{SELECT authid FROM auth_header};
416     $strsth.=qq{ WHERE $where } if ($where);
417     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
418     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
419     my $sth = $dbh->prepare($strsth);
420     $sth->execute();
421     return $sth;
422 }
423
424 sub select_all_biblios {
425     my $strsth = qq{ SELECT biblionumber FROM biblioitems };
426     $strsth.=qq{ WHERE $where } if ($where);
427     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
428     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
429     my $sth = $dbh->prepare($strsth);
430     $sth->execute();
431     return $sth;
432 }
433
434 sub include_xml_wrapper {
435     my $as_xml = shift;
436     my $record_type = shift;
437
438     return 0 unless $as_xml;
439     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
440     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
441     return 0;
442
443 }
444
445 sub export_marc_records_from_sth {
446     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
447
448     my $num_exported = 0;
449     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
450     if (include_xml_wrapper($as_xml, $record_type)) {
451         # include XML declaration and root element
452         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
453     }
454     my $i = 0;
455     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
456     while (my ($record_number) = $sth->fetchrow_array) {
457         print "." if ( $verbose_logging );
458         print "\r$i" unless ($i++ %100 or !$verbose_logging);
459         if ( $nosanitize ) {
460             my $marcxml = $record_type eq 'biblio'
461                           ? GetXmlBiblio( $record_number )
462                           : GetAuthorityXML( $record_number );
463             if ($record_type eq 'biblio'){
464                 my @items = GetItemsInfo($record_number);
465                 if (@items){
466                     my $record = MARC::Record->new;
467                     $record->encoding('UTF-8');
468                     my @itemsrecord;
469                     foreach my $item (@items){
470                         my $record = Item2Marc($item, $record_number);
471                         push @itemsrecord, $record->field($itemtag);
472                     }
473                     $record->insert_fields_ordered(@itemsrecord);
474                     my $itemsxml = $record->as_xml_record();
475                     $marcxml =
476                         substr($marcxml, 0, length($marcxml)-10) .
477                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
478                 }
479             }
480             # extra test to ensure that result is valid XML; otherwise
481             # Zebra won't parse it in DOM mode
482             eval {
483                 my $doc = $tester->parse_string($marcxml);
484             };
485             if ($@) {
486                 warn "Error exporting record $record_number ($record_type): $@\n";
487                 next;
488             }
489             if ( $marcxml ) {
490                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
491                 print {$fh} $marcxml;
492                 $num_exported++;
493             }
494             next;
495         }
496         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
497         if (defined $marc) {
498             eval {
499                 my $rec;
500                 if ($as_xml) {
501                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
502                     eval {
503                         my $doc = $tester->parse_string($rec);
504                     };
505                     if ($@) {
506                         die "invalid XML: $@";
507                     }
508                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
509                 } else {
510                     $rec = $marc->as_usmarc();
511                 }
512                 print {$fh} $rec;
513                 $num_exported++;
514             };
515             if ($@) {
516                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
517                 warn "... specific error is $@" if $verbose_logging;
518             }
519         }
520     }
521     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
522     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
523     close $fh;
524     return $num_exported;
525 }
526
527 sub export_marc_records_from_list {
528     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
529
530     my $num_exported = 0;
531     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
532     if (include_xml_wrapper($as_xml, $record_type)) {
533         # include XML declaration and root element
534         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
535     }
536     my $i = 0;
537
538     # Skip any deleted records. We check for this anyway, but this reduces error spam
539     my %found = %$records_deleted;
540     foreach my $record_number ( map { $_->{biblio_auth_number} }
541                                 grep { !$found{ $_->{biblio_auth_number} }++ }
542                                 @$entries ) {
543         print "." if ( $verbose_logging );
544         print "\r$i" unless ($i++ %100 or !$verbose_logging);
545         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
546         if (defined $marc) {
547             eval {
548                 my $rec;
549                 if ($as_xml) {
550                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
551                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
552                 } else {
553                     $rec = $marc->as_usmarc();
554                 }
555                 print {$fh} $rec;
556                 $num_exported++;
557             };
558             if ($@) {
559               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
560             }
561         }
562     }
563     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
564     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
565     close $fh;
566     return $num_exported;
567 }
568
569 sub generate_deleted_marc_records {
570     my ($record_type, $entries, $directory, $as_xml) = @_;
571
572     my $records_deleted = {};
573     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
574     if (include_xml_wrapper($as_xml, $record_type)) {
575         # include XML declaration and root element
576         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
577     }
578     my $i = 0;
579     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
580         print "\r$i" unless ($i++ %100 or !$verbose_logging);
581         print "." if ( $verbose_logging );
582
583         my $marc = MARC::Record->new();
584         if ($record_type eq 'biblio') {
585             fix_biblio_ids($marc, $record_number, $record_number);
586         } else {
587             fix_authority_id($marc, $record_number);
588         }
589         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
590             fix_unimarc_100($marc);
591         }
592
593         my $rec;
594         if ($as_xml) {
595             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
596             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
597         } else {
598             $rec = $marc->as_usmarc();
599         }
600         print {$fh} $rec;
601
602         $records_deleted->{$record_number} = 1;
603     }
604     print "\nRecords exported: $i\n" if ( $verbose_logging );
605     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
606     close $fh;
607     return $records_deleted;
608
609
610 }
611
612 sub get_corrected_marc_record {
613     my ($record_type, $record_number, $noxml) = @_;
614
615     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
616
617     if (defined $marc) {
618         fix_leader($marc);
619         if ($record_type eq 'authority') {
620             fix_authority_id($marc, $record_number);
621         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
622             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
623             $marc = $normalizer->process($marc);
624         }
625         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
626             fix_unimarc_100($marc);
627         }
628     }
629
630     return $marc;
631 }
632
633 sub get_raw_marc_record {
634     my ($record_type, $record_number, $noxml) = @_;
635
636     my $marc;
637     if ($record_type eq 'biblio') {
638         if ($noxml) {
639             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
640             $fetch_sth->execute($record_number);
641             if (my ($blob) = $fetch_sth->fetchrow_array) {
642                 $marc = MARC::Record->new_from_usmarc($blob);
643                 unless ($marc) {
644                     warn "error creating MARC::Record from $blob";
645                 }
646             }
647             # failure to find a bib is not a problem -
648             # a delete could have been done before
649             # trying to process a record update
650
651             $fetch_sth->finish();
652             return unless $marc;
653         } else {
654             eval { $marc = GetMarcBiblio($record_number, 1); };
655             if ($@ || !$marc) {
656                 # here we do warn since catching an exception
657                 # means that the bib was found but failed
658                 # to be parsed
659                 warn "error retrieving biblio $record_number";
660                 return;
661             }
662         }
663     } else {
664         eval { $marc = GetAuthority($record_number); };
665         if ($@) {
666             warn "error retrieving authority $record_number";
667             return;
668         }
669     }
670     return $marc;
671 }
672
673 sub fix_leader {
674     # FIXME - this routine is suspect
675     # It blanks the Leader/00-05 and Leader/12-16 to
676     # force them to be recalculated correct when
677     # the $marc->as_usmarc() or $marc->as_xml() is called.
678     # But why is this necessary?  It would be a serious bug
679     # in MARC::Record (definitely) and MARC::File::XML (arguably)
680     # if they are emitting incorrect leader values.
681     my $marc = shift;
682
683     my $leader = $marc->leader;
684     substr($leader,  0, 5) = '     ';
685     substr($leader, 10, 7) = '22     ';
686     $marc->leader(substr($leader, 0, 24));
687 }
688
689 sub fix_biblio_ids {
690     # FIXME - it is essential to ensure that the biblionumber is present,
691     #         otherwise, Zebra will choke on the record.  However, this
692     #         logic belongs in the relevant C4::Biblio APIs.
693     my $marc = shift;
694     my $biblionumber = shift;
695     my $biblioitemnumber;
696     if (@_) {
697         $biblioitemnumber = shift;
698     } else {
699         my $sth = $dbh->prepare(
700             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
701         $sth->execute($biblionumber);
702         ($biblioitemnumber) = $sth->fetchrow_array;
703         $sth->finish;
704         unless ($biblioitemnumber) {
705             warn "failed to get biblioitemnumber for biblio $biblionumber";
706             return 0;
707         }
708     }
709
710     # FIXME - this is cheating on two levels
711     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
712     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
713     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
714     #
715     # On the other hand, this better for now than what rebuild_zebra.pl used to
716     # do, which was duplicate the code for inserting the biblionumber
717     # and biblioitemnumber
718     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
719
720     return 1;
721 }
722
723 sub fix_authority_id {
724     # FIXME - as with fix_biblio_ids, the authid must be present
725     #         for Zebra's sake.  However, this really belongs
726     #         in C4::AuthoritiesMarc.
727     my ($marc, $authid) = @_;
728     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
729         $marc->delete_field($marc->field('001'));
730         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
731     }
732 }
733
734 sub fix_unimarc_100 {
735     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
736     my $marc = shift;
737
738     my $string;
739     if ( length($marc->subfield( 100, "a" )) == 36 ) {
740         $string = $marc->subfield( 100, "a" );
741         my $f100 = $marc->field(100);
742         $marc->delete_field($f100);
743     }
744     else {
745         $string = POSIX::strftime( "%Y%m%d", localtime );
746         $string =~ s/\-//g;
747         $string = sprintf( "%-*s", 35, $string );
748     }
749     substr( $string, 22, 6, "frey50" );
750     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
751         $marc->delete_field($marc->field(100));
752         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
753     }
754 }
755
756 sub do_indexing {
757     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
758
759     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
760     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
761     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
762     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
763
764     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
765     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
766     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
767
768 }
769
770 sub _flock {
771 # test if flock is present; if so, use it; if not, return true
772 # op refers to the official flock operations incl LOCK_EX, LOCK_UN, etc.
773 # combining LOCK_EX with LOCK_NB returns immediately
774     my ($fh, $op)= @_;
775     if( !defined($use_flock) ) {
776         #check if flock is present; if not, you will have a fatal error
777         my $i=eval { flock($fh, $op) };
778         #assuming that $fh and $op are fine(..), an undef i means no flock
779         $use_flock= defined($i)? 1: 0;
780         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
781         return 1 if !$use_flock;
782         return $i;
783     }
784     else {
785         return 1 if !$use_flock;
786         return flock($fh, $op);
787     }
788 }
789
790 sub print_usage {
791     print <<_USAGE_;
792 $0: reindex MARC bibs and/or authorities in Zebra.
793
794 Use this batch job to reindex all biblio or authority
795 records in your Koha database.
796
797 Parameters:
798
799     -b                      index bibliographic records
800
801     -a                      index authority records
802
803     -daemon                 Run in daemon mode.  The program will loop checking
804                             for entries on the zebraqueue table, processing
805                             them incrementally if present, and then sleep
806                             for a few seconds before repeating the process
807                             Checking the zebraqueue table is done with a cheap
808                             SQL query.  This allows for near realtime update of
809                             the zebra search index with low system overhead.
810                             Use -sleep to control the checking interval.
811
812                             Daemon mode implies -z, -a, -b.  The program will
813                             refuse to start if options are present that do not
814                             make sense while running as an incremental update
815                             daemon (e.g. -r or -offset).
816
817     -sleep 10               Seconds to sleep between checks of the zebraqueue
818                             table in daemon mode.  The default is 5 seconds.
819
820     -z                      select only updated and deleted
821                             records marked in the zebraqueue
822                             table.  Cannot be used with -r
823                             or -s.
824
825     -r                      clear Zebra index before
826                             adding records to index. Implies -w.
827
828     -d                      Temporary directory for indexing.
829                             If not specified, one is automatically
830                             created.  The export directory
831                             is automatically deleted unless
832                             you supply the -k switch.
833
834     -k                      Do not delete export directory.
835
836     -s                      Skip export.  Used if you have
837                             already exported the records
838                             in a previous run.
839
840     -noxml                  index from ISO MARC blob
841                             instead of MARC XML.  This
842                             option is recommended only
843                             for advanced user.
844
845     -x                      export and index as xml instead of is02709 (biblios only).
846                             use this if you might have records > 99,999 chars,
847
848     -nosanitize             export biblio/authority records directly from DB marcxml
849                             field without sanitizing records. It speed up
850                             dump process but could fail if DB contains badly
851                             encoded records. Works only with -x,
852
853     -w                      skip shadow indexing for this batch
854
855     -y                      do NOT clear zebraqueue after indexing; normally,
856                             after doing batch indexing, zebraqueue should be
857                             marked done for the affected record type(s) so that
858                             a running zebraqueue_daemon doesn't try to reindex
859                             the same records - specify -y to override this.
860                             Cannot be used with -z.
861
862     -v                      increase the amount of logging.  Normally only
863                             warnings and errors from the indexing are shown.
864                             Use log level 2 (-v -v) to include all Zebra logs.
865
866     --length   1234         how many biblio you want to export
867     --offset 1243           offset you want to start to
868                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
869                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
870     --where                 let you specify a WHERE query, like itemtype='BOOK'
871                             or something like that
872
873     --run-as-root           explicitily allow script to run as 'root' user
874
875     --wait-for-lock         when not running in daemon mode, the default
876                             behavior is to abort a rebuild if the rebuild
877                             lock is busy.  This option will cause the program
878                             to wait for the lock to free and then continue
879                             processing the rebuild request,
880
881     --help or -h            show this message.
882 _USAGE_
883 }