Authorities bulkmarcimport
[koha.git] / misc / migration_tools / bulkmarcimport.pl
1 #!/usr/bin/perl
2 # Import an iso2709 file into Koha 3
3
4 use strict;
5 use warnings;
6 #use diagnostics;
7 BEGIN {
8     # find Koha's Perl modules
9     # test carefully before changing this
10     use FindBin;
11     eval { require "$FindBin::Bin/../kohalib.pl" };
12 }
13
14 # Koha modules used
15 use MARC::File::USMARC;
16 use MARC::File::XML;
17 use MARC::Record;
18 use MARC::Batch;
19 use MARC::Charset;
20
21 use C4::Context;
22 use C4::Biblio;
23 use C4::Koha;
24 use C4::Charset;
25 use C4::Items;
26 use Unicode::Normalize;
27 use Time::HiRes qw(gettimeofday);
28 use Getopt::Long;
29 use IO::File;
30
31 binmode(STDOUT, ":utf8");
32 my ( $input_marc_file, $number, $offset) = ('',0,0);
33 my ($version, $delete, $test_parameter, $skip_marc8_conversion, $char_encoding, $verbose, $commit, $fk_off,$format,$biblios,$authorities,$keepids,$match, $isbn_check, $logfile);
34 my ($sourcetag,$sourcesubfield,$idmapfl);
35
36 $|=1;
37
38 GetOptions(
39     'commit:f'    => \$commit,
40     'file:s'    => \$input_marc_file,
41     'n:f' => \$number,
42     'o|offset:f' => \$offset,
43     'h' => \$version,
44     'd' => \$delete,
45     't' => \$test_parameter,
46     's' => \$skip_marc8_conversion,
47     'c:s' => \$char_encoding,
48     'v:s' => \$verbose,
49     'fk' => \$fk_off,
50     'm:s' => \$format,
51     'l:s' => \$logfile,
52     'k|keepids:s' => \$keepids,
53     'b|biblios' => \$biblios,
54     'a|authorities' => \$authorities,
55     'match=s@'    => \$match,
56     'i|isbn' => \$isbn_check,
57     'x:s' => \$sourcetag,
58     'y:s' => \$sourcesubfield,
59     'idmap:s' => \$idmapfl,
60 );
61 $biblios=!$authorities||$biblios;
62
63 if ($version || ($input_marc_file eq '')) {
64     print <<EOF
65 Small script to import bibliographic records into Koha.
66
67 Parameters:
68   h      this version/help screen
69   file   /path/to/file/to/dump: the file to import
70   v      verbose mode. 1 means "some infos", 2 means "MARC dumping"
71   fk     Turn off foreign key checks during import.
72   n      the number of records to import. If missing, all the file is imported
73   o      file offset before importing, ie number of records to skip.
74   commit the number of records to wait before performing a 'commit' operation
75   l file logs actions done for each record and their status into file
76   t      test mode: parses the file, saying what he would do, but doing nothing.
77   s      skip automatic conversion of MARC-8 to UTF-8.  This option is 
78          provided for debugging.
79   c      the characteristic MARC flavour. At the moment, only MARC21 and 
80          UNIMARC are supported. MARC21 by default.
81   d      delete EVERYTHING related to biblio in koha-DB before import. Tables:
82          biblio, biblioitems, titems
83   m      format, MARCXML or ISO2709 (defaults to ISO2709)
84   keepids field store ids in field (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
85   b|biblios type of import : bibliographic records
86   a|authorities type of import : authority records
87   match  matchindex,fieldtomatch matchpoint to use to deduplicate
88           fieldtomatch can be either 001 to 999 
89                        or field and list of subfields as such 100abcde
90   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
91          method to deduplicate. 
92          match & i can be both set.
93   x      source bib tag for reporting the source bib number
94   y      source subfield for reporting the source bib number
95   idmap  file for the koha bib and source id
96   keepids store ids in 009 (usefull for authorities, where 001 contains the authid for Koha, that can contain a very valuable info for authorities coming from LOC or BNF. useless for biblios probably)
97   b|biblios type of import : bibliographic records
98   a|authorities type of import : authority records
99   match  matchindex,fieldtomatch matchpoint to use to deduplicate
100           fieldtomatch can be either 001 to 999 
101                        or field and list of subfields as such 100abcde
102   i|isbn if set, a search will be done on isbn, and, if the same isbn is found, the biblio is not added. It's another
103          method to deduplicate. 
104          match & i can be both set.
105 IMPORTANT: don't use this script before you've entered and checked your MARC 
106            parameters tables twice (or more!). Otherwise, the import won't work 
107            correctly and you will get invalid data.
108
109 SAMPLE: 
110   \$ export KOHA_CONF=/etc/koha.conf
111   \$ perl misc/migration_tools/bulkmarcimport.pl -d -commit 1000 \\
112     -file /home/jmf/koha.mrc -n 3000
113 EOF
114 ;#'
115 exit;
116 }
117
118 if (defined $idmapfl) {
119   open(IDMAP,">$idmapfl") or die "cannot open $idmapfl \n";
120 }
121
122 if ((not defined $sourcesubfield) && (not defined $sourcetag)){
123   $sourcetag="910";
124   $sourcesubfield="a";
125 }
126
127 my $dbh = C4::Context->dbh;
128
129 # save the CataloguingLog property : we don't want to log a bulkmarcimport. It will slow the import & 
130 # will create problems in the action_logs table, that can't handle more than 1 entry per second per user.
131 my $CataloguingLog = C4::Context->preference('CataloguingLog');
132 $dbh->do("UPDATE systempreferences SET value=0 WHERE variable='CataloguingLog'");
133
134 if ($fk_off) {
135         $dbh->do("SET FOREIGN_KEY_CHECKS = 0");
136 }
137
138
139 if ($delete) {
140         if ($biblios){
141         print "deleting biblios\n";
142         $dbh->do("truncate biblio");
143         $dbh->do("truncate biblioitems");
144         $dbh->do("truncate items");
145         }
146         else {
147         print "deleting authorities\n";
148         $dbh->do("truncate auth_header");
149         }
150     $dbh->do("truncate zebraqueue");
151 }
152
153
154
155 if ($test_parameter) {
156     print "TESTING MODE ONLY\n    DOING NOTHING\n===============\n";
157 }
158
159 my $marcFlavour = C4::Context->preference('marcflavour') || 'MARC21';
160
161 print "Characteristic MARC flavour: $marcFlavour\n" if $verbose;
162 my $starttime = gettimeofday;
163 my $batch;
164 my $fh = IO::File->new($input_marc_file); # don't let MARC::Batch open the file, as it applies the ':utf8' IO layer
165 if (defined $format && $format =~ /XML/i) {
166     # ugly hack follows -- MARC::File::XML, when used by MARC::Batch,
167     # appears to try to convert incoming XML records from MARC-8
168     # to UTF-8.  Setting the BinaryEncoding key turns that off
169     # TODO: see what happens to ISO-8859-1 XML files.
170     # TODO: determine if MARC::Batch can be fixed to handle
171     #       XML records properly -- it probably should be
172     #       be using a proper push or pull XML parser to
173     #       extract the records, not using regexes to look
174     #       for <record>.*</record>.
175     $MARC::File::XML::_load_args{BinaryEncoding} = 'utf-8';
176     my $recordformat= ($marcFlavour eq "MARC21"?"USMARC":uc($marcFlavour));
177 #UNIMARC Authorities have a different way to manage encoding than UNIMARC biblios.
178     $recordformat=$recordformat."AUTH" if ($authorities and $marcFlavour ne "MARC21");
179     $MARC::File::XML::_load_args{RecordFormat} = $recordformat;
180     $batch = MARC::Batch->new( 'XML', $fh );
181 } else {
182     $batch = MARC::Batch->new( 'USMARC', $fh );
183 }
184 $batch->warnings_off();
185 $batch->strict_off();
186 my $i=0;
187 my $commitnum = $commit ? $commit : 50;
188
189
190 # Skip file offset
191 if ( $offset ) {
192     print "Skipping file offset: $offset records\n";
193     $batch->next() while ($offset--);
194 }
195
196 my ($tagid,$subfieldid);
197 if ($authorities){
198           $tagid='001';
199 }
200 else {
201    ( $tagid, $subfieldid ) =
202             GetMarcFromKohaField( "biblio.biblionumber", '' );
203         $tagid||="001";
204 }
205
206 # the SQL query to search on isbn
207 my $sth_isbn = $dbh->prepare("SELECT biblionumber,biblioitemnumber FROM biblioitems WHERE isbn=?");
208
209 $dbh->{AutoCommit} = 0;
210 my $loghandle;
211 if ($logfile){
212    $loghandle= IO::File->new($logfile,"w") ;
213    print $loghandle "id;operation;status\n";
214 }
215 RECORD: while (  ) {
216     my $record;
217     # get records
218     eval { $record = $batch->next() };
219     if ( $@ ) {
220         print "Bad MARC record: skipped\n";
221         # FIXME - because MARC::Batch->next() combines grabbing the next
222         # blob and parsing it into one operation, a correctable condition
223         # such as a MARC-8 record claiming that it's UTF-8 can't be recovered
224         # from because we don't have access to the original blob.  Note
225         # that the staging import can deal with this condition (via
226         # C4::Charset::MarcToUTF8Record) because it doesn't use MARC::Batch.
227         next;
228     }
229     # skip if we get an empty record (that is MARC valid, but will result in AddBiblio failure
230     last unless ( $record );
231     $i++;
232     print ".";
233     print "\r$i" unless $i % 100;
234     
235     # transcode the record to UTF8 if needed & applicable.
236     if ($record->encoding() eq 'MARC-8' and not $skip_marc8_conversion) {
237         # FIXME update condition
238         my ($guessed_charset, $charset_errors);
239          ($record, $guessed_charset, $charset_errors) = MarcToUTF8Record($record, $marcFlavour.(($authorities and $marcFlavour ne "MARC21")?'AUTH':''));
240         if ($guessed_charset eq 'failed') {
241             warn "ERROR: failed to perform character conversion for record $i\n";
242             next RECORD;            
243         }
244     }
245     my $isbn;
246     # remove trailing - in isbn (only for biblios, of course)
247     if ($biblios) {
248         if ($marcFlavour eq 'UNIMARC') {
249             if (my $f010 = $record->field('010')) {
250                 $isbn = $f010->subfield('a');
251                 $isbn =~ s/-//g;
252                 $f010->update('a' => $isbn);
253             }
254         } else {
255             if (my $f020 = $record->field('020')) {
256                 $isbn = $f020->subfield('a');
257                 $isbn =~ s/-//g;
258                 $f020->update('a' => $isbn);
259             }
260         }
261     }
262     my $id;
263     # search for duplicates (based on Local-number)
264     if ($match){
265        require C4::Search;
266        my $query=build_query($match,$record);
267        my $server=($authorities?'authorityserver':'biblioserver');
268        my ($error, $results,$totalhits)=C4::Search::SimpleSearch( $query, 0, 3, [$server] );
269        die "unable to search the database for duplicates : $error" if (defined $error);
270        warn "$query $server : $totalhits";
271        if ($results && scalar(@$results)==1){
272            my $marcrecord = MARC::File::USMARC::decode($results->[0]);
273                    $id=GetRecordId($marcrecord,$tagid,$subfieldid);
274        } 
275        elsif  ($results && scalar(@$results)>1){
276        warn "more than one match for $query";
277        } 
278        else {
279        warn "nomatch for $query";
280        }
281     }
282         my $originalid;
283     if ($keepids){
284           $originalid=GetRecordId($record,$tagid,$subfieldid);
285       if ($originalid){
286                  my $storeidfield;
287                  if (length($keepids)==3){
288                         $storeidfield=MARC::Field->new($keepids,$originalid);
289                  }
290                  else  {
291                         $storeidfield=MARC::Field->new(substr($keepids,0,3),"","",substr($keepids,3,1),$originalid);
292                  }
293          $record->insert_fields_ordered($storeidfield);
294              $record->delete_field($record->field($tagid));
295       }
296     }
297     unless ($test_parameter) {
298         if ($authorities){
299             use C4::AuthoritiesMarc;
300             my $authtypecode=GuessAuthTypeCode($record);
301             my $authid= ($id?$id:GuessAuthId($record));
302             if ($authid && GetAuthority($authid)){
303             ## Authority has an id and is in database : Replace
304                 eval { ( $authid ) = ModAuthority($authid,$record, $authtypecode) };
305                 if ($@){
306                     warn "Problem with authority $authid Cannot Modify";
307                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ERROR"}) if ($logfile);
308                 }
309                                 else{
310                                         printlog({id=>$originalid||$id||$authid, op=>"edit",status=>"ok"}) if ($logfile);
311                                 }
312             }  
313             elsif (defined $authid) {
314             ## An authid is defined but no authority in database : add
315                 eval { ( $authid ) = AddAuthority($record,$authid, $authtypecode) };
316                 if ($@){
317                     warn "Problem with authority $authid Cannot Add";
318                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
319                 }
320                                 else{
321                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
322                                 }
323             }
324                 else {
325             ## True insert in database
326                 eval { ( $authid ) = AddAuthority($record,"", $authtypecode) };
327                 if ($@){
328                     warn "Problem with authority $authid Cannot Add";
329                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ERROR"}) if ($logfile);
330                 }
331                                 else{
332                                         printlog({id=>$originalid||$id||$authid, op=>"insert",status=>"ok"}) if ($logfile);
333                                 }
334                 }
335         }
336         else {
337             my ( $biblionumber, $biblioitemnumber, $itemnumbers_ref, $errors_ref );
338             $biblionumber = $id;
339             # check for duplicate, based on ISBN (skip it if we already have found a duplicate with match parameter
340             if (!$biblionumber && $isbn_check && $isbn) {
341     #         warn "search ISBN : $isbn";
342                 $sth_isbn->execute($isbn);
343                 ($biblionumber,$biblioitemnumber) = $sth_isbn->fetchrow;
344             }
345                 if (defined $idmapfl) {
346                                 if ($sourcetag < "010"){
347                                         if ($record->field($sourcetag)){
348                                           my $source = $record->field($sourcetag)->data();
349                                           printf(IDMAP "%s|%s\n",$source,$biblionumber);
350                                         }
351                             } else {
352                                         my $source=$record->subfield($sourcetag,$sourcesubfield);
353                                         printf(IDMAP "%s|%s\n",$source,$biblionumber);
354                           }
355                         }
356                                         # create biblio, unless we already have it ( either match or isbn )
357             unless ($biblionumber) {
358                 eval { ( $biblionumber, $biblioitemnumber ) = AddBiblio($record, '', { defer_marc_save => 1 }) };
359             }
360             if ( $@ ) {
361                 warn "ERROR: Adding biblio $biblionumber failed: $@\n";
362                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ERROR"}) if ($logfile);
363                 next RECORD;
364             } 
365                         else{
366                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
367                         }
368             eval { ( $itemnumbers_ref, $errors_ref ) = AddItemBatchFromMarc( $record, $biblionumber, $biblioitemnumber, '' ); };
369             if ( $@ ) {
370                 warn "ERROR: Adding items to bib $biblionumber failed: $@\n";
371                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insertitem",status=>"ERROR"}) if ($logfile);
372                 # if we failed because of an exception, assume that 
373                 # the MARC columns in biblioitems were not set.
374                 ModBiblioMarc( $record, $biblionumber, '' );
375                 next RECORD;
376             } 
377                         else{
378                                 printlog({id=>$id||$originalid||$biblionumber, op=>"insert",status=>"ok"}) if ($logfile);
379                         }
380             if ($#{ $errors_ref } > -1) { 
381                 report_item_errors($biblionumber, $errors_ref);
382             }
383         }
384         $dbh->commit() if (0 == $i % $commitnum);
385     }
386     last if $i == $number;
387 }
388 $dbh->commit();
389
390
391
392 if ($fk_off) {
393         $dbh->do("SET FOREIGN_KEY_CHECKS = 1");
394 }
395
396 # restore CataloguingLog
397 $dbh->do("UPDATE systempreferences SET value=$CataloguingLog WHERE variable='CataloguingLog'");
398
399 my $timeneeded = gettimeofday - $starttime;
400 print "\n$i MARC records done in $timeneeded seconds\n";
401 if ($logfile){
402   print $loghandle "file : $input_marc_file\n";
403   print $loghandle "$i MARC records done in $timeneeded seconds\n";
404   $loghandle->close;
405 }
406 exit 0;
407
408 sub GetRecordId{
409         my $marcrecord=shift;
410         my $tag=shift;
411         my $subfield=shift;
412         my $id;
413         if ($tag lt "010"){
414                 return $marcrecord->field($tag)->data() if $marcrecord->field($tag);
415         } 
416         elsif ($subfield){
417                 if ($marcrecord->field($tag)){
418                         return $marcrecord->subfield($tag,$subfield);
419                 }
420         }
421         return $id;
422 }
423 sub build_query {
424         my $match = shift;
425         my $record=shift;
426         my @searchstrings;
427         foreach my $matchingpoint (@$match){
428           my $string = build_simplequery($matchingpoint,$record);
429           push @searchstrings,$string if (length($string)>0);
430         }
431         return join(" and ",@searchstrings);
432 }
433 sub build_simplequery {
434         my $element=shift;
435         my $record=shift;
436         my ($index,$recorddata)=split /,/,$element;
437         my ($tag,$subfields) =($1,$2) if ($recorddata=~/(\d{3})(.*)/);
438         my @searchstrings;
439         foreach my $field ($record->field($tag)){
440                   if (length($field->as_string("$subfields"))>0){
441                 push @searchstrings,"$index,wrdl=\"".$field->as_string("$subfields")."\"";
442                   }
443         }
444         return join(" and ",@searchstrings);
445 }
446 sub report_item_errors {
447     my $biblionumber = shift;
448     my $errors_ref = shift;
449
450     foreach my $error (@{ $errors_ref }) {
451         my $msg = "Item not added (bib $biblionumber, item tag #$error->{'item_sequence'}, barcode $error->{'item_barcode'}): ";
452         my $error_code = $error->{'error_code'};
453         $error_code =~ s/_/ /g;
454         $msg .= "$error_code $error->{'error_information'}";
455         print $msg, "\n";
456     }
457 }
458 sub printlog{
459         my $logelements=shift;
460         print $loghandle join (";",@$logelements{qw<id op status>}),"\n";
461 }