Bug 22770: DBRev 19.05.00.003
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Items;
31 use Koha::Plugins::Handler;
32 use Koha::Logger;
33
34 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
35
36 BEGIN {
37         require Exporter;
38         @ISA    = qw(Exporter);
39         @EXPORT = qw(
40     GetZ3950BatchId
41     GetWebserviceBatchId
42     GetImportRecordMarc
43     GetImportRecordMarcXML
44     AddImportBatch
45     GetImportBatch
46     AddAuthToBatch
47     AddBiblioToBatch
48     AddItemsToImportBiblio
49     ModAuthorityInBatch
50     ModBiblioInBatch
51
52     BatchStageMarcRecords
53     BatchFindDuplicates
54     BatchCommitRecords
55     BatchRevertRecords
56     CleanBatch
57     DeleteBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     GetImportRecordMatches
82     SetImportRecordMatches
83         );
84 }
85
86 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
87
88 =head1 NAME
89
90 C4::ImportBatch - manage batches of imported MARC records
91
92 =head1 SYNOPSIS
93
94 use C4::ImportBatch;
95
96 =head1 FUNCTIONS
97
98 =head2 GetZ3950BatchId
99
100   my $batchid = GetZ3950BatchId($z3950server);
101
102 Retrieves the ID of the import batch for the Z39.50
103 reservoir for the given target.  If necessary,
104 creates the import batch.
105
106 =cut
107
108 sub GetZ3950BatchId {
109     my ($z3950server) = @_;
110
111     my $dbh = C4::Context->dbh;
112     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
113                              WHERE  batch_type = 'z3950'
114                              AND    file_name = ?");
115     $sth->execute($z3950server);
116     my $rowref = $sth->fetchrow_arrayref();
117     $sth->finish();
118     if (defined $rowref) {
119         return $rowref->[0];
120     } else {
121         my $batch_id = AddImportBatch( {
122                 overlay_action => 'create_new',
123                 import_status => 'staged',
124                 batch_type => 'z3950',
125                 file_name => $z3950server,
126             } );
127         return $batch_id;
128     }
129     
130 }
131
132 =head2 GetWebserviceBatchId
133
134   my $batchid = GetWebserviceBatchId();
135
136 Retrieves the ID of the import batch for webservice.
137 If necessary, creates the import batch.
138
139 =cut
140
141 my $WEBSERVICE_BASE_QRY = <<EOQ;
142 SELECT import_batch_id FROM import_batches
143 WHERE  batch_type = 'webservice'
144 AND    import_status = 'staged'
145 EOQ
146 sub GetWebserviceBatchId {
147     my ($params) = @_;
148
149     my $dbh = C4::Context->dbh;
150     my $sql = $WEBSERVICE_BASE_QRY;
151     my @args;
152     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
153         if (my $val = $params->{$field}) {
154             $sql .= " AND $field = ?";
155             push @args, $val;
156         }
157     }
158     my $id = $dbh->selectrow_array($sql, undef, @args);
159     return $id if $id;
160
161     $params->{batch_type} = 'webservice';
162     $params->{import_status} = 'staged';
163     return AddImportBatch($params);
164 }
165
166 =head2 GetImportRecordMarc
167
168   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
169
170 =cut
171
172 sub GetImportRecordMarc {
173     my ($import_record_id) = @_;
174
175     my $dbh = C4::Context->dbh;
176     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
177         SELECT marc, encoding
178         FROM import_records
179         WHERE import_record_id = ?
180     |, undef, $import_record_id );
181
182     return $marc, $encoding;
183 }
184
185 sub GetRecordFromImportBiblio {
186     my ( $import_record_id, $embed_items ) = @_;
187
188     my ($marc) = GetImportRecordMarc($import_record_id);
189     my $record = MARC::Record->new_from_usmarc($marc);
190
191     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
192
193     return $record;
194 }
195
196 sub EmbedItemsInImportBiblio {
197     my ( $record, $import_record_id ) = @_;
198     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
199     my $dbh = C4::Context->dbh;
200     my $import_items = $dbh->selectall_arrayref(q|
201         SELECT import_items.marcxml
202         FROM import_items
203         WHERE import_record_id = ?
204     |, { Slice => {} }, $import_record_id );
205     my @item_fields;
206     for my $import_item ( @$import_items ) {
207         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
208         push @item_fields, $item_marc->field($itemtag);
209     }
210     $record->append_fields(@item_fields);
211     return $record;
212 }
213
214 =head2 GetImportRecordMarcXML
215
216   my $marcxml = GetImportRecordMarcXML($import_record_id);
217
218 =cut
219
220 sub GetImportRecordMarcXML {
221     my ($import_record_id) = @_;
222
223     my $dbh = C4::Context->dbh;
224     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
225     $sth->execute($import_record_id);
226     my ($marcxml) = $sth->fetchrow();
227     $sth->finish();
228     return $marcxml;
229
230 }
231
232 =head2 AddImportBatch
233
234   my $batch_id = AddImportBatch($params_hash);
235
236 =cut
237
238 sub AddImportBatch {
239     my ($params) = @_;
240
241     my (@fields, @vals);
242     foreach (qw( matcher_id template_id branchcode
243                  overlay_action nomatch_action item_action
244                  import_status batch_type file_name comments record_type )) {
245         if (exists $params->{$_}) {
246             push @fields, $_;
247             push @vals, $params->{$_};
248         }
249     }
250     my $dbh = C4::Context->dbh;
251     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
252                                   VALUES (".join( ',', map '?', @fields).")",
253              undef,
254              @vals);
255     return $dbh->{'mysql_insertid'};
256 }
257
258 =head2 GetImportBatch 
259
260   my $row = GetImportBatch($batch_id);
261
262 Retrieve a hashref of an import_batches row.
263
264 =cut
265
266 sub GetImportBatch {
267     my ($batch_id) = @_;
268
269     my $dbh = C4::Context->dbh;
270     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
271     $sth->bind_param(1, $batch_id);
272     $sth->execute();
273     my $result = $sth->fetchrow_hashref;
274     $sth->finish();
275     return $result;
276
277 }
278
279 =head2 AddBiblioToBatch 
280
281   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
282                 $marc_record, $encoding, $update_counts);
283
284 =cut
285
286 sub AddBiblioToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292
293     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
294     _add_biblio_fields($import_record_id, $marc_record);
295     _update_batch_record_counts($batch_id) if $update_counts;
296     return $import_record_id;
297 }
298
299 =head2 ModBiblioInBatch
300
301   ModBiblioInBatch($import_record_id, $marc_record);
302
303 =cut
304
305 sub ModBiblioInBatch {
306     my ($import_record_id, $marc_record) = @_;
307
308     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
309     _update_biblio_fields($import_record_id, $marc_record);
310
311 }
312
313 =head2 AddAuthToBatch
314
315   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
316                 $marc_record, $encoding, $update_counts, [$marc_type]);
317
318 =cut
319
320 sub AddAuthToBatch {
321     my $batch_id = shift;
322     my $record_sequence = shift;
323     my $marc_record = shift;
324     my $encoding = shift;
325     my $update_counts = @_ ? shift : 1;
326     my $marc_type = shift || C4::Context->preference('marcflavour');
327
328     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
329
330     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
331     _add_auth_fields($import_record_id, $marc_record);
332     _update_batch_record_counts($batch_id) if $update_counts;
333     return $import_record_id;
334 }
335
336 =head2 ModAuthInBatch
337
338   ModAuthInBatch($import_record_id, $marc_record);
339
340 =cut
341
342 sub ModAuthInBatch {
343     my ($import_record_id, $marc_record) = @_;
344
345     my $marcflavour = C4::Context->preference('marcflavour');
346     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
347
348 }
349
350 =head2 BatchStageMarcRecords
351
352 ( $batch_id, $num_records, $num_items, @invalid_records ) =
353   BatchStageMarcRecords(
354     $record_type,                $encoding,
355     $marc_records,               $file_name,
356     $marc_modification_template, $comments,
357     $branch_code,                $parse_items,
358     $leave_as_staging,           $progress_interval,
359     $progress_callback
360   );
361
362 =cut
363
364 sub BatchStageMarcRecords {
365     my $record_type = shift;
366     my $encoding = shift;
367     my $marc_records = shift;
368     my $file_name = shift;
369     my $marc_modification_template = shift;
370     my $comments = shift;
371     my $branch_code = shift;
372     my $parse_items = shift;
373     my $leave_as_staging = shift;
374
375     # optional callback to monitor status 
376     # of job
377     my $progress_interval = 0;
378     my $progress_callback = undef;
379     if ($#_ == 1) {
380         $progress_interval = shift;
381         $progress_callback = shift;
382         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
383         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
384     } 
385     
386     my $batch_id = AddImportBatch( {
387             overlay_action => 'create_new',
388             import_status => 'staging',
389             batch_type => 'batch',
390             file_name => $file_name,
391             comments => $comments,
392             record_type => $record_type,
393         } );
394     if ($parse_items) {
395         SetImportBatchItemAction($batch_id, 'always_add');
396     } else {
397         SetImportBatchItemAction($batch_id, 'ignore');
398     }
399
400
401     my $marc_type = C4::Context->preference('marcflavour');
402     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
403     my @invalid_records = ();
404     my $num_valid = 0;
405     my $num_items = 0;
406     # FIXME - for now, we're dealing only with bibs
407     my $rec_num = 0;
408     foreach my $marc_record (@$marc_records) {
409         $rec_num++;
410         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
411             &$progress_callback($rec_num);
412         }
413
414         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
415
416         my $import_record_id;
417         if (scalar($marc_record->fields()) == 0) {
418             push @invalid_records, $marc_record;
419         } else {
420
421             # Normalize the record so it doesn't have separated diacritics
422             SetUTF8Flag($marc_record);
423
424             $num_valid++;
425             if ($record_type eq 'biblio') {
426                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
427                 if ($parse_items) {
428                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
429                     $num_items += scalar(@import_items_ids);
430                 }
431             } elsif ($record_type eq 'auth') {
432                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
433             }
434         }
435     }
436     unless ($leave_as_staging) {
437         SetImportBatchStatus($batch_id, 'staged');
438     }
439     # FIXME branch_code, number of bibs, number of items
440     _update_batch_record_counts($batch_id);
441     return ($batch_id, $num_valid, $num_items, @invalid_records);
442 }
443
444 =head2 AddItemsToImportBiblio
445
446   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
447                 $import_record_id, $marc_record, $update_counts);
448
449 =cut
450
451 sub AddItemsToImportBiblio {
452     my $batch_id = shift;
453     my $import_record_id = shift;
454     my $marc_record = shift;
455     my $update_counts = @_ ? shift : 0;
456
457     my @import_items_ids = ();
458    
459     my $dbh = C4::Context->dbh; 
460     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
461     foreach my $item_field ($marc_record->field($item_tag)) {
462         my $item_marc = MARC::Record->new();
463         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
464         $item_marc->append_fields($item_field);
465         $marc_record->delete_field($item_field);
466         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
467                                         VALUES (?, ?, ?)");
468         $sth->bind_param(1, $import_record_id);
469         $sth->bind_param(2, 'staged');
470         $sth->bind_param(3, $item_marc->as_xml());
471         $sth->execute();
472         push @import_items_ids, $dbh->{'mysql_insertid'};
473         $sth->finish();
474     }
475
476     if ($#import_items_ids > -1) {
477         _update_batch_record_counts($batch_id) if $update_counts;
478         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
479     }
480     return @import_items_ids;
481 }
482
483 =head2 BatchFindDuplicates
484
485   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
486              $max_matches, $progress_interval, $progress_callback);
487
488 Goes through the records loaded in the batch and attempts to 
489 find duplicates for each one.  Sets the matching status 
490 of each record to "no_match" or "auto_match" as appropriate.
491
492 The $max_matches parameter is optional; if it is not supplied,
493 it defaults to 10.
494
495 The $progress_interval and $progress_callback parameters are 
496 optional; if both are supplied, the sub referred to by
497 $progress_callback will be invoked every $progress_interval
498 records using the number of records processed as the 
499 singular argument.
500
501 =cut
502
503 sub BatchFindDuplicates {
504     my $batch_id = shift;
505     my $matcher = shift;
506     my $max_matches = @_ ? shift : 10;
507
508     # optional callback to monitor status 
509     # of job
510     my $progress_interval = 0;
511     my $progress_callback = undef;
512     if ($#_ == 1) {
513         $progress_interval = shift;
514         $progress_callback = shift;
515         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
516         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
517     }
518
519     my $dbh = C4::Context->dbh;
520
521     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
522                              FROM import_records
523                              WHERE import_batch_id = ?");
524     $sth->execute($batch_id);
525     my $num_with_matches = 0;
526     my $rec_num = 0;
527     while (my $rowref = $sth->fetchrow_hashref) {
528         $rec_num++;
529         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
530             &$progress_callback($rec_num);
531         }
532         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
533         my @matches = ();
534         if (defined $matcher) {
535             @matches = $matcher->get_matches($marc_record, $max_matches);
536         }
537         if (scalar(@matches) > 0) {
538             $num_with_matches++;
539             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
540             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
541         } else {
542             SetImportRecordMatches($rowref->{'import_record_id'}, ());
543             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
544         }
545     }
546     $sth->finish();
547     return $num_with_matches;
548 }
549
550 =head2 BatchCommitRecords
551
552   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
553         BatchCommitRecords($batch_id, $framework,
554         $progress_interval, $progress_callback);
555
556 =cut
557
558 sub BatchCommitRecords {
559     my $batch_id = shift;
560     my $framework = shift;
561
562     # optional callback to monitor status 
563     # of job
564     my $progress_interval = 0;
565     my $progress_callback = undef;
566     if ($#_ == 1) {
567         $progress_interval = shift;
568         $progress_callback = shift;
569         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
570         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
571     }
572
573     my $record_type;
574     my $num_added = 0;
575     my $num_updated = 0;
576     my $num_items_added = 0;
577     my $num_items_replaced = 0;
578     my $num_items_errored = 0;
579     my $num_ignored = 0;
580     # commit (i.e., save, all records in the batch)
581     SetImportBatchStatus('importing');
582     my $overlay_action = GetImportBatchOverlayAction($batch_id);
583     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
584     my $item_action = GetImportBatchItemAction($batch_id);
585     my $item_tag;
586     my $item_subfield;
587     my $dbh = C4::Context->dbh;
588     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
589                              FROM import_records
590                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
591                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
592                              WHERE import_batch_id = ?");
593     $sth->execute($batch_id);
594     my $marcflavour = C4::Context->preference('marcflavour');
595     my $rec_num = 0;
596     while (my $rowref = $sth->fetchrow_hashref) {
597         $record_type = $rowref->{'record_type'};
598         $rec_num++;
599         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
600             &$progress_callback($rec_num);
601         }
602         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
603             $num_ignored++;
604             next;
605         }
606
607         my $marc_type;
608         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
609             $marc_type = 'UNIMARCAUTH';
610         } elsif ($marcflavour eq 'UNIMARC') {
611             $marc_type = 'UNIMARC';
612         } else {
613             $marc_type = 'USMARC';
614         }
615         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
616
617         if ($record_type eq 'biblio') {
618             # remove any item tags - rely on BatchCommitItems
619             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
620             foreach my $item_field ($marc_record->field($item_tag)) {
621                 $marc_record->delete_field($item_field);
622             }
623         }
624
625         my ($record_result, $item_result, $record_match) =
626             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
627                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
628
629         my $recordid;
630         my $query;
631         if ($record_result eq 'create_new') {
632             $num_added++;
633             if ($record_type eq 'biblio') {
634                 my $biblioitemnumber;
635                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
636                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
637                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
638                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
639                     $num_items_added += $bib_items_added;
640                     $num_items_replaced += $bib_items_replaced;
641                     $num_items_errored += $bib_items_errored;
642                 }
643             } else {
644                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
645                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
646             }
647             my $sth = $dbh->prepare_cached($query);
648             $sth->execute($recordid, $rowref->{'import_record_id'});
649             $sth->finish();
650             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
651         } elsif ($record_result eq 'replace') {
652             $num_updated++;
653             $recordid = $record_match;
654             my $oldxml;
655             if ($record_type eq 'biblio') {
656                 my $oldbiblio = Koha::Biblios->find( $recordid );
657                 $oldxml = GetXmlBiblio($recordid);
658
659                 # remove item fields so that they don't get
660                 # added again if record is reverted
661                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
662                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
663                 foreach my $item_field ($old_marc->field($item_tag)) {
664                     $old_marc->delete_field($item_field);
665                 }
666                 $oldxml = $old_marc->as_xml($marc_type);
667
668                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
669                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
670
671                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
672                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
673                     $num_items_added += $bib_items_added;
674                     $num_items_replaced += $bib_items_replaced;
675                     $num_items_errored += $bib_items_errored;
676                 }
677             } else {
678                 $oldxml = GetAuthorityXML($recordid);
679
680                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
681                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
682             }
683             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
684             $sth->execute($oldxml, $rowref->{'import_record_id'});
685             $sth->finish();
686             my $sth2 = $dbh->prepare_cached($query);
687             $sth2->execute($recordid, $rowref->{'import_record_id'});
688             $sth2->finish();
689             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
690             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
691         } elsif ($record_result eq 'ignore') {
692             $recordid = $record_match;
693             $num_ignored++;
694             $recordid = $record_match;
695             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
696                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
697                 $num_items_added += $bib_items_added;
698          $num_items_replaced += $bib_items_replaced;
699                 $num_items_errored += $bib_items_errored;
700                 # still need to record the matched biblionumber so that the
701                 # items can be reverted
702                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
703                 $sth2->execute($recordid, $rowref->{'import_record_id'});
704                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
705             }
706             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
707         }
708     }
709     $sth->finish();
710     SetImportBatchStatus($batch_id, 'imported');
711     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
712 }
713
714 =head2 BatchCommitItems
715
716   ($num_items_added, $num_items_errored) = 
717          BatchCommitItems($import_record_id, $biblionumber);
718
719 =cut
720
721 sub BatchCommitItems {
722     my ( $import_record_id, $biblionumber, $action ) = @_;
723
724     my $dbh = C4::Context->dbh;
725
726     my $num_items_added = 0;
727     my $num_items_errored = 0;
728     my $num_items_replaced = 0;
729
730     my $sth = $dbh->prepare( "
731         SELECT import_items_id, import_items.marcxml, encoding
732         FROM import_items
733         JOIN import_records USING (import_record_id)
734         WHERE import_record_id = ?
735         ORDER BY import_items_id
736     " );
737     $sth->bind_param( 1, $import_record_id );
738     $sth->execute();
739
740     while ( my $row = $sth->fetchrow_hashref() ) {
741         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
742
743         # Delete date_due subfield as to not accidentally delete item checkout due dates
744         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
745         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
746
747         my $item = TransformMarcToKoha( $item_marc );
748
749         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
750         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
751
752         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
753         if ( $action eq "replace" && $duplicate_itemnumber ) {
754             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
755             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
756             $updsth->bind_param( 1, 'imported' );
757             $updsth->bind_param( 2, $item->{itemnumber} );
758             $updsth->bind_param( 3, $row->{'import_items_id'} );
759             $updsth->execute();
760             $updsth->finish();
761             $num_items_replaced++;
762         } elsif ( $action eq "replace" && $duplicate_barcode ) {
763             my $itemnumber = $duplicate_barcode->itemnumber;
764             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
765             $updsth->bind_param( 1, 'imported' );
766             $updsth->bind_param( 2, $item->{itemnumber} );
767             $updsth->bind_param( 3, $row->{'import_items_id'} );
768             $updsth->execute();
769             $updsth->finish();
770             $num_items_replaced++;
771         } elsif ($duplicate_barcode) {
772             $updsth->bind_param( 1, 'error' );
773             $updsth->bind_param( 2, 'duplicate item barcode' );
774             $updsth->bind_param( 3, $row->{'import_items_id'} );
775             $updsth->execute();
776             $num_items_errored++;
777         } else {
778             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
779             if( $itemnumber ) {
780                 $updsth->bind_param( 1, 'imported' );
781                 $updsth->bind_param( 2, $itemnumber );
782                 $updsth->bind_param( 3, $row->{'import_items_id'} );
783                 $updsth->execute();
784                 $updsth->finish();
785                 $num_items_added++;
786             }
787         }
788     }
789
790     return ( $num_items_added, $num_items_replaced, $num_items_errored );
791 }
792
793 =head2 BatchRevertRecords
794
795   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
796       $num_ignored) = BatchRevertRecords($batch_id);
797
798 =cut
799
800 sub BatchRevertRecords {
801     my $batch_id = shift;
802
803     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
804
805     my $record_type;
806     my $num_deleted = 0;
807     my $num_errors = 0;
808     my $num_reverted = 0;
809     my $num_ignored = 0;
810     my $num_items_deleted = 0;
811     # commit (i.e., save, all records in the batch)
812     SetImportBatchStatus('reverting');
813     my $overlay_action = GetImportBatchOverlayAction($batch_id);
814     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
815     my $dbh = C4::Context->dbh;
816     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
817                              FROM import_records
818                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
819                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
820                              WHERE import_batch_id = ?");
821     $sth->execute($batch_id);
822     my $marc_type;
823     my $marcflavour = C4::Context->preference('marcflavour');
824     while (my $rowref = $sth->fetchrow_hashref) {
825         $record_type = $rowref->{'record_type'};
826         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
827             $num_ignored++;
828             next;
829         }
830         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
831             $marc_type = 'UNIMARCAUTH';
832         } elsif ($marcflavour eq 'UNIMARC') {
833             $marc_type = 'UNIMARC';
834         } else {
835             $marc_type = 'USMARC';
836         }
837
838         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
839
840         if ($record_result eq 'delete') {
841             my $error = undef;
842             if  ($record_type eq 'biblio') {
843                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
844                 $error = DelBiblio($rowref->{'matched_biblionumber'});
845             } else {
846                 DelAuthority({ authid => $rowref->{'matched_authid'} });
847             }
848             if (defined $error) {
849                 $num_errors++;
850             } else {
851                 $num_deleted++;
852                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
853             }
854         } elsif ($record_result eq 'restore') {
855             $num_reverted++;
856             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
857             if ($record_type eq 'biblio') {
858                 my $biblionumber = $rowref->{'matched_biblionumber'};
859                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
860
861                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
862                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
863
864                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
865                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
866             } else {
867                 my $authid = $rowref->{'matched_authid'};
868                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
869             }
870             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
871         } elsif ($record_result eq 'ignore') {
872             if ($record_type eq 'biblio') {
873                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
874             }
875             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
876         }
877         my $query;
878         if ($record_type eq 'biblio') {
879             # remove matched_biblionumber only if there is no 'imported' item left
880             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
881             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
882         } else {
883             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
884         }
885         my $sth2 = $dbh->prepare_cached($query);
886         $sth2->execute($rowref->{'import_record_id'});
887     }
888
889     $sth->finish();
890     SetImportBatchStatus($batch_id, 'reverted');
891     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
892 }
893
894 =head2 BatchRevertItems
895
896   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
897
898 =cut
899
900 sub BatchRevertItems {
901     my ($import_record_id, $biblionumber) = @_;
902
903     my $dbh = C4::Context->dbh;
904     my $num_items_deleted = 0;
905
906     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
907                                    FROM import_items
908                                    JOIN items USING (itemnumber)
909                                    WHERE import_record_id = ?");
910     $sth->bind_param(1, $import_record_id);
911     $sth->execute();
912     while (my $row = $sth->fetchrow_hashref()) {
913         my $error = DelItemCheck( $biblionumber, $row->{'itemnumber'});
914         if ($error == 1){
915             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
916             $updsth->bind_param(1, 'reverted');
917             $updsth->bind_param(2, $row->{'import_items_id'});
918             $updsth->execute();
919             $updsth->finish();
920             $num_items_deleted++;
921         }
922         else {
923             next;
924         }
925     }
926     $sth->finish();
927     return $num_items_deleted;
928 }
929
930 =head2 CleanBatch
931
932   CleanBatch($batch_id)
933
934 Deletes all staged records from the import batch
935 and sets the status of the batch to 'cleaned'.  Note
936 that deleting a stage record does *not* affect
937 any record that has been committed to the database.
938
939 =cut
940
941 sub CleanBatch {
942     my $batch_id = shift;
943     return unless defined $batch_id;
944
945     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
946     SetImportBatchStatus($batch_id, 'cleaned');
947 }
948
949 =head2 DeleteBatch
950
951   DeleteBatch($batch_id)
952
953 Deletes the record from the database. This can only be done
954 once the batch has been cleaned.
955
956 =cut
957
958 sub DeleteBatch {
959     my $batch_id = shift;
960     return unless defined $batch_id;
961
962     my $dbh = C4::Context->dbh;
963     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
964     $sth->execute( $batch_id );
965 }
966
967 =head2 GetAllImportBatches
968
969   my $results = GetAllImportBatches();
970
971 Returns a references to an array of hash references corresponding
972 to all import_batches rows (of batch_type 'batch'), sorted in 
973 ascending order by import_batch_id.
974
975 =cut
976
977 sub  GetAllImportBatches {
978     my $dbh = C4::Context->dbh;
979     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
980                                     WHERE batch_type IN ('batch', 'webservice')
981                                     ORDER BY import_batch_id ASC");
982
983     my $results = [];
984     $sth->execute();
985     while (my $row = $sth->fetchrow_hashref) {
986         push @$results, $row;
987     }
988     $sth->finish();
989     return $results;
990 }
991
992 =head2 GetStagedWebserviceBatches
993
994   my $batch_ids = GetStagedWebserviceBatches();
995
996 Returns a references to an array of batch id's
997 of batch_type 'webservice' that are not imported
998
999 =cut
1000
1001 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1002 SELECT import_batch_id FROM import_batches
1003 WHERE batch_type = 'webservice'
1004 AND import_status = 'staged'
1005 EOQ
1006 sub  GetStagedWebserviceBatches {
1007     my $dbh = C4::Context->dbh;
1008     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1009 }
1010
1011 =head2 GetImportBatchRangeDesc
1012
1013   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1014
1015 Returns a reference to an array of hash references corresponding to
1016 import_batches rows (sorted in descending order by import_batch_id)
1017 start at the given offset.
1018
1019 =cut
1020
1021 sub GetImportBatchRangeDesc {
1022     my ($offset, $results_per_group) = @_;
1023
1024     my $dbh = C4::Context->dbh;
1025     my $query = "SELECT * FROM import_batches
1026                                     WHERE batch_type IN ('batch', 'webservice')
1027                                     ORDER BY import_batch_id DESC";
1028     my @params;
1029     if ($results_per_group){
1030         $query .= " LIMIT ?";
1031         push(@params, $results_per_group);
1032     }
1033     if ($offset){
1034         $query .= " OFFSET ?";
1035         push(@params, $offset);
1036     }
1037     my $sth = $dbh->prepare_cached($query);
1038     $sth->execute(@params);
1039     my $results = $sth->fetchall_arrayref({});
1040     $sth->finish();
1041     return $results;
1042 }
1043
1044 =head2 GetItemNumbersFromImportBatch
1045
1046   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1047
1048 =cut
1049
1050 sub GetItemNumbersFromImportBatch {
1051     my ($batch_id) = @_;
1052     my $dbh = C4::Context->dbh;
1053     my $sql = q|
1054 SELECT itemnumber FROM import_items
1055 INNER JOIN items USING (itemnumber)
1056 INNER JOIN import_records USING (import_record_id)
1057 WHERE import_batch_id = ?|;
1058     my  $sth = $dbh->prepare( $sql );
1059     $sth->execute($batch_id);
1060     my @items ;
1061     while ( my ($itm) = $sth->fetchrow_array ) {
1062         push @items, $itm;
1063     }
1064     return @items;
1065 }
1066
1067 =head2 GetNumberOfImportBatches
1068
1069   my $count = GetNumberOfImportBatches();
1070
1071 =cut
1072
1073 sub GetNumberOfNonZ3950ImportBatches {
1074     my $dbh = C4::Context->dbh;
1075     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1076     $sth->execute();
1077     my ($count) = $sth->fetchrow_array();
1078     $sth->finish();
1079     return $count;
1080 }
1081
1082 =head2 GetImportBiblios
1083
1084   my $results = GetImportBiblios($importid);
1085
1086 =cut
1087
1088 sub GetImportBiblios {
1089     my ($import_record_id) = @_;
1090
1091     my $dbh = C4::Context->dbh;
1092     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1093     return $dbh->selectall_arrayref(
1094         $query,
1095         { Slice => {} },
1096         $import_record_id
1097     );
1098
1099 }
1100
1101 =head2 GetImportRecordsRange
1102
1103   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1104
1105 Returns a reference to an array of hash references corresponding to
1106 import_biblios/import_auths/import_records rows for a given batch
1107 starting at the given offset.
1108
1109 =cut
1110
1111 sub GetImportRecordsRange {
1112     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1113
1114     my $dbh = C4::Context->dbh;
1115
1116     my $order_by = $parameters->{order_by} || 'import_record_id';
1117     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1118
1119     my $order_by_direction =
1120       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1121
1122     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1123
1124     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1125                                            record_sequence, status, overlay_status,
1126                                            matched_biblionumber, matched_authid, record_type
1127                                     FROM   import_records
1128                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1129                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1130                                     WHERE  import_batch_id = ?";
1131     my @params;
1132     push(@params, $batch_id);
1133     if ($status) {
1134         $query .= " AND status=?";
1135         push(@params,$status);
1136     }
1137
1138     $query.=" ORDER BY $order_by $order_by_direction";
1139
1140     if($results_per_group){
1141         $query .= " LIMIT ?";
1142         push(@params, $results_per_group);
1143     }
1144     if($offset){
1145         $query .= " OFFSET ?";
1146         push(@params, $offset);
1147     }
1148     my $sth = $dbh->prepare_cached($query);
1149     $sth->execute(@params);
1150     my $results = $sth->fetchall_arrayref({});
1151     $sth->finish();
1152     return $results;
1153
1154 }
1155
1156 =head2 GetBestRecordMatch
1157
1158   my $record_id = GetBestRecordMatch($import_record_id);
1159
1160 =cut
1161
1162 sub GetBestRecordMatch {
1163     my ($import_record_id) = @_;
1164
1165     my $dbh = C4::Context->dbh;
1166     my $sth = $dbh->prepare("SELECT candidate_match_id
1167                              FROM   import_record_matches
1168                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1169                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1170                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1171                              WHERE  import_record_matches.import_record_id = ? AND
1172                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1173                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1174                              ORDER BY score DESC, candidate_match_id DESC");
1175     $sth->execute($import_record_id);
1176     my ($record_id) = $sth->fetchrow_array();
1177     $sth->finish();
1178     return $record_id;
1179 }
1180
1181 =head2 GetImportBatchStatus
1182
1183   my $status = GetImportBatchStatus($batch_id);
1184
1185 =cut
1186
1187 sub GetImportBatchStatus {
1188     my ($batch_id) = @_;
1189
1190     my $dbh = C4::Context->dbh;
1191     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1192     $sth->execute($batch_id);
1193     my ($status) = $sth->fetchrow_array();
1194     $sth->finish();
1195     return $status;
1196
1197 }
1198
1199 =head2 SetImportBatchStatus
1200
1201   SetImportBatchStatus($batch_id, $new_status);
1202
1203 =cut
1204
1205 sub SetImportBatchStatus {
1206     my ($batch_id, $new_status) = @_;
1207
1208     my $dbh = C4::Context->dbh;
1209     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1210     $sth->execute($new_status, $batch_id);
1211     $sth->finish();
1212
1213 }
1214
1215 =head2 GetImportBatchOverlayAction
1216
1217   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1218
1219 =cut
1220
1221 sub GetImportBatchOverlayAction {
1222     my ($batch_id) = @_;
1223
1224     my $dbh = C4::Context->dbh;
1225     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1226     $sth->execute($batch_id);
1227     my ($overlay_action) = $sth->fetchrow_array();
1228     $sth->finish();
1229     return $overlay_action;
1230
1231 }
1232
1233
1234 =head2 SetImportBatchOverlayAction
1235
1236   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1237
1238 =cut
1239
1240 sub SetImportBatchOverlayAction {
1241     my ($batch_id, $new_overlay_action) = @_;
1242
1243     my $dbh = C4::Context->dbh;
1244     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1245     $sth->execute($new_overlay_action, $batch_id);
1246     $sth->finish();
1247
1248 }
1249
1250 =head2 GetImportBatchNoMatchAction
1251
1252   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1253
1254 =cut
1255
1256 sub GetImportBatchNoMatchAction {
1257     my ($batch_id) = @_;
1258
1259     my $dbh = C4::Context->dbh;
1260     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1261     $sth->execute($batch_id);
1262     my ($nomatch_action) = $sth->fetchrow_array();
1263     $sth->finish();
1264     return $nomatch_action;
1265
1266 }
1267
1268
1269 =head2 SetImportBatchNoMatchAction
1270
1271   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1272
1273 =cut
1274
1275 sub SetImportBatchNoMatchAction {
1276     my ($batch_id, $new_nomatch_action) = @_;
1277
1278     my $dbh = C4::Context->dbh;
1279     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1280     $sth->execute($new_nomatch_action, $batch_id);
1281     $sth->finish();
1282
1283 }
1284
1285 =head2 GetImportBatchItemAction
1286
1287   my $item_action = GetImportBatchItemAction($batch_id);
1288
1289 =cut
1290
1291 sub GetImportBatchItemAction {
1292     my ($batch_id) = @_;
1293
1294     my $dbh = C4::Context->dbh;
1295     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1296     $sth->execute($batch_id);
1297     my ($item_action) = $sth->fetchrow_array();
1298     $sth->finish();
1299     return $item_action;
1300
1301 }
1302
1303
1304 =head2 SetImportBatchItemAction
1305
1306   SetImportBatchItemAction($batch_id, $new_item_action);
1307
1308 =cut
1309
1310 sub SetImportBatchItemAction {
1311     my ($batch_id, $new_item_action) = @_;
1312
1313     my $dbh = C4::Context->dbh;
1314     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1315     $sth->execute($new_item_action, $batch_id);
1316     $sth->finish();
1317
1318 }
1319
1320 =head2 GetImportBatchMatcher
1321
1322   my $matcher_id = GetImportBatchMatcher($batch_id);
1323
1324 =cut
1325
1326 sub GetImportBatchMatcher {
1327     my ($batch_id) = @_;
1328
1329     my $dbh = C4::Context->dbh;
1330     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1331     $sth->execute($batch_id);
1332     my ($matcher_id) = $sth->fetchrow_array();
1333     $sth->finish();
1334     return $matcher_id;
1335
1336 }
1337
1338
1339 =head2 SetImportBatchMatcher
1340
1341   SetImportBatchMatcher($batch_id, $new_matcher_id);
1342
1343 =cut
1344
1345 sub SetImportBatchMatcher {
1346     my ($batch_id, $new_matcher_id) = @_;
1347
1348     my $dbh = C4::Context->dbh;
1349     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1350     $sth->execute($new_matcher_id, $batch_id);
1351     $sth->finish();
1352
1353 }
1354
1355 =head2 GetImportRecordOverlayStatus
1356
1357   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1358
1359 =cut
1360
1361 sub GetImportRecordOverlayStatus {
1362     my ($import_record_id) = @_;
1363
1364     my $dbh = C4::Context->dbh;
1365     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1366     $sth->execute($import_record_id);
1367     my ($overlay_status) = $sth->fetchrow_array();
1368     $sth->finish();
1369     return $overlay_status;
1370
1371 }
1372
1373
1374 =head2 SetImportRecordOverlayStatus
1375
1376   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1377
1378 =cut
1379
1380 sub SetImportRecordOverlayStatus {
1381     my ($import_record_id, $new_overlay_status) = @_;
1382
1383     my $dbh = C4::Context->dbh;
1384     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1385     $sth->execute($new_overlay_status, $import_record_id);
1386     $sth->finish();
1387
1388 }
1389
1390 =head2 GetImportRecordStatus
1391
1392   my $status = GetImportRecordStatus($import_record_id);
1393
1394 =cut
1395
1396 sub GetImportRecordStatus {
1397     my ($import_record_id) = @_;
1398
1399     my $dbh = C4::Context->dbh;
1400     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1401     $sth->execute($import_record_id);
1402     my ($status) = $sth->fetchrow_array();
1403     $sth->finish();
1404     return $status;
1405
1406 }
1407
1408
1409 =head2 SetImportRecordStatus
1410
1411   SetImportRecordStatus($import_record_id, $new_status);
1412
1413 =cut
1414
1415 sub SetImportRecordStatus {
1416     my ($import_record_id, $new_status) = @_;
1417
1418     my $dbh = C4::Context->dbh;
1419     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1420     $sth->execute($new_status, $import_record_id);
1421     $sth->finish();
1422
1423 }
1424
1425 =head2 GetImportRecordMatches
1426
1427   my $results = GetImportRecordMatches($import_record_id, $best_only);
1428
1429 =cut
1430
1431 sub GetImportRecordMatches {
1432     my $import_record_id = shift;
1433     my $best_only = @_ ? shift : 0;
1434
1435     my $dbh = C4::Context->dbh;
1436     # FIXME currently biblio only
1437     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1438                                     candidate_match_id, score, record_type
1439                                     FROM import_records
1440                                     JOIN import_record_matches USING (import_record_id)
1441                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1442                                     WHERE import_record_id = ?
1443                                     ORDER BY score DESC, biblionumber DESC");
1444     $sth->bind_param(1, $import_record_id);
1445     my $results = [];
1446     $sth->execute();
1447     while (my $row = $sth->fetchrow_hashref) {
1448         if ($row->{'record_type'} eq 'auth') {
1449             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1450         }
1451         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1452         push @$results, $row;
1453         last if $best_only;
1454     }
1455     $sth->finish();
1456
1457     return $results;
1458     
1459 }
1460
1461 =head2 SetImportRecordMatches
1462
1463   SetImportRecordMatches($import_record_id, @matches);
1464
1465 =cut
1466
1467 sub SetImportRecordMatches {
1468     my $import_record_id = shift;
1469     my @matches = @_;
1470
1471     my $dbh = C4::Context->dbh;
1472     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1473     $delsth->execute($import_record_id);
1474     $delsth->finish();
1475
1476     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1477                                     VALUES (?, ?, ?)");
1478     foreach my $match (@matches) {
1479         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1480     }
1481 }
1482
1483 =head2 RecordsFromISO2709File
1484
1485     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1486
1487 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1488
1489 @PARAM1, String, absolute path to the ISO2709 file.
1490 @PARAM2, String, see stage_file.pl
1491 @PARAM3, String, should be utf8
1492
1493 Returns two array refs.
1494
1495 =cut
1496
1497 sub RecordsFromISO2709File {
1498     my ($input_file, $record_type, $encoding) = @_;
1499     my @errors;
1500
1501     my $marc_type = C4::Context->preference('marcflavour');
1502     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1503
1504     open IN, "<$input_file" or die "$0: cannot open input file $input_file: $!\n";
1505     my @marc_records;
1506     $/ = "\035";
1507     while (<IN>) {
1508         s/^\s+//;
1509         s/\s+$//;
1510         next unless $_; # skip if record has only whitespace, as might occur
1511                         # if file includes newlines between each MARC record
1512         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1513         push @marc_records, $marc_record;
1514         if ($charset_guessed ne $encoding) {
1515             push @errors,
1516                 "Unexpected charset $charset_guessed, expecting $encoding";
1517         }
1518     }
1519     close IN;
1520     return ( \@errors, \@marc_records );
1521 }
1522
1523 =head2 RecordsFromMARCXMLFile
1524
1525     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1526
1527 Creates MARC::Record-objects out of the given MARCXML-file.
1528
1529 @PARAM1, String, absolute path to the ISO2709 file.
1530 @PARAM2, String, should be utf8
1531
1532 Returns two array refs.
1533
1534 =cut
1535
1536 sub RecordsFromMARCXMLFile {
1537     my ( $filename, $encoding ) = @_;
1538     my $batch = MARC::File::XML->in( $filename );
1539     my ( @marcRecords, @errors, $record );
1540     do {
1541         eval { $record = $batch->next( $encoding ); };
1542         if ($@) {
1543             push @errors, $@;
1544         }
1545         push @marcRecords, $record if $record;
1546     } while( $record );
1547     return (\@errors, \@marcRecords);
1548 }
1549
1550 =head2 RecordsFromMarcPlugin
1551
1552     Converts text of input_file into array of MARC records with to_marc plugin
1553
1554 =cut
1555
1556 sub RecordsFromMarcPlugin {
1557     my ($input_file, $plugin_class, $encoding) = @_;
1558     my ( $text, @return );
1559     return \@return if !$input_file || !$plugin_class;
1560
1561     # Read input file
1562     open IN, "<$input_file" or die "$0: cannot open input file $input_file: $!\n";
1563     $/ = "\035";
1564     while (<IN>) {
1565         s/^\s+//;
1566         s/\s+$//;
1567         next unless $_;
1568         $text .= $_;
1569     }
1570     close IN;
1571
1572     # Convert to large MARC blob with plugin
1573     $text = Koha::Plugins::Handler->run({
1574         class  => $plugin_class,
1575         method => 'to_marc',
1576         params => { data => $text },
1577     }) if $text;
1578
1579     # Convert to array of MARC records
1580     if( $text ) {
1581         my $marc_type = C4::Context->preference('marcflavour');
1582         foreach my $blob ( split(/\x1D/, $text) ) {
1583             next if $blob =~ /^\s*$/;
1584             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1585             push @return, $marcrecord;
1586         }
1587     }
1588     return \@return;
1589 }
1590
1591 # internal functions
1592
1593 sub _create_import_record {
1594     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1595
1596     my $dbh = C4::Context->dbh;
1597     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1598                                                          record_type, encoding)
1599                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1600     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1601                   $record_type, $encoding);
1602     my $import_record_id = $dbh->{'mysql_insertid'};
1603     $sth->finish();
1604     return $import_record_id;
1605 }
1606
1607 sub _update_import_record_marc {
1608     my ($import_record_id, $marc_record, $marc_type) = @_;
1609
1610     my $dbh = C4::Context->dbh;
1611     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1612                              WHERE  import_record_id = ?");
1613     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1614     $sth->finish();
1615 }
1616
1617 sub _add_auth_fields {
1618     my ($import_record_id, $marc_record) = @_;
1619
1620     my $controlnumber;
1621     if ($marc_record->field('001')) {
1622         $controlnumber = $marc_record->field('001')->data();
1623     }
1624     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1625     my $dbh = C4::Context->dbh;
1626     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1627     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1628     $sth->finish();
1629 }
1630
1631 sub _add_biblio_fields {
1632     my ($import_record_id, $marc_record) = @_;
1633
1634     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1635     my $dbh = C4::Context->dbh;
1636     # FIXME no controlnumber, originalsource
1637     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1638     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1639     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1640     $sth->finish();
1641                 
1642 }
1643
1644 sub _update_biblio_fields {
1645     my ($import_record_id, $marc_record) = @_;
1646
1647     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1648     my $dbh = C4::Context->dbh;
1649     # FIXME no controlnumber, originalsource
1650     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1651     $isbn =~ s/\(.*$//;
1652     $isbn =~ tr/ -_//;
1653     $isbn = uc $isbn;
1654     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1655                              WHERE  import_record_id = ?");
1656     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1657     $sth->finish();
1658 }
1659
1660 sub _parse_biblio_fields {
1661     my ($marc_record) = @_;
1662
1663     my $dbh = C4::Context->dbh;
1664     my $bibliofields = TransformMarcToKoha($marc_record, '');
1665     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1666
1667 }
1668
1669 sub _update_batch_record_counts {
1670     my ($batch_id) = @_;
1671
1672     my $dbh = C4::Context->dbh;
1673     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1674                                         num_records = (
1675                                             SELECT COUNT(*)
1676                                             FROM import_records
1677                                             WHERE import_batch_id = import_batches.import_batch_id),
1678                                         num_items = (
1679                                             SELECT COUNT(*)
1680                                             FROM import_records
1681                                             JOIN import_items USING (import_record_id)
1682                                             WHERE import_batch_id = import_batches.import_batch_id
1683                                             AND record_type = 'biblio')
1684                                     WHERE import_batch_id = ?");
1685     $sth->bind_param(1, $batch_id);
1686     $sth->execute();
1687     $sth->finish();
1688 }
1689
1690 sub _get_commit_action {
1691     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1692     
1693     if ($record_type eq 'biblio') {
1694         my ($bib_result, $bib_match, $item_result);
1695
1696         if ($overlay_status ne 'no_match') {
1697             $bib_match = GetBestRecordMatch($import_record_id);
1698             if ($overlay_action eq 'replace') {
1699                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1700             } elsif ($overlay_action eq 'create_new') {
1701                 $bib_result  = 'create_new';
1702             } elsif ($overlay_action eq 'ignore') {
1703                 $bib_result  = 'ignore';
1704             }
1705          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1706                 $item_result = 'create_new';
1707        }
1708       elsif($item_action eq 'replace'){
1709           $item_result = 'replace';
1710           }
1711       else {
1712              $item_result = 'ignore';
1713            }
1714         } else {
1715             $bib_result = $nomatch_action;
1716             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1717         }
1718         return ($bib_result, $item_result, $bib_match);
1719     } else { # must be auths
1720         my ($auth_result, $auth_match);
1721
1722         if ($overlay_status ne 'no_match') {
1723             $auth_match = GetBestRecordMatch($import_record_id);
1724             if ($overlay_action eq 'replace') {
1725                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1726             } elsif ($overlay_action eq 'create_new') {
1727                 $auth_result  = 'create_new';
1728             } elsif ($overlay_action eq 'ignore') {
1729                 $auth_result  = 'ignore';
1730             }
1731         } else {
1732             $auth_result = $nomatch_action;
1733         }
1734
1735         return ($auth_result, undef, $auth_match);
1736
1737     }
1738 }
1739
1740 sub _get_revert_action {
1741     my ($overlay_action, $overlay_status, $status) = @_;
1742
1743     my $bib_result;
1744
1745     if ($status eq 'ignored') {
1746         $bib_result = 'ignore';
1747     } else {
1748         if ($overlay_action eq 'create_new') {
1749             $bib_result = 'delete';
1750         } else {
1751             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1752         }
1753     }
1754     return $bib_result;
1755 }
1756
1757 1;
1758 __END__
1759
1760 =head1 AUTHOR
1761
1762 Koha Development Team <http://koha-community.org/>
1763
1764 Galen Charlton <galen.charlton@liblime.com>
1765
1766 =cut