Merge commit 'koha-biblibre/master' into bl-acq
[koha.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Charset;
25 use C4::Items;
26 use Unicode::Normalize;
27 use Time::HiRes qw(gettimeofday);
28 use Getopt::Long;
29 use IO::File;
30
31 binmode(STDOUT, ":utf8");
32 my ( $input_marc_file, $number, $offset) = ('',0,0);
33 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
34 my ($sourcetag,$sourcesubfield,$idmapfl);
35
36 $|=1;
37
38 GetOptions(
39     'commit:f'    => \$commit,
40     'file:s'    => \$input_marc_file,
41     'n:f' => \$number,
42     'o|offset:f' => \$offset,
43     'h' => \$version,
44     'd' => \$delete,
45     't' => \$test_parameter,
46     's' => \$skip_marc8_conversion,
47     'c:s' => \$char_encoding,
48     'v:s' => \$verbose,
49     'fk' => \$fk_off,
50     'm:s' => \$format,
51     'l:s' => \$logfile,
52     'k|keepids:s' => \$keepids,
53     'b|biblios' => \$biblios,
54     'a|authorities' => \$authorities,
55     'match=s@'    => \$match,
56     'i|isbn' => \$isbn_check,
57     'x:s' => \$sourcetag,
58     'y:s' => \$sourcesubfield,
59     'idmap:s' => \$idmapfl,
60 );
61 $biblios=!$authorities||$biblios;
62
63 if ($version || ($input_marc_file eq '')) {
64     print <<EOF
65 Small script to import bibliographic records into Koha.
66
67 Parameters:
68   h      this version/help screen
69   file   /path/to/file/to/dump: the file to import
70   v      verbose mode. 1 means "some infos", 2 means "MARC dumping"
71   fk     Turn off foreign key checks during import.
72   n      the number of records to import. If missing, all the file is imported
73   o      file offset before importing, ie number of records to skip.
74   commit the number of records to wait before performing a 'commit' operation
75   l file logs actions done for each record and their status into file
76   t      test mode: parses the file, saying what he would do, but doing nothing.
77   s      skip automatic conversion of MARC-8 to UTF-8.  This option is 
78          provided for debugging.
79   c      the characteristic MARC flavour. At the moment, only MARC21 and 
80          UNIMARC are supported. MARC21 by default.
81   d      delete EVERYTHING related to biblio in koha-DB before import. Tables:
82          biblio, biblioitems, titems
83   m      format, MARCXML or ISO2709 (defaults to ISO2709)
84   keepids field store ids in field (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
85   b|biblios type of import : bibliographic records
86   a|authorities type of import : authority records
87   match  matchindex,fieldtomatch matchpoint to use to deduplicate
88           fieldtomatch can be either 001 to 999 
89                        or field and list of subfields as such 100abcde
90   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
91          method to deduplicate. 
92          match & i can be both set.
93   x      source bib tag for reporting the source bib number
94   y      source subfield for reporting the source bib number
95   idmap  file for the koha bib and source id
96   keepids store ids in 009 (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
97   b|biblios type of import : bibliographic records
98   a|authorities type of import : authority records
99   match  matchindex,fieldtomatch matchpoint to use to deduplicate
100           fieldtomatch can be either 001 to 999 
101                        or field and list of subfields as such 100abcde
102   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
103          method to deduplicate. 
104          match & i can be both set.
105 IMPORTANT: don't use this script before you've entered and checked your MARC 
106            parameters tables twice (or more!). Otherwise, the import won't work 
107            correctly and you will get invalid data.
108
109 SAMPLE: 
110   \$ export KOHA_CONF=/etc/koha.conf
111   \$ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
112     -file /home/jmf/koha.mrc -n 3000
113 EOF
114 ;#'
115 exit;
116 }
117
118 if (defined $idmapfl) {
119   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
120 }
121
122 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
123   $sourcetag="910";
124   $sourcesubfield="a";
125 }
126
127 my $dbh = C4::Context->dbh;
128
129 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
130 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
131 my $CataloguingLog = C4::Context->preference('CataloguingLog');
132 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
133
134 if ($fk_off) {
135         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
136 }
137
138
139 if ($delete) {
140         if ($biblios){
141         print "deleting biblios\n";
142         $dbh->do("truncate biblio");
143         $dbh->do("truncate biblioitems");
144         $dbh->do("truncate items");
145         }
146         else {
147         print "deleting authorities\n";
148         $dbh->do("truncate auth_header");
149         }
150     $dbh->do("truncate zebraqueue");
151 }
152
153
154
155 if ($test_parameter) {
156     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
157 }
158
159 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
160
161 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
162 my $starttime = gettimeofday;
163 my $batch;
164 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
165 if (defined $format && $format =~ /XML/i) {
166     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
167     # appears to try to convert incoming XML records from MARC-8
168     # to UTF-8.  Setting the BinaryEncoding key turns that off
169     # TODO: see what happens to ISO-8859-1 XML files.
170     # TODO: determine if MARC::Batch can be fixed to handle
171     #       XML records properly -- it probably should be
172     #       be using a proper push or pull XML parser to
173     #       extract the records, not using regexes to look
174     #       for <record>.*</record>.
175     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
176     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
177 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
178     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
179     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
180     $batch = MARC::Batch->new( 'XML', $fh );
181 } else {
182     $batch = MARC::Batch->new( 'USMARC', $fh );
183 }
184 $batch->warnings_off();
185 $batch->strict_off();
186 my $i=0;
187 my $commitnum = $commit ? $commit : 50;
188
189
190 # Skip file offset
191 if ( $offset ) {
192     print "Skipping file offset: $offset records\n";
193     $batch->next() while ($offset--);
194 }
195
196 my ($tagid,$subfieldid);
197 if ($authorities){
198           $tagid='001';
199 }
200 else {
201    ( $tagid, $subfieldid ) =
202             GetMarcFromKohaField( "biblio.biblionumber", '' );
203         $tagid||="001";
204 }
205
206 # the SQL query to search on isbn
207 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
208
209 $dbh->{AutoCommit} = 0;
210 my $loghandle;
211 if ($logfile){
212    $loghandle= IO::File->new($logfile,"w") ;
213    print $loghandle "id;operation;status\n";
214 }
215 RECORD: while (  ) {
216     my $record;
217     # get records
218     eval { $record = $batch->next() };
219     if ( $@ ) {
220         print "Bad MARC record: skipped\n";
221         # FIXME - because MARC::Batch->next() combines grabbing the next
222         # blob and parsing it into one operation, a correctable condition
223         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
224         # from because we don't have access to the original blob.  Note
225         # that the staging import can deal with this condition (via
226         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
227         next;
228     }
229     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
230     last unless ( $record );
231     $i++;
232     print ".";
233     print "\r$i" unless $i % 100;
234     
235     # transcode the record to UTF8 if needed & applicable.
236     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
237         # FIXME update condition
238         my ($guessed_charset, $charset_errors);
239          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
240         if ($guessed_charset eq 'failed') {
241             warn "ERROR: failed to perform character conversion for record $i\n";
242             next RECORD;            
243         }
244     }
245     my $isbn;
246     # remove trailing - in isbn (only for biblios, of course)
247     if ($biblios) {
248         if ($marcFlavour eq 'UNIMARC') {
249             if (my $f010 = $record->field('010')) {
250                 $isbn = $f010->subfield('a');
251                 $isbn =~ s/-//g;
252                 $f010->update('a' => $isbn);
253             }
254         } else {
255             if (my $f020 = $record->field('020')) {
256                 $isbn = $f020->subfield('a');
257                 $isbn =~ s/-//g;
258                 $f020->update('a' => $isbn);
259             }
260         }
261     }
262     my $id;
263     # search for duplicates (based on Local-number)
264     if ($match){
265        require C4::Search;
266        my $query=build_query($match,$record);
267        my $server=($authorities?'authorityserver':'biblioserver');
268        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
269        die "unable to search the database for duplicates : $error" if (defined $error);
270        warn "$query $server : $totalhits";
271        if ($results && scalar(@$results)==1){
272            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
273                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
274        } 
275        elsif  ($results && scalar(@$results)>1){
276        warn "more than one match for $query";
277        } 
278        else {
279        warn "nomatch for $query";
280        }
281     }
282         my $originalid;
283     if ($keepids){
284           $originalid=GetRecordId($record,$tagid,$subfieldid);
285       if ($originalid){
286                  my $storeidfield;
287                  if (length($keepids)==3){
288                         $storeidfield=MARC::Field->new($keepids,$originalid);
289                  }
290                  else  {
291                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
292                  }
293          $record->insert_fields_ordered($storeidfield);
294              $record->delete_field($record->field($tagid));
295       }
296     }
297     unless ($test_parameter) {
298         if ($authorities){
299             use C4::AuthoritiesMarc;
300             my $authtypecode=GuessAuthTypeCode($record);
301             my $authid= ($id?$id:GuessAuthId($record));
302             if ($authid && GetAuthority($authid)){
303             ## Authority has an id and is in database : Replace
304                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
305                 if ($@){
306                     warn "Problem with authority $authid Cannot Modify";
307                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
308                 }
309                                 else{
310                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
311                                 }
312             }  
313             elsif (defined $authid) {
314             ## An authid is defined but no authority in database : add
315                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
316                 if ($@){
317                     warn "Problem with authority $authid Cannot Add ".$@;
318                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
319                 }
320                                 else{
321                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
322                                 }
323             }
324                 else {
325             ## True insert in database
326                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
327                 if ($@){
328                     warn "Problem with authority $authid Cannot Add".$@;
329                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
330                 }
331                                 else{
332                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
333                                 }
334                 }
335         }
336         else {
337             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
338             $biblionumber = $id;
339             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
340             if (!$biblionumber && $isbn_check && $isbn) {
341     #         warn "search ISBN : $isbn";
342                 $sth_isbn->execute($isbn);
343                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
344             }
345                 if (defined $idmapfl) {
346                                 if ($sourcetag < "010"){
347                                         if ($record->field($sourcetag)){
348                                           my $source = $record->field($sourcetag)->data();
349                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
350                                         }
351                             } else {
352                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
353                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
354                           }
355                         }
356                                         # create biblio, unless we already have it ( either match or isbn )
357             if ($biblionumber) {
358                                 eval{$biblioitemnumber=GetBiblioData($biblionumber)->{biblioitemnumber};}
359                         }
360                         else 
361                         {
362                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
363             }
364             if ( $@ ) {
365                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
366                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
367                 next RECORD;
368             } 
369                         else{
370                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
371                         }
372             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
373             if ( $@ ) {
374                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
375                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
376                 # if we failed because of an exception, assume that 
377                 # the MARC columns in biblioitems were not set.
378                 ModBiblioMarc( $record, $biblionumber, '' );
379                 next RECORD;
380             } 
381                         else{
382                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
383                         }
384             if ($#{ $errors_ref } > -1) { 
385                 report_item_errors($biblionumber, $errors_ref);
386             }
387         }
388         $dbh->commit() if (0 == $i % $commitnum);
389     }
390     last if $i == $number;
391 }
392 $dbh->commit();
393
394
395
396 if ($fk_off) {
397         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
398 }
399
400 # restore CataloguingLog
401 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
402
403 my $timeneeded = gettimeofday - $starttime;
404 print "\n$i MARC records done in $timeneeded seconds\n";
405 if ($logfile){
406   print $loghandle "file : $input_marc_file\n";
407   print $loghandle "$i MARC records done in $timeneeded seconds\n";
408   $loghandle->close;
409 }
410 exit 0;
411
412 sub GetRecordId{
413         my $marcrecord=shift;
414         my $tag=shift;
415         my $subfield=shift;
416         my $id;
417         if ($tag lt "010"){
418                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
419         } 
420         elsif ($subfield){
421                 if ($marcrecord->field($tag)){
422                         return $marcrecord->subfield($tag,$subfield);
423                 }
424         }
425         return $id;
426 }
427 sub build_query {
428         my $match = shift;
429         my $record=shift;
430         my @searchstrings;
431         foreach my $matchingpoint (@$match){
432           my $string = build_simplequery($matchingpoint,$record);
433           push @searchstrings,$string if (length($string)>0);
434         }
435         return join(" and ",@searchstrings);
436 }
437 sub build_simplequery {
438         my $element=shift;
439         my $record=shift;
440         my ($index,$recorddata)=split /,/,$element;
441         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
442         my @searchstrings;
443         foreach my $field ($record->field($tag)){
444                   if (length($field->as_string("$subfields"))>0){
445                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
446                   }
447         }
448         return join(" and ",@searchstrings);
449 }
450 sub report_item_errors {
451     my $biblionumber = shift;
452     my $errors_ref = shift;
453
454     foreach my $error (@{ $errors_ref }) {
455         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
456         my $error_code = $error->{'error_code'};
457         $error_code =~ s/_/ /g;
458         $msg .= "$error_code $error->{'error_information'}";
459         print $msg, "\n";
460     }
461 }
462 sub printlog{
463         my $logelements=shift;
464         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
465 }