Merge remote branch 'kc/new/bug_5162' into kcmaster
[koha.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Debug;
25 use C4::Charset;
26 use C4::Items;
27 use Unicode::Normalize;
28 use Time::HiRes qw(gettimeofday);
29 use Getopt::Long;
30 use IO::File;
31 use Pod::Usage;
32
33 binmode(STDOUT, ":utf8");
34 my ( $input_marc_file, $number, $offset) = ('',0,0);
35 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
36 my ($sourcetag,$sourcesubfield,$idmapfl);
37
38 $|=1;
39
40 GetOptions(
41     'commit:f'    => \$commit,
42     'file:s'    => \$input_marc_file,
43     'n:f' => \$number,
44     'o|offset:f' => \$offset,
45     'h' => \$version,
46     'd' => \$delete,
47     't' => \$test_parameter,
48     's' => \$skip_marc8_conversion,
49     'c:s' => \$char_encoding,
50     'v:s' => \$verbose,
51     'fk' => \$fk_off,
52     'm:s' => \$format,
53     'l:s' => \$logfile,
54     'k|keepids:s' => \$keepids,
55     'b|biblios' => \$biblios,
56     'a|authorities' => \$authorities,
57     'match=s@'    => \$match,
58     'i|isbn' => \$isbn_check,
59     'x:s' => \$sourcetag,
60     'y:s' => \$sourcesubfield,
61     'idmap:s' => \$idmapfl,
62 );
63 $biblios=!$authorities||$biblios;
64
65 if ($version || ($input_marc_file eq '')) {
66     pod2usage( -verbose => 2 );
67     exit;
68 }
69
70 if (defined $idmapfl) {
71   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
72 }
73
74 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
75   $sourcetag="910";
76   $sourcesubfield="a";
77 }
78
79 my $dbh = C4::Context->dbh;
80
81 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
82 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
83 my $CataloguingLog = C4::Context->preference('CataloguingLog');
84 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
85
86 if ($fk_off) {
87         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
88 }
89
90
91 if ($delete) {
92         if ($biblios){
93         print "deleting biblios\n";
94         $dbh->do("truncate biblio");
95         $dbh->do("truncate biblioitems");
96         $dbh->do("truncate items");
97         }
98         else {
99         print "deleting authorities\n";
100         $dbh->do("truncate auth_header");
101         }
102     $dbh->do("truncate zebraqueue");
103 }
104
105
106
107 if ($test_parameter) {
108     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
109 }
110
111 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
112
113 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
114 my $starttime = gettimeofday;
115 my $batch;
116 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
117 if (defined $format && $format =~ /XML/i) {
118     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
119     # appears to try to convert incoming XML records from MARC-8
120     # to UTF-8.  Setting the BinaryEncoding key turns that off
121     # TODO: see what happens to ISO-8859-1 XML files.
122     # TODO: determine if MARC::Batch can be fixed to handle
123     #       XML records properly -- it probably should be
124     #       be using a proper push or pull XML parser to
125     #       extract the records, not using regexes to look
126     #       for <record>.*</record>.
127     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
128     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
129 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
130     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
131     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
132     $batch = MARC::Batch->new( 'XML', $fh );
133 } else {
134     $batch = MARC::Batch->new( 'USMARC', $fh );
135 }
136 $batch->warnings_off();
137 $batch->strict_off();
138 my $i=0;
139 my $commitnum = $commit ? $commit : 50;
140
141
142 # Skip file offset
143 if ( $offset ) {
144     print "Skipping file offset: $offset records\n";
145     $batch->next() while ($offset--);
146 }
147
148 my ($tagid,$subfieldid);
149 if ($authorities){
150           $tagid='001';
151 }
152 else {
153    ( $tagid, $subfieldid ) =
154             GetMarcFromKohaField( "biblio.biblionumber", '' );
155         $tagid||="001";
156 }
157
158 # the SQL query to search on isbn
159 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
160
161 $dbh->{AutoCommit} = 0;
162 my $loghandle;
163 if ($logfile){
164    $loghandle= IO::File->new($logfile,"w") ;
165    print $loghandle "id;operation;status\n";
166 }
167 RECORD: while (  ) {
168     my $record;
169     # get records
170     eval { $record = $batch->next() };
171     if ( $@ ) {
172         print "Bad MARC record $i: $@ skipped\n";
173         # FIXME - because MARC::Batch->next() combines grabbing the next
174         # blob and parsing it into one operation, a correctable condition
175         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
176         # from because we don't have access to the original blob.  Note
177         # that the staging import can deal with this condition (via
178         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
179         $i++;
180         next;
181     }
182     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
183     last unless ( $record );
184     $i++;
185     print ".";
186     print "\r$i" unless $i % 100;
187     
188     # transcode the record to UTF8 if needed & applicable.
189     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
190         # FIXME update condition
191         my ($guessed_charset, $charset_errors);
192          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
193         if ($guessed_charset eq 'failed') {
194             warn "ERROR: failed to perform character conversion for record $i\n";
195             next RECORD;            
196         }
197     }
198     my $isbn;
199     # remove trailing - in isbn (only for biblios, of course)
200     if ($biblios) {
201         if ($marcFlavour eq 'UNIMARC') {
202             if (my $f010 = $record->field('010')) {
203                 $isbn = $f010->subfield('a');
204                 $isbn =~ s/-//g;
205                 $f010->update('a' => $isbn);
206             }
207         } else {
208             if (my $f020 = $record->field('020')) {
209                 $isbn = $f020->subfield('a');
210                 $isbn =~ s/-//g;
211                 $f020->update('a' => $isbn);
212             }
213         }
214     }
215     my $id;
216     # search for duplicates (based on Local-number)
217     if ($match){
218        require C4::Search;
219        my $query=build_query($match,$record);
220        my $server=($authorities?'authorityserver':'biblioserver');
221        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
222        die "unable to search the database for duplicates : $error" if (defined $error);
223        #warn "$query $server : $totalhits";
224        if ($results && scalar(@$results)==1){
225            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
226                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
227        } 
228        elsif  ($results && scalar(@$results)>1){
229        $debug && warn "more than one match for $query";
230        } 
231        else {
232        $debug && warn "nomatch for $query";
233        }
234     }
235         my $originalid;
236     if ($keepids){
237           $originalid=GetRecordId($record,$tagid,$subfieldid);
238       if ($originalid){
239                  my $storeidfield;
240                  if (length($keepids)==3){
241                         $storeidfield=MARC::Field->new($keepids,$originalid);
242                  }
243                  else  {
244                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
245                  }
246          $record->insert_fields_ordered($storeidfield);
247              $record->delete_field($record->field($tagid));
248       }
249     }
250     unless ($test_parameter) {
251         if ($authorities){
252             use C4::AuthoritiesMarc;
253             my $authtypecode=GuessAuthTypeCode($record);
254             my $authid= ($id?$id:GuessAuthId($record));
255             if ($authid && GetAuthority($authid)){
256             ## Authority has an id and is in database : Replace
257                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
258                 if ($@){
259                     warn "Problem with authority $authid Cannot Modify";
260                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
261                 }
262                                 else{
263                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
264                                 }
265             }  
266             elsif (defined $authid) {
267             ## An authid is defined but no authority in database : add
268                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
269                 if ($@){
270                     warn "Problem with authority $authid Cannot Add ".$@;
271                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
272                 }
273                                 else{
274                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
275                                 }
276             }
277                 else {
278             ## True insert in database
279                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
280                 if ($@){
281                     warn "Problem with authority $authid Cannot Add".$@;
282                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
283                 }
284                                 else{
285                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
286                                 }
287                 }
288         }
289         else {
290             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
291             $biblionumber = $id;
292             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
293             if (!$biblionumber && $isbn_check && $isbn) {
294     #         warn "search ISBN : $isbn";
295                 $sth_isbn->execute($isbn);
296                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
297             }
298                 if (defined $idmapfl) {
299                                 if ($sourcetag < "010"){
300                                         if ($record->field($sourcetag)){
301                                           my $source = $record->field($sourcetag)->data();
302                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
303                                         }
304                             } else {
305                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
306                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
307                           }
308                         }
309                                         # create biblio, unless we already have it ( either match or isbn )
310             if ($biblionumber) {
311                                 eval{$biblioitemnumber=GetBiblioData($biblionumber)->{biblioitemnumber};}
312                         }
313                         else 
314                         {
315                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
316             }
317             if ( $@ ) {
318                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
319                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
320                 next RECORD;
321             } 
322                         else{
323                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
324                         }
325             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
326             if ( $@ ) {
327                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
328                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
329                 # if we failed because of an exception, assume that 
330                 # the MARC columns in biblioitems were not set.
331                 ModBiblioMarc( $record, $biblionumber, '' );
332                 next RECORD;
333             } 
334                         else{
335                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
336                         }
337             if ($#{ $errors_ref } > -1) { 
338                 report_item_errors($biblionumber, $errors_ref);
339             }
340         }
341         $dbh->commit() if (0 == $i % $commitnum);
342     }
343     last if $i == $number;
344 }
345 $dbh->commit();
346
347
348
349 if ($fk_off) {
350         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
351 }
352
353 # restore CataloguingLog
354 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
355
356 my $timeneeded = gettimeofday - $starttime;
357 print "\n$i MARC records done in $timeneeded seconds\n";
358 if ($logfile){
359   print $loghandle "file : $input_marc_file\n";
360   print $loghandle "$i MARC records done in $timeneeded seconds\n";
361   $loghandle->close;
362 }
363 exit 0;
364
365 sub GetRecordId{
366         my $marcrecord=shift;
367         my $tag=shift;
368         my $subfield=shift;
369         my $id;
370         if ($tag lt "010"){
371                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
372         } 
373         elsif ($subfield){
374                 if ($marcrecord->field($tag)){
375                         return $marcrecord->subfield($tag,$subfield);
376                 }
377         }
378         return $id;
379 }
380 sub build_query {
381         my $match = shift;
382         my $record=shift;
383         my @searchstrings;
384         foreach my $matchingpoint (@$match){
385           my $string = build_simplequery($matchingpoint,$record);
386           push @searchstrings,$string if (length($string)>0);
387         }
388         return join(" and ",@searchstrings);
389 }
390 sub build_simplequery {
391         my $element=shift;
392         my $record=shift;
393         my ($index,$recorddata)=split /,/,$element;
394         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
395         my @searchstrings;
396         foreach my $field ($record->field($tag)){
397                   if (length($field->as_string("$subfields"))>0){
398                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
399                   }
400         }
401         return join(" and ",@searchstrings);
402 }
403 sub report_item_errors {
404     my $biblionumber = shift;
405     my $errors_ref = shift;
406
407     foreach my $error (@{ $errors_ref }) {
408         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
409         my $error_code = $error->{'error_code'};
410         $error_code =~ s/_/ /g;
411         $msg .= "$error_code $error->{'error_information'}";
412         print $msg, "\n";
413     }
414 }
415 sub printlog{
416         my $logelements=shift;
417         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
418 }
419
420
421 =head1 NAME
422
423 bulkmarcimport.pl - Import bibliographic/authority records into Koha
424
425 =head1 USAGE
426
427  $ export KOHA_CONF=/etc/koha.conf
428  $ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
429     -file /home/jmf/koha.mrc -n 3000
430
431 =head1 WARNING
432
433 Don't use this script before you've entered and checked your MARC parameters
434 tables twice (or more!). Otherwise, the import won't work correctly and you
435 will get invalid data.
436
437 =head1 DESCRIPTION
438
439 =over
440
441 =item  B<-h>
442
443 This version/help screen
444
445 =item B<-b, -biblios>
446
447 Type of import: bibliographic records
448
449 =item B<-a, -authorities>
450
451 Type of import: authority records
452
453 =item B<-file>=I<FILE>
454
455 The I<FILE> to import
456
457 =item  B<-v>
458
459 Verbose mode. 1 means "some infos", 2 means "MARC dumping"
460
461 =item B<-fk>
462
463 Turn off foreign key checks during import.
464
465 =item B<-n>=I<NUMBER>
466
467 The I<NUMBER> of records to import. If missing, all the file is imported
468
469 =item B<-o, -offset>=I<NUMBER>
470
471 File offset before importing, ie I<NUMBER> of records to skip.
472
473 =item B<-commit>=I<NUMBER>
474
475 The I<NUMBER> of records to wait before performing a 'commit' operation
476
477 =item B<-l>
478
479 File logs actions done for each record and their status into file
480
481 =item B<-t>
482
483 Test mode: parses the file, saying what he would do, but doing nothing.
484
485 =item B<-s>
486
487 Skip automatic conversion of MARC-8 to UTF-8.  This option is provided for
488 debugging.
489
490 =item B<-c>=I<CHARACTERISTIC>
491
492 The I<CHARACTERISTIC> MARC flavour. At the moment, only I<MARC21> and
493 I<UNIMARC> are supported. MARC21 by default.
494
495 =item B<-d>
496
497 Delete EVERYTHING related to biblio in koha-DB before import. Tables: biblio,
498 biblioitems, items
499
500 =item B<-m>=I<FORMAT>
501
502 Input file I<FORMAT>: I<MARCXML> or I<ISO2709> (defaults to ISO2709)
503
504 =item B<-k, -keepids>=<FIELD>
505
506 Field store ids in I<FIELD> (usefull for authorities, where 001 contains the
507 authid for Koha, that can contain a very valuable info for authorities coming
508 from LOC or BNF. useless for biblios probably)
509
510 =item B<-match>=<FIELD>
511
512 I<FIELD> matchindex,fieldtomatch matchpoint to use to deduplicate fieldtomatch
513 can be either 001 to 999 or field and list of subfields as such 100abcde
514
515 =item B<-i,-isbn>
516
517 If set, a search will be done on isbn, and, if the same isbn is found, the
518 biblio is not added. It's another method to deduplicate.  B<-match> & B<-isbn>
519 can be both set.
520
521 =item B<-x>=I<TAG>
522
523 Source bib I<TAG> for reporting the source bib number
524
525 =item B<-y>=I<SUBFIELD>
526
527 Source I<SUBFIELD> for reporting the source bib number
528
529 =item B<-idmap>=I<FILE>
530
531 I<FILE> for the koha bib and source id
532
533 =item B<-keepids>
534
535 Store ids in 009 (usefull for authorities, where 001 contains the authid for
536 Koha, that can contain a very valuable info for authorities coming from LOC or
537 BNF. useless for biblios probably)
538
539 =back
540
541 =cut
542