Bug 12538: Remove Solr without breaking anything else
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $process_zebraqueue_skip_deletes;
43 my $do_not_clear_zebraqueue;
44 my $length;
45 my $where;
46 my $offset;
47 my $run_as_root;
48 my $run_user = (getpwuid($<))[0];
49 my $wait_for_lock = 0;
50 my $use_flock;
51
52 my $verbose_logging = 0;
53 my $zebraidx_log_opt = " -v none,fatal,warn ";
54 my $result = GetOptions(
55     'daemon'        => \$daemon_mode,
56     'sleep:i'       => \$daemon_sleep,
57     'd:s'           => \$directory,
58     'r|reset'       => \$reset,
59     's'             => \$skip_export,
60     'k'             => \$keep_export,
61     'I|skip-index'  => \$skip_index,
62     'nosanitize'    => \$nosanitize,
63     'b'             => \$biblios,
64     'noxml'         => \$noxml,
65     'w'             => \$noshadow,
66     'a'             => \$authorities,
67     'h|help'        => \$want_help,
68     'x'             => \$as_xml,
69     'y'             => \$do_not_clear_zebraqueue,
70     'z'             => \$process_zebraqueue,
71     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
72     'where:s'       => \$where,
73     'length:i'      => \$length,
74     'offset:i'      => \$offset,
75     'v+'            => \$verbose_logging,
76     'run-as-root'   => \$run_as_root,
77     'wait-for-lock' => \$wait_for_lock,
78 );
79
80 if (not $result or $want_help) {
81     print_usage();
82     exit 0;
83 }
84
85 if( not defined $run_as_root and $run_user eq 'root') {
86     my $msg = "Warning: You are running this script as the user 'root'.\n";
87     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
88     $msg   .= "Please do '$0 --help' to see usage.\n";
89     die $msg;
90 }
91
92 if ( !$as_xml and $nosanitize ) {
93     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
94     $msg   .= "Please do '$0 --help' to see usage.\n";
95     die $msg;
96 }
97
98 if ($process_zebraqueue and ($skip_export or $reset)) {
99     my $msg = "Cannot specify -r or -s if -z is specified\n";
100     $msg   .= "Please do '$0 --help' to see usage.\n";
101     die $msg;
102 }
103
104 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
105     my $msg = "Cannot specify both -y and -z\n";
106     $msg   .= "Please do '$0 --help' to see usage.\n";
107     die $msg;
108 }
109
110 if ($reset) {
111     $noshadow = 1;
112 }
113
114 if ($noshadow) {
115     $noshadow = ' -n ';
116 }
117
118 if ($daemon_mode) {
119     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
120     if ($skip_export or $keep_export or $skip_index or
121           $where or $length or $offset) {
122         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
123         $msg   .= "Please do '$0 --help' to see usage.\n";
124         die $msg;
125     }
126     $authorities = 1;
127     $biblios = 1;
128     $process_zebraqueue = 1;
129 }
130
131 if (not $biblios and not $authorities) {
132     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
133     $msg   .= "Please do '$0 --help' to see usage.\n";
134     die $msg;
135 }
136
137
138 #  -v is for verbose, which seems backwards here because of how logging is set
139 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
140 if ($verbose_logging >= 2) {
141     $zebraidx_log_opt = '-v none,fatal,warn,all';
142 }
143
144 my $use_tempdir = 0;
145 unless ($directory) {
146     $use_tempdir = 1;
147     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
148 }
149
150
151 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
152 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
153
154 my $kohadir = C4::Context->config('intranetdir');
155 my $bib_index_mode = C4::Context->config('zebra_bib_index_mode') || 'grs1';
156 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') || 'dom';
157
158 my $dbh = C4::Context->dbh;
159 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
160 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
161
162 # Protect again simultaneous update of the zebra index by using a lock file.
163 # Create our own lock directory if its missing.  This shouild be created
164 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
165 # does not exist and cannot be created, we fall back on /tmp - which will
166 # always work.
167
168 my ($lockfile, $LockFH);
169 foreach (
170     C4::Context->config("zebra_lockdir"),
171     '/var/lock/zebra_' . C4::Context->config('database'),
172     '/tmp/zebra_' . C4::Context->config('database')
173 ) {
174     #we try three possibilities (we really want to lock :)
175     next if !$_;
176     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
177     last if defined $LockFH;
178 }
179 if( !defined $LockFH ) {
180     print "WARNING: Could not create lock file $lockfile: $!\n";
181     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
182     print "Verify file permissions for it too.\n";
183     $use_flock = 0; # we disable file locking now and will continue
184                     # without it
185                     # note that this mimics old behavior (before we used
186                     # the lockfile)
187 };
188
189 if ( $verbose_logging ) {
190     print "Zebra configuration information\n";
191     print "================================\n";
192     print "Zebra biblio directory      = $biblioserverdir\n";
193     print "Zebra authorities directory = $authorityserverdir\n";
194     print "Koha directory              = $kohadir\n";
195     print "Lockfile                    = $lockfile\n" if $lockfile;
196     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
197     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
198     print "================================\n";
199 }
200
201 my $tester = XML::LibXML->new();
202
203 # The main work is done here by calling do_one_pass().  We have added locking
204 # avoid race conditions between full rebuilds and incremental updates either from
205 # daemon mode or periodic invocation from cron.  The race can lead to an updated
206 # record being overwritten by a rebuild if the update is applied after the export
207 # by the rebuild and before the rebuild finishes (more likely to affect large
208 # catalogs).
209 #
210 # We have chosen to exit immediately by default if we cannot obtain the lock
211 # to prevent the potential for a infinite backlog from cron invocations, but an
212 # option (wait-for-lock) is provided to let the program wait for the lock.
213 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
214 if ($daemon_mode) {
215     while (1) {
216         # For incremental updates, skip the update if the updates are locked
217         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
218             do_one_pass() if ( zebraqueue_not_empty() );
219             _flock($LockFH, LOCK_UN);
220         }
221         sleep $daemon_sleep;
222     }
223 } else {
224     # all one-off invocations
225     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
226     if (_flock($LockFH, $lock_mode)) {
227         do_one_pass();
228         _flock($LockFH, LOCK_UN);
229     } else {
230         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
231     }
232 }
233
234
235 if ( $verbose_logging ) {
236     print "====================\n";
237     print "CLEANING\n";
238     print "====================\n";
239 }
240 if ($keep_export) {
241     print "NOTHING cleaned : the export $directory has been kept.\n";
242     print "You can re-run this script with the -s ";
243     if ($use_tempdir) {
244         print " and -d $directory parameters";
245     } else {
246         print "parameter";
247     }
248     print "\n";
249     print "if you just want to rebuild zebra after changing the record.abs\n";
250     print "or another zebra config file\n";
251 } else {
252     unless ($use_tempdir) {
253         # if we're using a temporary directory
254         # created by File::Temp, it will be removed
255         # automatically.
256         rmtree($directory, 0, 1);
257         print "directory $directory deleted\n";
258     }
259 }
260
261 sub do_one_pass {
262     if ($authorities) {
263         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
264     } else {
265         print "skipping authorities\n" if ( $verbose_logging );
266     }
267
268     if ($biblios) {
269         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
270     } else {
271         print "skipping biblios\n" if ( $verbose_logging );
272     }
273 }
274
275 # Check the zebra update queue and return true if there are records to process
276 # This routine will handle each of -ab, -a, or -b, but in practice we force
277 # -ab when in daemon mode.
278 sub zebraqueue_not_empty {
279     my $where_str;
280
281     if ($authorities && $biblios) {
282         $where_str = 'done = 0;';
283     } elsif ($biblios) {
284         $where_str = 'server = "biblioserver" AND done = 0;';
285     } else {
286         $where_str = 'server = "authorityserver" AND done = 0;';
287     }
288     my $query =
289         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
290
291     $query->execute;
292     my $count = $query->fetchrow_arrayref->[0];
293     print "queued records: $count\n" if $verbose_logging > 0;
294     return $count > 0;
295 }
296
297 # This checks to see if the zebra directories exist under the provided path.
298 # If they don't, then zebra is likely to spit the dummy. This returns true
299 # if the directories had to be created, false otherwise.
300 sub check_zebra_dirs {
301     my ($base) = shift() . '/';
302     my $needed_repairing = 0;
303     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
304     foreach my $dir (@dirs) {
305         my $bdir = $base . $dir;
306         if (! -d $bdir) {
307             $needed_repairing = 1;
308             mkdir $bdir || die "Unable to create '$bdir': $!\n";
309             print "$0: needed to create '$bdir'\n";
310         }
311     }
312     return $needed_repairing;
313 }   # ----------  end of subroutine check_zebra_dirs  ----------
314
315 sub index_records {
316     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
317
318     my $num_records_exported = 0;
319     my $records_deleted = {};
320     my $need_reset = check_zebra_dirs($server_dir);
321     if ($need_reset) {
322         print "$0: found broken zebra server directories: forcing a rebuild\n";
323         $reset = 1;
324     }
325     if ($skip_export && $verbose_logging) {
326         print "====================\n";
327         print "SKIPPING $record_type export\n";
328         print "====================\n";
329     } else {
330         if ( $verbose_logging ) {
331             print "====================\n";
332             print "exporting $record_type\n";
333             print "====================\n";
334         }
335         mkdir "$directory" unless (-d $directory);
336         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
337         if ($process_zebraqueue) {
338             my $entries;
339
340             unless ( $process_zebraqueue_skip_deletes ) {
341                 $entries = select_zebraqueue_records($record_type, 'deleted');
342                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
343                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
344                 mark_zebraqueue_batch_done($entries);
345             }
346
347             $entries = select_zebraqueue_records($record_type, 'updated');
348             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
349             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
350             mark_zebraqueue_batch_done($entries);
351
352         } else {
353             my $sth = select_all_records($record_type);
354             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
355             unless ($do_not_clear_zebraqueue) {
356                 mark_all_zebraqueue_done($record_type);
357             }
358         }
359     }
360
361     #
362     # and reindexing everything
363     #
364     if ($skip_index) {
365         if ($verbose_logging) {
366             print "====================\n";
367             print "SKIPPING $record_type indexing\n";
368             print "====================\n";
369         }
370     } else {
371         if ( $verbose_logging ) {
372             print "====================\n";
373             print "REINDEXING zebra\n";
374             print "====================\n";
375         }
376         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
377         if ($process_zebraqueue) {
378             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
379                 if %$records_deleted;
380             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
381                 if $num_records_exported;
382         } else {
383             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
384                 if ($num_records_exported or $skip_export);
385         }
386     }
387 }
388
389
390 sub select_zebraqueue_records {
391     my ($record_type, $update_type) = @_;
392
393     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
394     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
395
396     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
397                              FROM zebraqueue
398                              WHERE server = ?
399                              AND   operation = ?
400                              AND   done = 0
401                              ORDER BY id DESC");
402     $sth->execute($server, $op);
403     my $entries = $sth->fetchall_arrayref({});
404 }
405
406 sub mark_all_zebraqueue_done {
407     my ($record_type) = @_;
408
409     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
410
411     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
412                              WHERE server = ?
413                              AND done = 0");
414     $sth->execute($server);
415 }
416
417 sub mark_zebraqueue_batch_done {
418     my ($entries) = @_;
419
420     $dbh->{AutoCommit} = 0;
421     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
422     $dbh->commit();
423     foreach my $id (map { $_->{id} } @$entries) {
424         $sth->execute($id);
425     }
426     $dbh->{AutoCommit} = 1;
427 }
428
429 sub select_all_records {
430     my $record_type = shift;
431     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
432 }
433
434 sub select_all_authorities {
435     my $strsth=qq{SELECT authid FROM auth_header};
436     $strsth.=qq{ WHERE $where } if ($where);
437     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
438     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
439     my $sth = $dbh->prepare($strsth);
440     $sth->execute();
441     return $sth;
442 }
443
444 sub select_all_biblios {
445     my $strsth = qq{ SELECT biblionumber FROM biblioitems };
446     $strsth.=qq{ WHERE $where } if ($where);
447     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
448     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
449     my $sth = $dbh->prepare($strsth);
450     $sth->execute();
451     return $sth;
452 }
453
454 sub include_xml_wrapper {
455     my $as_xml = shift;
456     my $record_type = shift;
457
458     return 0 unless $as_xml;
459     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
460     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
461     return 0;
462
463 }
464
465 sub export_marc_records_from_sth {
466     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
467
468     my $num_exported = 0;
469     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
470     if (include_xml_wrapper($as_xml, $record_type)) {
471         # include XML declaration and root element
472         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
473     }
474     my $i = 0;
475     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
476     while (my ($record_number) = $sth->fetchrow_array) {
477         print "." if ( $verbose_logging );
478         print "\r$i" unless ($i++ %100 or !$verbose_logging);
479         if ( $nosanitize ) {
480             my $marcxml = $record_type eq 'biblio'
481                           ? GetXmlBiblio( $record_number )
482                           : GetAuthorityXML( $record_number );
483             if ($record_type eq 'biblio'){
484                 my @items = GetItemsInfo($record_number);
485                 if (@items){
486                     my $record = MARC::Record->new;
487                     $record->encoding('UTF-8');
488                     my @itemsrecord;
489                     foreach my $item (@items){
490                         my $record = Item2Marc($item, $record_number);
491                         push @itemsrecord, $record->field($itemtag);
492                     }
493                     $record->insert_fields_ordered(@itemsrecord);
494                     my $itemsxml = $record->as_xml_record();
495                     $marcxml =
496                         substr($marcxml, 0, length($marcxml)-10) .
497                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
498                 }
499             }
500             # extra test to ensure that result is valid XML; otherwise
501             # Zebra won't parse it in DOM mode
502             eval {
503                 my $doc = $tester->parse_string($marcxml);
504             };
505             if ($@) {
506                 warn "Error exporting record $record_number ($record_type): $@\n";
507                 next;
508             }
509             if ( $marcxml ) {
510                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
511                 print {$fh} $marcxml;
512                 $num_exported++;
513             }
514             next;
515         }
516         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
517         if (defined $marc) {
518             eval {
519                 my $rec;
520                 if ($as_xml) {
521                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
522                     eval {
523                         my $doc = $tester->parse_string($rec);
524                     };
525                     if ($@) {
526                         die "invalid XML: $@";
527                     }
528                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
529                 } else {
530                     $rec = $marc->as_usmarc();
531                 }
532                 print {$fh} $rec;
533                 $num_exported++;
534             };
535             if ($@) {
536                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
537                 warn "... specific error is $@" if $verbose_logging;
538             }
539         }
540     }
541     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
542     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
543     close $fh;
544     return $num_exported;
545 }
546
547 sub export_marc_records_from_list {
548     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
549
550     my $num_exported = 0;
551     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
552     if (include_xml_wrapper($as_xml, $record_type)) {
553         # include XML declaration and root element
554         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
555     }
556     my $i = 0;
557
558     # Skip any deleted records. We check for this anyway, but this reduces error spam
559     my %found = %$records_deleted;
560     foreach my $record_number ( map { $_->{biblio_auth_number} }
561                                 grep { !$found{ $_->{biblio_auth_number} }++ }
562                                 @$entries ) {
563         print "." if ( $verbose_logging );
564         print "\r$i" unless ($i++ %100 or !$verbose_logging);
565         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
566         if (defined $marc) {
567             eval {
568                 my $rec;
569                 if ($as_xml) {
570                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
571                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
572                 } else {
573                     $rec = $marc->as_usmarc();
574                 }
575                 print {$fh} $rec;
576                 $num_exported++;
577             };
578             if ($@) {
579               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
580             }
581         }
582     }
583     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
584     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
585     close $fh;
586     return $num_exported;
587 }
588
589 sub generate_deleted_marc_records {
590     my ($record_type, $entries, $directory, $as_xml) = @_;
591
592     my $records_deleted = {};
593     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
594     if (include_xml_wrapper($as_xml, $record_type)) {
595         # include XML declaration and root element
596         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
597     }
598     my $i = 0;
599     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
600         print "\r$i" unless ($i++ %100 or !$verbose_logging);
601         print "." if ( $verbose_logging );
602
603         my $marc = MARC::Record->new();
604         if ($record_type eq 'biblio') {
605             fix_biblio_ids($marc, $record_number, $record_number);
606         } else {
607             fix_authority_id($marc, $record_number);
608         }
609         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
610             fix_unimarc_100($marc);
611         }
612
613         my $rec;
614         if ($as_xml) {
615             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
616             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
617         } else {
618             $rec = $marc->as_usmarc();
619         }
620         print {$fh} $rec;
621
622         $records_deleted->{$record_number} = 1;
623     }
624     print "\nRecords exported: $i\n" if ( $verbose_logging );
625     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
626     close $fh;
627     return $records_deleted;
628
629
630 }
631
632 sub get_corrected_marc_record {
633     my ($record_type, $record_number, $noxml) = @_;
634
635     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
636
637     if (defined $marc) {
638         fix_leader($marc);
639         if ($record_type eq 'authority') {
640             fix_authority_id($marc, $record_number);
641         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
642             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
643             $marc = $normalizer->process($marc);
644         }
645         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
646             fix_unimarc_100($marc);
647         }
648     }
649
650     return $marc;
651 }
652
653 sub get_raw_marc_record {
654     my ($record_type, $record_number, $noxml) = @_;
655
656     my $marc;
657     if ($record_type eq 'biblio') {
658         if ($noxml) {
659             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
660             $fetch_sth->execute($record_number);
661             if (my ($blob) = $fetch_sth->fetchrow_array) {
662                 $marc = MARC::Record->new_from_usmarc($blob);
663                 unless ($marc) {
664                     warn "error creating MARC::Record from $blob";
665                 }
666             }
667             # failure to find a bib is not a problem -
668             # a delete could have been done before
669             # trying to process a record update
670
671             $fetch_sth->finish();
672             return unless $marc;
673         } else {
674             eval { $marc = GetMarcBiblio($record_number, 1); };
675             if ($@ || !$marc) {
676                 # here we do warn since catching an exception
677                 # means that the bib was found but failed
678                 # to be parsed
679                 warn "error retrieving biblio $record_number";
680                 return;
681             }
682         }
683     } else {
684         eval { $marc = GetAuthority($record_number); };
685         if ($@) {
686             warn "error retrieving authority $record_number";
687             return;
688         }
689     }
690     return $marc;
691 }
692
693 sub fix_leader {
694     # FIXME - this routine is suspect
695     # It blanks the Leader/00-05 and Leader/12-16 to
696     # force them to be recalculated correct when
697     # the $marc->as_usmarc() or $marc->as_xml() is called.
698     # But why is this necessary?  It would be a serious bug
699     # in MARC::Record (definitely) and MARC::File::XML (arguably)
700     # if they are emitting incorrect leader values.
701     my $marc = shift;
702
703     my $leader = $marc->leader;
704     substr($leader,  0, 5) = '     ';
705     substr($leader, 10, 7) = '22     ';
706     $marc->leader(substr($leader, 0, 24));
707 }
708
709 sub fix_biblio_ids {
710     # FIXME - it is essential to ensure that the biblionumber is present,
711     #         otherwise, Zebra will choke on the record.  However, this
712     #         logic belongs in the relevant C4::Biblio APIs.
713     my $marc = shift;
714     my $biblionumber = shift;
715     my $biblioitemnumber;
716     if (@_) {
717         $biblioitemnumber = shift;
718     } else {
719         my $sth = $dbh->prepare(
720             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
721         $sth->execute($biblionumber);
722         ($biblioitemnumber) = $sth->fetchrow_array;
723         $sth->finish;
724         unless ($biblioitemnumber) {
725             warn "failed to get biblioitemnumber for biblio $biblionumber";
726             return 0;
727         }
728     }
729
730     # FIXME - this is cheating on two levels
731     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
732     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
733     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
734     #
735     # On the other hand, this better for now than what rebuild_zebra.pl used to
736     # do, which was duplicate the code for inserting the biblionumber
737     # and biblioitemnumber
738     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
739
740     return 1;
741 }
742
743 sub fix_authority_id {
744     # FIXME - as with fix_biblio_ids, the authid must be present
745     #         for Zebra's sake.  However, this really belongs
746     #         in C4::AuthoritiesMarc.
747     my ($marc, $authid) = @_;
748     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
749         $marc->delete_field($marc->field('001'));
750         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
751     }
752 }
753
754 sub fix_unimarc_100 {
755     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
756     my $marc = shift;
757
758     my $string;
759     if ( length($marc->subfield( 100, "a" )) == 36 ) {
760         $string = $marc->subfield( 100, "a" );
761         my $f100 = $marc->field(100);
762         $marc->delete_field($f100);
763     }
764     else {
765         $string = POSIX::strftime( "%Y%m%d", localtime );
766         $string =~ s/\-//g;
767         $string = sprintf( "%-*s", 35, $string );
768     }
769     substr( $string, 22, 6, "frey50" );
770     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
771         $marc->delete_field($marc->field(100));
772         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
773     }
774 }
775
776 sub do_indexing {
777     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
778
779     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
780     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
781     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
782     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
783
784     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
785     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
786     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
787
788 }
789
790 sub _flock {
791     # test if flock is present; if so, use it; if not, return true
792     # op refers to the official flock operations including LOCK_EX,
793     # LOCK_UN, etc.
794     # combining LOCK_EX with LOCK_NB returns immediately
795     my ($fh, $op)= @_;
796     if( !defined($use_flock) ) {
797         #check if flock is present; if not, you will have a fatal error
798         my $lock_acquired = eval { flock($fh, $op) };
799         # assuming that $fh and $op are fine(..), an undef $lock_acquired
800         # means no flock
801         $use_flock = defined($lock_acquired) ? 1 : 0;
802         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
803         return 1 if !$use_flock;
804         return $lock_acquired;
805     } else {
806         return 1 if !$use_flock;
807         return flock($fh, $op);
808     }
809 }
810
811 sub _create_lockfile { #returns undef on failure
812     my $dir= shift;
813     unless (-d $dir) {
814         eval { mkpath($dir, 0, oct(755)) };
815         return if $@;
816     }
817     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
818     return ( $fh, $dir.'/'.LOCK_FILENAME );
819 }
820
821 sub print_usage {
822     print <<_USAGE_;
823 $0: reindex MARC bibs and/or authorities in Zebra.
824
825 Use this batch job to reindex all biblio or authority
826 records in your Koha database.
827
828 Parameters:
829
830     -b                      index bibliographic records
831
832     -a                      index authority records
833
834     -daemon                 Run in daemon mode.  The program will loop checking
835                             for entries on the zebraqueue table, processing
836                             them incrementally if present, and then sleep
837                             for a few seconds before repeating the process
838                             Checking the zebraqueue table is done with a cheap
839                             SQL query.  This allows for near realtime update of
840                             the zebra search index with low system overhead.
841                             Use -sleep to control the checking interval.
842
843                             Daemon mode implies -z, -a, -b.  The program will
844                             refuse to start if options are present that do not
845                             make sense while running as an incremental update
846                             daemon (e.g. -r or -offset).
847
848     -sleep 10               Seconds to sleep between checks of the zebraqueue
849                             table in daemon mode.  The default is 5 seconds.
850
851     -z                      select only updated and deleted
852                             records marked in the zebraqueue
853                             table.  Cannot be used with -r
854                             or -s.
855
856     --skip-deletes          only select record updates, not record
857                             deletions, to avoid potential excessive
858                             I/O when zebraidx processes deletions.
859                             If this option is used for normal indexing,
860                             a cronjob should be set up to run
861                             rebuild_zebra.pl -z without --skip-deletes
862                             during off hours.
863                             Only effective with -z.
864
865     -r                      clear Zebra index before
866                             adding records to index. Implies -w.
867
868     -d                      Temporary directory for indexing.
869                             If not specified, one is automatically
870                             created.  The export directory
871                             is automatically deleted unless
872                             you supply the -k switch.
873
874     -k                      Do not delete export directory.
875
876     -s                      Skip export.  Used if you have
877                             already exported the records
878                             in a previous run.
879
880     -noxml                  index from ISO MARC blob
881                             instead of MARC XML.  This
882                             option is recommended only
883                             for advanced user.
884
885     -x                      export and index as xml instead of is02709 (biblios only).
886                             use this if you might have records > 99,999 chars,
887
888     -nosanitize             export biblio/authority records directly from DB marcxml
889                             field without sanitizing records. It speed up
890                             dump process but could fail if DB contains badly
891                             encoded records. Works only with -x,
892
893     -w                      skip shadow indexing for this batch
894
895     -y                      do NOT clear zebraqueue after indexing; normally,
896                             after doing batch indexing, zebraqueue should be
897                             marked done for the affected record type(s) so that
898                             a running zebraqueue_daemon doesn't try to reindex
899                             the same records - specify -y to override this.
900                             Cannot be used with -z.
901
902     -v                      increase the amount of logging.  Normally only
903                             warnings and errors from the indexing are shown.
904                             Use log level 2 (-v -v) to include all Zebra logs.
905
906     --length   1234         how many biblio you want to export
907     --offset 1243           offset you want to start to
908                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
909                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
910     --where                 let you specify a WHERE query, like itemtype='BOOK'
911                             or something like that
912
913     --run-as-root           explicitily allow script to run as 'root' user
914
915     --wait-for-lock         when not running in daemon mode, the default
916                             behavior is to abort a rebuild if the rebuild
917                             lock is busy.  This option will cause the program
918                             to wait for the lock to free and then continue
919                             processing the rebuild request,
920
921     --help or -h            show this message.
922 _USAGE_
923 }