Bug 16506: Make rebuild_zebra.pl use XML as default
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use XML::LibXML;
30
31 use constant LOCK_FILENAME => 'rebuild..LCK';
32
33 # script that checks zebradir structure & create directories & mandatory files if needed
34 #
35 #
36
37 $|=1; # flushes output
38 # If the cron job starts us in an unreadable dir, we will break without
39 # this.
40 chdir $ENV{HOME} if (!(-r '.'));
41 my $daemon_mode;
42 my $daemon_sleep = 5;
43 my $directory;
44 my $nosanitize;
45 my $skip_export;
46 my $keep_export;
47 my $skip_index;
48 my $reset;
49 my $biblios;
50 my $authorities;
51 my $as_usmarc;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66
67 my $verbose_logging = 0;
68 my $zebraidx_log_opt = " -v none,fatal,warn ";
69 my $result = GetOptions(
70     'daemon'        => \$daemon_mode,
71     'sleep:i'       => \$daemon_sleep,
72     'd:s'           => \$directory,
73     'r|reset'       => \$reset,
74     's'             => \$skip_export,
75     'k'             => \$keep_export,
76     'I|skip-index'  => \$skip_index,
77     'nosanitize'    => \$nosanitize,
78     'b'             => \$biblios,
79     'noxml'         => \$as_usmarc,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated·\n";
103 }
104
105 if( not defined $run_as_root and $run_user eq 'root') {
106     my $msg = "Warning: You are running this script as the user 'root'.\n";
107     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
108     $msg   .= "Please do '$0 --help' to see usage.\n";
109     die $msg;
110 }
111
112 if ( $as_usmarc and $nosanitize ) {
113     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
114     $msg   .= "Please do '$0 --help' to see usage.\n";
115     die $msg;
116 }
117
118 if ($process_zebraqueue and ($skip_export or $reset)) {
119     my $msg = "Cannot specify -r or -s if -z is specified\n";
120     $msg   .= "Please do '$0 --help' to see usage.\n";
121     die $msg;
122 }
123
124 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
125     my $msg = "Cannot specify both -y and -z\n";
126     $msg   .= "Please do '$0 --help' to see usage.\n";
127     die $msg;
128 }
129
130 if ($reset) {
131     $noshadow = 1;
132 }
133
134 if ($noshadow) {
135     $noshadow = ' -n ';
136 }
137
138 if ($daemon_mode) {
139     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
140     if ($skip_export or $keep_export or $skip_index or
141           $where or $length or $offset) {
142         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
143         $msg   .= "Please do '$0 --help' to see usage.\n";
144         die $msg;
145     }
146     $authorities = 1;
147     $biblios = 1;
148     $process_zebraqueue = 1;
149 }
150
151 if (not $biblios and not $authorities) {
152     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
153     $msg   .= "Please do '$0 --help' to see usage.\n";
154     die $msg;
155 }
156
157 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
158 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
159     die "Cannot specify -t|--table with value '$table'. Only "
160       . ( join ', ', @tables_allowed_for_select )
161       . " are allowed.";
162 }
163
164
165 #  -v is for verbose, which seems backwards here because of how logging is set
166 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
167 if ($verbose_logging >= 2) {
168     $zebraidx_log_opt = '-v none,fatal,warn,all';
169 }
170
171 my $use_tempdir = 0;
172 unless ($directory) {
173     $use_tempdir = 1;
174     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
175 }
176
177
178 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
179 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
180
181 my $kohadir = C4::Context->config('intranetdir');
182 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
183 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
184
185 my $dbh = C4::Context->dbh;
186 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
187 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
188
189 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
190 <collection xmlns="http://www.loc.gov/MARC21/slim">
191 };
192
193 my $marcxml_close = q{
194 </collection>
195 };
196
197 # Protect again simultaneous update of the zebra index by using a lock file.
198 # Create our own lock directory if its missing.  This shouild be created
199 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
200 # does not exist and cannot be created, we fall back on /tmp - which will
201 # always work.
202
203 my ($lockfile, $LockFH);
204 foreach (
205     C4::Context->config("zebra_lockdir"),
206     '/var/lock/zebra_' . C4::Context->config('database'),
207     '/tmp/zebra_' . C4::Context->config('database')
208 ) {
209     #we try three possibilities (we really want to lock :)
210     next if !$_;
211     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
212     last if defined $LockFH;
213 }
214 if( !defined $LockFH ) {
215     print "WARNING: Could not create lock file $lockfile: $!\n";
216     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
217     print "Verify file permissions for it too.\n";
218     $use_flock = 0; # we disable file locking now and will continue
219                     # without it
220                     # note that this mimics old behavior (before we used
221                     # the lockfile)
222 };
223
224 if ( $verbose_logging ) {
225     print "Zebra configuration information\n";
226     print "================================\n";
227     print "Zebra biblio directory      = $biblioserverdir\n";
228     print "Zebra authorities directory = $authorityserverdir\n";
229     print "Koha directory              = $kohadir\n";
230     print "Lockfile                    = $lockfile\n" if $lockfile;
231     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
232     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
233     print "================================\n";
234 }
235
236 my $tester = XML::LibXML->new();
237
238 # The main work is done here by calling do_one_pass().  We have added locking
239 # avoid race conditions between full rebuilds and incremental updates either from
240 # daemon mode or periodic invocation from cron.  The race can lead to an updated
241 # record being overwritten by a rebuild if the update is applied after the export
242 # by the rebuild and before the rebuild finishes (more likely to affect large
243 # catalogs).
244 #
245 # We have chosen to exit immediately by default if we cannot obtain the lock
246 # to prevent the potential for a infinite backlog from cron invocations, but an
247 # option (wait-for-lock) is provided to let the program wait for the lock.
248 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
249 if ($daemon_mode) {
250     while (1) {
251         # For incremental updates, skip the update if the updates are locked
252         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
253             do_one_pass() if ( zebraqueue_not_empty() );
254             _flock($LockFH, LOCK_UN);
255         }
256         sleep $daemon_sleep;
257     }
258 } else {
259     # all one-off invocations
260     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
261     if (_flock($LockFH, $lock_mode)) {
262         do_one_pass();
263         _flock($LockFH, LOCK_UN);
264     } else {
265         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
266     }
267 }
268
269
270 if ( $verbose_logging ) {
271     print "====================\n";
272     print "CLEANING\n";
273     print "====================\n";
274 }
275 if ($keep_export) {
276     print "NOTHING cleaned : the export $directory has been kept.\n";
277     print "You can re-run this script with the -s ";
278     if ($use_tempdir) {
279         print " and -d $directory parameters";
280     } else {
281         print "parameter";
282     }
283     print "\n";
284     print "if you just want to rebuild zebra after changing the record.abs\n";
285     print "or another zebra config file\n";
286 } else {
287     unless ($use_tempdir) {
288         # if we're using a temporary directory
289         # created by File::Temp, it will be removed
290         # automatically.
291         rmtree($directory, 0, 1);
292         print "directory $directory deleted\n";
293     }
294 }
295
296 sub do_one_pass {
297     if ($authorities) {
298         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
299     } else {
300         print "skipping authorities\n" if ( $verbose_logging );
301     }
302
303     if ($biblios) {
304         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
305     } else {
306         print "skipping biblios\n" if ( $verbose_logging );
307     }
308 }
309
310 # Check the zebra update queue and return true if there are records to process
311 # This routine will handle each of -ab, -a, or -b, but in practice we force
312 # -ab when in daemon mode.
313 sub zebraqueue_not_empty {
314     my $where_str;
315
316     if ($authorities && $biblios) {
317         $where_str = 'done = 0;';
318     } elsif ($biblios) {
319         $where_str = 'server = "biblioserver" AND done = 0;';
320     } else {
321         $where_str = 'server = "authorityserver" AND done = 0;';
322     }
323     my $query =
324         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
325
326     $query->execute;
327     my $count = $query->fetchrow_arrayref->[0];
328     print "queued records: $count\n" if $verbose_logging > 0;
329     return $count > 0;
330 }
331
332 # This checks to see if the zebra directories exist under the provided path.
333 # If they don't, then zebra is likely to spit the dummy. This returns true
334 # if the directories had to be created, false otherwise.
335 sub check_zebra_dirs {
336     my ($base) = shift() . '/';
337     my $needed_repairing = 0;
338     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
339     foreach my $dir (@dirs) {
340         my $bdir = $base . $dir;
341         if (! -d $bdir) {
342             $needed_repairing = 1;
343             mkdir $bdir || die "Unable to create '$bdir': $!\n";
344             print "$0: needed to create '$bdir'\n";
345         }
346     }
347     return $needed_repairing;
348 }   # ----------  end of subroutine check_zebra_dirs  ----------
349
350 sub index_records {
351     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
352
353     my $num_records_exported = 0;
354     my $records_deleted = {};
355     my $need_reset = check_zebra_dirs($server_dir);
356     if ($need_reset) {
357         print "$0: found broken zebra server directories: forcing a rebuild\n";
358         $reset = 1;
359     }
360     if ($skip_export && $verbose_logging) {
361         print "====================\n";
362         print "SKIPPING $record_type export\n";
363         print "====================\n";
364     } else {
365         if ( $verbose_logging ) {
366             print "====================\n";
367             print "exporting $record_type\n";
368             print "====================\n";
369         }
370         mkdir "$directory" unless (-d $directory);
371         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
372         if ($process_zebraqueue) {
373             my $entries;
374
375             unless ( $process_zebraqueue_skip_deletes ) {
376                 $entries = select_zebraqueue_records($record_type, 'deleted');
377                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
378                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_usmarc);
379                 mark_zebraqueue_batch_done($entries);
380             }
381
382             $entries = select_zebraqueue_records($record_type, 'updated');
383             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
384             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_usmarc, $records_deleted);
385             mark_zebraqueue_batch_done($entries);
386
387         } else {
388             my $sth = select_all_records($record_type);
389             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_usmarc, $nosanitize);
390             unless ($do_not_clear_zebraqueue) {
391                 mark_all_zebraqueue_done($record_type);
392             }
393         }
394     }
395
396     #
397     # and reindexing everything
398     #
399     if ($skip_index) {
400         if ($verbose_logging) {
401             print "====================\n";
402             print "SKIPPING $record_type indexing\n";
403             print "====================\n";
404         }
405     } else {
406         if ( $verbose_logging ) {
407             print "====================\n";
408             print "REINDEXING zebra\n";
409             print "====================\n";
410         }
411         my $record_fmt = ($as_usmarc) ? 'iso2709' : 'marcxml' ;
412         if ($process_zebraqueue) {
413             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
414                 if %$records_deleted;
415             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
416                 if $num_records_exported;
417         } else {
418             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
419                 if ($num_records_exported or $skip_export);
420         }
421     }
422 }
423
424
425 sub select_zebraqueue_records {
426     my ($record_type, $update_type) = @_;
427
428     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
429     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
430
431     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
432                              FROM zebraqueue
433                              WHERE server = ?
434                              AND   operation = ?
435                              AND   done = 0
436                              ORDER BY id DESC");
437     $sth->execute($server, $op);
438     my $entries = $sth->fetchall_arrayref({});
439 }
440
441 sub mark_all_zebraqueue_done {
442     my ($record_type) = @_;
443
444     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
445
446     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
447                              WHERE server = ?
448                              AND done = 0");
449     $sth->execute($server);
450 }
451
452 sub mark_zebraqueue_batch_done {
453     my ($entries) = @_;
454
455     $dbh->{AutoCommit} = 0;
456     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
457     $dbh->commit();
458     foreach my $id (map { $_->{id} } @$entries) {
459         $sth->execute($id);
460     }
461     $dbh->{AutoCommit} = 1;
462 }
463
464 sub select_all_records {
465     my $record_type = shift;
466     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
467 }
468
469 sub select_all_authorities {
470     my $strsth=qq{SELECT authid FROM auth_header};
471     $strsth.=qq{ WHERE $where } if ($where);
472     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
473     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
474     my $sth = $dbh->prepare($strsth);
475     $sth->execute();
476     return $sth;
477 }
478
479 sub select_all_biblios {
480     $table = 'biblioitems'
481       unless grep { /^$table$/ } @tables_allowed_for_select;
482     my $strsth = qq{ SELECT biblionumber FROM $table };
483     $strsth.=qq{ WHERE $where } if ($where);
484     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
485     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
486     my $sth = $dbh->prepare($strsth);
487     $sth->execute();
488     return $sth;
489 }
490
491 sub export_marc_records_from_sth {
492     my ($record_type, $sth, $directory, $as_usmarc, $nosanitize) = @_;
493
494     my $num_exported = 0;
495     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
496
497     print {$fh} $marcxml_open
498         unless $as_usmarc;
499
500     my $i = 0;
501     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
502     while (my ($record_number) = $sth->fetchrow_array) {
503         print "." if ( $verbose_logging );
504         print "\r$i" unless ($i++ %100 or !$verbose_logging);
505         if ( $nosanitize ) {
506             my $marcxml = $record_type eq 'biblio'
507                           ? GetXmlBiblio( $record_number )
508                           : GetAuthorityXML( $record_number );
509             if ($record_type eq 'biblio'){
510                 my @items = GetItemsInfo($record_number);
511                 if (@items){
512                     my $record = MARC::Record->new;
513                     $record->encoding('UTF-8');
514                     my @itemsrecord;
515                     foreach my $item (@items){
516                         my $record = Item2Marc($item, $record_number);
517                         push @itemsrecord, $record->field($itemtag);
518                     }
519                     $record->insert_fields_ordered(@itemsrecord);
520                     my $itemsxml = $record->as_xml_record();
521                     $marcxml =
522                         substr($marcxml, 0, length($marcxml)-10) .
523                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
524                 }
525             }
526             # extra test to ensure that result is valid XML; otherwise
527             # Zebra won't parse it in DOM mode
528             eval {
529                 my $doc = $tester->parse_string($marcxml);
530             };
531             if ($@) {
532                 warn "Error exporting record $record_number ($record_type): $@\n";
533                 next;
534             }
535             if ( $marcxml ) {
536                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
537                 print {$fh} $marcxml;
538                 $num_exported++;
539             }
540             next;
541         }
542         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
543         if (defined $marc) {
544             eval {
545                 my $rec;
546                 if ($as_usmarc) {
547                     $rec = $marc->as_usmarc();
548                 } else {
549                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
550                     eval {
551                         my $doc = $tester->parse_string($rec);
552                     };
553                     if ($@) {
554                         die "invalid XML: $@";
555                     }
556                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
557                 }
558                 print {$fh} $rec;
559                 $num_exported++;
560             };
561             if ($@) {
562                 warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
563                 warn "... specific error is $@" if $verbose_logging;
564             }
565         }
566     }
567     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
568     print {$fh} $marcxml_close
569         unless $as_usmarc;
570
571     close $fh;
572     return $num_exported;
573 }
574
575 sub export_marc_records_from_list {
576     my ($record_type, $entries, $directory, $as_usmarc, $records_deleted) = @_;
577
578     my $num_exported = 0;
579     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
580
581     print {$fh} $marcxml_open
582         unless $as_usmarc;
583
584     my $i = 0;
585
586     # Skip any deleted records. We check for this anyway, but this reduces error spam
587     my %found = %$records_deleted;
588     foreach my $record_number ( map { $_->{biblio_auth_number} }
589                                 grep { !$found{ $_->{biblio_auth_number} }++ }
590                                 @$entries ) {
591         print "." if ( $verbose_logging );
592         print "\r$i" unless ($i++ %100 or !$verbose_logging);
593         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
594         if (defined $marc) {
595             eval {
596                 my $rec;
597                 if ( $as_usmarc ) {
598                     $rec = $marc->as_usmarc();
599                 } else {
600                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
601                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
602                 }
603                 print {$fh} $rec;
604                 $num_exported++;
605             };
606             if ($@) {
607               warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
608             }
609         }
610     }
611     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
612
613     print {$fh} $marcxml_close
614         unless $as_usmarc;
615
616     close $fh;
617     return $num_exported;
618 }
619
620 sub generate_deleted_marc_records {
621
622     my ($record_type, $entries, $directory, $as_usmarc) = @_;
623
624     my $records_deleted = {};
625     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
626
627     print {$fh} $marcxml_open
628         unless $as_usmarc;
629
630     my $i = 0;
631     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
632         print "\r$i" unless ($i++ %100 or !$verbose_logging);
633         print "." if ( $verbose_logging );
634
635         my $marc = MARC::Record->new();
636         if ($record_type eq 'biblio') {
637             fix_biblio_ids($marc, $record_number, $record_number);
638         } else {
639             fix_authority_id($marc, $record_number);
640         }
641         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
642             fix_unimarc_100($marc);
643         }
644
645         my $rec;
646         if ( $as_usmarc ) {
647             $rec = $marc->as_usmarc();
648         } else {
649             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
650             # Remove the record's XML header
651             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
652         }
653         print {$fh} $rec;
654
655         $records_deleted->{$record_number} = 1;
656     }
657     print "\nRecords exported: $i\n" if ( $verbose_logging );
658
659     print {$fh} $marcxml_close
660         unless $as_usmarc;
661
662     close $fh;
663     return $records_deleted;
664 }
665
666 sub get_corrected_marc_record {
667     my ($record_type, $record_number, $as_usmarc) = @_;
668
669     my $marc = get_raw_marc_record($record_type, $record_number, $as_usmarc);
670
671     if (defined $marc) {
672         fix_leader($marc);
673         if ($record_type eq 'authority') {
674             fix_authority_id($marc, $record_number);
675         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
676             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
677             $marc = $normalizer->process($marc);
678         }
679         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
680             fix_unimarc_100($marc);
681         }
682     }
683
684     return $marc;
685 }
686
687 sub get_raw_marc_record {
688     my ($record_type, $record_number, $as_usmarc) = @_;
689
690     my $marc;
691     if ($record_type eq 'biblio') {
692         if ($as_usmarc) {
693             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
694             $fetch_sth->execute($record_number);
695             if (my ($blob) = $fetch_sth->fetchrow_array) {
696                 $marc = MARC::Record->new_from_usmarc($blob);
697                 unless ($marc) {
698                     warn "error creating MARC::Record from $blob";
699                 }
700             }
701             # failure to find a bib is not a problem -
702             # a delete could have been done before
703             # trying to process a record update
704
705             $fetch_sth->finish();
706             return unless $marc;
707         } else {
708             eval { $marc = GetMarcBiblio($record_number, 1); };
709             if ($@ || !$marc) {
710                 # here we do warn since catching an exception
711                 # means that the bib was found but failed
712                 # to be parsed
713                 warn "error retrieving biblio $record_number";
714                 return;
715             }
716         }
717     } else {
718         eval { $marc = GetAuthority($record_number); };
719         if ($@) {
720             warn "error retrieving authority $record_number";
721             return;
722         }
723     }
724     return $marc;
725 }
726
727 sub fix_leader {
728     # FIXME - this routine is suspect
729     # It blanks the Leader/00-05 and Leader/12-16 to
730     # force them to be recalculated correct when
731     # the $marc->as_usmarc() or $marc->as_xml() is called.
732     # But why is this necessary?  It would be a serious bug
733     # in MARC::Record (definitely) and MARC::File::XML (arguably)
734     # if they are emitting incorrect leader values.
735     my $marc = shift;
736
737     my $leader = $marc->leader;
738     substr($leader,  0, 5) = '     ';
739     substr($leader, 10, 7) = '22     ';
740     $marc->leader(substr($leader, 0, 24));
741 }
742
743 sub fix_biblio_ids {
744     # FIXME - it is essential to ensure that the biblionumber is present,
745     #         otherwise, Zebra will choke on the record.  However, this
746     #         logic belongs in the relevant C4::Biblio APIs.
747     my $marc = shift;
748     my $biblionumber = shift;
749     my $biblioitemnumber;
750     if (@_) {
751         $biblioitemnumber = shift;
752     } else {
753         my $sth = $dbh->prepare(
754             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
755         $sth->execute($biblionumber);
756         ($biblioitemnumber) = $sth->fetchrow_array;
757         $sth->finish;
758         unless ($biblioitemnumber) {
759             warn "failed to get biblioitemnumber for biblio $biblionumber";
760             return 0;
761         }
762     }
763
764     # FIXME - this is cheating on two levels
765     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
766     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
767     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
768     #
769     # On the other hand, this better for now than what rebuild_zebra.pl used to
770     # do, which was duplicate the code for inserting the biblionumber
771     # and biblioitemnumber
772     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
773
774     return 1;
775 }
776
777 sub fix_authority_id {
778     # FIXME - as with fix_biblio_ids, the authid must be present
779     #         for Zebra's sake.  However, this really belongs
780     #         in C4::AuthoritiesMarc.
781     my ($marc, $authid) = @_;
782     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
783         $marc->delete_field($marc->field('001'));
784         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
785     }
786 }
787
788 sub fix_unimarc_100 {
789     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
790     my $marc = shift;
791
792     my $string;
793     if ( length($marc->subfield( 100, "a" )) == 36 ) {
794         $string = $marc->subfield( 100, "a" );
795         my $f100 = $marc->field(100);
796         $marc->delete_field($f100);
797     }
798     else {
799         $string = POSIX::strftime( "%Y%m%d", localtime );
800         $string =~ s/\-//g;
801         $string = sprintf( "%-*s", 35, $string );
802     }
803     substr( $string, 22, 6, "frey50" );
804     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
805         $marc->delete_field($marc->field(100));
806         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
807     }
808 }
809
810 sub do_indexing {
811     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
812
813     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
814     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
815     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
816     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
817
818     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
819     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
820     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
821
822 }
823
824 sub _flock {
825     # test if flock is present; if so, use it; if not, return true
826     # op refers to the official flock operations including LOCK_EX,
827     # LOCK_UN, etc.
828     # combining LOCK_EX with LOCK_NB returns immediately
829     my ($fh, $op)= @_;
830     if( !defined($use_flock) ) {
831         #check if flock is present; if not, you will have a fatal error
832         my $lock_acquired = eval { flock($fh, $op) };
833         # assuming that $fh and $op are fine(..), an undef $lock_acquired
834         # means no flock
835         $use_flock = defined($lock_acquired) ? 1 : 0;
836         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
837         return 1 if !$use_flock;
838         return $lock_acquired;
839     } else {
840         return 1 if !$use_flock;
841         return flock($fh, $op);
842     }
843 }
844
845 sub _create_lockfile { #returns undef on failure
846     my $dir= shift;
847     unless (-d $dir) {
848         eval { mkpath($dir, 0, oct(755)) };
849         return if $@;
850     }
851     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
852     return ( $fh, $dir.'/'.LOCK_FILENAME );
853 }
854
855 sub print_usage {
856     print <<_USAGE_;
857 $0: reindex MARC bibs and/or authorities in Zebra.
858
859 Use this batch job to reindex all biblio or authority
860 records in your Koha database.
861
862 Parameters:
863
864     -b                      index bibliographic records
865
866     -a                      index authority records
867
868     -daemon                 Run in daemon mode.  The program will loop checking
869                             for entries on the zebraqueue table, processing
870                             them incrementally if present, and then sleep
871                             for a few seconds before repeating the process
872                             Checking the zebraqueue table is done with a cheap
873                             SQL query.  This allows for near realtime update of
874                             the zebra search index with low system overhead.
875                             Use -sleep to control the checking interval.
876
877                             Daemon mode implies -z, -a, -b.  The program will
878                             refuse to start if options are present that do not
879                             make sense while running as an incremental update
880                             daemon (e.g. -r or -offset).
881
882     -sleep 10               Seconds to sleep between checks of the zebraqueue
883                             table in daemon mode.  The default is 5 seconds.
884
885     -z                      select only updated and deleted
886                             records marked in the zebraqueue
887                             table.  Cannot be used with -r
888                             or -s.
889
890     --skip-deletes          only select record updates, not record
891                             deletions, to avoid potential excessive
892                             I/O when zebraidx processes deletions.
893                             If this option is used for normal indexing,
894                             a cronjob should be set up to run
895                             rebuild_zebra.pl -z without --skip-deletes
896                             during off hours.
897                             Only effective with -z.
898
899     -r                      clear Zebra index before
900                             adding records to index. Implies -w.
901
902     -d                      Temporary directory for indexing.
903                             If not specified, one is automatically
904                             created.  The export directory
905                             is automatically deleted unless
906                             you supply the -k switch.
907
908     -k                      Do not delete export directory.
909
910     -s                      Skip export.  Used if you have
911                             already exported the records
912                             in a previous run.
913
914     -noxml                  index from ISO MARC blob
915                             instead of MARC XML.  This
916                             option is recommended only
917                             for advanced user.
918
919     -nosanitize             export biblio/authority records directly from DB marcxml
920                             field without sanitizing records. It speed up
921                             dump process but could fail if DB contains badly
922                             encoded records. Works only with -x,
923
924     -w                      skip shadow indexing for this batch
925
926     -y                      do NOT clear zebraqueue after indexing; normally,
927                             after doing batch indexing, zebraqueue should be
928                             marked done for the affected record type(s) so that
929                             a running zebraqueue_daemon doesn't try to reindex
930                             the same records - specify -y to override this.
931                             Cannot be used with -z.
932
933     -v                      increase the amount of logging.  Normally only
934                             warnings and errors from the indexing are shown.
935                             Use log level 2 (-v -v) to include all Zebra logs.
936
937     --length   1234         how many biblio you want to export
938     --offset 1243           offset you want to start to
939                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
940                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
941     --where                 let you specify a WHERE query, like itemtype='BOOK'
942                             or something like that
943
944     --run-as-root           explicitily allow script to run as 'root' user
945
946     --wait-for-lock         when not running in daemon mode, the default
947                             behavior is to abort a rebuild if the rebuild
948                             lock is busy.  This option will cause the program
949                             to wait for the lock to free and then continue
950                             processing the rebuild request,
951
952     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
953                             biblioitems is the default value.
954
955     --help or -h            show this message.
956 _USAGE_
957 }