Bug 29325: Fix commit_file.pl
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       AddImportBatch
50       GetImportBatch
51       AddAuthToBatch
52       AddBiblioToBatch
53       AddItemsToImportBiblio
54       ModAuthorityInBatch
55       ModBiblioInBatch
56
57       BatchStageMarcRecords
58       BatchFindDuplicates
59       BatchCommitRecords
60       BatchRevertRecords
61       CleanBatch
62       DeleteBatch
63
64       GetAllImportBatches
65       GetStagedWebserviceBatches
66       GetImportBatchRangeDesc
67       GetNumberOfNonZ3950ImportBatches
68       GetImportBiblios
69       GetImportRecordsRange
70       GetItemNumbersFromImportBatch
71
72       GetImportBatchStatus
73       SetImportBatchStatus
74       GetImportBatchOverlayAction
75       SetImportBatchOverlayAction
76       GetImportBatchNoMatchAction
77       SetImportBatchNoMatchAction
78       GetImportBatchItemAction
79       SetImportBatchItemAction
80       GetImportBatchMatcher
81       SetImportBatchMatcher
82       GetImportRecordOverlayStatus
83       SetImportRecordOverlayStatus
84       GetImportRecordStatus
85       SetImportRecordStatus
86       SetMatchedBiblionumber
87       GetImportRecordMatches
88       SetImportRecordMatches
89
90       RecordsFromMARCXMLFile
91       RecordsFromISO2709File
92       RecordsFromMarcPlugin
93     );
94 }
95
96 =head1 NAME
97
98 C4::ImportBatch - manage batches of imported MARC records
99
100 =head1 SYNOPSIS
101
102 use C4::ImportBatch;
103
104 =head1 FUNCTIONS
105
106 =head2 GetZ3950BatchId
107
108   my $batchid = GetZ3950BatchId($z3950server);
109
110 Retrieves the ID of the import batch for the Z39.50
111 reservoir for the given target.  If necessary,
112 creates the import batch.
113
114 =cut
115
116 sub GetZ3950BatchId {
117     my ($z3950server) = @_;
118
119     my $dbh = C4::Context->dbh;
120     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
121                              WHERE  batch_type = 'z3950'
122                              AND    file_name = ?");
123     $sth->execute($z3950server);
124     my $rowref = $sth->fetchrow_arrayref();
125     $sth->finish();
126     if (defined $rowref) {
127         return $rowref->[0];
128     } else {
129         my $batch_id = AddImportBatch( {
130                 overlay_action => 'create_new',
131                 import_status => 'staged',
132                 batch_type => 'z3950',
133                 file_name => $z3950server,
134             } );
135         return $batch_id;
136     }
137     
138 }
139
140 =head2 GetWebserviceBatchId
141
142   my $batchid = GetWebserviceBatchId();
143
144 Retrieves the ID of the import batch for webservice.
145 If necessary, creates the import batch.
146
147 =cut
148
149 my $WEBSERVICE_BASE_QRY = <<EOQ;
150 SELECT import_batch_id FROM import_batches
151 WHERE  batch_type = 'webservice'
152 AND    import_status = 'staged'
153 EOQ
154 sub GetWebserviceBatchId {
155     my ($params) = @_;
156
157     my $dbh = C4::Context->dbh;
158     my $sql = $WEBSERVICE_BASE_QRY;
159     my @args;
160     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
161         if (my $val = $params->{$field}) {
162             $sql .= " AND $field = ?";
163             push @args, $val;
164         }
165     }
166     my $id = $dbh->selectrow_array($sql, undef, @args);
167     return $id if $id;
168
169     $params->{batch_type} = 'webservice';
170     $params->{import_status} = 'staged';
171     return AddImportBatch($params);
172 }
173
174 =head2 GetImportRecordMarc
175
176   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
177
178 =cut
179
180 sub GetImportRecordMarc {
181     my ($import_record_id) = @_;
182
183     my $dbh = C4::Context->dbh;
184     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
185         SELECT marc, encoding
186         FROM import_records
187         WHERE import_record_id = ?
188     |, undef, $import_record_id );
189
190     return $marc, $encoding;
191 }
192
193 sub EmbedItemsInImportBiblio {
194     my ( $record, $import_record_id ) = @_;
195     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
196     my $dbh = C4::Context->dbh;
197     my $import_items = $dbh->selectall_arrayref(q|
198         SELECT import_items.marcxml
199         FROM import_items
200         WHERE import_record_id = ?
201     |, { Slice => {} }, $import_record_id );
202     my @item_fields;
203     for my $import_item ( @$import_items ) {
204         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
205         push @item_fields, $item_marc->field($itemtag);
206     }
207     $record->append_fields(@item_fields);
208     return $record;
209 }
210
211 =head2 AddImportBatch
212
213   my $batch_id = AddImportBatch($params_hash);
214
215 =cut
216
217 sub AddImportBatch {
218     my ($params) = @_;
219
220     my (@fields, @vals);
221     foreach (qw( matcher_id template_id branchcode
222                  overlay_action nomatch_action item_action
223                  import_status batch_type file_name comments record_type )) {
224         if (exists $params->{$_}) {
225             push @fields, $_;
226             push @vals, $params->{$_};
227         }
228     }
229     my $dbh = C4::Context->dbh;
230     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
231                                   VALUES (".join( ',', map '?', @fields).")",
232              undef,
233              @vals);
234     return $dbh->{'mysql_insertid'};
235 }
236
237 =head2 GetImportBatch 
238
239   my $row = GetImportBatch($batch_id);
240
241 Retrieve a hashref of an import_batches row.
242
243 =cut
244
245 sub GetImportBatch {
246     my ($batch_id) = @_;
247
248     my $dbh = C4::Context->dbh;
249     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
250     $sth->bind_param(1, $batch_id);
251     $sth->execute();
252     my $result = $sth->fetchrow_hashref;
253     $sth->finish();
254     return $result;
255
256 }
257
258 =head2 AddBiblioToBatch 
259
260   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
261                 $marc_record, $encoding, $update_counts);
262
263 =cut
264
265 sub AddBiblioToBatch {
266     my $batch_id = shift;
267     my $record_sequence = shift;
268     my $marc_record = shift;
269     my $encoding = shift;
270     my $update_counts = @_ ? shift : 1;
271
272     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
273     _add_biblio_fields($import_record_id, $marc_record);
274     _update_batch_record_counts($batch_id) if $update_counts;
275     return $import_record_id;
276 }
277
278 =head2 ModBiblioInBatch
279
280   ModBiblioInBatch($import_record_id, $marc_record);
281
282 =cut
283
284 sub ModBiblioInBatch {
285     my ($import_record_id, $marc_record) = @_;
286
287     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
288     _update_biblio_fields($import_record_id, $marc_record);
289
290 }
291
292 =head2 AddAuthToBatch
293
294   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
295                 $marc_record, $encoding, $update_counts, [$marc_type]);
296
297 =cut
298
299 sub AddAuthToBatch {
300     my $batch_id = shift;
301     my $record_sequence = shift;
302     my $marc_record = shift;
303     my $encoding = shift;
304     my $update_counts = @_ ? shift : 1;
305     my $marc_type = shift || C4::Context->preference('marcflavour');
306
307     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
308
309     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
310     _add_auth_fields($import_record_id, $marc_record);
311     _update_batch_record_counts($batch_id) if $update_counts;
312     return $import_record_id;
313 }
314
315 =head2 ModAuthInBatch
316
317   ModAuthInBatch($import_record_id, $marc_record);
318
319 =cut
320
321 sub ModAuthInBatch {
322     my ($import_record_id, $marc_record) = @_;
323
324     my $marcflavour = C4::Context->preference('marcflavour');
325     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
326
327 }
328
329 =head2 BatchStageMarcRecords
330
331 ( $batch_id, $num_records, $num_items, @invalid_records ) =
332   BatchStageMarcRecords(
333     $record_type,                $encoding,
334     $marc_records,               $file_name,
335     $marc_modification_template, $comments,
336     $branch_code,                $parse_items,
337     $leave_as_staging,           $progress_interval,
338     $progress_callback
339   );
340
341 =cut
342
343 sub BatchStageMarcRecords {
344     my $record_type = shift;
345     my $encoding = shift;
346     my $marc_records = shift;
347     my $file_name = shift;
348     my $marc_modification_template = shift;
349     my $comments = shift;
350     my $branch_code = shift;
351     my $parse_items = shift;
352     my $leave_as_staging = shift;
353
354     # optional callback to monitor status 
355     # of job
356     my $progress_interval = 0;
357     my $progress_callback = undef;
358     if ($#_ == 1) {
359         $progress_interval = shift;
360         $progress_callback = shift;
361         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
362         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
363     } 
364     
365     my $batch_id = AddImportBatch( {
366             overlay_action => 'create_new',
367             import_status => 'staging',
368             batch_type => 'batch',
369             file_name => $file_name,
370             comments => $comments,
371             record_type => $record_type,
372         } );
373     if ($parse_items) {
374         SetImportBatchItemAction($batch_id, 'always_add');
375     } else {
376         SetImportBatchItemAction($batch_id, 'ignore');
377     }
378
379
380     my $marc_type = C4::Context->preference('marcflavour');
381     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
382     my @invalid_records = ();
383     my $num_valid = 0;
384     my $num_items = 0;
385     # FIXME - for now, we're dealing only with bibs
386     my $rec_num = 0;
387     foreach my $marc_record (@$marc_records) {
388         $rec_num++;
389         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
390             &$progress_callback($rec_num);
391         }
392
393         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
394
395         my $import_record_id;
396         if (scalar($marc_record->fields()) == 0) {
397             push @invalid_records, $marc_record;
398         } else {
399
400             # Normalize the record so it doesn't have separated diacritics
401             SetUTF8Flag($marc_record);
402
403             $num_valid++;
404             if ($record_type eq 'biblio') {
405                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
406                 if ($parse_items) {
407                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
408                     $num_items += scalar(@import_items_ids);
409                 }
410             } elsif ($record_type eq 'auth') {
411                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
412             }
413         }
414     }
415     unless ($leave_as_staging) {
416         SetImportBatchStatus($batch_id, 'staged');
417     }
418     # FIXME branch_code, number of bibs, number of items
419     _update_batch_record_counts($batch_id);
420     return ($batch_id, $num_valid, $num_items, @invalid_records);
421 }
422
423 =head2 AddItemsToImportBiblio
424
425   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
426                 $import_record_id, $marc_record, $update_counts);
427
428 =cut
429
430 sub AddItemsToImportBiblio {
431     my $batch_id = shift;
432     my $import_record_id = shift;
433     my $marc_record = shift;
434     my $update_counts = @_ ? shift : 0;
435
436     my @import_items_ids = ();
437    
438     my $dbh = C4::Context->dbh; 
439     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
440     foreach my $item_field ($marc_record->field($item_tag)) {
441         my $item_marc = MARC::Record->new();
442         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
443         $item_marc->append_fields($item_field);
444         $marc_record->delete_field($item_field);
445         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
446                                         VALUES (?, ?, ?)");
447         $sth->bind_param(1, $import_record_id);
448         $sth->bind_param(2, 'staged');
449         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
450         $sth->execute();
451         push @import_items_ids, $dbh->{'mysql_insertid'};
452         $sth->finish();
453     }
454
455     if ($#import_items_ids > -1) {
456         _update_batch_record_counts($batch_id) if $update_counts;
457     }
458     return @import_items_ids;
459 }
460
461 =head2 BatchFindDuplicates
462
463   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
464              $max_matches, $progress_interval, $progress_callback);
465
466 Goes through the records loaded in the batch and attempts to 
467 find duplicates for each one.  Sets the matching status 
468 of each record to "no_match" or "auto_match" as appropriate.
469
470 The $max_matches parameter is optional; if it is not supplied,
471 it defaults to 10.
472
473 The $progress_interval and $progress_callback parameters are 
474 optional; if both are supplied, the sub referred to by
475 $progress_callback will be invoked every $progress_interval
476 records using the number of records processed as the 
477 singular argument.
478
479 =cut
480
481 sub BatchFindDuplicates {
482     my $batch_id = shift;
483     my $matcher = shift;
484     my $max_matches = @_ ? shift : 10;
485
486     # optional callback to monitor status 
487     # of job
488     my $progress_interval = 0;
489     my $progress_callback = undef;
490     if ($#_ == 1) {
491         $progress_interval = shift;
492         $progress_callback = shift;
493         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
494         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
495     }
496
497     my $dbh = C4::Context->dbh;
498
499     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
500                              FROM import_records
501                              WHERE import_batch_id = ?");
502     $sth->execute($batch_id);
503     my $num_with_matches = 0;
504     my $rec_num = 0;
505     while (my $rowref = $sth->fetchrow_hashref) {
506         $rec_num++;
507         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
508             &$progress_callback($rec_num);
509         }
510         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
511         my @matches = ();
512         if (defined $matcher) {
513             @matches = $matcher->get_matches($marc_record, $max_matches);
514         }
515         if (scalar(@matches) > 0) {
516             $num_with_matches++;
517             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
518             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
519         } else {
520             SetImportRecordMatches($rowref->{'import_record_id'}, ());
521             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
522         }
523     }
524     $sth->finish();
525     return $num_with_matches;
526 }
527
528 =head2 BatchCommitRecords
529
530   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
531         BatchCommitRecords($batch_id, $framework,
532         $progress_interval, $progress_callback);
533
534 =cut
535
536 sub BatchCommitRecords {
537     my $batch_id = shift;
538     my $framework = shift;
539
540     my $schema = Koha::Database->schema;
541
542     # optional callback to monitor status 
543     # of job
544     my $progress_interval = 0;
545     my $progress_callback = undef;
546     if ($#_ == 1) {
547         $progress_interval = shift;
548         $progress_callback = shift;
549         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
550         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
551     }
552
553     my $record_type;
554     my $num_added = 0;
555     my $num_updated = 0;
556     my $num_items_added = 0;
557     my $num_items_replaced = 0;
558     my $num_items_errored = 0;
559     my $num_ignored = 0;
560     # commit (i.e., save, all records in the batch)
561     SetImportBatchStatus($batch_id, 'importing');
562     my $overlay_action = GetImportBatchOverlayAction($batch_id);
563     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
564     my $item_action = GetImportBatchItemAction($batch_id);
565     my $item_tag;
566     my $item_subfield;
567     my $dbh = C4::Context->dbh;
568     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
569                              FROM import_records
570                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
571                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
572                              WHERE import_batch_id = ?");
573     $sth->execute($batch_id);
574     my $marcflavour = C4::Context->preference('marcflavour');
575
576     my $userenv = C4::Context->userenv;
577     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
578
579     my $rec_num = 0;
580     $schema->txn_begin; # We commit in a transaction
581     while (my $rowref = $sth->fetchrow_hashref) {
582         $record_type = $rowref->{'record_type'};
583
584         $rec_num++;
585
586         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
587             # report progress and commit
588             $schema->txn_commit;
589             &$progress_callback( $rec_num );
590             $schema->txn_begin;
591         }
592         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
593             $num_ignored++;
594             next;
595         }
596
597         my $marc_type;
598         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
599             $marc_type = 'UNIMARCAUTH';
600         } elsif ($marcflavour eq 'UNIMARC') {
601             $marc_type = 'UNIMARC';
602         } else {
603             $marc_type = 'USMARC';
604         }
605         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
606
607         if ($record_type eq 'biblio') {
608             # remove any item tags - rely on BatchCommitItems
609             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
610             foreach my $item_field ($marc_record->field($item_tag)) {
611                 $marc_record->delete_field($item_field);
612             }
613         }
614
615         my ($record_result, $item_result, $record_match) =
616             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
617                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
618
619         my $recordid;
620         my $query;
621         if ($record_result eq 'create_new') {
622             $num_added++;
623             if ($record_type eq 'biblio') {
624                 my $biblioitemnumber;
625                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
626                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
627                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
628                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
629                     $num_items_added += $bib_items_added;
630                     $num_items_replaced += $bib_items_replaced;
631                     $num_items_errored += $bib_items_errored;
632                 }
633             } else {
634                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
635                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
636             }
637             my $sth = $dbh->prepare_cached($query);
638             $sth->execute($recordid, $rowref->{'import_record_id'});
639             $sth->finish();
640             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
641         } elsif ($record_result eq 'replace') {
642             $num_updated++;
643             $recordid = $record_match;
644             my $oldxml;
645             if ($record_type eq 'biblio') {
646                 my $oldbiblio = Koha::Biblios->find( $recordid );
647                 $oldxml = GetXmlBiblio($recordid);
648
649                 # remove item fields so that they don't get
650                 # added again if record is reverted
651                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
652                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
653                 foreach my $item_field ($old_marc->field($item_tag)) {
654                     $old_marc->delete_field($item_field);
655                 }
656                 $oldxml = $old_marc->as_xml($marc_type);
657
658                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
659                     overlay_context => {
660                         source => 'batchimport',
661                         categorycode => $logged_in_patron->categorycode,
662                         userid => $logged_in_patron->userid
663                     },
664                 });
665                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
666
667                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
668                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
669                     $num_items_added += $bib_items_added;
670                     $num_items_replaced += $bib_items_replaced;
671                     $num_items_errored += $bib_items_errored;
672                 }
673             } else {
674                 $oldxml = GetAuthorityXML($recordid);
675
676                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
677                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
678             }
679             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
680             $sth->execute($oldxml, $rowref->{'import_record_id'});
681             $sth->finish();
682             my $sth2 = $dbh->prepare_cached($query);
683             $sth2->execute($recordid, $rowref->{'import_record_id'});
684             $sth2->finish();
685             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
686             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
687         } elsif ($record_result eq 'ignore') {
688             $recordid = $record_match;
689             $num_ignored++;
690             $recordid = $record_match;
691             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
692                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
693                 $num_items_added += $bib_items_added;
694          $num_items_replaced += $bib_items_replaced;
695                 $num_items_errored += $bib_items_errored;
696                 # still need to record the matched biblionumber so that the
697                 # items can be reverted
698                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
699                 $sth2->execute($recordid, $rowref->{'import_record_id'});
700                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
701             }
702             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
703         }
704     }
705     $schema->txn_commit; # Commit final records that may not have hit callback threshold
706     $sth->finish();
707     SetImportBatchStatus($batch_id, 'imported');
708     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
709 }
710
711 =head2 BatchCommitItems
712
713   ($num_items_added, $num_items_errored) = 
714          BatchCommitItems($import_record_id, $biblionumber);
715
716 =cut
717
718 sub BatchCommitItems {
719     my ( $import_record_id, $biblionumber, $action ) = @_;
720
721     my $dbh = C4::Context->dbh;
722
723     my $num_items_added = 0;
724     my $num_items_errored = 0;
725     my $num_items_replaced = 0;
726
727     my $sth = $dbh->prepare( "
728         SELECT import_items_id, import_items.marcxml, encoding
729         FROM import_items
730         JOIN import_records USING (import_record_id)
731         WHERE import_record_id = ?
732         ORDER BY import_items_id
733     " );
734     $sth->bind_param( 1, $import_record_id );
735     $sth->execute();
736
737     while ( my $row = $sth->fetchrow_hashref() ) {
738         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
739
740         # Delete date_due subfield as to not accidentally delete item checkout due dates
741         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
742         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
743
744         my $item = TransformMarcToKoha( $item_marc );
745
746         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
747         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
748
749         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
750         if ( $action eq "replace" && $duplicate_itemnumber ) {
751             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
752             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
753             $updsth->bind_param( 1, 'imported' );
754             $updsth->bind_param( 2, $item->{itemnumber} );
755             $updsth->bind_param( 3, undef );
756             $updsth->bind_param( 4, $row->{'import_items_id'} );
757             $updsth->execute();
758             $updsth->finish();
759             $num_items_replaced++;
760         } elsif ( $action eq "replace" && $duplicate_barcode ) {
761             my $itemnumber = $duplicate_barcode->itemnumber;
762             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
763             $updsth->bind_param( 1, 'imported' );
764             $updsth->bind_param( 2, $item->{itemnumber} );
765             $updsth->bind_param( 3, undef );
766             $updsth->bind_param( 4, $row->{'import_items_id'} );
767             $updsth->execute();
768             $updsth->finish();
769             $num_items_replaced++;
770         } elsif ($duplicate_barcode) {
771             $updsth->bind_param( 1, 'error' );
772             $updsth->bind_param( 2, undef );
773             $updsth->bind_param( 3, 'duplicate item barcode' );
774             $updsth->bind_param( 4, $row->{'import_items_id'} );
775             $updsth->execute();
776             $num_items_errored++;
777         } else {
778             # Remove the itemnumber if it exists, we want to create a new item
779             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
780             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
781
782             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
783             if( $itemnumber ) {
784                 $updsth->bind_param( 1, 'imported' );
785                 $updsth->bind_param( 2, $itemnumber );
786                 $updsth->bind_param( 3, undef );
787                 $updsth->bind_param( 4, $row->{'import_items_id'} );
788                 $updsth->execute();
789                 $updsth->finish();
790                 $num_items_added++;
791             }
792         }
793     }
794
795     return ( $num_items_added, $num_items_replaced, $num_items_errored );
796 }
797
798 =head2 BatchRevertRecords
799
800   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
801       $num_ignored) = BatchRevertRecords($batch_id);
802
803 =cut
804
805 sub BatchRevertRecords {
806     my $batch_id = shift;
807
808     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
809
810     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
811
812     my $record_type;
813     my $num_deleted = 0;
814     my $num_errors = 0;
815     my $num_reverted = 0;
816     my $num_ignored = 0;
817     my $num_items_deleted = 0;
818     # commit (i.e., save, all records in the batch)
819     SetImportBatchStatus($batch_id, 'reverting');
820     my $overlay_action = GetImportBatchOverlayAction($batch_id);
821     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
822     my $dbh = C4::Context->dbh;
823     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
824                              FROM import_records
825                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
826                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
827                              WHERE import_batch_id = ?");
828     $sth->execute($batch_id);
829     my $marc_type;
830     my $marcflavour = C4::Context->preference('marcflavour');
831     while (my $rowref = $sth->fetchrow_hashref) {
832         $record_type = $rowref->{'record_type'};
833         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
834             $num_ignored++;
835             next;
836         }
837         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
838             $marc_type = 'UNIMARCAUTH';
839         } elsif ($marcflavour eq 'UNIMARC') {
840             $marc_type = 'UNIMARC';
841         } else {
842             $marc_type = 'USMARC';
843         }
844
845         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
846
847         if ($record_result eq 'delete') {
848             my $error = undef;
849             if  ($record_type eq 'biblio') {
850                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
851                 $error = DelBiblio($rowref->{'matched_biblionumber'});
852             } else {
853                 DelAuthority({ authid => $rowref->{'matched_authid'} });
854             }
855             if (defined $error) {
856                 $num_errors++;
857             } else {
858                 $num_deleted++;
859                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
860             }
861         } elsif ($record_result eq 'restore') {
862             $num_reverted++;
863             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
864             if ($record_type eq 'biblio') {
865                 my $biblionumber = $rowref->{'matched_biblionumber'};
866                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
867
868                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
869                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
870
871                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
872                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
873             } else {
874                 my $authid = $rowref->{'matched_authid'};
875                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
876             }
877             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
878         } elsif ($record_result eq 'ignore') {
879             if ($record_type eq 'biblio') {
880                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
881             }
882             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
883         }
884         my $query;
885         if ($record_type eq 'biblio') {
886             # remove matched_biblionumber only if there is no 'imported' item left
887             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
888             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
889         } else {
890             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
891         }
892         my $sth2 = $dbh->prepare_cached($query);
893         $sth2->execute($rowref->{'import_record_id'});
894     }
895
896     $sth->finish();
897     SetImportBatchStatus($batch_id, 'reverted');
898     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
899 }
900
901 =head2 BatchRevertItems
902
903   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
904
905 =cut
906
907 sub BatchRevertItems {
908     my ($import_record_id, $biblionumber) = @_;
909
910     my $dbh = C4::Context->dbh;
911     my $num_items_deleted = 0;
912
913     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
914                                    FROM import_items
915                                    JOIN items USING (itemnumber)
916                                    WHERE import_record_id = ?");
917     $sth->bind_param(1, $import_record_id);
918     $sth->execute();
919     while (my $row = $sth->fetchrow_hashref()) {
920         my $item = Koha::Items->find($row->{itemnumber});
921         my $error = $item->safe_delete;
922         if ($error eq '1'){
923             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
924             $updsth->bind_param(1, 'reverted');
925             $updsth->bind_param(2, $row->{'import_items_id'});
926             $updsth->execute();
927             $updsth->finish();
928             $num_items_deleted++;
929         }
930         else {
931             next;
932         }
933     }
934     $sth->finish();
935     return $num_items_deleted;
936 }
937
938 =head2 CleanBatch
939
940   CleanBatch($batch_id)
941
942 Deletes all staged records from the import batch
943 and sets the status of the batch to 'cleaned'.  Note
944 that deleting a stage record does *not* affect
945 any record that has been committed to the database.
946
947 =cut
948
949 sub CleanBatch {
950     my $batch_id = shift;
951     return unless defined $batch_id;
952
953     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
954     SetImportBatchStatus($batch_id, 'cleaned');
955 }
956
957 =head2 DeleteBatch
958
959   DeleteBatch($batch_id)
960
961 Deletes the record from the database. This can only be done
962 once the batch has been cleaned.
963
964 =cut
965
966 sub DeleteBatch {
967     my $batch_id = shift;
968     return unless defined $batch_id;
969
970     my $dbh = C4::Context->dbh;
971     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
972     $sth->execute( $batch_id );
973 }
974
975 =head2 GetAllImportBatches
976
977   my $results = GetAllImportBatches();
978
979 Returns a references to an array of hash references corresponding
980 to all import_batches rows (of batch_type 'batch'), sorted in 
981 ascending order by import_batch_id.
982
983 =cut
984
985 sub  GetAllImportBatches {
986     my $dbh = C4::Context->dbh;
987     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
988                                     WHERE batch_type IN ('batch', 'webservice')
989                                     ORDER BY import_batch_id ASC");
990
991     my $results = [];
992     $sth->execute();
993     while (my $row = $sth->fetchrow_hashref) {
994         push @$results, $row;
995     }
996     $sth->finish();
997     return $results;
998 }
999
1000 =head2 GetStagedWebserviceBatches
1001
1002   my $batch_ids = GetStagedWebserviceBatches();
1003
1004 Returns a references to an array of batch id's
1005 of batch_type 'webservice' that are not imported
1006
1007 =cut
1008
1009 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1010 SELECT import_batch_id FROM import_batches
1011 WHERE batch_type = 'webservice'
1012 AND import_status = 'staged'
1013 EOQ
1014 sub  GetStagedWebserviceBatches {
1015     my $dbh = C4::Context->dbh;
1016     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1017 }
1018
1019 =head2 GetImportBatchRangeDesc
1020
1021   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1022
1023 Returns a reference to an array of hash references corresponding to
1024 import_batches rows (sorted in descending order by import_batch_id)
1025 start at the given offset.
1026
1027 =cut
1028
1029 sub GetImportBatchRangeDesc {
1030     my ($offset, $results_per_group) = @_;
1031
1032     my $dbh = C4::Context->dbh;
1033     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1034                                     LEFT JOIN import_batch_profiles p
1035                                     ON b.profile_id = p.id
1036                                     WHERE b.batch_type IN ('batch', 'webservice')
1037                                     ORDER BY b.import_batch_id DESC";
1038     my @params;
1039     if ($results_per_group){
1040         $query .= " LIMIT ?";
1041         push(@params, $results_per_group);
1042     }
1043     if ($offset){
1044         $query .= " OFFSET ?";
1045         push(@params, $offset);
1046     }
1047     my $sth = $dbh->prepare_cached($query);
1048     $sth->execute(@params);
1049     my $results = $sth->fetchall_arrayref({});
1050     $sth->finish();
1051     return $results;
1052 }
1053
1054 =head2 GetItemNumbersFromImportBatch
1055
1056   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1057
1058 =cut
1059
1060 sub GetItemNumbersFromImportBatch {
1061     my ($batch_id) = @_;
1062     my $dbh = C4::Context->dbh;
1063     my $sql = q|
1064 SELECT itemnumber FROM import_items
1065 INNER JOIN items USING (itemnumber)
1066 INNER JOIN import_records USING (import_record_id)
1067 WHERE import_batch_id = ?|;
1068     my  $sth = $dbh->prepare( $sql );
1069     $sth->execute($batch_id);
1070     my @items ;
1071     while ( my ($itm) = $sth->fetchrow_array ) {
1072         push @items, $itm;
1073     }
1074     return @items;
1075 }
1076
1077 =head2 GetNumberOfImportBatches
1078
1079   my $count = GetNumberOfImportBatches();
1080
1081 =cut
1082
1083 sub GetNumberOfNonZ3950ImportBatches {
1084     my $dbh = C4::Context->dbh;
1085     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1086     $sth->execute();
1087     my ($count) = $sth->fetchrow_array();
1088     $sth->finish();
1089     return $count;
1090 }
1091
1092 =head2 GetImportBiblios
1093
1094   my $results = GetImportBiblios($importid);
1095
1096 =cut
1097
1098 sub GetImportBiblios {
1099     my ($import_record_id) = @_;
1100
1101     my $dbh = C4::Context->dbh;
1102     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1103     return $dbh->selectall_arrayref(
1104         $query,
1105         { Slice => {} },
1106         $import_record_id
1107     );
1108
1109 }
1110
1111 =head2 GetImportRecordsRange
1112
1113   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1114
1115 Returns a reference to an array of hash references corresponding to
1116 import_biblios/import_auths/import_records rows for a given batch
1117 starting at the given offset.
1118
1119 =cut
1120
1121 sub GetImportRecordsRange {
1122     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1123
1124     my $dbh = C4::Context->dbh;
1125
1126     my $order_by = $parameters->{order_by} || 'import_record_id';
1127     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1128
1129     my $order_by_direction =
1130       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1131
1132     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1133
1134     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1135                                            record_sequence, status, overlay_status,
1136                                            matched_biblionumber, matched_authid, record_type
1137                                     FROM   import_records
1138                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1139                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1140                                     WHERE  import_batch_id = ?";
1141     my @params;
1142     push(@params, $batch_id);
1143     if ($status) {
1144         $query .= " AND status=?";
1145         push(@params,$status);
1146     }
1147
1148     $query.=" ORDER BY $order_by $order_by_direction";
1149
1150     if($results_per_group){
1151         $query .= " LIMIT ?";
1152         push(@params, $results_per_group);
1153     }
1154     if($offset){
1155         $query .= " OFFSET ?";
1156         push(@params, $offset);
1157     }
1158     my $sth = $dbh->prepare_cached($query);
1159     $sth->execute(@params);
1160     my $results = $sth->fetchall_arrayref({});
1161     $sth->finish();
1162     return $results;
1163
1164 }
1165
1166 =head2 GetBestRecordMatch
1167
1168   my $record_id = GetBestRecordMatch($import_record_id);
1169
1170 =cut
1171
1172 sub GetBestRecordMatch {
1173     my ($import_record_id) = @_;
1174
1175     my $dbh = C4::Context->dbh;
1176     my $sth = $dbh->prepare("SELECT candidate_match_id
1177                              FROM   import_record_matches
1178                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1179                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1180                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1181                              WHERE  import_record_matches.import_record_id = ? AND
1182                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1183                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1184                              ORDER BY score DESC, candidate_match_id DESC");
1185     $sth->execute($import_record_id);
1186     my ($record_id) = $sth->fetchrow_array();
1187     $sth->finish();
1188     return $record_id;
1189 }
1190
1191 =head2 GetImportBatchStatus
1192
1193   my $status = GetImportBatchStatus($batch_id);
1194
1195 =cut
1196
1197 sub GetImportBatchStatus {
1198     my ($batch_id) = @_;
1199
1200     my $dbh = C4::Context->dbh;
1201     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1202     $sth->execute($batch_id);
1203     my ($status) = $sth->fetchrow_array();
1204     $sth->finish();
1205     return $status;
1206
1207 }
1208
1209 =head2 SetImportBatchStatus
1210
1211   SetImportBatchStatus($batch_id, $new_status);
1212
1213 =cut
1214
1215 sub SetImportBatchStatus {
1216     my ($batch_id, $new_status) = @_;
1217
1218     my $dbh = C4::Context->dbh;
1219     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1220     $sth->execute($new_status, $batch_id);
1221     $sth->finish();
1222
1223 }
1224
1225 =head2 SetMatchedBiblionumber
1226
1227   SetMatchedBiblionumber($import_record_id, $biblionumber);
1228
1229 =cut
1230
1231 sub SetMatchedBiblionumber {
1232     my ($import_record_id, $biblionumber) = @_;
1233
1234     my $dbh = C4::Context->dbh;
1235     $dbh->do(
1236         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1237         undef, $biblionumber, $import_record_id
1238     );
1239 }
1240
1241 =head2 GetImportBatchOverlayAction
1242
1243   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1244
1245 =cut
1246
1247 sub GetImportBatchOverlayAction {
1248     my ($batch_id) = @_;
1249
1250     my $dbh = C4::Context->dbh;
1251     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1252     $sth->execute($batch_id);
1253     my ($overlay_action) = $sth->fetchrow_array();
1254     $sth->finish();
1255     return $overlay_action;
1256
1257 }
1258
1259
1260 =head2 SetImportBatchOverlayAction
1261
1262   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1263
1264 =cut
1265
1266 sub SetImportBatchOverlayAction {
1267     my ($batch_id, $new_overlay_action) = @_;
1268
1269     my $dbh = C4::Context->dbh;
1270     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1271     $sth->execute($new_overlay_action, $batch_id);
1272     $sth->finish();
1273
1274 }
1275
1276 =head2 GetImportBatchNoMatchAction
1277
1278   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1279
1280 =cut
1281
1282 sub GetImportBatchNoMatchAction {
1283     my ($batch_id) = @_;
1284
1285     my $dbh = C4::Context->dbh;
1286     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1287     $sth->execute($batch_id);
1288     my ($nomatch_action) = $sth->fetchrow_array();
1289     $sth->finish();
1290     return $nomatch_action;
1291
1292 }
1293
1294
1295 =head2 SetImportBatchNoMatchAction
1296
1297   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1298
1299 =cut
1300
1301 sub SetImportBatchNoMatchAction {
1302     my ($batch_id, $new_nomatch_action) = @_;
1303
1304     my $dbh = C4::Context->dbh;
1305     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1306     $sth->execute($new_nomatch_action, $batch_id);
1307     $sth->finish();
1308
1309 }
1310
1311 =head2 GetImportBatchItemAction
1312
1313   my $item_action = GetImportBatchItemAction($batch_id);
1314
1315 =cut
1316
1317 sub GetImportBatchItemAction {
1318     my ($batch_id) = @_;
1319
1320     my $dbh = C4::Context->dbh;
1321     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1322     $sth->execute($batch_id);
1323     my ($item_action) = $sth->fetchrow_array();
1324     $sth->finish();
1325     return $item_action;
1326
1327 }
1328
1329
1330 =head2 SetImportBatchItemAction
1331
1332   SetImportBatchItemAction($batch_id, $new_item_action);
1333
1334 =cut
1335
1336 sub SetImportBatchItemAction {
1337     my ($batch_id, $new_item_action) = @_;
1338
1339     my $dbh = C4::Context->dbh;
1340     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1341     $sth->execute($new_item_action, $batch_id);
1342     $sth->finish();
1343
1344 }
1345
1346 =head2 GetImportBatchMatcher
1347
1348   my $matcher_id = GetImportBatchMatcher($batch_id);
1349
1350 =cut
1351
1352 sub GetImportBatchMatcher {
1353     my ($batch_id) = @_;
1354
1355     my $dbh = C4::Context->dbh;
1356     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1357     $sth->execute($batch_id);
1358     my ($matcher_id) = $sth->fetchrow_array();
1359     $sth->finish();
1360     return $matcher_id;
1361
1362 }
1363
1364
1365 =head2 SetImportBatchMatcher
1366
1367   SetImportBatchMatcher($batch_id, $new_matcher_id);
1368
1369 =cut
1370
1371 sub SetImportBatchMatcher {
1372     my ($batch_id, $new_matcher_id) = @_;
1373
1374     my $dbh = C4::Context->dbh;
1375     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1376     $sth->execute($new_matcher_id, $batch_id);
1377     $sth->finish();
1378
1379 }
1380
1381 =head2 GetImportRecordOverlayStatus
1382
1383   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1384
1385 =cut
1386
1387 sub GetImportRecordOverlayStatus {
1388     my ($import_record_id) = @_;
1389
1390     my $dbh = C4::Context->dbh;
1391     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1392     $sth->execute($import_record_id);
1393     my ($overlay_status) = $sth->fetchrow_array();
1394     $sth->finish();
1395     return $overlay_status;
1396
1397 }
1398
1399
1400 =head2 SetImportRecordOverlayStatus
1401
1402   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1403
1404 =cut
1405
1406 sub SetImportRecordOverlayStatus {
1407     my ($import_record_id, $new_overlay_status) = @_;
1408
1409     my $dbh = C4::Context->dbh;
1410     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1411     $sth->execute($new_overlay_status, $import_record_id);
1412     $sth->finish();
1413
1414 }
1415
1416 =head2 GetImportRecordStatus
1417
1418   my $status = GetImportRecordStatus($import_record_id);
1419
1420 =cut
1421
1422 sub GetImportRecordStatus {
1423     my ($import_record_id) = @_;
1424
1425     my $dbh = C4::Context->dbh;
1426     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1427     $sth->execute($import_record_id);
1428     my ($status) = $sth->fetchrow_array();
1429     $sth->finish();
1430     return $status;
1431
1432 }
1433
1434
1435 =head2 SetImportRecordStatus
1436
1437   SetImportRecordStatus($import_record_id, $new_status);
1438
1439 =cut
1440
1441 sub SetImportRecordStatus {
1442     my ($import_record_id, $new_status) = @_;
1443
1444     my $dbh = C4::Context->dbh;
1445     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1446     $sth->execute($new_status, $import_record_id);
1447     $sth->finish();
1448
1449 }
1450
1451 =head2 GetImportRecordMatches
1452
1453   my $results = GetImportRecordMatches($import_record_id, $best_only);
1454
1455 =cut
1456
1457 sub GetImportRecordMatches {
1458     my $import_record_id = shift;
1459     my $best_only = @_ ? shift : 0;
1460
1461     my $dbh = C4::Context->dbh;
1462     # FIXME currently biblio only
1463     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1464                                     candidate_match_id, score, record_type
1465                                     FROM import_records
1466                                     JOIN import_record_matches USING (import_record_id)
1467                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1468                                     WHERE import_record_id = ?
1469                                     ORDER BY score DESC, biblionumber DESC");
1470     $sth->bind_param(1, $import_record_id);
1471     my $results = [];
1472     $sth->execute();
1473     while (my $row = $sth->fetchrow_hashref) {
1474         if ($row->{'record_type'} eq 'auth') {
1475             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1476         }
1477         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1478         push @$results, $row;
1479         last if $best_only;
1480     }
1481     $sth->finish();
1482
1483     return $results;
1484     
1485 }
1486
1487 =head2 SetImportRecordMatches
1488
1489   SetImportRecordMatches($import_record_id, @matches);
1490
1491 =cut
1492
1493 sub SetImportRecordMatches {
1494     my $import_record_id = shift;
1495     my @matches = @_;
1496
1497     my $dbh = C4::Context->dbh;
1498     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1499     $delsth->execute($import_record_id);
1500     $delsth->finish();
1501
1502     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1503                                     VALUES (?, ?, ?)");
1504     foreach my $match (@matches) {
1505         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1506     }
1507 }
1508
1509 =head2 RecordsFromISO2709File
1510
1511     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1512
1513 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1514
1515 @PARAM1, String, absolute path to the ISO2709 file.
1516 @PARAM2, String, see stage_file.pl
1517 @PARAM3, String, should be utf8
1518
1519 Returns two array refs.
1520
1521 =cut
1522
1523 sub RecordsFromISO2709File {
1524     my ($input_file, $record_type, $encoding) = @_;
1525     my @errors;
1526
1527     my $marc_type = C4::Context->preference('marcflavour');
1528     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1529
1530     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1531     my @marc_records;
1532     $/ = "\035";
1533     while (<$fh>) {
1534         s/^\s+//;
1535         s/\s+$//;
1536         next unless $_; # skip if record has only whitespace, as might occur
1537                         # if file includes newlines between each MARC record
1538         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1539         push @marc_records, $marc_record;
1540         if ($charset_guessed ne $encoding) {
1541             push @errors,
1542                 "Unexpected charset $charset_guessed, expecting $encoding";
1543         }
1544     }
1545     close $fh;
1546     return ( \@errors, \@marc_records );
1547 }
1548
1549 =head2 RecordsFromMARCXMLFile
1550
1551     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1552
1553 Creates MARC::Record-objects out of the given MARCXML-file.
1554
1555 @PARAM1, String, absolute path to the ISO2709 file.
1556 @PARAM2, String, should be utf8
1557
1558 Returns two array refs.
1559
1560 =cut
1561
1562 sub RecordsFromMARCXMLFile {
1563     my ( $filename, $encoding ) = @_;
1564     my $batch = MARC::File::XML->in( $filename );
1565     my ( @marcRecords, @errors, $record );
1566     do {
1567         eval { $record = $batch->next( $encoding ); };
1568         if ($@) {
1569             push @errors, $@;
1570         }
1571         push @marcRecords, $record if $record;
1572     } while( $record );
1573     return (\@errors, \@marcRecords);
1574 }
1575
1576 =head2 RecordsFromMarcPlugin
1577
1578     Converts text of input_file into array of MARC records with to_marc plugin
1579
1580 =cut
1581
1582 sub RecordsFromMarcPlugin {
1583     my ($input_file, $plugin_class, $encoding) = @_;
1584     my ( $text, @return );
1585     return \@return if !$input_file || !$plugin_class;
1586
1587     # Read input file
1588     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1589     $/ = "\035";
1590     while (<$fh>) {
1591         s/^\s+//;
1592         s/\s+$//;
1593         next unless $_;
1594         $text .= $_;
1595     }
1596     close $fh;
1597
1598     # Convert to large MARC blob with plugin
1599     $text = Koha::Plugins::Handler->run({
1600         class  => $plugin_class,
1601         method => 'to_marc',
1602         params => { data => $text },
1603     }) if $text;
1604
1605     # Convert to array of MARC records
1606     if( $text ) {
1607         my $marc_type = C4::Context->preference('marcflavour');
1608         foreach my $blob ( split(/\x1D/, $text) ) {
1609             next if $blob =~ /^\s*$/;
1610             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1611             push @return, $marcrecord;
1612         }
1613     }
1614     return \@return;
1615 }
1616
1617 # internal functions
1618
1619 sub _create_import_record {
1620     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1621
1622     my $dbh = C4::Context->dbh;
1623     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1624                                                          record_type, encoding)
1625                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1626     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1627                   $record_type, $encoding);
1628     my $import_record_id = $dbh->{'mysql_insertid'};
1629     $sth->finish();
1630     return $import_record_id;
1631 }
1632
1633 sub _add_auth_fields {
1634     my ($import_record_id, $marc_record) = @_;
1635
1636     my $controlnumber;
1637     if ($marc_record->field('001')) {
1638         $controlnumber = $marc_record->field('001')->data();
1639     }
1640     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1641     my $dbh = C4::Context->dbh;
1642     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1643     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1644     $sth->finish();
1645 }
1646
1647 sub _add_biblio_fields {
1648     my ($import_record_id, $marc_record) = @_;
1649
1650     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1651     my $dbh = C4::Context->dbh;
1652     # FIXME no controlnumber, originalsource
1653     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1654     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1655     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1656     $sth->finish();
1657                 
1658 }
1659
1660 sub _update_biblio_fields {
1661     my ($import_record_id, $marc_record) = @_;
1662
1663     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1664     my $dbh = C4::Context->dbh;
1665     # FIXME no controlnumber, originalsource
1666     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1667     $isbn =~ s/\(.*$//;
1668     $isbn =~ tr/ -_//;
1669     $isbn = uc $isbn;
1670     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1671                              WHERE  import_record_id = ?");
1672     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1673     $sth->finish();
1674 }
1675
1676 sub _parse_biblio_fields {
1677     my ($marc_record) = @_;
1678
1679     my $dbh = C4::Context->dbh;
1680     my $bibliofields = TransformMarcToKoha($marc_record, '');
1681     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1682
1683 }
1684
1685 sub _update_batch_record_counts {
1686     my ($batch_id) = @_;
1687
1688     my $dbh = C4::Context->dbh;
1689     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1690                                         num_records = (
1691                                             SELECT COUNT(*)
1692                                             FROM import_records
1693                                             WHERE import_batch_id = import_batches.import_batch_id),
1694                                         num_items = (
1695                                             SELECT COUNT(*)
1696                                             FROM import_records
1697                                             JOIN import_items USING (import_record_id)
1698                                             WHERE import_batch_id = import_batches.import_batch_id
1699                                             AND record_type = 'biblio')
1700                                     WHERE import_batch_id = ?");
1701     $sth->bind_param(1, $batch_id);
1702     $sth->execute();
1703     $sth->finish();
1704 }
1705
1706 sub _get_commit_action {
1707     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1708     
1709     if ($record_type eq 'biblio') {
1710         my ($bib_result, $bib_match, $item_result);
1711
1712         if ($overlay_status ne 'no_match') {
1713             $bib_match = GetBestRecordMatch($import_record_id);
1714             if ($overlay_action eq 'replace') {
1715                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1716             } elsif ($overlay_action eq 'create_new') {
1717                 $bib_result  = 'create_new';
1718             } elsif ($overlay_action eq 'ignore') {
1719                 $bib_result  = 'ignore';
1720             }
1721          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1722                 $item_result = 'create_new';
1723        }
1724       elsif($item_action eq 'replace'){
1725           $item_result = 'replace';
1726           }
1727       else {
1728              $item_result = 'ignore';
1729            }
1730         } else {
1731             $bib_result = $nomatch_action;
1732             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1733         }
1734         return ($bib_result, $item_result, $bib_match);
1735     } else { # must be auths
1736         my ($auth_result, $auth_match);
1737
1738         if ($overlay_status ne 'no_match') {
1739             $auth_match = GetBestRecordMatch($import_record_id);
1740             if ($overlay_action eq 'replace') {
1741                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1742             } elsif ($overlay_action eq 'create_new') {
1743                 $auth_result  = 'create_new';
1744             } elsif ($overlay_action eq 'ignore') {
1745                 $auth_result  = 'ignore';
1746             }
1747         } else {
1748             $auth_result = $nomatch_action;
1749         }
1750
1751         return ($auth_result, undef, $auth_match);
1752
1753     }
1754 }
1755
1756 sub _get_revert_action {
1757     my ($overlay_action, $overlay_status, $status) = @_;
1758
1759     my $bib_result;
1760
1761     if ($status eq 'ignored') {
1762         $bib_result = 'ignore';
1763     } else {
1764         if ($overlay_action eq 'create_new') {
1765             $bib_result = 'delete';
1766         } else {
1767             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1768         }
1769     }
1770     return $bib_result;
1771 }
1772
1773 1;
1774 __END__
1775
1776 =head1 AUTHOR
1777
1778 Koha Development Team <http://koha-community.org/>
1779
1780 Galen Charlton <galen.charlton@liblime.com>
1781
1782 =cut