Bug 34893: Unit tests for C4::Auth::checkpw
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       AddImportBatch
50       GetImportBatch
51       AddAuthToBatch
52       AddBiblioToBatch
53       AddItemsToImportBiblio
54       ModAuthorityInBatch
55
56       BatchStageMarcRecords
57       BatchFindDuplicates
58       BatchCommitRecords
59       BatchRevertRecords
60       CleanBatch
61       DeleteBatch
62
63       GetAllImportBatches
64       GetStagedWebserviceBatches
65       GetImportBatchRangeDesc
66       GetNumberOfNonZ3950ImportBatches
67       GetImportBiblios
68       GetImportRecordsRange
69       GetItemNumbersFromImportBatch
70
71       GetImportBatchStatus
72       SetImportBatchStatus
73       GetImportBatchOverlayAction
74       SetImportBatchOverlayAction
75       GetImportBatchNoMatchAction
76       SetImportBatchNoMatchAction
77       GetImportBatchItemAction
78       SetImportBatchItemAction
79       GetImportBatchMatcher
80       SetImportBatchMatcher
81       GetImportRecordOverlayStatus
82       SetImportRecordOverlayStatus
83       GetImportRecordStatus
84       SetImportRecordStatus
85       SetMatchedBiblionumber
86       GetImportRecordMatches
87       SetImportRecordMatches
88
89       RecordsFromMARCXMLFile
90       RecordsFromISO2709File
91       RecordsFromMarcPlugin
92     );
93 }
94
95 =head1 NAME
96
97 C4::ImportBatch - manage batches of imported MARC records
98
99 =head1 SYNOPSIS
100
101 use C4::ImportBatch;
102
103 =head1 FUNCTIONS
104
105 =head2 GetZ3950BatchId
106
107   my $batchid = GetZ3950BatchId($z3950server);
108
109 Retrieves the ID of the import batch for the Z39.50
110 reservoir for the given target.  If necessary,
111 creates the import batch.
112
113 =cut
114
115 sub GetZ3950BatchId {
116     my ($z3950server) = @_;
117
118     my $dbh = C4::Context->dbh;
119     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
120                              WHERE  batch_type = 'z3950'
121                              AND    file_name = ?");
122     $sth->execute($z3950server);
123     my $rowref = $sth->fetchrow_arrayref();
124     $sth->finish();
125     if (defined $rowref) {
126         return $rowref->[0];
127     } else {
128         my $batch_id = AddImportBatch( {
129                 overlay_action => 'create_new',
130                 import_status => 'staged',
131                 batch_type => 'z3950',
132                 file_name => $z3950server,
133             } );
134         return $batch_id;
135     }
136     
137 }
138
139 =head2 GetWebserviceBatchId
140
141   my $batchid = GetWebserviceBatchId();
142
143 Retrieves the ID of the import batch for webservice.
144 If necessary, creates the import batch.
145
146 =cut
147
148 my $WEBSERVICE_BASE_QRY = <<EOQ;
149 SELECT import_batch_id FROM import_batches
150 WHERE  batch_type = 'webservice'
151 AND    import_status = 'staged'
152 EOQ
153 sub GetWebserviceBatchId {
154     my ($params) = @_;
155
156     my $dbh = C4::Context->dbh;
157     my $sql = $WEBSERVICE_BASE_QRY;
158     my @args;
159     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
160         if (my $val = $params->{$field}) {
161             $sql .= " AND $field = ?";
162             push @args, $val;
163         }
164     }
165     my $id = $dbh->selectrow_array($sql, undef, @args);
166     return $id if $id;
167
168     $params->{batch_type} = 'webservice';
169     $params->{import_status} = 'staged';
170     return AddImportBatch($params);
171 }
172
173 =head2 GetImportRecordMarc
174
175   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
176
177 =cut
178
179 sub GetImportRecordMarc {
180     my ($import_record_id) = @_;
181
182     my $dbh = C4::Context->dbh;
183     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
184         SELECT marc, encoding
185         FROM import_records
186         WHERE import_record_id = ?
187     |, undef, $import_record_id );
188
189     return $marc, $encoding;
190 }
191
192 sub EmbedItemsInImportBiblio {
193     my ( $record, $import_record_id ) = @_;
194     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
195     my $dbh = C4::Context->dbh;
196     my $import_items = $dbh->selectall_arrayref(q|
197         SELECT import_items.marcxml
198         FROM import_items
199         WHERE import_record_id = ?
200     |, { Slice => {} }, $import_record_id );
201     my @item_fields;
202     for my $import_item ( @$import_items ) {
203         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
204         push @item_fields, $item_marc->field($itemtag);
205     }
206     $record->append_fields(@item_fields);
207     return $record;
208 }
209
210 =head2 AddImportBatch
211
212   my $batch_id = AddImportBatch($params_hash);
213
214 =cut
215
216 sub AddImportBatch {
217     my ($params) = @_;
218
219     my (@fields, @vals);
220     foreach (qw( matcher_id template_id branchcode
221                  overlay_action nomatch_action item_action
222                  import_status batch_type file_name comments record_type )) {
223         if (exists $params->{$_}) {
224             push @fields, $_;
225             push @vals, $params->{$_};
226         }
227     }
228     my $dbh = C4::Context->dbh;
229     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
230                                   VALUES (".join( ',', map '?', @fields).")",
231              undef,
232              @vals);
233     return $dbh->{'mysql_insertid'};
234 }
235
236 =head2 GetImportBatch 
237
238   my $row = GetImportBatch($batch_id);
239
240 Retrieve a hashref of an import_batches row.
241
242 =cut
243
244 sub GetImportBatch {
245     my ($batch_id) = @_;
246
247     my $dbh = C4::Context->dbh;
248     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
249     $sth->bind_param(1, $batch_id);
250     $sth->execute();
251     my $result = $sth->fetchrow_hashref;
252     $sth->finish();
253     return $result;
254
255 }
256
257 =head2 AddBiblioToBatch 
258
259   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
260                 $marc_record, $encoding, $update_counts);
261
262 =cut
263
264 sub AddBiblioToBatch {
265     my $batch_id = shift;
266     my $record_sequence = shift;
267     my $marc_record = shift;
268     my $encoding = shift;
269     my $update_counts = @_ ? shift : 1;
270
271     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
272     _add_biblio_fields($import_record_id, $marc_record);
273     _update_batch_record_counts($batch_id) if $update_counts;
274     return $import_record_id;
275 }
276
277 =head2 AddAuthToBatch
278
279   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
280                 $marc_record, $encoding, $update_counts, [$marc_type]);
281
282 =cut
283
284 sub AddAuthToBatch {
285     my $batch_id = shift;
286     my $record_sequence = shift;
287     my $marc_record = shift;
288     my $encoding = shift;
289     my $update_counts = @_ ? shift : 1;
290     my $marc_type = shift || C4::Context->preference('marcflavour');
291
292     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
293
294     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
295     _add_auth_fields($import_record_id, $marc_record);
296     _update_batch_record_counts($batch_id) if $update_counts;
297     return $import_record_id;
298 }
299
300 =head2 BatchStageMarcRecords
301
302 ( $batch_id, $num_records, $num_items, @invalid_records ) =
303   BatchStageMarcRecords(
304     $record_type,                $encoding,
305     $marc_records,               $file_name,
306     $marc_modification_template, $comments,
307     $branch_code,                $parse_items,
308     $leave_as_staging,           $progress_interval,
309     $progress_callback
310   );
311
312 =cut
313
314 sub BatchStageMarcRecords {
315     my $record_type = shift;
316     my $encoding = shift;
317     my $marc_records = shift;
318     my $file_name = shift;
319     my $marc_modification_template = shift;
320     my $comments = shift;
321     my $branch_code = shift;
322     my $parse_items = shift;
323     my $leave_as_staging = shift;
324
325     # optional callback to monitor status 
326     # of job
327     my $progress_interval = 0;
328     my $progress_callback = undef;
329     if ($#_ == 1) {
330         $progress_interval = shift;
331         $progress_callback = shift;
332         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
333         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
334     } 
335     
336     my $batch_id = AddImportBatch( {
337             overlay_action => 'create_new',
338             import_status => 'staging',
339             batch_type => 'batch',
340             file_name => $file_name,
341             comments => $comments,
342             record_type => $record_type,
343         } );
344     if ($parse_items) {
345         SetImportBatchItemAction($batch_id, 'always_add');
346     } else {
347         SetImportBatchItemAction($batch_id, 'ignore');
348     }
349
350
351     my $marc_type = C4::Context->preference('marcflavour');
352     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
353     my @invalid_records = ();
354     my $num_valid = 0;
355     my $num_items = 0;
356     # FIXME - for now, we're dealing only with bibs
357     my $rec_num = 0;
358     foreach my $marc_record (@$marc_records) {
359         $rec_num++;
360         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
361             &$progress_callback($rec_num);
362         }
363
364         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
365
366         my $import_record_id;
367         if (scalar($marc_record->fields()) == 0) {
368             push @invalid_records, $marc_record;
369         } else {
370
371             # Normalize the record so it doesn't have separated diacritics
372             SetUTF8Flag($marc_record);
373
374             $num_valid++;
375             if ($record_type eq 'biblio') {
376                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
377                 if ($parse_items) {
378                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
379                     $num_items += scalar(@import_items_ids);
380                 }
381             } elsif ($record_type eq 'auth') {
382                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
383             }
384         }
385     }
386     unless ($leave_as_staging) {
387         SetImportBatchStatus($batch_id, 'staged');
388     }
389     # FIXME branch_code, number of bibs, number of items
390     _update_batch_record_counts($batch_id);
391     return ($batch_id, $num_valid, $num_items, @invalid_records);
392 }
393
394 =head2 AddItemsToImportBiblio
395
396   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
397                 $import_record_id, $marc_record, $update_counts);
398
399 =cut
400
401 sub AddItemsToImportBiblio {
402     my $batch_id = shift;
403     my $import_record_id = shift;
404     my $marc_record = shift;
405     my $update_counts = @_ ? shift : 0;
406
407     my @import_items_ids = ();
408    
409     my $dbh = C4::Context->dbh; 
410     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
411     foreach my $item_field ($marc_record->field($item_tag)) {
412         my $item_marc = MARC::Record->new();
413         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
414         $item_marc->append_fields($item_field);
415         $marc_record->delete_field($item_field);
416         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
417                                         VALUES (?, ?, ?)");
418         $sth->bind_param(1, $import_record_id);
419         $sth->bind_param(2, 'staged');
420         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
421         $sth->execute();
422         push @import_items_ids, $dbh->{'mysql_insertid'};
423         $sth->finish();
424     }
425
426     if ($#import_items_ids > -1) {
427         _update_batch_record_counts($batch_id) if $update_counts;
428     }
429     return @import_items_ids;
430 }
431
432 =head2 BatchFindDuplicates
433
434   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
435              $max_matches, $progress_interval, $progress_callback);
436
437 Goes through the records loaded in the batch and attempts to 
438 find duplicates for each one.  Sets the matching status 
439 of each record to "no_match" or "auto_match" as appropriate.
440
441 The $max_matches parameter is optional; if it is not supplied,
442 it defaults to 10.
443
444 The $progress_interval and $progress_callback parameters are 
445 optional; if both are supplied, the sub referred to by
446 $progress_callback will be invoked every $progress_interval
447 records using the number of records processed as the 
448 singular argument.
449
450 =cut
451
452 sub BatchFindDuplicates {
453     my $batch_id = shift;
454     my $matcher = shift;
455     my $max_matches = @_ ? shift : 10;
456
457     # optional callback to monitor status 
458     # of job
459     my $progress_interval = 0;
460     my $progress_callback = undef;
461     if ($#_ == 1) {
462         $progress_interval = shift;
463         $progress_callback = shift;
464         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
465         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
466     }
467
468     my $dbh = C4::Context->dbh;
469
470     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
471                              FROM import_records
472                              WHERE import_batch_id = ?");
473     $sth->execute($batch_id);
474     my $num_with_matches = 0;
475     my $rec_num = 0;
476     while (my $rowref = $sth->fetchrow_hashref) {
477         $rec_num++;
478         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
479             &$progress_callback($rec_num);
480         }
481         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
482         my @matches = ();
483         if (defined $matcher) {
484             @matches = $matcher->get_matches($marc_record, $max_matches);
485         }
486         if (scalar(@matches) > 0) {
487             $num_with_matches++;
488             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
489             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
490         } else {
491             SetImportRecordMatches($rowref->{'import_record_id'}, ());
492             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
493         }
494     }
495     $sth->finish();
496     return $num_with_matches;
497 }
498
499 =head2 BatchCommitRecords
500
501   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
502         BatchCommitRecords($batch_id, $framework,
503         $progress_interval, $progress_callback);
504
505 =cut
506
507 sub BatchCommitRecords {
508     my $batch_id = shift;
509     my $framework = shift;
510
511     my $schema = Koha::Database->schema;
512
513     # optional callback to monitor status 
514     # of job
515     my $progress_interval = 0;
516     my $progress_callback = undef;
517     if ($#_ == 1) {
518         $progress_interval = shift;
519         $progress_callback = shift;
520         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
521         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
522     }
523
524     my $record_type;
525     my $num_added = 0;
526     my $num_updated = 0;
527     my $num_items_added = 0;
528     my $num_items_replaced = 0;
529     my $num_items_errored = 0;
530     my $num_ignored = 0;
531     # commit (i.e., save, all records in the batch)
532     SetImportBatchStatus($batch_id, 'importing');
533     my $overlay_action = GetImportBatchOverlayAction($batch_id);
534     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
535     my $item_action = GetImportBatchItemAction($batch_id);
536     my $item_tag;
537     my $item_subfield;
538     my $dbh = C4::Context->dbh;
539     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
540                              FROM import_records
541                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
542                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
543                              WHERE import_batch_id = ?");
544     $sth->execute($batch_id);
545     my $marcflavour = C4::Context->preference('marcflavour');
546
547     my $userenv = C4::Context->userenv;
548     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
549
550     my $rec_num = 0;
551
552     $schema->txn_begin; # We commit in a transaction
553     while (my $rowref = $sth->fetchrow_hashref) {
554         $record_type = $rowref->{'record_type'};
555
556         $rec_num++;
557
558         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
559             # report progress and commit
560             $schema->txn_commit;
561             &$progress_callback( $rec_num );
562             $schema->txn_begin;
563         }
564         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
565             $num_ignored++;
566             next;
567         }
568
569         my $marc_type;
570         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
571             $marc_type = 'UNIMARCAUTH';
572         } elsif ($marcflavour eq 'UNIMARC') {
573             $marc_type = 'UNIMARC';
574         } else {
575             $marc_type = 'USMARC';
576         }
577         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
578
579         if ($record_type eq 'biblio') {
580             # remove any item tags - rely on BatchCommitItems
581             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
582             foreach my $item_field ($marc_record->field($item_tag)) {
583                 $marc_record->delete_field($item_field);
584             }
585         }
586
587         my ($record_result, $item_result, $record_match) =
588             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
589                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
590
591         my $recordid;
592         my $query;
593         if ($record_result eq 'create_new') {
594             $num_added++;
595             if ($record_type eq 'biblio') {
596                 my $biblioitemnumber;
597                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
598                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
599                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
600                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
601                     $num_items_added += $bib_items_added;
602                     $num_items_replaced += $bib_items_replaced;
603                     $num_items_errored += $bib_items_errored;
604                 }
605             } else {
606                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
607                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
608             }
609             my $sth = $dbh->prepare_cached($query);
610             $sth->execute($recordid, $rowref->{'import_record_id'});
611             $sth->finish();
612             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
613         } elsif ($record_result eq 'replace') {
614             $num_updated++;
615             $recordid = $record_match;
616             my $oldxml;
617             if ($record_type eq 'biblio') {
618                 my $oldbiblio = Koha::Biblios->find( $recordid );
619                 $oldxml = GetXmlBiblio($recordid);
620
621                 # remove item fields so that they don't get
622                 # added again if record is reverted
623                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
624                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
625                 foreach my $item_field ($old_marc->field($item_tag)) {
626                     $old_marc->delete_field($item_field);
627                 }
628                 $oldxml = $old_marc->as_xml($marc_type);
629
630                 my $context = { source => 'batchimport' };
631                 if ($logged_in_patron) {
632                     $context->{categorycode} = $logged_in_patron->categorycode;
633                     $context->{userid} = $logged_in_patron->userid;
634                 }
635
636                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
637                     overlay_context => $context
638                 });
639                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
640
641                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
642                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
643                     $num_items_added += $bib_items_added;
644                     $num_items_replaced += $bib_items_replaced;
645                     $num_items_errored += $bib_items_errored;
646                 }
647             } else {
648                 $oldxml = GetAuthorityXML($recordid);
649
650                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
651                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
652             }
653             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
654             $sth->execute($oldxml, $rowref->{'import_record_id'});
655             $sth->finish();
656             my $sth2 = $dbh->prepare_cached($query);
657             $sth2->execute($recordid, $rowref->{'import_record_id'});
658             $sth2->finish();
659             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
660             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
661         } elsif ($record_result eq 'ignore') {
662             $recordid = $record_match;
663             $num_ignored++;
664             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
665                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
666                 $num_items_added += $bib_items_added;
667          $num_items_replaced += $bib_items_replaced;
668                 $num_items_errored += $bib_items_errored;
669                 # still need to record the matched biblionumber so that the
670                 # items can be reverted
671                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
672                 $sth2->execute($recordid, $rowref->{'import_record_id'});
673                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
674             }
675             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
676         }
677     }
678     $schema->txn_commit; # Commit final records that may not have hit callback threshold
679     $sth->finish();
680     SetImportBatchStatus($batch_id, 'imported');
681     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
682 }
683
684 =head2 BatchCommitItems
685
686   ($num_items_added, $num_items_errored) = 
687          BatchCommitItems($import_record_id, $biblionumber);
688
689 =cut
690
691 sub BatchCommitItems {
692     my ( $import_record_id, $biblionumber, $action ) = @_;
693
694     my $dbh = C4::Context->dbh;
695
696     my $num_items_added = 0;
697     my $num_items_errored = 0;
698     my $num_items_replaced = 0;
699
700     my $sth = $dbh->prepare( "
701         SELECT import_items_id, import_items.marcxml, encoding
702         FROM import_items
703         JOIN import_records USING (import_record_id)
704         WHERE import_record_id = ?
705         ORDER BY import_items_id
706     " );
707     $sth->bind_param( 1, $import_record_id );
708     $sth->execute();
709
710     while ( my $row = $sth->fetchrow_hashref() ) {
711         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
712
713         # Delete date_due subfield as to not accidentally delete item checkout due dates
714         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
715         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
716
717         my $item = TransformMarcToKoha( $item_marc );
718
719         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
720         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
721
722         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
723         if ( $action eq "replace" && $duplicate_itemnumber ) {
724             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
725             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
726             $updsth->bind_param( 1, 'imported' );
727             $updsth->bind_param( 2, $item->{itemnumber} );
728             $updsth->bind_param( 3, undef );
729             $updsth->bind_param( 4, $row->{'import_items_id'} );
730             $updsth->execute();
731             $updsth->finish();
732             $num_items_replaced++;
733         } elsif ( $action eq "replace" && $duplicate_barcode ) {
734             my $itemnumber = $duplicate_barcode->itemnumber;
735             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
736             $updsth->bind_param( 1, 'imported' );
737             $updsth->bind_param( 2, $item->{itemnumber} );
738             $updsth->bind_param( 3, undef );
739             $updsth->bind_param( 4, $row->{'import_items_id'} );
740             $updsth->execute();
741             $updsth->finish();
742             $num_items_replaced++;
743         } elsif ($duplicate_barcode) {
744             $updsth->bind_param( 1, 'error' );
745             $updsth->bind_param( 2, undef );
746             $updsth->bind_param( 3, 'duplicate item barcode' );
747             $updsth->bind_param( 4, $row->{'import_items_id'} );
748             $updsth->execute();
749             $num_items_errored++;
750         } else {
751             # Remove the itemnumber if it exists, we want to create a new item
752             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
753             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
754
755             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
756             if( $itemnumber ) {
757                 $updsth->bind_param( 1, 'imported' );
758                 $updsth->bind_param( 2, $itemnumber );
759                 $updsth->bind_param( 3, undef );
760                 $updsth->bind_param( 4, $row->{'import_items_id'} );
761                 $updsth->execute();
762                 $updsth->finish();
763                 $num_items_added++;
764             }
765         }
766     }
767
768     return ( $num_items_added, $num_items_replaced, $num_items_errored );
769 }
770
771 =head2 BatchRevertRecords
772
773   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
774       $num_ignored) = BatchRevertRecords($batch_id);
775
776 =cut
777
778 sub BatchRevertRecords {
779     my $batch_id = shift;
780
781     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
782
783     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
784
785     my $record_type;
786     my $num_deleted = 0;
787     my $num_errors = 0;
788     my $num_reverted = 0;
789     my $num_ignored = 0;
790     my $num_items_deleted = 0;
791     # commit (i.e., save, all records in the batch)
792     SetImportBatchStatus($batch_id, 'reverting');
793     my $overlay_action = GetImportBatchOverlayAction($batch_id);
794     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
795     my $dbh = C4::Context->dbh;
796     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
797                              FROM import_records
798                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
799                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
800                              WHERE import_batch_id = ?");
801     $sth->execute($batch_id);
802     my $marc_type;
803     my $marcflavour = C4::Context->preference('marcflavour');
804     while (my $rowref = $sth->fetchrow_hashref) {
805         $record_type = $rowref->{'record_type'};
806         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
807             $num_ignored++;
808             next;
809         }
810         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
811             $marc_type = 'UNIMARCAUTH';
812         } elsif ($marcflavour eq 'UNIMARC') {
813             $marc_type = 'UNIMARC';
814         } else {
815             $marc_type = 'USMARC';
816         }
817
818         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
819
820         if ($record_result eq 'delete') {
821             my $error = undef;
822             if  ($record_type eq 'biblio') {
823                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
824                 $error = DelBiblio($rowref->{'matched_biblionumber'});
825             } else {
826                 DelAuthority({ authid => $rowref->{'matched_authid'} });
827             }
828             if (defined $error) {
829                 $num_errors++;
830             } else {
831                 $num_deleted++;
832                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
833             }
834         } elsif ($record_result eq 'restore') {
835             $num_reverted++;
836             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
837             if ($record_type eq 'biblio') {
838                 my $biblionumber = $rowref->{'matched_biblionumber'};
839                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
840
841                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
842                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
843
844                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
845                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
846             } else {
847                 my $authid = $rowref->{'matched_authid'};
848                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
849             }
850             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
851         } elsif ($record_result eq 'ignore') {
852             if ($record_type eq 'biblio') {
853                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
854             }
855             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
856         }
857         my $query;
858         if ($record_type eq 'biblio') {
859             # remove matched_biblionumber only if there is no 'imported' item left
860             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
861             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
862         } else {
863             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
864         }
865         my $sth2 = $dbh->prepare_cached($query);
866         $sth2->execute($rowref->{'import_record_id'});
867     }
868
869     $sth->finish();
870     SetImportBatchStatus($batch_id, 'reverted');
871     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
872 }
873
874 =head2 BatchRevertItems
875
876   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
877
878 =cut
879
880 sub BatchRevertItems {
881     my ($import_record_id, $biblionumber) = @_;
882
883     my $dbh = C4::Context->dbh;
884     my $num_items_deleted = 0;
885
886     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
887                                    FROM import_items
888                                    JOIN items USING (itemnumber)
889                                    WHERE import_record_id = ?");
890     $sth->bind_param(1, $import_record_id);
891     $sth->execute();
892     while (my $row = $sth->fetchrow_hashref()) {
893         my $item = Koha::Items->find($row->{itemnumber});
894         if ($item->safe_delete){
895             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
896             $updsth->bind_param(1, 'reverted');
897             $updsth->bind_param(2, $row->{'import_items_id'});
898             $updsth->execute();
899             $updsth->finish();
900             $num_items_deleted++;
901         }
902         else {
903             next;
904         }
905     }
906     $sth->finish();
907     return $num_items_deleted;
908 }
909
910 =head2 CleanBatch
911
912   CleanBatch($batch_id)
913
914 Deletes all staged records from the import batch
915 and sets the status of the batch to 'cleaned'.  Note
916 that deleting a stage record does *not* affect
917 any record that has been committed to the database.
918
919 =cut
920
921 sub CleanBatch {
922     my $batch_id = shift;
923     return unless defined $batch_id;
924
925     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
926     SetImportBatchStatus($batch_id, 'cleaned');
927 }
928
929 =head2 DeleteBatch
930
931   DeleteBatch($batch_id)
932
933 Deletes the record from the database. This can only be done
934 once the batch has been cleaned.
935
936 =cut
937
938 sub DeleteBatch {
939     my $batch_id = shift;
940     return unless defined $batch_id;
941
942     my $dbh = C4::Context->dbh;
943     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
944     $sth->execute( $batch_id );
945 }
946
947 =head2 GetAllImportBatches
948
949   my $results = GetAllImportBatches();
950
951 Returns a references to an array of hash references corresponding
952 to all import_batches rows (of batch_type 'batch'), sorted in 
953 ascending order by import_batch_id.
954
955 =cut
956
957 sub  GetAllImportBatches {
958     my $dbh = C4::Context->dbh;
959     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
960                                     WHERE batch_type IN ('batch', 'webservice')
961                                     ORDER BY import_batch_id ASC");
962
963     my $results = [];
964     $sth->execute();
965     while (my $row = $sth->fetchrow_hashref) {
966         push @$results, $row;
967     }
968     $sth->finish();
969     return $results;
970 }
971
972 =head2 GetStagedWebserviceBatches
973
974   my $batch_ids = GetStagedWebserviceBatches();
975
976 Returns a references to an array of batch id's
977 of batch_type 'webservice' that are not imported
978
979 =cut
980
981 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
982 SELECT import_batch_id FROM import_batches
983 WHERE batch_type = 'webservice'
984 AND import_status = 'staged'
985 EOQ
986 sub  GetStagedWebserviceBatches {
987     my $dbh = C4::Context->dbh;
988     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
989 }
990
991 =head2 GetImportBatchRangeDesc
992
993   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
994
995 Returns a reference to an array of hash references corresponding to
996 import_batches rows (sorted in descending order by import_batch_id)
997 start at the given offset.
998
999 =cut
1000
1001 sub GetImportBatchRangeDesc {
1002     my ($offset, $results_per_group) = @_;
1003
1004     my $dbh = C4::Context->dbh;
1005     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1006                                     LEFT JOIN import_batch_profiles p
1007                                     ON b.profile_id = p.id
1008                                     WHERE b.batch_type IN ('batch', 'webservice')
1009                                     ORDER BY b.import_batch_id DESC";
1010     my @params;
1011     if ($results_per_group){
1012         $query .= " LIMIT ?";
1013         push(@params, $results_per_group);
1014     }
1015     if ($offset){
1016         $query .= " OFFSET ?";
1017         push(@params, $offset);
1018     }
1019     my $sth = $dbh->prepare_cached($query);
1020     $sth->execute(@params);
1021     my $results = $sth->fetchall_arrayref({});
1022     $sth->finish();
1023     return $results;
1024 }
1025
1026 =head2 GetItemNumbersFromImportBatch
1027
1028   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1029
1030 =cut
1031
1032 sub GetItemNumbersFromImportBatch {
1033     my ($batch_id) = @_;
1034     my $dbh = C4::Context->dbh;
1035     my $sql = q|
1036 SELECT itemnumber FROM import_items
1037 INNER JOIN items USING (itemnumber)
1038 INNER JOIN import_records USING (import_record_id)
1039 WHERE import_batch_id = ?|;
1040     my  $sth = $dbh->prepare( $sql );
1041     $sth->execute($batch_id);
1042     my @items ;
1043     while ( my ($itm) = $sth->fetchrow_array ) {
1044         push @items, $itm;
1045     }
1046     return @items;
1047 }
1048
1049 =head2 GetNumberOfImportBatches
1050
1051   my $count = GetNumberOfImportBatches();
1052
1053 =cut
1054
1055 sub GetNumberOfNonZ3950ImportBatches {
1056     my $dbh = C4::Context->dbh;
1057     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1058     $sth->execute();
1059     my ($count) = $sth->fetchrow_array();
1060     $sth->finish();
1061     return $count;
1062 }
1063
1064 =head2 GetImportBiblios
1065
1066   my $results = GetImportBiblios($importid);
1067
1068 =cut
1069
1070 sub GetImportBiblios {
1071     my ($import_record_id) = @_;
1072
1073     my $dbh = C4::Context->dbh;
1074     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1075     return $dbh->selectall_arrayref(
1076         $query,
1077         { Slice => {} },
1078         $import_record_id
1079     );
1080
1081 }
1082
1083 =head2 GetImportRecordsRange
1084
1085   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1086
1087 Returns a reference to an array of hash references corresponding to
1088 import_biblios/import_auths/import_records rows for a given batch
1089 starting at the given offset.
1090
1091 =cut
1092
1093 sub GetImportRecordsRange {
1094     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1095
1096     my $dbh = C4::Context->dbh;
1097
1098     my $order_by = $parameters->{order_by} || 'import_record_id';
1099     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1100
1101     my $order_by_direction =
1102       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1103
1104     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1105
1106     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1107                                            record_sequence, status, overlay_status,
1108                                            matched_biblionumber, matched_authid, record_type
1109                                     FROM   import_records
1110                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1111                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1112                                     WHERE  import_batch_id = ?";
1113     my @params;
1114     push(@params, $batch_id);
1115     if ($status) {
1116         $query .= " AND status=?";
1117         push(@params,$status);
1118     }
1119
1120     $query.=" ORDER BY $order_by $order_by_direction";
1121
1122     if($results_per_group){
1123         $query .= " LIMIT ?";
1124         push(@params, $results_per_group);
1125     }
1126     if($offset){
1127         $query .= " OFFSET ?";
1128         push(@params, $offset);
1129     }
1130     my $sth = $dbh->prepare_cached($query);
1131     $sth->execute(@params);
1132     my $results = $sth->fetchall_arrayref({});
1133     $sth->finish();
1134     return $results;
1135
1136 }
1137
1138 =head2 GetBestRecordMatch
1139
1140   my $record_id = GetBestRecordMatch($import_record_id);
1141
1142 =cut
1143
1144 sub GetBestRecordMatch {
1145     my ($import_record_id) = @_;
1146
1147     my $dbh = C4::Context->dbh;
1148     my $sth = $dbh->prepare("SELECT candidate_match_id
1149                              FROM   import_record_matches
1150                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1151                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1152                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1153                              WHERE  import_record_matches.import_record_id = ? AND
1154                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1155                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1156                              AND chosen = 1
1157                              ORDER BY score DESC, candidate_match_id DESC");
1158     $sth->execute($import_record_id);
1159     my ($record_id) = $sth->fetchrow_array();
1160     $sth->finish();
1161     return $record_id;
1162 }
1163
1164 =head2 GetImportBatchStatus
1165
1166   my $status = GetImportBatchStatus($batch_id);
1167
1168 =cut
1169
1170 sub GetImportBatchStatus {
1171     my ($batch_id) = @_;
1172
1173     my $dbh = C4::Context->dbh;
1174     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1175     $sth->execute($batch_id);
1176     my ($status) = $sth->fetchrow_array();
1177     $sth->finish();
1178     return $status;
1179
1180 }
1181
1182 =head2 SetImportBatchStatus
1183
1184   SetImportBatchStatus($batch_id, $new_status);
1185
1186 =cut
1187
1188 sub SetImportBatchStatus {
1189     my ($batch_id, $new_status) = @_;
1190
1191     my $dbh = C4::Context->dbh;
1192     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1193     $sth->execute($new_status, $batch_id);
1194     $sth->finish();
1195
1196 }
1197
1198 =head2 SetMatchedBiblionumber
1199
1200   SetMatchedBiblionumber($import_record_id, $biblionumber);
1201
1202 =cut
1203
1204 sub SetMatchedBiblionumber {
1205     my ($import_record_id, $biblionumber) = @_;
1206
1207     my $dbh = C4::Context->dbh;
1208     $dbh->do(
1209         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1210         undef, $biblionumber, $import_record_id
1211     );
1212 }
1213
1214 =head2 GetImportBatchOverlayAction
1215
1216   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1217
1218 =cut
1219
1220 sub GetImportBatchOverlayAction {
1221     my ($batch_id) = @_;
1222
1223     my $dbh = C4::Context->dbh;
1224     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1225     $sth->execute($batch_id);
1226     my ($overlay_action) = $sth->fetchrow_array();
1227     $sth->finish();
1228     return $overlay_action;
1229
1230 }
1231
1232
1233 =head2 SetImportBatchOverlayAction
1234
1235   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1236
1237 =cut
1238
1239 sub SetImportBatchOverlayAction {
1240     my ($batch_id, $new_overlay_action) = @_;
1241
1242     my $dbh = C4::Context->dbh;
1243     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1244     $sth->execute($new_overlay_action, $batch_id);
1245     $sth->finish();
1246
1247 }
1248
1249 =head2 GetImportBatchNoMatchAction
1250
1251   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1252
1253 =cut
1254
1255 sub GetImportBatchNoMatchAction {
1256     my ($batch_id) = @_;
1257
1258     my $dbh = C4::Context->dbh;
1259     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1260     $sth->execute($batch_id);
1261     my ($nomatch_action) = $sth->fetchrow_array();
1262     $sth->finish();
1263     return $nomatch_action;
1264
1265 }
1266
1267
1268 =head2 SetImportBatchNoMatchAction
1269
1270   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1271
1272 =cut
1273
1274 sub SetImportBatchNoMatchAction {
1275     my ($batch_id, $new_nomatch_action) = @_;
1276
1277     my $dbh = C4::Context->dbh;
1278     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1279     $sth->execute($new_nomatch_action, $batch_id);
1280     $sth->finish();
1281
1282 }
1283
1284 =head2 GetImportBatchItemAction
1285
1286   my $item_action = GetImportBatchItemAction($batch_id);
1287
1288 =cut
1289
1290 sub GetImportBatchItemAction {
1291     my ($batch_id) = @_;
1292
1293     my $dbh = C4::Context->dbh;
1294     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1295     $sth->execute($batch_id);
1296     my ($item_action) = $sth->fetchrow_array();
1297     $sth->finish();
1298     return $item_action;
1299
1300 }
1301
1302
1303 =head2 SetImportBatchItemAction
1304
1305   SetImportBatchItemAction($batch_id, $new_item_action);
1306
1307 =cut
1308
1309 sub SetImportBatchItemAction {
1310     my ($batch_id, $new_item_action) = @_;
1311
1312     my $dbh = C4::Context->dbh;
1313     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1314     $sth->execute($new_item_action, $batch_id);
1315     $sth->finish();
1316
1317 }
1318
1319 =head2 GetImportBatchMatcher
1320
1321   my $matcher_id = GetImportBatchMatcher($batch_id);
1322
1323 =cut
1324
1325 sub GetImportBatchMatcher {
1326     my ($batch_id) = @_;
1327
1328     my $dbh = C4::Context->dbh;
1329     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1330     $sth->execute($batch_id);
1331     my ($matcher_id) = $sth->fetchrow_array();
1332     $sth->finish();
1333     return $matcher_id;
1334
1335 }
1336
1337
1338 =head2 SetImportBatchMatcher
1339
1340   SetImportBatchMatcher($batch_id, $new_matcher_id);
1341
1342 =cut
1343
1344 sub SetImportBatchMatcher {
1345     my ($batch_id, $new_matcher_id) = @_;
1346
1347     my $dbh = C4::Context->dbh;
1348     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1349     $sth->execute($new_matcher_id, $batch_id);
1350     $sth->finish();
1351
1352 }
1353
1354 =head2 GetImportRecordOverlayStatus
1355
1356   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1357
1358 =cut
1359
1360 sub GetImportRecordOverlayStatus {
1361     my ($import_record_id) = @_;
1362
1363     my $dbh = C4::Context->dbh;
1364     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1365     $sth->execute($import_record_id);
1366     my ($overlay_status) = $sth->fetchrow_array();
1367     $sth->finish();
1368     return $overlay_status;
1369
1370 }
1371
1372
1373 =head2 SetImportRecordOverlayStatus
1374
1375   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1376
1377 =cut
1378
1379 sub SetImportRecordOverlayStatus {
1380     my ($import_record_id, $new_overlay_status) = @_;
1381
1382     my $dbh = C4::Context->dbh;
1383     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1384     $sth->execute($new_overlay_status, $import_record_id);
1385     $sth->finish();
1386
1387 }
1388
1389 =head2 GetImportRecordStatus
1390
1391   my $status = GetImportRecordStatus($import_record_id);
1392
1393 =cut
1394
1395 sub GetImportRecordStatus {
1396     my ($import_record_id) = @_;
1397
1398     my $dbh = C4::Context->dbh;
1399     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1400     $sth->execute($import_record_id);
1401     my ($status) = $sth->fetchrow_array();
1402     $sth->finish();
1403     return $status;
1404
1405 }
1406
1407
1408 =head2 SetImportRecordStatus
1409
1410   SetImportRecordStatus($import_record_id, $new_status);
1411
1412 =cut
1413
1414 sub SetImportRecordStatus {
1415     my ($import_record_id, $new_status) = @_;
1416
1417     my $dbh = C4::Context->dbh;
1418     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1419     $sth->execute($new_status, $import_record_id);
1420     $sth->finish();
1421
1422 }
1423
1424 =head2 GetImportRecordMatches
1425
1426   my $results = GetImportRecordMatches($import_record_id, $best_only);
1427
1428 =cut
1429
1430 sub GetImportRecordMatches {
1431     my $import_record_id = shift;
1432     my $best_only = @_ ? shift : 0;
1433
1434     my $dbh = C4::Context->dbh;
1435     # FIXME currently biblio only
1436     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1437                                     candidate_match_id, score, record_type,
1438                                     chosen
1439                                     FROM import_records
1440                                     JOIN import_record_matches USING (import_record_id)
1441                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1442                                     WHERE import_record_id = ?
1443                                     ORDER BY score DESC, biblionumber DESC");
1444     $sth->bind_param(1, $import_record_id);
1445     my $results = [];
1446     $sth->execute();
1447     while (my $row = $sth->fetchrow_hashref) {
1448         if ($row->{'record_type'} eq 'auth') {
1449             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1450         }
1451         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1452         push @$results, $row;
1453         last if $best_only;
1454     }
1455     $sth->finish();
1456
1457     return $results;
1458     
1459 }
1460
1461 =head2 SetImportRecordMatches
1462
1463   SetImportRecordMatches($import_record_id, @matches);
1464
1465 =cut
1466
1467 sub SetImportRecordMatches {
1468     my $import_record_id = shift;
1469     my @matches = @_;
1470
1471     my $dbh = C4::Context->dbh;
1472     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1473     $delsth->execute($import_record_id);
1474     $delsth->finish();
1475
1476     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1477                                     VALUES (?, ?, ?, ?)");
1478     my $chosen = 1; #The first match is defaulted to be chosen
1479     foreach my $match (@matches) {
1480         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1481         $chosen = 0; #After the first we do not default to other matches
1482     }
1483 }
1484
1485 =head2 RecordsFromISO2709File
1486
1487     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1488
1489 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1490
1491 @PARAM1, String, absolute path to the ISO2709 file.
1492 @PARAM2, String, see stage_file.pl
1493 @PARAM3, String, should be utf8
1494
1495 Returns two array refs.
1496
1497 =cut
1498
1499 sub RecordsFromISO2709File {
1500     my ($input_file, $record_type, $encoding) = @_;
1501     my @errors;
1502
1503     my $marc_type = C4::Context->preference('marcflavour');
1504     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1505
1506     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1507     my @marc_records;
1508     $/ = "\035";
1509     while (<$fh>) {
1510         s/^\s+//;
1511         s/\s+$//;
1512         next unless $_; # skip if record has only whitespace, as might occur
1513                         # if file includes newlines between each MARC record
1514         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1515         push @marc_records, $marc_record;
1516         if ($charset_guessed ne $encoding) {
1517             push @errors,
1518                 "Unexpected charset $charset_guessed, expecting $encoding";
1519         }
1520     }
1521     close $fh;
1522     return ( \@errors, \@marc_records );
1523 }
1524
1525 =head2 RecordsFromMARCXMLFile
1526
1527     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1528
1529 Creates MARC::Record-objects out of the given MARCXML-file.
1530
1531 @PARAM1, String, absolute path to the ISO2709 file.
1532 @PARAM2, String, should be utf8
1533
1534 Returns two array refs.
1535
1536 =cut
1537
1538 sub RecordsFromMARCXMLFile {
1539     my ( $filename, $encoding ) = @_;
1540     my $batch = MARC::File::XML->in( $filename );
1541     my ( @marcRecords, @errors, $record );
1542     do {
1543         eval { $record = $batch->next( $encoding ); };
1544         if ($@) {
1545             push @errors, $@;
1546         }
1547         push @marcRecords, $record if $record;
1548     } while( $record );
1549     return (\@errors, \@marcRecords);
1550 }
1551
1552 =head2 RecordsFromMarcPlugin
1553
1554     Converts text of input_file into array of MARC records with to_marc plugin
1555
1556 =cut
1557
1558 sub RecordsFromMarcPlugin {
1559     my ($input_file, $plugin_class, $encoding) = @_;
1560     my ( $text, @return );
1561     return \@return if !$input_file || !$plugin_class;
1562
1563     # Read input file
1564     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1565     $/ = "\035";
1566     while (<$fh>) {
1567         s/^\s+//;
1568         s/\s+$//;
1569         next unless $_;
1570         $text .= $_;
1571     }
1572     close $fh;
1573
1574     # Convert to large MARC blob with plugin
1575     $text = Koha::Plugins::Handler->run({
1576         class  => $plugin_class,
1577         method => 'to_marc',
1578         params => { data => $text },
1579     }) if $text;
1580
1581     # Convert to array of MARC records
1582     if( $text ) {
1583         my $marc_type = C4::Context->preference('marcflavour');
1584         foreach my $blob ( split(/\x1D/, $text) ) {
1585             next if $blob =~ /^\s*$/;
1586             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1587             push @return, $marcrecord;
1588         }
1589     }
1590     return \@return;
1591 }
1592
1593 # internal functions
1594
1595 sub _create_import_record {
1596     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1597
1598     my $dbh = C4::Context->dbh;
1599     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1600                                                          record_type, encoding)
1601                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1602     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1603                   $record_type, $encoding);
1604     my $import_record_id = $dbh->{'mysql_insertid'};
1605     $sth->finish();
1606     return $import_record_id;
1607 }
1608
1609 sub _add_auth_fields {
1610     my ($import_record_id, $marc_record) = @_;
1611
1612     my $controlnumber;
1613     if ($marc_record->field('001')) {
1614         $controlnumber = $marc_record->field('001')->data();
1615     }
1616     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1617     my $dbh = C4::Context->dbh;
1618     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1619     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1620     $sth->finish();
1621 }
1622
1623 sub _add_biblio_fields {
1624     my ($import_record_id, $marc_record) = @_;
1625
1626     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1627     my $dbh = C4::Context->dbh;
1628     # FIXME no controlnumber, originalsource
1629     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1630     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1631     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1632     $sth->finish();
1633                 
1634 }
1635
1636 sub _update_biblio_fields {
1637     my ($import_record_id, $marc_record) = @_;
1638
1639     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1640     my $dbh = C4::Context->dbh;
1641     # FIXME no controlnumber, originalsource
1642     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1643     $isbn =~ s/\(.*$//;
1644     $isbn =~ tr/ -_//;
1645     $isbn = uc $isbn;
1646     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1647                              WHERE  import_record_id = ?");
1648     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1649     $sth->finish();
1650 }
1651
1652 sub _parse_biblio_fields {
1653     my ($marc_record) = @_;
1654
1655     my $dbh = C4::Context->dbh;
1656     my $bibliofields = TransformMarcToKoha($marc_record, '');
1657     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1658
1659 }
1660
1661 sub _update_batch_record_counts {
1662     my ($batch_id) = @_;
1663
1664     my $dbh = C4::Context->dbh;
1665     my ( $num_records ) = $dbh->selectrow_array(q|
1666                                             SELECT COUNT(*)
1667                                             FROM import_records
1668                                             WHERE import_batch_id = ?
1669     |, undef, $batch_id );
1670     my ( $num_items ) = $dbh->selectrow_array(q|
1671                                             SELECT COUNT(*)
1672                                             FROM import_records
1673                                             JOIN import_items USING (import_record_id)
1674                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1675     |, undef, $batch_id );
1676     $dbh->do(
1677         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1678         undef,
1679         $num_records,
1680         $num_items,
1681         $batch_id,
1682     );
1683 }
1684
1685 sub _get_commit_action {
1686     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1687     
1688     if ($record_type eq 'biblio') {
1689         my ($bib_result, $bib_match, $item_result);
1690
1691         $bib_match = GetBestRecordMatch($import_record_id);
1692         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1693
1694             $bib_result = $overlay_action;
1695
1696             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1697                 $item_result = 'create_new';
1698             } elsif($item_action eq 'replace'){
1699                 $item_result = 'replace';
1700             } else {
1701                 $item_result = 'ignore';
1702             }
1703
1704         } else {
1705             $bib_result = $nomatch_action;
1706             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1707         }
1708         return ($bib_result, $item_result, $bib_match);
1709     } else { # must be auths
1710         my ($auth_result, $auth_match);
1711
1712         $auth_match = GetBestRecordMatch($import_record_id);
1713         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1714             $auth_result = $overlay_action;
1715         } else {
1716             $auth_result = $nomatch_action;
1717         }
1718
1719         return ($auth_result, undef, $auth_match);
1720
1721     }
1722 }
1723
1724 sub _get_revert_action {
1725     my ($overlay_action, $overlay_status, $status) = @_;
1726
1727     my $bib_result;
1728
1729     if ($status eq 'ignored') {
1730         $bib_result = 'ignore';
1731     } else {
1732         if ($overlay_action eq 'create_new') {
1733             $bib_result = 'delete';
1734         } else {
1735             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1736         }
1737     }
1738     return $bib_result;
1739 }
1740
1741 1;
1742 __END__
1743
1744 =head1 AUTHOR
1745
1746 Koha Development Team <http://koha-community.org/>
1747
1748 Galen Charlton <galen.charlton@liblime.com>
1749
1750 =cut