Merge remote branch 'kc/new/enh/bug_5917' into kcmaster
[koha.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Debug;
25 use C4::Charset;
26 use C4::Items;
27 use Unicode::Normalize;
28 use Time::HiRes qw(gettimeofday);
29 use Getopt::Long;
30 use IO::File;
31 use Pod::Usage;
32
33 binmode(STDOUT, ":utf8");
34 my ( $input_marc_file, $number, $offset) = ('',0,0);
35 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
36 my ($sourcetag,$sourcesubfield,$idmapfl);
37 my $cleanisbn = 1;
38
39 $|=1;
40
41 GetOptions(
42     'commit:f'    => \$commit,
43     'file:s'    => \$input_marc_file,
44     'n:f' => \$number,
45     'o|offset:f' => \$offset,
46     'h' => \$version,
47     'd' => \$delete,
48     't' => \$test_parameter,
49     's' => \$skip_marc8_conversion,
50     'c:s' => \$char_encoding,
51     'v:s' => \$verbose,
52     'fk' => \$fk_off,
53     'm:s' => \$format,
54     'l:s' => \$logfile,
55     'k|keepids:s' => \$keepids,
56     'b|biblios' => \$biblios,
57     'a|authorities' => \$authorities,
58     'match=s@'    => \$match,
59     'i|isbn' => \$isbn_check,
60     'x:s' => \$sourcetag,
61     'y:s' => \$sourcesubfield,
62     'idmap:s' => \$idmapfl,
63     'cleanisbn!'     => \$cleanisbn,
64 );
65 $biblios=!$authorities||$biblios;
66
67 if ($version || ($input_marc_file eq '')) {
68     pod2usage( -verbose => 2 );
69     exit;
70 }
71
72 if (defined $idmapfl) {
73   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
74 }
75
76 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
77   $sourcetag="910";
78   $sourcesubfield="a";
79 }
80
81 my $dbh = C4::Context->dbh;
82
83 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
84 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
85 my $CataloguingLog = C4::Context->preference('CataloguingLog');
86 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
87
88 if ($fk_off) {
89         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
90 }
91
92
93 if ($delete) {
94         if ($biblios){
95         print "deleting biblios\n";
96         $dbh->do("truncate biblio");
97         $dbh->do("truncate biblioitems");
98         $dbh->do("truncate items");
99         }
100         else {
101         print "deleting authorities\n";
102         $dbh->do("truncate auth_header");
103         }
104     $dbh->do("truncate zebraqueue");
105 }
106
107
108
109 if ($test_parameter) {
110     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
111 }
112
113 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
114
115 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
116 my $starttime = gettimeofday;
117 my $batch;
118 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
119 if (defined $format && $format =~ /XML/i) {
120     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
121     # appears to try to convert incoming XML records from MARC-8
122     # to UTF-8.  Setting the BinaryEncoding key turns that off
123     # TODO: see what happens to ISO-8859-1 XML files.
124     # TODO: determine if MARC::Batch can be fixed to handle
125     #       XML records properly -- it probably should be
126     #       be using a proper push or pull XML parser to
127     #       extract the records, not using regexes to look
128     #       for <record>.*</record>.
129     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
130     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
131 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
132     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
133     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
134     $batch = MARC::Batch->new( 'XML', $fh );
135 } else {
136     $batch = MARC::Batch->new( 'USMARC', $fh );
137 }
138 $batch->warnings_off();
139 $batch->strict_off();
140 my $i=0;
141 my $commitnum = $commit ? $commit : 50;
142
143
144 # Skip file offset
145 if ( $offset ) {
146     print "Skipping file offset: $offset records\n";
147     $batch->next() while ($offset--);
148 }
149
150 my ($tagid,$subfieldid);
151 if ($authorities){
152           $tagid='001';
153 }
154 else {
155    ( $tagid, $subfieldid ) =
156             GetMarcFromKohaField( "biblio.biblionumber", '' );
157         $tagid||="001";
158 }
159
160 # the SQL query to search on isbn
161 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
162
163 $dbh->{AutoCommit} = 0;
164 my $loghandle;
165 if ($logfile){
166    $loghandle= IO::File->new($logfile,"w") ;
167    print $loghandle "id;operation;status\n";
168 }
169 RECORD: while (  ) {
170     my $record;
171     # get records
172     eval { $record = $batch->next() };
173     if ( $@ ) {
174         print "Bad MARC record $i: $@ skipped\n";
175         # FIXME - because MARC::Batch->next() combines grabbing the next
176         # blob and parsing it into one operation, a correctable condition
177         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
178         # from because we don't have access to the original blob.  Note
179         # that the staging import can deal with this condition (via
180         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
181         next;
182     }
183     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
184     last unless ( $record );
185     $i++;
186     print ".";
187     print "\r$i" unless $i % 100;
188     
189     # transcode the record to UTF8 if needed & applicable.
190     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
191         # FIXME update condition
192         my ($guessed_charset, $charset_errors);
193          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
194         if ($guessed_charset eq 'failed') {
195             warn "ERROR: failed to perform character conversion for record $i\n";
196             next RECORD;            
197         }
198     }
199     my $isbn;
200     # remove trailing - in isbn (only for biblios, of course)
201     if ($biblios && $cleanisbn) {
202         my $tag = $marcFlavour eq 'UNIMARC' ? '010' : '020';
203         my $field = $record->field($tag);
204         my $isbn = $field && $field->subfield('a');
205         if ( $isbn ) {
206             $isbn =~ s/-//g;
207             $field->update('a' => $isbn);
208         }
209     }
210     my $id;
211     # search for duplicates (based on Local-number)
212     if ($match){
213        require C4::Search;
214        my $query=build_query($match,$record);
215        my $server=($authorities?'authorityserver':'biblioserver');
216        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
217        die "unable to search the database for duplicates : $error" if (defined $error);
218        #warn "$query $server : $totalhits";
219        if ( @{$results} == 1 ){
220            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
221                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
222        } 
223        elsif  ( @{$results} > 1){
224            $debug && warn "more than one match for $query";
225        } 
226        else {
227            $debug && warn "nomatch for $query";
228        }
229     }
230         my $originalid;
231     if ($keepids){
232           $originalid=GetRecordId($record,$tagid,$subfieldid);
233       if ($originalid){
234                  my $storeidfield;
235                  if (length($keepids)==3){
236                         $storeidfield=MARC::Field->new($keepids,$originalid);
237                  }
238                  else  {
239                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
240                  }
241          $record->insert_fields_ordered($storeidfield);
242              $record->delete_field($record->field($tagid));
243       }
244     }
245     unless ($test_parameter) {
246         if ($authorities){
247             use C4::AuthoritiesMarc;
248             my $authtypecode=GuessAuthTypeCode($record);
249             my $authid= ($id?$id:GuessAuthId($record));
250             if ($authid && GetAuthority($authid)){
251             ## Authority has an id and is in database : Replace
252                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
253                 if ($@){
254                     warn "Problem with authority $authid Cannot Modify";
255                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
256                 }
257                                 else{
258                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
259                                 }
260             }  
261             elsif (defined $authid) {
262             ## An authid is defined but no authority in database : add
263                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
264                 if ($@){
265                     warn "Problem with authority $authid Cannot Add ".$@;
266                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
267                 }
268                                 else{
269                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
270                                 }
271             }
272                 else {
273             ## True insert in database
274                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
275                 if ($@){
276                     warn "Problem with authority $authid Cannot Add".$@;
277                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
278                 }
279                                 else{
280                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
281                                 }
282                 }
283         }
284         else {
285             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
286             $biblionumber = $id;
287             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
288             if (!$biblionumber && $isbn_check && $isbn) {
289     #         warn "search ISBN : $isbn";
290                 $sth_isbn->execute($isbn);
291                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
292             }
293                 if (defined $idmapfl) {
294                                 if ($sourcetag < "010"){
295                                         if ($record->field($sourcetag)){
296                                           my $source = $record->field($sourcetag)->data();
297                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
298                                         }
299                             } else {
300                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
301                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
302                           }
303                         }
304                                         # create biblio, unless we already have it ( either match or isbn )
305             if ($biblionumber) {
306                                 eval{$biblioitemnumber=GetBiblioData($biblionumber)->{biblioitemnumber};}
307                         }
308                         else 
309                         {
310                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
311             }
312             if ( $@ ) {
313                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
314                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
315                 next RECORD;
316             } 
317                         else{
318                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
319                         }
320             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
321             if ( $@ ) {
322                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
323                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
324                 # if we failed because of an exception, assume that 
325                 # the MARC columns in biblioitems were not set.
326                 ModBiblioMarc( $record, $biblionumber, '' );
327                 next RECORD;
328             } 
329                         else{
330                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
331                         }
332             if ($#{ $errors_ref } > -1) { 
333                 report_item_errors($biblionumber, $errors_ref);
334             }
335         }
336         $dbh->commit() if (0 == $i % $commitnum);
337     }
338     last if $i == $number;
339 }
340 $dbh->commit();
341
342
343
344 if ($fk_off) {
345         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
346 }
347
348 # restore CataloguingLog
349 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
350
351 my $timeneeded = gettimeofday - $starttime;
352 print "\n$i MARC records done in $timeneeded seconds\n";
353 if ($logfile){
354   print $loghandle "file : $input_marc_file\n";
355   print $loghandle "$i MARC records done in $timeneeded seconds\n";
356   $loghandle->close;
357 }
358 exit 0;
359
360 sub GetRecordId{
361         my $marcrecord=shift;
362         my $tag=shift;
363         my $subfield=shift;
364         my $id;
365         if ($tag lt "010"){
366                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
367         } 
368         elsif ($subfield){
369                 if ($marcrecord->field($tag)){
370                         return $marcrecord->subfield($tag,$subfield);
371                 }
372         }
373         return $id;
374 }
375 sub build_query {
376         my $match = shift;
377         my $record=shift;
378         my @searchstrings;
379         foreach my $matchingpoint (@$match){
380           my $string = build_simplequery($matchingpoint,$record);
381           push @searchstrings,$string if (length($string)>0);
382         }
383         return join(" and ",@searchstrings);
384 }
385 sub build_simplequery {
386         my $element=shift;
387         my $record=shift;
388         my ($index,$recorddata)=split /,/,$element;
389         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
390         my @searchstrings;
391         foreach my $field ($record->field($tag)){
392                   if (length($field->as_string("$subfields"))>0){
393                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
394                   }
395         }
396         return join(" and ",@searchstrings);
397 }
398 sub report_item_errors {
399     my $biblionumber = shift;
400     my $errors_ref = shift;
401
402     foreach my $error (@{ $errors_ref }) {
403         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
404         my $error_code = $error->{'error_code'};
405         $error_code =~ s/_/ /g;
406         $msg .= "$error_code $error->{'error_information'}";
407         print $msg, "\n";
408     }
409 }
410 sub printlog{
411         my $logelements=shift;
412         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
413 }
414
415
416 =head1 NAME
417
418 bulkmarcimport.pl - Import bibliographic/authority records into Koha
419
420 =head1 USAGE
421
422  $ export KOHA_CONF=/etc/koha.conf
423  $ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
424     -file /home/jmf/koha.mrc -n 3000
425
426 =head1 WARNING
427
428 Don't use this script before you've entered and checked your MARC parameters
429 tables twice (or more!). Otherwise, the import won't work correctly and you
430 will get invalid data.
431
432 =head1 DESCRIPTION
433
434 =over
435
436 =item  B<-h>
437
438 This version/help screen
439
440 =item B<-b, -biblios>
441
442 Type of import: bibliographic records
443
444 =item B<-a, -authorities>
445
446 Type of import: authority records
447
448 =item B<-file>=I<FILE>
449
450 The I<FILE> to import
451
452 =item  B<-v>
453
454 Verbose mode. 1 means "some infos", 2 means "MARC dumping"
455
456 =item B<-fk>
457
458 Turn off foreign key checks during import.
459
460 =item B<-n>=I<NUMBER>
461
462 The I<NUMBER> of records to import. If missing, all the file is imported
463
464 =item B<-o, -offset>=I<NUMBER>
465
466 File offset before importing, ie I<NUMBER> of records to skip.
467
468 =item B<-commit>=I<NUMBER>
469
470 The I<NUMBER> of records to wait before performing a 'commit' operation
471
472 =item B<-l>
473
474 File logs actions done for each record and their status into file
475
476 =item B<-t>
477
478 Test mode: parses the file, saying what he would do, but doing nothing.
479
480 =item B<-s>
481
482 Skip automatic conversion of MARC-8 to UTF-8.  This option is provided for
483 debugging.
484
485 =item B<-c>=I<CHARACTERISTIC>
486
487 The I<CHARACTERISTIC> MARC flavour. At the moment, only I<MARC21> and
488 I<UNIMARC> are supported. MARC21 by default.
489
490 =item B<-d>
491
492 Delete EVERYTHING related to biblio in koha-DB before import. Tables: biblio,
493 biblioitems, items
494
495 =item B<-m>=I<FORMAT>
496
497 Input file I<FORMAT>: I<MARCXML> or I<ISO2709> (defaults to ISO2709)
498
499 =item B<-k, -keepids>=<FIELD>
500
501 Field store ids in I<FIELD> (usefull for authorities, where 001 contains the
502 authid for Koha, that can contain a very valuable info for authorities coming
503 from LOC or BNF. useless for biblios probably)
504
505 =item B<-match>=<FIELD>
506
507 I<FIELD> matchindex,fieldtomatch matchpoint to use to deduplicate fieldtomatch
508 can be either 001 to 999 or field and list of subfields as such 100abcde
509
510 =item B<-i,-isbn>
511
512 If set, a search will be done on isbn, and, if the same isbn is found, the
513 biblio is not added. It's another method to deduplicate.  B<-match> & B<-isbn>
514 can be both set.
515
516 =item B<-cleanisbn>
517
518 Clean ISBN fields from entering biblio records, ie removes hyphens. By default,
519 ISBN are cleaned. --nocleanisbn will keep ISBN unchanged.
520
521 =item B<-x>=I<TAG>
522
523 Source bib I<TAG> for reporting the source bib number
524
525 =item B<-y>=I<SUBFIELD>
526
527 Source I<SUBFIELD> for reporting the source bib number
528
529 =item B<-idmap>=I<FILE>
530
531 I<FILE> for the koha bib and source id
532
533 =item B<-keepids>
534
535 Store ids in 009 (usefull for authorities, where 001 contains the authid for
536 Koha, that can contain a very valuable info for authorities coming from LOC or
537 BNF. useless for biblios probably)
538
539 =back
540
541 =cut
542