Bug 30947: Simplify CanBookBeIssued date handling
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       GetImportRecordMarcXML
52       GetRecordFromImportBiblio
53       AddImportBatch
54       GetImportBatch
55       AddAuthToBatch
56       AddBiblioToBatch
57       AddItemsToImportBiblio
58       ModAuthorityInBatch
59
60       BatchStageMarcRecords
61       BatchFindDuplicates
62       BatchCommitRecords
63       BatchRevertRecords
64       CleanBatch
65       DeleteBatch
66
67       GetAllImportBatches
68       GetStagedWebserviceBatches
69       GetImportBatchRangeDesc
70       GetNumberOfNonZ3950ImportBatches
71       GetImportBiblios
72       GetImportRecordsRange
73       GetItemNumbersFromImportBatch
74
75       GetImportBatchStatus
76       SetImportBatchStatus
77       GetImportBatchOverlayAction
78       SetImportBatchOverlayAction
79       GetImportBatchNoMatchAction
80       SetImportBatchNoMatchAction
81       GetImportBatchItemAction
82       SetImportBatchItemAction
83       GetImportBatchMatcher
84       SetImportBatchMatcher
85       GetImportRecordOverlayStatus
86       SetImportRecordOverlayStatus
87       GetImportRecordStatus
88       SetImportRecordStatus
89       SetMatchedBiblionumber
90       GetImportRecordMatches
91       SetImportRecordMatches
92
93       RecordsFromMARCXMLFile
94       RecordsFromISO2709File
95       RecordsFromMarcPlugin
96     );
97 }
98
99 =head1 NAME
100
101 C4::ImportBatch - manage batches of imported MARC records
102
103 =head1 SYNOPSIS
104
105 use C4::ImportBatch;
106
107 =head1 FUNCTIONS
108
109 =head2 GetZ3950BatchId
110
111   my $batchid = GetZ3950BatchId($z3950server);
112
113 Retrieves the ID of the import batch for the Z39.50
114 reservoir for the given target.  If necessary,
115 creates the import batch.
116
117 =cut
118
119 sub GetZ3950BatchId {
120     my ($z3950server) = @_;
121
122     my $dbh = C4::Context->dbh;
123     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
124                              WHERE  batch_type = 'z3950'
125                              AND    file_name = ?");
126     $sth->execute($z3950server);
127     my $rowref = $sth->fetchrow_arrayref();
128     $sth->finish();
129     if (defined $rowref) {
130         return $rowref->[0];
131     } else {
132         my $batch_id = AddImportBatch( {
133                 overlay_action => 'create_new',
134                 import_status => 'staged',
135                 batch_type => 'z3950',
136                 file_name => $z3950server,
137             } );
138         return $batch_id;
139     }
140     
141 }
142
143 =head2 GetWebserviceBatchId
144
145   my $batchid = GetWebserviceBatchId();
146
147 Retrieves the ID of the import batch for webservice.
148 If necessary, creates the import batch.
149
150 =cut
151
152 my $WEBSERVICE_BASE_QRY = <<EOQ;
153 SELECT import_batch_id FROM import_batches
154 WHERE  batch_type = 'webservice'
155 AND    import_status = 'staged'
156 EOQ
157 sub GetWebserviceBatchId {
158     my ($params) = @_;
159
160     my $dbh = C4::Context->dbh;
161     my $sql = $WEBSERVICE_BASE_QRY;
162     my @args;
163     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
164         if (my $val = $params->{$field}) {
165             $sql .= " AND $field = ?";
166             push @args, $val;
167         }
168     }
169     my $id = $dbh->selectrow_array($sql, undef, @args);
170     return $id if $id;
171
172     $params->{batch_type} = 'webservice';
173     $params->{import_status} = 'staged';
174     return AddImportBatch($params);
175 }
176
177 =head2 GetImportRecordMarc
178
179   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
180
181 =cut
182
183 sub GetImportRecordMarc {
184     my ($import_record_id) = @_;
185
186     my $dbh = C4::Context->dbh;
187     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
188         SELECT marc, encoding
189         FROM import_records
190         WHERE import_record_id = ?
191     |, undef, $import_record_id );
192
193     return $marc, $encoding;
194 }
195
196 sub GetRecordFromImportBiblio {
197     my ( $import_record_id, $embed_items ) = @_;
198
199     my ($marc) = GetImportRecordMarc($import_record_id);
200     my $record = MARC::Record->new_from_usmarc($marc);
201
202     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
203
204     return $record;
205 }
206
207 sub EmbedItemsInImportBiblio {
208     my ( $record, $import_record_id ) = @_;
209     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
210     my $dbh = C4::Context->dbh;
211     my $import_items = $dbh->selectall_arrayref(q|
212         SELECT import_items.marcxml
213         FROM import_items
214         WHERE import_record_id = ?
215     |, { Slice => {} }, $import_record_id );
216     my @item_fields;
217     for my $import_item ( @$import_items ) {
218         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
219         push @item_fields, $item_marc->field($itemtag);
220     }
221     $record->append_fields(@item_fields);
222     return $record;
223 }
224
225 =head2 GetImportRecordMarcXML
226
227   my $marcxml = GetImportRecordMarcXML($import_record_id);
228
229 =cut
230
231 sub GetImportRecordMarcXML {
232     my ($import_record_id) = @_;
233
234     my $dbh = C4::Context->dbh;
235     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
236     $sth->execute($import_record_id);
237     my ($marcxml) = $sth->fetchrow();
238     $sth->finish();
239     return $marcxml;
240
241 }
242
243 =head2 AddImportBatch
244
245   my $batch_id = AddImportBatch($params_hash);
246
247 =cut
248
249 sub AddImportBatch {
250     my ($params) = @_;
251
252     my (@fields, @vals);
253     foreach (qw( matcher_id template_id branchcode
254                  overlay_action nomatch_action item_action
255                  import_status batch_type file_name comments record_type )) {
256         if (exists $params->{$_}) {
257             push @fields, $_;
258             push @vals, $params->{$_};
259         }
260     }
261     my $dbh = C4::Context->dbh;
262     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
263                                   VALUES (".join( ',', map '?', @fields).")",
264              undef,
265              @vals);
266     return $dbh->{'mysql_insertid'};
267 }
268
269 =head2 GetImportBatch 
270
271   my $row = GetImportBatch($batch_id);
272
273 Retrieve a hashref of an import_batches row.
274
275 =cut
276
277 sub GetImportBatch {
278     my ($batch_id) = @_;
279
280     my $dbh = C4::Context->dbh;
281     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
282     $sth->bind_param(1, $batch_id);
283     $sth->execute();
284     my $result = $sth->fetchrow_hashref;
285     $sth->finish();
286     return $result;
287
288 }
289
290 =head2 AddBiblioToBatch 
291
292   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
293                 $marc_record, $encoding, $update_counts);
294
295 =cut
296
297 sub AddBiblioToBatch {
298     my $batch_id = shift;
299     my $record_sequence = shift;
300     my $marc_record = shift;
301     my $encoding = shift;
302     my $update_counts = @_ ? shift : 1;
303
304     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
305     _add_biblio_fields($import_record_id, $marc_record);
306     _update_batch_record_counts($batch_id) if $update_counts;
307     return $import_record_id;
308 }
309
310 =head2 AddAuthToBatch
311
312   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
313                 $marc_record, $encoding, $update_counts, [$marc_type]);
314
315 =cut
316
317 sub AddAuthToBatch {
318     my $batch_id = shift;
319     my $record_sequence = shift;
320     my $marc_record = shift;
321     my $encoding = shift;
322     my $update_counts = @_ ? shift : 1;
323     my $marc_type = shift || C4::Context->preference('marcflavour');
324
325     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
326
327     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
328     _add_auth_fields($import_record_id, $marc_record);
329     _update_batch_record_counts($batch_id) if $update_counts;
330     return $import_record_id;
331 }
332
333 =head2 BatchStageMarcRecords
334
335 ( $batch_id, $num_records, $num_items, @invalid_records ) =
336   BatchStageMarcRecords(
337     $record_type,                $encoding,
338     $marc_records,               $file_name,
339     $marc_modification_template, $comments,
340     $branch_code,                $parse_items,
341     $leave_as_staging,           $progress_interval,
342     $progress_callback
343   );
344
345 =cut
346
347 sub BatchStageMarcRecords {
348     my $record_type = shift;
349     my $encoding = shift;
350     my $marc_records = shift;
351     my $file_name = shift;
352     my $marc_modification_template = shift;
353     my $comments = shift;
354     my $branch_code = shift;
355     my $parse_items = shift;
356     my $leave_as_staging = shift;
357
358     # optional callback to monitor status 
359     # of job
360     my $progress_interval = 0;
361     my $progress_callback = undef;
362     if ($#_ == 1) {
363         $progress_interval = shift;
364         $progress_callback = shift;
365         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
366         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
367     } 
368     
369     my $batch_id = AddImportBatch( {
370             overlay_action => 'create_new',
371             import_status => 'staging',
372             batch_type => 'batch',
373             file_name => $file_name,
374             comments => $comments,
375             record_type => $record_type,
376         } );
377     if ($parse_items) {
378         SetImportBatchItemAction($batch_id, 'always_add');
379     } else {
380         SetImportBatchItemAction($batch_id, 'ignore');
381     }
382
383
384     my $marc_type = C4::Context->preference('marcflavour');
385     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
386     my @invalid_records = ();
387     my $num_valid = 0;
388     my $num_items = 0;
389     # FIXME - for now, we're dealing only with bibs
390     my $rec_num = 0;
391     foreach my $marc_record (@$marc_records) {
392         $rec_num++;
393         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
394             &$progress_callback($rec_num);
395         }
396
397         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
398
399         my $import_record_id;
400         if (scalar($marc_record->fields()) == 0) {
401             push @invalid_records, $marc_record;
402         } else {
403
404             # Normalize the record so it doesn't have separated diacritics
405             SetUTF8Flag($marc_record);
406
407             $num_valid++;
408             if ($record_type eq 'biblio') {
409                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
410                 if ($parse_items) {
411                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
412                     $num_items += scalar(@import_items_ids);
413                 }
414             } elsif ($record_type eq 'auth') {
415                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
416             }
417         }
418     }
419     unless ($leave_as_staging) {
420         SetImportBatchStatus($batch_id, 'staged');
421     }
422     # FIXME branch_code, number of bibs, number of items
423     _update_batch_record_counts($batch_id);
424     return ($batch_id, $num_valid, $num_items, @invalid_records);
425 }
426
427 =head2 AddItemsToImportBiblio
428
429   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
430                 $import_record_id, $marc_record, $update_counts);
431
432 =cut
433
434 sub AddItemsToImportBiblio {
435     my $batch_id = shift;
436     my $import_record_id = shift;
437     my $marc_record = shift;
438     my $update_counts = @_ ? shift : 0;
439
440     my @import_items_ids = ();
441    
442     my $dbh = C4::Context->dbh; 
443     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
444     foreach my $item_field ($marc_record->field($item_tag)) {
445         my $item_marc = MARC::Record->new();
446         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
447         $item_marc->append_fields($item_field);
448         $marc_record->delete_field($item_field);
449         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
450                                         VALUES (?, ?, ?)");
451         $sth->bind_param(1, $import_record_id);
452         $sth->bind_param(2, 'staged');
453         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
454         $sth->execute();
455         push @import_items_ids, $dbh->{'mysql_insertid'};
456         $sth->finish();
457     }
458
459     if ($#import_items_ids > -1) {
460         _update_batch_record_counts($batch_id) if $update_counts;
461         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
462     }
463     return @import_items_ids;
464 }
465
466 =head2 BatchFindDuplicates
467
468   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
469              $max_matches, $progress_interval, $progress_callback);
470
471 Goes through the records loaded in the batch and attempts to 
472 find duplicates for each one.  Sets the matching status 
473 of each record to "no_match" or "auto_match" as appropriate.
474
475 The $max_matches parameter is optional; if it is not supplied,
476 it defaults to 10.
477
478 The $progress_interval and $progress_callback parameters are 
479 optional; if both are supplied, the sub referred to by
480 $progress_callback will be invoked every $progress_interval
481 records using the number of records processed as the 
482 singular argument.
483
484 =cut
485
486 sub BatchFindDuplicates {
487     my $batch_id = shift;
488     my $matcher = shift;
489     my $max_matches = @_ ? shift : 10;
490
491     # optional callback to monitor status 
492     # of job
493     my $progress_interval = 0;
494     my $progress_callback = undef;
495     if ($#_ == 1) {
496         $progress_interval = shift;
497         $progress_callback = shift;
498         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
499         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
500     }
501
502     my $dbh = C4::Context->dbh;
503
504     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
505                              FROM import_records
506                              WHERE import_batch_id = ?");
507     $sth->execute($batch_id);
508     my $num_with_matches = 0;
509     my $rec_num = 0;
510     while (my $rowref = $sth->fetchrow_hashref) {
511         $rec_num++;
512         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
513             &$progress_callback($rec_num);
514         }
515         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
516         my @matches = ();
517         if (defined $matcher) {
518             @matches = $matcher->get_matches($marc_record, $max_matches);
519         }
520         if (scalar(@matches) > 0) {
521             $num_with_matches++;
522             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
523             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
524         } else {
525             SetImportRecordMatches($rowref->{'import_record_id'}, ());
526             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
527         }
528     }
529     $sth->finish();
530     return $num_with_matches;
531 }
532
533 =head2 BatchCommitRecords
534
535   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
536         BatchCommitRecords($batch_id, $framework,
537         $progress_interval, $progress_callback);
538
539 =cut
540
541 sub BatchCommitRecords {
542     my $batch_id = shift;
543     my $framework = shift;
544
545     # optional callback to monitor status 
546     # of job
547     my $progress_interval = 0;
548     my $progress_callback = undef;
549     if ($#_ == 1) {
550         $progress_interval = shift;
551         $progress_callback = shift;
552         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
553         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
554     }
555
556     my $record_type;
557     my $num_added = 0;
558     my $num_updated = 0;
559     my $num_items_added = 0;
560     my $num_items_replaced = 0;
561     my $num_items_errored = 0;
562     my $num_ignored = 0;
563     # commit (i.e., save, all records in the batch)
564     SetImportBatchStatus($batch_id, 'importing');
565     my $overlay_action = GetImportBatchOverlayAction($batch_id);
566     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
567     my $item_action = GetImportBatchItemAction($batch_id);
568     my $item_tag;
569     my $item_subfield;
570     my $dbh = C4::Context->dbh;
571     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
572                              FROM import_records
573                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
574                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
575                              WHERE import_batch_id = ?");
576     $sth->execute($batch_id);
577     my $marcflavour = C4::Context->preference('marcflavour');
578
579     my $userenv = C4::Context->userenv;
580     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
581
582     my $rec_num = 0;
583     my @biblio_ids;
584     while (my $rowref = $sth->fetchrow_hashref) {
585         $record_type = $rowref->{'record_type'};
586         $rec_num++;
587         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
588             &$progress_callback($rec_num);
589         }
590         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
591             $num_ignored++;
592             next;
593         }
594
595         my $marc_type;
596         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
597             $marc_type = 'UNIMARCAUTH';
598         } elsif ($marcflavour eq 'UNIMARC') {
599             $marc_type = 'UNIMARC';
600         } else {
601             $marc_type = 'USMARC';
602         }
603         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
604
605         if ($record_type eq 'biblio') {
606             # remove any item tags - rely on _batchCommitItems
607             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
608             foreach my $item_field ($marc_record->field($item_tag)) {
609                 $marc_record->delete_field($item_field);
610             }
611         }
612
613         my ($record_result, $item_result, $record_match) =
614             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
615                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
616
617         my $recordid;
618         my $query;
619         if ($record_result eq 'create_new') {
620             $num_added++;
621             if ($record_type eq 'biblio') {
622                 my $biblioitemnumber;
623                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
624                 push @biblio_ids, $recordid;
625                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
626                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
627                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
628                     $num_items_added += $bib_items_added;
629                     $num_items_replaced += $bib_items_replaced;
630                     $num_items_errored += $bib_items_errored;
631                 }
632             } else {
633                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
634                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
635             }
636             my $sth = $dbh->prepare_cached($query);
637             $sth->execute($recordid, $rowref->{'import_record_id'});
638             $sth->finish();
639             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
640         } elsif ($record_result eq 'replace') {
641             $num_updated++;
642             $recordid = $record_match;
643             my $oldxml;
644             if ($record_type eq 'biblio') {
645                 my $oldbiblio = Koha::Biblios->find( $recordid );
646                 $oldxml = GetXmlBiblio($recordid);
647
648                 # remove item fields so that they don't get
649                 # added again if record is reverted
650                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
651                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
652                 foreach my $item_field ($old_marc->field($item_tag)) {
653                     $old_marc->delete_field($item_field);
654                 }
655                 $oldxml = $old_marc->as_xml($marc_type);
656
657                 my $context = { source => 'batchimport' };
658                 if ($logged_in_patron) {
659                     $context->{categorycode} = $logged_in_patron->categorycode;
660                     $context->{userid} = $logged_in_patron->userid;
661                 }
662
663                 ModBiblio(
664                     $marc_record,
665                     $recordid,
666                     $oldbiblio->frameworkcode,
667                     {
668                         overlay_context   => $context,
669                         skip_record_index => 1
670                     }
671                 );
672                 push @biblio_ids, $recordid;
673                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
674
675                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
676                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
677                     $num_items_added += $bib_items_added;
678                     $num_items_replaced += $bib_items_replaced;
679                     $num_items_errored += $bib_items_errored;
680                 }
681             } else {
682                 $oldxml = GetAuthorityXML($recordid);
683
684                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
685                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
686             }
687             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
688             $sth->execute($oldxml, $rowref->{'import_record_id'});
689             $sth->finish();
690             my $sth2 = $dbh->prepare_cached($query);
691             $sth2->execute($recordid, $rowref->{'import_record_id'});
692             $sth2->finish();
693             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
694             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
695         } elsif ($record_result eq 'ignore') {
696             $recordid = $record_match;
697             $num_ignored++;
698             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
699                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
700                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
701                 $num_items_added += $bib_items_added;
702          $num_items_replaced += $bib_items_replaced;
703                 $num_items_errored += $bib_items_errored;
704                 # still need to record the matched biblionumber so that the
705                 # items can be reverted
706                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
707                 $sth2->execute($recordid, $rowref->{'import_record_id'});
708                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
709             }
710             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
711         }
712     }
713     $sth->finish();
714
715     if ( @biblio_ids ) {
716         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
717         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
718     }
719
720     SetImportBatchStatus($batch_id, 'imported');
721     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
722 }
723
724 =head2 _batchCommitItems
725
726   ($num_items_added, $num_items_errored) = 
727          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
728
729 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
730
731 =cut
732
733 sub _batchCommitItems {
734     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
735
736     my $dbh = C4::Context->dbh;
737
738     my $num_items_added = 0;
739     my $num_items_errored = 0;
740     my $num_items_replaced = 0;
741
742     my $sth = $dbh->prepare( "
743         SELECT import_items_id, import_items.marcxml, encoding
744         FROM import_items
745         JOIN import_records USING (import_record_id)
746         WHERE import_record_id = ?
747         ORDER BY import_items_id
748     " );
749     $sth->bind_param( 1, $import_record_id );
750     $sth->execute();
751
752     while ( my $row = $sth->fetchrow_hashref() ) {
753         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
754
755         # Delete date_due subfield as to not accidentally delete item checkout due dates
756         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
757         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
758
759         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
760
761         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
762         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
763
764         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
765         if ( $action eq "replace" && $duplicate_itemnumber ) {
766             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
767             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
768             $updsth->bind_param( 1, 'imported' );
769             $updsth->bind_param( 2, $item->{itemnumber} );
770             $updsth->bind_param( 3, undef );
771             $updsth->bind_param( 4, $row->{'import_items_id'} );
772             $updsth->execute();
773             $updsth->finish();
774             $num_items_replaced++;
775         } elsif ( $action eq "replace" && $duplicate_barcode ) {
776             my $itemnumber = $duplicate_barcode->itemnumber;
777             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
778             $updsth->bind_param( 1, 'imported' );
779             $updsth->bind_param( 2, $item->{itemnumber} );
780             $updsth->bind_param( 3, undef );
781             $updsth->bind_param( 4, $row->{'import_items_id'} );
782             $updsth->execute();
783             $updsth->finish();
784             $num_items_replaced++;
785         } elsif ($duplicate_barcode) {
786             $updsth->bind_param( 1, 'error' );
787             $updsth->bind_param( 2, undef );
788             $updsth->bind_param( 3, 'duplicate item barcode' );
789             $updsth->bind_param( 4, $row->{'import_items_id'} );
790             $updsth->execute();
791             $num_items_errored++;
792         } else {
793             # Remove the itemnumber if it exists, we want to create a new item
794             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
795             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
796
797             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
798             if( $itemnumber ) {
799                 $updsth->bind_param( 1, 'imported' );
800                 $updsth->bind_param( 2, $itemnumber );
801                 $updsth->bind_param( 3, undef );
802                 $updsth->bind_param( 4, $row->{'import_items_id'} );
803                 $updsth->execute();
804                 $updsth->finish();
805                 $num_items_added++;
806             }
807         }
808     }
809
810     return ( $num_items_added, $num_items_replaced, $num_items_errored );
811 }
812
813 =head2 BatchRevertRecords
814
815   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
816       $num_ignored) = BatchRevertRecords($batch_id);
817
818 =cut
819
820 sub BatchRevertRecords {
821     my $batch_id = shift;
822
823     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
824
825     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
826
827     my $record_type;
828     my $num_deleted = 0;
829     my $num_errors = 0;
830     my $num_reverted = 0;
831     my $num_ignored = 0;
832     my $num_items_deleted = 0;
833     # commit (i.e., save, all records in the batch)
834     SetImportBatchStatus($batch_id, 'reverting');
835     my $overlay_action = GetImportBatchOverlayAction($batch_id);
836     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
837     my $dbh = C4::Context->dbh;
838     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
839                              FROM import_records
840                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
841                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
842                              WHERE import_batch_id = ?");
843     $sth->execute($batch_id);
844     my $marc_type;
845     my $marcflavour = C4::Context->preference('marcflavour');
846     while (my $rowref = $sth->fetchrow_hashref) {
847         $record_type = $rowref->{'record_type'};
848         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
849             $num_ignored++;
850             next;
851         }
852         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
853             $marc_type = 'UNIMARCAUTH';
854         } elsif ($marcflavour eq 'UNIMARC') {
855             $marc_type = 'UNIMARC';
856         } else {
857             $marc_type = 'USMARC';
858         }
859
860         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
861
862         if ($record_result eq 'delete') {
863             my $error = undef;
864             if  ($record_type eq 'biblio') {
865                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
866                 $error = DelBiblio($rowref->{'matched_biblionumber'});
867             } else {
868                 DelAuthority({ authid => $rowref->{'matched_authid'} });
869             }
870             if (defined $error) {
871                 $num_errors++;
872             } else {
873                 $num_deleted++;
874                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
875             }
876         } elsif ($record_result eq 'restore') {
877             $num_reverted++;
878             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
879             if ($record_type eq 'biblio') {
880                 my $biblionumber = $rowref->{'matched_biblionumber'};
881                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
882
883                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
884                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
885
886                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
887                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
888             } else {
889                 my $authid = $rowref->{'matched_authid'};
890                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
891             }
892             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
893         } elsif ($record_result eq 'ignore') {
894             if ($record_type eq 'biblio') {
895                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
896             }
897             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
898         }
899         my $query;
900         if ($record_type eq 'biblio') {
901             # remove matched_biblionumber only if there is no 'imported' item left
902             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
903             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
904         } else {
905             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
906         }
907         my $sth2 = $dbh->prepare_cached($query);
908         $sth2->execute($rowref->{'import_record_id'});
909     }
910
911     $sth->finish();
912     SetImportBatchStatus($batch_id, 'reverted');
913     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
914 }
915
916 =head2 BatchRevertItems
917
918   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
919
920 =cut
921
922 sub BatchRevertItems {
923     my ($import_record_id, $biblionumber) = @_;
924
925     my $dbh = C4::Context->dbh;
926     my $num_items_deleted = 0;
927
928     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
929                                    FROM import_items
930                                    JOIN items USING (itemnumber)
931                                    WHERE import_record_id = ?");
932     $sth->bind_param(1, $import_record_id);
933     $sth->execute();
934     while (my $row = $sth->fetchrow_hashref()) {
935         my $item = Koha::Items->find($row->{itemnumber});
936         if ($item->safe_delete){
937             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
938             $updsth->bind_param(1, 'reverted');
939             $updsth->bind_param(2, $row->{'import_items_id'});
940             $updsth->execute();
941             $updsth->finish();
942             $num_items_deleted++;
943         }
944         else {
945             next;
946         }
947     }
948     $sth->finish();
949     return $num_items_deleted;
950 }
951
952 =head2 CleanBatch
953
954   CleanBatch($batch_id)
955
956 Deletes all staged records from the import batch
957 and sets the status of the batch to 'cleaned'.  Note
958 that deleting a stage record does *not* affect
959 any record that has been committed to the database.
960
961 =cut
962
963 sub CleanBatch {
964     my $batch_id = shift;
965     return unless defined $batch_id;
966
967     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
968     SetImportBatchStatus($batch_id, 'cleaned');
969 }
970
971 =head2 DeleteBatch
972
973   DeleteBatch($batch_id)
974
975 Deletes the record from the database. This can only be done
976 once the batch has been cleaned.
977
978 =cut
979
980 sub DeleteBatch {
981     my $batch_id = shift;
982     return unless defined $batch_id;
983
984     my $dbh = C4::Context->dbh;
985     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
986     $sth->execute( $batch_id );
987 }
988
989 =head2 GetAllImportBatches
990
991   my $results = GetAllImportBatches();
992
993 Returns a references to an array of hash references corresponding
994 to all import_batches rows (of batch_type 'batch'), sorted in 
995 ascending order by import_batch_id.
996
997 =cut
998
999 sub  GetAllImportBatches {
1000     my $dbh = C4::Context->dbh;
1001     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1002                                     WHERE batch_type IN ('batch', 'webservice')
1003                                     ORDER BY import_batch_id ASC");
1004
1005     my $results = [];
1006     $sth->execute();
1007     while (my $row = $sth->fetchrow_hashref) {
1008         push @$results, $row;
1009     }
1010     $sth->finish();
1011     return $results;
1012 }
1013
1014 =head2 GetStagedWebserviceBatches
1015
1016   my $batch_ids = GetStagedWebserviceBatches();
1017
1018 Returns a references to an array of batch id's
1019 of batch_type 'webservice' that are not imported
1020
1021 =cut
1022
1023 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1024 SELECT import_batch_id FROM import_batches
1025 WHERE batch_type = 'webservice'
1026 AND import_status = 'staged'
1027 EOQ
1028 sub  GetStagedWebserviceBatches {
1029     my $dbh = C4::Context->dbh;
1030     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1031 }
1032
1033 =head2 GetImportBatchRangeDesc
1034
1035   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1036
1037 Returns a reference to an array of hash references corresponding to
1038 import_batches rows (sorted in descending order by import_batch_id)
1039 start at the given offset.
1040
1041 =cut
1042
1043 sub GetImportBatchRangeDesc {
1044     my ($offset, $results_per_group) = @_;
1045
1046     my $dbh = C4::Context->dbh;
1047     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1048                                     LEFT JOIN import_batch_profiles p
1049                                     ON b.profile_id = p.id
1050                                     WHERE b.batch_type IN ('batch', 'webservice')
1051                                     ORDER BY b.import_batch_id DESC";
1052     my @params;
1053     if ($results_per_group){
1054         $query .= " LIMIT ?";
1055         push(@params, $results_per_group);
1056     }
1057     if ($offset){
1058         $query .= " OFFSET ?";
1059         push(@params, $offset);
1060     }
1061     my $sth = $dbh->prepare_cached($query);
1062     $sth->execute(@params);
1063     my $results = $sth->fetchall_arrayref({});
1064     $sth->finish();
1065     return $results;
1066 }
1067
1068 =head2 GetItemNumbersFromImportBatch
1069
1070   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1071
1072 =cut
1073
1074 sub GetItemNumbersFromImportBatch {
1075     my ($batch_id) = @_;
1076     my $dbh = C4::Context->dbh;
1077     my $sql = q|
1078 SELECT itemnumber FROM import_items
1079 INNER JOIN items USING (itemnumber)
1080 INNER JOIN import_records USING (import_record_id)
1081 WHERE import_batch_id = ?|;
1082     my  $sth = $dbh->prepare( $sql );
1083     $sth->execute($batch_id);
1084     my @items ;
1085     while ( my ($itm) = $sth->fetchrow_array ) {
1086         push @items, $itm;
1087     }
1088     return @items;
1089 }
1090
1091 =head2 GetNumberOfImportBatches
1092
1093   my $count = GetNumberOfImportBatches();
1094
1095 =cut
1096
1097 sub GetNumberOfNonZ3950ImportBatches {
1098     my $dbh = C4::Context->dbh;
1099     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1100     $sth->execute();
1101     my ($count) = $sth->fetchrow_array();
1102     $sth->finish();
1103     return $count;
1104 }
1105
1106 =head2 GetImportBiblios
1107
1108   my $results = GetImportBiblios($importid);
1109
1110 =cut
1111
1112 sub GetImportBiblios {
1113     my ($import_record_id) = @_;
1114
1115     my $dbh = C4::Context->dbh;
1116     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1117     return $dbh->selectall_arrayref(
1118         $query,
1119         { Slice => {} },
1120         $import_record_id
1121     );
1122
1123 }
1124
1125 =head2 GetImportRecordsRange
1126
1127   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1128
1129 Returns a reference to an array of hash references corresponding to
1130 import_biblios/import_auths/import_records rows for a given batch
1131 starting at the given offset.
1132
1133 =cut
1134
1135 sub GetImportRecordsRange {
1136     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1137
1138     my $dbh = C4::Context->dbh;
1139
1140     my $order_by = $parameters->{order_by} || 'import_record_id';
1141     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1142
1143     my $order_by_direction =
1144       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1145
1146     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1147
1148     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1149                                            record_sequence, status, overlay_status,
1150                                            matched_biblionumber, matched_authid, record_type
1151                                     FROM   import_records
1152                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1153                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1154                                     WHERE  import_batch_id = ?";
1155     my @params;
1156     push(@params, $batch_id);
1157     if ($status) {
1158         $query .= " AND status=?";
1159         push(@params,$status);
1160     }
1161
1162     $query.=" ORDER BY $order_by $order_by_direction";
1163
1164     if($results_per_group){
1165         $query .= " LIMIT ?";
1166         push(@params, $results_per_group);
1167     }
1168     if($offset){
1169         $query .= " OFFSET ?";
1170         push(@params, $offset);
1171     }
1172     my $sth = $dbh->prepare_cached($query);
1173     $sth->execute(@params);
1174     my $results = $sth->fetchall_arrayref({});
1175     $sth->finish();
1176     return $results;
1177
1178 }
1179
1180 =head2 GetBestRecordMatch
1181
1182   my $record_id = GetBestRecordMatch($import_record_id);
1183
1184 =cut
1185
1186 sub GetBestRecordMatch {
1187     my ($import_record_id) = @_;
1188
1189     my $dbh = C4::Context->dbh;
1190     my $sth = $dbh->prepare("SELECT candidate_match_id
1191                              FROM   import_record_matches
1192                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1193                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1194                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1195                              WHERE  import_record_matches.import_record_id = ? AND
1196                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1197                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1198                              AND chosen = 1
1199                              ORDER BY score DESC, candidate_match_id DESC");
1200     $sth->execute($import_record_id);
1201     my ($record_id) = $sth->fetchrow_array();
1202     $sth->finish();
1203     return $record_id;
1204 }
1205
1206 =head2 GetImportBatchStatus
1207
1208   my $status = GetImportBatchStatus($batch_id);
1209
1210 =cut
1211
1212 sub GetImportBatchStatus {
1213     my ($batch_id) = @_;
1214
1215     my $dbh = C4::Context->dbh;
1216     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1217     $sth->execute($batch_id);
1218     my ($status) = $sth->fetchrow_array();
1219     $sth->finish();
1220     return $status;
1221
1222 }
1223
1224 =head2 SetImportBatchStatus
1225
1226   SetImportBatchStatus($batch_id, $new_status);
1227
1228 =cut
1229
1230 sub SetImportBatchStatus {
1231     my ($batch_id, $new_status) = @_;
1232
1233     my $dbh = C4::Context->dbh;
1234     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1235     $sth->execute($new_status, $batch_id);
1236     $sth->finish();
1237
1238 }
1239
1240 =head2 SetMatchedBiblionumber
1241
1242   SetMatchedBiblionumber($import_record_id, $biblionumber);
1243
1244 =cut
1245
1246 sub SetMatchedBiblionumber {
1247     my ($import_record_id, $biblionumber) = @_;
1248
1249     my $dbh = C4::Context->dbh;
1250     $dbh->do(
1251         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1252         undef, $biblionumber, $import_record_id
1253     );
1254 }
1255
1256 =head2 GetImportBatchOverlayAction
1257
1258   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1259
1260 =cut
1261
1262 sub GetImportBatchOverlayAction {
1263     my ($batch_id) = @_;
1264
1265     my $dbh = C4::Context->dbh;
1266     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1267     $sth->execute($batch_id);
1268     my ($overlay_action) = $sth->fetchrow_array();
1269     $sth->finish();
1270     return $overlay_action;
1271
1272 }
1273
1274
1275 =head2 SetImportBatchOverlayAction
1276
1277   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1278
1279 =cut
1280
1281 sub SetImportBatchOverlayAction {
1282     my ($batch_id, $new_overlay_action) = @_;
1283
1284     my $dbh = C4::Context->dbh;
1285     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1286     $sth->execute($new_overlay_action, $batch_id);
1287     $sth->finish();
1288
1289 }
1290
1291 =head2 GetImportBatchNoMatchAction
1292
1293   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1294
1295 =cut
1296
1297 sub GetImportBatchNoMatchAction {
1298     my ($batch_id) = @_;
1299
1300     my $dbh = C4::Context->dbh;
1301     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1302     $sth->execute($batch_id);
1303     my ($nomatch_action) = $sth->fetchrow_array();
1304     $sth->finish();
1305     return $nomatch_action;
1306
1307 }
1308
1309
1310 =head2 SetImportBatchNoMatchAction
1311
1312   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1313
1314 =cut
1315
1316 sub SetImportBatchNoMatchAction {
1317     my ($batch_id, $new_nomatch_action) = @_;
1318
1319     my $dbh = C4::Context->dbh;
1320     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1321     $sth->execute($new_nomatch_action, $batch_id);
1322     $sth->finish();
1323
1324 }
1325
1326 =head2 GetImportBatchItemAction
1327
1328   my $item_action = GetImportBatchItemAction($batch_id);
1329
1330 =cut
1331
1332 sub GetImportBatchItemAction {
1333     my ($batch_id) = @_;
1334
1335     my $dbh = C4::Context->dbh;
1336     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1337     $sth->execute($batch_id);
1338     my ($item_action) = $sth->fetchrow_array();
1339     $sth->finish();
1340     return $item_action;
1341
1342 }
1343
1344
1345 =head2 SetImportBatchItemAction
1346
1347   SetImportBatchItemAction($batch_id, $new_item_action);
1348
1349 =cut
1350
1351 sub SetImportBatchItemAction {
1352     my ($batch_id, $new_item_action) = @_;
1353
1354     my $dbh = C4::Context->dbh;
1355     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1356     $sth->execute($new_item_action, $batch_id);
1357     $sth->finish();
1358
1359 }
1360
1361 =head2 GetImportBatchMatcher
1362
1363   my $matcher_id = GetImportBatchMatcher($batch_id);
1364
1365 =cut
1366
1367 sub GetImportBatchMatcher {
1368     my ($batch_id) = @_;
1369
1370     my $dbh = C4::Context->dbh;
1371     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1372     $sth->execute($batch_id);
1373     my ($matcher_id) = $sth->fetchrow_array();
1374     $sth->finish();
1375     return $matcher_id;
1376
1377 }
1378
1379
1380 =head2 SetImportBatchMatcher
1381
1382   SetImportBatchMatcher($batch_id, $new_matcher_id);
1383
1384 =cut
1385
1386 sub SetImportBatchMatcher {
1387     my ($batch_id, $new_matcher_id) = @_;
1388
1389     my $dbh = C4::Context->dbh;
1390     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1391     $sth->execute($new_matcher_id, $batch_id);
1392     $sth->finish();
1393
1394 }
1395
1396 =head2 GetImportRecordOverlayStatus
1397
1398   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1399
1400 =cut
1401
1402 sub GetImportRecordOverlayStatus {
1403     my ($import_record_id) = @_;
1404
1405     my $dbh = C4::Context->dbh;
1406     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1407     $sth->execute($import_record_id);
1408     my ($overlay_status) = $sth->fetchrow_array();
1409     $sth->finish();
1410     return $overlay_status;
1411
1412 }
1413
1414
1415 =head2 SetImportRecordOverlayStatus
1416
1417   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1418
1419 =cut
1420
1421 sub SetImportRecordOverlayStatus {
1422     my ($import_record_id, $new_overlay_status) = @_;
1423
1424     my $dbh = C4::Context->dbh;
1425     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1426     $sth->execute($new_overlay_status, $import_record_id);
1427     $sth->finish();
1428
1429 }
1430
1431 =head2 GetImportRecordStatus
1432
1433   my $status = GetImportRecordStatus($import_record_id);
1434
1435 =cut
1436
1437 sub GetImportRecordStatus {
1438     my ($import_record_id) = @_;
1439
1440     my $dbh = C4::Context->dbh;
1441     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1442     $sth->execute($import_record_id);
1443     my ($status) = $sth->fetchrow_array();
1444     $sth->finish();
1445     return $status;
1446
1447 }
1448
1449
1450 =head2 SetImportRecordStatus
1451
1452   SetImportRecordStatus($import_record_id, $new_status);
1453
1454 =cut
1455
1456 sub SetImportRecordStatus {
1457     my ($import_record_id, $new_status) = @_;
1458
1459     my $dbh = C4::Context->dbh;
1460     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1461     $sth->execute($new_status, $import_record_id);
1462     $sth->finish();
1463
1464 }
1465
1466 =head2 GetImportRecordMatches
1467
1468   my $results = GetImportRecordMatches($import_record_id, $best_only);
1469
1470 =cut
1471
1472 sub GetImportRecordMatches {
1473     my $import_record_id = shift;
1474     my $best_only = @_ ? shift : 0;
1475
1476     my $dbh = C4::Context->dbh;
1477     # FIXME currently biblio only
1478     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1479                                     candidate_match_id, score, record_type,
1480                                     chosen
1481                                     FROM import_records
1482                                     JOIN import_record_matches USING (import_record_id)
1483                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1484                                     WHERE import_record_id = ?
1485                                     ORDER BY score DESC, biblionumber DESC");
1486     $sth->bind_param(1, $import_record_id);
1487     my $results = [];
1488     $sth->execute();
1489     while (my $row = $sth->fetchrow_hashref) {
1490         if ($row->{'record_type'} eq 'auth') {
1491             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1492         }
1493         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1494         push @$results, $row;
1495         last if $best_only;
1496     }
1497     $sth->finish();
1498
1499     return $results;
1500     
1501 }
1502
1503 =head2 SetImportRecordMatches
1504
1505   SetImportRecordMatches($import_record_id, @matches);
1506
1507 =cut
1508
1509 sub SetImportRecordMatches {
1510     my $import_record_id = shift;
1511     my @matches = @_;
1512
1513     my $dbh = C4::Context->dbh;
1514     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1515     $delsth->execute($import_record_id);
1516     $delsth->finish();
1517
1518     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1519                                     VALUES (?, ?, ?, ?)");
1520     my $chosen = 1; #The first match is defaulted to be chosen
1521     foreach my $match (@matches) {
1522         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1523         $chosen = 0; #After the first we do not default to other matches
1524     }
1525 }
1526
1527 =head2 RecordsFromISO2709File
1528
1529     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1530
1531 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1532
1533 @PARAM1, String, absolute path to the ISO2709 file.
1534 @PARAM2, String, see stage_file.pl
1535 @PARAM3, String, should be utf8
1536
1537 Returns two array refs.
1538
1539 =cut
1540
1541 sub RecordsFromISO2709File {
1542     my ($input_file, $record_type, $encoding) = @_;
1543     my @errors;
1544
1545     my $marc_type = C4::Context->preference('marcflavour');
1546     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1547
1548     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1549     my @marc_records;
1550     $/ = "\035";
1551     while (<$fh>) {
1552         s/^\s+//;
1553         s/\s+$//;
1554         next unless $_; # skip if record has only whitespace, as might occur
1555                         # if file includes newlines between each MARC record
1556         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1557         push @marc_records, $marc_record;
1558         if ($charset_guessed ne $encoding) {
1559             push @errors,
1560                 "Unexpected charset $charset_guessed, expecting $encoding";
1561         }
1562     }
1563     close $fh;
1564     return ( \@errors, \@marc_records );
1565 }
1566
1567 =head2 RecordsFromMARCXMLFile
1568
1569     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1570
1571 Creates MARC::Record-objects out of the given MARCXML-file.
1572
1573 @PARAM1, String, absolute path to the ISO2709 file.
1574 @PARAM2, String, should be utf8
1575
1576 Returns two array refs.
1577
1578 =cut
1579
1580 sub RecordsFromMARCXMLFile {
1581     my ( $filename, $encoding ) = @_;
1582     my $batch = MARC::File::XML->in( $filename );
1583     my ( @marcRecords, @errors, $record );
1584     do {
1585         eval { $record = $batch->next( $encoding ); };
1586         if ($@) {
1587             push @errors, $@;
1588         }
1589         push @marcRecords, $record if $record;
1590     } while( $record );
1591     return (\@errors, \@marcRecords);
1592 }
1593
1594 =head2 RecordsFromMarcPlugin
1595
1596     Converts text of input_file into array of MARC records with to_marc plugin
1597
1598 =cut
1599
1600 sub RecordsFromMarcPlugin {
1601     my ($input_file, $plugin_class, $encoding) = @_;
1602     my ( $text, @return );
1603     return \@return if !$input_file || !$plugin_class;
1604
1605     # Read input file
1606     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1607     $/ = "\035";
1608     while (<$fh>) {
1609         s/^\s+//;
1610         s/\s+$//;
1611         next unless $_;
1612         $text .= $_;
1613     }
1614     close $fh;
1615
1616     # Convert to large MARC blob with plugin
1617     $text = Koha::Plugins::Handler->run({
1618         class  => $plugin_class,
1619         method => 'to_marc',
1620         params => { data => $text },
1621     }) if $text;
1622
1623     # Convert to array of MARC records
1624     if( $text ) {
1625         my $marc_type = C4::Context->preference('marcflavour');
1626         foreach my $blob ( split(/\x1D/, $text) ) {
1627             next if $blob =~ /^\s*$/;
1628             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1629             push @return, $marcrecord;
1630         }
1631     }
1632     return \@return;
1633 }
1634
1635 # internal functions
1636
1637 sub _create_import_record {
1638     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1639
1640     my $dbh = C4::Context->dbh;
1641     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1642                                                          record_type, encoding)
1643                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1644     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1645                   $record_type, $encoding);
1646     my $import_record_id = $dbh->{'mysql_insertid'};
1647     $sth->finish();
1648     return $import_record_id;
1649 }
1650
1651 sub _update_import_record_marc {
1652     my ($import_record_id, $marc_record, $marc_type) = @_;
1653
1654     my $dbh = C4::Context->dbh;
1655     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1656                              WHERE  import_record_id = ?");
1657     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1658     $sth->finish();
1659 }
1660
1661 sub _add_auth_fields {
1662     my ($import_record_id, $marc_record) = @_;
1663
1664     my $controlnumber;
1665     if ($marc_record->field('001')) {
1666         $controlnumber = $marc_record->field('001')->data();
1667     }
1668     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1669     my $dbh = C4::Context->dbh;
1670     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1671     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1672     $sth->finish();
1673 }
1674
1675 sub _add_biblio_fields {
1676     my ($import_record_id, $marc_record) = @_;
1677
1678     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1679     my $dbh = C4::Context->dbh;
1680     # FIXME no controlnumber, originalsource
1681     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1682     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1683     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1684     $sth->finish();
1685                 
1686 }
1687
1688 sub _update_biblio_fields {
1689     my ($import_record_id, $marc_record) = @_;
1690
1691     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1692     my $dbh = C4::Context->dbh;
1693     # FIXME no controlnumber, originalsource
1694     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1695     $isbn =~ s/\(.*$//;
1696     $isbn =~ tr/ -_//;
1697     $isbn = uc $isbn;
1698     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1699                              WHERE  import_record_id = ?");
1700     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1701     $sth->finish();
1702 }
1703
1704 sub _parse_biblio_fields {
1705     my ($marc_record) = @_;
1706
1707     my $dbh = C4::Context->dbh;
1708     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1709     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1710
1711 }
1712
1713 sub _update_batch_record_counts {
1714     my ($batch_id) = @_;
1715
1716     my $dbh = C4::Context->dbh;
1717     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1718                                         num_records = (
1719                                             SELECT COUNT(*)
1720                                             FROM import_records
1721                                             WHERE import_batch_id = import_batches.import_batch_id),
1722                                         num_items = (
1723                                             SELECT COUNT(*)
1724                                             FROM import_records
1725                                             JOIN import_items USING (import_record_id)
1726                                             WHERE import_batch_id = import_batches.import_batch_id
1727                                             AND record_type = 'biblio')
1728                                     WHERE import_batch_id = ?");
1729     $sth->bind_param(1, $batch_id);
1730     $sth->execute();
1731     $sth->finish();
1732 }
1733
1734 sub _get_commit_action {
1735     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1736     
1737     if ($record_type eq 'biblio') {
1738         my ($bib_result, $bib_match, $item_result);
1739
1740         $bib_match = GetBestRecordMatch($import_record_id);
1741         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1742
1743             $bib_result = $overlay_action;
1744
1745             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1746                 $item_result = 'create_new';
1747             } elsif($item_action eq 'replace'){
1748                 $item_result = 'replace';
1749             } else {
1750                 $item_result = 'ignore';
1751             }
1752
1753         } else {
1754             $bib_result = $nomatch_action;
1755             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1756         }
1757         return ($bib_result, $item_result, $bib_match);
1758     } else { # must be auths
1759         my ($auth_result, $auth_match);
1760
1761         $auth_match = GetBestRecordMatch($import_record_id);
1762         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1763             $auth_result = $overlay_action;
1764         } else {
1765             $auth_result = $nomatch_action;
1766         }
1767
1768         return ($auth_result, undef, $auth_match);
1769
1770     }
1771 }
1772
1773 sub _get_revert_action {
1774     my ($overlay_action, $overlay_status, $status) = @_;
1775
1776     my $bib_result;
1777
1778     if ($status eq 'ignored') {
1779         $bib_result = 'ignore';
1780     } else {
1781         if ($overlay_action eq 'create_new') {
1782             $bib_result = 'delete';
1783         } else {
1784             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1785         }
1786     }
1787     return $bib_result;
1788 }
1789
1790 1;
1791 __END__
1792
1793 =head1 AUTHOR
1794
1795 Koha Development Team <http://koha-community.org/>
1796
1797 Galen Charlton <galen.charlton@liblime.com>
1798
1799 =cut