Update release notes for 22.05.05 release
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       AddImportBatch
50       GetImportBatch
51       AddAuthToBatch
52       AddBiblioToBatch
53       AddItemsToImportBiblio
54       ModAuthorityInBatch
55
56       BatchStageMarcRecords
57       BatchFindDuplicates
58       BatchCommitRecords
59       BatchRevertRecords
60       CleanBatch
61       DeleteBatch
62
63       GetAllImportBatches
64       GetStagedWebserviceBatches
65       GetImportBatchRangeDesc
66       GetNumberOfNonZ3950ImportBatches
67       GetImportBiblios
68       GetImportRecordsRange
69       GetItemNumbersFromImportBatch
70
71       GetImportBatchStatus
72       SetImportBatchStatus
73       GetImportBatchOverlayAction
74       SetImportBatchOverlayAction
75       GetImportBatchNoMatchAction
76       SetImportBatchNoMatchAction
77       GetImportBatchItemAction
78       SetImportBatchItemAction
79       GetImportBatchMatcher
80       SetImportBatchMatcher
81       GetImportRecordOverlayStatus
82       SetImportRecordOverlayStatus
83       GetImportRecordStatus
84       SetImportRecordStatus
85       SetMatchedBiblionumber
86       GetImportRecordMatches
87       SetImportRecordMatches
88
89       RecordsFromMARCXMLFile
90       RecordsFromISO2709File
91       RecordsFromMarcPlugin
92     );
93 }
94
95 =head1 NAME
96
97 C4::ImportBatch - manage batches of imported MARC records
98
99 =head1 SYNOPSIS
100
101 use C4::ImportBatch;
102
103 =head1 FUNCTIONS
104
105 =head2 GetZ3950BatchId
106
107   my $batchid = GetZ3950BatchId($z3950server);
108
109 Retrieves the ID of the import batch for the Z39.50
110 reservoir for the given target.  If necessary,
111 creates the import batch.
112
113 =cut
114
115 sub GetZ3950BatchId {
116     my ($z3950server) = @_;
117
118     my $dbh = C4::Context->dbh;
119     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
120                              WHERE  batch_type = 'z3950'
121                              AND    file_name = ?");
122     $sth->execute($z3950server);
123     my $rowref = $sth->fetchrow_arrayref();
124     $sth->finish();
125     if (defined $rowref) {
126         return $rowref->[0];
127     } else {
128         my $batch_id = AddImportBatch( {
129                 overlay_action => 'create_new',
130                 import_status => 'staged',
131                 batch_type => 'z3950',
132                 file_name => $z3950server,
133             } );
134         return $batch_id;
135     }
136     
137 }
138
139 =head2 GetWebserviceBatchId
140
141   my $batchid = GetWebserviceBatchId();
142
143 Retrieves the ID of the import batch for webservice.
144 If necessary, creates the import batch.
145
146 =cut
147
148 my $WEBSERVICE_BASE_QRY = <<EOQ;
149 SELECT import_batch_id FROM import_batches
150 WHERE  batch_type = 'webservice'
151 AND    import_status = 'staged'
152 EOQ
153 sub GetWebserviceBatchId {
154     my ($params) = @_;
155
156     my $dbh = C4::Context->dbh;
157     my $sql = $WEBSERVICE_BASE_QRY;
158     my @args;
159     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
160         if (my $val = $params->{$field}) {
161             $sql .= " AND $field = ?";
162             push @args, $val;
163         }
164     }
165     my $id = $dbh->selectrow_array($sql, undef, @args);
166     return $id if $id;
167
168     $params->{batch_type} = 'webservice';
169     $params->{import_status} = 'staged';
170     return AddImportBatch($params);
171 }
172
173 =head2 GetImportRecordMarc
174
175   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
176
177 =cut
178
179 sub GetImportRecordMarc {
180     my ($import_record_id) = @_;
181
182     my $dbh = C4::Context->dbh;
183     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
184         SELECT marc, encoding
185         FROM import_records
186         WHERE import_record_id = ?
187     |, undef, $import_record_id );
188
189     return $marc, $encoding;
190 }
191
192 sub EmbedItemsInImportBiblio {
193     my ( $record, $import_record_id ) = @_;
194     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
195     my $dbh = C4::Context->dbh;
196     my $import_items = $dbh->selectall_arrayref(q|
197         SELECT import_items.marcxml
198         FROM import_items
199         WHERE import_record_id = ?
200     |, { Slice => {} }, $import_record_id );
201     my @item_fields;
202     for my $import_item ( @$import_items ) {
203         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
204         push @item_fields, $item_marc->field($itemtag);
205     }
206     $record->append_fields(@item_fields);
207     return $record;
208 }
209
210 =head2 AddImportBatch
211
212   my $batch_id = AddImportBatch($params_hash);
213
214 =cut
215
216 sub AddImportBatch {
217     my ($params) = @_;
218
219     my (@fields, @vals);
220     foreach (qw( matcher_id template_id branchcode
221                  overlay_action nomatch_action item_action
222                  import_status batch_type file_name comments record_type )) {
223         if (exists $params->{$_}) {
224             push @fields, $_;
225             push @vals, $params->{$_};
226         }
227     }
228     my $dbh = C4::Context->dbh;
229     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
230                                   VALUES (".join( ',', map '?', @fields).")",
231              undef,
232              @vals);
233     return $dbh->{'mysql_insertid'};
234 }
235
236 =head2 GetImportBatch 
237
238   my $row = GetImportBatch($batch_id);
239
240 Retrieve a hashref of an import_batches row.
241
242 =cut
243
244 sub GetImportBatch {
245     my ($batch_id) = @_;
246
247     my $dbh = C4::Context->dbh;
248     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
249     $sth->bind_param(1, $batch_id);
250     $sth->execute();
251     my $result = $sth->fetchrow_hashref;
252     $sth->finish();
253     return $result;
254
255 }
256
257 =head2 AddBiblioToBatch 
258
259   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
260                 $marc_record, $encoding, $update_counts);
261
262 =cut
263
264 sub AddBiblioToBatch {
265     my $batch_id = shift;
266     my $record_sequence = shift;
267     my $marc_record = shift;
268     my $encoding = shift;
269     my $update_counts = @_ ? shift : 1;
270
271     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
272     _add_biblio_fields($import_record_id, $marc_record);
273     _update_batch_record_counts($batch_id) if $update_counts;
274     return $import_record_id;
275 }
276
277 =head2 AddAuthToBatch
278
279   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
280                 $marc_record, $encoding, $update_counts, [$marc_type]);
281
282 =cut
283
284 sub AddAuthToBatch {
285     my $batch_id = shift;
286     my $record_sequence = shift;
287     my $marc_record = shift;
288     my $encoding = shift;
289     my $update_counts = @_ ? shift : 1;
290     my $marc_type = shift || C4::Context->preference('marcflavour');
291
292     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
293
294     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
295     _add_auth_fields($import_record_id, $marc_record);
296     _update_batch_record_counts($batch_id) if $update_counts;
297     return $import_record_id;
298 }
299
300 =head2 BatchStageMarcRecords
301
302 ( $batch_id, $num_records, $num_items, @invalid_records ) =
303   BatchStageMarcRecords(
304     $record_type,                $encoding,
305     $marc_records,               $file_name,
306     $marc_modification_template, $comments,
307     $branch_code,                $parse_items,
308     $leave_as_staging,           $progress_interval,
309     $progress_callback
310   );
311
312 =cut
313
314 sub BatchStageMarcRecords {
315     my $record_type = shift;
316     my $encoding = shift;
317     my $marc_records = shift;
318     my $file_name = shift;
319     my $marc_modification_template = shift;
320     my $comments = shift;
321     my $branch_code = shift;
322     my $parse_items = shift;
323     my $leave_as_staging = shift;
324
325     # optional callback to monitor status 
326     # of job
327     my $progress_interval = 0;
328     my $progress_callback = undef;
329     if ($#_ == 1) {
330         $progress_interval = shift;
331         $progress_callback = shift;
332         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
333         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
334     } 
335     
336     my $batch_id = AddImportBatch( {
337             overlay_action => 'create_new',
338             import_status => 'staging',
339             batch_type => 'batch',
340             file_name => $file_name,
341             comments => $comments,
342             record_type => $record_type,
343         } );
344     if ($parse_items) {
345         SetImportBatchItemAction($batch_id, 'always_add');
346     } else {
347         SetImportBatchItemAction($batch_id, 'ignore');
348     }
349
350
351     my $marc_type = C4::Context->preference('marcflavour');
352     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
353     my @invalid_records = ();
354     my $num_valid = 0;
355     my $num_items = 0;
356     # FIXME - for now, we're dealing only with bibs
357     my $rec_num = 0;
358     foreach my $marc_record (@$marc_records) {
359         $rec_num++;
360         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
361             &$progress_callback($rec_num);
362         }
363
364         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
365
366         my $import_record_id;
367         if (scalar($marc_record->fields()) == 0) {
368             push @invalid_records, $marc_record;
369         } else {
370
371             # Normalize the record so it doesn't have separated diacritics
372             SetUTF8Flag($marc_record);
373
374             $num_valid++;
375             if ($record_type eq 'biblio') {
376                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
377                 if ($parse_items) {
378                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
379                     $num_items += scalar(@import_items_ids);
380                 }
381             } elsif ($record_type eq 'auth') {
382                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
383             }
384         }
385     }
386     unless ($leave_as_staging) {
387         SetImportBatchStatus($batch_id, 'staged');
388     }
389     # FIXME branch_code, number of bibs, number of items
390     _update_batch_record_counts($batch_id);
391     return ($batch_id, $num_valid, $num_items, @invalid_records);
392 }
393
394 =head2 AddItemsToImportBiblio
395
396   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
397                 $import_record_id, $marc_record, $update_counts);
398
399 =cut
400
401 sub AddItemsToImportBiblio {
402     my $batch_id = shift;
403     my $import_record_id = shift;
404     my $marc_record = shift;
405     my $update_counts = @_ ? shift : 0;
406
407     my @import_items_ids = ();
408    
409     my $dbh = C4::Context->dbh; 
410     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
411     foreach my $item_field ($marc_record->field($item_tag)) {
412         my $item_marc = MARC::Record->new();
413         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
414         $item_marc->append_fields($item_field);
415         $marc_record->delete_field($item_field);
416         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
417                                         VALUES (?, ?, ?)");
418         $sth->bind_param(1, $import_record_id);
419         $sth->bind_param(2, 'staged');
420         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
421         $sth->execute();
422         push @import_items_ids, $dbh->{'mysql_insertid'};
423         $sth->finish();
424     }
425
426     if ($#import_items_ids > -1) {
427         _update_batch_record_counts($batch_id) if $update_counts;
428         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
429     }
430     return @import_items_ids;
431 }
432
433 =head2 BatchFindDuplicates
434
435   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
436              $max_matches, $progress_interval, $progress_callback);
437
438 Goes through the records loaded in the batch and attempts to 
439 find duplicates for each one.  Sets the matching status 
440 of each record to "no_match" or "auto_match" as appropriate.
441
442 The $max_matches parameter is optional; if it is not supplied,
443 it defaults to 10.
444
445 The $progress_interval and $progress_callback parameters are 
446 optional; if both are supplied, the sub referred to by
447 $progress_callback will be invoked every $progress_interval
448 records using the number of records processed as the 
449 singular argument.
450
451 =cut
452
453 sub BatchFindDuplicates {
454     my $batch_id = shift;
455     my $matcher = shift;
456     my $max_matches = @_ ? shift : 10;
457
458     # optional callback to monitor status 
459     # of job
460     my $progress_interval = 0;
461     my $progress_callback = undef;
462     if ($#_ == 1) {
463         $progress_interval = shift;
464         $progress_callback = shift;
465         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
466         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
467     }
468
469     my $dbh = C4::Context->dbh;
470
471     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
472                              FROM import_records
473                              WHERE import_batch_id = ?");
474     $sth->execute($batch_id);
475     my $num_with_matches = 0;
476     my $rec_num = 0;
477     while (my $rowref = $sth->fetchrow_hashref) {
478         $rec_num++;
479         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
480             &$progress_callback($rec_num);
481         }
482         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
483         my @matches = ();
484         if (defined $matcher) {
485             @matches = $matcher->get_matches($marc_record, $max_matches);
486         }
487         if (scalar(@matches) > 0) {
488             $num_with_matches++;
489             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
490             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
491         } else {
492             SetImportRecordMatches($rowref->{'import_record_id'}, ());
493             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
494         }
495     }
496     $sth->finish();
497     return $num_with_matches;
498 }
499
500 =head2 BatchCommitRecords
501
502   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
503         BatchCommitRecords($batch_id, $framework,
504         $progress_interval, $progress_callback);
505
506 =cut
507
508 sub BatchCommitRecords {
509     my $batch_id = shift;
510     my $framework = shift;
511
512     my $schema = Koha::Database->schema;
513
514     # optional callback to monitor status 
515     # of job
516     my $progress_interval = 0;
517     my $progress_callback = undef;
518     if ($#_ == 1) {
519         $progress_interval = shift;
520         $progress_callback = shift;
521         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
522         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
523     }
524
525     my $record_type;
526     my $num_added = 0;
527     my $num_updated = 0;
528     my $num_items_added = 0;
529     my $num_items_replaced = 0;
530     my $num_items_errored = 0;
531     my $num_ignored = 0;
532     # commit (i.e., save, all records in the batch)
533     SetImportBatchStatus($batch_id, 'importing');
534     my $overlay_action = GetImportBatchOverlayAction($batch_id);
535     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
536     my $item_action = GetImportBatchItemAction($batch_id);
537     my $item_tag;
538     my $item_subfield;
539     my $dbh = C4::Context->dbh;
540     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
541                              FROM import_records
542                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
543                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
544                              WHERE import_batch_id = ?");
545     $sth->execute($batch_id);
546     my $marcflavour = C4::Context->preference('marcflavour');
547
548     my $userenv = C4::Context->userenv;
549     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
550
551     my $rec_num = 0;
552
553     $schema->txn_begin; # We commit in a transaction
554     while (my $rowref = $sth->fetchrow_hashref) {
555         $record_type = $rowref->{'record_type'};
556
557         $rec_num++;
558
559         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
560             # report progress and commit
561             $schema->txn_commit;
562             &$progress_callback( $rec_num );
563             $schema->txn_begin;
564         }
565         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
566             $num_ignored++;
567             next;
568         }
569
570         my $marc_type;
571         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
572             $marc_type = 'UNIMARCAUTH';
573         } elsif ($marcflavour eq 'UNIMARC') {
574             $marc_type = 'UNIMARC';
575         } else {
576             $marc_type = 'USMARC';
577         }
578         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
579
580         if ($record_type eq 'biblio') {
581             # remove any item tags - rely on BatchCommitItems
582             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
583             foreach my $item_field ($marc_record->field($item_tag)) {
584                 $marc_record->delete_field($item_field);
585             }
586         }
587
588         my ($record_result, $item_result, $record_match) =
589             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
590                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
591
592         my $recordid;
593         my $query;
594         if ($record_result eq 'create_new') {
595             $num_added++;
596             if ($record_type eq 'biblio') {
597                 my $biblioitemnumber;
598                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
599                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
600                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
601                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
602                     $num_items_added += $bib_items_added;
603                     $num_items_replaced += $bib_items_replaced;
604                     $num_items_errored += $bib_items_errored;
605                 }
606             } else {
607                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
608                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
609             }
610             my $sth = $dbh->prepare_cached($query);
611             $sth->execute($recordid, $rowref->{'import_record_id'});
612             $sth->finish();
613             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
614         } elsif ($record_result eq 'replace') {
615             $num_updated++;
616             $recordid = $record_match;
617             my $oldxml;
618             if ($record_type eq 'biblio') {
619                 my $oldbiblio = Koha::Biblios->find( $recordid );
620                 $oldxml = GetXmlBiblio($recordid);
621
622                 # remove item fields so that they don't get
623                 # added again if record is reverted
624                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
625                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
626                 foreach my $item_field ($old_marc->field($item_tag)) {
627                     $old_marc->delete_field($item_field);
628                 }
629                 $oldxml = $old_marc->as_xml($marc_type);
630
631                 my $context = { source => 'batchimport' };
632                 if ($logged_in_patron) {
633                     $context->{categorycode} = $logged_in_patron->categorycode;
634                     $context->{userid} = $logged_in_patron->userid;
635                 }
636
637                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
638                     overlay_context => $context
639                 });
640                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
641
642                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
643                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
644                     $num_items_added += $bib_items_added;
645                     $num_items_replaced += $bib_items_replaced;
646                     $num_items_errored += $bib_items_errored;
647                 }
648             } else {
649                 $oldxml = GetAuthorityXML($recordid);
650
651                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
652                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
653             }
654             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
655             $sth->execute($oldxml, $rowref->{'import_record_id'});
656             $sth->finish();
657             my $sth2 = $dbh->prepare_cached($query);
658             $sth2->execute($recordid, $rowref->{'import_record_id'});
659             $sth2->finish();
660             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
661             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
662         } elsif ($record_result eq 'ignore') {
663             $recordid = $record_match;
664             $num_ignored++;
665             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
666                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
667                 $num_items_added += $bib_items_added;
668          $num_items_replaced += $bib_items_replaced;
669                 $num_items_errored += $bib_items_errored;
670                 # still need to record the matched biblionumber so that the
671                 # items can be reverted
672                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
673                 $sth2->execute($recordid, $rowref->{'import_record_id'});
674                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
675             }
676             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
677         }
678     }
679     $schema->txn_commit; # Commit final records that may not have hit callback threshold
680     $sth->finish();
681     SetImportBatchStatus($batch_id, 'imported');
682     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
683 }
684
685 =head2 BatchCommitItems
686
687   ($num_items_added, $num_items_errored) = 
688          BatchCommitItems($import_record_id, $biblionumber);
689
690 =cut
691
692 sub BatchCommitItems {
693     my ( $import_record_id, $biblionumber, $action ) = @_;
694
695     my $dbh = C4::Context->dbh;
696
697     my $num_items_added = 0;
698     my $num_items_errored = 0;
699     my $num_items_replaced = 0;
700
701     my $sth = $dbh->prepare( "
702         SELECT import_items_id, import_items.marcxml, encoding
703         FROM import_items
704         JOIN import_records USING (import_record_id)
705         WHERE import_record_id = ?
706         ORDER BY import_items_id
707     " );
708     $sth->bind_param( 1, $import_record_id );
709     $sth->execute();
710
711     while ( my $row = $sth->fetchrow_hashref() ) {
712         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
713
714         # Delete date_due subfield as to not accidentally delete item checkout due dates
715         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
716         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
717
718         my $item = TransformMarcToKoha( $item_marc );
719
720         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
721         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
722
723         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
724         if ( $action eq "replace" && $duplicate_itemnumber ) {
725             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
726             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
727             $updsth->bind_param( 1, 'imported' );
728             $updsth->bind_param( 2, $item->{itemnumber} );
729             $updsth->bind_param( 3, undef );
730             $updsth->bind_param( 4, $row->{'import_items_id'} );
731             $updsth->execute();
732             $updsth->finish();
733             $num_items_replaced++;
734         } elsif ( $action eq "replace" && $duplicate_barcode ) {
735             my $itemnumber = $duplicate_barcode->itemnumber;
736             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
737             $updsth->bind_param( 1, 'imported' );
738             $updsth->bind_param( 2, $item->{itemnumber} );
739             $updsth->bind_param( 3, undef );
740             $updsth->bind_param( 4, $row->{'import_items_id'} );
741             $updsth->execute();
742             $updsth->finish();
743             $num_items_replaced++;
744         } elsif ($duplicate_barcode) {
745             $updsth->bind_param( 1, 'error' );
746             $updsth->bind_param( 2, undef );
747             $updsth->bind_param( 3, 'duplicate item barcode' );
748             $updsth->bind_param( 4, $row->{'import_items_id'} );
749             $updsth->execute();
750             $num_items_errored++;
751         } else {
752             # Remove the itemnumber if it exists, we want to create a new item
753             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
754             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
755
756             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
757             if( $itemnumber ) {
758                 $updsth->bind_param( 1, 'imported' );
759                 $updsth->bind_param( 2, $itemnumber );
760                 $updsth->bind_param( 3, undef );
761                 $updsth->bind_param( 4, $row->{'import_items_id'} );
762                 $updsth->execute();
763                 $updsth->finish();
764                 $num_items_added++;
765             }
766         }
767     }
768
769     return ( $num_items_added, $num_items_replaced, $num_items_errored );
770 }
771
772 =head2 BatchRevertRecords
773
774   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
775       $num_ignored) = BatchRevertRecords($batch_id);
776
777 =cut
778
779 sub BatchRevertRecords {
780     my $batch_id = shift;
781
782     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
783
784     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
785
786     my $record_type;
787     my $num_deleted = 0;
788     my $num_errors = 0;
789     my $num_reverted = 0;
790     my $num_ignored = 0;
791     my $num_items_deleted = 0;
792     # commit (i.e., save, all records in the batch)
793     SetImportBatchStatus($batch_id, 'reverting');
794     my $overlay_action = GetImportBatchOverlayAction($batch_id);
795     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
796     my $dbh = C4::Context->dbh;
797     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
798                              FROM import_records
799                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
800                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
801                              WHERE import_batch_id = ?");
802     $sth->execute($batch_id);
803     my $marc_type;
804     my $marcflavour = C4::Context->preference('marcflavour');
805     while (my $rowref = $sth->fetchrow_hashref) {
806         $record_type = $rowref->{'record_type'};
807         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
808             $num_ignored++;
809             next;
810         }
811         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
812             $marc_type = 'UNIMARCAUTH';
813         } elsif ($marcflavour eq 'UNIMARC') {
814             $marc_type = 'UNIMARC';
815         } else {
816             $marc_type = 'USMARC';
817         }
818
819         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
820
821         if ($record_result eq 'delete') {
822             my $error = undef;
823             if  ($record_type eq 'biblio') {
824                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
825                 $error = DelBiblio($rowref->{'matched_biblionumber'});
826             } else {
827                 DelAuthority({ authid => $rowref->{'matched_authid'} });
828             }
829             if (defined $error) {
830                 $num_errors++;
831             } else {
832                 $num_deleted++;
833                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
834             }
835         } elsif ($record_result eq 'restore') {
836             $num_reverted++;
837             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
838             if ($record_type eq 'biblio') {
839                 my $biblionumber = $rowref->{'matched_biblionumber'};
840                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
841
842                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
843                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
844
845                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
846                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
847             } else {
848                 my $authid = $rowref->{'matched_authid'};
849                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
850             }
851             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
852         } elsif ($record_result eq 'ignore') {
853             if ($record_type eq 'biblio') {
854                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
855             }
856             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
857         }
858         my $query;
859         if ($record_type eq 'biblio') {
860             # remove matched_biblionumber only if there is no 'imported' item left
861             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
862             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
863         } else {
864             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
865         }
866         my $sth2 = $dbh->prepare_cached($query);
867         $sth2->execute($rowref->{'import_record_id'});
868     }
869
870     $sth->finish();
871     SetImportBatchStatus($batch_id, 'reverted');
872     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
873 }
874
875 =head2 BatchRevertItems
876
877   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
878
879 =cut
880
881 sub BatchRevertItems {
882     my ($import_record_id, $biblionumber) = @_;
883
884     my $dbh = C4::Context->dbh;
885     my $num_items_deleted = 0;
886
887     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
888                                    FROM import_items
889                                    JOIN items USING (itemnumber)
890                                    WHERE import_record_id = ?");
891     $sth->bind_param(1, $import_record_id);
892     $sth->execute();
893     while (my $row = $sth->fetchrow_hashref()) {
894         my $item = Koha::Items->find($row->{itemnumber});
895         if ($item->safe_delete){
896             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
897             $updsth->bind_param(1, 'reverted');
898             $updsth->bind_param(2, $row->{'import_items_id'});
899             $updsth->execute();
900             $updsth->finish();
901             $num_items_deleted++;
902         }
903         else {
904             next;
905         }
906     }
907     $sth->finish();
908     return $num_items_deleted;
909 }
910
911 =head2 CleanBatch
912
913   CleanBatch($batch_id)
914
915 Deletes all staged records from the import batch
916 and sets the status of the batch to 'cleaned'.  Note
917 that deleting a stage record does *not* affect
918 any record that has been committed to the database.
919
920 =cut
921
922 sub CleanBatch {
923     my $batch_id = shift;
924     return unless defined $batch_id;
925
926     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
927     SetImportBatchStatus($batch_id, 'cleaned');
928 }
929
930 =head2 DeleteBatch
931
932   DeleteBatch($batch_id)
933
934 Deletes the record from the database. This can only be done
935 once the batch has been cleaned.
936
937 =cut
938
939 sub DeleteBatch {
940     my $batch_id = shift;
941     return unless defined $batch_id;
942
943     my $dbh = C4::Context->dbh;
944     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
945     $sth->execute( $batch_id );
946 }
947
948 =head2 GetAllImportBatches
949
950   my $results = GetAllImportBatches();
951
952 Returns a references to an array of hash references corresponding
953 to all import_batches rows (of batch_type 'batch'), sorted in 
954 ascending order by import_batch_id.
955
956 =cut
957
958 sub  GetAllImportBatches {
959     my $dbh = C4::Context->dbh;
960     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
961                                     WHERE batch_type IN ('batch', 'webservice')
962                                     ORDER BY import_batch_id ASC");
963
964     my $results = [];
965     $sth->execute();
966     while (my $row = $sth->fetchrow_hashref) {
967         push @$results, $row;
968     }
969     $sth->finish();
970     return $results;
971 }
972
973 =head2 GetStagedWebserviceBatches
974
975   my $batch_ids = GetStagedWebserviceBatches();
976
977 Returns a references to an array of batch id's
978 of batch_type 'webservice' that are not imported
979
980 =cut
981
982 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
983 SELECT import_batch_id FROM import_batches
984 WHERE batch_type = 'webservice'
985 AND import_status = 'staged'
986 EOQ
987 sub  GetStagedWebserviceBatches {
988     my $dbh = C4::Context->dbh;
989     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
990 }
991
992 =head2 GetImportBatchRangeDesc
993
994   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
995
996 Returns a reference to an array of hash references corresponding to
997 import_batches rows (sorted in descending order by import_batch_id)
998 start at the given offset.
999
1000 =cut
1001
1002 sub GetImportBatchRangeDesc {
1003     my ($offset, $results_per_group) = @_;
1004
1005     my $dbh = C4::Context->dbh;
1006     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1007                                     LEFT JOIN import_batch_profiles p
1008                                     ON b.profile_id = p.id
1009                                     WHERE b.batch_type IN ('batch', 'webservice')
1010                                     ORDER BY b.import_batch_id DESC";
1011     my @params;
1012     if ($results_per_group){
1013         $query .= " LIMIT ?";
1014         push(@params, $results_per_group);
1015     }
1016     if ($offset){
1017         $query .= " OFFSET ?";
1018         push(@params, $offset);
1019     }
1020     my $sth = $dbh->prepare_cached($query);
1021     $sth->execute(@params);
1022     my $results = $sth->fetchall_arrayref({});
1023     $sth->finish();
1024     return $results;
1025 }
1026
1027 =head2 GetItemNumbersFromImportBatch
1028
1029   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1030
1031 =cut
1032
1033 sub GetItemNumbersFromImportBatch {
1034     my ($batch_id) = @_;
1035     my $dbh = C4::Context->dbh;
1036     my $sql = q|
1037 SELECT itemnumber FROM import_items
1038 INNER JOIN items USING (itemnumber)
1039 INNER JOIN import_records USING (import_record_id)
1040 WHERE import_batch_id = ?|;
1041     my  $sth = $dbh->prepare( $sql );
1042     $sth->execute($batch_id);
1043     my @items ;
1044     while ( my ($itm) = $sth->fetchrow_array ) {
1045         push @items, $itm;
1046     }
1047     return @items;
1048 }
1049
1050 =head2 GetNumberOfImportBatches
1051
1052   my $count = GetNumberOfImportBatches();
1053
1054 =cut
1055
1056 sub GetNumberOfNonZ3950ImportBatches {
1057     my $dbh = C4::Context->dbh;
1058     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1059     $sth->execute();
1060     my ($count) = $sth->fetchrow_array();
1061     $sth->finish();
1062     return $count;
1063 }
1064
1065 =head2 GetImportBiblios
1066
1067   my $results = GetImportBiblios($importid);
1068
1069 =cut
1070
1071 sub GetImportBiblios {
1072     my ($import_record_id) = @_;
1073
1074     my $dbh = C4::Context->dbh;
1075     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1076     return $dbh->selectall_arrayref(
1077         $query,
1078         { Slice => {} },
1079         $import_record_id
1080     );
1081
1082 }
1083
1084 =head2 GetImportRecordsRange
1085
1086   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1087
1088 Returns a reference to an array of hash references corresponding to
1089 import_biblios/import_auths/import_records rows for a given batch
1090 starting at the given offset.
1091
1092 =cut
1093
1094 sub GetImportRecordsRange {
1095     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1096
1097     my $dbh = C4::Context->dbh;
1098
1099     my $order_by = $parameters->{order_by} || 'import_record_id';
1100     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1101
1102     my $order_by_direction =
1103       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1104
1105     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1106
1107     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1108                                            record_sequence, status, overlay_status,
1109                                            matched_biblionumber, matched_authid, record_type
1110                                     FROM   import_records
1111                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1112                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1113                                     WHERE  import_batch_id = ?";
1114     my @params;
1115     push(@params, $batch_id);
1116     if ($status) {
1117         $query .= " AND status=?";
1118         push(@params,$status);
1119     }
1120
1121     $query.=" ORDER BY $order_by $order_by_direction";
1122
1123     if($results_per_group){
1124         $query .= " LIMIT ?";
1125         push(@params, $results_per_group);
1126     }
1127     if($offset){
1128         $query .= " OFFSET ?";
1129         push(@params, $offset);
1130     }
1131     my $sth = $dbh->prepare_cached($query);
1132     $sth->execute(@params);
1133     my $results = $sth->fetchall_arrayref({});
1134     $sth->finish();
1135     return $results;
1136
1137 }
1138
1139 =head2 GetBestRecordMatch
1140
1141   my $record_id = GetBestRecordMatch($import_record_id);
1142
1143 =cut
1144
1145 sub GetBestRecordMatch {
1146     my ($import_record_id) = @_;
1147
1148     my $dbh = C4::Context->dbh;
1149     my $sth = $dbh->prepare("SELECT candidate_match_id
1150                              FROM   import_record_matches
1151                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1152                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1153                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1154                              WHERE  import_record_matches.import_record_id = ? AND
1155                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1156                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1157                              AND chosen = 1
1158                              ORDER BY score DESC, candidate_match_id DESC");
1159     $sth->execute($import_record_id);
1160     my ($record_id) = $sth->fetchrow_array();
1161     $sth->finish();
1162     return $record_id;
1163 }
1164
1165 =head2 GetImportBatchStatus
1166
1167   my $status = GetImportBatchStatus($batch_id);
1168
1169 =cut
1170
1171 sub GetImportBatchStatus {
1172     my ($batch_id) = @_;
1173
1174     my $dbh = C4::Context->dbh;
1175     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1176     $sth->execute($batch_id);
1177     my ($status) = $sth->fetchrow_array();
1178     $sth->finish();
1179     return $status;
1180
1181 }
1182
1183 =head2 SetImportBatchStatus
1184
1185   SetImportBatchStatus($batch_id, $new_status);
1186
1187 =cut
1188
1189 sub SetImportBatchStatus {
1190     my ($batch_id, $new_status) = @_;
1191
1192     my $dbh = C4::Context->dbh;
1193     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1194     $sth->execute($new_status, $batch_id);
1195     $sth->finish();
1196
1197 }
1198
1199 =head2 SetMatchedBiblionumber
1200
1201   SetMatchedBiblionumber($import_record_id, $biblionumber);
1202
1203 =cut
1204
1205 sub SetMatchedBiblionumber {
1206     my ($import_record_id, $biblionumber) = @_;
1207
1208     my $dbh = C4::Context->dbh;
1209     $dbh->do(
1210         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1211         undef, $biblionumber, $import_record_id
1212     );
1213 }
1214
1215 =head2 GetImportBatchOverlayAction
1216
1217   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1218
1219 =cut
1220
1221 sub GetImportBatchOverlayAction {
1222     my ($batch_id) = @_;
1223
1224     my $dbh = C4::Context->dbh;
1225     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1226     $sth->execute($batch_id);
1227     my ($overlay_action) = $sth->fetchrow_array();
1228     $sth->finish();
1229     return $overlay_action;
1230
1231 }
1232
1233
1234 =head2 SetImportBatchOverlayAction
1235
1236   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1237
1238 =cut
1239
1240 sub SetImportBatchOverlayAction {
1241     my ($batch_id, $new_overlay_action) = @_;
1242
1243     my $dbh = C4::Context->dbh;
1244     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1245     $sth->execute($new_overlay_action, $batch_id);
1246     $sth->finish();
1247
1248 }
1249
1250 =head2 GetImportBatchNoMatchAction
1251
1252   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1253
1254 =cut
1255
1256 sub GetImportBatchNoMatchAction {
1257     my ($batch_id) = @_;
1258
1259     my $dbh = C4::Context->dbh;
1260     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1261     $sth->execute($batch_id);
1262     my ($nomatch_action) = $sth->fetchrow_array();
1263     $sth->finish();
1264     return $nomatch_action;
1265
1266 }
1267
1268
1269 =head2 SetImportBatchNoMatchAction
1270
1271   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1272
1273 =cut
1274
1275 sub SetImportBatchNoMatchAction {
1276     my ($batch_id, $new_nomatch_action) = @_;
1277
1278     my $dbh = C4::Context->dbh;
1279     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1280     $sth->execute($new_nomatch_action, $batch_id);
1281     $sth->finish();
1282
1283 }
1284
1285 =head2 GetImportBatchItemAction
1286
1287   my $item_action = GetImportBatchItemAction($batch_id);
1288
1289 =cut
1290
1291 sub GetImportBatchItemAction {
1292     my ($batch_id) = @_;
1293
1294     my $dbh = C4::Context->dbh;
1295     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1296     $sth->execute($batch_id);
1297     my ($item_action) = $sth->fetchrow_array();
1298     $sth->finish();
1299     return $item_action;
1300
1301 }
1302
1303
1304 =head2 SetImportBatchItemAction
1305
1306   SetImportBatchItemAction($batch_id, $new_item_action);
1307
1308 =cut
1309
1310 sub SetImportBatchItemAction {
1311     my ($batch_id, $new_item_action) = @_;
1312
1313     my $dbh = C4::Context->dbh;
1314     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1315     $sth->execute($new_item_action, $batch_id);
1316     $sth->finish();
1317
1318 }
1319
1320 =head2 GetImportBatchMatcher
1321
1322   my $matcher_id = GetImportBatchMatcher($batch_id);
1323
1324 =cut
1325
1326 sub GetImportBatchMatcher {
1327     my ($batch_id) = @_;
1328
1329     my $dbh = C4::Context->dbh;
1330     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1331     $sth->execute($batch_id);
1332     my ($matcher_id) = $sth->fetchrow_array();
1333     $sth->finish();
1334     return $matcher_id;
1335
1336 }
1337
1338
1339 =head2 SetImportBatchMatcher
1340
1341   SetImportBatchMatcher($batch_id, $new_matcher_id);
1342
1343 =cut
1344
1345 sub SetImportBatchMatcher {
1346     my ($batch_id, $new_matcher_id) = @_;
1347
1348     my $dbh = C4::Context->dbh;
1349     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1350     $sth->execute($new_matcher_id, $batch_id);
1351     $sth->finish();
1352
1353 }
1354
1355 =head2 GetImportRecordOverlayStatus
1356
1357   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1358
1359 =cut
1360
1361 sub GetImportRecordOverlayStatus {
1362     my ($import_record_id) = @_;
1363
1364     my $dbh = C4::Context->dbh;
1365     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1366     $sth->execute($import_record_id);
1367     my ($overlay_status) = $sth->fetchrow_array();
1368     $sth->finish();
1369     return $overlay_status;
1370
1371 }
1372
1373
1374 =head2 SetImportRecordOverlayStatus
1375
1376   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1377
1378 =cut
1379
1380 sub SetImportRecordOverlayStatus {
1381     my ($import_record_id, $new_overlay_status) = @_;
1382
1383     my $dbh = C4::Context->dbh;
1384     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1385     $sth->execute($new_overlay_status, $import_record_id);
1386     $sth->finish();
1387
1388 }
1389
1390 =head2 GetImportRecordStatus
1391
1392   my $status = GetImportRecordStatus($import_record_id);
1393
1394 =cut
1395
1396 sub GetImportRecordStatus {
1397     my ($import_record_id) = @_;
1398
1399     my $dbh = C4::Context->dbh;
1400     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1401     $sth->execute($import_record_id);
1402     my ($status) = $sth->fetchrow_array();
1403     $sth->finish();
1404     return $status;
1405
1406 }
1407
1408
1409 =head2 SetImportRecordStatus
1410
1411   SetImportRecordStatus($import_record_id, $new_status);
1412
1413 =cut
1414
1415 sub SetImportRecordStatus {
1416     my ($import_record_id, $new_status) = @_;
1417
1418     my $dbh = C4::Context->dbh;
1419     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1420     $sth->execute($new_status, $import_record_id);
1421     $sth->finish();
1422
1423 }
1424
1425 =head2 GetImportRecordMatches
1426
1427   my $results = GetImportRecordMatches($import_record_id, $best_only);
1428
1429 =cut
1430
1431 sub GetImportRecordMatches {
1432     my $import_record_id = shift;
1433     my $best_only = @_ ? shift : 0;
1434
1435     my $dbh = C4::Context->dbh;
1436     # FIXME currently biblio only
1437     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1438                                     candidate_match_id, score, record_type,
1439                                     chosen
1440                                     FROM import_records
1441                                     JOIN import_record_matches USING (import_record_id)
1442                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1443                                     WHERE import_record_id = ?
1444                                     ORDER BY score DESC, biblionumber DESC");
1445     $sth->bind_param(1, $import_record_id);
1446     my $results = [];
1447     $sth->execute();
1448     while (my $row = $sth->fetchrow_hashref) {
1449         if ($row->{'record_type'} eq 'auth') {
1450             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1451         }
1452         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1453         push @$results, $row;
1454         last if $best_only;
1455     }
1456     $sth->finish();
1457
1458     return $results;
1459     
1460 }
1461
1462 =head2 SetImportRecordMatches
1463
1464   SetImportRecordMatches($import_record_id, @matches);
1465
1466 =cut
1467
1468 sub SetImportRecordMatches {
1469     my $import_record_id = shift;
1470     my @matches = @_;
1471
1472     my $dbh = C4::Context->dbh;
1473     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1474     $delsth->execute($import_record_id);
1475     $delsth->finish();
1476
1477     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1478                                     VALUES (?, ?, ?, ?)");
1479     my $chosen = 1; #The first match is defaulted to be chosen
1480     foreach my $match (@matches) {
1481         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1482         $chosen = 0; #After the first we do not default to other matches
1483     }
1484 }
1485
1486 =head2 RecordsFromISO2709File
1487
1488     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1489
1490 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1491
1492 @PARAM1, String, absolute path to the ISO2709 file.
1493 @PARAM2, String, see stage_file.pl
1494 @PARAM3, String, should be utf8
1495
1496 Returns two array refs.
1497
1498 =cut
1499
1500 sub RecordsFromISO2709File {
1501     my ($input_file, $record_type, $encoding) = @_;
1502     my @errors;
1503
1504     my $marc_type = C4::Context->preference('marcflavour');
1505     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1506
1507     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1508     my @marc_records;
1509     $/ = "\035";
1510     while (<$fh>) {
1511         s/^\s+//;
1512         s/\s+$//;
1513         next unless $_; # skip if record has only whitespace, as might occur
1514                         # if file includes newlines between each MARC record
1515         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1516         push @marc_records, $marc_record;
1517         if ($charset_guessed ne $encoding) {
1518             push @errors,
1519                 "Unexpected charset $charset_guessed, expecting $encoding";
1520         }
1521     }
1522     close $fh;
1523     return ( \@errors, \@marc_records );
1524 }
1525
1526 =head2 RecordsFromMARCXMLFile
1527
1528     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1529
1530 Creates MARC::Record-objects out of the given MARCXML-file.
1531
1532 @PARAM1, String, absolute path to the ISO2709 file.
1533 @PARAM2, String, should be utf8
1534
1535 Returns two array refs.
1536
1537 =cut
1538
1539 sub RecordsFromMARCXMLFile {
1540     my ( $filename, $encoding ) = @_;
1541     my $batch = MARC::File::XML->in( $filename );
1542     my ( @marcRecords, @errors, $record );
1543     do {
1544         eval { $record = $batch->next( $encoding ); };
1545         if ($@) {
1546             push @errors, $@;
1547         }
1548         push @marcRecords, $record if $record;
1549     } while( $record );
1550     return (\@errors, \@marcRecords);
1551 }
1552
1553 =head2 RecordsFromMarcPlugin
1554
1555     Converts text of input_file into array of MARC records with to_marc plugin
1556
1557 =cut
1558
1559 sub RecordsFromMarcPlugin {
1560     my ($input_file, $plugin_class, $encoding) = @_;
1561     my ( $text, @return );
1562     return \@return if !$input_file || !$plugin_class;
1563
1564     # Read input file
1565     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1566     $/ = "\035";
1567     while (<$fh>) {
1568         s/^\s+//;
1569         s/\s+$//;
1570         next unless $_;
1571         $text .= $_;
1572     }
1573     close $fh;
1574
1575     # Convert to large MARC blob with plugin
1576     $text = Koha::Plugins::Handler->run({
1577         class  => $plugin_class,
1578         method => 'to_marc',
1579         params => { data => $text },
1580     }) if $text;
1581
1582     # Convert to array of MARC records
1583     if( $text ) {
1584         my $marc_type = C4::Context->preference('marcflavour');
1585         foreach my $blob ( split(/\x1D/, $text) ) {
1586             next if $blob =~ /^\s*$/;
1587             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1588             push @return, $marcrecord;
1589         }
1590     }
1591     return \@return;
1592 }
1593
1594 # internal functions
1595
1596 sub _create_import_record {
1597     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1598
1599     my $dbh = C4::Context->dbh;
1600     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1601                                                          record_type, encoding)
1602                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1603     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1604                   $record_type, $encoding);
1605     my $import_record_id = $dbh->{'mysql_insertid'};
1606     $sth->finish();
1607     return $import_record_id;
1608 }
1609
1610 sub _update_import_record_marc {
1611     my ($import_record_id, $marc_record, $marc_type) = @_;
1612
1613     my $dbh = C4::Context->dbh;
1614     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1615                              WHERE  import_record_id = ?");
1616     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1617     $sth->finish();
1618 }
1619
1620 sub _add_auth_fields {
1621     my ($import_record_id, $marc_record) = @_;
1622
1623     my $controlnumber;
1624     if ($marc_record->field('001')) {
1625         $controlnumber = $marc_record->field('001')->data();
1626     }
1627     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1628     my $dbh = C4::Context->dbh;
1629     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1630     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1631     $sth->finish();
1632 }
1633
1634 sub _add_biblio_fields {
1635     my ($import_record_id, $marc_record) = @_;
1636
1637     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1638     my $dbh = C4::Context->dbh;
1639     # FIXME no controlnumber, originalsource
1640     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1641     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1642     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1643     $sth->finish();
1644                 
1645 }
1646
1647 sub _update_biblio_fields {
1648     my ($import_record_id, $marc_record) = @_;
1649
1650     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1651     my $dbh = C4::Context->dbh;
1652     # FIXME no controlnumber, originalsource
1653     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1654     $isbn =~ s/\(.*$//;
1655     $isbn =~ tr/ -_//;
1656     $isbn = uc $isbn;
1657     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1658                              WHERE  import_record_id = ?");
1659     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1660     $sth->finish();
1661 }
1662
1663 sub _parse_biblio_fields {
1664     my ($marc_record) = @_;
1665
1666     my $dbh = C4::Context->dbh;
1667     my $bibliofields = TransformMarcToKoha($marc_record, '');
1668     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1669
1670 }
1671
1672 sub _update_batch_record_counts {
1673     my ($batch_id) = @_;
1674
1675     my $dbh = C4::Context->dbh;
1676     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1677                                         num_records = (
1678                                             SELECT COUNT(*)
1679                                             FROM import_records
1680                                             WHERE import_batch_id = import_batches.import_batch_id),
1681                                         num_items = (
1682                                             SELECT COUNT(*)
1683                                             FROM import_records
1684                                             JOIN import_items USING (import_record_id)
1685                                             WHERE import_batch_id = import_batches.import_batch_id
1686                                             AND record_type = 'biblio')
1687                                     WHERE import_batch_id = ?");
1688     $sth->bind_param(1, $batch_id);
1689     $sth->execute();
1690     $sth->finish();
1691 }
1692
1693 sub _get_commit_action {
1694     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1695     
1696     if ($record_type eq 'biblio') {
1697         my ($bib_result, $bib_match, $item_result);
1698
1699         $bib_match = GetBestRecordMatch($import_record_id);
1700         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1701
1702             $bib_result = $overlay_action;
1703
1704             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1705                 $item_result = 'create_new';
1706             } elsif($item_action eq 'replace'){
1707                 $item_result = 'replace';
1708             } else {
1709                 $item_result = 'ignore';
1710             }
1711
1712         } else {
1713             $bib_result = $nomatch_action;
1714             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1715         }
1716         return ($bib_result, $item_result, $bib_match);
1717     } else { # must be auths
1718         my ($auth_result, $auth_match);
1719
1720         $auth_match = GetBestRecordMatch($import_record_id);
1721         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1722             $auth_result = $overlay_action;
1723         } else {
1724             $auth_result = $nomatch_action;
1725         }
1726
1727         return ($auth_result, undef, $auth_match);
1728
1729     }
1730 }
1731
1732 sub _get_revert_action {
1733     my ($overlay_action, $overlay_status, $status) = @_;
1734
1735     my $bib_result;
1736
1737     if ($status eq 'ignored') {
1738         $bib_result = 'ignore';
1739     } else {
1740         if ($overlay_action eq 'create_new') {
1741             $bib_result = 'delete';
1742         } else {
1743             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1744         }
1745     }
1746     return $bib_result;
1747 }
1748
1749 1;
1750 __END__
1751
1752 =head1 AUTHOR
1753
1754 Koha Development Team <http://koha-community.org/>
1755
1756 Galen Charlton <galen.charlton@liblime.com>
1757
1758 =cut