Bug 31618: Fix typo in POD for C4::ImportBatch::RecordsFromMARCXMLFile
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
513   'progress_callback' will define code called every X records.
514
515   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
516         BatchCommitRecords({
517             batch_id  => $batch_id,
518             framework => $framework,
519             overlay_framework => $overlay_framework,
520             progress_interval => $progress_interval,
521             progress_callback => $progress_callback,
522             skip_intermediate_commit => $skip_intermediate_commit
523         });
524
525     Parameter skip_intermediate_commit does what is says.
526 =cut
527
528 sub BatchCommitRecords {
529     my $params = shift;
530     my $batch_id          = $params->{batch_id};
531     my $framework         = $params->{framework};
532     my $overlay_framework = $params->{overlay_framework};
533     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
534     my $progress_interval = $params->{progress_interval} // 0;
535     my $progress_callback = $params->{progress_callback};
536     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
537     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
538
539     my $schema = Koha::Database->schema;
540     $schema->txn_begin;
541     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
542     # transactions may be committed and started too again. The final commit is close to the end.
543
544     my $record_type;
545     my $num_added = 0;
546     my $num_updated = 0;
547     my $num_items_added = 0;
548     my $num_items_replaced = 0;
549     my $num_items_errored = 0;
550     my $num_ignored = 0;
551     # commit (i.e., save, all records in the batch)
552     SetImportBatchStatus($batch_id, 'importing');
553     my $overlay_action = GetImportBatchOverlayAction($batch_id);
554     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
555     my $item_action = GetImportBatchItemAction($batch_id);
556     my $item_tag;
557     my $item_subfield;
558     my $dbh = C4::Context->dbh;
559     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
560                              FROM import_records
561                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
562                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
563                              WHERE import_batch_id = ?");
564     $sth->execute($batch_id);
565     my $marcflavour = C4::Context->preference('marcflavour');
566
567     my $userenv = C4::Context->userenv;
568     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
569
570     my $rec_num = 0;
571     my @biblio_ids;
572     while (my $rowref = $sth->fetchrow_hashref) {
573         $record_type = $rowref->{'record_type'};
574
575         $rec_num++;
576
577         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
578             # report progress and commit
579             $schema->txn_commit unless $skip_intermediate_commit;
580             &$progress_callback( $rec_num );
581             $schema->txn_begin unless $skip_intermediate_commit;
582         }
583         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
584             $num_ignored++;
585             next;
586         }
587
588         my $marc_type;
589         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
590             $marc_type = 'UNIMARCAUTH';
591         } elsif ($marcflavour eq 'UNIMARC') {
592             $marc_type = 'UNIMARC';
593         } else {
594             $marc_type = 'USMARC';
595         }
596         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
597
598         if ($record_type eq 'biblio') {
599             # remove any item tags - rely on _batchCommitItems
600             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
601             foreach my $item_field ($marc_record->field($item_tag)) {
602                 $marc_record->delete_field($item_field);
603             }
604             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
605                 my @control_num = $marc_record->field('001');
606                 $marc_record->delete_fields(@control_num);
607             }
608         }
609
610         my ($record_result, $item_result, $record_match) =
611             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
612                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
613
614         my $recordid;
615         my $query;
616         if ($record_result eq 'create_new') {
617             $num_added++;
618             if ($record_type eq 'biblio') {
619                 my $biblioitemnumber;
620                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
621                 push @biblio_ids, $recordid;
622                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
623                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
624                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
625                     $num_items_added += $bib_items_added;
626                     $num_items_replaced += $bib_items_replaced;
627                     $num_items_errored += $bib_items_errored;
628                 }
629             } else {
630                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
631                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
632             }
633             my $sth = $dbh->prepare_cached($query);
634             $sth->execute($recordid, $rowref->{'import_record_id'});
635             $sth->finish();
636             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
637         } elsif ($record_result eq 'replace') {
638             $num_updated++;
639             $recordid = $record_match;
640             my $oldxml;
641             if ($record_type eq 'biblio') {
642                 my $oldbiblio = Koha::Biblios->find( $recordid );
643                 $oldxml = GetXmlBiblio($recordid);
644
645                 # remove item fields so that they don't get
646                 # added again if record is reverted
647                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
648                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
649                 foreach my $item_field ($old_marc->field($item_tag)) {
650                     $old_marc->delete_field($item_field);
651                 }
652                 $oldxml = $old_marc->as_xml($marc_type);
653
654                 my $context = { source => 'batchimport' };
655                 if ($logged_in_patron) {
656                     $context->{categorycode} = $logged_in_patron->categorycode;
657                     $context->{userid} = $logged_in_patron->userid;
658                 }
659
660                 ModBiblio(
661                     $marc_record,
662                     $recordid,
663                     $overlay_framework // $oldbiblio->frameworkcode,
664                     {
665                         overlay_context   => $context,
666                         skip_record_index => 1
667                     }
668                 );
669                 push @biblio_ids, $recordid;
670                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
671
672                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
673                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
674                     $num_items_added += $bib_items_added;
675                     $num_items_replaced += $bib_items_replaced;
676                     $num_items_errored += $bib_items_errored;
677                 }
678             } else {
679                 $oldxml = GetAuthorityXML($recordid);
680
681                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
682                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
683             }
684             # Combine xml update, SetImportRecordOverlayStatus, and SetImportRecordStatus updates into a single update for efficiency, especially in a transaction
685             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ?, status = ?, overlay_status = ? WHERE import_record_id = ?");
686             $sth->execute( $oldxml, 'imported', 'match_applied', $rowref->{'import_record_id'} );
687             $sth->finish();
688             my $sth2 = $dbh->prepare_cached($query);
689             $sth2->execute($recordid, $rowref->{'import_record_id'});
690             $sth2->finish();
691         } elsif ($record_result eq 'ignore') {
692             $recordid = $record_match;
693             $num_ignored++;
694             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
695                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
696                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
697                 $num_items_added += $bib_items_added;
698          $num_items_replaced += $bib_items_replaced;
699                 $num_items_errored += $bib_items_errored;
700                 # still need to record the matched biblionumber so that the
701                 # items can be reverted
702                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
703                 $sth2->execute($recordid, $rowref->{'import_record_id'});
704                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
705             }
706             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
707         }
708     }
709
710     if ($progress_interval){
711         &$progress_callback($rec_num);
712     }
713
714     $sth->finish();
715
716     SetImportBatchStatus($batch_id, 'imported');
717
718     # final commit should be before Elastic background indexing in order to find job data
719     $schema->txn_commit;
720
721     if ( @biblio_ids ) {
722         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
723         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
724     }
725
726     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
727 }
728
729 =head2 _batchCommitItems
730
731   ($num_items_added, $num_items_errored) = 
732          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
733
734 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
735
736 =cut
737
738 sub _batchCommitItems {
739     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
740
741     my $dbh = C4::Context->dbh;
742
743     my $num_items_added = 0;
744     my $num_items_errored = 0;
745     my $num_items_replaced = 0;
746
747     my $sth = $dbh->prepare( "
748         SELECT import_items_id, import_items.marcxml, encoding
749         FROM import_items
750         JOIN import_records USING (import_record_id)
751         WHERE import_record_id = ?
752         ORDER BY import_items_id
753     " );
754     $sth->bind_param( 1, $import_record_id );
755     $sth->execute();
756
757     while ( my $row = $sth->fetchrow_hashref() ) {
758         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
759
760         # Delete date_due subfield as to not accidentally delete item checkout due dates
761         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
762         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
763
764         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
765
766         my $item_match;
767         my $duplicate_barcode = exists( $item->{'barcode'} );
768         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
769
770         # We assume that when replacing items we do not want to move them - the onus is on the importer to
771         # ensure the correct items/records are being updated
772         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
773         if (
774             $action eq "replace" &&
775             $duplicate_itemnumber &&
776             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
777         ) {
778             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
779             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
780             $updsth->bind_param( 1, 'imported' );
781             $updsth->bind_param( 2, $item->{itemnumber} );
782             $updsth->bind_param( 3, undef );
783             $updsth->bind_param( 4, $row->{'import_items_id'} );
784             $updsth->execute();
785             $updsth->finish();
786             $num_items_replaced++;
787         } elsif (
788             $action eq "replace" &&
789             $duplicate_barcode &&
790             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
791         ) {
792             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
793             $updsth->bind_param( 1, 'imported' );
794             $updsth->bind_param( 2, $item->{itemnumber} );
795             $updsth->bind_param( 3, undef );
796             $updsth->bind_param( 4, $row->{'import_items_id'} );
797             $updsth->execute();
798             $updsth->finish();
799             $num_items_replaced++;
800         } elsif (
801             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
802             $duplicate_barcode &&
803             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
804         ) {
805             $updsth->bind_param( 1, 'error' );
806             $updsth->bind_param( 2, undef );
807             $updsth->bind_param( 3, 'duplicate item barcode' );
808             $updsth->bind_param( 4, $row->{'import_items_id'} );
809             $updsth->execute();
810             $num_items_errored++;
811         } else {
812             # Remove the itemnumber if it exists, we want to create a new item
813             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
814             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
815
816             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
817             if( $itemnumber ) {
818                 $updsth->bind_param( 1, 'imported' );
819                 $updsth->bind_param( 2, $itemnumber );
820                 $updsth->bind_param( 3, undef );
821                 $updsth->bind_param( 4, $row->{'import_items_id'} );
822                 $updsth->execute();
823                 $updsth->finish();
824                 $num_items_added++;
825             }
826         }
827     }
828
829     return ( $num_items_added, $num_items_replaced, $num_items_errored );
830 }
831
832 =head2 BatchRevertRecords
833
834   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
835       $num_ignored) = BatchRevertRecords($batch_id);
836
837 =cut
838
839 sub BatchRevertRecords {
840     my $batch_id = shift;
841
842     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
843
844     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
845
846     my $record_type;
847     my $num_deleted = 0;
848     my $num_errors = 0;
849     my $num_reverted = 0;
850     my $num_ignored = 0;
851     my $num_items_deleted = 0;
852     # commit (i.e., save, all records in the batch)
853     SetImportBatchStatus($batch_id, 'reverting');
854     my $overlay_action = GetImportBatchOverlayAction($batch_id);
855     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
856     my $dbh = C4::Context->dbh;
857     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
858                              FROM import_records
859                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
860                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
861                              WHERE import_batch_id = ?");
862     $sth->execute($batch_id);
863     my $marc_type;
864     my $marcflavour = C4::Context->preference('marcflavour');
865     while (my $rowref = $sth->fetchrow_hashref) {
866         $record_type = $rowref->{'record_type'};
867         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
868             $num_ignored++;
869             next;
870         }
871         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
872             $marc_type = 'UNIMARCAUTH';
873         } elsif ($marcflavour eq 'UNIMARC') {
874             $marc_type = 'UNIMARC';
875         } else {
876             $marc_type = 'USMARC';
877         }
878
879         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
880
881         if ($record_result eq 'delete') {
882             my $error = undef;
883             if  ($record_type eq 'biblio') {
884                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
885                 $error = DelBiblio($rowref->{'matched_biblionumber'});
886             } else {
887                 DelAuthority({ authid => $rowref->{'matched_authid'} });
888             }
889             if (defined $error) {
890                 $num_errors++;
891             } else {
892                 $num_deleted++;
893                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
894             }
895         } elsif ($record_result eq 'restore') {
896             $num_reverted++;
897             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
898             if ($record_type eq 'biblio') {
899                 my $biblionumber = $rowref->{'matched_biblionumber'};
900                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
901
902                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
903                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
904
905                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
906                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
907             } else {
908                 my $authid = $rowref->{'matched_authid'};
909                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
910             }
911             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
912         } elsif ($record_result eq 'ignore') {
913             if ($record_type eq 'biblio') {
914                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
915             }
916             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
917         }
918         my $query;
919         if ($record_type eq 'biblio') {
920             # remove matched_biblionumber only if there is no 'imported' item left
921             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
922             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
923         } else {
924             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
925         }
926         my $sth2 = $dbh->prepare_cached($query);
927         $sth2->execute($rowref->{'import_record_id'});
928     }
929
930     $sth->finish();
931     SetImportBatchStatus($batch_id, 'reverted');
932     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
933 }
934
935 =head2 BatchRevertItems
936
937   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
938
939 =cut
940
941 sub BatchRevertItems {
942     my ($import_record_id, $biblionumber) = @_;
943
944     my $dbh = C4::Context->dbh;
945     my $num_items_deleted = 0;
946
947     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
948                                    FROM import_items
949                                    JOIN items USING (itemnumber)
950                                    WHERE import_record_id = ?");
951     $sth->bind_param(1, $import_record_id);
952     $sth->execute();
953     while (my $row = $sth->fetchrow_hashref()) {
954         my $item = Koha::Items->find($row->{itemnumber});
955         if ($item->safe_delete){
956             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
957             $updsth->bind_param(1, 'reverted');
958             $updsth->bind_param(2, $row->{'import_items_id'});
959             $updsth->execute();
960             $updsth->finish();
961             $num_items_deleted++;
962         }
963         else {
964             next;
965         }
966     }
967     $sth->finish();
968     return $num_items_deleted;
969 }
970
971 =head2 CleanBatch
972
973   CleanBatch($batch_id)
974
975 Deletes all staged records from the import batch
976 and sets the status of the batch to 'cleaned'.  Note
977 that deleting a stage record does *not* affect
978 any record that has been committed to the database.
979
980 =cut
981
982 sub CleanBatch {
983     my $batch_id = shift;
984     return unless defined $batch_id;
985
986     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
987     SetImportBatchStatus($batch_id, 'cleaned');
988 }
989
990 =head2 DeleteBatch
991
992   DeleteBatch($batch_id)
993
994 Deletes the record from the database. This can only be done
995 once the batch has been cleaned.
996
997 =cut
998
999 sub DeleteBatch {
1000     my $batch_id = shift;
1001     return unless defined $batch_id;
1002
1003     my $dbh = C4::Context->dbh;
1004     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1005     $sth->execute( $batch_id );
1006 }
1007
1008 =head2 GetAllImportBatches
1009
1010   my $results = GetAllImportBatches();
1011
1012 Returns a references to an array of hash references corresponding
1013 to all import_batches rows (of batch_type 'batch'), sorted in 
1014 ascending order by import_batch_id.
1015
1016 =cut
1017
1018 sub  GetAllImportBatches {
1019     my $dbh = C4::Context->dbh;
1020     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1021                                     WHERE batch_type IN ('batch', 'webservice')
1022                                     ORDER BY import_batch_id ASC");
1023
1024     my $results = [];
1025     $sth->execute();
1026     while (my $row = $sth->fetchrow_hashref) {
1027         push @$results, $row;
1028     }
1029     $sth->finish();
1030     return $results;
1031 }
1032
1033 =head2 GetStagedWebserviceBatches
1034
1035   my $batch_ids = GetStagedWebserviceBatches();
1036
1037 Returns a references to an array of batch id's
1038 of batch_type 'webservice' that are not imported
1039
1040 =cut
1041
1042 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1043 SELECT import_batch_id FROM import_batches
1044 WHERE batch_type = 'webservice'
1045 AND import_status = 'staged'
1046 EOQ
1047 sub  GetStagedWebserviceBatches {
1048     my $dbh = C4::Context->dbh;
1049     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1050 }
1051
1052 =head2 GetImportBatchRangeDesc
1053
1054   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1055
1056 Returns a reference to an array of hash references corresponding to
1057 import_batches rows (sorted in descending order by import_batch_id)
1058 start at the given offset.
1059
1060 =cut
1061
1062 sub GetImportBatchRangeDesc {
1063     my ($offset, $results_per_group) = @_;
1064
1065     my $dbh = C4::Context->dbh;
1066     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1067                                     LEFT JOIN import_batch_profiles p
1068                                     ON b.profile_id = p.id
1069                                     WHERE b.batch_type IN ('batch', 'webservice')
1070                                     ORDER BY b.import_batch_id DESC";
1071     my @params;
1072     if ($results_per_group){
1073         $query .= " LIMIT ?";
1074         push(@params, $results_per_group);
1075     }
1076     if ($offset){
1077         $query .= " OFFSET ?";
1078         push(@params, $offset);
1079     }
1080     my $sth = $dbh->prepare_cached($query);
1081     $sth->execute(@params);
1082     my $results = $sth->fetchall_arrayref({});
1083     $sth->finish();
1084     return $results;
1085 }
1086
1087 =head2 GetItemNumbersFromImportBatch
1088
1089   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1090
1091 =cut
1092
1093 sub GetItemNumbersFromImportBatch {
1094     my ($batch_id) = @_;
1095     my $dbh = C4::Context->dbh;
1096     my $sql = q|
1097 SELECT itemnumber FROM import_items
1098 INNER JOIN items USING (itemnumber)
1099 INNER JOIN import_records USING (import_record_id)
1100 WHERE import_batch_id = ?|;
1101     my  $sth = $dbh->prepare( $sql );
1102     $sth->execute($batch_id);
1103     my @items ;
1104     while ( my ($itm) = $sth->fetchrow_array ) {
1105         push @items, $itm;
1106     }
1107     return @items;
1108 }
1109
1110 =head2 GetNumberOfImportBatches
1111
1112   my $count = GetNumberOfImportBatches();
1113
1114 =cut
1115
1116 sub GetNumberOfNonZ3950ImportBatches {
1117     my $dbh = C4::Context->dbh;
1118     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1119     $sth->execute();
1120     my ($count) = $sth->fetchrow_array();
1121     $sth->finish();
1122     return $count;
1123 }
1124
1125 =head2 GetImportBiblios
1126
1127   my $results = GetImportBiblios($importid);
1128
1129 =cut
1130
1131 sub GetImportBiblios {
1132     my ($import_record_id) = @_;
1133
1134     my $dbh = C4::Context->dbh;
1135     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1136     return $dbh->selectall_arrayref(
1137         $query,
1138         { Slice => {} },
1139         $import_record_id
1140     );
1141
1142 }
1143
1144 =head2 GetImportRecordsRange
1145
1146   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1147
1148 Returns a reference to an array of hash references corresponding to
1149 import_biblios/import_auths/import_records rows for a given batch
1150 starting at the given offset.
1151
1152 =cut
1153
1154 sub GetImportRecordsRange {
1155     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1156
1157     my $dbh = C4::Context->dbh;
1158
1159     my $order_by = $parameters->{order_by} || 'import_record_id';
1160     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1161
1162     my $order_by_direction =
1163       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1164
1165     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1166
1167     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1168                                            record_sequence, status, overlay_status,
1169                                            matched_biblionumber, matched_authid, record_type
1170                                     FROM   import_records
1171                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1172                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1173                                     WHERE  import_batch_id = ?";
1174     my @params;
1175     push(@params, $batch_id);
1176     if ($status) {
1177         $query .= " AND status=?";
1178         push(@params,$status);
1179     }
1180
1181     $query.=" ORDER BY $order_by $order_by_direction";
1182
1183     if($results_per_group){
1184         $query .= " LIMIT ?";
1185         push(@params, $results_per_group);
1186     }
1187     if($offset){
1188         $query .= " OFFSET ?";
1189         push(@params, $offset);
1190     }
1191     my $sth = $dbh->prepare_cached($query);
1192     $sth->execute(@params);
1193     my $results = $sth->fetchall_arrayref({});
1194     $sth->finish();
1195     return $results;
1196
1197 }
1198
1199 =head2 GetBestRecordMatch
1200
1201   my $record_id = GetBestRecordMatch($import_record_id);
1202
1203 =cut
1204
1205 sub GetBestRecordMatch {
1206     my ($import_record_id) = @_;
1207
1208     my $dbh = C4::Context->dbh;
1209     my $sth = $dbh->prepare("SELECT candidate_match_id
1210                              FROM   import_record_matches
1211                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1212                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1213                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1214                              WHERE  import_record_matches.import_record_id = ? AND
1215                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1216                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1217                              AND chosen = 1
1218                              ORDER BY score DESC, candidate_match_id DESC");
1219     $sth->execute($import_record_id);
1220     my ($record_id) = $sth->fetchrow_array();
1221     $sth->finish();
1222     return $record_id;
1223 }
1224
1225 =head2 GetImportBatchStatus
1226
1227   my $status = GetImportBatchStatus($batch_id);
1228
1229 =cut
1230
1231 sub GetImportBatchStatus {
1232     my ($batch_id) = @_;
1233
1234     my $dbh = C4::Context->dbh;
1235     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1236     $sth->execute($batch_id);
1237     my ($status) = $sth->fetchrow_array();
1238     $sth->finish();
1239     return $status;
1240
1241 }
1242
1243 =head2 SetImportBatchStatus
1244
1245   SetImportBatchStatus($batch_id, $new_status);
1246
1247 =cut
1248
1249 sub SetImportBatchStatus {
1250     my ($batch_id, $new_status) = @_;
1251
1252     my $dbh = C4::Context->dbh;
1253     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1254     $sth->execute($new_status, $batch_id);
1255     $sth->finish();
1256
1257 }
1258
1259 =head2 SetMatchedBiblionumber
1260
1261   SetMatchedBiblionumber($import_record_id, $biblionumber);
1262
1263 =cut
1264
1265 sub SetMatchedBiblionumber {
1266     my ($import_record_id, $biblionumber) = @_;
1267
1268     my $dbh = C4::Context->dbh;
1269     $dbh->do(
1270         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1271         undef, $biblionumber, $import_record_id
1272     );
1273 }
1274
1275 =head2 GetImportBatchOverlayAction
1276
1277   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1278
1279 =cut
1280
1281 sub GetImportBatchOverlayAction {
1282     my ($batch_id) = @_;
1283
1284     my $dbh = C4::Context->dbh;
1285     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1286     $sth->execute($batch_id);
1287     my ($overlay_action) = $sth->fetchrow_array();
1288     $sth->finish();
1289     return $overlay_action;
1290
1291 }
1292
1293
1294 =head2 SetImportBatchOverlayAction
1295
1296   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1297
1298 =cut
1299
1300 sub SetImportBatchOverlayAction {
1301     my ($batch_id, $new_overlay_action) = @_;
1302
1303     my $dbh = C4::Context->dbh;
1304     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1305     $sth->execute($new_overlay_action, $batch_id);
1306     $sth->finish();
1307
1308 }
1309
1310 =head2 GetImportBatchNoMatchAction
1311
1312   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1313
1314 =cut
1315
1316 sub GetImportBatchNoMatchAction {
1317     my ($batch_id) = @_;
1318
1319     my $dbh = C4::Context->dbh;
1320     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1321     $sth->execute($batch_id);
1322     my ($nomatch_action) = $sth->fetchrow_array();
1323     $sth->finish();
1324     return $nomatch_action;
1325
1326 }
1327
1328
1329 =head2 SetImportBatchNoMatchAction
1330
1331   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1332
1333 =cut
1334
1335 sub SetImportBatchNoMatchAction {
1336     my ($batch_id, $new_nomatch_action) = @_;
1337
1338     my $dbh = C4::Context->dbh;
1339     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1340     $sth->execute($new_nomatch_action, $batch_id);
1341     $sth->finish();
1342
1343 }
1344
1345 =head2 GetImportBatchItemAction
1346
1347   my $item_action = GetImportBatchItemAction($batch_id);
1348
1349 =cut
1350
1351 sub GetImportBatchItemAction {
1352     my ($batch_id) = @_;
1353
1354     my $dbh = C4::Context->dbh;
1355     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1356     $sth->execute($batch_id);
1357     my ($item_action) = $sth->fetchrow_array();
1358     $sth->finish();
1359     return $item_action;
1360
1361 }
1362
1363
1364 =head2 SetImportBatchItemAction
1365
1366   SetImportBatchItemAction($batch_id, $new_item_action);
1367
1368 =cut
1369
1370 sub SetImportBatchItemAction {
1371     my ($batch_id, $new_item_action) = @_;
1372
1373     my $dbh = C4::Context->dbh;
1374     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1375     $sth->execute($new_item_action, $batch_id);
1376     $sth->finish();
1377
1378 }
1379
1380 =head2 GetImportBatchMatcher
1381
1382   my $matcher_id = GetImportBatchMatcher($batch_id);
1383
1384 =cut
1385
1386 sub GetImportBatchMatcher {
1387     my ($batch_id) = @_;
1388
1389     my $dbh = C4::Context->dbh;
1390     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1391     $sth->execute($batch_id);
1392     my ($matcher_id) = $sth->fetchrow_array();
1393     $sth->finish();
1394     return $matcher_id;
1395
1396 }
1397
1398
1399 =head2 SetImportBatchMatcher
1400
1401   SetImportBatchMatcher($batch_id, $new_matcher_id);
1402
1403 =cut
1404
1405 sub SetImportBatchMatcher {
1406     my ($batch_id, $new_matcher_id) = @_;
1407
1408     my $dbh = C4::Context->dbh;
1409     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1410     $sth->execute($new_matcher_id, $batch_id);
1411     $sth->finish();
1412
1413 }
1414
1415 =head2 GetImportRecordOverlayStatus
1416
1417   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1418
1419 =cut
1420
1421 sub GetImportRecordOverlayStatus {
1422     my ($import_record_id) = @_;
1423
1424     my $dbh = C4::Context->dbh;
1425     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1426     $sth->execute($import_record_id);
1427     my ($overlay_status) = $sth->fetchrow_array();
1428     $sth->finish();
1429     return $overlay_status;
1430
1431 }
1432
1433
1434 =head2 SetImportRecordOverlayStatus
1435
1436   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1437
1438 =cut
1439
1440 sub SetImportRecordOverlayStatus {
1441     my ($import_record_id, $new_overlay_status) = @_;
1442
1443     my $dbh = C4::Context->dbh;
1444     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1445     $sth->execute($new_overlay_status, $import_record_id);
1446     $sth->finish();
1447
1448 }
1449
1450 =head2 GetImportRecordStatus
1451
1452   my $status = GetImportRecordStatus($import_record_id);
1453
1454 =cut
1455
1456 sub GetImportRecordStatus {
1457     my ($import_record_id) = @_;
1458
1459     my $dbh = C4::Context->dbh;
1460     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1461     $sth->execute($import_record_id);
1462     my ($status) = $sth->fetchrow_array();
1463     $sth->finish();
1464     return $status;
1465
1466 }
1467
1468
1469 =head2 SetImportRecordStatus
1470
1471   SetImportRecordStatus($import_record_id, $new_status);
1472
1473 =cut
1474
1475 sub SetImportRecordStatus {
1476     my ($import_record_id, $new_status) = @_;
1477
1478     my $dbh = C4::Context->dbh;
1479     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1480     $sth->execute($new_status, $import_record_id);
1481     $sth->finish();
1482
1483 }
1484
1485 =head2 GetImportRecordMatches
1486
1487   my $results = GetImportRecordMatches($import_record_id, $best_only);
1488
1489 =cut
1490
1491 sub GetImportRecordMatches {
1492     my $import_record_id = shift;
1493     my $best_only = @_ ? shift : 0;
1494
1495     my $dbh = C4::Context->dbh;
1496     # FIXME currently biblio only
1497     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1498                                     candidate_match_id, score, record_type,
1499                                     chosen
1500                                     FROM import_records
1501                                     JOIN import_record_matches USING (import_record_id)
1502                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1503                                     WHERE import_record_id = ?
1504                                     ORDER BY score DESC, biblionumber DESC");
1505     $sth->bind_param(1, $import_record_id);
1506     my $results = [];
1507     $sth->execute();
1508     while (my $row = $sth->fetchrow_hashref) {
1509         if ($row->{'record_type'} eq 'auth') {
1510             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1511         }
1512         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1513         push @$results, $row;
1514         last if $best_only;
1515     }
1516     $sth->finish();
1517
1518     return $results;
1519     
1520 }
1521
1522 =head2 SetImportRecordMatches
1523
1524   SetImportRecordMatches($import_record_id, @matches);
1525
1526 =cut
1527
1528 sub SetImportRecordMatches {
1529     my $import_record_id = shift;
1530     my @matches = @_;
1531
1532     my $dbh = C4::Context->dbh;
1533     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1534     $delsth->execute($import_record_id);
1535     $delsth->finish();
1536
1537     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1538                                     VALUES (?, ?, ?, ?)");
1539     my $chosen = 1; #The first match is defaulted to be chosen
1540     foreach my $match (@matches) {
1541         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1542         $chosen = 0; #After the first we do not default to other matches
1543     }
1544 }
1545
1546 =head2 RecordsFromISO2709File
1547
1548     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1549
1550 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1551
1552 @PARAM1, String, absolute path to the ISO2709 file.
1553 @PARAM2, String, see stage_file.pl
1554 @PARAM3, String, should be utf8
1555
1556 Returns two array refs.
1557
1558 =cut
1559
1560 sub RecordsFromISO2709File {
1561     my ($input_file, $record_type, $encoding) = @_;
1562     my @errors;
1563
1564     my $marc_type = C4::Context->preference('marcflavour');
1565     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1566
1567     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1568     my @marc_records;
1569     $/ = "\035";
1570     while (<$fh>) {
1571         s/^\s+//;
1572         s/\s+$//;
1573         next unless $_; # skip if record has only whitespace, as might occur
1574                         # if file includes newlines between each MARC record
1575         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1576         push @marc_records, $marc_record;
1577         if ($charset_guessed ne $encoding) {
1578             push @errors,
1579                 "Unexpected charset $charset_guessed, expecting $encoding";
1580         }
1581     }
1582     close $fh;
1583     return ( \@errors, \@marc_records );
1584 }
1585
1586 =head2 RecordsFromMARCXMLFile
1587
1588     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1589
1590 Creates MARC::Record-objects out of the given MARCXML-file.
1591
1592 @PARAM1, String, absolute path to the MARCXML file.
1593 @PARAM2, String, should be utf8
1594
1595 Returns two array refs.
1596
1597 =cut
1598
1599 sub RecordsFromMARCXMLFile {
1600     my ( $filename, $encoding ) = @_;
1601     my $batch = MARC::File::XML->in( $filename );
1602     my ( @marcRecords, @errors, $record );
1603     do {
1604         eval { $record = $batch->next( $encoding ); };
1605         if ($@) {
1606             push @errors, $@;
1607         }
1608         push @marcRecords, $record if $record;
1609     } while( $record );
1610     return (\@errors, \@marcRecords);
1611 }
1612
1613 =head2 RecordsFromMarcPlugin
1614
1615     Converts text of input_file into array of MARC records with to_marc plugin
1616
1617 =cut
1618
1619 sub RecordsFromMarcPlugin {
1620     my ($input_file, $plugin_class, $encoding) = @_;
1621     my ( $text, @return );
1622     return \@return if !$input_file || !$plugin_class;
1623
1624     # Read input file
1625     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1626     $/ = "\035";
1627     while (<$fh>) {
1628         s/^\s+//;
1629         s/\s+$//;
1630         next unless $_;
1631         $text .= $_;
1632     }
1633     close $fh;
1634
1635     # Convert to large MARC blob with plugin
1636     $text = Koha::Plugins::Handler->run({
1637         class  => $plugin_class,
1638         method => 'to_marc',
1639         params => { data => $text },
1640     }) if $text;
1641
1642     # Convert to array of MARC records
1643     if( $text ) {
1644         my $marc_type = C4::Context->preference('marcflavour');
1645         foreach my $blob ( split(/\x1D/, $text) ) {
1646             next if $blob =~ /^\s*$/;
1647             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1648             push @return, $marcrecord;
1649         }
1650     }
1651     return \@return;
1652 }
1653
1654 # internal functions
1655
1656 sub _create_import_record {
1657     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1658
1659     my $dbh = C4::Context->dbh;
1660     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1661                                                          record_type, encoding)
1662                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1663     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1664                   $record_type, $encoding);
1665     my $import_record_id = $dbh->{'mysql_insertid'};
1666     $sth->finish();
1667     return $import_record_id;
1668 }
1669
1670 sub _add_auth_fields {
1671     my ($import_record_id, $marc_record) = @_;
1672
1673     my $controlnumber;
1674     if ($marc_record->field('001')) {
1675         $controlnumber = $marc_record->field('001')->data();
1676     }
1677     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1678     my $dbh = C4::Context->dbh;
1679     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1680     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1681     $sth->finish();
1682 }
1683
1684 sub _add_biblio_fields {
1685     my ($import_record_id, $marc_record) = @_;
1686
1687     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1688     my $dbh = C4::Context->dbh;
1689     # FIXME no controlnumber, originalsource
1690     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1691     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1692     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1693     $sth->finish();
1694                 
1695 }
1696
1697 sub _update_biblio_fields {
1698     my ($import_record_id, $marc_record) = @_;
1699
1700     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1701     my $dbh = C4::Context->dbh;
1702     # FIXME no controlnumber, originalsource
1703     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1704     $isbn =~ s/\(.*$//;
1705     $isbn =~ tr/ -_//;
1706     $isbn = uc $isbn;
1707     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1708                              WHERE  import_record_id = ?");
1709     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1710     $sth->finish();
1711 }
1712
1713 sub _parse_biblio_fields {
1714     my ($marc_record) = @_;
1715
1716     my $dbh = C4::Context->dbh;
1717     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1718     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1719
1720 }
1721
1722 sub _update_batch_record_counts {
1723     my ($batch_id) = @_;
1724
1725     my $dbh = C4::Context->dbh;
1726     my ( $num_records ) = $dbh->selectrow_array(q|
1727                                             SELECT COUNT(*)
1728                                             FROM import_records
1729                                             WHERE import_batch_id = ?
1730     |, undef, $batch_id );
1731     my ( $num_items ) = $dbh->selectrow_array(q|
1732                                             SELECT COUNT(*)
1733                                             FROM import_records
1734                                             JOIN import_items USING (import_record_id)
1735                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1736     |, undef, $batch_id );
1737     $dbh->do(
1738         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1739         undef,
1740         $num_records,
1741         $num_items,
1742         $batch_id,
1743     );
1744 }
1745
1746 sub _get_commit_action {
1747     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1748     
1749     if ($record_type eq 'biblio') {
1750         my ($bib_result, $bib_match, $item_result);
1751
1752         $bib_match = GetBestRecordMatch($import_record_id);
1753         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1754
1755             $bib_result = $overlay_action;
1756
1757             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1758                 $item_result = 'create_new';
1759             } elsif($item_action eq 'replace'){
1760                 $item_result = 'replace';
1761             } else {
1762                 $item_result = 'ignore';
1763             }
1764
1765         } else {
1766             $bib_result = $nomatch_action;
1767             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1768         }
1769         return ($bib_result, $item_result, $bib_match);
1770     } else { # must be auths
1771         my ($auth_result, $auth_match);
1772
1773         $auth_match = GetBestRecordMatch($import_record_id);
1774         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1775             $auth_result = $overlay_action;
1776         } else {
1777             $auth_result = $nomatch_action;
1778         }
1779
1780         return ($auth_result, undef, $auth_match);
1781
1782     }
1783 }
1784
1785 sub _get_revert_action {
1786     my ($overlay_action, $overlay_status, $status) = @_;
1787
1788     my $bib_result;
1789
1790     if ($status eq 'ignored') {
1791         $bib_result = 'ignore';
1792     } else {
1793         if ($overlay_action eq 'create_new') {
1794             $bib_result = 'delete';
1795         } else {
1796             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1797         }
1798     }
1799     return $bib_result;
1800 }
1801
1802 1;
1803 __END__
1804
1805 =head1 AUTHOR
1806
1807 Koha Development Team <http://koha-community.org/>
1808
1809 Galen Charlton <galen.charlton@liblime.com>
1810
1811 =cut