Bug 23019: (follow-up 2) set table name to import_batch_profiles
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Items;
31 use Koha::Plugins::Handler;
32 use Koha::Logger;
33
34 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
35
36 BEGIN {
37         require Exporter;
38         @ISA    = qw(Exporter);
39         @EXPORT = qw(
40     GetZ3950BatchId
41     GetWebserviceBatchId
42     GetImportRecordMarc
43     GetImportRecordMarcXML
44     AddImportBatch
45     GetImportBatch
46     AddAuthToBatch
47     AddBiblioToBatch
48     AddItemsToImportBiblio
49     ModAuthorityInBatch
50     ModBiblioInBatch
51
52     BatchStageMarcRecords
53     BatchFindDuplicates
54     BatchCommitRecords
55     BatchRevertRecords
56     CleanBatch
57     DeleteBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     GetImportRecordMatches
82     SetImportRecordMatches
83         );
84 }
85
86 =head1 NAME
87
88 C4::ImportBatch - manage batches of imported MARC records
89
90 =head1 SYNOPSIS
91
92 use C4::ImportBatch;
93
94 =head1 FUNCTIONS
95
96 =head2 GetZ3950BatchId
97
98   my $batchid = GetZ3950BatchId($z3950server);
99
100 Retrieves the ID of the import batch for the Z39.50
101 reservoir for the given target.  If necessary,
102 creates the import batch.
103
104 =cut
105
106 sub GetZ3950BatchId {
107     my ($z3950server) = @_;
108
109     my $dbh = C4::Context->dbh;
110     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
111                              WHERE  batch_type = 'z3950'
112                              AND    file_name = ?");
113     $sth->execute($z3950server);
114     my $rowref = $sth->fetchrow_arrayref();
115     $sth->finish();
116     if (defined $rowref) {
117         return $rowref->[0];
118     } else {
119         my $batch_id = AddImportBatch( {
120                 overlay_action => 'create_new',
121                 import_status => 'staged',
122                 batch_type => 'z3950',
123                 file_name => $z3950server,
124             } );
125         return $batch_id;
126     }
127     
128 }
129
130 =head2 GetWebserviceBatchId
131
132   my $batchid = GetWebserviceBatchId();
133
134 Retrieves the ID of the import batch for webservice.
135 If necessary, creates the import batch.
136
137 =cut
138
139 my $WEBSERVICE_BASE_QRY = <<EOQ;
140 SELECT import_batch_id FROM import_batches
141 WHERE  batch_type = 'webservice'
142 AND    import_status = 'staged'
143 EOQ
144 sub GetWebserviceBatchId {
145     my ($params) = @_;
146
147     my $dbh = C4::Context->dbh;
148     my $sql = $WEBSERVICE_BASE_QRY;
149     my @args;
150     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
151         if (my $val = $params->{$field}) {
152             $sql .= " AND $field = ?";
153             push @args, $val;
154         }
155     }
156     my $id = $dbh->selectrow_array($sql, undef, @args);
157     return $id if $id;
158
159     $params->{batch_type} = 'webservice';
160     $params->{import_status} = 'staged';
161     return AddImportBatch($params);
162 }
163
164 =head2 GetImportRecordMarc
165
166   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
167
168 =cut
169
170 sub GetImportRecordMarc {
171     my ($import_record_id) = @_;
172
173     my $dbh = C4::Context->dbh;
174     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
175         SELECT marc, encoding
176         FROM import_records
177         WHERE import_record_id = ?
178     |, undef, $import_record_id );
179
180     return $marc, $encoding;
181 }
182
183 sub GetRecordFromImportBiblio {
184     my ( $import_record_id, $embed_items ) = @_;
185
186     my ($marc) = GetImportRecordMarc($import_record_id);
187     my $record = MARC::Record->new_from_usmarc($marc);
188
189     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
190
191     return $record;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 GetImportRecordMarcXML
213
214   my $marcxml = GetImportRecordMarcXML($import_record_id);
215
216 =cut
217
218 sub GetImportRecordMarcXML {
219     my ($import_record_id) = @_;
220
221     my $dbh = C4::Context->dbh;
222     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
223     $sth->execute($import_record_id);
224     my ($marcxml) = $sth->fetchrow();
225     $sth->finish();
226     return $marcxml;
227
228 }
229
230 =head2 AddImportBatch
231
232   my $batch_id = AddImportBatch($params_hash);
233
234 =cut
235
236 sub AddImportBatch {
237     my ($params) = @_;
238
239     my (@fields, @vals);
240     foreach (qw( matcher_id template_id branchcode
241                  overlay_action nomatch_action item_action
242                  import_status batch_type file_name comments record_type )) {
243         if (exists $params->{$_}) {
244             push @fields, $_;
245             push @vals, $params->{$_};
246         }
247     }
248     my $dbh = C4::Context->dbh;
249     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
250                                   VALUES (".join( ',', map '?', @fields).")",
251              undef,
252              @vals);
253     return $dbh->{'mysql_insertid'};
254 }
255
256 =head2 GetImportBatch 
257
258   my $row = GetImportBatch($batch_id);
259
260 Retrieve a hashref of an import_batches row.
261
262 =cut
263
264 sub GetImportBatch {
265     my ($batch_id) = @_;
266
267     my $dbh = C4::Context->dbh;
268     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
269     $sth->bind_param(1, $batch_id);
270     $sth->execute();
271     my $result = $sth->fetchrow_hashref;
272     $sth->finish();
273     return $result;
274
275 }
276
277 =head2 AddBiblioToBatch 
278
279   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
280                 $marc_record, $encoding, $update_counts);
281
282 =cut
283
284 sub AddBiblioToBatch {
285     my $batch_id = shift;
286     my $record_sequence = shift;
287     my $marc_record = shift;
288     my $encoding = shift;
289     my $update_counts = @_ ? shift : 1;
290
291     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
292     _add_biblio_fields($import_record_id, $marc_record);
293     _update_batch_record_counts($batch_id) if $update_counts;
294     return $import_record_id;
295 }
296
297 =head2 ModBiblioInBatch
298
299   ModBiblioInBatch($import_record_id, $marc_record);
300
301 =cut
302
303 sub ModBiblioInBatch {
304     my ($import_record_id, $marc_record) = @_;
305
306     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
307     _update_biblio_fields($import_record_id, $marc_record);
308
309 }
310
311 =head2 AddAuthToBatch
312
313   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
314                 $marc_record, $encoding, $update_counts, [$marc_type]);
315
316 =cut
317
318 sub AddAuthToBatch {
319     my $batch_id = shift;
320     my $record_sequence = shift;
321     my $marc_record = shift;
322     my $encoding = shift;
323     my $update_counts = @_ ? shift : 1;
324     my $marc_type = shift || C4::Context->preference('marcflavour');
325
326     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
327
328     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
329     _add_auth_fields($import_record_id, $marc_record);
330     _update_batch_record_counts($batch_id) if $update_counts;
331     return $import_record_id;
332 }
333
334 =head2 ModAuthInBatch
335
336   ModAuthInBatch($import_record_id, $marc_record);
337
338 =cut
339
340 sub ModAuthInBatch {
341     my ($import_record_id, $marc_record) = @_;
342
343     my $marcflavour = C4::Context->preference('marcflavour');
344     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
345
346 }
347
348 =head2 BatchStageMarcRecords
349
350 ( $batch_id, $num_records, $num_items, @invalid_records ) =
351   BatchStageMarcRecords(
352     $record_type,                $encoding,
353     $marc_records,               $file_name,
354     $marc_modification_template, $comments,
355     $branch_code,                $parse_items,
356     $leave_as_staging,           $progress_interval,
357     $progress_callback
358   );
359
360 =cut
361
362 sub BatchStageMarcRecords {
363     my $record_type = shift;
364     my $encoding = shift;
365     my $marc_records = shift;
366     my $file_name = shift;
367     my $marc_modification_template = shift;
368     my $comments = shift;
369     my $branch_code = shift;
370     my $parse_items = shift;
371     my $leave_as_staging = shift;
372
373     # optional callback to monitor status 
374     # of job
375     my $progress_interval = 0;
376     my $progress_callback = undef;
377     if ($#_ == 1) {
378         $progress_interval = shift;
379         $progress_callback = shift;
380         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
381         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
382     } 
383     
384     my $batch_id = AddImportBatch( {
385             overlay_action => 'create_new',
386             import_status => 'staging',
387             batch_type => 'batch',
388             file_name => $file_name,
389             comments => $comments,
390             record_type => $record_type,
391         } );
392     if ($parse_items) {
393         SetImportBatchItemAction($batch_id, 'always_add');
394     } else {
395         SetImportBatchItemAction($batch_id, 'ignore');
396     }
397
398
399     my $marc_type = C4::Context->preference('marcflavour');
400     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
401     my @invalid_records = ();
402     my $num_valid = 0;
403     my $num_items = 0;
404     # FIXME - for now, we're dealing only with bibs
405     my $rec_num = 0;
406     foreach my $marc_record (@$marc_records) {
407         $rec_num++;
408         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
409             &$progress_callback($rec_num);
410         }
411
412         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
413
414         my $import_record_id;
415         if (scalar($marc_record->fields()) == 0) {
416             push @invalid_records, $marc_record;
417         } else {
418
419             # Normalize the record so it doesn't have separated diacritics
420             SetUTF8Flag($marc_record);
421
422             $num_valid++;
423             if ($record_type eq 'biblio') {
424                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
425                 if ($parse_items) {
426                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
427                     $num_items += scalar(@import_items_ids);
428                 }
429             } elsif ($record_type eq 'auth') {
430                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
431             }
432         }
433     }
434     unless ($leave_as_staging) {
435         SetImportBatchStatus($batch_id, 'staged');
436     }
437     # FIXME branch_code, number of bibs, number of items
438     _update_batch_record_counts($batch_id);
439     return ($batch_id, $num_valid, $num_items, @invalid_records);
440 }
441
442 =head2 AddItemsToImportBiblio
443
444   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
445                 $import_record_id, $marc_record, $update_counts);
446
447 =cut
448
449 sub AddItemsToImportBiblio {
450     my $batch_id = shift;
451     my $import_record_id = shift;
452     my $marc_record = shift;
453     my $update_counts = @_ ? shift : 0;
454
455     my @import_items_ids = ();
456    
457     my $dbh = C4::Context->dbh; 
458     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
459     foreach my $item_field ($marc_record->field($item_tag)) {
460         my $item_marc = MARC::Record->new();
461         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
462         $item_marc->append_fields($item_field);
463         $marc_record->delete_field($item_field);
464         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
465                                         VALUES (?, ?, ?)");
466         $sth->bind_param(1, $import_record_id);
467         $sth->bind_param(2, 'staged');
468         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
469         $sth->execute();
470         push @import_items_ids, $dbh->{'mysql_insertid'};
471         $sth->finish();
472     }
473
474     if ($#import_items_ids > -1) {
475         _update_batch_record_counts($batch_id) if $update_counts;
476         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
477     }
478     return @import_items_ids;
479 }
480
481 =head2 BatchFindDuplicates
482
483   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
484              $max_matches, $progress_interval, $progress_callback);
485
486 Goes through the records loaded in the batch and attempts to 
487 find duplicates for each one.  Sets the matching status 
488 of each record to "no_match" or "auto_match" as appropriate.
489
490 The $max_matches parameter is optional; if it is not supplied,
491 it defaults to 10.
492
493 The $progress_interval and $progress_callback parameters are 
494 optional; if both are supplied, the sub referred to by
495 $progress_callback will be invoked every $progress_interval
496 records using the number of records processed as the 
497 singular argument.
498
499 =cut
500
501 sub BatchFindDuplicates {
502     my $batch_id = shift;
503     my $matcher = shift;
504     my $max_matches = @_ ? shift : 10;
505
506     # optional callback to monitor status 
507     # of job
508     my $progress_interval = 0;
509     my $progress_callback = undef;
510     if ($#_ == 1) {
511         $progress_interval = shift;
512         $progress_callback = shift;
513         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
514         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
515     }
516
517     my $dbh = C4::Context->dbh;
518
519     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
520                              FROM import_records
521                              WHERE import_batch_id = ?");
522     $sth->execute($batch_id);
523     my $num_with_matches = 0;
524     my $rec_num = 0;
525     while (my $rowref = $sth->fetchrow_hashref) {
526         $rec_num++;
527         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
528             &$progress_callback($rec_num);
529         }
530         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
531         my @matches = ();
532         if (defined $matcher) {
533             @matches = $matcher->get_matches($marc_record, $max_matches);
534         }
535         if (scalar(@matches) > 0) {
536             $num_with_matches++;
537             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
538             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
539         } else {
540             SetImportRecordMatches($rowref->{'import_record_id'}, ());
541             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
542         }
543     }
544     $sth->finish();
545     return $num_with_matches;
546 }
547
548 =head2 BatchCommitRecords
549
550   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
551         BatchCommitRecords($batch_id, $framework,
552         $progress_interval, $progress_callback);
553
554 =cut
555
556 sub BatchCommitRecords {
557     my $batch_id = shift;
558     my $framework = shift;
559
560     # optional callback to monitor status 
561     # of job
562     my $progress_interval = 0;
563     my $progress_callback = undef;
564     if ($#_ == 1) {
565         $progress_interval = shift;
566         $progress_callback = shift;
567         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
568         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
569     }
570
571     my $record_type;
572     my $num_added = 0;
573     my $num_updated = 0;
574     my $num_items_added = 0;
575     my $num_items_replaced = 0;
576     my $num_items_errored = 0;
577     my $num_ignored = 0;
578     # commit (i.e., save, all records in the batch)
579     SetImportBatchStatus('importing');
580     my $overlay_action = GetImportBatchOverlayAction($batch_id);
581     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
582     my $item_action = GetImportBatchItemAction($batch_id);
583     my $item_tag;
584     my $item_subfield;
585     my $dbh = C4::Context->dbh;
586     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
587                              FROM import_records
588                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
589                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
590                              WHERE import_batch_id = ?");
591     $sth->execute($batch_id);
592     my $marcflavour = C4::Context->preference('marcflavour');
593     my $rec_num = 0;
594     while (my $rowref = $sth->fetchrow_hashref) {
595         $record_type = $rowref->{'record_type'};
596         $rec_num++;
597         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
598             &$progress_callback($rec_num);
599         }
600         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
601             $num_ignored++;
602             next;
603         }
604
605         my $marc_type;
606         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
607             $marc_type = 'UNIMARCAUTH';
608         } elsif ($marcflavour eq 'UNIMARC') {
609             $marc_type = 'UNIMARC';
610         } else {
611             $marc_type = 'USMARC';
612         }
613         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
614
615         if ($record_type eq 'biblio') {
616             # remove any item tags - rely on BatchCommitItems
617             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
618             foreach my $item_field ($marc_record->field($item_tag)) {
619                 $marc_record->delete_field($item_field);
620             }
621         }
622
623         my ($record_result, $item_result, $record_match) =
624             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
625                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
626
627         my $recordid;
628         my $query;
629         if ($record_result eq 'create_new') {
630             $num_added++;
631             if ($record_type eq 'biblio') {
632                 my $biblioitemnumber;
633                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
634                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
635                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
636                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
637                     $num_items_added += $bib_items_added;
638                     $num_items_replaced += $bib_items_replaced;
639                     $num_items_errored += $bib_items_errored;
640                 }
641             } else {
642                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
643                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
644             }
645             my $sth = $dbh->prepare_cached($query);
646             $sth->execute($recordid, $rowref->{'import_record_id'});
647             $sth->finish();
648             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
649         } elsif ($record_result eq 'replace') {
650             $num_updated++;
651             $recordid = $record_match;
652             my $oldxml;
653             if ($record_type eq 'biblio') {
654                 my $oldbiblio = Koha::Biblios->find( $recordid );
655                 $oldxml = GetXmlBiblio($recordid);
656
657                 # remove item fields so that they don't get
658                 # added again if record is reverted
659                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
660                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
661                 foreach my $item_field ($old_marc->field($item_tag)) {
662                     $old_marc->delete_field($item_field);
663                 }
664                 $oldxml = $old_marc->as_xml($marc_type);
665
666                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
667                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
668
669                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
670                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
671                     $num_items_added += $bib_items_added;
672                     $num_items_replaced += $bib_items_replaced;
673                     $num_items_errored += $bib_items_errored;
674                 }
675             } else {
676                 $oldxml = GetAuthorityXML($recordid);
677
678                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
679                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
680             }
681             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
682             $sth->execute($oldxml, $rowref->{'import_record_id'});
683             $sth->finish();
684             my $sth2 = $dbh->prepare_cached($query);
685             $sth2->execute($recordid, $rowref->{'import_record_id'});
686             $sth2->finish();
687             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
688             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
689         } elsif ($record_result eq 'ignore') {
690             $recordid = $record_match;
691             $num_ignored++;
692             $recordid = $record_match;
693             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
694                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
695                 $num_items_added += $bib_items_added;
696          $num_items_replaced += $bib_items_replaced;
697                 $num_items_errored += $bib_items_errored;
698                 # still need to record the matched biblionumber so that the
699                 # items can be reverted
700                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
701                 $sth2->execute($recordid, $rowref->{'import_record_id'});
702                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
703             }
704             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
705         }
706     }
707     $sth->finish();
708     SetImportBatchStatus($batch_id, 'imported');
709     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
710 }
711
712 =head2 BatchCommitItems
713
714   ($num_items_added, $num_items_errored) = 
715          BatchCommitItems($import_record_id, $biblionumber);
716
717 =cut
718
719 sub BatchCommitItems {
720     my ( $import_record_id, $biblionumber, $action ) = @_;
721
722     my $dbh = C4::Context->dbh;
723
724     my $num_items_added = 0;
725     my $num_items_errored = 0;
726     my $num_items_replaced = 0;
727
728     my $sth = $dbh->prepare( "
729         SELECT import_items_id, import_items.marcxml, encoding
730         FROM import_items
731         JOIN import_records USING (import_record_id)
732         WHERE import_record_id = ?
733         ORDER BY import_items_id
734     " );
735     $sth->bind_param( 1, $import_record_id );
736     $sth->execute();
737
738     while ( my $row = $sth->fetchrow_hashref() ) {
739         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
740
741         # Delete date_due subfield as to not accidentally delete item checkout due dates
742         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
743         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
744
745         my $item = TransformMarcToKoha( $item_marc );
746
747         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
748         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
749
750         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
751         if ( $action eq "replace" && $duplicate_itemnumber ) {
752             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
753             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
754             $updsth->bind_param( 1, 'imported' );
755             $updsth->bind_param( 2, $item->{itemnumber} );
756             $updsth->bind_param( 3, $row->{'import_items_id'} );
757             $updsth->execute();
758             $updsth->finish();
759             $num_items_replaced++;
760         } elsif ( $action eq "replace" && $duplicate_barcode ) {
761             my $itemnumber = $duplicate_barcode->itemnumber;
762             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
763             $updsth->bind_param( 1, 'imported' );
764             $updsth->bind_param( 2, $item->{itemnumber} );
765             $updsth->bind_param( 3, $row->{'import_items_id'} );
766             $updsth->execute();
767             $updsth->finish();
768             $num_items_replaced++;
769         } elsif ($duplicate_barcode) {
770             $updsth->bind_param( 1, 'error' );
771             $updsth->bind_param( 2, 'duplicate item barcode' );
772             $updsth->bind_param( 3, $row->{'import_items_id'} );
773             $updsth->execute();
774             $num_items_errored++;
775         } else {
776             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
777             if( $itemnumber ) {
778                 $updsth->bind_param( 1, 'imported' );
779                 $updsth->bind_param( 2, $itemnumber );
780                 $updsth->bind_param( 3, $row->{'import_items_id'} );
781                 $updsth->execute();
782                 $updsth->finish();
783                 $num_items_added++;
784             }
785         }
786     }
787
788     return ( $num_items_added, $num_items_replaced, $num_items_errored );
789 }
790
791 =head2 BatchRevertRecords
792
793   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
794       $num_ignored) = BatchRevertRecords($batch_id);
795
796 =cut
797
798 sub BatchRevertRecords {
799     my $batch_id = shift;
800
801     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
802
803     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
804
805     my $record_type;
806     my $num_deleted = 0;
807     my $num_errors = 0;
808     my $num_reverted = 0;
809     my $num_ignored = 0;
810     my $num_items_deleted = 0;
811     # commit (i.e., save, all records in the batch)
812     SetImportBatchStatus('reverting');
813     my $overlay_action = GetImportBatchOverlayAction($batch_id);
814     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
815     my $dbh = C4::Context->dbh;
816     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
817                              FROM import_records
818                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
819                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
820                              WHERE import_batch_id = ?");
821     $sth->execute($batch_id);
822     my $marc_type;
823     my $marcflavour = C4::Context->preference('marcflavour');
824     while (my $rowref = $sth->fetchrow_hashref) {
825         $record_type = $rowref->{'record_type'};
826         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
827             $num_ignored++;
828             next;
829         }
830         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
831             $marc_type = 'UNIMARCAUTH';
832         } elsif ($marcflavour eq 'UNIMARC') {
833             $marc_type = 'UNIMARC';
834         } else {
835             $marc_type = 'USMARC';
836         }
837
838         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
839
840         if ($record_result eq 'delete') {
841             my $error = undef;
842             if  ($record_type eq 'biblio') {
843                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
844                 $error = DelBiblio($rowref->{'matched_biblionumber'});
845             } else {
846                 DelAuthority({ authid => $rowref->{'matched_authid'} });
847             }
848             if (defined $error) {
849                 $num_errors++;
850             } else {
851                 $num_deleted++;
852                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
853             }
854         } elsif ($record_result eq 'restore') {
855             $num_reverted++;
856             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
857             if ($record_type eq 'biblio') {
858                 my $biblionumber = $rowref->{'matched_biblionumber'};
859                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
860
861                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
862                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
863
864                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
865                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
866             } else {
867                 my $authid = $rowref->{'matched_authid'};
868                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
869             }
870             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
871         } elsif ($record_result eq 'ignore') {
872             if ($record_type eq 'biblio') {
873                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
874             }
875             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
876         }
877         my $query;
878         if ($record_type eq 'biblio') {
879             # remove matched_biblionumber only if there is no 'imported' item left
880             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
881             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
882         } else {
883             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
884         }
885         my $sth2 = $dbh->prepare_cached($query);
886         $sth2->execute($rowref->{'import_record_id'});
887     }
888
889     $sth->finish();
890     SetImportBatchStatus($batch_id, 'reverted');
891     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
892 }
893
894 =head2 BatchRevertItems
895
896   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
897
898 =cut
899
900 sub BatchRevertItems {
901     my ($import_record_id, $biblionumber) = @_;
902
903     my $dbh = C4::Context->dbh;
904     my $num_items_deleted = 0;
905
906     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
907                                    FROM import_items
908                                    JOIN items USING (itemnumber)
909                                    WHERE import_record_id = ?");
910     $sth->bind_param(1, $import_record_id);
911     $sth->execute();
912     while (my $row = $sth->fetchrow_hashref()) {
913         my $item = Koha::Items->find($row->{itemnumber});
914         my $error = $item->safe_delete;
915         if ($error eq '1'){
916             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
917             $updsth->bind_param(1, 'reverted');
918             $updsth->bind_param(2, $row->{'import_items_id'});
919             $updsth->execute();
920             $updsth->finish();
921             $num_items_deleted++;
922         }
923         else {
924             next;
925         }
926     }
927     $sth->finish();
928     return $num_items_deleted;
929 }
930
931 =head2 CleanBatch
932
933   CleanBatch($batch_id)
934
935 Deletes all staged records from the import batch
936 and sets the status of the batch to 'cleaned'.  Note
937 that deleting a stage record does *not* affect
938 any record that has been committed to the database.
939
940 =cut
941
942 sub CleanBatch {
943     my $batch_id = shift;
944     return unless defined $batch_id;
945
946     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
947     SetImportBatchStatus($batch_id, 'cleaned');
948 }
949
950 =head2 DeleteBatch
951
952   DeleteBatch($batch_id)
953
954 Deletes the record from the database. This can only be done
955 once the batch has been cleaned.
956
957 =cut
958
959 sub DeleteBatch {
960     my $batch_id = shift;
961     return unless defined $batch_id;
962
963     my $dbh = C4::Context->dbh;
964     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
965     $sth->execute( $batch_id );
966 }
967
968 =head2 GetAllImportBatches
969
970   my $results = GetAllImportBatches();
971
972 Returns a references to an array of hash references corresponding
973 to all import_batches rows (of batch_type 'batch'), sorted in 
974 ascending order by import_batch_id.
975
976 =cut
977
978 sub  GetAllImportBatches {
979     my $dbh = C4::Context->dbh;
980     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
981                                     WHERE batch_type IN ('batch', 'webservice')
982                                     ORDER BY import_batch_id ASC");
983
984     my $results = [];
985     $sth->execute();
986     while (my $row = $sth->fetchrow_hashref) {
987         push @$results, $row;
988     }
989     $sth->finish();
990     return $results;
991 }
992
993 =head2 GetStagedWebserviceBatches
994
995   my $batch_ids = GetStagedWebserviceBatches();
996
997 Returns a references to an array of batch id's
998 of batch_type 'webservice' that are not imported
999
1000 =cut
1001
1002 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1003 SELECT import_batch_id FROM import_batches
1004 WHERE batch_type = 'webservice'
1005 AND import_status = 'staged'
1006 EOQ
1007 sub  GetStagedWebserviceBatches {
1008     my $dbh = C4::Context->dbh;
1009     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1010 }
1011
1012 =head2 GetImportBatchRangeDesc
1013
1014   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1015
1016 Returns a reference to an array of hash references corresponding to
1017 import_batches rows (sorted in descending order by import_batch_id)
1018 start at the given offset.
1019
1020 =cut
1021
1022 sub GetImportBatchRangeDesc {
1023     my ($offset, $results_per_group) = @_;
1024
1025     my $dbh = C4::Context->dbh;
1026     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1027                                     LEFT JOIN import_batch_profiles p
1028                                     ON b.profile_id = p.id
1029                                     WHERE b.batch_type IN ('batch', 'webservice')
1030                                     ORDER BY b.import_batch_id DESC";
1031     my @params;
1032     if ($results_per_group){
1033         $query .= " LIMIT ?";
1034         push(@params, $results_per_group);
1035     }
1036     if ($offset){
1037         $query .= " OFFSET ?";
1038         push(@params, $offset);
1039     }
1040     my $sth = $dbh->prepare_cached($query);
1041     $sth->execute(@params);
1042     my $results = $sth->fetchall_arrayref({});
1043     $sth->finish();
1044     return $results;
1045 }
1046
1047 =head2 GetItemNumbersFromImportBatch
1048
1049   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1050
1051 =cut
1052
1053 sub GetItemNumbersFromImportBatch {
1054     my ($batch_id) = @_;
1055     my $dbh = C4::Context->dbh;
1056     my $sql = q|
1057 SELECT itemnumber FROM import_items
1058 INNER JOIN items USING (itemnumber)
1059 INNER JOIN import_records USING (import_record_id)
1060 WHERE import_batch_id = ?|;
1061     my  $sth = $dbh->prepare( $sql );
1062     $sth->execute($batch_id);
1063     my @items ;
1064     while ( my ($itm) = $sth->fetchrow_array ) {
1065         push @items, $itm;
1066     }
1067     return @items;
1068 }
1069
1070 =head2 GetNumberOfImportBatches
1071
1072   my $count = GetNumberOfImportBatches();
1073
1074 =cut
1075
1076 sub GetNumberOfNonZ3950ImportBatches {
1077     my $dbh = C4::Context->dbh;
1078     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1079     $sth->execute();
1080     my ($count) = $sth->fetchrow_array();
1081     $sth->finish();
1082     return $count;
1083 }
1084
1085 =head2 GetImportBiblios
1086
1087   my $results = GetImportBiblios($importid);
1088
1089 =cut
1090
1091 sub GetImportBiblios {
1092     my ($import_record_id) = @_;
1093
1094     my $dbh = C4::Context->dbh;
1095     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1096     return $dbh->selectall_arrayref(
1097         $query,
1098         { Slice => {} },
1099         $import_record_id
1100     );
1101
1102 }
1103
1104 =head2 GetImportRecordsRange
1105
1106   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1107
1108 Returns a reference to an array of hash references corresponding to
1109 import_biblios/import_auths/import_records rows for a given batch
1110 starting at the given offset.
1111
1112 =cut
1113
1114 sub GetImportRecordsRange {
1115     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1116
1117     my $dbh = C4::Context->dbh;
1118
1119     my $order_by = $parameters->{order_by} || 'import_record_id';
1120     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1121
1122     my $order_by_direction =
1123       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1124
1125     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1126
1127     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1128                                            record_sequence, status, overlay_status,
1129                                            matched_biblionumber, matched_authid, record_type
1130                                     FROM   import_records
1131                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1132                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1133                                     WHERE  import_batch_id = ?";
1134     my @params;
1135     push(@params, $batch_id);
1136     if ($status) {
1137         $query .= " AND status=?";
1138         push(@params,$status);
1139     }
1140
1141     $query.=" ORDER BY $order_by $order_by_direction";
1142
1143     if($results_per_group){
1144         $query .= " LIMIT ?";
1145         push(@params, $results_per_group);
1146     }
1147     if($offset){
1148         $query .= " OFFSET ?";
1149         push(@params, $offset);
1150     }
1151     my $sth = $dbh->prepare_cached($query);
1152     $sth->execute(@params);
1153     my $results = $sth->fetchall_arrayref({});
1154     $sth->finish();
1155     return $results;
1156
1157 }
1158
1159 =head2 GetBestRecordMatch
1160
1161   my $record_id = GetBestRecordMatch($import_record_id);
1162
1163 =cut
1164
1165 sub GetBestRecordMatch {
1166     my ($import_record_id) = @_;
1167
1168     my $dbh = C4::Context->dbh;
1169     my $sth = $dbh->prepare("SELECT candidate_match_id
1170                              FROM   import_record_matches
1171                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1172                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1173                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1174                              WHERE  import_record_matches.import_record_id = ? AND
1175                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1176                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1177                              ORDER BY score DESC, candidate_match_id DESC");
1178     $sth->execute($import_record_id);
1179     my ($record_id) = $sth->fetchrow_array();
1180     $sth->finish();
1181     return $record_id;
1182 }
1183
1184 =head2 GetImportBatchStatus
1185
1186   my $status = GetImportBatchStatus($batch_id);
1187
1188 =cut
1189
1190 sub GetImportBatchStatus {
1191     my ($batch_id) = @_;
1192
1193     my $dbh = C4::Context->dbh;
1194     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1195     $sth->execute($batch_id);
1196     my ($status) = $sth->fetchrow_array();
1197     $sth->finish();
1198     return $status;
1199
1200 }
1201
1202 =head2 SetImportBatchStatus
1203
1204   SetImportBatchStatus($batch_id, $new_status);
1205
1206 =cut
1207
1208 sub SetImportBatchStatus {
1209     my ($batch_id, $new_status) = @_;
1210
1211     my $dbh = C4::Context->dbh;
1212     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1213     $sth->execute($new_status, $batch_id);
1214     $sth->finish();
1215
1216 }
1217
1218 =head2 GetImportBatchOverlayAction
1219
1220   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1221
1222 =cut
1223
1224 sub GetImportBatchOverlayAction {
1225     my ($batch_id) = @_;
1226
1227     my $dbh = C4::Context->dbh;
1228     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1229     $sth->execute($batch_id);
1230     my ($overlay_action) = $sth->fetchrow_array();
1231     $sth->finish();
1232     return $overlay_action;
1233
1234 }
1235
1236
1237 =head2 SetImportBatchOverlayAction
1238
1239   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1240
1241 =cut
1242
1243 sub SetImportBatchOverlayAction {
1244     my ($batch_id, $new_overlay_action) = @_;
1245
1246     my $dbh = C4::Context->dbh;
1247     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1248     $sth->execute($new_overlay_action, $batch_id);
1249     $sth->finish();
1250
1251 }
1252
1253 =head2 GetImportBatchNoMatchAction
1254
1255   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1256
1257 =cut
1258
1259 sub GetImportBatchNoMatchAction {
1260     my ($batch_id) = @_;
1261
1262     my $dbh = C4::Context->dbh;
1263     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1264     $sth->execute($batch_id);
1265     my ($nomatch_action) = $sth->fetchrow_array();
1266     $sth->finish();
1267     return $nomatch_action;
1268
1269 }
1270
1271
1272 =head2 SetImportBatchNoMatchAction
1273
1274   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1275
1276 =cut
1277
1278 sub SetImportBatchNoMatchAction {
1279     my ($batch_id, $new_nomatch_action) = @_;
1280
1281     my $dbh = C4::Context->dbh;
1282     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1283     $sth->execute($new_nomatch_action, $batch_id);
1284     $sth->finish();
1285
1286 }
1287
1288 =head2 GetImportBatchItemAction
1289
1290   my $item_action = GetImportBatchItemAction($batch_id);
1291
1292 =cut
1293
1294 sub GetImportBatchItemAction {
1295     my ($batch_id) = @_;
1296
1297     my $dbh = C4::Context->dbh;
1298     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1299     $sth->execute($batch_id);
1300     my ($item_action) = $sth->fetchrow_array();
1301     $sth->finish();
1302     return $item_action;
1303
1304 }
1305
1306
1307 =head2 SetImportBatchItemAction
1308
1309   SetImportBatchItemAction($batch_id, $new_item_action);
1310
1311 =cut
1312
1313 sub SetImportBatchItemAction {
1314     my ($batch_id, $new_item_action) = @_;
1315
1316     my $dbh = C4::Context->dbh;
1317     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1318     $sth->execute($new_item_action, $batch_id);
1319     $sth->finish();
1320
1321 }
1322
1323 =head2 GetImportBatchMatcher
1324
1325   my $matcher_id = GetImportBatchMatcher($batch_id);
1326
1327 =cut
1328
1329 sub GetImportBatchMatcher {
1330     my ($batch_id) = @_;
1331
1332     my $dbh = C4::Context->dbh;
1333     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1334     $sth->execute($batch_id);
1335     my ($matcher_id) = $sth->fetchrow_array();
1336     $sth->finish();
1337     return $matcher_id;
1338
1339 }
1340
1341
1342 =head2 SetImportBatchMatcher
1343
1344   SetImportBatchMatcher($batch_id, $new_matcher_id);
1345
1346 =cut
1347
1348 sub SetImportBatchMatcher {
1349     my ($batch_id, $new_matcher_id) = @_;
1350
1351     my $dbh = C4::Context->dbh;
1352     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1353     $sth->execute($new_matcher_id, $batch_id);
1354     $sth->finish();
1355
1356 }
1357
1358 =head2 GetImportRecordOverlayStatus
1359
1360   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1361
1362 =cut
1363
1364 sub GetImportRecordOverlayStatus {
1365     my ($import_record_id) = @_;
1366
1367     my $dbh = C4::Context->dbh;
1368     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1369     $sth->execute($import_record_id);
1370     my ($overlay_status) = $sth->fetchrow_array();
1371     $sth->finish();
1372     return $overlay_status;
1373
1374 }
1375
1376
1377 =head2 SetImportRecordOverlayStatus
1378
1379   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1380
1381 =cut
1382
1383 sub SetImportRecordOverlayStatus {
1384     my ($import_record_id, $new_overlay_status) = @_;
1385
1386     my $dbh = C4::Context->dbh;
1387     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1388     $sth->execute($new_overlay_status, $import_record_id);
1389     $sth->finish();
1390
1391 }
1392
1393 =head2 GetImportRecordStatus
1394
1395   my $status = GetImportRecordStatus($import_record_id);
1396
1397 =cut
1398
1399 sub GetImportRecordStatus {
1400     my ($import_record_id) = @_;
1401
1402     my $dbh = C4::Context->dbh;
1403     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1404     $sth->execute($import_record_id);
1405     my ($status) = $sth->fetchrow_array();
1406     $sth->finish();
1407     return $status;
1408
1409 }
1410
1411
1412 =head2 SetImportRecordStatus
1413
1414   SetImportRecordStatus($import_record_id, $new_status);
1415
1416 =cut
1417
1418 sub SetImportRecordStatus {
1419     my ($import_record_id, $new_status) = @_;
1420
1421     my $dbh = C4::Context->dbh;
1422     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1423     $sth->execute($new_status, $import_record_id);
1424     $sth->finish();
1425
1426 }
1427
1428 =head2 GetImportRecordMatches
1429
1430   my $results = GetImportRecordMatches($import_record_id, $best_only);
1431
1432 =cut
1433
1434 sub GetImportRecordMatches {
1435     my $import_record_id = shift;
1436     my $best_only = @_ ? shift : 0;
1437
1438     my $dbh = C4::Context->dbh;
1439     # FIXME currently biblio only
1440     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1441                                     candidate_match_id, score, record_type
1442                                     FROM import_records
1443                                     JOIN import_record_matches USING (import_record_id)
1444                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1445                                     WHERE import_record_id = ?
1446                                     ORDER BY score DESC, biblionumber DESC");
1447     $sth->bind_param(1, $import_record_id);
1448     my $results = [];
1449     $sth->execute();
1450     while (my $row = $sth->fetchrow_hashref) {
1451         if ($row->{'record_type'} eq 'auth') {
1452             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1453         }
1454         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1455         push @$results, $row;
1456         last if $best_only;
1457     }
1458     $sth->finish();
1459
1460     return $results;
1461     
1462 }
1463
1464 =head2 SetImportRecordMatches
1465
1466   SetImportRecordMatches($import_record_id, @matches);
1467
1468 =cut
1469
1470 sub SetImportRecordMatches {
1471     my $import_record_id = shift;
1472     my @matches = @_;
1473
1474     my $dbh = C4::Context->dbh;
1475     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1476     $delsth->execute($import_record_id);
1477     $delsth->finish();
1478
1479     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1480                                     VALUES (?, ?, ?)");
1481     foreach my $match (@matches) {
1482         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1483     }
1484 }
1485
1486 =head2 RecordsFromISO2709File
1487
1488     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1489
1490 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1491
1492 @PARAM1, String, absolute path to the ISO2709 file.
1493 @PARAM2, String, see stage_file.pl
1494 @PARAM3, String, should be utf8
1495
1496 Returns two array refs.
1497
1498 =cut
1499
1500 sub RecordsFromISO2709File {
1501     my ($input_file, $record_type, $encoding) = @_;
1502     my @errors;
1503
1504     my $marc_type = C4::Context->preference('marcflavour');
1505     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1506
1507     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1508     my @marc_records;
1509     $/ = "\035";
1510     while (<$fh>) {
1511         s/^\s+//;
1512         s/\s+$//;
1513         next unless $_; # skip if record has only whitespace, as might occur
1514                         # if file includes newlines between each MARC record
1515         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1516         push @marc_records, $marc_record;
1517         if ($charset_guessed ne $encoding) {
1518             push @errors,
1519                 "Unexpected charset $charset_guessed, expecting $encoding";
1520         }
1521     }
1522     close $fh;
1523     return ( \@errors, \@marc_records );
1524 }
1525
1526 =head2 RecordsFromMARCXMLFile
1527
1528     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1529
1530 Creates MARC::Record-objects out of the given MARCXML-file.
1531
1532 @PARAM1, String, absolute path to the ISO2709 file.
1533 @PARAM2, String, should be utf8
1534
1535 Returns two array refs.
1536
1537 =cut
1538
1539 sub RecordsFromMARCXMLFile {
1540     my ( $filename, $encoding ) = @_;
1541     my $batch = MARC::File::XML->in( $filename );
1542     my ( @marcRecords, @errors, $record );
1543     do {
1544         eval { $record = $batch->next( $encoding ); };
1545         if ($@) {
1546             push @errors, $@;
1547         }
1548         push @marcRecords, $record if $record;
1549     } while( $record );
1550     return (\@errors, \@marcRecords);
1551 }
1552
1553 =head2 RecordsFromMarcPlugin
1554
1555     Converts text of input_file into array of MARC records with to_marc plugin
1556
1557 =cut
1558
1559 sub RecordsFromMarcPlugin {
1560     my ($input_file, $plugin_class, $encoding) = @_;
1561     my ( $text, @return );
1562     return \@return if !$input_file || !$plugin_class;
1563
1564     # Read input file
1565     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1566     $/ = "\035";
1567     while (<$fh>) {
1568         s/^\s+//;
1569         s/\s+$//;
1570         next unless $_;
1571         $text .= $_;
1572     }
1573     close $fh;
1574
1575     # Convert to large MARC blob with plugin
1576     $text = Koha::Plugins::Handler->run({
1577         class  => $plugin_class,
1578         method => 'to_marc',
1579         params => { data => $text },
1580     }) if $text;
1581
1582     # Convert to array of MARC records
1583     if( $text ) {
1584         my $marc_type = C4::Context->preference('marcflavour');
1585         foreach my $blob ( split(/\x1D/, $text) ) {
1586             next if $blob =~ /^\s*$/;
1587             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1588             push @return, $marcrecord;
1589         }
1590     }
1591     return \@return;
1592 }
1593
1594 # internal functions
1595
1596 sub _create_import_record {
1597     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1598
1599     my $dbh = C4::Context->dbh;
1600     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1601                                                          record_type, encoding)
1602                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1603     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1604                   $record_type, $encoding);
1605     my $import_record_id = $dbh->{'mysql_insertid'};
1606     $sth->finish();
1607     return $import_record_id;
1608 }
1609
1610 sub _update_import_record_marc {
1611     my ($import_record_id, $marc_record, $marc_type) = @_;
1612
1613     my $dbh = C4::Context->dbh;
1614     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1615                              WHERE  import_record_id = ?");
1616     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1617     $sth->finish();
1618 }
1619
1620 sub _add_auth_fields {
1621     my ($import_record_id, $marc_record) = @_;
1622
1623     my $controlnumber;
1624     if ($marc_record->field('001')) {
1625         $controlnumber = $marc_record->field('001')->data();
1626     }
1627     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1628     my $dbh = C4::Context->dbh;
1629     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1630     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1631     $sth->finish();
1632 }
1633
1634 sub _add_biblio_fields {
1635     my ($import_record_id, $marc_record) = @_;
1636
1637     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1638     my $dbh = C4::Context->dbh;
1639     # FIXME no controlnumber, originalsource
1640     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1641     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1642     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1643     $sth->finish();
1644                 
1645 }
1646
1647 sub _update_biblio_fields {
1648     my ($import_record_id, $marc_record) = @_;
1649
1650     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1651     my $dbh = C4::Context->dbh;
1652     # FIXME no controlnumber, originalsource
1653     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1654     $isbn =~ s/\(.*$//;
1655     $isbn =~ tr/ -_//;
1656     $isbn = uc $isbn;
1657     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1658                              WHERE  import_record_id = ?");
1659     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1660     $sth->finish();
1661 }
1662
1663 sub _parse_biblio_fields {
1664     my ($marc_record) = @_;
1665
1666     my $dbh = C4::Context->dbh;
1667     my $bibliofields = TransformMarcToKoha($marc_record, '');
1668     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1669
1670 }
1671
1672 sub _update_batch_record_counts {
1673     my ($batch_id) = @_;
1674
1675     my $dbh = C4::Context->dbh;
1676     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1677                                         num_records = (
1678                                             SELECT COUNT(*)
1679                                             FROM import_records
1680                                             WHERE import_batch_id = import_batches.import_batch_id),
1681                                         num_items = (
1682                                             SELECT COUNT(*)
1683                                             FROM import_records
1684                                             JOIN import_items USING (import_record_id)
1685                                             WHERE import_batch_id = import_batches.import_batch_id
1686                                             AND record_type = 'biblio')
1687                                     WHERE import_batch_id = ?");
1688     $sth->bind_param(1, $batch_id);
1689     $sth->execute();
1690     $sth->finish();
1691 }
1692
1693 sub _get_commit_action {
1694     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1695     
1696     if ($record_type eq 'biblio') {
1697         my ($bib_result, $bib_match, $item_result);
1698
1699         if ($overlay_status ne 'no_match') {
1700             $bib_match = GetBestRecordMatch($import_record_id);
1701             if ($overlay_action eq 'replace') {
1702                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1703             } elsif ($overlay_action eq 'create_new') {
1704                 $bib_result  = 'create_new';
1705             } elsif ($overlay_action eq 'ignore') {
1706                 $bib_result  = 'ignore';
1707             }
1708          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1709                 $item_result = 'create_new';
1710        }
1711       elsif($item_action eq 'replace'){
1712           $item_result = 'replace';
1713           }
1714       else {
1715              $item_result = 'ignore';
1716            }
1717         } else {
1718             $bib_result = $nomatch_action;
1719             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1720         }
1721         return ($bib_result, $item_result, $bib_match);
1722     } else { # must be auths
1723         my ($auth_result, $auth_match);
1724
1725         if ($overlay_status ne 'no_match') {
1726             $auth_match = GetBestRecordMatch($import_record_id);
1727             if ($overlay_action eq 'replace') {
1728                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1729             } elsif ($overlay_action eq 'create_new') {
1730                 $auth_result  = 'create_new';
1731             } elsif ($overlay_action eq 'ignore') {
1732                 $auth_result  = 'ignore';
1733             }
1734         } else {
1735             $auth_result = $nomatch_action;
1736         }
1737
1738         return ($auth_result, undef, $auth_match);
1739
1740     }
1741 }
1742
1743 sub _get_revert_action {
1744     my ($overlay_action, $overlay_status, $status) = @_;
1745
1746     my $bib_result;
1747
1748     if ($status eq 'ignored') {
1749         $bib_result = 'ignore';
1750     } else {
1751         if ($overlay_action eq 'create_new') {
1752             $bib_result = 'delete';
1753         } else {
1754             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1755         }
1756     }
1757     return $bib_result;
1758 }
1759
1760 1;
1761 __END__
1762
1763 =head1 AUTHOR
1764
1765 Koha Development Team <http://koha-community.org/>
1766
1767 Galen Charlton <galen.charlton@liblime.com>
1768
1769 =cut