Bug 32151: Resolve uninitialized warn in numeric ne in C4::Ris
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512     my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
513         BatchCommitRecords( $batch_id, $framework, $progress_interval, $progress_callback, $params );
514
515     Parameter skip_intermediate_commit does what is says.
516
517 =cut
518
519 sub BatchCommitRecords {
520     my ( $batch_id, $framework, $progress_interval, $progress_callback, $params ) = @_;
521     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
522     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
523     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
524
525     my $schema = Koha::Database->schema;
526     $schema->txn_begin;
527     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
528     # transactions may be committed and started too again. The final commit is close to the end.
529
530     my $record_type;
531     my $num_added = 0;
532     my $num_updated = 0;
533     my $num_items_added = 0;
534     my $num_items_replaced = 0;
535     my $num_items_errored = 0;
536     my $num_ignored = 0;
537     # commit (i.e., save, all records in the batch)
538     SetImportBatchStatus($batch_id, 'importing');
539     my $overlay_action = GetImportBatchOverlayAction($batch_id);
540     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
541     my $item_action = GetImportBatchItemAction($batch_id);
542     my $item_tag;
543     my $item_subfield;
544     my $dbh = C4::Context->dbh;
545     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
546                              FROM import_records
547                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
548                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
549                              WHERE import_batch_id = ?");
550     $sth->execute($batch_id);
551     my $marcflavour = C4::Context->preference('marcflavour');
552
553     my $userenv = C4::Context->userenv;
554     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
555
556     my $rec_num = 0;
557     my @biblio_ids;
558     while (my $rowref = $sth->fetchrow_hashref) {
559         $record_type = $rowref->{'record_type'};
560
561         $rec_num++;
562
563         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
564             # report progress and commit
565             $schema->txn_commit unless $skip_intermediate_commit;
566             &$progress_callback( $rec_num );
567             $schema->txn_begin unless $skip_intermediate_commit;
568         }
569         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
570             $num_ignored++;
571             next;
572         }
573
574         my $marc_type;
575         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
576             $marc_type = 'UNIMARCAUTH';
577         } elsif ($marcflavour eq 'UNIMARC') {
578             $marc_type = 'UNIMARC';
579         } else {
580             $marc_type = 'USMARC';
581         }
582         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
583
584         if ($record_type eq 'biblio') {
585             # remove any item tags - rely on _batchCommitItems
586             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
587             foreach my $item_field ($marc_record->field($item_tag)) {
588                 $marc_record->delete_field($item_field);
589             }
590             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
591                 my @control_num = $marc_record->field('001');
592                 $marc_record->delete_fields(@control_num);
593             }
594         }
595
596         my ($record_result, $item_result, $record_match) =
597             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
598                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
599
600         my $recordid;
601         my $query;
602         if ($record_result eq 'create_new') {
603             $num_added++;
604             if ($record_type eq 'biblio') {
605                 my $biblioitemnumber;
606                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
607                 push @biblio_ids, $recordid;
608                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
609                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
610                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
611                     $num_items_added += $bib_items_added;
612                     $num_items_replaced += $bib_items_replaced;
613                     $num_items_errored += $bib_items_errored;
614                 }
615             } else {
616                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
617                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
618             }
619             my $sth = $dbh->prepare_cached($query);
620             $sth->execute($recordid, $rowref->{'import_record_id'});
621             $sth->finish();
622             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
623         } elsif ($record_result eq 'replace') {
624             $num_updated++;
625             $recordid = $record_match;
626             my $oldxml;
627             if ($record_type eq 'biblio') {
628                 my $oldbiblio = Koha::Biblios->find( $recordid );
629                 $oldxml = GetXmlBiblio($recordid);
630
631                 # remove item fields so that they don't get
632                 # added again if record is reverted
633                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
634                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
635                 foreach my $item_field ($old_marc->field($item_tag)) {
636                     $old_marc->delete_field($item_field);
637                 }
638                 $oldxml = $old_marc->as_xml($marc_type);
639
640                 my $context = { source => 'batchimport' };
641                 if ($logged_in_patron) {
642                     $context->{categorycode} = $logged_in_patron->categorycode;
643                     $context->{userid} = $logged_in_patron->userid;
644                 }
645
646                 ModBiblio(
647                     $marc_record,
648                     $recordid,
649                     $oldbiblio->frameworkcode,
650                     {
651                         overlay_context   => $context,
652                         skip_record_index => 1
653                     }
654                 );
655                 push @biblio_ids, $recordid;
656                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
657
658                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
659                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
660                     $num_items_added += $bib_items_added;
661                     $num_items_replaced += $bib_items_replaced;
662                     $num_items_errored += $bib_items_errored;
663                 }
664             } else {
665                 $oldxml = GetAuthorityXML($recordid);
666
667                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
668                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
669             }
670             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
671             $sth->execute($oldxml, $rowref->{'import_record_id'});
672             $sth->finish();
673             my $sth2 = $dbh->prepare_cached($query);
674             $sth2->execute($recordid, $rowref->{'import_record_id'});
675             $sth2->finish();
676             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
677             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
678         } elsif ($record_result eq 'ignore') {
679             $recordid = $record_match;
680             $num_ignored++;
681             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
682                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
683                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
684                 $num_items_added += $bib_items_added;
685          $num_items_replaced += $bib_items_replaced;
686                 $num_items_errored += $bib_items_errored;
687                 # still need to record the matched biblionumber so that the
688                 # items can be reverted
689                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
690                 $sth2->execute($recordid, $rowref->{'import_record_id'});
691                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
692             }
693             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
694         }
695     }
696
697     if ($progress_interval){
698         &$progress_callback($rec_num);
699     }
700
701     $sth->finish();
702
703     if ( @biblio_ids ) {
704         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
705         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
706     }
707
708     SetImportBatchStatus($batch_id, 'imported');
709
710     # Moved final commit to the end
711     $schema->txn_commit;
712
713     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
714 }
715
716 =head2 _batchCommitItems
717
718   ($num_items_added, $num_items_errored) = 
719          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
720
721 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
722
723 =cut
724
725 sub _batchCommitItems {
726     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
727
728     my $dbh = C4::Context->dbh;
729
730     my $num_items_added = 0;
731     my $num_items_errored = 0;
732     my $num_items_replaced = 0;
733
734     my $sth = $dbh->prepare( "
735         SELECT import_items_id, import_items.marcxml, encoding
736         FROM import_items
737         JOIN import_records USING (import_record_id)
738         WHERE import_record_id = ?
739         ORDER BY import_items_id
740     " );
741     $sth->bind_param( 1, $import_record_id );
742     $sth->execute();
743
744     while ( my $row = $sth->fetchrow_hashref() ) {
745         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
746
747         # Delete date_due subfield as to not accidentally delete item checkout due dates
748         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
749         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
750
751         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
752
753         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
754         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
755
756         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
757         if ( $action eq "replace" && $duplicate_itemnumber ) {
758             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
759             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
760             $updsth->bind_param( 1, 'imported' );
761             $updsth->bind_param( 2, $item->{itemnumber} );
762             $updsth->bind_param( 3, undef );
763             $updsth->bind_param( 4, $row->{'import_items_id'} );
764             $updsth->execute();
765             $updsth->finish();
766             $num_items_replaced++;
767         } elsif ( $action eq "replace" && $duplicate_barcode ) {
768             my $itemnumber = $duplicate_barcode->itemnumber;
769             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
770             $updsth->bind_param( 1, 'imported' );
771             $updsth->bind_param( 2, $item->{itemnumber} );
772             $updsth->bind_param( 3, undef );
773             $updsth->bind_param( 4, $row->{'import_items_id'} );
774             $updsth->execute();
775             $updsth->finish();
776             $num_items_replaced++;
777         } elsif ($duplicate_barcode) {
778             $updsth->bind_param( 1, 'error' );
779             $updsth->bind_param( 2, undef );
780             $updsth->bind_param( 3, 'duplicate item barcode' );
781             $updsth->bind_param( 4, $row->{'import_items_id'} );
782             $updsth->execute();
783             $num_items_errored++;
784         } else {
785             # Remove the itemnumber if it exists, we want to create a new item
786             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
787             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
788
789             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
790             if( $itemnumber ) {
791                 $updsth->bind_param( 1, 'imported' );
792                 $updsth->bind_param( 2, $itemnumber );
793                 $updsth->bind_param( 3, undef );
794                 $updsth->bind_param( 4, $row->{'import_items_id'} );
795                 $updsth->execute();
796                 $updsth->finish();
797                 $num_items_added++;
798             }
799         }
800     }
801
802     return ( $num_items_added, $num_items_replaced, $num_items_errored );
803 }
804
805 =head2 BatchRevertRecords
806
807   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
808       $num_ignored) = BatchRevertRecords($batch_id);
809
810 =cut
811
812 sub BatchRevertRecords {
813     my $batch_id = shift;
814
815     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
816
817     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
818
819     my $record_type;
820     my $num_deleted = 0;
821     my $num_errors = 0;
822     my $num_reverted = 0;
823     my $num_ignored = 0;
824     my $num_items_deleted = 0;
825     # commit (i.e., save, all records in the batch)
826     SetImportBatchStatus($batch_id, 'reverting');
827     my $overlay_action = GetImportBatchOverlayAction($batch_id);
828     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
829     my $dbh = C4::Context->dbh;
830     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
831                              FROM import_records
832                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
833                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
834                              WHERE import_batch_id = ?");
835     $sth->execute($batch_id);
836     my $marc_type;
837     my $marcflavour = C4::Context->preference('marcflavour');
838     while (my $rowref = $sth->fetchrow_hashref) {
839         $record_type = $rowref->{'record_type'};
840         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
841             $num_ignored++;
842             next;
843         }
844         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
845             $marc_type = 'UNIMARCAUTH';
846         } elsif ($marcflavour eq 'UNIMARC') {
847             $marc_type = 'UNIMARC';
848         } else {
849             $marc_type = 'USMARC';
850         }
851
852         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
853
854         if ($record_result eq 'delete') {
855             my $error = undef;
856             if  ($record_type eq 'biblio') {
857                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
858                 $error = DelBiblio($rowref->{'matched_biblionumber'});
859             } else {
860                 DelAuthority({ authid => $rowref->{'matched_authid'} });
861             }
862             if (defined $error) {
863                 $num_errors++;
864             } else {
865                 $num_deleted++;
866                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
867             }
868         } elsif ($record_result eq 'restore') {
869             $num_reverted++;
870             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
871             if ($record_type eq 'biblio') {
872                 my $biblionumber = $rowref->{'matched_biblionumber'};
873                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
874
875                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
876                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
877
878                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
879                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
880             } else {
881                 my $authid = $rowref->{'matched_authid'};
882                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
883             }
884             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
885         } elsif ($record_result eq 'ignore') {
886             if ($record_type eq 'biblio') {
887                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
888             }
889             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
890         }
891         my $query;
892         if ($record_type eq 'biblio') {
893             # remove matched_biblionumber only if there is no 'imported' item left
894             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
895             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
896         } else {
897             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
898         }
899         my $sth2 = $dbh->prepare_cached($query);
900         $sth2->execute($rowref->{'import_record_id'});
901     }
902
903     $sth->finish();
904     SetImportBatchStatus($batch_id, 'reverted');
905     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
906 }
907
908 =head2 BatchRevertItems
909
910   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
911
912 =cut
913
914 sub BatchRevertItems {
915     my ($import_record_id, $biblionumber) = @_;
916
917     my $dbh = C4::Context->dbh;
918     my $num_items_deleted = 0;
919
920     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
921                                    FROM import_items
922                                    JOIN items USING (itemnumber)
923                                    WHERE import_record_id = ?");
924     $sth->bind_param(1, $import_record_id);
925     $sth->execute();
926     while (my $row = $sth->fetchrow_hashref()) {
927         my $item = Koha::Items->find($row->{itemnumber});
928         if ($item->safe_delete){
929             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
930             $updsth->bind_param(1, 'reverted');
931             $updsth->bind_param(2, $row->{'import_items_id'});
932             $updsth->execute();
933             $updsth->finish();
934             $num_items_deleted++;
935         }
936         else {
937             next;
938         }
939     }
940     $sth->finish();
941     return $num_items_deleted;
942 }
943
944 =head2 CleanBatch
945
946   CleanBatch($batch_id)
947
948 Deletes all staged records from the import batch
949 and sets the status of the batch to 'cleaned'.  Note
950 that deleting a stage record does *not* affect
951 any record that has been committed to the database.
952
953 =cut
954
955 sub CleanBatch {
956     my $batch_id = shift;
957     return unless defined $batch_id;
958
959     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
960     SetImportBatchStatus($batch_id, 'cleaned');
961 }
962
963 =head2 DeleteBatch
964
965   DeleteBatch($batch_id)
966
967 Deletes the record from the database. This can only be done
968 once the batch has been cleaned.
969
970 =cut
971
972 sub DeleteBatch {
973     my $batch_id = shift;
974     return unless defined $batch_id;
975
976     my $dbh = C4::Context->dbh;
977     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
978     $sth->execute( $batch_id );
979 }
980
981 =head2 GetAllImportBatches
982
983   my $results = GetAllImportBatches();
984
985 Returns a references to an array of hash references corresponding
986 to all import_batches rows (of batch_type 'batch'), sorted in 
987 ascending order by import_batch_id.
988
989 =cut
990
991 sub  GetAllImportBatches {
992     my $dbh = C4::Context->dbh;
993     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
994                                     WHERE batch_type IN ('batch', 'webservice')
995                                     ORDER BY import_batch_id ASC");
996
997     my $results = [];
998     $sth->execute();
999     while (my $row = $sth->fetchrow_hashref) {
1000         push @$results, $row;
1001     }
1002     $sth->finish();
1003     return $results;
1004 }
1005
1006 =head2 GetStagedWebserviceBatches
1007
1008   my $batch_ids = GetStagedWebserviceBatches();
1009
1010 Returns a references to an array of batch id's
1011 of batch_type 'webservice' that are not imported
1012
1013 =cut
1014
1015 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1016 SELECT import_batch_id FROM import_batches
1017 WHERE batch_type = 'webservice'
1018 AND import_status = 'staged'
1019 EOQ
1020 sub  GetStagedWebserviceBatches {
1021     my $dbh = C4::Context->dbh;
1022     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1023 }
1024
1025 =head2 GetImportBatchRangeDesc
1026
1027   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1028
1029 Returns a reference to an array of hash references corresponding to
1030 import_batches rows (sorted in descending order by import_batch_id)
1031 start at the given offset.
1032
1033 =cut
1034
1035 sub GetImportBatchRangeDesc {
1036     my ($offset, $results_per_group) = @_;
1037
1038     my $dbh = C4::Context->dbh;
1039     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1040                                     LEFT JOIN import_batch_profiles p
1041                                     ON b.profile_id = p.id
1042                                     WHERE b.batch_type IN ('batch', 'webservice')
1043                                     ORDER BY b.import_batch_id DESC";
1044     my @params;
1045     if ($results_per_group){
1046         $query .= " LIMIT ?";
1047         push(@params, $results_per_group);
1048     }
1049     if ($offset){
1050         $query .= " OFFSET ?";
1051         push(@params, $offset);
1052     }
1053     my $sth = $dbh->prepare_cached($query);
1054     $sth->execute(@params);
1055     my $results = $sth->fetchall_arrayref({});
1056     $sth->finish();
1057     return $results;
1058 }
1059
1060 =head2 GetItemNumbersFromImportBatch
1061
1062   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1063
1064 =cut
1065
1066 sub GetItemNumbersFromImportBatch {
1067     my ($batch_id) = @_;
1068     my $dbh = C4::Context->dbh;
1069     my $sql = q|
1070 SELECT itemnumber FROM import_items
1071 INNER JOIN items USING (itemnumber)
1072 INNER JOIN import_records USING (import_record_id)
1073 WHERE import_batch_id = ?|;
1074     my  $sth = $dbh->prepare( $sql );
1075     $sth->execute($batch_id);
1076     my @items ;
1077     while ( my ($itm) = $sth->fetchrow_array ) {
1078         push @items, $itm;
1079     }
1080     return @items;
1081 }
1082
1083 =head2 GetNumberOfImportBatches
1084
1085   my $count = GetNumberOfImportBatches();
1086
1087 =cut
1088
1089 sub GetNumberOfNonZ3950ImportBatches {
1090     my $dbh = C4::Context->dbh;
1091     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1092     $sth->execute();
1093     my ($count) = $sth->fetchrow_array();
1094     $sth->finish();
1095     return $count;
1096 }
1097
1098 =head2 GetImportBiblios
1099
1100   my $results = GetImportBiblios($importid);
1101
1102 =cut
1103
1104 sub GetImportBiblios {
1105     my ($import_record_id) = @_;
1106
1107     my $dbh = C4::Context->dbh;
1108     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1109     return $dbh->selectall_arrayref(
1110         $query,
1111         { Slice => {} },
1112         $import_record_id
1113     );
1114
1115 }
1116
1117 =head2 GetImportRecordsRange
1118
1119   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1120
1121 Returns a reference to an array of hash references corresponding to
1122 import_biblios/import_auths/import_records rows for a given batch
1123 starting at the given offset.
1124
1125 =cut
1126
1127 sub GetImportRecordsRange {
1128     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1129
1130     my $dbh = C4::Context->dbh;
1131
1132     my $order_by = $parameters->{order_by} || 'import_record_id';
1133     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1134
1135     my $order_by_direction =
1136       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1137
1138     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1139
1140     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1141                                            record_sequence, status, overlay_status,
1142                                            matched_biblionumber, matched_authid, record_type
1143                                     FROM   import_records
1144                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1145                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1146                                     WHERE  import_batch_id = ?";
1147     my @params;
1148     push(@params, $batch_id);
1149     if ($status) {
1150         $query .= " AND status=?";
1151         push(@params,$status);
1152     }
1153
1154     $query.=" ORDER BY $order_by $order_by_direction";
1155
1156     if($results_per_group){
1157         $query .= " LIMIT ?";
1158         push(@params, $results_per_group);
1159     }
1160     if($offset){
1161         $query .= " OFFSET ?";
1162         push(@params, $offset);
1163     }
1164     my $sth = $dbh->prepare_cached($query);
1165     $sth->execute(@params);
1166     my $results = $sth->fetchall_arrayref({});
1167     $sth->finish();
1168     return $results;
1169
1170 }
1171
1172 =head2 GetBestRecordMatch
1173
1174   my $record_id = GetBestRecordMatch($import_record_id);
1175
1176 =cut
1177
1178 sub GetBestRecordMatch {
1179     my ($import_record_id) = @_;
1180
1181     my $dbh = C4::Context->dbh;
1182     my $sth = $dbh->prepare("SELECT candidate_match_id
1183                              FROM   import_record_matches
1184                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1185                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1186                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1187                              WHERE  import_record_matches.import_record_id = ? AND
1188                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1189                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1190                              AND chosen = 1
1191                              ORDER BY score DESC, candidate_match_id DESC");
1192     $sth->execute($import_record_id);
1193     my ($record_id) = $sth->fetchrow_array();
1194     $sth->finish();
1195     return $record_id;
1196 }
1197
1198 =head2 GetImportBatchStatus
1199
1200   my $status = GetImportBatchStatus($batch_id);
1201
1202 =cut
1203
1204 sub GetImportBatchStatus {
1205     my ($batch_id) = @_;
1206
1207     my $dbh = C4::Context->dbh;
1208     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1209     $sth->execute($batch_id);
1210     my ($status) = $sth->fetchrow_array();
1211     $sth->finish();
1212     return $status;
1213
1214 }
1215
1216 =head2 SetImportBatchStatus
1217
1218   SetImportBatchStatus($batch_id, $new_status);
1219
1220 =cut
1221
1222 sub SetImportBatchStatus {
1223     my ($batch_id, $new_status) = @_;
1224
1225     my $dbh = C4::Context->dbh;
1226     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1227     $sth->execute($new_status, $batch_id);
1228     $sth->finish();
1229
1230 }
1231
1232 =head2 SetMatchedBiblionumber
1233
1234   SetMatchedBiblionumber($import_record_id, $biblionumber);
1235
1236 =cut
1237
1238 sub SetMatchedBiblionumber {
1239     my ($import_record_id, $biblionumber) = @_;
1240
1241     my $dbh = C4::Context->dbh;
1242     $dbh->do(
1243         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1244         undef, $biblionumber, $import_record_id
1245     );
1246 }
1247
1248 =head2 GetImportBatchOverlayAction
1249
1250   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1251
1252 =cut
1253
1254 sub GetImportBatchOverlayAction {
1255     my ($batch_id) = @_;
1256
1257     my $dbh = C4::Context->dbh;
1258     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1259     $sth->execute($batch_id);
1260     my ($overlay_action) = $sth->fetchrow_array();
1261     $sth->finish();
1262     return $overlay_action;
1263
1264 }
1265
1266
1267 =head2 SetImportBatchOverlayAction
1268
1269   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1270
1271 =cut
1272
1273 sub SetImportBatchOverlayAction {
1274     my ($batch_id, $new_overlay_action) = @_;
1275
1276     my $dbh = C4::Context->dbh;
1277     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1278     $sth->execute($new_overlay_action, $batch_id);
1279     $sth->finish();
1280
1281 }
1282
1283 =head2 GetImportBatchNoMatchAction
1284
1285   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1286
1287 =cut
1288
1289 sub GetImportBatchNoMatchAction {
1290     my ($batch_id) = @_;
1291
1292     my $dbh = C4::Context->dbh;
1293     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1294     $sth->execute($batch_id);
1295     my ($nomatch_action) = $sth->fetchrow_array();
1296     $sth->finish();
1297     return $nomatch_action;
1298
1299 }
1300
1301
1302 =head2 SetImportBatchNoMatchAction
1303
1304   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1305
1306 =cut
1307
1308 sub SetImportBatchNoMatchAction {
1309     my ($batch_id, $new_nomatch_action) = @_;
1310
1311     my $dbh = C4::Context->dbh;
1312     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1313     $sth->execute($new_nomatch_action, $batch_id);
1314     $sth->finish();
1315
1316 }
1317
1318 =head2 GetImportBatchItemAction
1319
1320   my $item_action = GetImportBatchItemAction($batch_id);
1321
1322 =cut
1323
1324 sub GetImportBatchItemAction {
1325     my ($batch_id) = @_;
1326
1327     my $dbh = C4::Context->dbh;
1328     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1329     $sth->execute($batch_id);
1330     my ($item_action) = $sth->fetchrow_array();
1331     $sth->finish();
1332     return $item_action;
1333
1334 }
1335
1336
1337 =head2 SetImportBatchItemAction
1338
1339   SetImportBatchItemAction($batch_id, $new_item_action);
1340
1341 =cut
1342
1343 sub SetImportBatchItemAction {
1344     my ($batch_id, $new_item_action) = @_;
1345
1346     my $dbh = C4::Context->dbh;
1347     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1348     $sth->execute($new_item_action, $batch_id);
1349     $sth->finish();
1350
1351 }
1352
1353 =head2 GetImportBatchMatcher
1354
1355   my $matcher_id = GetImportBatchMatcher($batch_id);
1356
1357 =cut
1358
1359 sub GetImportBatchMatcher {
1360     my ($batch_id) = @_;
1361
1362     my $dbh = C4::Context->dbh;
1363     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1364     $sth->execute($batch_id);
1365     my ($matcher_id) = $sth->fetchrow_array();
1366     $sth->finish();
1367     return $matcher_id;
1368
1369 }
1370
1371
1372 =head2 SetImportBatchMatcher
1373
1374   SetImportBatchMatcher($batch_id, $new_matcher_id);
1375
1376 =cut
1377
1378 sub SetImportBatchMatcher {
1379     my ($batch_id, $new_matcher_id) = @_;
1380
1381     my $dbh = C4::Context->dbh;
1382     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1383     $sth->execute($new_matcher_id, $batch_id);
1384     $sth->finish();
1385
1386 }
1387
1388 =head2 GetImportRecordOverlayStatus
1389
1390   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1391
1392 =cut
1393
1394 sub GetImportRecordOverlayStatus {
1395     my ($import_record_id) = @_;
1396
1397     my $dbh = C4::Context->dbh;
1398     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1399     $sth->execute($import_record_id);
1400     my ($overlay_status) = $sth->fetchrow_array();
1401     $sth->finish();
1402     return $overlay_status;
1403
1404 }
1405
1406
1407 =head2 SetImportRecordOverlayStatus
1408
1409   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1410
1411 =cut
1412
1413 sub SetImportRecordOverlayStatus {
1414     my ($import_record_id, $new_overlay_status) = @_;
1415
1416     my $dbh = C4::Context->dbh;
1417     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1418     $sth->execute($new_overlay_status, $import_record_id);
1419     $sth->finish();
1420
1421 }
1422
1423 =head2 GetImportRecordStatus
1424
1425   my $status = GetImportRecordStatus($import_record_id);
1426
1427 =cut
1428
1429 sub GetImportRecordStatus {
1430     my ($import_record_id) = @_;
1431
1432     my $dbh = C4::Context->dbh;
1433     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1434     $sth->execute($import_record_id);
1435     my ($status) = $sth->fetchrow_array();
1436     $sth->finish();
1437     return $status;
1438
1439 }
1440
1441
1442 =head2 SetImportRecordStatus
1443
1444   SetImportRecordStatus($import_record_id, $new_status);
1445
1446 =cut
1447
1448 sub SetImportRecordStatus {
1449     my ($import_record_id, $new_status) = @_;
1450
1451     my $dbh = C4::Context->dbh;
1452     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1453     $sth->execute($new_status, $import_record_id);
1454     $sth->finish();
1455
1456 }
1457
1458 =head2 GetImportRecordMatches
1459
1460   my $results = GetImportRecordMatches($import_record_id, $best_only);
1461
1462 =cut
1463
1464 sub GetImportRecordMatches {
1465     my $import_record_id = shift;
1466     my $best_only = @_ ? shift : 0;
1467
1468     my $dbh = C4::Context->dbh;
1469     # FIXME currently biblio only
1470     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1471                                     candidate_match_id, score, record_type,
1472                                     chosen
1473                                     FROM import_records
1474                                     JOIN import_record_matches USING (import_record_id)
1475                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1476                                     WHERE import_record_id = ?
1477                                     ORDER BY score DESC, biblionumber DESC");
1478     $sth->bind_param(1, $import_record_id);
1479     my $results = [];
1480     $sth->execute();
1481     while (my $row = $sth->fetchrow_hashref) {
1482         if ($row->{'record_type'} eq 'auth') {
1483             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1484         }
1485         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1486         push @$results, $row;
1487         last if $best_only;
1488     }
1489     $sth->finish();
1490
1491     return $results;
1492     
1493 }
1494
1495 =head2 SetImportRecordMatches
1496
1497   SetImportRecordMatches($import_record_id, @matches);
1498
1499 =cut
1500
1501 sub SetImportRecordMatches {
1502     my $import_record_id = shift;
1503     my @matches = @_;
1504
1505     my $dbh = C4::Context->dbh;
1506     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1507     $delsth->execute($import_record_id);
1508     $delsth->finish();
1509
1510     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1511                                     VALUES (?, ?, ?, ?)");
1512     my $chosen = 1; #The first match is defaulted to be chosen
1513     foreach my $match (@matches) {
1514         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1515         $chosen = 0; #After the first we do not default to other matches
1516     }
1517 }
1518
1519 =head2 RecordsFromISO2709File
1520
1521     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1522
1523 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1524
1525 @PARAM1, String, absolute path to the ISO2709 file.
1526 @PARAM2, String, see stage_file.pl
1527 @PARAM3, String, should be utf8
1528
1529 Returns two array refs.
1530
1531 =cut
1532
1533 sub RecordsFromISO2709File {
1534     my ($input_file, $record_type, $encoding) = @_;
1535     my @errors;
1536
1537     my $marc_type = C4::Context->preference('marcflavour');
1538     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1539
1540     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1541     my @marc_records;
1542     $/ = "\035";
1543     while (<$fh>) {
1544         s/^\s+//;
1545         s/\s+$//;
1546         next unless $_; # skip if record has only whitespace, as might occur
1547                         # if file includes newlines between each MARC record
1548         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1549         push @marc_records, $marc_record;
1550         if ($charset_guessed ne $encoding) {
1551             push @errors,
1552                 "Unexpected charset $charset_guessed, expecting $encoding";
1553         }
1554     }
1555     close $fh;
1556     return ( \@errors, \@marc_records );
1557 }
1558
1559 =head2 RecordsFromMARCXMLFile
1560
1561     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1562
1563 Creates MARC::Record-objects out of the given MARCXML-file.
1564
1565 @PARAM1, String, absolute path to the ISO2709 file.
1566 @PARAM2, String, should be utf8
1567
1568 Returns two array refs.
1569
1570 =cut
1571
1572 sub RecordsFromMARCXMLFile {
1573     my ( $filename, $encoding ) = @_;
1574     my $batch = MARC::File::XML->in( $filename );
1575     my ( @marcRecords, @errors, $record );
1576     do {
1577         eval { $record = $batch->next( $encoding ); };
1578         if ($@) {
1579             push @errors, $@;
1580         }
1581         push @marcRecords, $record if $record;
1582     } while( $record );
1583     return (\@errors, \@marcRecords);
1584 }
1585
1586 =head2 RecordsFromMarcPlugin
1587
1588     Converts text of input_file into array of MARC records with to_marc plugin
1589
1590 =cut
1591
1592 sub RecordsFromMarcPlugin {
1593     my ($input_file, $plugin_class, $encoding) = @_;
1594     my ( $text, @return );
1595     return \@return if !$input_file || !$plugin_class;
1596
1597     # Read input file
1598     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1599     $/ = "\035";
1600     while (<$fh>) {
1601         s/^\s+//;
1602         s/\s+$//;
1603         next unless $_;
1604         $text .= $_;
1605     }
1606     close $fh;
1607
1608     # Convert to large MARC blob with plugin
1609     $text = Koha::Plugins::Handler->run({
1610         class  => $plugin_class,
1611         method => 'to_marc',
1612         params => { data => $text },
1613     }) if $text;
1614
1615     # Convert to array of MARC records
1616     if( $text ) {
1617         my $marc_type = C4::Context->preference('marcflavour');
1618         foreach my $blob ( split(/\x1D/, $text) ) {
1619             next if $blob =~ /^\s*$/;
1620             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1621             push @return, $marcrecord;
1622         }
1623     }
1624     return \@return;
1625 }
1626
1627 # internal functions
1628
1629 sub _create_import_record {
1630     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1631
1632     my $dbh = C4::Context->dbh;
1633     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1634                                                          record_type, encoding)
1635                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1636     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1637                   $record_type, $encoding);
1638     my $import_record_id = $dbh->{'mysql_insertid'};
1639     $sth->finish();
1640     return $import_record_id;
1641 }
1642
1643 sub _add_auth_fields {
1644     my ($import_record_id, $marc_record) = @_;
1645
1646     my $controlnumber;
1647     if ($marc_record->field('001')) {
1648         $controlnumber = $marc_record->field('001')->data();
1649     }
1650     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1651     my $dbh = C4::Context->dbh;
1652     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1653     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1654     $sth->finish();
1655 }
1656
1657 sub _add_biblio_fields {
1658     my ($import_record_id, $marc_record) = @_;
1659
1660     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1661     my $dbh = C4::Context->dbh;
1662     # FIXME no controlnumber, originalsource
1663     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1664     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1665     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1666     $sth->finish();
1667                 
1668 }
1669
1670 sub _update_biblio_fields {
1671     my ($import_record_id, $marc_record) = @_;
1672
1673     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1674     my $dbh = C4::Context->dbh;
1675     # FIXME no controlnumber, originalsource
1676     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1677     $isbn =~ s/\(.*$//;
1678     $isbn =~ tr/ -_//;
1679     $isbn = uc $isbn;
1680     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1681                              WHERE  import_record_id = ?");
1682     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1683     $sth->finish();
1684 }
1685
1686 sub _parse_biblio_fields {
1687     my ($marc_record) = @_;
1688
1689     my $dbh = C4::Context->dbh;
1690     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1691     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1692
1693 }
1694
1695 sub _update_batch_record_counts {
1696     my ($batch_id) = @_;
1697
1698     my $dbh = C4::Context->dbh;
1699     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1700                                         num_records = (
1701                                             SELECT COUNT(*)
1702                                             FROM import_records
1703                                             WHERE import_batch_id = import_batches.import_batch_id),
1704                                         num_items = (
1705                                             SELECT COUNT(*)
1706                                             FROM import_records
1707                                             JOIN import_items USING (import_record_id)
1708                                             WHERE import_batch_id = import_batches.import_batch_id
1709                                             AND record_type = 'biblio')
1710                                     WHERE import_batch_id = ?");
1711     $sth->bind_param(1, $batch_id);
1712     $sth->execute();
1713     $sth->finish();
1714 }
1715
1716 sub _get_commit_action {
1717     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1718     
1719     if ($record_type eq 'biblio') {
1720         my ($bib_result, $bib_match, $item_result);
1721
1722         $bib_match = GetBestRecordMatch($import_record_id);
1723         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1724
1725             $bib_result = $overlay_action;
1726
1727             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1728                 $item_result = 'create_new';
1729             } elsif($item_action eq 'replace'){
1730                 $item_result = 'replace';
1731             } else {
1732                 $item_result = 'ignore';
1733             }
1734
1735         } else {
1736             $bib_result = $nomatch_action;
1737             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1738         }
1739         return ($bib_result, $item_result, $bib_match);
1740     } else { # must be auths
1741         my ($auth_result, $auth_match);
1742
1743         $auth_match = GetBestRecordMatch($import_record_id);
1744         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1745             $auth_result = $overlay_action;
1746         } else {
1747             $auth_result = $nomatch_action;
1748         }
1749
1750         return ($auth_result, undef, $auth_match);
1751
1752     }
1753 }
1754
1755 sub _get_revert_action {
1756     my ($overlay_action, $overlay_status, $status) = @_;
1757
1758     my $bib_result;
1759
1760     if ($status eq 'ignored') {
1761         $bib_result = 'ignore';
1762     } else {
1763         if ($overlay_action eq 'create_new') {
1764             $bib_result = 'delete';
1765         } else {
1766             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1767         }
1768     }
1769     return $bib_result;
1770 }
1771
1772 1;
1773 __END__
1774
1775 =head1 AUTHOR
1776
1777 Koha Development Team <http://koha-community.org/>
1778
1779 Galen Charlton <galen.charlton@liblime.com>
1780
1781 =cut