Bug 12368: Rebuild Zebra improvement: allow to specify a DB table
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $process_zebraqueue_skip_deletes;
43 my $do_not_clear_zebraqueue;
44 my $length;
45 my $where;
46 my $offset;
47 my $run_as_root;
48 my $run_user = (getpwuid($<))[0];
49 my $wait_for_lock = 0;
50 my $use_flock;
51 my $table = 'biblioitems';
52
53 my $verbose_logging = 0;
54 my $zebraidx_log_opt = " -v none,fatal,warn ";
55 my $result = GetOptions(
56     'daemon'        => \$daemon_mode,
57     'sleep:i'       => \$daemon_sleep,
58     'd:s'           => \$directory,
59     'r|reset'       => \$reset,
60     's'             => \$skip_export,
61     'k'             => \$keep_export,
62     'I|skip-index'  => \$skip_index,
63     'nosanitize'    => \$nosanitize,
64     'b'             => \$biblios,
65     'noxml'         => \$noxml,
66     'w'             => \$noshadow,
67     'a'             => \$authorities,
68     'h|help'        => \$want_help,
69     'x'             => \$as_xml,
70     'y'             => \$do_not_clear_zebraqueue,
71     'z'             => \$process_zebraqueue,
72     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
73     'where:s'       => \$where,
74     'length:i'      => \$length,
75     'offset:i'      => \$offset,
76     'v+'            => \$verbose_logging,
77     'run-as-root'   => \$run_as_root,
78     'wait-for-lock' => \$wait_for_lock,
79     't|table:s'     => \$table,
80 );
81
82 if (not $result or $want_help) {
83     print_usage();
84     exit 0;
85 }
86
87 if( not defined $run_as_root and $run_user eq 'root') {
88     my $msg = "Warning: You are running this script as the user 'root'.\n";
89     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
90     $msg   .= "Please do '$0 --help' to see usage.\n";
91     die $msg;
92 }
93
94 if ( !$as_xml and $nosanitize ) {
95     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
96     $msg   .= "Please do '$0 --help' to see usage.\n";
97     die $msg;
98 }
99
100 if ($process_zebraqueue and ($skip_export or $reset)) {
101     my $msg = "Cannot specify -r or -s if -z is specified\n";
102     $msg   .= "Please do '$0 --help' to see usage.\n";
103     die $msg;
104 }
105
106 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
107     my $msg = "Cannot specify both -y and -z\n";
108     $msg   .= "Please do '$0 --help' to see usage.\n";
109     die $msg;
110 }
111
112 if ($reset) {
113     $noshadow = 1;
114 }
115
116 if ($noshadow) {
117     $noshadow = ' -n ';
118 }
119
120 if ($daemon_mode) {
121     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
122     if ($skip_export or $keep_export or $skip_index or
123           $where or $length or $offset) {
124         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
125         $msg   .= "Please do '$0 --help' to see usage.\n";
126         die $msg;
127     }
128     $authorities = 1;
129     $biblios = 1;
130     $process_zebraqueue = 1;
131 }
132
133 if (not $biblios and not $authorities) {
134     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
135     $msg   .= "Please do '$0 --help' to see usage.\n";
136     die $msg;
137 }
138
139
140 #  -v is for verbose, which seems backwards here because of how logging is set
141 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
142 if ($verbose_logging >= 2) {
143     $zebraidx_log_opt = '-v none,fatal,warn,all';
144 }
145
146 my $use_tempdir = 0;
147 unless ($directory) {
148     $use_tempdir = 1;
149     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
150 }
151
152
153 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
154 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
155
156 my $kohadir = C4::Context->config('intranetdir');
157 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
158 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
159
160 my $dbh = C4::Context->dbh;
161 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
162 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
163
164 # Protect again simultaneous update of the zebra index by using a lock file.
165 # Create our own lock directory if its missing.  This shouild be created
166 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
167 # does not exist and cannot be created, we fall back on /tmp - which will
168 # always work.
169
170 my ($lockfile, $LockFH);
171 foreach (
172     C4::Context->config("zebra_lockdir"),
173     '/var/lock/zebra_' . C4::Context->config('database'),
174     '/tmp/zebra_' . C4::Context->config('database')
175 ) {
176     #we try three possibilities (we really want to lock :)
177     next if !$_;
178     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
179     last if defined $LockFH;
180 }
181 if( !defined $LockFH ) {
182     print "WARNING: Could not create lock file $lockfile: $!\n";
183     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
184     print "Verify file permissions for it too.\n";
185     $use_flock = 0; # we disable file locking now and will continue
186                     # without it
187                     # note that this mimics old behavior (before we used
188                     # the lockfile)
189 };
190
191 if ( $verbose_logging ) {
192     print "Zebra configuration information\n";
193     print "================================\n";
194     print "Zebra biblio directory      = $biblioserverdir\n";
195     print "Zebra authorities directory = $authorityserverdir\n";
196     print "Koha directory              = $kohadir\n";
197     print "Lockfile                    = $lockfile\n" if $lockfile;
198     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
199     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
200     print "================================\n";
201 }
202
203 my $tester = XML::LibXML->new();
204
205 # The main work is done here by calling do_one_pass().  We have added locking
206 # avoid race conditions between full rebuilds and incremental updates either from
207 # daemon mode or periodic invocation from cron.  The race can lead to an updated
208 # record being overwritten by a rebuild if the update is applied after the export
209 # by the rebuild and before the rebuild finishes (more likely to affect large
210 # catalogs).
211 #
212 # We have chosen to exit immediately by default if we cannot obtain the lock
213 # to prevent the potential for a infinite backlog from cron invocations, but an
214 # option (wait-for-lock) is provided to let the program wait for the lock.
215 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
216 if ($daemon_mode) {
217     while (1) {
218         # For incremental updates, skip the update if the updates are locked
219         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
220             do_one_pass() if ( zebraqueue_not_empty() );
221             _flock($LockFH, LOCK_UN);
222         }
223         sleep $daemon_sleep;
224     }
225 } else {
226     # all one-off invocations
227     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
228     if (_flock($LockFH, $lock_mode)) {
229         do_one_pass();
230         _flock($LockFH, LOCK_UN);
231     } else {
232         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
233     }
234 }
235
236
237 if ( $verbose_logging ) {
238     print "====================\n";
239     print "CLEANING\n";
240     print "====================\n";
241 }
242 if ($keep_export) {
243     print "NOTHING cleaned : the export $directory has been kept.\n";
244     print "You can re-run this script with the -s ";
245     if ($use_tempdir) {
246         print " and -d $directory parameters";
247     } else {
248         print "parameter";
249     }
250     print "\n";
251     print "if you just want to rebuild zebra after changing the record.abs\n";
252     print "or another zebra config file\n";
253 } else {
254     unless ($use_tempdir) {
255         # if we're using a temporary directory
256         # created by File::Temp, it will be removed
257         # automatically.
258         rmtree($directory, 0, 1);
259         print "directory $directory deleted\n";
260     }
261 }
262
263 sub do_one_pass {
264     if ($authorities) {
265         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
266     } else {
267         print "skipping authorities\n" if ( $verbose_logging );
268     }
269
270     if ($biblios) {
271         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
272     } else {
273         print "skipping biblios\n" if ( $verbose_logging );
274     }
275 }
276
277 # Check the zebra update queue and return true if there are records to process
278 # This routine will handle each of -ab, -a, or -b, but in practice we force
279 # -ab when in daemon mode.
280 sub zebraqueue_not_empty {
281     my $where_str;
282
283     if ($authorities && $biblios) {
284         $where_str = 'done = 0;';
285     } elsif ($biblios) {
286         $where_str = 'server = "biblioserver" AND done = 0;';
287     } else {
288         $where_str = 'server = "authorityserver" AND done = 0;';
289     }
290     my $query =
291         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
292
293     $query->execute;
294     my $count = $query->fetchrow_arrayref->[0];
295     print "queued records: $count\n" if $verbose_logging > 0;
296     return $count > 0;
297 }
298
299 # This checks to see if the zebra directories exist under the provided path.
300 # If they don't, then zebra is likely to spit the dummy. This returns true
301 # if the directories had to be created, false otherwise.
302 sub check_zebra_dirs {
303     my ($base) = shift() . '/';
304     my $needed_repairing = 0;
305     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
306     foreach my $dir (@dirs) {
307         my $bdir = $base . $dir;
308         if (! -d $bdir) {
309             $needed_repairing = 1;
310             mkdir $bdir || die "Unable to create '$bdir': $!\n";
311             print "$0: needed to create '$bdir'\n";
312         }
313     }
314     return $needed_repairing;
315 }   # ----------  end of subroutine check_zebra_dirs  ----------
316
317 sub index_records {
318     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
319
320     my $num_records_exported = 0;
321     my $records_deleted = {};
322     my $need_reset = check_zebra_dirs($server_dir);
323     if ($need_reset) {
324         print "$0: found broken zebra server directories: forcing a rebuild\n";
325         $reset = 1;
326     }
327     if ($skip_export && $verbose_logging) {
328         print "====================\n";
329         print "SKIPPING $record_type export\n";
330         print "====================\n";
331     } else {
332         if ( $verbose_logging ) {
333             print "====================\n";
334             print "exporting $record_type\n";
335             print "====================\n";
336         }
337         mkdir "$directory" unless (-d $directory);
338         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
339         if ($process_zebraqueue) {
340             my $entries;
341
342             unless ( $process_zebraqueue_skip_deletes ) {
343                 $entries = select_zebraqueue_records($record_type, 'deleted');
344                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
345                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
346                 mark_zebraqueue_batch_done($entries);
347             }
348
349             $entries = select_zebraqueue_records($record_type, 'updated');
350             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
351             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
352             mark_zebraqueue_batch_done($entries);
353
354         } else {
355             my $sth = select_all_records($record_type);
356             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
357             unless ($do_not_clear_zebraqueue) {
358                 mark_all_zebraqueue_done($record_type);
359             }
360         }
361     }
362
363     #
364     # and reindexing everything
365     #
366     if ($skip_index) {
367         if ($verbose_logging) {
368             print "====================\n";
369             print "SKIPPING $record_type indexing\n";
370             print "====================\n";
371         }
372     } else {
373         if ( $verbose_logging ) {
374             print "====================\n";
375             print "REINDEXING zebra\n";
376             print "====================\n";
377         }
378         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
379         if ($process_zebraqueue) {
380             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
381                 if %$records_deleted;
382             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
383                 if $num_records_exported;
384         } else {
385             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
386                 if ($num_records_exported or $skip_export);
387         }
388     }
389 }
390
391
392 sub select_zebraqueue_records {
393     my ($record_type, $update_type) = @_;
394
395     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
396     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
397
398     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
399                              FROM zebraqueue
400                              WHERE server = ?
401                              AND   operation = ?
402                              AND   done = 0
403                              ORDER BY id DESC");
404     $sth->execute($server, $op);
405     my $entries = $sth->fetchall_arrayref({});
406 }
407
408 sub mark_all_zebraqueue_done {
409     my ($record_type) = @_;
410
411     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
412
413     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
414                              WHERE server = ?
415                              AND done = 0");
416     $sth->execute($server);
417 }
418
419 sub mark_zebraqueue_batch_done {
420     my ($entries) = @_;
421
422     $dbh->{AutoCommit} = 0;
423     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
424     $dbh->commit();
425     foreach my $id (map { $_->{id} } @$entries) {
426         $sth->execute($id);
427     }
428     $dbh->{AutoCommit} = 1;
429 }
430
431 sub select_all_records {
432     my $record_type = shift;
433     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
434 }
435
436 sub select_all_authorities {
437     my $strsth=qq{SELECT authid FROM auth_header};
438     $strsth.=qq{ WHERE $where } if ($where);
439     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
440     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
441     my $sth = $dbh->prepare($strsth);
442     $sth->execute();
443     return $sth;
444 }
445
446 sub select_all_biblios {
447     $table = 'biblioitems'
448       if $table ne 'items'
449       and $table ne 'biblio';
450     my $strsth = qq{ SELECT biblionumber FROM $table };
451     $strsth.=qq{ WHERE $where } if ($where);
452     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
453     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
454     my $sth = $dbh->prepare($strsth);
455     $sth->execute();
456     return $sth;
457 }
458
459 sub include_xml_wrapper {
460     my $as_xml = shift;
461     my $record_type = shift;
462
463     return 0 unless $as_xml;
464     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
465     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
466     return 0;
467
468 }
469
470 sub export_marc_records_from_sth {
471     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
472
473     my $num_exported = 0;
474     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
475     if (include_xml_wrapper($as_xml, $record_type)) {
476         # include XML declaration and root element
477         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
478     }
479     my $i = 0;
480     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
481     while (my ($record_number) = $sth->fetchrow_array) {
482         print "." if ( $verbose_logging );
483         print "\r$i" unless ($i++ %100 or !$verbose_logging);
484         if ( $nosanitize ) {
485             my $marcxml = $record_type eq 'biblio'
486                           ? GetXmlBiblio( $record_number )
487                           : GetAuthorityXML( $record_number );
488             if ($record_type eq 'biblio'){
489                 my @items = GetItemsInfo($record_number);
490                 if (@items){
491                     my $record = MARC::Record->new;
492                     $record->encoding('UTF-8');
493                     my @itemsrecord;
494                     foreach my $item (@items){
495                         my $record = Item2Marc($item, $record_number);
496                         push @itemsrecord, $record->field($itemtag);
497                     }
498                     $record->insert_fields_ordered(@itemsrecord);
499                     my $itemsxml = $record->as_xml_record();
500                     $marcxml =
501                         substr($marcxml, 0, length($marcxml)-10) .
502                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
503                 }
504             }
505             # extra test to ensure that result is valid XML; otherwise
506             # Zebra won't parse it in DOM mode
507             eval {
508                 my $doc = $tester->parse_string($marcxml);
509             };
510             if ($@) {
511                 warn "Error exporting record $record_number ($record_type): $@\n";
512                 next;
513             }
514             if ( $marcxml ) {
515                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
516                 print {$fh} $marcxml;
517                 $num_exported++;
518             }
519             next;
520         }
521         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
522         if (defined $marc) {
523             eval {
524                 my $rec;
525                 if ($as_xml) {
526                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
527                     eval {
528                         my $doc = $tester->parse_string($rec);
529                     };
530                     if ($@) {
531                         die "invalid XML: $@";
532                     }
533                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
534                 } else {
535                     $rec = $marc->as_usmarc();
536                 }
537                 print {$fh} $rec;
538                 $num_exported++;
539             };
540             if ($@) {
541                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
542                 warn "... specific error is $@" if $verbose_logging;
543             }
544         }
545     }
546     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
547     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
548     close $fh;
549     return $num_exported;
550 }
551
552 sub export_marc_records_from_list {
553     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
554
555     my $num_exported = 0;
556     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
557     if (include_xml_wrapper($as_xml, $record_type)) {
558         # include XML declaration and root element
559         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
560     }
561     my $i = 0;
562
563     # Skip any deleted records. We check for this anyway, but this reduces error spam
564     my %found = %$records_deleted;
565     foreach my $record_number ( map { $_->{biblio_auth_number} }
566                                 grep { !$found{ $_->{biblio_auth_number} }++ }
567                                 @$entries ) {
568         print "." if ( $verbose_logging );
569         print "\r$i" unless ($i++ %100 or !$verbose_logging);
570         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
571         if (defined $marc) {
572             eval {
573                 my $rec;
574                 if ($as_xml) {
575                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
576                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
577                 } else {
578                     $rec = $marc->as_usmarc();
579                 }
580                 print {$fh} $rec;
581                 $num_exported++;
582             };
583             if ($@) {
584               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
585             }
586         }
587     }
588     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
589     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
590     close $fh;
591     return $num_exported;
592 }
593
594 sub generate_deleted_marc_records {
595     my ($record_type, $entries, $directory, $as_xml) = @_;
596
597     my $records_deleted = {};
598     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
599     if (include_xml_wrapper($as_xml, $record_type)) {
600         # include XML declaration and root element
601         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
602     }
603     my $i = 0;
604     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
605         print "\r$i" unless ($i++ %100 or !$verbose_logging);
606         print "." if ( $verbose_logging );
607
608         my $marc = MARC::Record->new();
609         if ($record_type eq 'biblio') {
610             fix_biblio_ids($marc, $record_number, $record_number);
611         } else {
612             fix_authority_id($marc, $record_number);
613         }
614         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
615             fix_unimarc_100($marc);
616         }
617
618         my $rec;
619         if ($as_xml) {
620             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
621             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
622         } else {
623             $rec = $marc->as_usmarc();
624         }
625         print {$fh} $rec;
626
627         $records_deleted->{$record_number} = 1;
628     }
629     print "\nRecords exported: $i\n" if ( $verbose_logging );
630     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
631     close $fh;
632     return $records_deleted;
633
634
635 }
636
637 sub get_corrected_marc_record {
638     my ($record_type, $record_number, $noxml) = @_;
639
640     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
641
642     if (defined $marc) {
643         fix_leader($marc);
644         if ($record_type eq 'authority') {
645             fix_authority_id($marc, $record_number);
646         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
647             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
648             $marc = $normalizer->process($marc);
649         }
650         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
651             fix_unimarc_100($marc);
652         }
653     }
654
655     return $marc;
656 }
657
658 sub get_raw_marc_record {
659     my ($record_type, $record_number, $noxml) = @_;
660
661     my $marc;
662     if ($record_type eq 'biblio') {
663         if ($noxml) {
664             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
665             $fetch_sth->execute($record_number);
666             if (my ($blob) = $fetch_sth->fetchrow_array) {
667                 $marc = MARC::Record->new_from_usmarc($blob);
668                 unless ($marc) {
669                     warn "error creating MARC::Record from $blob";
670                 }
671             }
672             # failure to find a bib is not a problem -
673             # a delete could have been done before
674             # trying to process a record update
675
676             $fetch_sth->finish();
677             return unless $marc;
678         } else {
679             eval { $marc = GetMarcBiblio($record_number, 1); };
680             if ($@ || !$marc) {
681                 # here we do warn since catching an exception
682                 # means that the bib was found but failed
683                 # to be parsed
684                 warn "error retrieving biblio $record_number";
685                 return;
686             }
687         }
688     } else {
689         eval { $marc = GetAuthority($record_number); };
690         if ($@) {
691             warn "error retrieving authority $record_number";
692             return;
693         }
694     }
695     return $marc;
696 }
697
698 sub fix_leader {
699     # FIXME - this routine is suspect
700     # It blanks the Leader/00-05 and Leader/12-16 to
701     # force them to be recalculated correct when
702     # the $marc->as_usmarc() or $marc->as_xml() is called.
703     # But why is this necessary?  It would be a serious bug
704     # in MARC::Record (definitely) and MARC::File::XML (arguably)
705     # if they are emitting incorrect leader values.
706     my $marc = shift;
707
708     my $leader = $marc->leader;
709     substr($leader,  0, 5) = '     ';
710     substr($leader, 10, 7) = '22     ';
711     $marc->leader(substr($leader, 0, 24));
712 }
713
714 sub fix_biblio_ids {
715     # FIXME - it is essential to ensure that the biblionumber is present,
716     #         otherwise, Zebra will choke on the record.  However, this
717     #         logic belongs in the relevant C4::Biblio APIs.
718     my $marc = shift;
719     my $biblionumber = shift;
720     my $biblioitemnumber;
721     if (@_) {
722         $biblioitemnumber = shift;
723     } else {
724         my $sth = $dbh->prepare(
725             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
726         $sth->execute($biblionumber);
727         ($biblioitemnumber) = $sth->fetchrow_array;
728         $sth->finish;
729         unless ($biblioitemnumber) {
730             warn "failed to get biblioitemnumber for biblio $biblionumber";
731             return 0;
732         }
733     }
734
735     # FIXME - this is cheating on two levels
736     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
737     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
738     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
739     #
740     # On the other hand, this better for now than what rebuild_zebra.pl used to
741     # do, which was duplicate the code for inserting the biblionumber
742     # and biblioitemnumber
743     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
744
745     return 1;
746 }
747
748 sub fix_authority_id {
749     # FIXME - as with fix_biblio_ids, the authid must be present
750     #         for Zebra's sake.  However, this really belongs
751     #         in C4::AuthoritiesMarc.
752     my ($marc, $authid) = @_;
753     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
754         $marc->delete_field($marc->field('001'));
755         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
756     }
757 }
758
759 sub fix_unimarc_100 {
760     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
761     my $marc = shift;
762
763     my $string;
764     if ( length($marc->subfield( 100, "a" )) == 36 ) {
765         $string = $marc->subfield( 100, "a" );
766         my $f100 = $marc->field(100);
767         $marc->delete_field($f100);
768     }
769     else {
770         $string = POSIX::strftime( "%Y%m%d", localtime );
771         $string =~ s/\-//g;
772         $string = sprintf( "%-*s", 35, $string );
773     }
774     substr( $string, 22, 6, "frey50" );
775     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
776         $marc->delete_field($marc->field(100));
777         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
778     }
779 }
780
781 sub do_indexing {
782     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
783
784     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
785     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
786     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
787     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
788
789     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
790     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
791     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
792
793 }
794
795 sub _flock {
796     # test if flock is present; if so, use it; if not, return true
797     # op refers to the official flock operations including LOCK_EX,
798     # LOCK_UN, etc.
799     # combining LOCK_EX with LOCK_NB returns immediately
800     my ($fh, $op)= @_;
801     if( !defined($use_flock) ) {
802         #check if flock is present; if not, you will have a fatal error
803         my $lock_acquired = eval { flock($fh, $op) };
804         # assuming that $fh and $op are fine(..), an undef $lock_acquired
805         # means no flock
806         $use_flock = defined($lock_acquired) ? 1 : 0;
807         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
808         return 1 if !$use_flock;
809         return $lock_acquired;
810     } else {
811         return 1 if !$use_flock;
812         return flock($fh, $op);
813     }
814 }
815
816 sub _create_lockfile { #returns undef on failure
817     my $dir= shift;
818     unless (-d $dir) {
819         eval { mkpath($dir, 0, oct(755)) };
820         return if $@;
821     }
822     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
823     return ( $fh, $dir.'/'.LOCK_FILENAME );
824 }
825
826 sub print_usage {
827     print <<_USAGE_;
828 $0: reindex MARC bibs and/or authorities in Zebra.
829
830 Use this batch job to reindex all biblio or authority
831 records in your Koha database.
832
833 Parameters:
834
835     -b                      index bibliographic records
836
837     -a                      index authority records
838
839     -daemon                 Run in daemon mode.  The program will loop checking
840                             for entries on the zebraqueue table, processing
841                             them incrementally if present, and then sleep
842                             for a few seconds before repeating the process
843                             Checking the zebraqueue table is done with a cheap
844                             SQL query.  This allows for near realtime update of
845                             the zebra search index with low system overhead.
846                             Use -sleep to control the checking interval.
847
848                             Daemon mode implies -z, -a, -b.  The program will
849                             refuse to start if options are present that do not
850                             make sense while running as an incremental update
851                             daemon (e.g. -r or -offset).
852
853     -sleep 10               Seconds to sleep between checks of the zebraqueue
854                             table in daemon mode.  The default is 5 seconds.
855
856     -z                      select only updated and deleted
857                             records marked in the zebraqueue
858                             table.  Cannot be used with -r
859                             or -s.
860
861     --skip-deletes          only select record updates, not record
862                             deletions, to avoid potential excessive
863                             I/O when zebraidx processes deletions.
864                             If this option is used for normal indexing,
865                             a cronjob should be set up to run
866                             rebuild_zebra.pl -z without --skip-deletes
867                             during off hours.
868                             Only effective with -z.
869
870     -r                      clear Zebra index before
871                             adding records to index. Implies -w.
872
873     -d                      Temporary directory for indexing.
874                             If not specified, one is automatically
875                             created.  The export directory
876                             is automatically deleted unless
877                             you supply the -k switch.
878
879     -k                      Do not delete export directory.
880
881     -s                      Skip export.  Used if you have
882                             already exported the records
883                             in a previous run.
884
885     -noxml                  index from ISO MARC blob
886                             instead of MARC XML.  This
887                             option is recommended only
888                             for advanced user.
889
890     -x                      export and index as xml instead of is02709 (biblios only).
891                             use this if you might have records > 99,999 chars,
892
893     -nosanitize             export biblio/authority records directly from DB marcxml
894                             field without sanitizing records. It speed up
895                             dump process but could fail if DB contains badly
896                             encoded records. Works only with -x,
897
898     -w                      skip shadow indexing for this batch
899
900     -y                      do NOT clear zebraqueue after indexing; normally,
901                             after doing batch indexing, zebraqueue should be
902                             marked done for the affected record type(s) so that
903                             a running zebraqueue_daemon doesn't try to reindex
904                             the same records - specify -y to override this.
905                             Cannot be used with -z.
906
907     -v                      increase the amount of logging.  Normally only
908                             warnings and errors from the indexing are shown.
909                             Use log level 2 (-v -v) to include all Zebra logs.
910
911     --length   1234         how many biblio you want to export
912     --offset 1243           offset you want to start to
913                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
914                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
915     --where                 let you specify a WHERE query, like itemtype='BOOK'
916                             or something like that
917
918     --run-as-root           explicitily allow script to run as 'root' user
919
920     --wait-for-lock         when not running in daemon mode, the default
921                             behavior is to abort a rebuild if the rebuild
922                             lock is busy.  This option will cause the program
923                             to wait for the lock to free and then continue
924                             processing the rebuild request,
925
926     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
927                             biblioitems is the default value.
928
929     --help or -h            show this message.
930 _USAGE_
931 }