Bug 33972: Remove settings of batch status to importing
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
513   'progress_callback' will define code called every X records.
514
515   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
516         BatchCommitRecords({
517             batch_id  => $batch_id,
518             framework => $framework,
519             overlay_framework => $overlay_framework,
520             progress_interval => $progress_interval,
521             progress_callback => $progress_callback,
522             skip_intermediate_commit => $skip_intermediate_commit
523         });
524
525     Parameter skip_intermediate_commit does what is says.
526 =cut
527
528 sub BatchCommitRecords {
529     my $params = shift;
530     my $batch_id          = $params->{batch_id};
531     my $framework         = $params->{framework};
532     my $overlay_framework = $params->{overlay_framework};
533     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
534     my $progress_interval = $params->{progress_interval} // 0;
535     my $progress_callback = $params->{progress_callback};
536     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
537     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
538
539     my $schema = Koha::Database->schema;
540     $schema->txn_begin;
541     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
542     # transactions may be committed and started too again. The final commit is close to the end.
543
544     my $record_type;
545     my $num_added = 0;
546     my $num_updated = 0;
547     my $num_items_added = 0;
548     my $num_items_replaced = 0;
549     my $num_items_errored = 0;
550     my $num_ignored = 0;
551     # commit (i.e., save, all records in the batch)
552     my $overlay_action = GetImportBatchOverlayAction($batch_id);
553     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
554     my $item_action = GetImportBatchItemAction($batch_id);
555     my $item_tag;
556     my $item_subfield;
557     my $dbh = C4::Context->dbh;
558     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
559                              FROM import_records
560                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
561                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
562                              WHERE import_batch_id = ?");
563     $sth->execute($batch_id);
564     my $marcflavour = C4::Context->preference('marcflavour');
565
566     my $userenv = C4::Context->userenv;
567     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
568
569     my $rec_num = 0;
570     my @biblio_ids;
571     while (my $rowref = $sth->fetchrow_hashref) {
572         $record_type = $rowref->{'record_type'};
573
574         $rec_num++;
575
576         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
577             # report progress and commit
578             $schema->txn_commit unless $skip_intermediate_commit;
579             &$progress_callback( $rec_num );
580             $schema->txn_begin unless $skip_intermediate_commit;
581         }
582         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
583             $num_ignored++;
584             next;
585         }
586
587         my $marc_type;
588         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
589             $marc_type = 'UNIMARCAUTH';
590         } elsif ($marcflavour eq 'UNIMARC') {
591             $marc_type = 'UNIMARC';
592         } else {
593             $marc_type = 'USMARC';
594         }
595         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
596
597         if ($record_type eq 'biblio') {
598             # remove any item tags - rely on _batchCommitItems
599             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
600             foreach my $item_field ($marc_record->field($item_tag)) {
601                 $marc_record->delete_field($item_field);
602             }
603             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
604                 my @control_num = $marc_record->field('001');
605                 $marc_record->delete_fields(@control_num);
606             }
607         }
608
609         my ($record_result, $item_result, $record_match) =
610             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
611                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
612
613         my $recordid;
614         my $query;
615         if ($record_result eq 'create_new') {
616             $num_added++;
617             if ($record_type eq 'biblio') {
618                 my $biblioitemnumber;
619                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
620                 push @biblio_ids, $recordid if $recordid;
621                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
622                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
623                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
624                     $num_items_added += $bib_items_added;
625                     $num_items_replaced += $bib_items_replaced;
626                     $num_items_errored += $bib_items_errored;
627                 }
628             } else {
629                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
630                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
631             }
632             my $sth = $dbh->prepare_cached($query);
633             $sth->execute($recordid, $rowref->{'import_record_id'});
634             $sth->finish();
635             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
636         } elsif ($record_result eq 'replace') {
637             $num_updated++;
638             $recordid = $record_match;
639             my $oldxml;
640             if ($record_type eq 'biblio') {
641                 my $oldbiblio = Koha::Biblios->find( $recordid );
642                 $oldxml = GetXmlBiblio($recordid);
643
644                 # remove item fields so that they don't get
645                 # added again if record is reverted
646                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
647                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
648                 foreach my $item_field ($old_marc->field($item_tag)) {
649                     $old_marc->delete_field($item_field);
650                 }
651                 $oldxml = $old_marc->as_xml($marc_type);
652
653                 my $context = { source => 'batchimport' };
654                 if ($logged_in_patron) {
655                     $context->{categorycode} = $logged_in_patron->categorycode;
656                     $context->{userid} = $logged_in_patron->userid;
657                 }
658
659                 ModBiblio(
660                     $marc_record,
661                     $recordid,
662                     $overlay_framework // $oldbiblio->frameworkcode,
663                     {
664                         overlay_context   => $context,
665                         skip_record_index => 1
666                     }
667                 );
668                 push @biblio_ids, $recordid;
669                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
670
671                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
672                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
673                     $num_items_added += $bib_items_added;
674                     $num_items_replaced += $bib_items_replaced;
675                     $num_items_errored += $bib_items_errored;
676                 }
677             } else {
678                 $oldxml = GetAuthorityXML($recordid);
679
680                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
681                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
682             }
683             # Combine xml update, SetImportRecordOverlayStatus, and SetImportRecordStatus updates into a single update for efficiency, especially in a transaction
684             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ?, status = ?, overlay_status = ? WHERE import_record_id = ?");
685             $sth->execute( $oldxml, 'imported', 'match_applied', $rowref->{'import_record_id'} );
686             $sth->finish();
687             my $sth2 = $dbh->prepare_cached($query);
688             $sth2->execute($recordid, $rowref->{'import_record_id'});
689             $sth2->finish();
690         } elsif ($record_result eq 'ignore') {
691             $recordid = $record_match;
692             $num_ignored++;
693             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
694                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
695                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
696                 $num_items_added += $bib_items_added;
697          $num_items_replaced += $bib_items_replaced;
698                 $num_items_errored += $bib_items_errored;
699                 # still need to record the matched biblionumber so that the
700                 # items can be reverted
701                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
702                 $sth2->execute($recordid, $rowref->{'import_record_id'});
703                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
704             }
705             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
706         }
707     }
708
709     if ($progress_interval){
710         &$progress_callback($rec_num);
711     }
712
713     $sth->finish();
714
715     SetImportBatchStatus($batch_id, 'imported');
716
717     # final commit should be before Elastic background indexing in order to find job data
718     $schema->txn_commit;
719
720     if ( @biblio_ids ) {
721         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
722         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
723     }
724
725     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
726 }
727
728 =head2 _batchCommitItems
729
730   ($num_items_added, $num_items_errored) = 
731          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
732
733 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
734
735 =cut
736
737 sub _batchCommitItems {
738     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
739
740     my $dbh = C4::Context->dbh;
741
742     my $num_items_added = 0;
743     my $num_items_errored = 0;
744     my $num_items_replaced = 0;
745
746     my $sth = $dbh->prepare( "
747         SELECT import_items_id, import_items.marcxml, encoding
748         FROM import_items
749         JOIN import_records USING (import_record_id)
750         WHERE import_record_id = ?
751         ORDER BY import_items_id
752     " );
753     $sth->bind_param( 1, $import_record_id );
754     $sth->execute();
755
756     while ( my $row = $sth->fetchrow_hashref() ) {
757         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
758
759         # Delete date_due subfield as to not accidentally delete item checkout due dates
760         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
761         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
762
763         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
764
765         my $item_match;
766         my $duplicate_barcode = exists( $item->{'barcode'} );
767         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
768
769         # We assume that when replacing items we do not want to move them - the onus is on the importer to
770         # ensure the correct items/records are being updated
771         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
772         if (
773             $action eq "replace" &&
774             $duplicate_itemnumber &&
775             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
776         ) {
777             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
778             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
779             $updsth->bind_param( 1, 'imported' );
780             $updsth->bind_param( 2, $item->{itemnumber} );
781             $updsth->bind_param( 3, undef );
782             $updsth->bind_param( 4, $row->{'import_items_id'} );
783             $updsth->execute();
784             $updsth->finish();
785             $num_items_replaced++;
786         } elsif (
787             $action eq "replace" &&
788             $duplicate_barcode &&
789             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
790         ) {
791             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
792             $updsth->bind_param( 1, 'imported' );
793             $updsth->bind_param( 2, $item->{itemnumber} );
794             $updsth->bind_param( 3, undef );
795             $updsth->bind_param( 4, $row->{'import_items_id'} );
796             $updsth->execute();
797             $updsth->finish();
798             $num_items_replaced++;
799         } elsif (
800             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
801             $duplicate_barcode &&
802             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
803         ) {
804             $updsth->bind_param( 1, 'error' );
805             $updsth->bind_param( 2, undef );
806             $updsth->bind_param( 3, 'duplicate item barcode' );
807             $updsth->bind_param( 4, $row->{'import_items_id'} );
808             $updsth->execute();
809             $num_items_errored++;
810         } else {
811             # Remove the itemnumber if it exists, we want to create a new item
812             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
813             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
814
815             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
816             if( $itemnumber ) {
817                 $updsth->bind_param( 1, 'imported' );
818                 $updsth->bind_param( 2, $itemnumber );
819                 $updsth->bind_param( 3, undef );
820                 $updsth->bind_param( 4, $row->{'import_items_id'} );
821                 $updsth->execute();
822                 $updsth->finish();
823                 $num_items_added++;
824             }
825         }
826     }
827
828     return ( $num_items_added, $num_items_replaced, $num_items_errored );
829 }
830
831 =head2 BatchRevertRecords
832
833   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
834       $num_ignored) = BatchRevertRecords($batch_id);
835
836 =cut
837
838 sub BatchRevertRecords {
839     my $batch_id = shift;
840
841     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
842
843     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
844
845     my $record_type;
846     my $num_deleted = 0;
847     my $num_errors = 0;
848     my $num_reverted = 0;
849     my $num_ignored = 0;
850     my $num_items_deleted = 0;
851     # commit (i.e., save, all records in the batch)
852     SetImportBatchStatus($batch_id, 'reverting');
853     my $overlay_action = GetImportBatchOverlayAction($batch_id);
854     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
855     my $dbh = C4::Context->dbh;
856     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
857                              FROM import_records
858                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
859                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
860                              WHERE import_batch_id = ?");
861     $sth->execute($batch_id);
862     my $marc_type;
863     my $marcflavour = C4::Context->preference('marcflavour');
864     while (my $rowref = $sth->fetchrow_hashref) {
865         $record_type = $rowref->{'record_type'};
866         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
867             $num_ignored++;
868             next;
869         }
870         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
871             $marc_type = 'UNIMARCAUTH';
872         } elsif ($marcflavour eq 'UNIMARC') {
873             $marc_type = 'UNIMARC';
874         } else {
875             $marc_type = 'USMARC';
876         }
877
878         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
879
880         if ($record_result eq 'delete') {
881             my $error = undef;
882             if  ($record_type eq 'biblio') {
883                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
884                 $error = DelBiblio($rowref->{'matched_biblionumber'});
885             } else {
886                 DelAuthority({ authid => $rowref->{'matched_authid'} });
887             }
888             if (defined $error) {
889                 $num_errors++;
890             } else {
891                 $num_deleted++;
892                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
893             }
894         } elsif ($record_result eq 'restore') {
895             $num_reverted++;
896             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
897             if ($record_type eq 'biblio') {
898                 my $biblionumber = $rowref->{'matched_biblionumber'};
899                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
900
901                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
902                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
903
904                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
905                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
906             } else {
907                 my $authid = $rowref->{'matched_authid'};
908                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
909             }
910             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
911         } elsif ($record_result eq 'ignore') {
912             if ($record_type eq 'biblio') {
913                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
914             }
915             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
916         }
917         my $query;
918         if ($record_type eq 'biblio') {
919             # remove matched_biblionumber only if there is no 'imported' item left
920             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
921             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
922         } else {
923             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
924         }
925         my $sth2 = $dbh->prepare_cached($query);
926         $sth2->execute($rowref->{'import_record_id'});
927     }
928
929     $sth->finish();
930     SetImportBatchStatus($batch_id, 'reverted');
931     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
932 }
933
934 =head2 BatchRevertItems
935
936   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
937
938 =cut
939
940 sub BatchRevertItems {
941     my ($import_record_id, $biblionumber) = @_;
942
943     my $dbh = C4::Context->dbh;
944     my $num_items_deleted = 0;
945
946     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
947                                    FROM import_items
948                                    JOIN items USING (itemnumber)
949                                    WHERE import_record_id = ?");
950     $sth->bind_param(1, $import_record_id);
951     $sth->execute();
952     while (my $row = $sth->fetchrow_hashref()) {
953         my $item = Koha::Items->find($row->{itemnumber});
954         if ($item->safe_delete){
955             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
956             $updsth->bind_param(1, 'reverted');
957             $updsth->bind_param(2, $row->{'import_items_id'});
958             $updsth->execute();
959             $updsth->finish();
960             $num_items_deleted++;
961         }
962         else {
963             next;
964         }
965     }
966     $sth->finish();
967     return $num_items_deleted;
968 }
969
970 =head2 CleanBatch
971
972   CleanBatch($batch_id)
973
974 Deletes all staged records from the import batch
975 and sets the status of the batch to 'cleaned'.  Note
976 that deleting a stage record does *not* affect
977 any record that has been committed to the database.
978
979 =cut
980
981 sub CleanBatch {
982     my $batch_id = shift;
983     return unless defined $batch_id;
984
985     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
986     SetImportBatchStatus($batch_id, 'cleaned');
987 }
988
989 =head2 DeleteBatch
990
991   DeleteBatch($batch_id)
992
993 Deletes the record from the database. This can only be done
994 once the batch has been cleaned.
995
996 =cut
997
998 sub DeleteBatch {
999     my $batch_id = shift;
1000     return unless defined $batch_id;
1001
1002     my $dbh = C4::Context->dbh;
1003     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1004     $sth->execute( $batch_id );
1005 }
1006
1007 =head2 GetAllImportBatches
1008
1009   my $results = GetAllImportBatches();
1010
1011 Returns a references to an array of hash references corresponding
1012 to all import_batches rows (of batch_type 'batch'), sorted in 
1013 ascending order by import_batch_id.
1014
1015 =cut
1016
1017 sub  GetAllImportBatches {
1018     my $dbh = C4::Context->dbh;
1019     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1020                                     WHERE batch_type IN ('batch', 'webservice')
1021                                     ORDER BY import_batch_id ASC");
1022
1023     my $results = [];
1024     $sth->execute();
1025     while (my $row = $sth->fetchrow_hashref) {
1026         push @$results, $row;
1027     }
1028     $sth->finish();
1029     return $results;
1030 }
1031
1032 =head2 GetStagedWebserviceBatches
1033
1034   my $batch_ids = GetStagedWebserviceBatches();
1035
1036 Returns a references to an array of batch id's
1037 of batch_type 'webservice' that are not imported
1038
1039 =cut
1040
1041 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1042 SELECT import_batch_id FROM import_batches
1043 WHERE batch_type = 'webservice'
1044 AND import_status = 'staged'
1045 EOQ
1046 sub  GetStagedWebserviceBatches {
1047     my $dbh = C4::Context->dbh;
1048     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1049 }
1050
1051 =head2 GetImportBatchRangeDesc
1052
1053   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1054
1055 Returns a reference to an array of hash references corresponding to
1056 import_batches rows (sorted in descending order by import_batch_id)
1057 start at the given offset.
1058
1059 =cut
1060
1061 sub GetImportBatchRangeDesc {
1062     my ($offset, $results_per_group) = @_;
1063
1064     my $dbh = C4::Context->dbh;
1065     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1066                                     LEFT JOIN import_batch_profiles p
1067                                     ON b.profile_id = p.id
1068                                     WHERE b.batch_type IN ('batch', 'webservice')
1069                                     ORDER BY b.import_batch_id DESC";
1070     my @params;
1071     if ($results_per_group){
1072         $query .= " LIMIT ?";
1073         push(@params, $results_per_group);
1074     }
1075     if ($offset){
1076         $query .= " OFFSET ?";
1077         push(@params, $offset);
1078     }
1079     my $sth = $dbh->prepare_cached($query);
1080     $sth->execute(@params);
1081     my $results = $sth->fetchall_arrayref({});
1082     $sth->finish();
1083     return $results;
1084 }
1085
1086 =head2 GetItemNumbersFromImportBatch
1087
1088   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1089
1090 =cut
1091
1092 sub GetItemNumbersFromImportBatch {
1093     my ($batch_id) = @_;
1094     my $dbh = C4::Context->dbh;
1095     my $sql = q|
1096 SELECT itemnumber FROM import_items
1097 INNER JOIN items USING (itemnumber)
1098 INNER JOIN import_records USING (import_record_id)
1099 WHERE import_batch_id = ?|;
1100     my  $sth = $dbh->prepare( $sql );
1101     $sth->execute($batch_id);
1102     my @items ;
1103     while ( my ($itm) = $sth->fetchrow_array ) {
1104         push @items, $itm;
1105     }
1106     return @items;
1107 }
1108
1109 =head2 GetNumberOfImportBatches
1110
1111   my $count = GetNumberOfImportBatches();
1112
1113 =cut
1114
1115 sub GetNumberOfNonZ3950ImportBatches {
1116     my $dbh = C4::Context->dbh;
1117     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1118     $sth->execute();
1119     my ($count) = $sth->fetchrow_array();
1120     $sth->finish();
1121     return $count;
1122 }
1123
1124 =head2 GetImportBiblios
1125
1126   my $results = GetImportBiblios($importid);
1127
1128 =cut
1129
1130 sub GetImportBiblios {
1131     my ($import_record_id) = @_;
1132
1133     my $dbh = C4::Context->dbh;
1134     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1135     return $dbh->selectall_arrayref(
1136         $query,
1137         { Slice => {} },
1138         $import_record_id
1139     );
1140
1141 }
1142
1143 =head2 GetImportRecordsRange
1144
1145   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1146
1147 Returns a reference to an array of hash references corresponding to
1148 import_biblios/import_auths/import_records rows for a given batch
1149 starting at the given offset.
1150
1151 =cut
1152
1153 sub GetImportRecordsRange {
1154     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1155
1156     my $dbh = C4::Context->dbh;
1157
1158     my $order_by = $parameters->{order_by} || 'import_record_id';
1159     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1160
1161     my $order_by_direction =
1162       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1163
1164     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1165
1166     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1167                                            record_sequence, status, overlay_status,
1168                                            matched_biblionumber, matched_authid, record_type
1169                                     FROM   import_records
1170                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1171                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1172                                     WHERE  import_batch_id = ?";
1173     my @params;
1174     push(@params, $batch_id);
1175     if ($status) {
1176         $query .= " AND status=?";
1177         push(@params,$status);
1178     }
1179
1180     $query.=" ORDER BY $order_by $order_by_direction";
1181
1182     if($results_per_group){
1183         $query .= " LIMIT ?";
1184         push(@params, $results_per_group);
1185     }
1186     if($offset){
1187         $query .= " OFFSET ?";
1188         push(@params, $offset);
1189     }
1190     my $sth = $dbh->prepare_cached($query);
1191     $sth->execute(@params);
1192     my $results = $sth->fetchall_arrayref({});
1193     $sth->finish();
1194     return $results;
1195
1196 }
1197
1198 =head2 GetBestRecordMatch
1199
1200   my $record_id = GetBestRecordMatch($import_record_id);
1201
1202 =cut
1203
1204 sub GetBestRecordMatch {
1205     my ($import_record_id) = @_;
1206
1207     my $dbh = C4::Context->dbh;
1208     my $sth = $dbh->prepare("SELECT candidate_match_id
1209                              FROM   import_record_matches
1210                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1211                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1212                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1213                              WHERE  import_record_matches.import_record_id = ? AND
1214                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1215                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1216                              AND chosen = 1
1217                              ORDER BY score DESC, candidate_match_id DESC");
1218     $sth->execute($import_record_id);
1219     my ($record_id) = $sth->fetchrow_array();
1220     $sth->finish();
1221     return $record_id;
1222 }
1223
1224 =head2 GetImportBatchStatus
1225
1226   my $status = GetImportBatchStatus($batch_id);
1227
1228 =cut
1229
1230 sub GetImportBatchStatus {
1231     my ($batch_id) = @_;
1232
1233     my $dbh = C4::Context->dbh;
1234     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1235     $sth->execute($batch_id);
1236     my ($status) = $sth->fetchrow_array();
1237     $sth->finish();
1238     return $status;
1239
1240 }
1241
1242 =head2 SetImportBatchStatus
1243
1244   SetImportBatchStatus($batch_id, $new_status);
1245
1246 =cut
1247
1248 sub SetImportBatchStatus {
1249     my ($batch_id, $new_status) = @_;
1250
1251     my $dbh = C4::Context->dbh;
1252     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1253     $sth->execute($new_status, $batch_id);
1254     $sth->finish();
1255
1256 }
1257
1258 =head2 SetMatchedBiblionumber
1259
1260   SetMatchedBiblionumber($import_record_id, $biblionumber);
1261
1262 =cut
1263
1264 sub SetMatchedBiblionumber {
1265     my ($import_record_id, $biblionumber) = @_;
1266
1267     my $dbh = C4::Context->dbh;
1268     $dbh->do(
1269         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1270         undef, $biblionumber, $import_record_id
1271     );
1272 }
1273
1274 =head2 GetImportBatchOverlayAction
1275
1276   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1277
1278 =cut
1279
1280 sub GetImportBatchOverlayAction {
1281     my ($batch_id) = @_;
1282
1283     my $dbh = C4::Context->dbh;
1284     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1285     $sth->execute($batch_id);
1286     my ($overlay_action) = $sth->fetchrow_array();
1287     $sth->finish();
1288     return $overlay_action;
1289
1290 }
1291
1292
1293 =head2 SetImportBatchOverlayAction
1294
1295   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1296
1297 =cut
1298
1299 sub SetImportBatchOverlayAction {
1300     my ($batch_id, $new_overlay_action) = @_;
1301
1302     my $dbh = C4::Context->dbh;
1303     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1304     $sth->execute($new_overlay_action, $batch_id);
1305     $sth->finish();
1306
1307 }
1308
1309 =head2 GetImportBatchNoMatchAction
1310
1311   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1312
1313 =cut
1314
1315 sub GetImportBatchNoMatchAction {
1316     my ($batch_id) = @_;
1317
1318     my $dbh = C4::Context->dbh;
1319     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1320     $sth->execute($batch_id);
1321     my ($nomatch_action) = $sth->fetchrow_array();
1322     $sth->finish();
1323     return $nomatch_action;
1324
1325 }
1326
1327
1328 =head2 SetImportBatchNoMatchAction
1329
1330   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1331
1332 =cut
1333
1334 sub SetImportBatchNoMatchAction {
1335     my ($batch_id, $new_nomatch_action) = @_;
1336
1337     my $dbh = C4::Context->dbh;
1338     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1339     $sth->execute($new_nomatch_action, $batch_id);
1340     $sth->finish();
1341
1342 }
1343
1344 =head2 GetImportBatchItemAction
1345
1346   my $item_action = GetImportBatchItemAction($batch_id);
1347
1348 =cut
1349
1350 sub GetImportBatchItemAction {
1351     my ($batch_id) = @_;
1352
1353     my $dbh = C4::Context->dbh;
1354     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1355     $sth->execute($batch_id);
1356     my ($item_action) = $sth->fetchrow_array();
1357     $sth->finish();
1358     return $item_action;
1359
1360 }
1361
1362
1363 =head2 SetImportBatchItemAction
1364
1365   SetImportBatchItemAction($batch_id, $new_item_action);
1366
1367 =cut
1368
1369 sub SetImportBatchItemAction {
1370     my ($batch_id, $new_item_action) = @_;
1371
1372     my $dbh = C4::Context->dbh;
1373     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1374     $sth->execute($new_item_action, $batch_id);
1375     $sth->finish();
1376
1377 }
1378
1379 =head2 GetImportBatchMatcher
1380
1381   my $matcher_id = GetImportBatchMatcher($batch_id);
1382
1383 =cut
1384
1385 sub GetImportBatchMatcher {
1386     my ($batch_id) = @_;
1387
1388     my $dbh = C4::Context->dbh;
1389     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1390     $sth->execute($batch_id);
1391     my ($matcher_id) = $sth->fetchrow_array();
1392     $sth->finish();
1393     return $matcher_id;
1394
1395 }
1396
1397
1398 =head2 SetImportBatchMatcher
1399
1400   SetImportBatchMatcher($batch_id, $new_matcher_id);
1401
1402 =cut
1403
1404 sub SetImportBatchMatcher {
1405     my ($batch_id, $new_matcher_id) = @_;
1406
1407     my $dbh = C4::Context->dbh;
1408     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1409     $sth->execute($new_matcher_id, $batch_id);
1410     $sth->finish();
1411
1412 }
1413
1414 =head2 GetImportRecordOverlayStatus
1415
1416   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1417
1418 =cut
1419
1420 sub GetImportRecordOverlayStatus {
1421     my ($import_record_id) = @_;
1422
1423     my $dbh = C4::Context->dbh;
1424     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1425     $sth->execute($import_record_id);
1426     my ($overlay_status) = $sth->fetchrow_array();
1427     $sth->finish();
1428     return $overlay_status;
1429
1430 }
1431
1432
1433 =head2 SetImportRecordOverlayStatus
1434
1435   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1436
1437 =cut
1438
1439 sub SetImportRecordOverlayStatus {
1440     my ($import_record_id, $new_overlay_status) = @_;
1441
1442     my $dbh = C4::Context->dbh;
1443     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1444     $sth->execute($new_overlay_status, $import_record_id);
1445     $sth->finish();
1446
1447 }
1448
1449 =head2 GetImportRecordStatus
1450
1451   my $status = GetImportRecordStatus($import_record_id);
1452
1453 =cut
1454
1455 sub GetImportRecordStatus {
1456     my ($import_record_id) = @_;
1457
1458     my $dbh = C4::Context->dbh;
1459     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1460     $sth->execute($import_record_id);
1461     my ($status) = $sth->fetchrow_array();
1462     $sth->finish();
1463     return $status;
1464
1465 }
1466
1467
1468 =head2 SetImportRecordStatus
1469
1470   SetImportRecordStatus($import_record_id, $new_status);
1471
1472 =cut
1473
1474 sub SetImportRecordStatus {
1475     my ($import_record_id, $new_status) = @_;
1476
1477     my $dbh = C4::Context->dbh;
1478     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1479     $sth->execute($new_status, $import_record_id);
1480     $sth->finish();
1481
1482 }
1483
1484 =head2 GetImportRecordMatches
1485
1486   my $results = GetImportRecordMatches($import_record_id, $best_only);
1487
1488 =cut
1489
1490 sub GetImportRecordMatches {
1491     my $import_record_id = shift;
1492     my $best_only = @_ ? shift : 0;
1493
1494     my $dbh = C4::Context->dbh;
1495     # FIXME currently biblio only
1496     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1497                                     candidate_match_id, score, record_type,
1498                                     chosen
1499                                     FROM import_records
1500                                     JOIN import_record_matches USING (import_record_id)
1501                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1502                                     WHERE import_record_id = ?
1503                                     ORDER BY score DESC, biblionumber DESC");
1504     $sth->bind_param(1, $import_record_id);
1505     my $results = [];
1506     $sth->execute();
1507     while (my $row = $sth->fetchrow_hashref) {
1508         if ($row->{'record_type'} eq 'auth') {
1509             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1510         }
1511         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1512         push @$results, $row;
1513         last if $best_only;
1514     }
1515     $sth->finish();
1516
1517     return $results;
1518     
1519 }
1520
1521 =head2 SetImportRecordMatches
1522
1523   SetImportRecordMatches($import_record_id, @matches);
1524
1525 =cut
1526
1527 sub SetImportRecordMatches {
1528     my $import_record_id = shift;
1529     my @matches = @_;
1530
1531     my $dbh = C4::Context->dbh;
1532     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1533     $delsth->execute($import_record_id);
1534     $delsth->finish();
1535
1536     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1537                                     VALUES (?, ?, ?, ?)");
1538     my $chosen = 1; #The first match is defaulted to be chosen
1539     foreach my $match (@matches) {
1540         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1541         $chosen = 0; #After the first we do not default to other matches
1542     }
1543 }
1544
1545 =head2 RecordsFromISO2709File
1546
1547     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1548
1549 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1550
1551 @PARAM1, String, absolute path to the ISO2709 file.
1552 @PARAM2, String, see stage_file.pl
1553 @PARAM3, String, should be utf8
1554
1555 Returns two array refs.
1556
1557 =cut
1558
1559 sub RecordsFromISO2709File {
1560     my ($input_file, $record_type, $encoding) = @_;
1561     my @errors;
1562
1563     my $marc_type = C4::Context->preference('marcflavour');
1564     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1565
1566     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1567     my @marc_records;
1568     $/ = "\035";
1569     while (<$fh>) {
1570         s/^\s+//;
1571         s/\s+$//;
1572         next unless $_; # skip if record has only whitespace, as might occur
1573                         # if file includes newlines between each MARC record
1574         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1575         push @marc_records, $marc_record;
1576         if ($charset_guessed ne $encoding) {
1577             push @errors,
1578                 "Unexpected charset $charset_guessed, expecting $encoding";
1579         }
1580     }
1581     close $fh;
1582     return ( \@errors, \@marc_records );
1583 }
1584
1585 =head2 RecordsFromMARCXMLFile
1586
1587     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1588
1589 Creates MARC::Record-objects out of the given MARCXML-file.
1590
1591 @PARAM1, String, absolute path to the MARCXML file.
1592 @PARAM2, String, should be utf8
1593
1594 Returns two array refs.
1595
1596 =cut
1597
1598 sub RecordsFromMARCXMLFile {
1599     my ( $filename, $encoding ) = @_;
1600     my $batch = MARC::File::XML->in( $filename );
1601     my ( @marcRecords, @errors, $record );
1602     do {
1603         eval { $record = $batch->next( $encoding ); };
1604         if ($@) {
1605             push @errors, $@;
1606         }
1607         push @marcRecords, $record if $record;
1608     } while( $record );
1609     return (\@errors, \@marcRecords);
1610 }
1611
1612 =head2 RecordsFromMarcPlugin
1613
1614     Converts text of input_file into array of MARC records with to_marc plugin
1615
1616 =cut
1617
1618 sub RecordsFromMarcPlugin {
1619     my ($input_file, $plugin_class, $encoding) = @_;
1620     my ( $text, @return );
1621     return \@return if !$input_file || !$plugin_class;
1622
1623     # Read input file
1624     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1625     $/ = "\035";
1626     while (<$fh>) {
1627         s/^\s+//;
1628         s/\s+$//;
1629         next unless $_;
1630         $text .= $_;
1631     }
1632     close $fh;
1633
1634     # Convert to large MARC blob with plugin
1635     $text = Koha::Plugins::Handler->run({
1636         class  => $plugin_class,
1637         method => 'to_marc',
1638         params => { data => $text },
1639     }) if $text;
1640
1641     # Convert to array of MARC records
1642     if( $text ) {
1643         my $marc_type = C4::Context->preference('marcflavour');
1644         foreach my $blob ( split(/\x1D/, $text) ) {
1645             next if $blob =~ /^\s*$/;
1646             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1647             push @return, $marcrecord;
1648         }
1649     }
1650     return \@return;
1651 }
1652
1653 # internal functions
1654
1655 sub _create_import_record {
1656     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1657
1658     my $dbh = C4::Context->dbh;
1659     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1660                                                          record_type, encoding)
1661                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1662     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1663                   $record_type, $encoding);
1664     my $import_record_id = $dbh->{'mysql_insertid'};
1665     $sth->finish();
1666     return $import_record_id;
1667 }
1668
1669 sub _add_auth_fields {
1670     my ($import_record_id, $marc_record) = @_;
1671
1672     my $controlnumber;
1673     if ($marc_record->field('001')) {
1674         $controlnumber = $marc_record->field('001')->data();
1675     }
1676     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1677     my $dbh = C4::Context->dbh;
1678     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1679     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1680     $sth->finish();
1681 }
1682
1683 sub _add_biblio_fields {
1684     my ($import_record_id, $marc_record) = @_;
1685
1686     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1687     my $dbh = C4::Context->dbh;
1688     # FIXME no controlnumber, originalsource
1689     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1690     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1691     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1692     $sth->finish();
1693                 
1694 }
1695
1696 sub _update_biblio_fields {
1697     my ($import_record_id, $marc_record) = @_;
1698
1699     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1700     my $dbh = C4::Context->dbh;
1701     # FIXME no controlnumber, originalsource
1702     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1703     $isbn =~ s/\(.*$//;
1704     $isbn =~ tr/ -_//;
1705     $isbn = uc $isbn;
1706     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1707                              WHERE  import_record_id = ?");
1708     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1709     $sth->finish();
1710 }
1711
1712 sub _parse_biblio_fields {
1713     my ($marc_record) = @_;
1714
1715     my $dbh = C4::Context->dbh;
1716     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1717     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1718
1719 }
1720
1721 sub _update_batch_record_counts {
1722     my ($batch_id) = @_;
1723
1724     my $dbh = C4::Context->dbh;
1725     my ( $num_records ) = $dbh->selectrow_array(q|
1726                                             SELECT COUNT(*)
1727                                             FROM import_records
1728                                             WHERE import_batch_id = ?
1729     |, undef, $batch_id );
1730     my ( $num_items ) = $dbh->selectrow_array(q|
1731                                             SELECT COUNT(*)
1732                                             FROM import_records
1733                                             JOIN import_items USING (import_record_id)
1734                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1735     |, undef, $batch_id );
1736     $dbh->do(
1737         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1738         undef,
1739         $num_records,
1740         $num_items,
1741         $batch_id,
1742     );
1743 }
1744
1745 sub _get_commit_action {
1746     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1747     
1748     if ($record_type eq 'biblio') {
1749         my ($bib_result, $bib_match, $item_result);
1750
1751         $bib_match = GetBestRecordMatch($import_record_id);
1752         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1753
1754             $bib_result = $overlay_action;
1755
1756             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1757                 $item_result = 'create_new';
1758             } elsif($item_action eq 'replace'){
1759                 $item_result = 'replace';
1760             } else {
1761                 $item_result = 'ignore';
1762             }
1763
1764         } else {
1765             $bib_result = $nomatch_action;
1766             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1767         }
1768         return ($bib_result, $item_result, $bib_match);
1769     } else { # must be auths
1770         my ($auth_result, $auth_match);
1771
1772         $auth_match = GetBestRecordMatch($import_record_id);
1773         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1774             $auth_result = $overlay_action;
1775         } else {
1776             $auth_result = $nomatch_action;
1777         }
1778
1779         return ($auth_result, undef, $auth_match);
1780
1781     }
1782 }
1783
1784 sub _get_revert_action {
1785     my ($overlay_action, $overlay_status, $status) = @_;
1786
1787     my $bib_result;
1788
1789     if ($status eq 'ignored') {
1790         $bib_result = 'ignore';
1791     } else {
1792         if ($overlay_action eq 'create_new') {
1793             $bib_result = 'delete';
1794         } else {
1795             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1796         }
1797     }
1798     return $bib_result;
1799 }
1800
1801 1;
1802 __END__
1803
1804 =head1 AUTHOR
1805
1806 Koha Development Team <http://koha-community.org/>
1807
1808 Galen Charlton <galen.charlton@liblime.com>
1809
1810 =cut