Bug 16506: (followup) Fix wrong option switch warning message
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use XML::LibXML;
30
31 use constant LOCK_FILENAME => 'rebuild..LCK';
32
33 # script that checks zebradir structure & create directories & mandatory files if needed
34 #
35 #
36
37 $|=1; # flushes output
38 # If the cron job starts us in an unreadable dir, we will break without
39 # this.
40 chdir $ENV{HOME} if (!(-r '.'));
41 my $daemon_mode;
42 my $daemon_sleep = 5;
43 my $directory;
44 my $nosanitize;
45 my $skip_export;
46 my $keep_export;
47 my $skip_index;
48 my $reset;
49 my $biblios;
50 my $authorities;
51 my $as_usmarc;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66
67 my $verbose_logging = 0;
68 my $zebraidx_log_opt = " -v none,fatal,warn ";
69 my $result = GetOptions(
70     'daemon'        => \$daemon_mode,
71     'sleep:i'       => \$daemon_sleep,
72     'd:s'           => \$directory,
73     'r|reset'       => \$reset,
74     's'             => \$skip_export,
75     'k'             => \$keep_export,
76     'I|skip-index'  => \$skip_index,
77     'nosanitize'    => \$nosanitize,
78     'b'             => \$biblios,
79     'noxml'         => \$as_usmarc,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated·\n";
103 }
104
105 if( not defined $run_as_root and $run_user eq 'root') {
106     my $msg = "Warning: You are running this script as the user 'root'.\n";
107     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
108     $msg   .= "Please do '$0 --help' to see usage.\n";
109     die $msg;
110 }
111
112 if ( $as_usmarc and $nosanitize ) {
113     my $msg = "Cannot specify both -noxml and -nosanitize\n";
114     $msg   .= "Please do '$0 --help' to see usage.\n";
115     die $msg;
116 }
117
118 if ($process_zebraqueue and ($skip_export or $reset)) {
119     my $msg = "Cannot specify -r or -s if -z is specified\n";
120     $msg   .= "Please do '$0 --help' to see usage.\n";
121     die $msg;
122 }
123
124 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
125     my $msg = "Cannot specify both -y and -z\n";
126     $msg   .= "Please do '$0 --help' to see usage.\n";
127     die $msg;
128 }
129
130 if ($daemon_mode) {
131     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
132     if ($skip_export or $keep_export or $skip_index or
133           $where or $length or $offset) {
134         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
135         $msg   .= "Please do '$0 --help' to see usage.\n";
136         die $msg;
137     }
138     $authorities = 1;
139     $biblios = 1;
140     $process_zebraqueue = 1;
141 }
142
143 if (not $biblios and not $authorities) {
144     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
145     $msg   .= "Please do '$0 --help' to see usage.\n";
146     die $msg;
147 }
148
149 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
150 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
151     die "Cannot specify -t|--table with value '$table'. Only "
152       . ( join ', ', @tables_allowed_for_select )
153       . " are allowed.";
154 }
155
156
157 #  -v is for verbose, which seems backwards here because of how logging is set
158 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
159 if ($verbose_logging >= 2) {
160     $zebraidx_log_opt = '-v none,fatal,warn,all';
161 }
162
163 my $use_tempdir = 0;
164 unless ($directory) {
165     $use_tempdir = 1;
166     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
167 }
168
169
170 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
171 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
172
173 my $kohadir = C4::Context->config('intranetdir');
174 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
175 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
176
177 my $dbh = C4::Context->dbh;
178 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
179 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
180
181 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
182 <collection xmlns="http://www.loc.gov/MARC21/slim">
183 };
184
185 my $marcxml_close = q{
186 </collection>
187 };
188
189 # Protect again simultaneous update of the zebra index by using a lock file.
190 # Create our own lock directory if its missing.  This shouild be created
191 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
192 # does not exist and cannot be created, we fall back on /tmp - which will
193 # always work.
194
195 my ($lockfile, $LockFH);
196 foreach (
197     C4::Context->config("zebra_lockdir"),
198     '/var/lock/zebra_' . C4::Context->config('database'),
199     '/tmp/zebra_' . C4::Context->config('database')
200 ) {
201     #we try three possibilities (we really want to lock :)
202     next if !$_;
203     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
204     last if defined $LockFH;
205 }
206 if( !defined $LockFH ) {
207     print "WARNING: Could not create lock file $lockfile: $!\n";
208     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
209     print "Verify file permissions for it too.\n";
210     $use_flock = 0; # we disable file locking now and will continue
211                     # without it
212                     # note that this mimics old behavior (before we used
213                     # the lockfile)
214 };
215
216 if ( $verbose_logging ) {
217     print "Zebra configuration information\n";
218     print "================================\n";
219     print "Zebra biblio directory      = $biblioserverdir\n";
220     print "Zebra authorities directory = $authorityserverdir\n";
221     print "Koha directory              = $kohadir\n";
222     print "Lockfile                    = $lockfile\n" if $lockfile;
223     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
224     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
225     print "================================\n";
226 }
227
228 my $tester = XML::LibXML->new();
229
230 # The main work is done here by calling do_one_pass().  We have added locking
231 # avoid race conditions between full rebuilds and incremental updates either from
232 # daemon mode or periodic invocation from cron.  The race can lead to an updated
233 # record being overwritten by a rebuild if the update is applied after the export
234 # by the rebuild and before the rebuild finishes (more likely to affect large
235 # catalogs).
236 #
237 # We have chosen to exit immediately by default if we cannot obtain the lock
238 # to prevent the potential for a infinite backlog from cron invocations, but an
239 # option (wait-for-lock) is provided to let the program wait for the lock.
240 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
241 if ($daemon_mode) {
242     while (1) {
243         # For incremental updates, skip the update if the updates are locked
244         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
245             do_one_pass() if ( zebraqueue_not_empty() );
246             _flock($LockFH, LOCK_UN);
247         }
248         sleep $daemon_sleep;
249     }
250 } else {
251     # all one-off invocations
252     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
253     if (_flock($LockFH, $lock_mode)) {
254         do_one_pass();
255         _flock($LockFH, LOCK_UN);
256     } else {
257         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
258     }
259 }
260
261
262 if ( $verbose_logging ) {
263     print "====================\n";
264     print "CLEANING\n";
265     print "====================\n";
266 }
267 if ($keep_export) {
268     print "NOTHING cleaned : the export $directory has been kept.\n";
269     print "You can re-run this script with the -s ";
270     if ($use_tempdir) {
271         print " and -d $directory parameters";
272     } else {
273         print "parameter";
274     }
275     print "\n";
276     print "if you just want to rebuild zebra after changing the record.abs\n";
277     print "or another zebra config file\n";
278 } else {
279     unless ($use_tempdir) {
280         # if we're using a temporary directory
281         # created by File::Temp, it will be removed
282         # automatically.
283         rmtree($directory, 0, 1);
284         print "directory $directory deleted\n";
285     }
286 }
287
288 sub do_one_pass {
289     if ($authorities) {
290         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
291     } else {
292         print "skipping authorities\n" if ( $verbose_logging );
293     }
294
295     if ($biblios) {
296         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
297     } else {
298         print "skipping biblios\n" if ( $verbose_logging );
299     }
300 }
301
302 # Check the zebra update queue and return true if there are records to process
303 # This routine will handle each of -ab, -a, or -b, but in practice we force
304 # -ab when in daemon mode.
305 sub zebraqueue_not_empty {
306     my $where_str;
307
308     if ($authorities && $biblios) {
309         $where_str = 'done = 0;';
310     } elsif ($biblios) {
311         $where_str = 'server = "biblioserver" AND done = 0;';
312     } else {
313         $where_str = 'server = "authorityserver" AND done = 0;';
314     }
315     my $query =
316         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
317
318     $query->execute;
319     my $count = $query->fetchrow_arrayref->[0];
320     print "queued records: $count\n" if $verbose_logging > 0;
321     return $count > 0;
322 }
323
324 # This checks to see if the zebra directories exist under the provided path.
325 # If they don't, then zebra is likely to spit the dummy. This returns true
326 # if the directories had to be created, false otherwise.
327 sub check_zebra_dirs {
328     my ($base) = shift() . '/';
329     my $needed_repairing = 0;
330     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
331     foreach my $dir (@dirs) {
332         my $bdir = $base . $dir;
333         if (! -d $bdir) {
334             $needed_repairing = 1;
335             mkdir $bdir || die "Unable to create '$bdir': $!\n";
336             print "$0: needed to create '$bdir'\n";
337         }
338     }
339     return $needed_repairing;
340 }   # ----------  end of subroutine check_zebra_dirs  ----------
341
342 sub index_records {
343     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
344
345     my $num_records_exported = 0;
346     my $records_deleted = {};
347     my $need_reset = check_zebra_dirs($server_dir);
348     if ($need_reset) {
349         print "$0: found broken zebra server directories: forcing a rebuild\n";
350         $reset = 1;
351     }
352     if ($skip_export && $verbose_logging) {
353         print "====================\n";
354         print "SKIPPING $record_type export\n";
355         print "====================\n";
356     } else {
357         if ( $verbose_logging ) {
358             print "====================\n";
359             print "exporting $record_type\n";
360             print "====================\n";
361         }
362         mkdir "$directory" unless (-d $directory);
363         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
364         if ($process_zebraqueue) {
365             my $entries;
366
367             unless ( $process_zebraqueue_skip_deletes ) {
368                 $entries = select_zebraqueue_records($record_type, 'deleted');
369                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
370                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_usmarc);
371                 mark_zebraqueue_batch_done($entries);
372             }
373
374             $entries = select_zebraqueue_records($record_type, 'updated');
375             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
376             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_usmarc, $records_deleted);
377             mark_zebraqueue_batch_done($entries);
378
379         } else {
380             my $sth = select_all_records($record_type);
381             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_usmarc, $nosanitize);
382             unless ($do_not_clear_zebraqueue) {
383                 mark_all_zebraqueue_done($record_type);
384             }
385         }
386     }
387
388     #
389     # and reindexing everything
390     #
391     if ($skip_index) {
392         if ($verbose_logging) {
393             print "====================\n";
394             print "SKIPPING $record_type indexing\n";
395             print "====================\n";
396         }
397     } else {
398         if ( $verbose_logging ) {
399             print "====================\n";
400             print "REINDEXING zebra\n";
401             print "====================\n";
402         }
403         my $record_fmt = ($as_usmarc) ? 'iso2709' : 'marcxml' ;
404         if ($process_zebraqueue) {
405             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
406                 if %$records_deleted;
407             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
408                 if $num_records_exported;
409         } else {
410             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
411                 if ($num_records_exported or $skip_export);
412         }
413     }
414 }
415
416
417 sub select_zebraqueue_records {
418     my ($record_type, $update_type) = @_;
419
420     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
421     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
422
423     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
424                              FROM zebraqueue
425                              WHERE server = ?
426                              AND   operation = ?
427                              AND   done = 0
428                              ORDER BY id DESC");
429     $sth->execute($server, $op);
430     my $entries = $sth->fetchall_arrayref({});
431 }
432
433 sub mark_all_zebraqueue_done {
434     my ($record_type) = @_;
435
436     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
437
438     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
439                              WHERE server = ?
440                              AND done = 0");
441     $sth->execute($server);
442 }
443
444 sub mark_zebraqueue_batch_done {
445     my ($entries) = @_;
446
447     $dbh->{AutoCommit} = 0;
448     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
449     $dbh->commit();
450     foreach my $id (map { $_->{id} } @$entries) {
451         $sth->execute($id);
452     }
453     $dbh->{AutoCommit} = 1;
454 }
455
456 sub select_all_records {
457     my $record_type = shift;
458     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
459 }
460
461 sub select_all_authorities {
462     my $strsth=qq{SELECT authid FROM auth_header};
463     $strsth.=qq{ WHERE $where } if ($where);
464     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
465     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
466     my $sth = $dbh->prepare($strsth);
467     $sth->execute();
468     return $sth;
469 }
470
471 sub select_all_biblios {
472     $table = 'biblioitems'
473       unless grep { /^$table$/ } @tables_allowed_for_select;
474     my $strsth = qq{ SELECT biblionumber FROM $table };
475     $strsth.=qq{ WHERE $where } if ($where);
476     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
477     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
478     my $sth = $dbh->prepare($strsth);
479     $sth->execute();
480     return $sth;
481 }
482
483 sub export_marc_records_from_sth {
484     my ($record_type, $sth, $directory, $as_usmarc, $nosanitize) = @_;
485
486     my $num_exported = 0;
487     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
488
489     print {$fh} $marcxml_open
490         unless $as_usmarc;
491
492     my $i = 0;
493     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
494     while (my ($record_number) = $sth->fetchrow_array) {
495         print "." if ( $verbose_logging );
496         print "\r$i" unless ($i++ %100 or !$verbose_logging);
497         if ( $nosanitize ) {
498             my $marcxml = $record_type eq 'biblio'
499                           ? GetXmlBiblio( $record_number )
500                           : GetAuthorityXML( $record_number );
501             if ($record_type eq 'biblio'){
502                 my @items = GetItemsInfo($record_number);
503                 if (@items){
504                     my $record = MARC::Record->new;
505                     $record->encoding('UTF-8');
506                     my @itemsrecord;
507                     foreach my $item (@items){
508                         my $record = Item2Marc($item, $record_number);
509                         push @itemsrecord, $record->field($itemtag);
510                     }
511                     $record->insert_fields_ordered(@itemsrecord);
512                     my $itemsxml = $record->as_xml_record();
513                     $marcxml =
514                         substr($marcxml, 0, length($marcxml)-10) .
515                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
516                 }
517             }
518             # extra test to ensure that result is valid XML; otherwise
519             # Zebra won't parse it in DOM mode
520             eval {
521                 my $doc = $tester->parse_string($marcxml);
522             };
523             if ($@) {
524                 warn "Error exporting record $record_number ($record_type): $@\n";
525                 next;
526             }
527             if ( $marcxml ) {
528                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
529                 print {$fh} $marcxml;
530                 $num_exported++;
531             }
532             next;
533         }
534         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
535         if (defined $marc) {
536             eval {
537                 my $rec;
538                 if ($as_usmarc) {
539                     $rec = $marc->as_usmarc();
540                 } else {
541                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
542                     eval {
543                         my $doc = $tester->parse_string($rec);
544                     };
545                     if ($@) {
546                         die "invalid XML: $@";
547                     }
548                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
549                 }
550                 print {$fh} $rec;
551                 $num_exported++;
552             };
553             if ($@) {
554                 warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
555                 warn "... specific error is $@" if $verbose_logging;
556             }
557         }
558     }
559     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
560     print {$fh} $marcxml_close
561         unless $as_usmarc;
562
563     close $fh;
564     return $num_exported;
565 }
566
567 sub export_marc_records_from_list {
568     my ($record_type, $entries, $directory, $as_usmarc, $records_deleted) = @_;
569
570     my $num_exported = 0;
571     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
572
573     print {$fh} $marcxml_open
574         unless $as_usmarc;
575
576     my $i = 0;
577
578     # Skip any deleted records. We check for this anyway, but this reduces error spam
579     my %found = %$records_deleted;
580     foreach my $record_number ( map { $_->{biblio_auth_number} }
581                                 grep { !$found{ $_->{biblio_auth_number} }++ }
582                                 @$entries ) {
583         print "." if ( $verbose_logging );
584         print "\r$i" unless ($i++ %100 or !$verbose_logging);
585         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
586         if (defined $marc) {
587             eval {
588                 my $rec;
589                 if ( $as_usmarc ) {
590                     $rec = $marc->as_usmarc();
591                 } else {
592                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
593                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
594                 }
595                 print {$fh} $rec;
596                 $num_exported++;
597             };
598             if ($@) {
599               warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
600             }
601         }
602     }
603     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
604
605     print {$fh} $marcxml_close
606         unless $as_usmarc;
607
608     close $fh;
609     return $num_exported;
610 }
611
612 sub generate_deleted_marc_records {
613
614     my ($record_type, $entries, $directory, $as_usmarc) = @_;
615
616     my $records_deleted = {};
617     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
618
619     print {$fh} $marcxml_open
620         unless $as_usmarc;
621
622     my $i = 0;
623     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
624         print "\r$i" unless ($i++ %100 or !$verbose_logging);
625         print "." if ( $verbose_logging );
626
627         my $marc = MARC::Record->new();
628         if ($record_type eq 'biblio') {
629             fix_biblio_ids($marc, $record_number, $record_number);
630         } else {
631             fix_authority_id($marc, $record_number);
632         }
633         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
634             fix_unimarc_100($marc);
635         }
636
637         my $rec;
638         if ( $as_usmarc ) {
639             $rec = $marc->as_usmarc();
640         } else {
641             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
642             # Remove the record's XML header
643             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
644         }
645         print {$fh} $rec;
646
647         $records_deleted->{$record_number} = 1;
648     }
649     print "\nRecords exported: $i\n" if ( $verbose_logging );
650
651     print {$fh} $marcxml_close
652         unless $as_usmarc;
653
654     close $fh;
655     return $records_deleted;
656 }
657
658 sub get_corrected_marc_record {
659     my ($record_type, $record_number, $as_usmarc) = @_;
660
661     my $marc = get_raw_marc_record($record_type, $record_number, $as_usmarc);
662
663     if (defined $marc) {
664         fix_leader($marc);
665         if ($record_type eq 'authority') {
666             fix_authority_id($marc, $record_number);
667         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
668             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
669             $marc = $normalizer->process($marc);
670         }
671         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
672             fix_unimarc_100($marc);
673         }
674     }
675
676     return $marc;
677 }
678
679 sub get_raw_marc_record {
680     my ($record_type, $record_number, $as_usmarc) = @_;
681
682     my $marc;
683     if ($record_type eq 'biblio') {
684         if ($as_usmarc) {
685             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
686             $fetch_sth->execute($record_number);
687             if (my ($blob) = $fetch_sth->fetchrow_array) {
688                 $marc = MARC::Record->new_from_usmarc($blob);
689                 unless ($marc) {
690                     warn "error creating MARC::Record from $blob";
691                 }
692             }
693             # failure to find a bib is not a problem -
694             # a delete could have been done before
695             # trying to process a record update
696
697             $fetch_sth->finish();
698             return unless $marc;
699         } else {
700             eval { $marc = GetMarcBiblio($record_number, 1); };
701             if ($@ || !$marc) {
702                 # here we do warn since catching an exception
703                 # means that the bib was found but failed
704                 # to be parsed
705                 warn "error retrieving biblio $record_number";
706                 return;
707             }
708         }
709     } else {
710         eval { $marc = GetAuthority($record_number); };
711         if ($@) {
712             warn "error retrieving authority $record_number";
713             return;
714         }
715     }
716     return $marc;
717 }
718
719 sub fix_leader {
720     # FIXME - this routine is suspect
721     # It blanks the Leader/00-05 and Leader/12-16 to
722     # force them to be recalculated correct when
723     # the $marc->as_usmarc() or $marc->as_xml() is called.
724     # But why is this necessary?  It would be a serious bug
725     # in MARC::Record (definitely) and MARC::File::XML (arguably)
726     # if they are emitting incorrect leader values.
727     my $marc = shift;
728
729     my $leader = $marc->leader;
730     substr($leader,  0, 5) = '     ';
731     substr($leader, 10, 7) = '22     ';
732     $marc->leader(substr($leader, 0, 24));
733 }
734
735 sub fix_biblio_ids {
736     # FIXME - it is essential to ensure that the biblionumber is present,
737     #         otherwise, Zebra will choke on the record.  However, this
738     #         logic belongs in the relevant C4::Biblio APIs.
739     my $marc = shift;
740     my $biblionumber = shift;
741     my $biblioitemnumber;
742     if (@_) {
743         $biblioitemnumber = shift;
744     } else {
745         my $sth = $dbh->prepare(
746             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
747         $sth->execute($biblionumber);
748         ($biblioitemnumber) = $sth->fetchrow_array;
749         $sth->finish;
750         unless ($biblioitemnumber) {
751             warn "failed to get biblioitemnumber for biblio $biblionumber";
752             return 0;
753         }
754     }
755
756     # FIXME - this is cheating on two levels
757     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
758     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
759     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
760     #
761     # On the other hand, this better for now than what rebuild_zebra.pl used to
762     # do, which was duplicate the code for inserting the biblionumber
763     # and biblioitemnumber
764     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
765
766     return 1;
767 }
768
769 sub fix_authority_id {
770     # FIXME - as with fix_biblio_ids, the authid must be present
771     #         for Zebra's sake.  However, this really belongs
772     #         in C4::AuthoritiesMarc.
773     my ($marc, $authid) = @_;
774     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
775         $marc->delete_field($marc->field('001'));
776         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
777     }
778 }
779
780 sub fix_unimarc_100 {
781     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
782     my $marc = shift;
783
784     my $string;
785     if ( length($marc->subfield( 100, "a" )) == 36 ) {
786         $string = $marc->subfield( 100, "a" );
787         my $f100 = $marc->field(100);
788         $marc->delete_field($f100);
789     }
790     else {
791         $string = POSIX::strftime( "%Y%m%d", localtime );
792         $string =~ s/\-//g;
793         $string = sprintf( "%-*s", 35, $string );
794     }
795     substr( $string, 22, 6, "frey50" );
796     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
797         $marc->delete_field($marc->field(100));
798         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
799     }
800 }
801
802 sub do_indexing {
803     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
804
805     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
806     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
807     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
808     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
809
810     $noshadow //= '';
811
812     if ($noshadow or $reset_index) {
813         $noshadow = '-n';
814     }
815
816     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
817     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
818     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
819 }
820
821 sub _flock {
822     # test if flock is present; if so, use it; if not, return true
823     # op refers to the official flock operations including LOCK_EX,
824     # LOCK_UN, etc.
825     # combining LOCK_EX with LOCK_NB returns immediately
826     my ($fh, $op)= @_;
827     if( !defined($use_flock) ) {
828         #check if flock is present; if not, you will have a fatal error
829         my $lock_acquired = eval { flock($fh, $op) };
830         # assuming that $fh and $op are fine(..), an undef $lock_acquired
831         # means no flock
832         $use_flock = defined($lock_acquired) ? 1 : 0;
833         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
834         return 1 if !$use_flock;
835         return $lock_acquired;
836     } else {
837         return 1 if !$use_flock;
838         return flock($fh, $op);
839     }
840 }
841
842 sub _create_lockfile { #returns undef on failure
843     my $dir= shift;
844     unless (-d $dir) {
845         eval { mkpath($dir, 0, oct(755)) };
846         return if $@;
847     }
848     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
849     return ( $fh, $dir.'/'.LOCK_FILENAME );
850 }
851
852 sub print_usage {
853     print <<_USAGE_;
854 $0: reindex MARC bibs and/or authorities in Zebra.
855
856 Use this batch job to reindex all biblio or authority
857 records in your Koha database.
858
859 Parameters:
860
861     -b                      index bibliographic records
862
863     -a                      index authority records
864
865     -daemon                 Run in daemon mode.  The program will loop checking
866                             for entries on the zebraqueue table, processing
867                             them incrementally if present, and then sleep
868                             for a few seconds before repeating the process
869                             Checking the zebraqueue table is done with a cheap
870                             SQL query.  This allows for near realtime update of
871                             the zebra search index with low system overhead.
872                             Use -sleep to control the checking interval.
873
874                             Daemon mode implies -z, -a, -b.  The program will
875                             refuse to start if options are present that do not
876                             make sense while running as an incremental update
877                             daemon (e.g. -r or -offset).
878
879     -sleep 10               Seconds to sleep between checks of the zebraqueue
880                             table in daemon mode.  The default is 5 seconds.
881
882     -z                      select only updated and deleted
883                             records marked in the zebraqueue
884                             table.  Cannot be used with -r
885                             or -s.
886
887     --skip-deletes          only select record updates, not record
888                             deletions, to avoid potential excessive
889                             I/O when zebraidx processes deletions.
890                             If this option is used for normal indexing,
891                             a cronjob should be set up to run
892                             rebuild_zebra.pl -z without --skip-deletes
893                             during off hours.
894                             Only effective with -z.
895
896     -r                      clear Zebra index before
897                             adding records to index. Implies -w.
898
899     -d                      Temporary directory for indexing.
900                             If not specified, one is automatically
901                             created.  The export directory
902                             is automatically deleted unless
903                             you supply the -k switch.
904
905     -k                      Do not delete export directory.
906
907     -s                      Skip export.  Used if you have
908                             already exported the records
909                             in a previous run.
910
911     -noxml                  index from ISO MARC blob
912                             instead of MARC XML.  This
913                             option is recommended only
914                             for advanced user.
915
916     -nosanitize             export biblio/authority records directly from DB marcxml
917                             field without sanitizing records. It speed up
918                             dump process but could fail if DB contains badly
919                             encoded records. Works only with -x,
920
921     -w                      skip shadow indexing for this batch
922
923     -y                      do NOT clear zebraqueue after indexing; normally,
924                             after doing batch indexing, zebraqueue should be
925                             marked done for the affected record type(s) so that
926                             a running zebraqueue_daemon doesn't try to reindex
927                             the same records - specify -y to override this.
928                             Cannot be used with -z.
929
930     -v                      increase the amount of logging.  Normally only
931                             warnings and errors from the indexing are shown.
932                             Use log level 2 (-v -v) to include all Zebra logs.
933
934     --length   1234         how many biblio you want to export
935     --offset 1243           offset you want to start to
936                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
937                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
938     --where                 let you specify a WHERE query, like itemtype='BOOK'
939                             or something like that
940
941     --run-as-root           explicitily allow script to run as 'root' user
942
943     --wait-for-lock         when not running in daemon mode, the default
944                             behavior is to abort a rebuild if the rebuild
945                             lock is busy.  This option will cause the program
946                             to wait for the lock to free and then continue
947                             processing the rebuild request,
948
949     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
950                             biblioitems is the default value.
951
952     --help or -h            show this message.
953 _USAGE_
954 }