Merge remote branch 'kc/new/bug_5942' into kcmaster
[wip/koha-chris_n.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Debug;
25 use C4::Charset;
26 use C4::Items;
27 use Unicode::Normalize;
28 use Time::HiRes qw(gettimeofday);
29 use Getopt::Long;
30 use IO::File;
31 use Pod::Usage;
32
33 binmode(STDOUT, ":utf8");
34 my ( $input_marc_file, $number, $offset) = ('',0,0);
35 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
36 my ($sourcetag,$sourcesubfield,$idmapfl);
37
38 $|=1;
39
40 GetOptions(
41     'commit:f'    => \$commit,
42     'file:s'    => \$input_marc_file,
43     'n:f' => \$number,
44     'o|offset:f' => \$offset,
45     'h' => \$version,
46     'd' => \$delete,
47     't' => \$test_parameter,
48     's' => \$skip_marc8_conversion,
49     'c:s' => \$char_encoding,
50     'v:s' => \$verbose,
51     'fk' => \$fk_off,
52     'm:s' => \$format,
53     'l:s' => \$logfile,
54     'k|keepids:s' => \$keepids,
55     'b|biblios' => \$biblios,
56     'a|authorities' => \$authorities,
57     'match=s@'    => \$match,
58     'i|isbn' => \$isbn_check,
59     'x:s' => \$sourcetag,
60     'y:s' => \$sourcesubfield,
61     'idmap:s' => \$idmapfl,
62 );
63 $biblios=!$authorities||$biblios;
64
65 if ($version || ($input_marc_file eq '')) {
66     pod2usage( -verbose => 2 );
67     exit;
68 }
69
70 if (defined $idmapfl) {
71   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
72 }
73
74 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
75   $sourcetag="910";
76   $sourcesubfield="a";
77 }
78
79 my $dbh = C4::Context->dbh;
80
81 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
82 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
83 my $CataloguingLog = C4::Context->preference('CataloguingLog');
84 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
85
86 if ($fk_off) {
87         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
88 }
89
90
91 if ($delete) {
92         if ($biblios){
93         print "deleting biblios\n";
94         $dbh->do("truncate biblio");
95         $dbh->do("truncate biblioitems");
96         $dbh->do("truncate items");
97         }
98         else {
99         print "deleting authorities\n";
100         $dbh->do("truncate auth_header");
101         }
102     $dbh->do("truncate zebraqueue");
103 }
104
105
106
107 if ($test_parameter) {
108     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
109 }
110
111 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
112
113 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
114 my $starttime = gettimeofday;
115 my $batch;
116 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
117 if (defined $format && $format =~ /XML/i) {
118     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
119     # appears to try to convert incoming XML records from MARC-8
120     # to UTF-8.  Setting the BinaryEncoding key turns that off
121     # TODO: see what happens to ISO-8859-1 XML files.
122     # TODO: determine if MARC::Batch can be fixed to handle
123     #       XML records properly -- it probably should be
124     #       be using a proper push or pull XML parser to
125     #       extract the records, not using regexes to look
126     #       for <record>.*</record>.
127     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
128     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
129 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
130     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
131     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
132     $batch = MARC::Batch->new( 'XML', $fh );
133 } else {
134     $batch = MARC::Batch->new( 'USMARC', $fh );
135 }
136 $batch->warnings_off();
137 $batch->strict_off();
138 my $i=0;
139 my $commitnum = $commit ? $commit : 50;
140
141
142 # Skip file offset
143 if ( $offset ) {
144     print "Skipping file offset: $offset records\n";
145     $batch->next() while ($offset--);
146 }
147
148 my ($tagid,$subfieldid);
149 if ($authorities){
150           $tagid='001';
151 }
152 else {
153    ( $tagid, $subfieldid ) =
154             GetMarcFromKohaField( "biblio.biblionumber", '' );
155         $tagid||="001";
156 }
157
158 # the SQL query to search on isbn
159 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
160
161 $dbh->{AutoCommit} = 0;
162 my $loghandle;
163 if ($logfile){
164    $loghandle= IO::File->new($logfile,"w") ;
165    print $loghandle "id;operation;status\n";
166 }
167 RECORD: while (  ) {
168     my $record;
169     # get records
170     eval { $record = $batch->next() };
171     if ( $@ ) {
172         print "Bad MARC record $i: $@ skipped\n";
173         # FIXME - because MARC::Batch->next() combines grabbing the next
174         # blob and parsing it into one operation, a correctable condition
175         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
176         # from because we don't have access to the original blob.  Note
177         # that the staging import can deal with this condition (via
178         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
179         $i++;
180         next;
181     }
182     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
183     last unless ( $record );
184     $i++;
185     print ".";
186     print "\r$i" unless $i % 100;
187     
188     # transcode the record to UTF8 if needed & applicable.
189     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
190         # FIXME update condition
191         my ($guessed_charset, $charset_errors);
192          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
193         if ($guessed_charset eq 'failed') {
194             warn "ERROR: failed to perform character conversion for record $i\n";
195             next RECORD;            
196         }
197     }
198     my $isbn;
199     # remove trailing - in isbn (only for biblios, of course)
200     if ($biblios) {
201         if ($marcFlavour eq 'UNIMARC') {
202             if (my $f010 = $record->field('010')) {
203                 $isbn = $f010->subfield('a');
204                 $isbn =~ s/-//g;
205                 $f010->update('a' => $isbn);
206             }
207         } else {
208             if (my $f020 = $record->field('020')) {
209                 if ($isbn = $f020->subfield('a')) {
210                     $isbn =~ s/-//g;
211                     $f020->update('a' => $isbn);
212                 }
213             }
214         }
215     }
216     my $id;
217     # search for duplicates (based on Local-number)
218     if ($match){
219        require C4::Search;
220        my $query=build_query($match,$record);
221        my $server=($authorities?'authorityserver':'biblioserver');
222        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
223        die "unable to search the database for duplicates : $error" if (defined $error);
224        #warn "$query $server : $totalhits";
225        if ($results && scalar(@$results)==1){
226            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
227                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
228        } 
229        elsif  ($results && scalar(@$results)>1){
230        $debug && warn "more than one match for $query";
231        } 
232        else {
233        $debug && warn "nomatch for $query";
234        }
235     }
236         my $originalid;
237     if ($keepids){
238           $originalid=GetRecordId($record,$tagid,$subfieldid);
239       if ($originalid){
240                  my $storeidfield;
241                  if (length($keepids)==3){
242                         $storeidfield=MARC::Field->new($keepids,$originalid);
243                  }
244                  else  {
245                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
246                  }
247          $record->insert_fields_ordered($storeidfield);
248              $record->delete_field($record->field($tagid));
249       }
250     }
251     unless ($test_parameter) {
252         if ($authorities){
253             use C4::AuthoritiesMarc;
254             my $authtypecode=GuessAuthTypeCode($record);
255             my $authid= ($id?$id:GuessAuthId($record));
256             if ($authid && GetAuthority($authid)){
257             ## Authority has an id and is in database : Replace
258                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
259                 if ($@){
260                     warn "Problem with authority $authid Cannot Modify";
261                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
262                 }
263                                 else{
264                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
265                                 }
266             }  
267             elsif (defined $authid) {
268             ## An authid is defined but no authority in database : add
269                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
270                 if ($@){
271                     warn "Problem with authority $authid Cannot Add ".$@;
272                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
273                 }
274                                 else{
275                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
276                                 }
277             }
278                 else {
279             ## True insert in database
280                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
281                 if ($@){
282                     warn "Problem with authority $authid Cannot Add".$@;
283                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
284                 }
285                                 else{
286                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
287                                 }
288                 }
289         }
290         else {
291             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
292             $biblionumber = $id;
293             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
294             if (!$biblionumber && $isbn_check && $isbn) {
295     #         warn "search ISBN : $isbn";
296                 $sth_isbn->execute($isbn);
297                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
298             }
299                 if (defined $idmapfl) {
300                                 if ($sourcetag < "010"){
301                                         if ($record->field($sourcetag)){
302                                           my $source = $record->field($sourcetag)->data();
303                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
304                                         }
305                             } else {
306                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
307                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
308                           }
309                         }
310                                         # create biblio, unless we already have it ( either match or isbn )
311             if ($biblionumber) {
312                                 eval{$biblioitemnumber=GetBiblioData($biblionumber)->{biblioitemnumber};}
313                         }
314                         else 
315                         {
316                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
317             }
318             if ( $@ ) {
319                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
320                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
321                 next RECORD;
322             } 
323                         else{
324                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
325                         }
326             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
327             if ( $@ ) {
328                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
329                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
330                 # if we failed because of an exception, assume that 
331                 # the MARC columns in biblioitems were not set.
332                 ModBiblioMarc( $record, $biblionumber, '' );
333                 next RECORD;
334             } 
335                         else{
336                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
337                         }
338             if ($#{ $errors_ref } > -1) { 
339                 report_item_errors($biblionumber, $errors_ref);
340             }
341         }
342         $dbh->commit() if (0 == $i % $commitnum);
343     }
344     last if $i == $number;
345 }
346 $dbh->commit();
347
348
349
350 if ($fk_off) {
351         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
352 }
353
354 # restore CataloguingLog
355 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
356
357 my $timeneeded = gettimeofday - $starttime;
358 print "\n$i MARC records done in $timeneeded seconds\n";
359 if ($logfile){
360   print $loghandle "file : $input_marc_file\n";
361   print $loghandle "$i MARC records done in $timeneeded seconds\n";
362   $loghandle->close;
363 }
364 exit 0;
365
366 sub GetRecordId{
367         my $marcrecord=shift;
368         my $tag=shift;
369         my $subfield=shift;
370         my $id;
371         if ($tag lt "010"){
372                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
373         } 
374         elsif ($subfield){
375                 if ($marcrecord->field($tag)){
376                         return $marcrecord->subfield($tag,$subfield);
377                 }
378         }
379         return $id;
380 }
381 sub build_query {
382         my $match = shift;
383         my $record=shift;
384         my @searchstrings;
385         foreach my $matchingpoint (@$match){
386           my $string = build_simplequery($matchingpoint,$record);
387           push @searchstrings,$string if (length($string)>0);
388         }
389         return join(" and ",@searchstrings);
390 }
391 sub build_simplequery {
392         my $element=shift;
393         my $record=shift;
394         my ($index,$recorddata)=split /,/,$element;
395         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
396         my @searchstrings;
397         foreach my $field ($record->field($tag)){
398                   if (length($field->as_string("$subfields"))>0){
399                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
400                   }
401         }
402         return join(" and ",@searchstrings);
403 }
404 sub report_item_errors {
405     my $biblionumber = shift;
406     my $errors_ref = shift;
407
408     foreach my $error (@{ $errors_ref }) {
409         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
410         my $error_code = $error->{'error_code'};
411         $error_code =~ s/_/ /g;
412         $msg .= "$error_code $error->{'error_information'}";
413         print $msg, "\n";
414     }
415 }
416 sub printlog{
417         my $logelements=shift;
418         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
419 }
420
421
422 =head1 NAME
423
424 bulkmarcimport.pl - Import bibliographic/authority records into Koha
425
426 =head1 USAGE
427
428  $ export KOHA_CONF=/etc/koha.conf
429  $ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
430     -file /home/jmf/koha.mrc -n 3000
431
432 =head1 WARNING
433
434 Don't use this script before you've entered and checked your MARC parameters
435 tables twice (or more!). Otherwise, the import won't work correctly and you
436 will get invalid data.
437
438 =head1 DESCRIPTION
439
440 =over
441
442 =item  B<-h>
443
444 This version/help screen
445
446 =item B<-b, -biblios>
447
448 Type of import: bibliographic records
449
450 =item B<-a, -authorities>
451
452 Type of import: authority records
453
454 =item B<-file>=I<FILE>
455
456 The I<FILE> to import
457
458 =item  B<-v>
459
460 Verbose mode. 1 means "some infos", 2 means "MARC dumping"
461
462 =item B<-fk>
463
464 Turn off foreign key checks during import.
465
466 =item B<-n>=I<NUMBER>
467
468 The I<NUMBER> of records to import. If missing, all the file is imported
469
470 =item B<-o, -offset>=I<NUMBER>
471
472 File offset before importing, ie I<NUMBER> of records to skip.
473
474 =item B<-commit>=I<NUMBER>
475
476 The I<NUMBER> of records to wait before performing a 'commit' operation
477
478 =item B<-l>
479
480 File logs actions done for each record and their status into file
481
482 =item B<-t>
483
484 Test mode: parses the file, saying what he would do, but doing nothing.
485
486 =item B<-s>
487
488 Skip automatic conversion of MARC-8 to UTF-8.  This option is provided for
489 debugging.
490
491 =item B<-c>=I<CHARACTERISTIC>
492
493 The I<CHARACTERISTIC> MARC flavour. At the moment, only I<MARC21> and
494 I<UNIMARC> are supported. MARC21 by default.
495
496 =item B<-d>
497
498 Delete EVERYTHING related to biblio in koha-DB before import. Tables: biblio,
499 biblioitems, items
500
501 =item B<-m>=I<FORMAT>
502
503 Input file I<FORMAT>: I<MARCXML> or I<ISO2709> (defaults to ISO2709)
504
505 =item B<-k, -keepids>=<FIELD>
506
507 Field store ids in I<FIELD> (usefull for authorities, where 001 contains the
508 authid for Koha, that can contain a very valuable info for authorities coming
509 from LOC or BNF. useless for biblios probably)
510
511 =item B<-match>=<FIELD>
512
513 I<FIELD> matchindex,fieldtomatch matchpoint to use to deduplicate fieldtomatch
514 can be either 001 to 999 or field and list of subfields as such 100abcde
515
516 =item B<-i,-isbn>
517
518 If set, a search will be done on isbn, and, if the same isbn is found, the
519 biblio is not added. It's another method to deduplicate.  B<-match> & B<-isbn>
520 can be both set.
521
522 =item B<-x>=I<TAG>
523
524 Source bib I<TAG> for reporting the source bib number
525
526 =item B<-y>=I<SUBFIELD>
527
528 Source I<SUBFIELD> for reporting the source bib number
529
530 =item B<-idmap>=I<FILE>
531
532 I<FILE> for the koha bib and source id
533
534 =item B<-keepids>
535
536 Store ids in 009 (usefull for authorities, where 001 contains the authid for
537 Koha, that can contain a very valuable info for authorities coming from LOC or
538 BNF. useless for biblios probably)
539
540 =back
541
542 =cut
543