Merge commit 'biblibre/3.2_biblibre' into to-push
[koha.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Debug;
25 use C4::Charset;
26 use C4::Items;
27 use Unicode::Normalize;
28 use Time::HiRes qw(gettimeofday);
29 use Getopt::Long;
30 use IO::File;
31
32 binmode(STDOUT, ":utf8");
33 my ( $input_marc_file, $number, $offset) = ('',0,0);
34 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
35 my ($sourcetag,$sourcesubfield,$idmapfl);
36
37 $|=1;
38
39 GetOptions(
40     'commit:f'    => \$commit,
41     'file:s'    => \$input_marc_file,
42     'n:f' => \$number,
43     'o|offset:f' => \$offset,
44     'h' => \$version,
45     'd' => \$delete,
46     't' => \$test_parameter,
47     's' => \$skip_marc8_conversion,
48     'c:s' => \$char_encoding,
49     'v:s' => \$verbose,
50     'fk' => \$fk_off,
51     'm:s' => \$format,
52     'l:s' => \$logfile,
53     'k|keepids:s' => \$keepids,
54     'b|biblios' => \$biblios,
55     'a|authorities' => \$authorities,
56     'match=s@'    => \$match,
57     'i|isbn' => \$isbn_check,
58     'x:s' => \$sourcetag,
59     'y:s' => \$sourcesubfield,
60     'idmap:s' => \$idmapfl,
61 );
62 $biblios=!$authorities||$biblios;
63
64 if ($version || ($input_marc_file eq '')) {
65     print <<EOF
66 Small script to import bibliographic records into Koha.
67
68 Parameters:
69   h      this version/help screen
70   file   /path/to/file/to/dump: the file to import
71   v      verbose mode. 1 means "some infos", 2 means "MARC dumping"
72   fk     Turn off foreign key checks during import.
73   n      the number of records to import. If missing, all the file is imported
74   o      file offset before importing, ie number of records to skip.
75   commit the number of records to wait before performing a 'commit' operation
76   l file logs actions done for each record and their status into file
77   t      test mode: parses the file, saying what he would do, but doing nothing.
78   s      skip automatic conversion of MARC-8 to UTF-8.  This option is 
79          provided for debugging.
80   c      the characteristic MARC flavour. At the moment, only MARC21 and 
81          UNIMARC are supported. MARC21 by default.
82   d      delete EVERYTHING related to biblio in koha-DB before import. Tables:
83          biblio, biblioitems, titems
84   m      format, MARCXML or ISO2709 (defaults to ISO2709)
85   keepids field store ids in field (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
86   b|biblios type of import : bibliographic records
87   a|authorities type of import : authority records
88   match  matchindex,fieldtomatch matchpoint to use to deduplicate
89           fieldtomatch can be either 001 to 999 
90                        or field and list of subfields as such 100abcde
91   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
92          method to deduplicate. 
93          match & i can be both set.
94   x      source bib tag for reporting the source bib number
95   y      source subfield for reporting the source bib number
96   idmap  file for the koha bib and source id
97   keepids store ids in 009 (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
98   b|biblios type of import : bibliographic records
99   a|authorities type of import : authority records
100   match  matchindex,fieldtomatch matchpoint to use to deduplicate
101           fieldtomatch can be either 001 to 999 
102                        or field and list of subfields as such 100abcde
103   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
104          method to deduplicate. 
105          match & i can be both set.
106 IMPORTANT: don't use this script before you've entered and checked your MARC 
107            parameters tables twice (or more!). Otherwise, the import won't work 
108            correctly and you will get invalid data.
109
110 SAMPLE: 
111   \$ export KOHA_CONF=/etc/koha.conf
112   \$ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
113     -file /home/jmf/koha.mrc -n 3000
114 EOF
115 ;#'
116 exit;
117 }
118
119 if (defined $idmapfl) {
120   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
121 }
122
123 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
124   $sourcetag="910";
125   $sourcesubfield="a";
126 }
127
128 my $dbh = C4::Context->dbh;
129
130 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
131 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
132 my $CataloguingLog = C4::Context->preference('CataloguingLog');
133 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
134
135 if ($fk_off) {
136         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
137 }
138
139
140 if ($delete) {
141         if ($biblios){
142         print "deleting biblios\n";
143         $dbh->do("truncate biblio");
144         $dbh->do("truncate biblioitems");
145         $dbh->do("truncate items");
146         }
147         else {
148         print "deleting authorities\n";
149         $dbh->do("truncate auth_header");
150         }
151     $dbh->do("truncate zebraqueue");
152 }
153
154
155
156 if ($test_parameter) {
157     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
158 }
159
160 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
161
162 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
163 my $starttime = gettimeofday;
164 my $batch;
165 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
166 if (defined $format && $format =~ /XML/i) {
167     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
168     # appears to try to convert incoming XML records from MARC-8
169     # to UTF-8.  Setting the BinaryEncoding key turns that off
170     # TODO: see what happens to ISO-8859-1 XML files.
171     # TODO: determine if MARC::Batch can be fixed to handle
172     #       XML records properly -- it probably should be
173     #       be using a proper push or pull XML parser to
174     #       extract the records, not using regexes to look
175     #       for <record>.*</record>.
176     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
177     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
178 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
179     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
180     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
181     $batch = MARC::Batch->new( 'XML', $fh );
182 } else {
183     $batch = MARC::Batch->new( 'USMARC', $fh );
184 }
185 $batch->warnings_off();
186 $batch->strict_off();
187 my $i=0;
188 my $commitnum = $commit ? $commit : 50;
189
190
191 # Skip file offset
192 if ( $offset ) {
193     print "Skipping file offset: $offset records\n";
194     $batch->next() while ($offset--);
195 }
196
197 my ($tagid,$subfieldid);
198 if ($authorities){
199           $tagid='001';
200 }
201 else {
202    ( $tagid, $subfieldid ) =
203             GetMarcFromKohaField( "biblio.biblionumber", '' );
204         $tagid||="001";
205 }
206
207 # the SQL query to search on isbn
208 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
209
210 $dbh->{AutoCommit} = 0;
211 my $loghandle;
212 if ($logfile){
213    $loghandle= IO::File->new($logfile,"w") ;
214    print $loghandle "id;operation;status\n";
215 }
216 RECORD: while (  ) {
217     my $record;
218     # get records
219     eval { $record = $batch->next() };
220     if ( $@ ) {
221         print "Bad MARC record: skipped\n";
222         # FIXME - because MARC::Batch->next() combines grabbing the next
223         # blob and parsing it into one operation, a correctable condition
224         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
225         # from because we don't have access to the original blob.  Note
226         # that the staging import can deal with this condition (via
227         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
228         next;
229     }
230     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
231     last unless ( $record );
232     $i++;
233     print ".";
234     print "\r$i" unless $i % 100;
235     
236     # transcode the record to UTF8 if needed & applicable.
237     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
238         # FIXME update condition
239         my ($guessed_charset, $charset_errors);
240          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
241         if ($guessed_charset eq 'failed') {
242             warn "ERROR: failed to perform character conversion for record $i\n";
243             next RECORD;            
244         }
245     }
246     my $isbn;
247     # remove trailing - in isbn (only for biblios, of course)
248     if ($biblios) {
249         if ($marcFlavour eq 'UNIMARC') {
250             if (my $f010 = $record->field('010')) {
251                 $isbn = $f010->subfield('a');
252                 $isbn =~ s/-//g;
253                 $f010->update('a' => $isbn);
254             }
255         } else {
256             if (my $f020 = $record->field('020')) {
257                 $isbn = $f020->subfield('a');
258                 $isbn =~ s/-//g;
259                 $f020->update('a' => $isbn);
260             }
261         }
262     }
263     my $id;
264     # search for duplicates (based on Local-number)
265     if ($match){
266        require C4::Search;
267        my $query=build_query($match,$record);
268        my $server=($authorities?'authorityserver':'biblioserver');
269        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
270        die "unable to search the database for duplicates : $error" if (defined $error);
271        #warn "$query $server : $totalhits";
272        if ($results && scalar(@$results)==1){
273            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
274                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
275        } 
276        elsif  ($results && scalar(@$results)>1){
277        $debug && warn "more than one match for $query";
278        } 
279        else {
280        $debug && warn "nomatch for $query";
281        }
282     }
283         my $originalid;
284     if ($keepids){
285           $originalid=GetRecordId($record,$tagid,$subfieldid);
286       if ($originalid){
287                  my $storeidfield;
288                  if (length($keepids)==3){
289                         $storeidfield=MARC::Field->new($keepids,$originalid);
290                  }
291                  else  {
292                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
293                  }
294          $record->insert_fields_ordered($storeidfield);
295              $record->delete_field($record->field($tagid));
296       }
297     }
298     unless ($test_parameter) {
299         if ($authorities){
300             use C4::AuthoritiesMarc;
301             my $authtypecode=GuessAuthTypeCode($record);
302             my $authid= ($id?$id:GuessAuthId($record));
303             if ($authid && GetAuthority($authid)){
304             ## Authority has an id and is in database : Replace
305                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
306                 if ($@){
307                     warn "Problem with authority $authid Cannot Modify";
308                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
309                 }
310                                 else{
311                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
312                                 }
313             }  
314             elsif (defined $authid) {
315             ## An authid is defined but no authority in database : add
316                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
317                 if ($@){
318                     warn "Problem with authority $authid Cannot Add ".$@;
319                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
320                 }
321                                 else{
322                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
323                                 }
324             }
325                 else {
326             ## True insert in database
327                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
328                 if ($@){
329                     warn "Problem with authority $authid Cannot Add".$@;
330                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
331                 }
332                                 else{
333                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
334                                 }
335                 }
336         }
337         else {
338             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
339             $biblionumber = $id;
340             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
341             if (!$biblionumber && $isbn_check && $isbn) {
342     #         warn "search ISBN : $isbn";
343                 $sth_isbn->execute($isbn);
344                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
345             }
346                 if (defined $idmapfl) {
347                                 if ($sourcetag < "010"){
348                                         if ($record->field($sourcetag)){
349                                           my $source = $record->field($sourcetag)->data();
350                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
351                                         }
352                             } else {
353                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
354                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
355                           }
356                         }
357                                         # create biblio, unless we already have it ( either match or isbn )
358             if ($biblionumber) {
359                                 eval{$biblioitemnumber=GetBiblioData($biblionumber)->{biblioitemnumber};}
360                         }
361                         else 
362                         {
363                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
364             }
365             if ( $@ ) {
366                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
367                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
368                 next RECORD;
369             } 
370                         else{
371                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
372                         }
373             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
374             if ( $@ ) {
375                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
376                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
377                 # if we failed because of an exception, assume that 
378                 # the MARC columns in biblioitems were not set.
379                 ModBiblioMarc( $record, $biblionumber, '' );
380                 next RECORD;
381             } 
382                         else{
383                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
384                         }
385             if ($#{ $errors_ref } > -1) { 
386                 report_item_errors($biblionumber, $errors_ref);
387             }
388         }
389         $dbh->commit() if (0 == $i % $commitnum);
390     }
391     last if $i == $number;
392 }
393 $dbh->commit();
394
395
396
397 if ($fk_off) {
398         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
399 }
400
401 # restore CataloguingLog
402 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
403
404 my $timeneeded = gettimeofday - $starttime;
405 print "\n$i MARC records done in $timeneeded seconds\n";
406 if ($logfile){
407   print $loghandle "file : $input_marc_file\n";
408   print $loghandle "$i MARC records done in $timeneeded seconds\n";
409   $loghandle->close;
410 }
411 exit 0;
412
413 sub GetRecordId{
414         my $marcrecord=shift;
415         my $tag=shift;
416         my $subfield=shift;
417         my $id;
418         if ($tag lt "010"){
419                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
420         } 
421         elsif ($subfield){
422                 if ($marcrecord->field($tag)){
423                         return $marcrecord->subfield($tag,$subfield);
424                 }
425         }
426         return $id;
427 }
428 sub build_query {
429         my $match = shift;
430         my $record=shift;
431         my @searchstrings;
432         foreach my $matchingpoint (@$match){
433           my $string = build_simplequery($matchingpoint,$record);
434           push @searchstrings,$string if (length($string)>0);
435         }
436         return join(" and ",@searchstrings);
437 }
438 sub build_simplequery {
439         my $element=shift;
440         my $record=shift;
441         my ($index,$recorddata)=split /,/,$element;
442         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
443         my @searchstrings;
444         foreach my $field ($record->field($tag)){
445                   if (length($field->as_string("$subfields"))>0){
446                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
447                   }
448         }
449         return join(" and ",@searchstrings);
450 }
451 sub report_item_errors {
452     my $biblionumber = shift;
453     my $errors_ref = shift;
454
455     foreach my $error (@{ $errors_ref }) {
456         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
457         my $error_code = $error->{'error_code'};
458         $error_code =~ s/_/ /g;
459         $msg .= "$error_code $error->{'error_information'}";
460         print $msg, "\n";
461     }
462 }
463 sub printlog{
464         my $logelements=shift;
465         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
466 }