Bug 34328: (follow-up) Add description to en_GB
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue;
38 use Koha::Items;
39 use Koha::SearchEngine;
40 use Koha::SearchEngine::Indexer;
41 use Koha::Plugins::Handler;
42 use Koha::Logger;
43
44 our (@ISA, @EXPORT_OK);
45 BEGIN {
46     require Exporter;
47     @ISA       = qw(Exporter);
48     @EXPORT_OK = qw(
49       GetZ3950BatchId
50       GetWebserviceBatchId
51       GetImportRecordMarc
52       AddImportBatch
53       GetImportBatch
54       AddAuthToBatch
55       AddBiblioToBatch
56       AddItemsToImportBiblio
57       ModAuthorityInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub EmbedItemsInImportBiblio {
196     my ( $record, $import_record_id ) = @_;
197     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
198     my $dbh = C4::Context->dbh;
199     my $import_items = $dbh->selectall_arrayref(q|
200         SELECT import_items.marcxml
201         FROM import_items
202         WHERE import_record_id = ?
203     |, { Slice => {} }, $import_record_id );
204     my @item_fields;
205     for my $import_item ( @$import_items ) {
206         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
207         push @item_fields, $item_marc->field($itemtag);
208     }
209     $record->append_fields(@item_fields);
210     return $record;
211 }
212
213 =head2 AddImportBatch
214
215   my $batch_id = AddImportBatch($params_hash);
216
217 =cut
218
219 sub AddImportBatch {
220     my ($params) = @_;
221
222     my (@fields, @vals);
223     foreach (qw( matcher_id template_id branchcode
224                  overlay_action nomatch_action item_action
225                  import_status batch_type file_name comments record_type )) {
226         if (exists $params->{$_}) {
227             push @fields, $_;
228             push @vals, $params->{$_};
229         }
230     }
231     my $dbh = C4::Context->dbh;
232     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
233                                   VALUES (".join( ',', map '?', @fields).")",
234              undef,
235              @vals);
236     return $dbh->{'mysql_insertid'};
237 }
238
239 =head2 GetImportBatch 
240
241   my $row = GetImportBatch($batch_id);
242
243 Retrieve a hashref of an import_batches row.
244
245 =cut
246
247 sub GetImportBatch {
248     my ($batch_id) = @_;
249
250     my $dbh = C4::Context->dbh;
251     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
252     $sth->bind_param(1, $batch_id);
253     $sth->execute();
254     my $result = $sth->fetchrow_hashref;
255     $sth->finish();
256     return $result;
257
258 }
259
260 =head2 AddBiblioToBatch 
261
262   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
263                 $marc_record, $encoding, $update_counts);
264
265 =cut
266
267 sub AddBiblioToBatch {
268     my $batch_id = shift;
269     my $record_sequence = shift;
270     my $marc_record = shift;
271     my $encoding = shift;
272     my $update_counts = @_ ? shift : 1;
273
274     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
275     _add_biblio_fields($import_record_id, $marc_record);
276     _update_batch_record_counts($batch_id) if $update_counts;
277     return $import_record_id;
278 }
279
280 =head2 AddAuthToBatch
281
282   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
283                 $marc_record, $encoding, $update_counts, [$marc_type]);
284
285 =cut
286
287 sub AddAuthToBatch {
288     my $batch_id = shift;
289     my $record_sequence = shift;
290     my $marc_record = shift;
291     my $encoding = shift;
292     my $update_counts = @_ ? shift : 1;
293     my $marc_type = shift || C4::Context->preference('marcflavour');
294
295     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
296
297     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
298     _add_auth_fields($import_record_id, $marc_record);
299     _update_batch_record_counts($batch_id) if $update_counts;
300     return $import_record_id;
301 }
302
303 =head2 BatchStageMarcRecords
304
305 ( $batch_id, $num_records, $num_items, @invalid_records ) =
306   BatchStageMarcRecords(
307     $record_type,                $encoding,
308     $marc_records,               $file_name,
309     $marc_modification_template, $comments,
310     $branch_code,                $parse_items,
311     $leave_as_staging,           $progress_interval,
312     $progress_callback
313   );
314
315 =cut
316
317 sub BatchStageMarcRecords {
318     my $record_type = shift;
319     my $encoding = shift;
320     my $marc_records = shift;
321     my $file_name = shift;
322     my $marc_modification_template = shift;
323     my $comments = shift;
324     my $branch_code = shift;
325     my $parse_items = shift;
326     my $leave_as_staging = shift;
327
328     # optional callback to monitor status 
329     # of job
330     my $progress_interval = 0;
331     my $progress_callback = undef;
332     if ($#_ == 1) {
333         $progress_interval = shift;
334         $progress_callback = shift;
335         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
336         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
337     } 
338     
339     my $batch_id = AddImportBatch( {
340             overlay_action => 'create_new',
341             import_status => 'staging',
342             batch_type => 'batch',
343             file_name => $file_name,
344             comments => $comments,
345             record_type => $record_type,
346         } );
347     if ($parse_items) {
348         SetImportBatchItemAction($batch_id, 'always_add');
349     } else {
350         SetImportBatchItemAction($batch_id, 'ignore');
351     }
352
353
354     my $marc_type = C4::Context->preference('marcflavour');
355     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
356     my @invalid_records = ();
357     my $num_valid = 0;
358     my $num_items = 0;
359     # FIXME - for now, we're dealing only with bibs
360     my $rec_num = 0;
361     foreach my $marc_record (@$marc_records) {
362         $rec_num++;
363         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
364             &$progress_callback($rec_num);
365         }
366
367         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
368
369         my $import_record_id;
370         if (scalar($marc_record->fields()) == 0) {
371             push @invalid_records, $marc_record;
372         } else {
373
374             # Normalize the record so it doesn't have separated diacritics
375             SetUTF8Flag($marc_record);
376
377             $num_valid++;
378             if ($record_type eq 'biblio') {
379                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
380                 if ($parse_items) {
381                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
382                     $num_items += scalar(@import_items_ids);
383                 }
384             } elsif ($record_type eq 'auth') {
385                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
386             }
387         }
388     }
389     unless ($leave_as_staging) {
390         SetImportBatchStatus($batch_id, 'staged');
391     }
392     # FIXME branch_code, number of bibs, number of items
393     _update_batch_record_counts($batch_id);
394     if ($progress_interval){
395         &$progress_callback($rec_num);
396     }
397
398     return ($batch_id, $num_valid, $num_items, @invalid_records);
399 }
400
401 =head2 AddItemsToImportBiblio
402
403   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
404                 $import_record_id, $marc_record, $update_counts);
405
406 =cut
407
408 sub AddItemsToImportBiblio {
409     my $batch_id = shift;
410     my $import_record_id = shift;
411     my $marc_record = shift;
412     my $update_counts = @_ ? shift : 0;
413
414     my @import_items_ids = ();
415    
416     my $dbh = C4::Context->dbh; 
417     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
418     foreach my $item_field ($marc_record->field($item_tag)) {
419         my $item_marc = MARC::Record->new();
420         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
421         $item_marc->append_fields($item_field);
422         $marc_record->delete_field($item_field);
423         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
424                                         VALUES (?, ?, ?)");
425         $sth->bind_param(1, $import_record_id);
426         $sth->bind_param(2, 'staged');
427         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
428         $sth->execute();
429         push @import_items_ids, $dbh->{'mysql_insertid'};
430         $sth->finish();
431     }
432
433     if ($#import_items_ids > -1) {
434         _update_batch_record_counts($batch_id) if $update_counts;
435     }
436     return @import_items_ids;
437 }
438
439 =head2 BatchFindDuplicates
440
441   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
442              $max_matches, $progress_interval, $progress_callback);
443
444 Goes through the records loaded in the batch and attempts to 
445 find duplicates for each one.  Sets the matching status 
446 of each record to "no_match" or "auto_match" as appropriate.
447
448 The $max_matches parameter is optional; if it is not supplied,
449 it defaults to 10.
450
451 The $progress_interval and $progress_callback parameters are 
452 optional; if both are supplied, the sub referred to by
453 $progress_callback will be invoked every $progress_interval
454 records using the number of records processed as the 
455 singular argument.
456
457 =cut
458
459 sub BatchFindDuplicates {
460     my $batch_id = shift;
461     my $matcher = shift;
462     my $max_matches = @_ ? shift : 10;
463
464     # optional callback to monitor status 
465     # of job
466     my $progress_interval = 0;
467     my $progress_callback = undef;
468     if ($#_ == 1) {
469         $progress_interval = shift;
470         $progress_callback = shift;
471         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
472         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
473     }
474
475     my $dbh = C4::Context->dbh;
476
477     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
478                              FROM import_records
479                              WHERE import_batch_id = ?");
480     $sth->execute($batch_id);
481     my $num_with_matches = 0;
482     my $rec_num = 0;
483     while (my $rowref = $sth->fetchrow_hashref) {
484         $rec_num++;
485         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
486             &$progress_callback($rec_num);
487         }
488         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
489         my @matches = ();
490         if (defined $matcher) {
491             @matches = $matcher->get_matches($marc_record, $max_matches);
492         }
493         if (scalar(@matches) > 0) {
494             $num_with_matches++;
495             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
496             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
497         } else {
498             SetImportRecordMatches($rowref->{'import_record_id'}, ());
499             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
500         }
501     }
502
503     if ($progress_interval){
504         &$progress_callback($rec_num);
505     }
506
507     $sth->finish();
508     return $num_with_matches;
509 }
510
511 =head2 BatchCommitRecords
512
513   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
514   'progress_callback' will define code called every X records.
515
516   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
517         BatchCommitRecords({
518             batch_id  => $batch_id,
519             framework => $framework,
520             overlay_framework => $overlay_framework,
521             progress_interval => $progress_interval,
522             progress_callback => $progress_callback,
523             skip_intermediate_commit => $skip_intermediate_commit
524         });
525
526     Parameter skip_intermediate_commit does what is says.
527 =cut
528
529 sub BatchCommitRecords {
530     my $params = shift;
531     my $batch_id          = $params->{batch_id};
532     my $framework         = $params->{framework};
533     my $overlay_framework = $params->{overlay_framework};
534     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
535     my $progress_interval = $params->{progress_interval} // 0;
536     my $progress_callback = $params->{progress_callback};
537     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
538     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
539
540     my $schema = Koha::Database->schema;
541     $schema->txn_begin;
542     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
543     # transactions may be committed and started too again. The final commit is close to the end.
544
545     my $record_type;
546     my $num_added = 0;
547     my $num_updated = 0;
548     my $num_items_added = 0;
549     my $num_items_replaced = 0;
550     my $num_items_errored = 0;
551     my $num_ignored = 0;
552     # commit (i.e., save, all records in the batch)
553     my $overlay_action = GetImportBatchOverlayAction($batch_id);
554     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
555     my $item_action = GetImportBatchItemAction($batch_id);
556     my $item_tag;
557     my $item_subfield;
558     my $dbh = C4::Context->dbh;
559     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
560                              FROM import_records
561                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
562                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
563                              WHERE import_batch_id = ?");
564     $sth->execute($batch_id);
565     my $marcflavour = C4::Context->preference('marcflavour');
566
567     my $userenv = C4::Context->userenv;
568     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
569
570     my $rec_num = 0;
571     my @biblio_ids;
572     my @updated_ids;
573     while (my $rowref = $sth->fetchrow_hashref) {
574         $record_type = $rowref->{'record_type'};
575
576         $rec_num++;
577
578         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
579             # report progress and commit
580             $schema->txn_commit unless $skip_intermediate_commit;
581             &$progress_callback( $rec_num );
582             $schema->txn_begin unless $skip_intermediate_commit;
583         }
584         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
585             $num_ignored++;
586             next;
587         }
588
589         my $marc_type;
590         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
591             $marc_type = 'UNIMARCAUTH';
592         } elsif ($marcflavour eq 'UNIMARC') {
593             $marc_type = 'UNIMARC';
594         } else {
595             $marc_type = 'USMARC';
596         }
597         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
598
599         if ($record_type eq 'biblio') {
600             # remove any item tags - rely on _batchCommitItems
601             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
602             foreach my $item_field ($marc_record->field($item_tag)) {
603                 $marc_record->delete_field($item_field);
604             }
605             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
606                 my @control_num = $marc_record->field('001');
607                 $marc_record->delete_fields(@control_num);
608             }
609         }
610
611         my ($record_result, $item_result, $record_match) =
612             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
613                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
614
615         my $recordid;
616         my $query;
617         if ($record_result eq 'create_new') {
618             $num_added++;
619             if ($record_type eq 'biblio') {
620                 my $biblioitemnumber;
621                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
622                 push @biblio_ids, $recordid if $recordid;
623                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
624                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
625                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
626                     $num_items_added += $bib_items_added;
627                     $num_items_replaced += $bib_items_replaced;
628                     $num_items_errored += $bib_items_errored;
629                 }
630             } else {
631                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
632                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
633             }
634             my $sth = $dbh->prepare_cached($query);
635             $sth->execute($recordid, $rowref->{'import_record_id'});
636             $sth->finish();
637             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
638         } elsif ($record_result eq 'replace') {
639             $num_updated++;
640             $recordid = $record_match;
641             my $oldxml;
642             if ($record_type eq 'biblio') {
643                 my $oldbiblio = Koha::Biblios->find( $recordid );
644                 $oldxml = GetXmlBiblio($recordid);
645
646                 # remove item fields so that they don't get
647                 # added again if record is reverted
648                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
649                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
650                 foreach my $item_field ($old_marc->field($item_tag)) {
651                     $old_marc->delete_field($item_field);
652                 }
653                 $oldxml = $old_marc->as_xml($marc_type);
654
655                 my $context = { source => 'batchimport' };
656                 if ($logged_in_patron) {
657                     $context->{categorycode} = $logged_in_patron->categorycode;
658                     $context->{userid} = $logged_in_patron->userid;
659                 }
660
661                 ModBiblio(
662                     $marc_record,
663                     $recordid,
664                     $overlay_framework // $oldbiblio->frameworkcode,
665                     {
666                         overlay_context   => $context,
667                         skip_record_index => 1,
668                         skip_holds_queue  => 1,
669                     }
670                 );
671                 push @biblio_ids, $recordid;
672                 push @updated_ids, $recordid;
673                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
674
675                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
676                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
677                     $num_items_added += $bib_items_added;
678                     $num_items_replaced += $bib_items_replaced;
679                     $num_items_errored += $bib_items_errored;
680                 }
681             } else {
682                 $oldxml = GetAuthorityXML($recordid);
683
684                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
685                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
686             }
687             # Combine xml update, SetImportRecordOverlayStatus, and SetImportRecordStatus updates into a single update for efficiency, especially in a transaction
688             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ?, status = ?, overlay_status = ? WHERE import_record_id = ?");
689             $sth->execute( $oldxml, 'imported', 'match_applied', $rowref->{'import_record_id'} );
690             $sth->finish();
691             my $sth2 = $dbh->prepare_cached($query);
692             $sth2->execute($recordid, $rowref->{'import_record_id'});
693             $sth2->finish();
694         } elsif ($record_result eq 'ignore') {
695             $recordid = $record_match;
696             $num_ignored++;
697             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
698                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
699                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
700                 $num_items_added += $bib_items_added;
701          $num_items_replaced += $bib_items_replaced;
702                 $num_items_errored += $bib_items_errored;
703                 # still need to record the matched biblionumber so that the
704                 # items can be reverted
705                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
706                 $sth2->execute($recordid, $rowref->{'import_record_id'});
707                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
708             }
709             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
710         }
711     }
712
713     if ($progress_interval){
714         &$progress_callback($rec_num);
715     }
716
717     $sth->finish();
718
719     SetImportBatchStatus($batch_id, 'imported');
720
721     # final commit should be before Elastic background indexing in order to find job data
722     $schema->txn_commit;
723
724     if (@biblio_ids) {
725         my $indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
726         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
727     }
728     Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue->new->enqueue( { biblio_ids => \@updated_ids } )
729         if ( @updated_ids && C4::Context->preference('RealTimeHoldsQueue') );
730
731     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
732 }
733
734 =head2 _batchCommitItems
735
736   ($num_items_added, $num_items_errored) = 
737          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
738
739 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
740
741 =cut
742
743 sub _batchCommitItems {
744     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
745
746     my $dbh = C4::Context->dbh;
747
748     my $num_items_added = 0;
749     my $num_items_errored = 0;
750     my $num_items_replaced = 0;
751
752     my $sth = $dbh->prepare( "
753         SELECT import_items_id, import_items.marcxml, encoding
754         FROM import_items
755         JOIN import_records USING (import_record_id)
756         WHERE import_record_id = ?
757         ORDER BY import_items_id
758     " );
759     $sth->bind_param( 1, $import_record_id );
760     $sth->execute();
761
762     while ( my $row = $sth->fetchrow_hashref() ) {
763         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
764
765         # Delete date_due subfield as to not accidentally delete item checkout due dates
766         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
767         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
768
769         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
770
771         my $item_match;
772         my $duplicate_barcode = exists( $item->{'barcode'} );
773         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
774
775         # We assume that when replacing items we do not want to move them - the onus is on the importer to
776         # ensure the correct items/records are being updated
777         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
778         if (
779             $action eq "replace" &&
780             $duplicate_itemnumber &&
781             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
782         ) {
783             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
784             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
785             $updsth->bind_param( 1, 'imported' );
786             $updsth->bind_param( 2, $item->{itemnumber} );
787             $updsth->bind_param( 3, undef );
788             $updsth->bind_param( 4, $row->{'import_items_id'} );
789             $updsth->execute();
790             $updsth->finish();
791             $num_items_replaced++;
792         } elsif (
793             $action eq "replace" &&
794             $duplicate_barcode &&
795             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
796         ) {
797             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
798             $updsth->bind_param( 1, 'imported' );
799             $updsth->bind_param( 2, $item->{itemnumber} );
800             $updsth->bind_param( 3, undef );
801             $updsth->bind_param( 4, $row->{'import_items_id'} );
802             $updsth->execute();
803             $updsth->finish();
804             $num_items_replaced++;
805         } elsif (
806             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
807             $duplicate_barcode &&
808             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
809         ) {
810             $updsth->bind_param( 1, 'error' );
811             $updsth->bind_param( 2, undef );
812             $updsth->bind_param( 3, 'duplicate item barcode' );
813             $updsth->bind_param( 4, $row->{'import_items_id'} );
814             $updsth->execute();
815             $num_items_errored++;
816         } else {
817             # Remove the itemnumber if it exists, we want to create a new item
818             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
819             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
820
821             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
822             if( $itemnumber ) {
823                 $updsth->bind_param( 1, 'imported' );
824                 $updsth->bind_param( 2, $itemnumber );
825                 $updsth->bind_param( 3, undef );
826                 $updsth->bind_param( 4, $row->{'import_items_id'} );
827                 $updsth->execute();
828                 $updsth->finish();
829                 $num_items_added++;
830             }
831         }
832     }
833
834     return ( $num_items_added, $num_items_replaced, $num_items_errored );
835 }
836
837 =head2 BatchRevertRecords
838
839   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
840       $num_ignored) = BatchRevertRecords($batch_id);
841
842 =cut
843
844 sub BatchRevertRecords {
845     my $batch_id = shift;
846
847     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
848
849     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
850
851     my $record_type;
852     my $num_deleted = 0;
853     my $num_errors = 0;
854     my $num_reverted = 0;
855     my $num_ignored = 0;
856     my $num_items_deleted = 0;
857     # commit (i.e., save, all records in the batch)
858     SetImportBatchStatus($batch_id, 'reverting');
859     my $overlay_action = GetImportBatchOverlayAction($batch_id);
860     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
861     my $dbh = C4::Context->dbh;
862     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
863                              FROM import_records
864                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
865                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
866                              WHERE import_batch_id = ?");
867     $sth->execute($batch_id);
868     my $marc_type;
869     my $marcflavour = C4::Context->preference('marcflavour');
870     while (my $rowref = $sth->fetchrow_hashref) {
871         $record_type = $rowref->{'record_type'};
872         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
873             $num_ignored++;
874             next;
875         }
876         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
877             $marc_type = 'UNIMARCAUTH';
878         } elsif ($marcflavour eq 'UNIMARC') {
879             $marc_type = 'UNIMARC';
880         } else {
881             $marc_type = 'USMARC';
882         }
883
884         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
885
886         if ($record_result eq 'delete') {
887             my $error = undef;
888             if  ($record_type eq 'biblio') {
889                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
890                 $error = DelBiblio($rowref->{'matched_biblionumber'});
891             } else {
892                 DelAuthority({ authid => $rowref->{'matched_authid'} });
893             }
894             if (defined $error) {
895                 $num_errors++;
896             } else {
897                 $num_deleted++;
898                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
899             }
900         } elsif ($record_result eq 'restore') {
901             $num_reverted++;
902             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
903             if ($record_type eq 'biblio') {
904                 my $biblionumber = $rowref->{'matched_biblionumber'};
905                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
906
907                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
908                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
909
910                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
911                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
912             } else {
913                 my $authid = $rowref->{'matched_authid'};
914                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
915             }
916             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
917         } elsif ($record_result eq 'ignore') {
918             if ($record_type eq 'biblio') {
919                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
920             }
921             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
922         }
923         my $query;
924         if ($record_type eq 'biblio') {
925             # remove matched_biblionumber only if there is no 'imported' item left
926             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
927             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
928         } else {
929             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
930         }
931         my $sth2 = $dbh->prepare_cached($query);
932         $sth2->execute($rowref->{'import_record_id'});
933     }
934
935     $sth->finish();
936     SetImportBatchStatus($batch_id, 'reverted');
937     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
938 }
939
940 =head2 BatchRevertItems
941
942   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
943
944 =cut
945
946 sub BatchRevertItems {
947     my ($import_record_id, $biblionumber) = @_;
948
949     my $dbh = C4::Context->dbh;
950     my $num_items_deleted = 0;
951
952     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
953                                    FROM import_items
954                                    JOIN items USING (itemnumber)
955                                    WHERE import_record_id = ?");
956     $sth->bind_param(1, $import_record_id);
957     $sth->execute();
958     while (my $row = $sth->fetchrow_hashref()) {
959         my $item = Koha::Items->find($row->{itemnumber});
960         if ($item->safe_delete){
961             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
962             $updsth->bind_param(1, 'reverted');
963             $updsth->bind_param(2, $row->{'import_items_id'});
964             $updsth->execute();
965             $updsth->finish();
966             $num_items_deleted++;
967         }
968         else {
969             next;
970         }
971     }
972     $sth->finish();
973     return $num_items_deleted;
974 }
975
976 =head2 CleanBatch
977
978   CleanBatch($batch_id)
979
980 Deletes all staged records from the import batch
981 and sets the status of the batch to 'cleaned'.  Note
982 that deleting a stage record does *not* affect
983 any record that has been committed to the database.
984
985 =cut
986
987 sub CleanBatch {
988     my $batch_id = shift;
989     return unless defined $batch_id;
990
991     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
992     SetImportBatchStatus($batch_id, 'cleaned');
993 }
994
995 =head2 DeleteBatch
996
997   DeleteBatch($batch_id)
998
999 Deletes the record from the database. This can only be done
1000 once the batch has been cleaned.
1001
1002 =cut
1003
1004 sub DeleteBatch {
1005     my $batch_id = shift;
1006     return unless defined $batch_id;
1007
1008     my $dbh = C4::Context->dbh;
1009     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1010     $sth->execute( $batch_id );
1011 }
1012
1013 =head2 GetAllImportBatches
1014
1015   my $results = GetAllImportBatches();
1016
1017 Returns a references to an array of hash references corresponding
1018 to all import_batches rows (of batch_type 'batch'), sorted in 
1019 ascending order by import_batch_id.
1020
1021 =cut
1022
1023 sub  GetAllImportBatches {
1024     my $dbh = C4::Context->dbh;
1025     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1026                                     WHERE batch_type IN ('batch', 'webservice')
1027                                     ORDER BY import_batch_id ASC");
1028
1029     my $results = [];
1030     $sth->execute();
1031     while (my $row = $sth->fetchrow_hashref) {
1032         push @$results, $row;
1033     }
1034     $sth->finish();
1035     return $results;
1036 }
1037
1038 =head2 GetStagedWebserviceBatches
1039
1040   my $batch_ids = GetStagedWebserviceBatches();
1041
1042 Returns a references to an array of batch id's
1043 of batch_type 'webservice' that are not imported
1044
1045 =cut
1046
1047 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1048 SELECT import_batch_id FROM import_batches
1049 WHERE batch_type = 'webservice'
1050 AND import_status = 'staged'
1051 EOQ
1052 sub  GetStagedWebserviceBatches {
1053     my $dbh = C4::Context->dbh;
1054     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1055 }
1056
1057 =head2 GetImportBatchRangeDesc
1058
1059   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1060
1061 Returns a reference to an array of hash references corresponding to
1062 import_batches rows (sorted in descending order by import_batch_id)
1063 start at the given offset.
1064
1065 =cut
1066
1067 sub GetImportBatchRangeDesc {
1068     my ($offset, $results_per_group) = @_;
1069
1070     my $dbh = C4::Context->dbh;
1071     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1072                                     LEFT JOIN import_batch_profiles p
1073                                     ON b.profile_id = p.id
1074                                     WHERE b.batch_type IN ('batch', 'webservice')
1075                                     ORDER BY b.import_batch_id DESC";
1076     my @params;
1077     if ($results_per_group){
1078         $query .= " LIMIT ?";
1079         push(@params, $results_per_group);
1080     }
1081     if ($offset){
1082         $query .= " OFFSET ?";
1083         push(@params, $offset);
1084     }
1085     my $sth = $dbh->prepare_cached($query);
1086     $sth->execute(@params);
1087     my $results = $sth->fetchall_arrayref({});
1088     $sth->finish();
1089     return $results;
1090 }
1091
1092 =head2 GetItemNumbersFromImportBatch
1093
1094   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1095
1096 =cut
1097
1098 sub GetItemNumbersFromImportBatch {
1099     my ($batch_id) = @_;
1100     my $dbh = C4::Context->dbh;
1101     my $sql = q|
1102 SELECT itemnumber FROM import_items
1103 INNER JOIN items USING (itemnumber)
1104 INNER JOIN import_records USING (import_record_id)
1105 WHERE import_batch_id = ?|;
1106     my  $sth = $dbh->prepare( $sql );
1107     $sth->execute($batch_id);
1108     my @items ;
1109     while ( my ($itm) = $sth->fetchrow_array ) {
1110         push @items, $itm;
1111     }
1112     return @items;
1113 }
1114
1115 =head2 GetNumberOfImportBatches
1116
1117   my $count = GetNumberOfImportBatches();
1118
1119 =cut
1120
1121 sub GetNumberOfNonZ3950ImportBatches {
1122     my $dbh = C4::Context->dbh;
1123     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1124     $sth->execute();
1125     my ($count) = $sth->fetchrow_array();
1126     $sth->finish();
1127     return $count;
1128 }
1129
1130 =head2 GetImportBiblios
1131
1132   my $results = GetImportBiblios($importid);
1133
1134 =cut
1135
1136 sub GetImportBiblios {
1137     my ($import_record_id) = @_;
1138
1139     my $dbh = C4::Context->dbh;
1140     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1141     return $dbh->selectall_arrayref(
1142         $query,
1143         { Slice => {} },
1144         $import_record_id
1145     );
1146
1147 }
1148
1149 =head2 GetImportRecordsRange
1150
1151   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1152
1153 Returns a reference to an array of hash references corresponding to
1154 import_biblios/import_auths/import_records rows for a given batch
1155 starting at the given offset.
1156
1157 =cut
1158
1159 sub GetImportRecordsRange {
1160     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1161
1162     my $dbh = C4::Context->dbh;
1163
1164     my $order_by = $parameters->{order_by} || 'import_record_id';
1165     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1166
1167     my $order_by_direction =
1168       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1169
1170     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1171
1172     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1173                                            record_sequence, status, overlay_status,
1174                                            matched_biblionumber, matched_authid, record_type
1175                                     FROM   import_records
1176                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1177                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1178                                     WHERE  import_batch_id = ?";
1179     my @params;
1180     push(@params, $batch_id);
1181     if ($status) {
1182         $query .= " AND status=?";
1183         push(@params,$status);
1184     }
1185
1186     $query.=" ORDER BY $order_by $order_by_direction";
1187
1188     if($results_per_group){
1189         $query .= " LIMIT ?";
1190         push(@params, $results_per_group);
1191     }
1192     if($offset){
1193         $query .= " OFFSET ?";
1194         push(@params, $offset);
1195     }
1196     my $sth = $dbh->prepare_cached($query);
1197     $sth->execute(@params);
1198     my $results = $sth->fetchall_arrayref({});
1199     $sth->finish();
1200     return $results;
1201
1202 }
1203
1204 =head2 GetBestRecordMatch
1205
1206   my $record_id = GetBestRecordMatch($import_record_id);
1207
1208 =cut
1209
1210 sub GetBestRecordMatch {
1211     my ($import_record_id) = @_;
1212
1213     my $dbh = C4::Context->dbh;
1214     my $sth = $dbh->prepare("SELECT candidate_match_id
1215                              FROM   import_record_matches
1216                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1217                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1218                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1219                              WHERE  import_record_matches.import_record_id = ? AND
1220                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1221                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1222                              AND chosen = 1
1223                              ORDER BY score DESC, candidate_match_id DESC");
1224     $sth->execute($import_record_id);
1225     my ($record_id) = $sth->fetchrow_array();
1226     $sth->finish();
1227     return $record_id;
1228 }
1229
1230 =head2 GetImportBatchStatus
1231
1232   my $status = GetImportBatchStatus($batch_id);
1233
1234 =cut
1235
1236 sub GetImportBatchStatus {
1237     my ($batch_id) = @_;
1238
1239     my $dbh = C4::Context->dbh;
1240     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1241     $sth->execute($batch_id);
1242     my ($status) = $sth->fetchrow_array();
1243     $sth->finish();
1244     return $status;
1245
1246 }
1247
1248 =head2 SetImportBatchStatus
1249
1250   SetImportBatchStatus($batch_id, $new_status);
1251
1252 =cut
1253
1254 sub SetImportBatchStatus {
1255     my ($batch_id, $new_status) = @_;
1256
1257     my $dbh = C4::Context->dbh;
1258     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1259     $sth->execute($new_status, $batch_id);
1260     $sth->finish();
1261
1262 }
1263
1264 =head2 SetMatchedBiblionumber
1265
1266   SetMatchedBiblionumber($import_record_id, $biblionumber);
1267
1268 =cut
1269
1270 sub SetMatchedBiblionumber {
1271     my ($import_record_id, $biblionumber) = @_;
1272
1273     my $dbh = C4::Context->dbh;
1274     $dbh->do(
1275         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1276         undef, $biblionumber, $import_record_id
1277     );
1278 }
1279
1280 =head2 GetImportBatchOverlayAction
1281
1282   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1283
1284 =cut
1285
1286 sub GetImportBatchOverlayAction {
1287     my ($batch_id) = @_;
1288
1289     my $dbh = C4::Context->dbh;
1290     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1291     $sth->execute($batch_id);
1292     my ($overlay_action) = $sth->fetchrow_array();
1293     $sth->finish();
1294     return $overlay_action;
1295
1296 }
1297
1298
1299 =head2 SetImportBatchOverlayAction
1300
1301   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1302
1303 =cut
1304
1305 sub SetImportBatchOverlayAction {
1306     my ($batch_id, $new_overlay_action) = @_;
1307
1308     my $dbh = C4::Context->dbh;
1309     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1310     $sth->execute($new_overlay_action, $batch_id);
1311     $sth->finish();
1312
1313 }
1314
1315 =head2 GetImportBatchNoMatchAction
1316
1317   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1318
1319 =cut
1320
1321 sub GetImportBatchNoMatchAction {
1322     my ($batch_id) = @_;
1323
1324     my $dbh = C4::Context->dbh;
1325     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1326     $sth->execute($batch_id);
1327     my ($nomatch_action) = $sth->fetchrow_array();
1328     $sth->finish();
1329     return $nomatch_action;
1330
1331 }
1332
1333
1334 =head2 SetImportBatchNoMatchAction
1335
1336   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1337
1338 =cut
1339
1340 sub SetImportBatchNoMatchAction {
1341     my ($batch_id, $new_nomatch_action) = @_;
1342
1343     my $dbh = C4::Context->dbh;
1344     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1345     $sth->execute($new_nomatch_action, $batch_id);
1346     $sth->finish();
1347
1348 }
1349
1350 =head2 GetImportBatchItemAction
1351
1352   my $item_action = GetImportBatchItemAction($batch_id);
1353
1354 =cut
1355
1356 sub GetImportBatchItemAction {
1357     my ($batch_id) = @_;
1358
1359     my $dbh = C4::Context->dbh;
1360     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1361     $sth->execute($batch_id);
1362     my ($item_action) = $sth->fetchrow_array();
1363     $sth->finish();
1364     return $item_action;
1365
1366 }
1367
1368
1369 =head2 SetImportBatchItemAction
1370
1371   SetImportBatchItemAction($batch_id, $new_item_action);
1372
1373 =cut
1374
1375 sub SetImportBatchItemAction {
1376     my ($batch_id, $new_item_action) = @_;
1377
1378     my $dbh = C4::Context->dbh;
1379     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1380     $sth->execute($new_item_action, $batch_id);
1381     $sth->finish();
1382
1383 }
1384
1385 =head2 GetImportBatchMatcher
1386
1387   my $matcher_id = GetImportBatchMatcher($batch_id);
1388
1389 =cut
1390
1391 sub GetImportBatchMatcher {
1392     my ($batch_id) = @_;
1393
1394     my $dbh = C4::Context->dbh;
1395     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1396     $sth->execute($batch_id);
1397     my ($matcher_id) = $sth->fetchrow_array();
1398     $sth->finish();
1399     return $matcher_id;
1400
1401 }
1402
1403
1404 =head2 SetImportBatchMatcher
1405
1406   SetImportBatchMatcher($batch_id, $new_matcher_id);
1407
1408 =cut
1409
1410 sub SetImportBatchMatcher {
1411     my ($batch_id, $new_matcher_id) = @_;
1412
1413     my $dbh = C4::Context->dbh;
1414     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1415     $sth->execute($new_matcher_id, $batch_id);
1416     $sth->finish();
1417
1418 }
1419
1420 =head2 GetImportRecordOverlayStatus
1421
1422   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1423
1424 =cut
1425
1426 sub GetImportRecordOverlayStatus {
1427     my ($import_record_id) = @_;
1428
1429     my $dbh = C4::Context->dbh;
1430     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1431     $sth->execute($import_record_id);
1432     my ($overlay_status) = $sth->fetchrow_array();
1433     $sth->finish();
1434     return $overlay_status;
1435
1436 }
1437
1438
1439 =head2 SetImportRecordOverlayStatus
1440
1441   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1442
1443 =cut
1444
1445 sub SetImportRecordOverlayStatus {
1446     my ($import_record_id, $new_overlay_status) = @_;
1447
1448     my $dbh = C4::Context->dbh;
1449     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1450     $sth->execute($new_overlay_status, $import_record_id);
1451     $sth->finish();
1452
1453 }
1454
1455 =head2 GetImportRecordStatus
1456
1457   my $status = GetImportRecordStatus($import_record_id);
1458
1459 =cut
1460
1461 sub GetImportRecordStatus {
1462     my ($import_record_id) = @_;
1463
1464     my $dbh = C4::Context->dbh;
1465     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1466     $sth->execute($import_record_id);
1467     my ($status) = $sth->fetchrow_array();
1468     $sth->finish();
1469     return $status;
1470
1471 }
1472
1473
1474 =head2 SetImportRecordStatus
1475
1476   SetImportRecordStatus($import_record_id, $new_status);
1477
1478 =cut
1479
1480 sub SetImportRecordStatus {
1481     my ($import_record_id, $new_status) = @_;
1482
1483     my $dbh = C4::Context->dbh;
1484     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1485     $sth->execute($new_status, $import_record_id);
1486     $sth->finish();
1487
1488 }
1489
1490 =head2 GetImportRecordMatches
1491
1492   my $results = GetImportRecordMatches($import_record_id, $best_only);
1493
1494 =cut
1495
1496 sub GetImportRecordMatches {
1497     my $import_record_id = shift;
1498     my $best_only = @_ ? shift : 0;
1499
1500     my $dbh = C4::Context->dbh;
1501     # FIXME currently biblio only
1502     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1503                                     candidate_match_id, score, record_type,
1504                                     chosen
1505                                     FROM import_records
1506                                     JOIN import_record_matches USING (import_record_id)
1507                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1508                                     WHERE import_record_id = ?
1509                                     ORDER BY score DESC, biblionumber DESC");
1510     $sth->bind_param(1, $import_record_id);
1511     my $results = [];
1512     $sth->execute();
1513     while (my $row = $sth->fetchrow_hashref) {
1514         if ($row->{'record_type'} eq 'auth') {
1515             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1516         }
1517         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1518         push @$results, $row;
1519         last if $best_only;
1520     }
1521     $sth->finish();
1522
1523     return $results;
1524     
1525 }
1526
1527 =head2 SetImportRecordMatches
1528
1529   SetImportRecordMatches($import_record_id, @matches);
1530
1531 =cut
1532
1533 sub SetImportRecordMatches {
1534     my $import_record_id = shift;
1535     my @matches = @_;
1536
1537     my $dbh = C4::Context->dbh;
1538     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1539     $delsth->execute($import_record_id);
1540     $delsth->finish();
1541
1542     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1543                                     VALUES (?, ?, ?, ?)");
1544     my $chosen = 1; #The first match is defaulted to be chosen
1545     foreach my $match (@matches) {
1546         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1547         $chosen = 0; #After the first we do not default to other matches
1548     }
1549 }
1550
1551 =head2 RecordsFromISO2709File
1552
1553     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1554
1555 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1556
1557 @PARAM1, String, absolute path to the ISO2709 file.
1558 @PARAM2, String, see stage_file.pl
1559 @PARAM3, String, should be utf8
1560
1561 Returns two array refs.
1562
1563 =cut
1564
1565 sub RecordsFromISO2709File {
1566     my ($input_file, $record_type, $encoding) = @_;
1567     my @errors;
1568
1569     my $marc_type = C4::Context->preference('marcflavour');
1570     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1571
1572     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1573     my @marc_records;
1574     $/ = "\035";
1575     while (<$fh>) {
1576         s/^\s+//;
1577         s/\s+$//;
1578         next unless $_; # skip if record has only whitespace, as might occur
1579                         # if file includes newlines between each MARC record
1580         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1581         push @marc_records, $marc_record;
1582         if ($charset_guessed ne $encoding) {
1583             push @errors,
1584                 "Unexpected charset $charset_guessed, expecting $encoding";
1585         }
1586     }
1587     close $fh;
1588     return ( \@errors, \@marc_records );
1589 }
1590
1591 =head2 RecordsFromMARCXMLFile
1592
1593     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1594
1595 Creates MARC::Record-objects out of the given MARCXML-file.
1596
1597 @PARAM1, String, absolute path to the MARCXML file.
1598 @PARAM2, String, should be utf8
1599
1600 Returns two array refs.
1601
1602 =cut
1603
1604 sub RecordsFromMARCXMLFile {
1605     my ( $filename, $encoding ) = @_;
1606     my $batch = MARC::File::XML->in( $filename );
1607     my ( @marcRecords, @errors, $record );
1608     do {
1609         eval { $record = $batch->next( $encoding ); };
1610         if ($@) {
1611             push @errors, $@;
1612         }
1613         push @marcRecords, $record if $record;
1614     } while( $record );
1615     return (\@errors, \@marcRecords);
1616 }
1617
1618 =head2 RecordsFromMarcPlugin
1619
1620     Converts text of input_file into array of MARC records with to_marc plugin
1621
1622 =cut
1623
1624 sub RecordsFromMarcPlugin {
1625     my ($input_file, $plugin_class, $encoding) = @_;
1626     my ( $text, @return );
1627     return \@return if !$input_file || !$plugin_class;
1628
1629     # Read input file
1630     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1631     $/ = "\035";
1632     while (<$fh>) {
1633         s/^\s+//;
1634         s/\s+$//;
1635         next unless $_;
1636         $text .= $_;
1637     }
1638     close $fh;
1639
1640     # Convert to large MARC blob with plugin
1641     $text = Koha::Plugins::Handler->run({
1642         class  => $plugin_class,
1643         method => 'to_marc',
1644         params => { data => $text },
1645     }) if $text;
1646
1647     # Convert to array of MARC records
1648     if( $text ) {
1649         my $marc_type = C4::Context->preference('marcflavour');
1650         foreach my $blob ( split(/\x1D/, $text) ) {
1651             next if $blob =~ /^\s*$/;
1652             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1653             push @return, $marcrecord;
1654         }
1655     }
1656     return \@return;
1657 }
1658
1659 # internal functions
1660
1661 sub _create_import_record {
1662     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1663
1664     my $dbh = C4::Context->dbh;
1665     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1666                                                          record_type, encoding)
1667                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1668     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1669                   $record_type, $encoding);
1670     my $import_record_id = $dbh->{'mysql_insertid'};
1671     $sth->finish();
1672     return $import_record_id;
1673 }
1674
1675 sub _add_auth_fields {
1676     my ($import_record_id, $marc_record) = @_;
1677
1678     my $controlnumber;
1679     if ($marc_record->field('001')) {
1680         $controlnumber = $marc_record->field('001')->data();
1681     }
1682     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1683     my $dbh = C4::Context->dbh;
1684     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1685     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1686     $sth->finish();
1687 }
1688
1689 sub _add_biblio_fields {
1690     my ($import_record_id, $marc_record) = @_;
1691
1692     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1693     my $dbh = C4::Context->dbh;
1694     # FIXME no controlnumber, originalsource
1695     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1696     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1697     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1698     $sth->finish();
1699                 
1700 }
1701
1702 sub _update_biblio_fields {
1703     my ($import_record_id, $marc_record) = @_;
1704
1705     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1706     my $dbh = C4::Context->dbh;
1707     # FIXME no controlnumber, originalsource
1708     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1709     $isbn =~ s/\(.*$//;
1710     $isbn =~ tr/ -_//;
1711     $isbn = uc $isbn;
1712     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1713                              WHERE  import_record_id = ?");
1714     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1715     $sth->finish();
1716 }
1717
1718 sub _parse_biblio_fields {
1719     my ($marc_record) = @_;
1720
1721     my $dbh = C4::Context->dbh;
1722     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1723     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1724
1725 }
1726
1727 sub _update_batch_record_counts {
1728     my ($batch_id) = @_;
1729
1730     my $dbh = C4::Context->dbh;
1731     my ( $num_records ) = $dbh->selectrow_array(q|
1732                                             SELECT COUNT(*)
1733                                             FROM import_records
1734                                             WHERE import_batch_id = ?
1735     |, undef, $batch_id );
1736     my ( $num_items ) = $dbh->selectrow_array(q|
1737                                             SELECT COUNT(*)
1738                                             FROM import_records
1739                                             JOIN import_items USING (import_record_id)
1740                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1741     |, undef, $batch_id );
1742     $dbh->do(
1743         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1744         undef,
1745         $num_records,
1746         $num_items,
1747         $batch_id,
1748     );
1749 }
1750
1751 sub _get_commit_action {
1752     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1753     
1754     if ($record_type eq 'biblio') {
1755         my ($bib_result, $bib_match, $item_result);
1756
1757         $bib_match = GetBestRecordMatch($import_record_id);
1758         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1759
1760             $bib_result = $overlay_action;
1761
1762             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1763                 $item_result = 'create_new';
1764             } elsif($item_action eq 'replace'){
1765                 $item_result = 'replace';
1766             } else {
1767                 $item_result = 'ignore';
1768             }
1769
1770         } else {
1771             $bib_result = $nomatch_action;
1772             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1773         }
1774         return ($bib_result, $item_result, $bib_match);
1775     } else { # must be auths
1776         my ($auth_result, $auth_match);
1777
1778         $auth_match = GetBestRecordMatch($import_record_id);
1779         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1780             $auth_result = $overlay_action;
1781         } else {
1782             $auth_result = $nomatch_action;
1783         }
1784
1785         return ($auth_result, undef, $auth_match);
1786
1787     }
1788 }
1789
1790 sub _get_revert_action {
1791     my ($overlay_action, $overlay_status, $status) = @_;
1792
1793     my $bib_result;
1794
1795     if ($status eq 'ignored') {
1796         $bib_result = 'ignore';
1797     } else {
1798         if ($overlay_action eq 'create_new') {
1799             $bib_result = 'delete';
1800         } else {
1801             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1802         }
1803     }
1804     return $bib_result;
1805 }
1806
1807 1;
1808 __END__
1809
1810 =head1 AUTHOR
1811
1812 Koha Development Team <http://koha-community.org/>
1813
1814 Galen Charlton <galen.charlton@liblime.com>
1815
1816 =cut