Bug 28092: Add reserve notes column to holds to pull
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Items;
31 use Koha::Plugins::Handler;
32 use Koha::Logger;
33
34 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
35
36 BEGIN {
37         require Exporter;
38         @ISA    = qw(Exporter);
39         @EXPORT = qw(
40     GetZ3950BatchId
41     GetWebserviceBatchId
42     GetImportRecordMarc
43     GetImportRecordMarcXML
44     AddImportBatch
45     GetImportBatch
46     AddAuthToBatch
47     AddBiblioToBatch
48     AddItemsToImportBiblio
49     ModAuthorityInBatch
50     ModBiblioInBatch
51
52     BatchStageMarcRecords
53     BatchFindDuplicates
54     BatchCommitRecords
55     BatchRevertRecords
56     CleanBatch
57     DeleteBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     SetMatchedBiblionumber
82     GetImportRecordMatches
83     SetImportRecordMatches
84         );
85 }
86
87 =head1 NAME
88
89 C4::ImportBatch - manage batches of imported MARC records
90
91 =head1 SYNOPSIS
92
93 use C4::ImportBatch;
94
95 =head1 FUNCTIONS
96
97 =head2 GetZ3950BatchId
98
99   my $batchid = GetZ3950BatchId($z3950server);
100
101 Retrieves the ID of the import batch for the Z39.50
102 reservoir for the given target.  If necessary,
103 creates the import batch.
104
105 =cut
106
107 sub GetZ3950BatchId {
108     my ($z3950server) = @_;
109
110     my $dbh = C4::Context->dbh;
111     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
112                              WHERE  batch_type = 'z3950'
113                              AND    file_name = ?");
114     $sth->execute($z3950server);
115     my $rowref = $sth->fetchrow_arrayref();
116     $sth->finish();
117     if (defined $rowref) {
118         return $rowref->[0];
119     } else {
120         my $batch_id = AddImportBatch( {
121                 overlay_action => 'create_new',
122                 import_status => 'staged',
123                 batch_type => 'z3950',
124                 file_name => $z3950server,
125             } );
126         return $batch_id;
127     }
128     
129 }
130
131 =head2 GetWebserviceBatchId
132
133   my $batchid = GetWebserviceBatchId();
134
135 Retrieves the ID of the import batch for webservice.
136 If necessary, creates the import batch.
137
138 =cut
139
140 my $WEBSERVICE_BASE_QRY = <<EOQ;
141 SELECT import_batch_id FROM import_batches
142 WHERE  batch_type = 'webservice'
143 AND    import_status = 'staged'
144 EOQ
145 sub GetWebserviceBatchId {
146     my ($params) = @_;
147
148     my $dbh = C4::Context->dbh;
149     my $sql = $WEBSERVICE_BASE_QRY;
150     my @args;
151     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
152         if (my $val = $params->{$field}) {
153             $sql .= " AND $field = ?";
154             push @args, $val;
155         }
156     }
157     my $id = $dbh->selectrow_array($sql, undef, @args);
158     return $id if $id;
159
160     $params->{batch_type} = 'webservice';
161     $params->{import_status} = 'staged';
162     return AddImportBatch($params);
163 }
164
165 =head2 GetImportRecordMarc
166
167   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
168
169 =cut
170
171 sub GetImportRecordMarc {
172     my ($import_record_id) = @_;
173
174     my $dbh = C4::Context->dbh;
175     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
176         SELECT marc, encoding
177         FROM import_records
178         WHERE import_record_id = ?
179     |, undef, $import_record_id );
180
181     return $marc, $encoding;
182 }
183
184 sub GetRecordFromImportBiblio {
185     my ( $import_record_id, $embed_items ) = @_;
186
187     my ($marc) = GetImportRecordMarc($import_record_id);
188     my $record = MARC::Record->new_from_usmarc($marc);
189
190     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
191
192     return $record;
193 }
194
195 sub EmbedItemsInImportBiblio {
196     my ( $record, $import_record_id ) = @_;
197     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
198     my $dbh = C4::Context->dbh;
199     my $import_items = $dbh->selectall_arrayref(q|
200         SELECT import_items.marcxml
201         FROM import_items
202         WHERE import_record_id = ?
203     |, { Slice => {} }, $import_record_id );
204     my @item_fields;
205     for my $import_item ( @$import_items ) {
206         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
207         push @item_fields, $item_marc->field($itemtag);
208     }
209     $record->append_fields(@item_fields);
210     return $record;
211 }
212
213 =head2 GetImportRecordMarcXML
214
215   my $marcxml = GetImportRecordMarcXML($import_record_id);
216
217 =cut
218
219 sub GetImportRecordMarcXML {
220     my ($import_record_id) = @_;
221
222     my $dbh = C4::Context->dbh;
223     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
224     $sth->execute($import_record_id);
225     my ($marcxml) = $sth->fetchrow();
226     $sth->finish();
227     return $marcxml;
228
229 }
230
231 =head2 AddImportBatch
232
233   my $batch_id = AddImportBatch($params_hash);
234
235 =cut
236
237 sub AddImportBatch {
238     my ($params) = @_;
239
240     my (@fields, @vals);
241     foreach (qw( matcher_id template_id branchcode
242                  overlay_action nomatch_action item_action
243                  import_status batch_type file_name comments record_type )) {
244         if (exists $params->{$_}) {
245             push @fields, $_;
246             push @vals, $params->{$_};
247         }
248     }
249     my $dbh = C4::Context->dbh;
250     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
251                                   VALUES (".join( ',', map '?', @fields).")",
252              undef,
253              @vals);
254     return $dbh->{'mysql_insertid'};
255 }
256
257 =head2 GetImportBatch 
258
259   my $row = GetImportBatch($batch_id);
260
261 Retrieve a hashref of an import_batches row.
262
263 =cut
264
265 sub GetImportBatch {
266     my ($batch_id) = @_;
267
268     my $dbh = C4::Context->dbh;
269     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
270     $sth->bind_param(1, $batch_id);
271     $sth->execute();
272     my $result = $sth->fetchrow_hashref;
273     $sth->finish();
274     return $result;
275
276 }
277
278 =head2 AddBiblioToBatch 
279
280   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
281                 $marc_record, $encoding, $update_counts);
282
283 =cut
284
285 sub AddBiblioToBatch {
286     my $batch_id = shift;
287     my $record_sequence = shift;
288     my $marc_record = shift;
289     my $encoding = shift;
290     my $update_counts = @_ ? shift : 1;
291
292     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
293     _add_biblio_fields($import_record_id, $marc_record);
294     _update_batch_record_counts($batch_id) if $update_counts;
295     return $import_record_id;
296 }
297
298 =head2 ModBiblioInBatch
299
300   ModBiblioInBatch($import_record_id, $marc_record);
301
302 =cut
303
304 sub ModBiblioInBatch {
305     my ($import_record_id, $marc_record) = @_;
306
307     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
308     _update_biblio_fields($import_record_id, $marc_record);
309
310 }
311
312 =head2 AddAuthToBatch
313
314   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
315                 $marc_record, $encoding, $update_counts, [$marc_type]);
316
317 =cut
318
319 sub AddAuthToBatch {
320     my $batch_id = shift;
321     my $record_sequence = shift;
322     my $marc_record = shift;
323     my $encoding = shift;
324     my $update_counts = @_ ? shift : 1;
325     my $marc_type = shift || C4::Context->preference('marcflavour');
326
327     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
328
329     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
330     _add_auth_fields($import_record_id, $marc_record);
331     _update_batch_record_counts($batch_id) if $update_counts;
332     return $import_record_id;
333 }
334
335 =head2 ModAuthInBatch
336
337   ModAuthInBatch($import_record_id, $marc_record);
338
339 =cut
340
341 sub ModAuthInBatch {
342     my ($import_record_id, $marc_record) = @_;
343
344     my $marcflavour = C4::Context->preference('marcflavour');
345     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
346
347 }
348
349 =head2 BatchStageMarcRecords
350
351 ( $batch_id, $num_records, $num_items, @invalid_records ) =
352   BatchStageMarcRecords(
353     $record_type,                $encoding,
354     $marc_records,               $file_name,
355     $marc_modification_template, $comments,
356     $branch_code,                $parse_items,
357     $leave_as_staging,           $progress_interval,
358     $progress_callback
359   );
360
361 =cut
362
363 sub BatchStageMarcRecords {
364     my $record_type = shift;
365     my $encoding = shift;
366     my $marc_records = shift;
367     my $file_name = shift;
368     my $marc_modification_template = shift;
369     my $comments = shift;
370     my $branch_code = shift;
371     my $parse_items = shift;
372     my $leave_as_staging = shift;
373
374     # optional callback to monitor status 
375     # of job
376     my $progress_interval = 0;
377     my $progress_callback = undef;
378     if ($#_ == 1) {
379         $progress_interval = shift;
380         $progress_callback = shift;
381         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
382         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
383     } 
384     
385     my $batch_id = AddImportBatch( {
386             overlay_action => 'create_new',
387             import_status => 'staging',
388             batch_type => 'batch',
389             file_name => $file_name,
390             comments => $comments,
391             record_type => $record_type,
392         } );
393     if ($parse_items) {
394         SetImportBatchItemAction($batch_id, 'always_add');
395     } else {
396         SetImportBatchItemAction($batch_id, 'ignore');
397     }
398
399
400     my $marc_type = C4::Context->preference('marcflavour');
401     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
402     my @invalid_records = ();
403     my $num_valid = 0;
404     my $num_items = 0;
405     # FIXME - for now, we're dealing only with bibs
406     my $rec_num = 0;
407     foreach my $marc_record (@$marc_records) {
408         $rec_num++;
409         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
410             &$progress_callback($rec_num);
411         }
412
413         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
414
415         my $import_record_id;
416         if (scalar($marc_record->fields()) == 0) {
417             push @invalid_records, $marc_record;
418         } else {
419
420             # Normalize the record so it doesn't have separated diacritics
421             SetUTF8Flag($marc_record);
422
423             $num_valid++;
424             if ($record_type eq 'biblio') {
425                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
426                 if ($parse_items) {
427                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
428                     $num_items += scalar(@import_items_ids);
429                 }
430             } elsif ($record_type eq 'auth') {
431                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
432             }
433         }
434     }
435     unless ($leave_as_staging) {
436         SetImportBatchStatus($batch_id, 'staged');
437     }
438     # FIXME branch_code, number of bibs, number of items
439     _update_batch_record_counts($batch_id);
440     return ($batch_id, $num_valid, $num_items, @invalid_records);
441 }
442
443 =head2 AddItemsToImportBiblio
444
445   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
446                 $import_record_id, $marc_record, $update_counts);
447
448 =cut
449
450 sub AddItemsToImportBiblio {
451     my $batch_id = shift;
452     my $import_record_id = shift;
453     my $marc_record = shift;
454     my $update_counts = @_ ? shift : 0;
455
456     my @import_items_ids = ();
457    
458     my $dbh = C4::Context->dbh; 
459     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
460     foreach my $item_field ($marc_record->field($item_tag)) {
461         my $item_marc = MARC::Record->new();
462         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
463         $item_marc->append_fields($item_field);
464         $marc_record->delete_field($item_field);
465         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
466                                         VALUES (?, ?, ?)");
467         $sth->bind_param(1, $import_record_id);
468         $sth->bind_param(2, 'staged');
469         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
470         $sth->execute();
471         push @import_items_ids, $dbh->{'mysql_insertid'};
472         $sth->finish();
473     }
474
475     if ($#import_items_ids > -1) {
476         _update_batch_record_counts($batch_id) if $update_counts;
477         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
478     }
479     return @import_items_ids;
480 }
481
482 =head2 BatchFindDuplicates
483
484   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
485              $max_matches, $progress_interval, $progress_callback);
486
487 Goes through the records loaded in the batch and attempts to 
488 find duplicates for each one.  Sets the matching status 
489 of each record to "no_match" or "auto_match" as appropriate.
490
491 The $max_matches parameter is optional; if it is not supplied,
492 it defaults to 10.
493
494 The $progress_interval and $progress_callback parameters are 
495 optional; if both are supplied, the sub referred to by
496 $progress_callback will be invoked every $progress_interval
497 records using the number of records processed as the 
498 singular argument.
499
500 =cut
501
502 sub BatchFindDuplicates {
503     my $batch_id = shift;
504     my $matcher = shift;
505     my $max_matches = @_ ? shift : 10;
506
507     # optional callback to monitor status 
508     # of job
509     my $progress_interval = 0;
510     my $progress_callback = undef;
511     if ($#_ == 1) {
512         $progress_interval = shift;
513         $progress_callback = shift;
514         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
515         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
516     }
517
518     my $dbh = C4::Context->dbh;
519
520     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
521                              FROM import_records
522                              WHERE import_batch_id = ?");
523     $sth->execute($batch_id);
524     my $num_with_matches = 0;
525     my $rec_num = 0;
526     while (my $rowref = $sth->fetchrow_hashref) {
527         $rec_num++;
528         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
529             &$progress_callback($rec_num);
530         }
531         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
532         my @matches = ();
533         if (defined $matcher) {
534             @matches = $matcher->get_matches($marc_record, $max_matches);
535         }
536         if (scalar(@matches) > 0) {
537             $num_with_matches++;
538             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
539             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
540         } else {
541             SetImportRecordMatches($rowref->{'import_record_id'}, ());
542             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
543         }
544     }
545     $sth->finish();
546     return $num_with_matches;
547 }
548
549 =head2 BatchCommitRecords
550
551   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
552         BatchCommitRecords($batch_id, $framework,
553         $progress_interval, $progress_callback);
554
555 =cut
556
557 sub BatchCommitRecords {
558     my $batch_id = shift;
559     my $framework = shift;
560
561     # optional callback to monitor status 
562     # of job
563     my $progress_interval = 0;
564     my $progress_callback = undef;
565     if ($#_ == 1) {
566         $progress_interval = shift;
567         $progress_callback = shift;
568         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
569         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
570     }
571
572     my $record_type;
573     my $num_added = 0;
574     my $num_updated = 0;
575     my $num_items_added = 0;
576     my $num_items_replaced = 0;
577     my $num_items_errored = 0;
578     my $num_ignored = 0;
579     # commit (i.e., save, all records in the batch)
580     SetImportBatchStatus($batch_id, 'importing');
581     my $overlay_action = GetImportBatchOverlayAction($batch_id);
582     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
583     my $item_action = GetImportBatchItemAction($batch_id);
584     my $item_tag;
585     my $item_subfield;
586     my $dbh = C4::Context->dbh;
587     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
588                              FROM import_records
589                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
590                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
591                              WHERE import_batch_id = ?");
592     $sth->execute($batch_id);
593     my $marcflavour = C4::Context->preference('marcflavour');
594     my $rec_num = 0;
595     while (my $rowref = $sth->fetchrow_hashref) {
596         $record_type = $rowref->{'record_type'};
597         $rec_num++;
598         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
599             &$progress_callback($rec_num);
600         }
601         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
602             $num_ignored++;
603             next;
604         }
605
606         my $marc_type;
607         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
608             $marc_type = 'UNIMARCAUTH';
609         } elsif ($marcflavour eq 'UNIMARC') {
610             $marc_type = 'UNIMARC';
611         } else {
612             $marc_type = 'USMARC';
613         }
614         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
615
616         if ($record_type eq 'biblio') {
617             # remove any item tags - rely on BatchCommitItems
618             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
619             foreach my $item_field ($marc_record->field($item_tag)) {
620                 $marc_record->delete_field($item_field);
621             }
622         }
623
624         my ($record_result, $item_result, $record_match) =
625             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
626                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
627
628         my $recordid;
629         my $query;
630         if ($record_result eq 'create_new') {
631             $num_added++;
632             if ($record_type eq 'biblio') {
633                 my $biblioitemnumber;
634                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
635                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
636                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
637                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
638                     $num_items_added += $bib_items_added;
639                     $num_items_replaced += $bib_items_replaced;
640                     $num_items_errored += $bib_items_errored;
641                 }
642             } else {
643                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
644                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
645             }
646             my $sth = $dbh->prepare_cached($query);
647             $sth->execute($recordid, $rowref->{'import_record_id'});
648             $sth->finish();
649             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
650         } elsif ($record_result eq 'replace') {
651             $num_updated++;
652             $recordid = $record_match;
653             my $oldxml;
654             if ($record_type eq 'biblio') {
655                 my $oldbiblio = Koha::Biblios->find( $recordid );
656                 $oldxml = GetXmlBiblio($recordid);
657
658                 # remove item fields so that they don't get
659                 # added again if record is reverted
660                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
661                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
662                 foreach my $item_field ($old_marc->field($item_tag)) {
663                     $old_marc->delete_field($item_field);
664                 }
665                 $oldxml = $old_marc->as_xml($marc_type);
666
667                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
668                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
669
670                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
671                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
672                     $num_items_added += $bib_items_added;
673                     $num_items_replaced += $bib_items_replaced;
674                     $num_items_errored += $bib_items_errored;
675                 }
676             } else {
677                 $oldxml = GetAuthorityXML($recordid);
678
679                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
680                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
681             }
682             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
683             $sth->execute($oldxml, $rowref->{'import_record_id'});
684             $sth->finish();
685             my $sth2 = $dbh->prepare_cached($query);
686             $sth2->execute($recordid, $rowref->{'import_record_id'});
687             $sth2->finish();
688             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
689             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
690         } elsif ($record_result eq 'ignore') {
691             $recordid = $record_match;
692             $num_ignored++;
693             $recordid = $record_match;
694             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
695                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
696                 $num_items_added += $bib_items_added;
697          $num_items_replaced += $bib_items_replaced;
698                 $num_items_errored += $bib_items_errored;
699                 # still need to record the matched biblionumber so that the
700                 # items can be reverted
701                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
702                 $sth2->execute($recordid, $rowref->{'import_record_id'});
703                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
704             }
705             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
706         }
707     }
708     $sth->finish();
709     SetImportBatchStatus($batch_id, 'imported');
710     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
711 }
712
713 =head2 BatchCommitItems
714
715   ($num_items_added, $num_items_errored) = 
716          BatchCommitItems($import_record_id, $biblionumber);
717
718 =cut
719
720 sub BatchCommitItems {
721     my ( $import_record_id, $biblionumber, $action ) = @_;
722
723     my $dbh = C4::Context->dbh;
724
725     my $num_items_added = 0;
726     my $num_items_errored = 0;
727     my $num_items_replaced = 0;
728
729     my $sth = $dbh->prepare( "
730         SELECT import_items_id, import_items.marcxml, encoding
731         FROM import_items
732         JOIN import_records USING (import_record_id)
733         WHERE import_record_id = ?
734         ORDER BY import_items_id
735     " );
736     $sth->bind_param( 1, $import_record_id );
737     $sth->execute();
738
739     while ( my $row = $sth->fetchrow_hashref() ) {
740         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
741
742         # Delete date_due subfield as to not accidentally delete item checkout due dates
743         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
744         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
745
746         my $item = TransformMarcToKoha( $item_marc );
747
748         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
749         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
750
751         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
752         if ( $action eq "replace" && $duplicate_itemnumber ) {
753             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
754             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
755             $updsth->bind_param( 1, 'imported' );
756             $updsth->bind_param( 2, $item->{itemnumber} );
757             $updsth->bind_param( 3, $row->{'import_items_id'} );
758             $updsth->execute();
759             $updsth->finish();
760             $num_items_replaced++;
761         } elsif ( $action eq "replace" && $duplicate_barcode ) {
762             my $itemnumber = $duplicate_barcode->itemnumber;
763             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
764             $updsth->bind_param( 1, 'imported' );
765             $updsth->bind_param( 2, $item->{itemnumber} );
766             $updsth->bind_param( 3, $row->{'import_items_id'} );
767             $updsth->execute();
768             $updsth->finish();
769             $num_items_replaced++;
770         } elsif ($duplicate_barcode) {
771             $updsth->bind_param( 1, 'error' );
772             $updsth->bind_param( 2, 'duplicate item barcode' );
773             $updsth->bind_param( 3, $row->{'import_items_id'} );
774             $updsth->execute();
775             $num_items_errored++;
776         } else {
777             # Remove the itemnumber if it exists, we want to create a new item
778             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
779             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
780
781             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
782             if( $itemnumber ) {
783                 $updsth->bind_param( 1, 'imported' );
784                 $updsth->bind_param( 2, $itemnumber );
785                 $updsth->bind_param( 3, $row->{'import_items_id'} );
786                 $updsth->execute();
787                 $updsth->finish();
788                 $num_items_added++;
789             }
790         }
791     }
792
793     return ( $num_items_added, $num_items_replaced, $num_items_errored );
794 }
795
796 =head2 BatchRevertRecords
797
798   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
799       $num_ignored) = BatchRevertRecords($batch_id);
800
801 =cut
802
803 sub BatchRevertRecords {
804     my $batch_id = shift;
805
806     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
807
808     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
809
810     my $record_type;
811     my $num_deleted = 0;
812     my $num_errors = 0;
813     my $num_reverted = 0;
814     my $num_ignored = 0;
815     my $num_items_deleted = 0;
816     # commit (i.e., save, all records in the batch)
817     SetImportBatchStatus($batch_id, 'reverting');
818     my $overlay_action = GetImportBatchOverlayAction($batch_id);
819     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
820     my $dbh = C4::Context->dbh;
821     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
822                              FROM import_records
823                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
824                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
825                              WHERE import_batch_id = ?");
826     $sth->execute($batch_id);
827     my $marc_type;
828     my $marcflavour = C4::Context->preference('marcflavour');
829     while (my $rowref = $sth->fetchrow_hashref) {
830         $record_type = $rowref->{'record_type'};
831         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
832             $num_ignored++;
833             next;
834         }
835         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
836             $marc_type = 'UNIMARCAUTH';
837         } elsif ($marcflavour eq 'UNIMARC') {
838             $marc_type = 'UNIMARC';
839         } else {
840             $marc_type = 'USMARC';
841         }
842
843         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
844
845         if ($record_result eq 'delete') {
846             my $error = undef;
847             if  ($record_type eq 'biblio') {
848                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
849                 $error = DelBiblio($rowref->{'matched_biblionumber'});
850             } else {
851                 DelAuthority({ authid => $rowref->{'matched_authid'} });
852             }
853             if (defined $error) {
854                 $num_errors++;
855             } else {
856                 $num_deleted++;
857                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
858             }
859         } elsif ($record_result eq 'restore') {
860             $num_reverted++;
861             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
862             if ($record_type eq 'biblio') {
863                 my $biblionumber = $rowref->{'matched_biblionumber'};
864                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
865
866                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
867                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
868
869                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
870                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
871             } else {
872                 my $authid = $rowref->{'matched_authid'};
873                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
874             }
875             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
876         } elsif ($record_result eq 'ignore') {
877             if ($record_type eq 'biblio') {
878                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
879             }
880             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
881         }
882         my $query;
883         if ($record_type eq 'biblio') {
884             # remove matched_biblionumber only if there is no 'imported' item left
885             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
886             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
887         } else {
888             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
889         }
890         my $sth2 = $dbh->prepare_cached($query);
891         $sth2->execute($rowref->{'import_record_id'});
892     }
893
894     $sth->finish();
895     SetImportBatchStatus($batch_id, 'reverted');
896     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
897 }
898
899 =head2 BatchRevertItems
900
901   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
902
903 =cut
904
905 sub BatchRevertItems {
906     my ($import_record_id, $biblionumber) = @_;
907
908     my $dbh = C4::Context->dbh;
909     my $num_items_deleted = 0;
910
911     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
912                                    FROM import_items
913                                    JOIN items USING (itemnumber)
914                                    WHERE import_record_id = ?");
915     $sth->bind_param(1, $import_record_id);
916     $sth->execute();
917     while (my $row = $sth->fetchrow_hashref()) {
918         my $item = Koha::Items->find($row->{itemnumber});
919         my $error = $item->safe_delete;
920         if ($error eq '1'){
921             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
922             $updsth->bind_param(1, 'reverted');
923             $updsth->bind_param(2, $row->{'import_items_id'});
924             $updsth->execute();
925             $updsth->finish();
926             $num_items_deleted++;
927         }
928         else {
929             next;
930         }
931     }
932     $sth->finish();
933     return $num_items_deleted;
934 }
935
936 =head2 CleanBatch
937
938   CleanBatch($batch_id)
939
940 Deletes all staged records from the import batch
941 and sets the status of the batch to 'cleaned'.  Note
942 that deleting a stage record does *not* affect
943 any record that has been committed to the database.
944
945 =cut
946
947 sub CleanBatch {
948     my $batch_id = shift;
949     return unless defined $batch_id;
950
951     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
952     SetImportBatchStatus($batch_id, 'cleaned');
953 }
954
955 =head2 DeleteBatch
956
957   DeleteBatch($batch_id)
958
959 Deletes the record from the database. This can only be done
960 once the batch has been cleaned.
961
962 =cut
963
964 sub DeleteBatch {
965     my $batch_id = shift;
966     return unless defined $batch_id;
967
968     my $dbh = C4::Context->dbh;
969     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
970     $sth->execute( $batch_id );
971 }
972
973 =head2 GetAllImportBatches
974
975   my $results = GetAllImportBatches();
976
977 Returns a references to an array of hash references corresponding
978 to all import_batches rows (of batch_type 'batch'), sorted in 
979 ascending order by import_batch_id.
980
981 =cut
982
983 sub  GetAllImportBatches {
984     my $dbh = C4::Context->dbh;
985     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
986                                     WHERE batch_type IN ('batch', 'webservice')
987                                     ORDER BY import_batch_id ASC");
988
989     my $results = [];
990     $sth->execute();
991     while (my $row = $sth->fetchrow_hashref) {
992         push @$results, $row;
993     }
994     $sth->finish();
995     return $results;
996 }
997
998 =head2 GetStagedWebserviceBatches
999
1000   my $batch_ids = GetStagedWebserviceBatches();
1001
1002 Returns a references to an array of batch id's
1003 of batch_type 'webservice' that are not imported
1004
1005 =cut
1006
1007 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1008 SELECT import_batch_id FROM import_batches
1009 WHERE batch_type = 'webservice'
1010 AND import_status = 'staged'
1011 EOQ
1012 sub  GetStagedWebserviceBatches {
1013     my $dbh = C4::Context->dbh;
1014     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1015 }
1016
1017 =head2 GetImportBatchRangeDesc
1018
1019   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1020
1021 Returns a reference to an array of hash references corresponding to
1022 import_batches rows (sorted in descending order by import_batch_id)
1023 start at the given offset.
1024
1025 =cut
1026
1027 sub GetImportBatchRangeDesc {
1028     my ($offset, $results_per_group) = @_;
1029
1030     my $dbh = C4::Context->dbh;
1031     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1032                                     LEFT JOIN import_batch_profiles p
1033                                     ON b.profile_id = p.id
1034                                     WHERE b.batch_type IN ('batch', 'webservice')
1035                                     ORDER BY b.import_batch_id DESC";
1036     my @params;
1037     if ($results_per_group){
1038         $query .= " LIMIT ?";
1039         push(@params, $results_per_group);
1040     }
1041     if ($offset){
1042         $query .= " OFFSET ?";
1043         push(@params, $offset);
1044     }
1045     my $sth = $dbh->prepare_cached($query);
1046     $sth->execute(@params);
1047     my $results = $sth->fetchall_arrayref({});
1048     $sth->finish();
1049     return $results;
1050 }
1051
1052 =head2 GetItemNumbersFromImportBatch
1053
1054   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1055
1056 =cut
1057
1058 sub GetItemNumbersFromImportBatch {
1059     my ($batch_id) = @_;
1060     my $dbh = C4::Context->dbh;
1061     my $sql = q|
1062 SELECT itemnumber FROM import_items
1063 INNER JOIN items USING (itemnumber)
1064 INNER JOIN import_records USING (import_record_id)
1065 WHERE import_batch_id = ?|;
1066     my  $sth = $dbh->prepare( $sql );
1067     $sth->execute($batch_id);
1068     my @items ;
1069     while ( my ($itm) = $sth->fetchrow_array ) {
1070         push @items, $itm;
1071     }
1072     return @items;
1073 }
1074
1075 =head2 GetNumberOfImportBatches
1076
1077   my $count = GetNumberOfImportBatches();
1078
1079 =cut
1080
1081 sub GetNumberOfNonZ3950ImportBatches {
1082     my $dbh = C4::Context->dbh;
1083     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1084     $sth->execute();
1085     my ($count) = $sth->fetchrow_array();
1086     $sth->finish();
1087     return $count;
1088 }
1089
1090 =head2 GetImportBiblios
1091
1092   my $results = GetImportBiblios($importid);
1093
1094 =cut
1095
1096 sub GetImportBiblios {
1097     my ($import_record_id) = @_;
1098
1099     my $dbh = C4::Context->dbh;
1100     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1101     return $dbh->selectall_arrayref(
1102         $query,
1103         { Slice => {} },
1104         $import_record_id
1105     );
1106
1107 }
1108
1109 =head2 GetImportRecordsRange
1110
1111   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1112
1113 Returns a reference to an array of hash references corresponding to
1114 import_biblios/import_auths/import_records rows for a given batch
1115 starting at the given offset.
1116
1117 =cut
1118
1119 sub GetImportRecordsRange {
1120     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1121
1122     my $dbh = C4::Context->dbh;
1123
1124     my $order_by = $parameters->{order_by} || 'import_record_id';
1125     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1126
1127     my $order_by_direction =
1128       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1129
1130     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1131
1132     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1133                                            record_sequence, status, overlay_status,
1134                                            matched_biblionumber, matched_authid, record_type
1135                                     FROM   import_records
1136                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1137                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1138                                     WHERE  import_batch_id = ?";
1139     my @params;
1140     push(@params, $batch_id);
1141     if ($status) {
1142         $query .= " AND status=?";
1143         push(@params,$status);
1144     }
1145
1146     $query.=" ORDER BY $order_by $order_by_direction";
1147
1148     if($results_per_group){
1149         $query .= " LIMIT ?";
1150         push(@params, $results_per_group);
1151     }
1152     if($offset){
1153         $query .= " OFFSET ?";
1154         push(@params, $offset);
1155     }
1156     my $sth = $dbh->prepare_cached($query);
1157     $sth->execute(@params);
1158     my $results = $sth->fetchall_arrayref({});
1159     $sth->finish();
1160     return $results;
1161
1162 }
1163
1164 =head2 GetBestRecordMatch
1165
1166   my $record_id = GetBestRecordMatch($import_record_id);
1167
1168 =cut
1169
1170 sub GetBestRecordMatch {
1171     my ($import_record_id) = @_;
1172
1173     my $dbh = C4::Context->dbh;
1174     my $sth = $dbh->prepare("SELECT candidate_match_id
1175                              FROM   import_record_matches
1176                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1177                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1178                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1179                              WHERE  import_record_matches.import_record_id = ? AND
1180                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1181                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1182                              ORDER BY score DESC, candidate_match_id DESC");
1183     $sth->execute($import_record_id);
1184     my ($record_id) = $sth->fetchrow_array();
1185     $sth->finish();
1186     return $record_id;
1187 }
1188
1189 =head2 GetImportBatchStatus
1190
1191   my $status = GetImportBatchStatus($batch_id);
1192
1193 =cut
1194
1195 sub GetImportBatchStatus {
1196     my ($batch_id) = @_;
1197
1198     my $dbh = C4::Context->dbh;
1199     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1200     $sth->execute($batch_id);
1201     my ($status) = $sth->fetchrow_array();
1202     $sth->finish();
1203     return $status;
1204
1205 }
1206
1207 =head2 SetImportBatchStatus
1208
1209   SetImportBatchStatus($batch_id, $new_status);
1210
1211 =cut
1212
1213 sub SetImportBatchStatus {
1214     my ($batch_id, $new_status) = @_;
1215
1216     my $dbh = C4::Context->dbh;
1217     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1218     $sth->execute($new_status, $batch_id);
1219     $sth->finish();
1220
1221 }
1222
1223 =head2 SetMatchedBiblionumber
1224
1225   SetMatchedBiblionumber($import_record_id, $biblionumber);
1226
1227 =cut
1228
1229 sub SetMatchedBiblionumber {
1230     my ($import_record_id, $biblionumber) = @_;
1231
1232     my $dbh = C4::Context->dbh;
1233     $dbh->do(
1234         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1235         undef, $biblionumber, $import_record_id
1236     );
1237 }
1238
1239 =head2 GetImportBatchOverlayAction
1240
1241   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1242
1243 =cut
1244
1245 sub GetImportBatchOverlayAction {
1246     my ($batch_id) = @_;
1247
1248     my $dbh = C4::Context->dbh;
1249     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1250     $sth->execute($batch_id);
1251     my ($overlay_action) = $sth->fetchrow_array();
1252     $sth->finish();
1253     return $overlay_action;
1254
1255 }
1256
1257
1258 =head2 SetImportBatchOverlayAction
1259
1260   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1261
1262 =cut
1263
1264 sub SetImportBatchOverlayAction {
1265     my ($batch_id, $new_overlay_action) = @_;
1266
1267     my $dbh = C4::Context->dbh;
1268     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1269     $sth->execute($new_overlay_action, $batch_id);
1270     $sth->finish();
1271
1272 }
1273
1274 =head2 GetImportBatchNoMatchAction
1275
1276   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1277
1278 =cut
1279
1280 sub GetImportBatchNoMatchAction {
1281     my ($batch_id) = @_;
1282
1283     my $dbh = C4::Context->dbh;
1284     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1285     $sth->execute($batch_id);
1286     my ($nomatch_action) = $sth->fetchrow_array();
1287     $sth->finish();
1288     return $nomatch_action;
1289
1290 }
1291
1292
1293 =head2 SetImportBatchNoMatchAction
1294
1295   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1296
1297 =cut
1298
1299 sub SetImportBatchNoMatchAction {
1300     my ($batch_id, $new_nomatch_action) = @_;
1301
1302     my $dbh = C4::Context->dbh;
1303     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1304     $sth->execute($new_nomatch_action, $batch_id);
1305     $sth->finish();
1306
1307 }
1308
1309 =head2 GetImportBatchItemAction
1310
1311   my $item_action = GetImportBatchItemAction($batch_id);
1312
1313 =cut
1314
1315 sub GetImportBatchItemAction {
1316     my ($batch_id) = @_;
1317
1318     my $dbh = C4::Context->dbh;
1319     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1320     $sth->execute($batch_id);
1321     my ($item_action) = $sth->fetchrow_array();
1322     $sth->finish();
1323     return $item_action;
1324
1325 }
1326
1327
1328 =head2 SetImportBatchItemAction
1329
1330   SetImportBatchItemAction($batch_id, $new_item_action);
1331
1332 =cut
1333
1334 sub SetImportBatchItemAction {
1335     my ($batch_id, $new_item_action) = @_;
1336
1337     my $dbh = C4::Context->dbh;
1338     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1339     $sth->execute($new_item_action, $batch_id);
1340     $sth->finish();
1341
1342 }
1343
1344 =head2 GetImportBatchMatcher
1345
1346   my $matcher_id = GetImportBatchMatcher($batch_id);
1347
1348 =cut
1349
1350 sub GetImportBatchMatcher {
1351     my ($batch_id) = @_;
1352
1353     my $dbh = C4::Context->dbh;
1354     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1355     $sth->execute($batch_id);
1356     my ($matcher_id) = $sth->fetchrow_array();
1357     $sth->finish();
1358     return $matcher_id;
1359
1360 }
1361
1362
1363 =head2 SetImportBatchMatcher
1364
1365   SetImportBatchMatcher($batch_id, $new_matcher_id);
1366
1367 =cut
1368
1369 sub SetImportBatchMatcher {
1370     my ($batch_id, $new_matcher_id) = @_;
1371
1372     my $dbh = C4::Context->dbh;
1373     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1374     $sth->execute($new_matcher_id, $batch_id);
1375     $sth->finish();
1376
1377 }
1378
1379 =head2 GetImportRecordOverlayStatus
1380
1381   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1382
1383 =cut
1384
1385 sub GetImportRecordOverlayStatus {
1386     my ($import_record_id) = @_;
1387
1388     my $dbh = C4::Context->dbh;
1389     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1390     $sth->execute($import_record_id);
1391     my ($overlay_status) = $sth->fetchrow_array();
1392     $sth->finish();
1393     return $overlay_status;
1394
1395 }
1396
1397
1398 =head2 SetImportRecordOverlayStatus
1399
1400   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1401
1402 =cut
1403
1404 sub SetImportRecordOverlayStatus {
1405     my ($import_record_id, $new_overlay_status) = @_;
1406
1407     my $dbh = C4::Context->dbh;
1408     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1409     $sth->execute($new_overlay_status, $import_record_id);
1410     $sth->finish();
1411
1412 }
1413
1414 =head2 GetImportRecordStatus
1415
1416   my $status = GetImportRecordStatus($import_record_id);
1417
1418 =cut
1419
1420 sub GetImportRecordStatus {
1421     my ($import_record_id) = @_;
1422
1423     my $dbh = C4::Context->dbh;
1424     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1425     $sth->execute($import_record_id);
1426     my ($status) = $sth->fetchrow_array();
1427     $sth->finish();
1428     return $status;
1429
1430 }
1431
1432
1433 =head2 SetImportRecordStatus
1434
1435   SetImportRecordStatus($import_record_id, $new_status);
1436
1437 =cut
1438
1439 sub SetImportRecordStatus {
1440     my ($import_record_id, $new_status) = @_;
1441
1442     my $dbh = C4::Context->dbh;
1443     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1444     $sth->execute($new_status, $import_record_id);
1445     $sth->finish();
1446
1447 }
1448
1449 =head2 GetImportRecordMatches
1450
1451   my $results = GetImportRecordMatches($import_record_id, $best_only);
1452
1453 =cut
1454
1455 sub GetImportRecordMatches {
1456     my $import_record_id = shift;
1457     my $best_only = @_ ? shift : 0;
1458
1459     my $dbh = C4::Context->dbh;
1460     # FIXME currently biblio only
1461     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1462                                     candidate_match_id, score, record_type
1463                                     FROM import_records
1464                                     JOIN import_record_matches USING (import_record_id)
1465                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1466                                     WHERE import_record_id = ?
1467                                     ORDER BY score DESC, biblionumber DESC");
1468     $sth->bind_param(1, $import_record_id);
1469     my $results = [];
1470     $sth->execute();
1471     while (my $row = $sth->fetchrow_hashref) {
1472         if ($row->{'record_type'} eq 'auth') {
1473             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1474         }
1475         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1476         push @$results, $row;
1477         last if $best_only;
1478     }
1479     $sth->finish();
1480
1481     return $results;
1482     
1483 }
1484
1485 =head2 SetImportRecordMatches
1486
1487   SetImportRecordMatches($import_record_id, @matches);
1488
1489 =cut
1490
1491 sub SetImportRecordMatches {
1492     my $import_record_id = shift;
1493     my @matches = @_;
1494
1495     my $dbh = C4::Context->dbh;
1496     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1497     $delsth->execute($import_record_id);
1498     $delsth->finish();
1499
1500     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1501                                     VALUES (?, ?, ?)");
1502     foreach my $match (@matches) {
1503         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1504     }
1505 }
1506
1507 =head2 RecordsFromISO2709File
1508
1509     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1510
1511 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1512
1513 @PARAM1, String, absolute path to the ISO2709 file.
1514 @PARAM2, String, see stage_file.pl
1515 @PARAM3, String, should be utf8
1516
1517 Returns two array refs.
1518
1519 =cut
1520
1521 sub RecordsFromISO2709File {
1522     my ($input_file, $record_type, $encoding) = @_;
1523     my @errors;
1524
1525     my $marc_type = C4::Context->preference('marcflavour');
1526     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1527
1528     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1529     my @marc_records;
1530     $/ = "\035";
1531     while (<$fh>) {
1532         s/^\s+//;
1533         s/\s+$//;
1534         next unless $_; # skip if record has only whitespace, as might occur
1535                         # if file includes newlines between each MARC record
1536         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1537         push @marc_records, $marc_record;
1538         if ($charset_guessed ne $encoding) {
1539             push @errors,
1540                 "Unexpected charset $charset_guessed, expecting $encoding";
1541         }
1542     }
1543     close $fh;
1544     return ( \@errors, \@marc_records );
1545 }
1546
1547 =head2 RecordsFromMARCXMLFile
1548
1549     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1550
1551 Creates MARC::Record-objects out of the given MARCXML-file.
1552
1553 @PARAM1, String, absolute path to the ISO2709 file.
1554 @PARAM2, String, should be utf8
1555
1556 Returns two array refs.
1557
1558 =cut
1559
1560 sub RecordsFromMARCXMLFile {
1561     my ( $filename, $encoding ) = @_;
1562     my $batch = MARC::File::XML->in( $filename );
1563     my ( @marcRecords, @errors, $record );
1564     do {
1565         eval { $record = $batch->next( $encoding ); };
1566         if ($@) {
1567             push @errors, $@;
1568         }
1569         push @marcRecords, $record if $record;
1570     } while( $record );
1571     return (\@errors, \@marcRecords);
1572 }
1573
1574 =head2 RecordsFromMarcPlugin
1575
1576     Converts text of input_file into array of MARC records with to_marc plugin
1577
1578 =cut
1579
1580 sub RecordsFromMarcPlugin {
1581     my ($input_file, $plugin_class, $encoding) = @_;
1582     my ( $text, @return );
1583     return \@return if !$input_file || !$plugin_class;
1584
1585     # Read input file
1586     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1587     $/ = "\035";
1588     while (<$fh>) {
1589         s/^\s+//;
1590         s/\s+$//;
1591         next unless $_;
1592         $text .= $_;
1593     }
1594     close $fh;
1595
1596     # Convert to large MARC blob with plugin
1597     $text = Koha::Plugins::Handler->run({
1598         class  => $plugin_class,
1599         method => 'to_marc',
1600         params => { data => $text },
1601     }) if $text;
1602
1603     # Convert to array of MARC records
1604     if( $text ) {
1605         my $marc_type = C4::Context->preference('marcflavour');
1606         foreach my $blob ( split(/\x1D/, $text) ) {
1607             next if $blob =~ /^\s*$/;
1608             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1609             push @return, $marcrecord;
1610         }
1611     }
1612     return \@return;
1613 }
1614
1615 # internal functions
1616
1617 sub _create_import_record {
1618     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1619
1620     my $dbh = C4::Context->dbh;
1621     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1622                                                          record_type, encoding)
1623                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1624     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1625                   $record_type, $encoding);
1626     my $import_record_id = $dbh->{'mysql_insertid'};
1627     $sth->finish();
1628     return $import_record_id;
1629 }
1630
1631 sub _update_import_record_marc {
1632     my ($import_record_id, $marc_record, $marc_type) = @_;
1633
1634     my $dbh = C4::Context->dbh;
1635     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1636                              WHERE  import_record_id = ?");
1637     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1638     $sth->finish();
1639 }
1640
1641 sub _add_auth_fields {
1642     my ($import_record_id, $marc_record) = @_;
1643
1644     my $controlnumber;
1645     if ($marc_record->field('001')) {
1646         $controlnumber = $marc_record->field('001')->data();
1647     }
1648     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1649     my $dbh = C4::Context->dbh;
1650     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1651     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1652     $sth->finish();
1653 }
1654
1655 sub _add_biblio_fields {
1656     my ($import_record_id, $marc_record) = @_;
1657
1658     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1659     my $dbh = C4::Context->dbh;
1660     # FIXME no controlnumber, originalsource
1661     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1662     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1663     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1664     $sth->finish();
1665                 
1666 }
1667
1668 sub _update_biblio_fields {
1669     my ($import_record_id, $marc_record) = @_;
1670
1671     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1672     my $dbh = C4::Context->dbh;
1673     # FIXME no controlnumber, originalsource
1674     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1675     $isbn =~ s/\(.*$//;
1676     $isbn =~ tr/ -_//;
1677     $isbn = uc $isbn;
1678     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1679                              WHERE  import_record_id = ?");
1680     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1681     $sth->finish();
1682 }
1683
1684 sub _parse_biblio_fields {
1685     my ($marc_record) = @_;
1686
1687     my $dbh = C4::Context->dbh;
1688     my $bibliofields = TransformMarcToKoha($marc_record, '');
1689     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1690
1691 }
1692
1693 sub _update_batch_record_counts {
1694     my ($batch_id) = @_;
1695
1696     my $dbh = C4::Context->dbh;
1697     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1698                                         num_records = (
1699                                             SELECT COUNT(*)
1700                                             FROM import_records
1701                                             WHERE import_batch_id = import_batches.import_batch_id),
1702                                         num_items = (
1703                                             SELECT COUNT(*)
1704                                             FROM import_records
1705                                             JOIN import_items USING (import_record_id)
1706                                             WHERE import_batch_id = import_batches.import_batch_id
1707                                             AND record_type = 'biblio')
1708                                     WHERE import_batch_id = ?");
1709     $sth->bind_param(1, $batch_id);
1710     $sth->execute();
1711     $sth->finish();
1712 }
1713
1714 sub _get_commit_action {
1715     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1716     
1717     if ($record_type eq 'biblio') {
1718         my ($bib_result, $bib_match, $item_result);
1719
1720         if ($overlay_status ne 'no_match') {
1721             $bib_match = GetBestRecordMatch($import_record_id);
1722             if ($overlay_action eq 'replace') {
1723                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1724             } elsif ($overlay_action eq 'create_new') {
1725                 $bib_result  = 'create_new';
1726             } elsif ($overlay_action eq 'ignore') {
1727                 $bib_result  = 'ignore';
1728             }
1729          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1730                 $item_result = 'create_new';
1731        }
1732       elsif($item_action eq 'replace'){
1733           $item_result = 'replace';
1734           }
1735       else {
1736              $item_result = 'ignore';
1737            }
1738         } else {
1739             $bib_result = $nomatch_action;
1740             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1741         }
1742         return ($bib_result, $item_result, $bib_match);
1743     } else { # must be auths
1744         my ($auth_result, $auth_match);
1745
1746         if ($overlay_status ne 'no_match') {
1747             $auth_match = GetBestRecordMatch($import_record_id);
1748             if ($overlay_action eq 'replace') {
1749                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1750             } elsif ($overlay_action eq 'create_new') {
1751                 $auth_result  = 'create_new';
1752             } elsif ($overlay_action eq 'ignore') {
1753                 $auth_result  = 'ignore';
1754             }
1755         } else {
1756             $auth_result = $nomatch_action;
1757         }
1758
1759         return ($auth_result, undef, $auth_match);
1760
1761     }
1762 }
1763
1764 sub _get_revert_action {
1765     my ($overlay_action, $overlay_status, $status) = @_;
1766
1767     my $bib_result;
1768
1769     if ($status eq 'ignored') {
1770         $bib_result = 'ignore';
1771     } else {
1772         if ($overlay_action eq 'create_new') {
1773             $bib_result = 'delete';
1774         } else {
1775             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1776         }
1777     }
1778     return $bib_result;
1779 }
1780
1781 1;
1782 __END__
1783
1784 =head1 AUTHOR
1785
1786 Koha Development Team <http://koha-community.org/>
1787
1788 Galen Charlton <galen.charlton@liblime.com>
1789
1790 =cut