Bug 27324: Use Koha.Preference() for intranetbookbag everywhere
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Items;
31 use Koha::Plugins::Handler;
32 use Koha::Logger;
33
34 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
35
36 BEGIN {
37         require Exporter;
38         @ISA    = qw(Exporter);
39         @EXPORT = qw(
40     GetZ3950BatchId
41     GetWebserviceBatchId
42     GetImportRecordMarc
43     GetImportRecordMarcXML
44     AddImportBatch
45     GetImportBatch
46     AddAuthToBatch
47     AddBiblioToBatch
48     AddItemsToImportBiblio
49     ModAuthorityInBatch
50     ModBiblioInBatch
51
52     BatchStageMarcRecords
53     BatchFindDuplicates
54     BatchCommitRecords
55     BatchRevertRecords
56     CleanBatch
57     DeleteBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     GetImportRecordMatches
82     SetImportRecordMatches
83         );
84 }
85
86 =head1 NAME
87
88 C4::ImportBatch - manage batches of imported MARC records
89
90 =head1 SYNOPSIS
91
92 use C4::ImportBatch;
93
94 =head1 FUNCTIONS
95
96 =head2 GetZ3950BatchId
97
98   my $batchid = GetZ3950BatchId($z3950server);
99
100 Retrieves the ID of the import batch for the Z39.50
101 reservoir for the given target.  If necessary,
102 creates the import batch.
103
104 =cut
105
106 sub GetZ3950BatchId {
107     my ($z3950server) = @_;
108
109     my $dbh = C4::Context->dbh;
110     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
111                              WHERE  batch_type = 'z3950'
112                              AND    file_name = ?");
113     $sth->execute($z3950server);
114     my $rowref = $sth->fetchrow_arrayref();
115     $sth->finish();
116     if (defined $rowref) {
117         return $rowref->[0];
118     } else {
119         my $batch_id = AddImportBatch( {
120                 overlay_action => 'create_new',
121                 import_status => 'staged',
122                 batch_type => 'z3950',
123                 file_name => $z3950server,
124             } );
125         return $batch_id;
126     }
127     
128 }
129
130 =head2 GetWebserviceBatchId
131
132   my $batchid = GetWebserviceBatchId();
133
134 Retrieves the ID of the import batch for webservice.
135 If necessary, creates the import batch.
136
137 =cut
138
139 my $WEBSERVICE_BASE_QRY = <<EOQ;
140 SELECT import_batch_id FROM import_batches
141 WHERE  batch_type = 'webservice'
142 AND    import_status = 'staged'
143 EOQ
144 sub GetWebserviceBatchId {
145     my ($params) = @_;
146
147     my $dbh = C4::Context->dbh;
148     my $sql = $WEBSERVICE_BASE_QRY;
149     my @args;
150     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
151         if (my $val = $params->{$field}) {
152             $sql .= " AND $field = ?";
153             push @args, $val;
154         }
155     }
156     my $id = $dbh->selectrow_array($sql, undef, @args);
157     return $id if $id;
158
159     $params->{batch_type} = 'webservice';
160     $params->{import_status} = 'staged';
161     return AddImportBatch($params);
162 }
163
164 =head2 GetImportRecordMarc
165
166   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
167
168 =cut
169
170 sub GetImportRecordMarc {
171     my ($import_record_id) = @_;
172
173     my $dbh = C4::Context->dbh;
174     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
175         SELECT marc, encoding
176         FROM import_records
177         WHERE import_record_id = ?
178     |, undef, $import_record_id );
179
180     return $marc, $encoding;
181 }
182
183 sub GetRecordFromImportBiblio {
184     my ( $import_record_id, $embed_items ) = @_;
185
186     my ($marc) = GetImportRecordMarc($import_record_id);
187     my $record = MARC::Record->new_from_usmarc($marc);
188
189     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
190
191     return $record;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 GetImportRecordMarcXML
213
214   my $marcxml = GetImportRecordMarcXML($import_record_id);
215
216 =cut
217
218 sub GetImportRecordMarcXML {
219     my ($import_record_id) = @_;
220
221     my $dbh = C4::Context->dbh;
222     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
223     $sth->execute($import_record_id);
224     my ($marcxml) = $sth->fetchrow();
225     $sth->finish();
226     return $marcxml;
227
228 }
229
230 =head2 AddImportBatch
231
232   my $batch_id = AddImportBatch($params_hash);
233
234 =cut
235
236 sub AddImportBatch {
237     my ($params) = @_;
238
239     my (@fields, @vals);
240     foreach (qw( matcher_id template_id branchcode
241                  overlay_action nomatch_action item_action
242                  import_status batch_type file_name comments record_type )) {
243         if (exists $params->{$_}) {
244             push @fields, $_;
245             push @vals, $params->{$_};
246         }
247     }
248     my $dbh = C4::Context->dbh;
249     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
250                                   VALUES (".join( ',', map '?', @fields).")",
251              undef,
252              @vals);
253     return $dbh->{'mysql_insertid'};
254 }
255
256 =head2 GetImportBatch 
257
258   my $row = GetImportBatch($batch_id);
259
260 Retrieve a hashref of an import_batches row.
261
262 =cut
263
264 sub GetImportBatch {
265     my ($batch_id) = @_;
266
267     my $dbh = C4::Context->dbh;
268     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
269     $sth->bind_param(1, $batch_id);
270     $sth->execute();
271     my $result = $sth->fetchrow_hashref;
272     $sth->finish();
273     return $result;
274
275 }
276
277 =head2 AddBiblioToBatch 
278
279   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
280                 $marc_record, $encoding, $update_counts);
281
282 =cut
283
284 sub AddBiblioToBatch {
285     my $batch_id = shift;
286     my $record_sequence = shift;
287     my $marc_record = shift;
288     my $encoding = shift;
289     my $update_counts = @_ ? shift : 1;
290
291     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
292     _add_biblio_fields($import_record_id, $marc_record);
293     _update_batch_record_counts($batch_id) if $update_counts;
294     return $import_record_id;
295 }
296
297 =head2 ModBiblioInBatch
298
299   ModBiblioInBatch($import_record_id, $marc_record);
300
301 =cut
302
303 sub ModBiblioInBatch {
304     my ($import_record_id, $marc_record) = @_;
305
306     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
307     _update_biblio_fields($import_record_id, $marc_record);
308
309 }
310
311 =head2 AddAuthToBatch
312
313   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
314                 $marc_record, $encoding, $update_counts, [$marc_type]);
315
316 =cut
317
318 sub AddAuthToBatch {
319     my $batch_id = shift;
320     my $record_sequence = shift;
321     my $marc_record = shift;
322     my $encoding = shift;
323     my $update_counts = @_ ? shift : 1;
324     my $marc_type = shift || C4::Context->preference('marcflavour');
325
326     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
327
328     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
329     _add_auth_fields($import_record_id, $marc_record);
330     _update_batch_record_counts($batch_id) if $update_counts;
331     return $import_record_id;
332 }
333
334 =head2 ModAuthInBatch
335
336   ModAuthInBatch($import_record_id, $marc_record);
337
338 =cut
339
340 sub ModAuthInBatch {
341     my ($import_record_id, $marc_record) = @_;
342
343     my $marcflavour = C4::Context->preference('marcflavour');
344     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
345
346 }
347
348 =head2 BatchStageMarcRecords
349
350 ( $batch_id, $num_records, $num_items, @invalid_records ) =
351   BatchStageMarcRecords(
352     $record_type,                $encoding,
353     $marc_records,               $file_name,
354     $marc_modification_template, $comments,
355     $branch_code,                $parse_items,
356     $leave_as_staging,           $progress_interval,
357     $progress_callback
358   );
359
360 =cut
361
362 sub BatchStageMarcRecords {
363     my $record_type = shift;
364     my $encoding = shift;
365     my $marc_records = shift;
366     my $file_name = shift;
367     my $marc_modification_template = shift;
368     my $comments = shift;
369     my $branch_code = shift;
370     my $parse_items = shift;
371     my $leave_as_staging = shift;
372
373     # optional callback to monitor status 
374     # of job
375     my $progress_interval = 0;
376     my $progress_callback = undef;
377     if ($#_ == 1) {
378         $progress_interval = shift;
379         $progress_callback = shift;
380         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
381         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
382     } 
383     
384     my $batch_id = AddImportBatch( {
385             overlay_action => 'create_new',
386             import_status => 'staging',
387             batch_type => 'batch',
388             file_name => $file_name,
389             comments => $comments,
390             record_type => $record_type,
391         } );
392     if ($parse_items) {
393         SetImportBatchItemAction($batch_id, 'always_add');
394     } else {
395         SetImportBatchItemAction($batch_id, 'ignore');
396     }
397
398
399     my $marc_type = C4::Context->preference('marcflavour');
400     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
401     my @invalid_records = ();
402     my $num_valid = 0;
403     my $num_items = 0;
404     # FIXME - for now, we're dealing only with bibs
405     my $rec_num = 0;
406     foreach my $marc_record (@$marc_records) {
407         $rec_num++;
408         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
409             &$progress_callback($rec_num);
410         }
411
412         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
413
414         my $import_record_id;
415         if (scalar($marc_record->fields()) == 0) {
416             push @invalid_records, $marc_record;
417         } else {
418
419             # Normalize the record so it doesn't have separated diacritics
420             SetUTF8Flag($marc_record);
421
422             $num_valid++;
423             if ($record_type eq 'biblio') {
424                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
425                 if ($parse_items) {
426                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
427                     $num_items += scalar(@import_items_ids);
428                 }
429             } elsif ($record_type eq 'auth') {
430                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
431             }
432         }
433     }
434     unless ($leave_as_staging) {
435         SetImportBatchStatus($batch_id, 'staged');
436     }
437     # FIXME branch_code, number of bibs, number of items
438     _update_batch_record_counts($batch_id);
439     return ($batch_id, $num_valid, $num_items, @invalid_records);
440 }
441
442 =head2 AddItemsToImportBiblio
443
444   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
445                 $import_record_id, $marc_record, $update_counts);
446
447 =cut
448
449 sub AddItemsToImportBiblio {
450     my $batch_id = shift;
451     my $import_record_id = shift;
452     my $marc_record = shift;
453     my $update_counts = @_ ? shift : 0;
454
455     my @import_items_ids = ();
456    
457     my $dbh = C4::Context->dbh; 
458     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
459     foreach my $item_field ($marc_record->field($item_tag)) {
460         my $item_marc = MARC::Record->new();
461         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
462         $item_marc->append_fields($item_field);
463         $marc_record->delete_field($item_field);
464         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
465                                         VALUES (?, ?, ?)");
466         $sth->bind_param(1, $import_record_id);
467         $sth->bind_param(2, 'staged');
468         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
469         $sth->execute();
470         push @import_items_ids, $dbh->{'mysql_insertid'};
471         $sth->finish();
472     }
473
474     if ($#import_items_ids > -1) {
475         _update_batch_record_counts($batch_id) if $update_counts;
476         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
477     }
478     return @import_items_ids;
479 }
480
481 =head2 BatchFindDuplicates
482
483   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
484              $max_matches, $progress_interval, $progress_callback);
485
486 Goes through the records loaded in the batch and attempts to 
487 find duplicates for each one.  Sets the matching status 
488 of each record to "no_match" or "auto_match" as appropriate.
489
490 The $max_matches parameter is optional; if it is not supplied,
491 it defaults to 10.
492
493 The $progress_interval and $progress_callback parameters are 
494 optional; if both are supplied, the sub referred to by
495 $progress_callback will be invoked every $progress_interval
496 records using the number of records processed as the 
497 singular argument.
498
499 =cut
500
501 sub BatchFindDuplicates {
502     my $batch_id = shift;
503     my $matcher = shift;
504     my $max_matches = @_ ? shift : 10;
505
506     # optional callback to monitor status 
507     # of job
508     my $progress_interval = 0;
509     my $progress_callback = undef;
510     if ($#_ == 1) {
511         $progress_interval = shift;
512         $progress_callback = shift;
513         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
514         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
515     }
516
517     my $dbh = C4::Context->dbh;
518
519     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
520                              FROM import_records
521                              WHERE import_batch_id = ?");
522     $sth->execute($batch_id);
523     my $num_with_matches = 0;
524     my $rec_num = 0;
525     while (my $rowref = $sth->fetchrow_hashref) {
526         $rec_num++;
527         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
528             &$progress_callback($rec_num);
529         }
530         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
531         my @matches = ();
532         if (defined $matcher) {
533             @matches = $matcher->get_matches($marc_record, $max_matches);
534         }
535         if (scalar(@matches) > 0) {
536             $num_with_matches++;
537             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
538             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
539         } else {
540             SetImportRecordMatches($rowref->{'import_record_id'}, ());
541             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
542         }
543     }
544     $sth->finish();
545     return $num_with_matches;
546 }
547
548 =head2 BatchCommitRecords
549
550   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
551         BatchCommitRecords($batch_id, $framework,
552         $progress_interval, $progress_callback);
553
554 =cut
555
556 sub BatchCommitRecords {
557     my $batch_id = shift;
558     my $framework = shift;
559
560     # optional callback to monitor status 
561     # of job
562     my $progress_interval = 0;
563     my $progress_callback = undef;
564     if ($#_ == 1) {
565         $progress_interval = shift;
566         $progress_callback = shift;
567         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
568         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
569     }
570
571     my $record_type;
572     my $num_added = 0;
573     my $num_updated = 0;
574     my $num_items_added = 0;
575     my $num_items_replaced = 0;
576     my $num_items_errored = 0;
577     my $num_ignored = 0;
578     # commit (i.e., save, all records in the batch)
579     SetImportBatchStatus('importing');
580     my $overlay_action = GetImportBatchOverlayAction($batch_id);
581     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
582     my $item_action = GetImportBatchItemAction($batch_id);
583     my $item_tag;
584     my $item_subfield;
585     my $dbh = C4::Context->dbh;
586     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
587                              FROM import_records
588                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
589                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
590                              WHERE import_batch_id = ?");
591     $sth->execute($batch_id);
592     my $marcflavour = C4::Context->preference('marcflavour');
593     my $rec_num = 0;
594     while (my $rowref = $sth->fetchrow_hashref) {
595         $record_type = $rowref->{'record_type'};
596         $rec_num++;
597         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
598             &$progress_callback($rec_num);
599         }
600         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
601             $num_ignored++;
602             next;
603         }
604
605         my $marc_type;
606         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
607             $marc_type = 'UNIMARCAUTH';
608         } elsif ($marcflavour eq 'UNIMARC') {
609             $marc_type = 'UNIMARC';
610         } else {
611             $marc_type = 'USMARC';
612         }
613         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
614
615         if ($record_type eq 'biblio') {
616             # remove any item tags - rely on BatchCommitItems
617             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
618             foreach my $item_field ($marc_record->field($item_tag)) {
619                 $marc_record->delete_field($item_field);
620             }
621         }
622
623         my ($record_result, $item_result, $record_match) =
624             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
625                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
626
627         my $recordid;
628         my $query;
629         if ($record_result eq 'create_new') {
630             $num_added++;
631             if ($record_type eq 'biblio') {
632                 my $biblioitemnumber;
633                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
634                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
635                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
636                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
637                     $num_items_added += $bib_items_added;
638                     $num_items_replaced += $bib_items_replaced;
639                     $num_items_errored += $bib_items_errored;
640                 }
641             } else {
642                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
643                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
644             }
645             my $sth = $dbh->prepare_cached($query);
646             $sth->execute($recordid, $rowref->{'import_record_id'});
647             $sth->finish();
648             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
649         } elsif ($record_result eq 'replace') {
650             $num_updated++;
651             $recordid = $record_match;
652             my $oldxml;
653             if ($record_type eq 'biblio') {
654                 my $oldbiblio = Koha::Biblios->find( $recordid );
655                 $oldxml = GetXmlBiblio($recordid);
656
657                 # remove item fields so that they don't get
658                 # added again if record is reverted
659                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
660                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
661                 foreach my $item_field ($old_marc->field($item_tag)) {
662                     $old_marc->delete_field($item_field);
663                 }
664                 $oldxml = $old_marc->as_xml($marc_type);
665
666                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
667                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
668
669                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
670                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
671                     $num_items_added += $bib_items_added;
672                     $num_items_replaced += $bib_items_replaced;
673                     $num_items_errored += $bib_items_errored;
674                 }
675             } else {
676                 $oldxml = GetAuthorityXML($recordid);
677
678                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
679                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
680             }
681             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
682             $sth->execute($oldxml, $rowref->{'import_record_id'});
683             $sth->finish();
684             my $sth2 = $dbh->prepare_cached($query);
685             $sth2->execute($recordid, $rowref->{'import_record_id'});
686             $sth2->finish();
687             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
688             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
689         } elsif ($record_result eq 'ignore') {
690             $recordid = $record_match;
691             $num_ignored++;
692             $recordid = $record_match;
693             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
694                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
695                 $num_items_added += $bib_items_added;
696          $num_items_replaced += $bib_items_replaced;
697                 $num_items_errored += $bib_items_errored;
698                 # still need to record the matched biblionumber so that the
699                 # items can be reverted
700                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
701                 $sth2->execute($recordid, $rowref->{'import_record_id'});
702                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
703             }
704             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
705         }
706     }
707     $sth->finish();
708     SetImportBatchStatus($batch_id, 'imported');
709     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
710 }
711
712 =head2 BatchCommitItems
713
714   ($num_items_added, $num_items_errored) = 
715          BatchCommitItems($import_record_id, $biblionumber);
716
717 =cut
718
719 sub BatchCommitItems {
720     my ( $import_record_id, $biblionumber, $action ) = @_;
721
722     my $dbh = C4::Context->dbh;
723
724     my $num_items_added = 0;
725     my $num_items_errored = 0;
726     my $num_items_replaced = 0;
727
728     my $sth = $dbh->prepare( "
729         SELECT import_items_id, import_items.marcxml, encoding
730         FROM import_items
731         JOIN import_records USING (import_record_id)
732         WHERE import_record_id = ?
733         ORDER BY import_items_id
734     " );
735     $sth->bind_param( 1, $import_record_id );
736     $sth->execute();
737
738     while ( my $row = $sth->fetchrow_hashref() ) {
739         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
740
741         # Delete date_due subfield as to not accidentally delete item checkout due dates
742         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
743         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
744
745         my $item = TransformMarcToKoha( $item_marc );
746
747         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
748         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
749
750         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
751         if ( $action eq "replace" && $duplicate_itemnumber ) {
752             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
753             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
754             $updsth->bind_param( 1, 'imported' );
755             $updsth->bind_param( 2, $item->{itemnumber} );
756             $updsth->bind_param( 3, $row->{'import_items_id'} );
757             $updsth->execute();
758             $updsth->finish();
759             $num_items_replaced++;
760         } elsif ( $action eq "replace" && $duplicate_barcode ) {
761             my $itemnumber = $duplicate_barcode->itemnumber;
762             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
763             $updsth->bind_param( 1, 'imported' );
764             $updsth->bind_param( 2, $item->{itemnumber} );
765             $updsth->bind_param( 3, $row->{'import_items_id'} );
766             $updsth->execute();
767             $updsth->finish();
768             $num_items_replaced++;
769         } elsif ($duplicate_barcode) {
770             $updsth->bind_param( 1, 'error' );
771             $updsth->bind_param( 2, 'duplicate item barcode' );
772             $updsth->bind_param( 3, $row->{'import_items_id'} );
773             $updsth->execute();
774             $num_items_errored++;
775         } else {
776             # Remove the itemnumber if it exists, we want to create a new item
777             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
778             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
779
780             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
781             if( $itemnumber ) {
782                 $updsth->bind_param( 1, 'imported' );
783                 $updsth->bind_param( 2, $itemnumber );
784                 $updsth->bind_param( 3, $row->{'import_items_id'} );
785                 $updsth->execute();
786                 $updsth->finish();
787                 $num_items_added++;
788             }
789         }
790     }
791
792     return ( $num_items_added, $num_items_replaced, $num_items_errored );
793 }
794
795 =head2 BatchRevertRecords
796
797   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
798       $num_ignored) = BatchRevertRecords($batch_id);
799
800 =cut
801
802 sub BatchRevertRecords {
803     my $batch_id = shift;
804
805     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
806
807     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
808
809     my $record_type;
810     my $num_deleted = 0;
811     my $num_errors = 0;
812     my $num_reverted = 0;
813     my $num_ignored = 0;
814     my $num_items_deleted = 0;
815     # commit (i.e., save, all records in the batch)
816     SetImportBatchStatus('reverting');
817     my $overlay_action = GetImportBatchOverlayAction($batch_id);
818     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
819     my $dbh = C4::Context->dbh;
820     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
821                              FROM import_records
822                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
823                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
824                              WHERE import_batch_id = ?");
825     $sth->execute($batch_id);
826     my $marc_type;
827     my $marcflavour = C4::Context->preference('marcflavour');
828     while (my $rowref = $sth->fetchrow_hashref) {
829         $record_type = $rowref->{'record_type'};
830         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
831             $num_ignored++;
832             next;
833         }
834         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
835             $marc_type = 'UNIMARCAUTH';
836         } elsif ($marcflavour eq 'UNIMARC') {
837             $marc_type = 'UNIMARC';
838         } else {
839             $marc_type = 'USMARC';
840         }
841
842         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
843
844         if ($record_result eq 'delete') {
845             my $error = undef;
846             if  ($record_type eq 'biblio') {
847                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
848                 $error = DelBiblio($rowref->{'matched_biblionumber'});
849             } else {
850                 DelAuthority({ authid => $rowref->{'matched_authid'} });
851             }
852             if (defined $error) {
853                 $num_errors++;
854             } else {
855                 $num_deleted++;
856                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
857             }
858         } elsif ($record_result eq 'restore') {
859             $num_reverted++;
860             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
861             if ($record_type eq 'biblio') {
862                 my $biblionumber = $rowref->{'matched_biblionumber'};
863                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
864
865                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
866                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
867
868                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
869                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
870             } else {
871                 my $authid = $rowref->{'matched_authid'};
872                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
873             }
874             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
875         } elsif ($record_result eq 'ignore') {
876             if ($record_type eq 'biblio') {
877                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
878             }
879             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
880         }
881         my $query;
882         if ($record_type eq 'biblio') {
883             # remove matched_biblionumber only if there is no 'imported' item left
884             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
885             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
886         } else {
887             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
888         }
889         my $sth2 = $dbh->prepare_cached($query);
890         $sth2->execute($rowref->{'import_record_id'});
891     }
892
893     $sth->finish();
894     SetImportBatchStatus($batch_id, 'reverted');
895     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
896 }
897
898 =head2 BatchRevertItems
899
900   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
901
902 =cut
903
904 sub BatchRevertItems {
905     my ($import_record_id, $biblionumber) = @_;
906
907     my $dbh = C4::Context->dbh;
908     my $num_items_deleted = 0;
909
910     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
911                                    FROM import_items
912                                    JOIN items USING (itemnumber)
913                                    WHERE import_record_id = ?");
914     $sth->bind_param(1, $import_record_id);
915     $sth->execute();
916     while (my $row = $sth->fetchrow_hashref()) {
917         my $item = Koha::Items->find($row->{itemnumber});
918         my $error = $item->safe_delete;
919         if ($error eq '1'){
920             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
921             $updsth->bind_param(1, 'reverted');
922             $updsth->bind_param(2, $row->{'import_items_id'});
923             $updsth->execute();
924             $updsth->finish();
925             $num_items_deleted++;
926         }
927         else {
928             next;
929         }
930     }
931     $sth->finish();
932     return $num_items_deleted;
933 }
934
935 =head2 CleanBatch
936
937   CleanBatch($batch_id)
938
939 Deletes all staged records from the import batch
940 and sets the status of the batch to 'cleaned'.  Note
941 that deleting a stage record does *not* affect
942 any record that has been committed to the database.
943
944 =cut
945
946 sub CleanBatch {
947     my $batch_id = shift;
948     return unless defined $batch_id;
949
950     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
951     SetImportBatchStatus($batch_id, 'cleaned');
952 }
953
954 =head2 DeleteBatch
955
956   DeleteBatch($batch_id)
957
958 Deletes the record from the database. This can only be done
959 once the batch has been cleaned.
960
961 =cut
962
963 sub DeleteBatch {
964     my $batch_id = shift;
965     return unless defined $batch_id;
966
967     my $dbh = C4::Context->dbh;
968     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
969     $sth->execute( $batch_id );
970 }
971
972 =head2 GetAllImportBatches
973
974   my $results = GetAllImportBatches();
975
976 Returns a references to an array of hash references corresponding
977 to all import_batches rows (of batch_type 'batch'), sorted in 
978 ascending order by import_batch_id.
979
980 =cut
981
982 sub  GetAllImportBatches {
983     my $dbh = C4::Context->dbh;
984     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
985                                     WHERE batch_type IN ('batch', 'webservice')
986                                     ORDER BY import_batch_id ASC");
987
988     my $results = [];
989     $sth->execute();
990     while (my $row = $sth->fetchrow_hashref) {
991         push @$results, $row;
992     }
993     $sth->finish();
994     return $results;
995 }
996
997 =head2 GetStagedWebserviceBatches
998
999   my $batch_ids = GetStagedWebserviceBatches();
1000
1001 Returns a references to an array of batch id's
1002 of batch_type 'webservice' that are not imported
1003
1004 =cut
1005
1006 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1007 SELECT import_batch_id FROM import_batches
1008 WHERE batch_type = 'webservice'
1009 AND import_status = 'staged'
1010 EOQ
1011 sub  GetStagedWebserviceBatches {
1012     my $dbh = C4::Context->dbh;
1013     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1014 }
1015
1016 =head2 GetImportBatchRangeDesc
1017
1018   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1019
1020 Returns a reference to an array of hash references corresponding to
1021 import_batches rows (sorted in descending order by import_batch_id)
1022 start at the given offset.
1023
1024 =cut
1025
1026 sub GetImportBatchRangeDesc {
1027     my ($offset, $results_per_group) = @_;
1028
1029     my $dbh = C4::Context->dbh;
1030     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1031                                     LEFT JOIN import_batch_profiles p
1032                                     ON b.profile_id = p.id
1033                                     WHERE b.batch_type IN ('batch', 'webservice')
1034                                     ORDER BY b.import_batch_id DESC";
1035     my @params;
1036     if ($results_per_group){
1037         $query .= " LIMIT ?";
1038         push(@params, $results_per_group);
1039     }
1040     if ($offset){
1041         $query .= " OFFSET ?";
1042         push(@params, $offset);
1043     }
1044     my $sth = $dbh->prepare_cached($query);
1045     $sth->execute(@params);
1046     my $results = $sth->fetchall_arrayref({});
1047     $sth->finish();
1048     return $results;
1049 }
1050
1051 =head2 GetItemNumbersFromImportBatch
1052
1053   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1054
1055 =cut
1056
1057 sub GetItemNumbersFromImportBatch {
1058     my ($batch_id) = @_;
1059     my $dbh = C4::Context->dbh;
1060     my $sql = q|
1061 SELECT itemnumber FROM import_items
1062 INNER JOIN items USING (itemnumber)
1063 INNER JOIN import_records USING (import_record_id)
1064 WHERE import_batch_id = ?|;
1065     my  $sth = $dbh->prepare( $sql );
1066     $sth->execute($batch_id);
1067     my @items ;
1068     while ( my ($itm) = $sth->fetchrow_array ) {
1069         push @items, $itm;
1070     }
1071     return @items;
1072 }
1073
1074 =head2 GetNumberOfImportBatches
1075
1076   my $count = GetNumberOfImportBatches();
1077
1078 =cut
1079
1080 sub GetNumberOfNonZ3950ImportBatches {
1081     my $dbh = C4::Context->dbh;
1082     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1083     $sth->execute();
1084     my ($count) = $sth->fetchrow_array();
1085     $sth->finish();
1086     return $count;
1087 }
1088
1089 =head2 GetImportBiblios
1090
1091   my $results = GetImportBiblios($importid);
1092
1093 =cut
1094
1095 sub GetImportBiblios {
1096     my ($import_record_id) = @_;
1097
1098     my $dbh = C4::Context->dbh;
1099     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1100     return $dbh->selectall_arrayref(
1101         $query,
1102         { Slice => {} },
1103         $import_record_id
1104     );
1105
1106 }
1107
1108 =head2 GetImportRecordsRange
1109
1110   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1111
1112 Returns a reference to an array of hash references corresponding to
1113 import_biblios/import_auths/import_records rows for a given batch
1114 starting at the given offset.
1115
1116 =cut
1117
1118 sub GetImportRecordsRange {
1119     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1120
1121     my $dbh = C4::Context->dbh;
1122
1123     my $order_by = $parameters->{order_by} || 'import_record_id';
1124     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1125
1126     my $order_by_direction =
1127       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1128
1129     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1130
1131     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1132                                            record_sequence, status, overlay_status,
1133                                            matched_biblionumber, matched_authid, record_type
1134                                     FROM   import_records
1135                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1136                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1137                                     WHERE  import_batch_id = ?";
1138     my @params;
1139     push(@params, $batch_id);
1140     if ($status) {
1141         $query .= " AND status=?";
1142         push(@params,$status);
1143     }
1144
1145     $query.=" ORDER BY $order_by $order_by_direction";
1146
1147     if($results_per_group){
1148         $query .= " LIMIT ?";
1149         push(@params, $results_per_group);
1150     }
1151     if($offset){
1152         $query .= " OFFSET ?";
1153         push(@params, $offset);
1154     }
1155     my $sth = $dbh->prepare_cached($query);
1156     $sth->execute(@params);
1157     my $results = $sth->fetchall_arrayref({});
1158     $sth->finish();
1159     return $results;
1160
1161 }
1162
1163 =head2 GetBestRecordMatch
1164
1165   my $record_id = GetBestRecordMatch($import_record_id);
1166
1167 =cut
1168
1169 sub GetBestRecordMatch {
1170     my ($import_record_id) = @_;
1171
1172     my $dbh = C4::Context->dbh;
1173     my $sth = $dbh->prepare("SELECT candidate_match_id
1174                              FROM   import_record_matches
1175                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1176                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1177                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1178                              WHERE  import_record_matches.import_record_id = ? AND
1179                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1180                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1181                              ORDER BY score DESC, candidate_match_id DESC");
1182     $sth->execute($import_record_id);
1183     my ($record_id) = $sth->fetchrow_array();
1184     $sth->finish();
1185     return $record_id;
1186 }
1187
1188 =head2 GetImportBatchStatus
1189
1190   my $status = GetImportBatchStatus($batch_id);
1191
1192 =cut
1193
1194 sub GetImportBatchStatus {
1195     my ($batch_id) = @_;
1196
1197     my $dbh = C4::Context->dbh;
1198     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1199     $sth->execute($batch_id);
1200     my ($status) = $sth->fetchrow_array();
1201     $sth->finish();
1202     return $status;
1203
1204 }
1205
1206 =head2 SetImportBatchStatus
1207
1208   SetImportBatchStatus($batch_id, $new_status);
1209
1210 =cut
1211
1212 sub SetImportBatchStatus {
1213     my ($batch_id, $new_status) = @_;
1214
1215     my $dbh = C4::Context->dbh;
1216     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1217     $sth->execute($new_status, $batch_id);
1218     $sth->finish();
1219
1220 }
1221
1222 =head2 GetImportBatchOverlayAction
1223
1224   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1225
1226 =cut
1227
1228 sub GetImportBatchOverlayAction {
1229     my ($batch_id) = @_;
1230
1231     my $dbh = C4::Context->dbh;
1232     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1233     $sth->execute($batch_id);
1234     my ($overlay_action) = $sth->fetchrow_array();
1235     $sth->finish();
1236     return $overlay_action;
1237
1238 }
1239
1240
1241 =head2 SetImportBatchOverlayAction
1242
1243   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1244
1245 =cut
1246
1247 sub SetImportBatchOverlayAction {
1248     my ($batch_id, $new_overlay_action) = @_;
1249
1250     my $dbh = C4::Context->dbh;
1251     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1252     $sth->execute($new_overlay_action, $batch_id);
1253     $sth->finish();
1254
1255 }
1256
1257 =head2 GetImportBatchNoMatchAction
1258
1259   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1260
1261 =cut
1262
1263 sub GetImportBatchNoMatchAction {
1264     my ($batch_id) = @_;
1265
1266     my $dbh = C4::Context->dbh;
1267     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1268     $sth->execute($batch_id);
1269     my ($nomatch_action) = $sth->fetchrow_array();
1270     $sth->finish();
1271     return $nomatch_action;
1272
1273 }
1274
1275
1276 =head2 SetImportBatchNoMatchAction
1277
1278   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1279
1280 =cut
1281
1282 sub SetImportBatchNoMatchAction {
1283     my ($batch_id, $new_nomatch_action) = @_;
1284
1285     my $dbh = C4::Context->dbh;
1286     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1287     $sth->execute($new_nomatch_action, $batch_id);
1288     $sth->finish();
1289
1290 }
1291
1292 =head2 GetImportBatchItemAction
1293
1294   my $item_action = GetImportBatchItemAction($batch_id);
1295
1296 =cut
1297
1298 sub GetImportBatchItemAction {
1299     my ($batch_id) = @_;
1300
1301     my $dbh = C4::Context->dbh;
1302     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1303     $sth->execute($batch_id);
1304     my ($item_action) = $sth->fetchrow_array();
1305     $sth->finish();
1306     return $item_action;
1307
1308 }
1309
1310
1311 =head2 SetImportBatchItemAction
1312
1313   SetImportBatchItemAction($batch_id, $new_item_action);
1314
1315 =cut
1316
1317 sub SetImportBatchItemAction {
1318     my ($batch_id, $new_item_action) = @_;
1319
1320     my $dbh = C4::Context->dbh;
1321     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1322     $sth->execute($new_item_action, $batch_id);
1323     $sth->finish();
1324
1325 }
1326
1327 =head2 GetImportBatchMatcher
1328
1329   my $matcher_id = GetImportBatchMatcher($batch_id);
1330
1331 =cut
1332
1333 sub GetImportBatchMatcher {
1334     my ($batch_id) = @_;
1335
1336     my $dbh = C4::Context->dbh;
1337     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1338     $sth->execute($batch_id);
1339     my ($matcher_id) = $sth->fetchrow_array();
1340     $sth->finish();
1341     return $matcher_id;
1342
1343 }
1344
1345
1346 =head2 SetImportBatchMatcher
1347
1348   SetImportBatchMatcher($batch_id, $new_matcher_id);
1349
1350 =cut
1351
1352 sub SetImportBatchMatcher {
1353     my ($batch_id, $new_matcher_id) = @_;
1354
1355     my $dbh = C4::Context->dbh;
1356     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1357     $sth->execute($new_matcher_id, $batch_id);
1358     $sth->finish();
1359
1360 }
1361
1362 =head2 GetImportRecordOverlayStatus
1363
1364   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1365
1366 =cut
1367
1368 sub GetImportRecordOverlayStatus {
1369     my ($import_record_id) = @_;
1370
1371     my $dbh = C4::Context->dbh;
1372     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1373     $sth->execute($import_record_id);
1374     my ($overlay_status) = $sth->fetchrow_array();
1375     $sth->finish();
1376     return $overlay_status;
1377
1378 }
1379
1380
1381 =head2 SetImportRecordOverlayStatus
1382
1383   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1384
1385 =cut
1386
1387 sub SetImportRecordOverlayStatus {
1388     my ($import_record_id, $new_overlay_status) = @_;
1389
1390     my $dbh = C4::Context->dbh;
1391     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1392     $sth->execute($new_overlay_status, $import_record_id);
1393     $sth->finish();
1394
1395 }
1396
1397 =head2 GetImportRecordStatus
1398
1399   my $status = GetImportRecordStatus($import_record_id);
1400
1401 =cut
1402
1403 sub GetImportRecordStatus {
1404     my ($import_record_id) = @_;
1405
1406     my $dbh = C4::Context->dbh;
1407     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1408     $sth->execute($import_record_id);
1409     my ($status) = $sth->fetchrow_array();
1410     $sth->finish();
1411     return $status;
1412
1413 }
1414
1415
1416 =head2 SetImportRecordStatus
1417
1418   SetImportRecordStatus($import_record_id, $new_status);
1419
1420 =cut
1421
1422 sub SetImportRecordStatus {
1423     my ($import_record_id, $new_status) = @_;
1424
1425     my $dbh = C4::Context->dbh;
1426     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1427     $sth->execute($new_status, $import_record_id);
1428     $sth->finish();
1429
1430 }
1431
1432 =head2 GetImportRecordMatches
1433
1434   my $results = GetImportRecordMatches($import_record_id, $best_only);
1435
1436 =cut
1437
1438 sub GetImportRecordMatches {
1439     my $import_record_id = shift;
1440     my $best_only = @_ ? shift : 0;
1441
1442     my $dbh = C4::Context->dbh;
1443     # FIXME currently biblio only
1444     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1445                                     candidate_match_id, score, record_type
1446                                     FROM import_records
1447                                     JOIN import_record_matches USING (import_record_id)
1448                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1449                                     WHERE import_record_id = ?
1450                                     ORDER BY score DESC, biblionumber DESC");
1451     $sth->bind_param(1, $import_record_id);
1452     my $results = [];
1453     $sth->execute();
1454     while (my $row = $sth->fetchrow_hashref) {
1455         if ($row->{'record_type'} eq 'auth') {
1456             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1457         }
1458         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1459         push @$results, $row;
1460         last if $best_only;
1461     }
1462     $sth->finish();
1463
1464     return $results;
1465     
1466 }
1467
1468 =head2 SetImportRecordMatches
1469
1470   SetImportRecordMatches($import_record_id, @matches);
1471
1472 =cut
1473
1474 sub SetImportRecordMatches {
1475     my $import_record_id = shift;
1476     my @matches = @_;
1477
1478     my $dbh = C4::Context->dbh;
1479     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1480     $delsth->execute($import_record_id);
1481     $delsth->finish();
1482
1483     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1484                                     VALUES (?, ?, ?)");
1485     foreach my $match (@matches) {
1486         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1487     }
1488 }
1489
1490 =head2 RecordsFromISO2709File
1491
1492     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1493
1494 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1495
1496 @PARAM1, String, absolute path to the ISO2709 file.
1497 @PARAM2, String, see stage_file.pl
1498 @PARAM3, String, should be utf8
1499
1500 Returns two array refs.
1501
1502 =cut
1503
1504 sub RecordsFromISO2709File {
1505     my ($input_file, $record_type, $encoding) = @_;
1506     my @errors;
1507
1508     my $marc_type = C4::Context->preference('marcflavour');
1509     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1510
1511     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1512     my @marc_records;
1513     $/ = "\035";
1514     while (<$fh>) {
1515         s/^\s+//;
1516         s/\s+$//;
1517         next unless $_; # skip if record has only whitespace, as might occur
1518                         # if file includes newlines between each MARC record
1519         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1520         push @marc_records, $marc_record;
1521         if ($charset_guessed ne $encoding) {
1522             push @errors,
1523                 "Unexpected charset $charset_guessed, expecting $encoding";
1524         }
1525     }
1526     close $fh;
1527     return ( \@errors, \@marc_records );
1528 }
1529
1530 =head2 RecordsFromMARCXMLFile
1531
1532     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1533
1534 Creates MARC::Record-objects out of the given MARCXML-file.
1535
1536 @PARAM1, String, absolute path to the ISO2709 file.
1537 @PARAM2, String, should be utf8
1538
1539 Returns two array refs.
1540
1541 =cut
1542
1543 sub RecordsFromMARCXMLFile {
1544     my ( $filename, $encoding ) = @_;
1545     my $batch = MARC::File::XML->in( $filename );
1546     my ( @marcRecords, @errors, $record );
1547     do {
1548         eval { $record = $batch->next( $encoding ); };
1549         if ($@) {
1550             push @errors, $@;
1551         }
1552         push @marcRecords, $record if $record;
1553     } while( $record );
1554     return (\@errors, \@marcRecords);
1555 }
1556
1557 =head2 RecordsFromMarcPlugin
1558
1559     Converts text of input_file into array of MARC records with to_marc plugin
1560
1561 =cut
1562
1563 sub RecordsFromMarcPlugin {
1564     my ($input_file, $plugin_class, $encoding) = @_;
1565     my ( $text, @return );
1566     return \@return if !$input_file || !$plugin_class;
1567
1568     # Read input file
1569     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1570     $/ = "\035";
1571     while (<$fh>) {
1572         s/^\s+//;
1573         s/\s+$//;
1574         next unless $_;
1575         $text .= $_;
1576     }
1577     close $fh;
1578
1579     # Convert to large MARC blob with plugin
1580     $text = Koha::Plugins::Handler->run({
1581         class  => $plugin_class,
1582         method => 'to_marc',
1583         params => { data => $text },
1584     }) if $text;
1585
1586     # Convert to array of MARC records
1587     if( $text ) {
1588         my $marc_type = C4::Context->preference('marcflavour');
1589         foreach my $blob ( split(/\x1D/, $text) ) {
1590             next if $blob =~ /^\s*$/;
1591             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1592             push @return, $marcrecord;
1593         }
1594     }
1595     return \@return;
1596 }
1597
1598 # internal functions
1599
1600 sub _create_import_record {
1601     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1602
1603     my $dbh = C4::Context->dbh;
1604     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1605                                                          record_type, encoding)
1606                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1607     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1608                   $record_type, $encoding);
1609     my $import_record_id = $dbh->{'mysql_insertid'};
1610     $sth->finish();
1611     return $import_record_id;
1612 }
1613
1614 sub _update_import_record_marc {
1615     my ($import_record_id, $marc_record, $marc_type) = @_;
1616
1617     my $dbh = C4::Context->dbh;
1618     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1619                              WHERE  import_record_id = ?");
1620     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1621     $sth->finish();
1622 }
1623
1624 sub _add_auth_fields {
1625     my ($import_record_id, $marc_record) = @_;
1626
1627     my $controlnumber;
1628     if ($marc_record->field('001')) {
1629         $controlnumber = $marc_record->field('001')->data();
1630     }
1631     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1632     my $dbh = C4::Context->dbh;
1633     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1634     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1635     $sth->finish();
1636 }
1637
1638 sub _add_biblio_fields {
1639     my ($import_record_id, $marc_record) = @_;
1640
1641     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1642     my $dbh = C4::Context->dbh;
1643     # FIXME no controlnumber, originalsource
1644     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1645     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1646     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1647     $sth->finish();
1648                 
1649 }
1650
1651 sub _update_biblio_fields {
1652     my ($import_record_id, $marc_record) = @_;
1653
1654     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1655     my $dbh = C4::Context->dbh;
1656     # FIXME no controlnumber, originalsource
1657     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1658     $isbn =~ s/\(.*$//;
1659     $isbn =~ tr/ -_//;
1660     $isbn = uc $isbn;
1661     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1662                              WHERE  import_record_id = ?");
1663     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1664     $sth->finish();
1665 }
1666
1667 sub _parse_biblio_fields {
1668     my ($marc_record) = @_;
1669
1670     my $dbh = C4::Context->dbh;
1671     my $bibliofields = TransformMarcToKoha($marc_record, '');
1672     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1673
1674 }
1675
1676 sub _update_batch_record_counts {
1677     my ($batch_id) = @_;
1678
1679     my $dbh = C4::Context->dbh;
1680     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1681                                         num_records = (
1682                                             SELECT COUNT(*)
1683                                             FROM import_records
1684                                             WHERE import_batch_id = import_batches.import_batch_id),
1685                                         num_items = (
1686                                             SELECT COUNT(*)
1687                                             FROM import_records
1688                                             JOIN import_items USING (import_record_id)
1689                                             WHERE import_batch_id = import_batches.import_batch_id
1690                                             AND record_type = 'biblio')
1691                                     WHERE import_batch_id = ?");
1692     $sth->bind_param(1, $batch_id);
1693     $sth->execute();
1694     $sth->finish();
1695 }
1696
1697 sub _get_commit_action {
1698     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1699     
1700     if ($record_type eq 'biblio') {
1701         my ($bib_result, $bib_match, $item_result);
1702
1703         if ($overlay_status ne 'no_match') {
1704             $bib_match = GetBestRecordMatch($import_record_id);
1705             if ($overlay_action eq 'replace') {
1706                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1707             } elsif ($overlay_action eq 'create_new') {
1708                 $bib_result  = 'create_new';
1709             } elsif ($overlay_action eq 'ignore') {
1710                 $bib_result  = 'ignore';
1711             }
1712          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1713                 $item_result = 'create_new';
1714        }
1715       elsif($item_action eq 'replace'){
1716           $item_result = 'replace';
1717           }
1718       else {
1719              $item_result = 'ignore';
1720            }
1721         } else {
1722             $bib_result = $nomatch_action;
1723             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1724         }
1725         return ($bib_result, $item_result, $bib_match);
1726     } else { # must be auths
1727         my ($auth_result, $auth_match);
1728
1729         if ($overlay_status ne 'no_match') {
1730             $auth_match = GetBestRecordMatch($import_record_id);
1731             if ($overlay_action eq 'replace') {
1732                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1733             } elsif ($overlay_action eq 'create_new') {
1734                 $auth_result  = 'create_new';
1735             } elsif ($overlay_action eq 'ignore') {
1736                 $auth_result  = 'ignore';
1737             }
1738         } else {
1739             $auth_result = $nomatch_action;
1740         }
1741
1742         return ($auth_result, undef, $auth_match);
1743
1744     }
1745 }
1746
1747 sub _get_revert_action {
1748     my ($overlay_action, $overlay_status, $status) = @_;
1749
1750     my $bib_result;
1751
1752     if ($status eq 'ignored') {
1753         $bib_result = 'ignore';
1754     } else {
1755         if ($overlay_action eq 'create_new') {
1756             $bib_result = 'delete';
1757         } else {
1758             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1759         }
1760     }
1761     return $bib_result;
1762 }
1763
1764 1;
1765 __END__
1766
1767 =head1 AUTHOR
1768
1769 Koha Development Team <http://koha-community.org/>
1770
1771 Galen Charlton <galen.charlton@liblime.com>
1772
1773 =cut