Bug 16505: <collection> is missing the marc namespace and updates fail if -x is passed
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $process_zebraqueue_skip_deletes;
43 my $do_not_clear_zebraqueue;
44 my $length;
45 my $where;
46 my $offset;
47 my $run_as_root;
48 my $run_user = (getpwuid($<))[0];
49 my $wait_for_lock = 0;
50 my $use_flock;
51 my $table = 'biblioitems';
52
53 my $verbose_logging = 0;
54 my $zebraidx_log_opt = " -v none,fatal,warn ";
55 my $result = GetOptions(
56     'daemon'        => \$daemon_mode,
57     'sleep:i'       => \$daemon_sleep,
58     'd:s'           => \$directory,
59     'r|reset'       => \$reset,
60     's'             => \$skip_export,
61     'k'             => \$keep_export,
62     'I|skip-index'  => \$skip_index,
63     'nosanitize'    => \$nosanitize,
64     'b'             => \$biblios,
65     'noxml'         => \$noxml,
66     'w'             => \$noshadow,
67     'a'             => \$authorities,
68     'h|help'        => \$want_help,
69     'x'             => \$as_xml,
70     'y'             => \$do_not_clear_zebraqueue,
71     'z'             => \$process_zebraqueue,
72     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
73     'where:s'       => \$where,
74     'length:i'      => \$length,
75     'offset:i'      => \$offset,
76     'v+'            => \$verbose_logging,
77     'run-as-root'   => \$run_as_root,
78     'wait-for-lock' => \$wait_for_lock,
79     't|table:s'     => \$table,
80 );
81
82 if (not $result or $want_help) {
83     print_usage();
84     exit 0;
85 }
86
87 if( not defined $run_as_root and $run_user eq 'root') {
88     my $msg = "Warning: You are running this script as the user 'root'.\n";
89     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
90     $msg   .= "Please do '$0 --help' to see usage.\n";
91     die $msg;
92 }
93
94 if ( !$as_xml and $nosanitize ) {
95     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
96     $msg   .= "Please do '$0 --help' to see usage.\n";
97     die $msg;
98 }
99
100 if ($process_zebraqueue and ($skip_export or $reset)) {
101     my $msg = "Cannot specify -r or -s if -z is specified\n";
102     $msg   .= "Please do '$0 --help' to see usage.\n";
103     die $msg;
104 }
105
106 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
107     my $msg = "Cannot specify both -y and -z\n";
108     $msg   .= "Please do '$0 --help' to see usage.\n";
109     die $msg;
110 }
111
112 if ($reset) {
113     $noshadow = 1;
114 }
115
116 if ($noshadow) {
117     $noshadow = ' -n ';
118 }
119
120 if ($daemon_mode) {
121     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
122     if ($skip_export or $keep_export or $skip_index or
123           $where or $length or $offset) {
124         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
125         $msg   .= "Please do '$0 --help' to see usage.\n";
126         die $msg;
127     }
128     $authorities = 1;
129     $biblios = 1;
130     $process_zebraqueue = 1;
131 }
132
133 if (not $biblios and not $authorities) {
134     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
135     $msg   .= "Please do '$0 --help' to see usage.\n";
136     die $msg;
137 }
138
139 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
140 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
141     die "Cannot specify -t|--table with value '$table'. Only "
142       . ( join ', ', @tables_allowed_for_select )
143       . " are allowed.";
144 }
145
146
147 #  -v is for verbose, which seems backwards here because of how logging is set
148 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
149 if ($verbose_logging >= 2) {
150     $zebraidx_log_opt = '-v none,fatal,warn,all';
151 }
152
153 my $use_tempdir = 0;
154 unless ($directory) {
155     $use_tempdir = 1;
156     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
157 }
158
159
160 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
161 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
162
163 my $kohadir = C4::Context->config('intranetdir');
164 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
165 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
166
167 my $dbh = C4::Context->dbh;
168 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
169 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
170
171 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
172 <collection xmlns="http://www.loc.gov/MARC21/slim">
173 };
174
175 my $marcxml_close = q{
176 </collection>
177 };
178
179 # Protect again simultaneous update of the zebra index by using a lock file.
180 # Create our own lock directory if its missing.  This shouild be created
181 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
182 # does not exist and cannot be created, we fall back on /tmp - which will
183 # always work.
184
185 my ($lockfile, $LockFH);
186 foreach (
187     C4::Context->config("zebra_lockdir"),
188     '/var/lock/zebra_' . C4::Context->config('database'),
189     '/tmp/zebra_' . C4::Context->config('database')
190 ) {
191     #we try three possibilities (we really want to lock :)
192     next if !$_;
193     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
194     last if defined $LockFH;
195 }
196 if( !defined $LockFH ) {
197     print "WARNING: Could not create lock file $lockfile: $!\n";
198     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
199     print "Verify file permissions for it too.\n";
200     $use_flock = 0; # we disable file locking now and will continue
201                     # without it
202                     # note that this mimics old behavior (before we used
203                     # the lockfile)
204 };
205
206 if ( $verbose_logging ) {
207     print "Zebra configuration information\n";
208     print "================================\n";
209     print "Zebra biblio directory      = $biblioserverdir\n";
210     print "Zebra authorities directory = $authorityserverdir\n";
211     print "Koha directory              = $kohadir\n";
212     print "Lockfile                    = $lockfile\n" if $lockfile;
213     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
214     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
215     print "================================\n";
216 }
217
218 my $tester = XML::LibXML->new();
219
220 # The main work is done here by calling do_one_pass().  We have added locking
221 # avoid race conditions between full rebuilds and incremental updates either from
222 # daemon mode or periodic invocation from cron.  The race can lead to an updated
223 # record being overwritten by a rebuild if the update is applied after the export
224 # by the rebuild and before the rebuild finishes (more likely to affect large
225 # catalogs).
226 #
227 # We have chosen to exit immediately by default if we cannot obtain the lock
228 # to prevent the potential for a infinite backlog from cron invocations, but an
229 # option (wait-for-lock) is provided to let the program wait for the lock.
230 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
231 if ($daemon_mode) {
232     while (1) {
233         # For incremental updates, skip the update if the updates are locked
234         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
235             do_one_pass() if ( zebraqueue_not_empty() );
236             _flock($LockFH, LOCK_UN);
237         }
238         sleep $daemon_sleep;
239     }
240 } else {
241     # all one-off invocations
242     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
243     if (_flock($LockFH, $lock_mode)) {
244         do_one_pass();
245         _flock($LockFH, LOCK_UN);
246     } else {
247         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
248     }
249 }
250
251
252 if ( $verbose_logging ) {
253     print "====================\n";
254     print "CLEANING\n";
255     print "====================\n";
256 }
257 if ($keep_export) {
258     print "NOTHING cleaned : the export $directory has been kept.\n";
259     print "You can re-run this script with the -s ";
260     if ($use_tempdir) {
261         print " and -d $directory parameters";
262     } else {
263         print "parameter";
264     }
265     print "\n";
266     print "if you just want to rebuild zebra after changing the record.abs\n";
267     print "or another zebra config file\n";
268 } else {
269     unless ($use_tempdir) {
270         # if we're using a temporary directory
271         # created by File::Temp, it will be removed
272         # automatically.
273         rmtree($directory, 0, 1);
274         print "directory $directory deleted\n";
275     }
276 }
277
278 sub do_one_pass {
279     if ($authorities) {
280         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
281     } else {
282         print "skipping authorities\n" if ( $verbose_logging );
283     }
284
285     if ($biblios) {
286         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
287     } else {
288         print "skipping biblios\n" if ( $verbose_logging );
289     }
290 }
291
292 # Check the zebra update queue and return true if there are records to process
293 # This routine will handle each of -ab, -a, or -b, but in practice we force
294 # -ab when in daemon mode.
295 sub zebraqueue_not_empty {
296     my $where_str;
297
298     if ($authorities && $biblios) {
299         $where_str = 'done = 0;';
300     } elsif ($biblios) {
301         $where_str = 'server = "biblioserver" AND done = 0;';
302     } else {
303         $where_str = 'server = "authorityserver" AND done = 0;';
304     }
305     my $query =
306         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
307
308     $query->execute;
309     my $count = $query->fetchrow_arrayref->[0];
310     print "queued records: $count\n" if $verbose_logging > 0;
311     return $count > 0;
312 }
313
314 # This checks to see if the zebra directories exist under the provided path.
315 # If they don't, then zebra is likely to spit the dummy. This returns true
316 # if the directories had to be created, false otherwise.
317 sub check_zebra_dirs {
318     my ($base) = shift() . '/';
319     my $needed_repairing = 0;
320     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
321     foreach my $dir (@dirs) {
322         my $bdir = $base . $dir;
323         if (! -d $bdir) {
324             $needed_repairing = 1;
325             mkdir $bdir || die "Unable to create '$bdir': $!\n";
326             print "$0: needed to create '$bdir'\n";
327         }
328     }
329     return $needed_repairing;
330 }   # ----------  end of subroutine check_zebra_dirs  ----------
331
332 sub index_records {
333     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
334
335     my $num_records_exported = 0;
336     my $records_deleted = {};
337     my $need_reset = check_zebra_dirs($server_dir);
338     if ($need_reset) {
339         print "$0: found broken zebra server directories: forcing a rebuild\n";
340         $reset = 1;
341     }
342     if ($skip_export && $verbose_logging) {
343         print "====================\n";
344         print "SKIPPING $record_type export\n";
345         print "====================\n";
346     } else {
347         if ( $verbose_logging ) {
348             print "====================\n";
349             print "exporting $record_type\n";
350             print "====================\n";
351         }
352         mkdir "$directory" unless (-d $directory);
353         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
354         if ($process_zebraqueue) {
355             my $entries;
356
357             unless ( $process_zebraqueue_skip_deletes ) {
358                 $entries = select_zebraqueue_records($record_type, 'deleted');
359                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
360                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
361                 mark_zebraqueue_batch_done($entries);
362             }
363
364             $entries = select_zebraqueue_records($record_type, 'updated');
365             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
366             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
367             mark_zebraqueue_batch_done($entries);
368
369         } else {
370             my $sth = select_all_records($record_type);
371             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
372             unless ($do_not_clear_zebraqueue) {
373                 mark_all_zebraqueue_done($record_type);
374             }
375         }
376     }
377
378     #
379     # and reindexing everything
380     #
381     if ($skip_index) {
382         if ($verbose_logging) {
383             print "====================\n";
384             print "SKIPPING $record_type indexing\n";
385             print "====================\n";
386         }
387     } else {
388         if ( $verbose_logging ) {
389             print "====================\n";
390             print "REINDEXING zebra\n";
391             print "====================\n";
392         }
393         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
394         if ($process_zebraqueue) {
395             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
396                 if %$records_deleted;
397             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
398                 if $num_records_exported;
399         } else {
400             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
401                 if ($num_records_exported or $skip_export);
402         }
403     }
404 }
405
406
407 sub select_zebraqueue_records {
408     my ($record_type, $update_type) = @_;
409
410     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
411     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
412
413     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
414                              FROM zebraqueue
415                              WHERE server = ?
416                              AND   operation = ?
417                              AND   done = 0
418                              ORDER BY id DESC");
419     $sth->execute($server, $op);
420     my $entries = $sth->fetchall_arrayref({});
421 }
422
423 sub mark_all_zebraqueue_done {
424     my ($record_type) = @_;
425
426     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
427
428     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
429                              WHERE server = ?
430                              AND done = 0");
431     $sth->execute($server);
432 }
433
434 sub mark_zebraqueue_batch_done {
435     my ($entries) = @_;
436
437     $dbh->{AutoCommit} = 0;
438     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
439     $dbh->commit();
440     foreach my $id (map { $_->{id} } @$entries) {
441         $sth->execute($id);
442     }
443     $dbh->{AutoCommit} = 1;
444 }
445
446 sub select_all_records {
447     my $record_type = shift;
448     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
449 }
450
451 sub select_all_authorities {
452     my $strsth=qq{SELECT authid FROM auth_header};
453     $strsth.=qq{ WHERE $where } if ($where);
454     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
455     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
456     my $sth = $dbh->prepare($strsth);
457     $sth->execute();
458     return $sth;
459 }
460
461 sub select_all_biblios {
462     $table = 'biblioitems'
463       unless grep { /^$table$/ } @tables_allowed_for_select;
464     my $strsth = qq{ SELECT biblionumber FROM $table };
465     $strsth.=qq{ WHERE $where } if ($where);
466     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
467     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
468     my $sth = $dbh->prepare($strsth);
469     $sth->execute();
470     return $sth;
471 }
472
473 sub include_xml_wrapper {
474     my $as_xml = shift;
475     my $record_type = shift;
476
477     return 0 unless $as_xml;
478     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
479     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
480     return 0;
481
482 }
483
484 sub export_marc_records_from_sth {
485     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
486
487     my $num_exported = 0;
488     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
489
490     print {$fh} $marcxml_open
491         if include_xml_wrapper($as_xml, $record_type);
492
493     my $i = 0;
494     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
495     while (my ($record_number) = $sth->fetchrow_array) {
496         print "." if ( $verbose_logging );
497         print "\r$i" unless ($i++ %100 or !$verbose_logging);
498         if ( $nosanitize ) {
499             my $marcxml = $record_type eq 'biblio'
500                           ? GetXmlBiblio( $record_number )
501                           : GetAuthorityXML( $record_number );
502             if ($record_type eq 'biblio'){
503                 my @items = GetItemsInfo($record_number);
504                 if (@items){
505                     my $record = MARC::Record->new;
506                     $record->encoding('UTF-8');
507                     my @itemsrecord;
508                     foreach my $item (@items){
509                         my $record = Item2Marc($item, $record_number);
510                         push @itemsrecord, $record->field($itemtag);
511                     }
512                     $record->insert_fields_ordered(@itemsrecord);
513                     my $itemsxml = $record->as_xml_record();
514                     $marcxml =
515                         substr($marcxml, 0, length($marcxml)-10) .
516                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
517                 }
518             }
519             # extra test to ensure that result is valid XML; otherwise
520             # Zebra won't parse it in DOM mode
521             eval {
522                 my $doc = $tester->parse_string($marcxml);
523             };
524             if ($@) {
525                 warn "Error exporting record $record_number ($record_type): $@\n";
526                 next;
527             }
528             if ( $marcxml ) {
529                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
530                 print {$fh} $marcxml;
531                 $num_exported++;
532             }
533             next;
534         }
535         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
536         if (defined $marc) {
537             eval {
538                 my $rec;
539                 if ($as_xml) {
540                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
541                     eval {
542                         my $doc = $tester->parse_string($rec);
543                     };
544                     if ($@) {
545                         die "invalid XML: $@";
546                     }
547                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
548                 } else {
549                     $rec = $marc->as_usmarc();
550                 }
551                 print {$fh} $rec;
552                 $num_exported++;
553             };
554             if ($@) {
555                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
556                 warn "... specific error is $@" if $verbose_logging;
557             }
558         }
559     }
560     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
561     print {$fh} $marcxml_close
562         if include_xml_wrapper($as_xml, $record_type);
563     close $fh;
564     return $num_exported;
565 }
566
567 sub export_marc_records_from_list {
568     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
569
570     my $num_exported = 0;
571     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
572
573     print {$fh} $marcxml_open
574         if include_xml_wrapper($as_xml, $record_type);
575
576     my $i = 0;
577
578     # Skip any deleted records. We check for this anyway, but this reduces error spam
579     my %found = %$records_deleted;
580     foreach my $record_number ( map { $_->{biblio_auth_number} }
581                                 grep { !$found{ $_->{biblio_auth_number} }++ }
582                                 @$entries ) {
583         print "." if ( $verbose_logging );
584         print "\r$i" unless ($i++ %100 or !$verbose_logging);
585         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
586         if (defined $marc) {
587             eval {
588                 my $rec;
589                 if ($as_xml) {
590                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
591                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
592                 } else {
593                     $rec = $marc->as_usmarc();
594                 }
595                 print {$fh} $rec;
596                 $num_exported++;
597             };
598             if ($@) {
599               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
600             }
601         }
602     }
603     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
604
605     print {$fh} $marcxml_close
606         if include_xml_wrapper($as_xml, $record_type);
607
608     close $fh;
609     return $num_exported;
610 }
611
612 sub generate_deleted_marc_records {
613     my ($record_type, $entries, $directory, $as_xml) = @_;
614
615     my $records_deleted = {};
616     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
617
618     print {$fh} $marcxml_open
619         if include_xml_wrapper($as_xml, $record_type);
620
621     my $i = 0;
622     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
623         print "\r$i" unless ($i++ %100 or !$verbose_logging);
624         print "." if ( $verbose_logging );
625
626         my $marc = MARC::Record->new();
627         if ($record_type eq 'biblio') {
628             fix_biblio_ids($marc, $record_number, $record_number);
629         } else {
630             fix_authority_id($marc, $record_number);
631         }
632         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
633             fix_unimarc_100($marc);
634         }
635
636         my $rec;
637         if ($as_xml) {
638             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
639             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
640         } else {
641             $rec = $marc->as_usmarc();
642         }
643         print {$fh} $rec;
644
645         $records_deleted->{$record_number} = 1;
646     }
647     print "\nRecords exported: $i\n" if ( $verbose_logging );
648
649     print {$fh} $marcxml_close
650         if include_xml_wrapper($as_xml, $record_type);
651
652     close $fh;
653     return $records_deleted;
654
655
656 }
657
658 sub get_corrected_marc_record {
659     my ($record_type, $record_number, $noxml) = @_;
660
661     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
662
663     if (defined $marc) {
664         fix_leader($marc);
665         if ($record_type eq 'authority') {
666             fix_authority_id($marc, $record_number);
667         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
668             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
669             $marc = $normalizer->process($marc);
670         }
671         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
672             fix_unimarc_100($marc);
673         }
674     }
675
676     return $marc;
677 }
678
679 sub get_raw_marc_record {
680     my ($record_type, $record_number, $noxml) = @_;
681
682     my $marc;
683     if ($record_type eq 'biblio') {
684         if ($noxml) {
685             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
686             $fetch_sth->execute($record_number);
687             if (my ($blob) = $fetch_sth->fetchrow_array) {
688                 $marc = MARC::Record->new_from_usmarc($blob);
689                 unless ($marc) {
690                     warn "error creating MARC::Record from $blob";
691                 }
692             }
693             # failure to find a bib is not a problem -
694             # a delete could have been done before
695             # trying to process a record update
696
697             $fetch_sth->finish();
698             return unless $marc;
699         } else {
700             eval { $marc = GetMarcBiblio($record_number, 1); };
701             if ($@ || !$marc) {
702                 # here we do warn since catching an exception
703                 # means that the bib was found but failed
704                 # to be parsed
705                 warn "error retrieving biblio $record_number";
706                 return;
707             }
708         }
709     } else {
710         eval { $marc = GetAuthority($record_number); };
711         if ($@) {
712             warn "error retrieving authority $record_number";
713             return;
714         }
715     }
716     return $marc;
717 }
718
719 sub fix_leader {
720     # FIXME - this routine is suspect
721     # It blanks the Leader/00-05 and Leader/12-16 to
722     # force them to be recalculated correct when
723     # the $marc->as_usmarc() or $marc->as_xml() is called.
724     # But why is this necessary?  It would be a serious bug
725     # in MARC::Record (definitely) and MARC::File::XML (arguably)
726     # if they are emitting incorrect leader values.
727     my $marc = shift;
728
729     my $leader = $marc->leader;
730     substr($leader,  0, 5) = '     ';
731     substr($leader, 10, 7) = '22     ';
732     $marc->leader(substr($leader, 0, 24));
733 }
734
735 sub fix_biblio_ids {
736     # FIXME - it is essential to ensure that the biblionumber is present,
737     #         otherwise, Zebra will choke on the record.  However, this
738     #         logic belongs in the relevant C4::Biblio APIs.
739     my $marc = shift;
740     my $biblionumber = shift;
741     my $biblioitemnumber;
742     if (@_) {
743         $biblioitemnumber = shift;
744     } else {
745         my $sth = $dbh->prepare(
746             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
747         $sth->execute($biblionumber);
748         ($biblioitemnumber) = $sth->fetchrow_array;
749         $sth->finish;
750         unless ($biblioitemnumber) {
751             warn "failed to get biblioitemnumber for biblio $biblionumber";
752             return 0;
753         }
754     }
755
756     # FIXME - this is cheating on two levels
757     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
758     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
759     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
760     #
761     # On the other hand, this better for now than what rebuild_zebra.pl used to
762     # do, which was duplicate the code for inserting the biblionumber
763     # and biblioitemnumber
764     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
765
766     return 1;
767 }
768
769 sub fix_authority_id {
770     # FIXME - as with fix_biblio_ids, the authid must be present
771     #         for Zebra's sake.  However, this really belongs
772     #         in C4::AuthoritiesMarc.
773     my ($marc, $authid) = @_;
774     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
775         $marc->delete_field($marc->field('001'));
776         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
777     }
778 }
779
780 sub fix_unimarc_100 {
781     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
782     my $marc = shift;
783
784     my $string;
785     if ( length($marc->subfield( 100, "a" )) == 36 ) {
786         $string = $marc->subfield( 100, "a" );
787         my $f100 = $marc->field(100);
788         $marc->delete_field($f100);
789     }
790     else {
791         $string = POSIX::strftime( "%Y%m%d", localtime );
792         $string =~ s/\-//g;
793         $string = sprintf( "%-*s", 35, $string );
794     }
795     substr( $string, 22, 6, "frey50" );
796     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
797         $marc->delete_field($marc->field(100));
798         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
799     }
800 }
801
802 sub do_indexing {
803     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
804
805     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
806     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
807     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
808     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
809
810     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
811     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
812     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
813
814 }
815
816 sub _flock {
817     # test if flock is present; if so, use it; if not, return true
818     # op refers to the official flock operations including LOCK_EX,
819     # LOCK_UN, etc.
820     # combining LOCK_EX with LOCK_NB returns immediately
821     my ($fh, $op)= @_;
822     if( !defined($use_flock) ) {
823         #check if flock is present; if not, you will have a fatal error
824         my $lock_acquired = eval { flock($fh, $op) };
825         # assuming that $fh and $op are fine(..), an undef $lock_acquired
826         # means no flock
827         $use_flock = defined($lock_acquired) ? 1 : 0;
828         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
829         return 1 if !$use_flock;
830         return $lock_acquired;
831     } else {
832         return 1 if !$use_flock;
833         return flock($fh, $op);
834     }
835 }
836
837 sub _create_lockfile { #returns undef on failure
838     my $dir= shift;
839     unless (-d $dir) {
840         eval { mkpath($dir, 0, oct(755)) };
841         return if $@;
842     }
843     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
844     return ( $fh, $dir.'/'.LOCK_FILENAME );
845 }
846
847 sub print_usage {
848     print <<_USAGE_;
849 $0: reindex MARC bibs and/or authorities in Zebra.
850
851 Use this batch job to reindex all biblio or authority
852 records in your Koha database.
853
854 Parameters:
855
856     -b                      index bibliographic records
857
858     -a                      index authority records
859
860     -daemon                 Run in daemon mode.  The program will loop checking
861                             for entries on the zebraqueue table, processing
862                             them incrementally if present, and then sleep
863                             for a few seconds before repeating the process
864                             Checking the zebraqueue table is done with a cheap
865                             SQL query.  This allows for near realtime update of
866                             the zebra search index with low system overhead.
867                             Use -sleep to control the checking interval.
868
869                             Daemon mode implies -z, -a, -b.  The program will
870                             refuse to start if options are present that do not
871                             make sense while running as an incremental update
872                             daemon (e.g. -r or -offset).
873
874     -sleep 10               Seconds to sleep between checks of the zebraqueue
875                             table in daemon mode.  The default is 5 seconds.
876
877     -z                      select only updated and deleted
878                             records marked in the zebraqueue
879                             table.  Cannot be used with -r
880                             or -s.
881
882     --skip-deletes          only select record updates, not record
883                             deletions, to avoid potential excessive
884                             I/O when zebraidx processes deletions.
885                             If this option is used for normal indexing,
886                             a cronjob should be set up to run
887                             rebuild_zebra.pl -z without --skip-deletes
888                             during off hours.
889                             Only effective with -z.
890
891     -r                      clear Zebra index before
892                             adding records to index. Implies -w.
893
894     -d                      Temporary directory for indexing.
895                             If not specified, one is automatically
896                             created.  The export directory
897                             is automatically deleted unless
898                             you supply the -k switch.
899
900     -k                      Do not delete export directory.
901
902     -s                      Skip export.  Used if you have
903                             already exported the records
904                             in a previous run.
905
906     -noxml                  index from ISO MARC blob
907                             instead of MARC XML.  This
908                             option is recommended only
909                             for advanced user.
910
911     -x                      export and index as xml instead of is02709 (biblios only).
912                             use this if you might have records > 99,999 chars,
913
914     -nosanitize             export biblio/authority records directly from DB marcxml
915                             field without sanitizing records. It speed up
916                             dump process but could fail if DB contains badly
917                             encoded records. Works only with -x,
918
919     -w                      skip shadow indexing for this batch
920
921     -y                      do NOT clear zebraqueue after indexing; normally,
922                             after doing batch indexing, zebraqueue should be
923                             marked done for the affected record type(s) so that
924                             a running zebraqueue_daemon doesn't try to reindex
925                             the same records - specify -y to override this.
926                             Cannot be used with -z.
927
928     -v                      increase the amount of logging.  Normally only
929                             warnings and errors from the indexing are shown.
930                             Use log level 2 (-v -v) to include all Zebra logs.
931
932     --length   1234         how many biblio you want to export
933     --offset 1243           offset you want to start to
934                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
935                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
936     --where                 let you specify a WHERE query, like itemtype='BOOK'
937                             or something like that
938
939     --run-as-root           explicitily allow script to run as 'root' user
940
941     --wait-for-lock         when not running in daemon mode, the default
942                             behavior is to abort a rebuild if the rebuild
943                             lock is busy.  This option will cause the program
944                             to wait for the lock to free and then continue
945                             processing the rebuild request,
946
947     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
948                             biblioitems is the default value.
949
950     --help or -h            show this message.
951 _USAGE_
952 }