Bug 12368: Die if the --table value is not allowed.
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $process_zebraqueue_skip_deletes;
43 my $do_not_clear_zebraqueue;
44 my $length;
45 my $where;
46 my $offset;
47 my $run_as_root;
48 my $run_user = (getpwuid($<))[0];
49 my $wait_for_lock = 0;
50 my $use_flock;
51 my $table = 'biblioitems';
52
53 my $verbose_logging = 0;
54 my $zebraidx_log_opt = " -v none,fatal,warn ";
55 my $result = GetOptions(
56     'daemon'        => \$daemon_mode,
57     'sleep:i'       => \$daemon_sleep,
58     'd:s'           => \$directory,
59     'r|reset'       => \$reset,
60     's'             => \$skip_export,
61     'k'             => \$keep_export,
62     'I|skip-index'  => \$skip_index,
63     'nosanitize'    => \$nosanitize,
64     'b'             => \$biblios,
65     'noxml'         => \$noxml,
66     'w'             => \$noshadow,
67     'a'             => \$authorities,
68     'h|help'        => \$want_help,
69     'x'             => \$as_xml,
70     'y'             => \$do_not_clear_zebraqueue,
71     'z'             => \$process_zebraqueue,
72     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
73     'where:s'       => \$where,
74     'length:i'      => \$length,
75     'offset:i'      => \$offset,
76     'v+'            => \$verbose_logging,
77     'run-as-root'   => \$run_as_root,
78     'wait-for-lock' => \$wait_for_lock,
79     't|table:s'     => \$table,
80 );
81
82 if (not $result or $want_help) {
83     print_usage();
84     exit 0;
85 }
86
87 if( not defined $run_as_root and $run_user eq 'root') {
88     my $msg = "Warning: You are running this script as the user 'root'.\n";
89     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
90     $msg   .= "Please do '$0 --help' to see usage.\n";
91     die $msg;
92 }
93
94 if ( !$as_xml and $nosanitize ) {
95     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
96     $msg   .= "Please do '$0 --help' to see usage.\n";
97     die $msg;
98 }
99
100 if ($process_zebraqueue and ($skip_export or $reset)) {
101     my $msg = "Cannot specify -r or -s if -z is specified\n";
102     $msg   .= "Please do '$0 --help' to see usage.\n";
103     die $msg;
104 }
105
106 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
107     my $msg = "Cannot specify both -y and -z\n";
108     $msg   .= "Please do '$0 --help' to see usage.\n";
109     die $msg;
110 }
111
112 if ($reset) {
113     $noshadow = 1;
114 }
115
116 if ($noshadow) {
117     $noshadow = ' -n ';
118 }
119
120 if ($daemon_mode) {
121     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
122     if ($skip_export or $keep_export or $skip_index or
123           $where or $length or $offset) {
124         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
125         $msg   .= "Please do '$0 --help' to see usage.\n";
126         die $msg;
127     }
128     $authorities = 1;
129     $biblios = 1;
130     $process_zebraqueue = 1;
131 }
132
133 if (not $biblios and not $authorities) {
134     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
135     $msg   .= "Please do '$0 --help' to see usage.\n";
136     die $msg;
137 }
138
139 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
140 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
141     die "Cannot specify -t|--table with value '$table'. Only "
142       . ( join ', ', @tables_allowed_for_select )
143       . " are allowed.";
144 }
145
146
147 #  -v is for verbose, which seems backwards here because of how logging is set
148 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
149 if ($verbose_logging >= 2) {
150     $zebraidx_log_opt = '-v none,fatal,warn,all';
151 }
152
153 my $use_tempdir = 0;
154 unless ($directory) {
155     $use_tempdir = 1;
156     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
157 }
158
159
160 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
161 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
162
163 my $kohadir = C4::Context->config('intranetdir');
164 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
165 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
166
167 my $dbh = C4::Context->dbh;
168 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
169 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
170
171 # Protect again simultaneous update of the zebra index by using a lock file.
172 # Create our own lock directory if its missing.  This shouild be created
173 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
174 # does not exist and cannot be created, we fall back on /tmp - which will
175 # always work.
176
177 my ($lockfile, $LockFH);
178 foreach (
179     C4::Context->config("zebra_lockdir"),
180     '/var/lock/zebra_' . C4::Context->config('database'),
181     '/tmp/zebra_' . C4::Context->config('database')
182 ) {
183     #we try three possibilities (we really want to lock :)
184     next if !$_;
185     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
186     last if defined $LockFH;
187 }
188 if( !defined $LockFH ) {
189     print "WARNING: Could not create lock file $lockfile: $!\n";
190     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
191     print "Verify file permissions for it too.\n";
192     $use_flock = 0; # we disable file locking now and will continue
193                     # without it
194                     # note that this mimics old behavior (before we used
195                     # the lockfile)
196 };
197
198 if ( $verbose_logging ) {
199     print "Zebra configuration information\n";
200     print "================================\n";
201     print "Zebra biblio directory      = $biblioserverdir\n";
202     print "Zebra authorities directory = $authorityserverdir\n";
203     print "Koha directory              = $kohadir\n";
204     print "Lockfile                    = $lockfile\n" if $lockfile;
205     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
206     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
207     print "================================\n";
208 }
209
210 my $tester = XML::LibXML->new();
211
212 # The main work is done here by calling do_one_pass().  We have added locking
213 # avoid race conditions between full rebuilds and incremental updates either from
214 # daemon mode or periodic invocation from cron.  The race can lead to an updated
215 # record being overwritten by a rebuild if the update is applied after the export
216 # by the rebuild and before the rebuild finishes (more likely to affect large
217 # catalogs).
218 #
219 # We have chosen to exit immediately by default if we cannot obtain the lock
220 # to prevent the potential for a infinite backlog from cron invocations, but an
221 # option (wait-for-lock) is provided to let the program wait for the lock.
222 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
223 if ($daemon_mode) {
224     while (1) {
225         # For incremental updates, skip the update if the updates are locked
226         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
227             do_one_pass() if ( zebraqueue_not_empty() );
228             _flock($LockFH, LOCK_UN);
229         }
230         sleep $daemon_sleep;
231     }
232 } else {
233     # all one-off invocations
234     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
235     if (_flock($LockFH, $lock_mode)) {
236         do_one_pass();
237         _flock($LockFH, LOCK_UN);
238     } else {
239         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
240     }
241 }
242
243
244 if ( $verbose_logging ) {
245     print "====================\n";
246     print "CLEANING\n";
247     print "====================\n";
248 }
249 if ($keep_export) {
250     print "NOTHING cleaned : the export $directory has been kept.\n";
251     print "You can re-run this script with the -s ";
252     if ($use_tempdir) {
253         print " and -d $directory parameters";
254     } else {
255         print "parameter";
256     }
257     print "\n";
258     print "if you just want to rebuild zebra after changing the record.abs\n";
259     print "or another zebra config file\n";
260 } else {
261     unless ($use_tempdir) {
262         # if we're using a temporary directory
263         # created by File::Temp, it will be removed
264         # automatically.
265         rmtree($directory, 0, 1);
266         print "directory $directory deleted\n";
267     }
268 }
269
270 sub do_one_pass {
271     if ($authorities) {
272         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
273     } else {
274         print "skipping authorities\n" if ( $verbose_logging );
275     }
276
277     if ($biblios) {
278         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
279     } else {
280         print "skipping biblios\n" if ( $verbose_logging );
281     }
282 }
283
284 # Check the zebra update queue and return true if there are records to process
285 # This routine will handle each of -ab, -a, or -b, but in practice we force
286 # -ab when in daemon mode.
287 sub zebraqueue_not_empty {
288     my $where_str;
289
290     if ($authorities && $biblios) {
291         $where_str = 'done = 0;';
292     } elsif ($biblios) {
293         $where_str = 'server = "biblioserver" AND done = 0;';
294     } else {
295         $where_str = 'server = "authorityserver" AND done = 0;';
296     }
297     my $query =
298         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
299
300     $query->execute;
301     my $count = $query->fetchrow_arrayref->[0];
302     print "queued records: $count\n" if $verbose_logging > 0;
303     return $count > 0;
304 }
305
306 # This checks to see if the zebra directories exist under the provided path.
307 # If they don't, then zebra is likely to spit the dummy. This returns true
308 # if the directories had to be created, false otherwise.
309 sub check_zebra_dirs {
310     my ($base) = shift() . '/';
311     my $needed_repairing = 0;
312     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
313     foreach my $dir (@dirs) {
314         my $bdir = $base . $dir;
315         if (! -d $bdir) {
316             $needed_repairing = 1;
317             mkdir $bdir || die "Unable to create '$bdir': $!\n";
318             print "$0: needed to create '$bdir'\n";
319         }
320     }
321     return $needed_repairing;
322 }   # ----------  end of subroutine check_zebra_dirs  ----------
323
324 sub index_records {
325     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
326
327     my $num_records_exported = 0;
328     my $records_deleted = {};
329     my $need_reset = check_zebra_dirs($server_dir);
330     if ($need_reset) {
331         print "$0: found broken zebra server directories: forcing a rebuild\n";
332         $reset = 1;
333     }
334     if ($skip_export && $verbose_logging) {
335         print "====================\n";
336         print "SKIPPING $record_type export\n";
337         print "====================\n";
338     } else {
339         if ( $verbose_logging ) {
340             print "====================\n";
341             print "exporting $record_type\n";
342             print "====================\n";
343         }
344         mkdir "$directory" unless (-d $directory);
345         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
346         if ($process_zebraqueue) {
347             my $entries;
348
349             unless ( $process_zebraqueue_skip_deletes ) {
350                 $entries = select_zebraqueue_records($record_type, 'deleted');
351                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
352                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
353                 mark_zebraqueue_batch_done($entries);
354             }
355
356             $entries = select_zebraqueue_records($record_type, 'updated');
357             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
358             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
359             mark_zebraqueue_batch_done($entries);
360
361         } else {
362             my $sth = select_all_records($record_type);
363             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
364             unless ($do_not_clear_zebraqueue) {
365                 mark_all_zebraqueue_done($record_type);
366             }
367         }
368     }
369
370     #
371     # and reindexing everything
372     #
373     if ($skip_index) {
374         if ($verbose_logging) {
375             print "====================\n";
376             print "SKIPPING $record_type indexing\n";
377             print "====================\n";
378         }
379     } else {
380         if ( $verbose_logging ) {
381             print "====================\n";
382             print "REINDEXING zebra\n";
383             print "====================\n";
384         }
385         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
386         if ($process_zebraqueue) {
387             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
388                 if %$records_deleted;
389             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
390                 if $num_records_exported;
391         } else {
392             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
393                 if ($num_records_exported or $skip_export);
394         }
395     }
396 }
397
398
399 sub select_zebraqueue_records {
400     my ($record_type, $update_type) = @_;
401
402     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
403     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
404
405     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
406                              FROM zebraqueue
407                              WHERE server = ?
408                              AND   operation = ?
409                              AND   done = 0
410                              ORDER BY id DESC");
411     $sth->execute($server, $op);
412     my $entries = $sth->fetchall_arrayref({});
413 }
414
415 sub mark_all_zebraqueue_done {
416     my ($record_type) = @_;
417
418     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
419
420     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
421                              WHERE server = ?
422                              AND done = 0");
423     $sth->execute($server);
424 }
425
426 sub mark_zebraqueue_batch_done {
427     my ($entries) = @_;
428
429     $dbh->{AutoCommit} = 0;
430     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
431     $dbh->commit();
432     foreach my $id (map { $_->{id} } @$entries) {
433         $sth->execute($id);
434     }
435     $dbh->{AutoCommit} = 1;
436 }
437
438 sub select_all_records {
439     my $record_type = shift;
440     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
441 }
442
443 sub select_all_authorities {
444     my $strsth=qq{SELECT authid FROM auth_header};
445     $strsth.=qq{ WHERE $where } if ($where);
446     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
447     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
448     my $sth = $dbh->prepare($strsth);
449     $sth->execute();
450     return $sth;
451 }
452
453 sub select_all_biblios {
454     $table = 'biblioitems'
455       if grep { /^$table$/ } @tables_allowed_for_select;
456     my $strsth = qq{ SELECT biblionumber FROM $table };
457     $strsth.=qq{ WHERE $where } if ($where);
458     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
459     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
460     my $sth = $dbh->prepare($strsth);
461     $sth->execute();
462     return $sth;
463 }
464
465 sub include_xml_wrapper {
466     my $as_xml = shift;
467     my $record_type = shift;
468
469     return 0 unless $as_xml;
470     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
471     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
472     return 0;
473
474 }
475
476 sub export_marc_records_from_sth {
477     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
478
479     my $num_exported = 0;
480     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
481     if (include_xml_wrapper($as_xml, $record_type)) {
482         # include XML declaration and root element
483         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
484     }
485     my $i = 0;
486     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
487     while (my ($record_number) = $sth->fetchrow_array) {
488         print "." if ( $verbose_logging );
489         print "\r$i" unless ($i++ %100 or !$verbose_logging);
490         if ( $nosanitize ) {
491             my $marcxml = $record_type eq 'biblio'
492                           ? GetXmlBiblio( $record_number )
493                           : GetAuthorityXML( $record_number );
494             if ($record_type eq 'biblio'){
495                 my @items = GetItemsInfo($record_number);
496                 if (@items){
497                     my $record = MARC::Record->new;
498                     $record->encoding('UTF-8');
499                     my @itemsrecord;
500                     foreach my $item (@items){
501                         my $record = Item2Marc($item, $record_number);
502                         push @itemsrecord, $record->field($itemtag);
503                     }
504                     $record->insert_fields_ordered(@itemsrecord);
505                     my $itemsxml = $record->as_xml_record();
506                     $marcxml =
507                         substr($marcxml, 0, length($marcxml)-10) .
508                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
509                 }
510             }
511             # extra test to ensure that result is valid XML; otherwise
512             # Zebra won't parse it in DOM mode
513             eval {
514                 my $doc = $tester->parse_string($marcxml);
515             };
516             if ($@) {
517                 warn "Error exporting record $record_number ($record_type): $@\n";
518                 next;
519             }
520             if ( $marcxml ) {
521                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
522                 print {$fh} $marcxml;
523                 $num_exported++;
524             }
525             next;
526         }
527         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
528         if (defined $marc) {
529             eval {
530                 my $rec;
531                 if ($as_xml) {
532                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
533                     eval {
534                         my $doc = $tester->parse_string($rec);
535                     };
536                     if ($@) {
537                         die "invalid XML: $@";
538                     }
539                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
540                 } else {
541                     $rec = $marc->as_usmarc();
542                 }
543                 print {$fh} $rec;
544                 $num_exported++;
545             };
546             if ($@) {
547                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
548                 warn "... specific error is $@" if $verbose_logging;
549             }
550         }
551     }
552     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
553     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
554     close $fh;
555     return $num_exported;
556 }
557
558 sub export_marc_records_from_list {
559     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
560
561     my $num_exported = 0;
562     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
563     if (include_xml_wrapper($as_xml, $record_type)) {
564         # include XML declaration and root element
565         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
566     }
567     my $i = 0;
568
569     # Skip any deleted records. We check for this anyway, but this reduces error spam
570     my %found = %$records_deleted;
571     foreach my $record_number ( map { $_->{biblio_auth_number} }
572                                 grep { !$found{ $_->{biblio_auth_number} }++ }
573                                 @$entries ) {
574         print "." if ( $verbose_logging );
575         print "\r$i" unless ($i++ %100 or !$verbose_logging);
576         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
577         if (defined $marc) {
578             eval {
579                 my $rec;
580                 if ($as_xml) {
581                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
582                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
583                 } else {
584                     $rec = $marc->as_usmarc();
585                 }
586                 print {$fh} $rec;
587                 $num_exported++;
588             };
589             if ($@) {
590               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
591             }
592         }
593     }
594     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
595     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
596     close $fh;
597     return $num_exported;
598 }
599
600 sub generate_deleted_marc_records {
601     my ($record_type, $entries, $directory, $as_xml) = @_;
602
603     my $records_deleted = {};
604     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
605     if (include_xml_wrapper($as_xml, $record_type)) {
606         # include XML declaration and root element
607         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
608     }
609     my $i = 0;
610     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
611         print "\r$i" unless ($i++ %100 or !$verbose_logging);
612         print "." if ( $verbose_logging );
613
614         my $marc = MARC::Record->new();
615         if ($record_type eq 'biblio') {
616             fix_biblio_ids($marc, $record_number, $record_number);
617         } else {
618             fix_authority_id($marc, $record_number);
619         }
620         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
621             fix_unimarc_100($marc);
622         }
623
624         my $rec;
625         if ($as_xml) {
626             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
627             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
628         } else {
629             $rec = $marc->as_usmarc();
630         }
631         print {$fh} $rec;
632
633         $records_deleted->{$record_number} = 1;
634     }
635     print "\nRecords exported: $i\n" if ( $verbose_logging );
636     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
637     close $fh;
638     return $records_deleted;
639
640
641 }
642
643 sub get_corrected_marc_record {
644     my ($record_type, $record_number, $noxml) = @_;
645
646     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
647
648     if (defined $marc) {
649         fix_leader($marc);
650         if ($record_type eq 'authority') {
651             fix_authority_id($marc, $record_number);
652         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
653             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
654             $marc = $normalizer->process($marc);
655         }
656         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
657             fix_unimarc_100($marc);
658         }
659     }
660
661     return $marc;
662 }
663
664 sub get_raw_marc_record {
665     my ($record_type, $record_number, $noxml) = @_;
666
667     my $marc;
668     if ($record_type eq 'biblio') {
669         if ($noxml) {
670             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
671             $fetch_sth->execute($record_number);
672             if (my ($blob) = $fetch_sth->fetchrow_array) {
673                 $marc = MARC::Record->new_from_usmarc($blob);
674                 unless ($marc) {
675                     warn "error creating MARC::Record from $blob";
676                 }
677             }
678             # failure to find a bib is not a problem -
679             # a delete could have been done before
680             # trying to process a record update
681
682             $fetch_sth->finish();
683             return unless $marc;
684         } else {
685             eval { $marc = GetMarcBiblio($record_number, 1); };
686             if ($@ || !$marc) {
687                 # here we do warn since catching an exception
688                 # means that the bib was found but failed
689                 # to be parsed
690                 warn "error retrieving biblio $record_number";
691                 return;
692             }
693         }
694     } else {
695         eval { $marc = GetAuthority($record_number); };
696         if ($@) {
697             warn "error retrieving authority $record_number";
698             return;
699         }
700     }
701     return $marc;
702 }
703
704 sub fix_leader {
705     # FIXME - this routine is suspect
706     # It blanks the Leader/00-05 and Leader/12-16 to
707     # force them to be recalculated correct when
708     # the $marc->as_usmarc() or $marc->as_xml() is called.
709     # But why is this necessary?  It would be a serious bug
710     # in MARC::Record (definitely) and MARC::File::XML (arguably)
711     # if they are emitting incorrect leader values.
712     my $marc = shift;
713
714     my $leader = $marc->leader;
715     substr($leader,  0, 5) = '     ';
716     substr($leader, 10, 7) = '22     ';
717     $marc->leader(substr($leader, 0, 24));
718 }
719
720 sub fix_biblio_ids {
721     # FIXME - it is essential to ensure that the biblionumber is present,
722     #         otherwise, Zebra will choke on the record.  However, this
723     #         logic belongs in the relevant C4::Biblio APIs.
724     my $marc = shift;
725     my $biblionumber = shift;
726     my $biblioitemnumber;
727     if (@_) {
728         $biblioitemnumber = shift;
729     } else {
730         my $sth = $dbh->prepare(
731             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
732         $sth->execute($biblionumber);
733         ($biblioitemnumber) = $sth->fetchrow_array;
734         $sth->finish;
735         unless ($biblioitemnumber) {
736             warn "failed to get biblioitemnumber for biblio $biblionumber";
737             return 0;
738         }
739     }
740
741     # FIXME - this is cheating on two levels
742     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
743     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
744     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
745     #
746     # On the other hand, this better for now than what rebuild_zebra.pl used to
747     # do, which was duplicate the code for inserting the biblionumber
748     # and biblioitemnumber
749     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
750
751     return 1;
752 }
753
754 sub fix_authority_id {
755     # FIXME - as with fix_biblio_ids, the authid must be present
756     #         for Zebra's sake.  However, this really belongs
757     #         in C4::AuthoritiesMarc.
758     my ($marc, $authid) = @_;
759     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
760         $marc->delete_field($marc->field('001'));
761         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
762     }
763 }
764
765 sub fix_unimarc_100 {
766     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
767     my $marc = shift;
768
769     my $string;
770     if ( length($marc->subfield( 100, "a" )) == 36 ) {
771         $string = $marc->subfield( 100, "a" );
772         my $f100 = $marc->field(100);
773         $marc->delete_field($f100);
774     }
775     else {
776         $string = POSIX::strftime( "%Y%m%d", localtime );
777         $string =~ s/\-//g;
778         $string = sprintf( "%-*s", 35, $string );
779     }
780     substr( $string, 22, 6, "frey50" );
781     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
782         $marc->delete_field($marc->field(100));
783         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
784     }
785 }
786
787 sub do_indexing {
788     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
789
790     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
791     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
792     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
793     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
794
795     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
796     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
797     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
798
799 }
800
801 sub _flock {
802     # test if flock is present; if so, use it; if not, return true
803     # op refers to the official flock operations including LOCK_EX,
804     # LOCK_UN, etc.
805     # combining LOCK_EX with LOCK_NB returns immediately
806     my ($fh, $op)= @_;
807     if( !defined($use_flock) ) {
808         #check if flock is present; if not, you will have a fatal error
809         my $lock_acquired = eval { flock($fh, $op) };
810         # assuming that $fh and $op are fine(..), an undef $lock_acquired
811         # means no flock
812         $use_flock = defined($lock_acquired) ? 1 : 0;
813         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
814         return 1 if !$use_flock;
815         return $lock_acquired;
816     } else {
817         return 1 if !$use_flock;
818         return flock($fh, $op);
819     }
820 }
821
822 sub _create_lockfile { #returns undef on failure
823     my $dir= shift;
824     unless (-d $dir) {
825         eval { mkpath($dir, 0, oct(755)) };
826         return if $@;
827     }
828     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
829     return ( $fh, $dir.'/'.LOCK_FILENAME );
830 }
831
832 sub print_usage {
833     print <<_USAGE_;
834 $0: reindex MARC bibs and/or authorities in Zebra.
835
836 Use this batch job to reindex all biblio or authority
837 records in your Koha database.
838
839 Parameters:
840
841     -b                      index bibliographic records
842
843     -a                      index authority records
844
845     -daemon                 Run in daemon mode.  The program will loop checking
846                             for entries on the zebraqueue table, processing
847                             them incrementally if present, and then sleep
848                             for a few seconds before repeating the process
849                             Checking the zebraqueue table is done with a cheap
850                             SQL query.  This allows for near realtime update of
851                             the zebra search index with low system overhead.
852                             Use -sleep to control the checking interval.
853
854                             Daemon mode implies -z, -a, -b.  The program will
855                             refuse to start if options are present that do not
856                             make sense while running as an incremental update
857                             daemon (e.g. -r or -offset).
858
859     -sleep 10               Seconds to sleep between checks of the zebraqueue
860                             table in daemon mode.  The default is 5 seconds.
861
862     -z                      select only updated and deleted
863                             records marked in the zebraqueue
864                             table.  Cannot be used with -r
865                             or -s.
866
867     --skip-deletes          only select record updates, not record
868                             deletions, to avoid potential excessive
869                             I/O when zebraidx processes deletions.
870                             If this option is used for normal indexing,
871                             a cronjob should be set up to run
872                             rebuild_zebra.pl -z without --skip-deletes
873                             during off hours.
874                             Only effective with -z.
875
876     -r                      clear Zebra index before
877                             adding records to index. Implies -w.
878
879     -d                      Temporary directory for indexing.
880                             If not specified, one is automatically
881                             created.  The export directory
882                             is automatically deleted unless
883                             you supply the -k switch.
884
885     -k                      Do not delete export directory.
886
887     -s                      Skip export.  Used if you have
888                             already exported the records
889                             in a previous run.
890
891     -noxml                  index from ISO MARC blob
892                             instead of MARC XML.  This
893                             option is recommended only
894                             for advanced user.
895
896     -x                      export and index as xml instead of is02709 (biblios only).
897                             use this if you might have records > 99,999 chars,
898
899     -nosanitize             export biblio/authority records directly from DB marcxml
900                             field without sanitizing records. It speed up
901                             dump process but could fail if DB contains badly
902                             encoded records. Works only with -x,
903
904     -w                      skip shadow indexing for this batch
905
906     -y                      do NOT clear zebraqueue after indexing; normally,
907                             after doing batch indexing, zebraqueue should be
908                             marked done for the affected record type(s) so that
909                             a running zebraqueue_daemon doesn't try to reindex
910                             the same records - specify -y to override this.
911                             Cannot be used with -z.
912
913     -v                      increase the amount of logging.  Normally only
914                             warnings and errors from the indexing are shown.
915                             Use log level 2 (-v -v) to include all Zebra logs.
916
917     --length   1234         how many biblio you want to export
918     --offset 1243           offset you want to start to
919                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
920                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
921     --where                 let you specify a WHERE query, like itemtype='BOOK'
922                             or something like that
923
924     --run-as-root           explicitily allow script to run as 'root' user
925
926     --wait-for-lock         when not running in daemon mode, the default
927                             behavior is to abort a rebuild if the rebuild
928                             lock is busy.  This option will cause the program
929                             to wait for the lock to free and then continue
930                             processing the rebuild request,
931
932     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
933                             biblioitems is the default value.
934
935     --help or -h            show this message.
936 _USAGE_
937 }