Bug 18605: Remove TRUNCATE from C4/HoldsQueue.pm
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Plugins::Handler;
31 use Koha::Logger;
32
33 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
34
35 BEGIN {
36         require Exporter;
37         @ISA    = qw(Exporter);
38         @EXPORT = qw(
39     GetZ3950BatchId
40     GetWebserviceBatchId
41     GetImportRecordMarc
42     GetImportRecordMarcXML
43     AddImportBatch
44     GetImportBatch
45     AddAuthToBatch
46     AddBiblioToBatch
47     AddItemsToImportBiblio
48     ModAuthorityInBatch
49     ModBiblioInBatch
50
51     BatchStageMarcRecords
52     BatchFindDuplicates
53     BatchCommitRecords
54     BatchRevertRecords
55     CleanBatch
56     DeleteBatch
57
58     GetAllImportBatches
59     GetStagedWebserviceBatches
60     GetImportBatchRangeDesc
61     GetNumberOfNonZ3950ImportBatches
62     GetImportBiblios
63     GetImportRecordsRange
64         GetItemNumbersFromImportBatch
65     
66     GetImportBatchStatus
67     SetImportBatchStatus
68     GetImportBatchOverlayAction
69     SetImportBatchOverlayAction
70     GetImportBatchNoMatchAction
71     SetImportBatchNoMatchAction
72     GetImportBatchItemAction
73     SetImportBatchItemAction
74     GetImportBatchMatcher
75     SetImportBatchMatcher
76     GetImportRecordOverlayStatus
77     SetImportRecordOverlayStatus
78     GetImportRecordStatus
79     SetImportRecordStatus
80     GetImportRecordMatches
81     SetImportRecordMatches
82         );
83 }
84
85 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
86
87 =head1 NAME
88
89 C4::ImportBatch - manage batches of imported MARC records
90
91 =head1 SYNOPSIS
92
93 use C4::ImportBatch;
94
95 =head1 FUNCTIONS
96
97 =head2 GetZ3950BatchId
98
99   my $batchid = GetZ3950BatchId($z3950server);
100
101 Retrieves the ID of the import batch for the Z39.50
102 reservoir for the given target.  If necessary,
103 creates the import batch.
104
105 =cut
106
107 sub GetZ3950BatchId {
108     my ($z3950server) = @_;
109
110     my $dbh = C4::Context->dbh;
111     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
112                              WHERE  batch_type = 'z3950'
113                              AND    file_name = ?");
114     $sth->execute($z3950server);
115     my $rowref = $sth->fetchrow_arrayref();
116     $sth->finish();
117     if (defined $rowref) {
118         return $rowref->[0];
119     } else {
120         my $batch_id = AddImportBatch( {
121                 overlay_action => 'create_new',
122                 import_status => 'staged',
123                 batch_type => 'z3950',
124                 file_name => $z3950server,
125             } );
126         return $batch_id;
127     }
128     
129 }
130
131 =head2 GetWebserviceBatchId
132
133   my $batchid = GetWebserviceBatchId();
134
135 Retrieves the ID of the import batch for webservice.
136 If necessary, creates the import batch.
137
138 =cut
139
140 my $WEBSERVICE_BASE_QRY = <<EOQ;
141 SELECT import_batch_id FROM import_batches
142 WHERE  batch_type = 'webservice'
143 AND    import_status = 'staged'
144 EOQ
145 sub GetWebserviceBatchId {
146     my ($params) = @_;
147
148     my $dbh = C4::Context->dbh;
149     my $sql = $WEBSERVICE_BASE_QRY;
150     my @args;
151     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
152         if (my $val = $params->{$field}) {
153             $sql .= " AND $field = ?";
154             push @args, $val;
155         }
156     }
157     my $id = $dbh->selectrow_array($sql, undef, @args);
158     return $id if $id;
159
160     $params->{batch_type} = 'webservice';
161     $params->{import_status} = 'staged';
162     return AddImportBatch($params);
163 }
164
165 =head2 GetImportRecordMarc
166
167   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
168
169 =cut
170
171 sub GetImportRecordMarc {
172     my ($import_record_id) = @_;
173
174     my $dbh = C4::Context->dbh;
175     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
176         SELECT marc, encoding
177         FROM import_records
178         WHERE import_record_id = ?
179     |, undef, $import_record_id );
180
181     return $marc, $encoding;
182 }
183
184 sub GetRecordFromImportBiblio {
185     my ( $import_record_id, $embed_items ) = @_;
186
187     my ($marc) = GetImportRecordMarc($import_record_id);
188     my $record = MARC::Record->new_from_usmarc($marc);
189
190     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
191
192     return $record;
193 }
194
195 sub EmbedItemsInImportBiblio {
196     my ( $record, $import_record_id ) = @_;
197     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
198     my $dbh = C4::Context->dbh;
199     my $import_items = $dbh->selectall_arrayref(q|
200         SELECT import_items.marcxml
201         FROM import_items
202         WHERE import_record_id = ?
203     |, { Slice => {} }, $import_record_id );
204     my @item_fields;
205     for my $import_item ( @$import_items ) {
206         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
207         push @item_fields, $item_marc->field($itemtag);
208     }
209     $record->append_fields(@item_fields);
210     return $record;
211 }
212
213 =head2 GetImportRecordMarcXML
214
215   my $marcxml = GetImportRecordMarcXML($import_record_id);
216
217 =cut
218
219 sub GetImportRecordMarcXML {
220     my ($import_record_id) = @_;
221
222     my $dbh = C4::Context->dbh;
223     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
224     $sth->execute($import_record_id);
225     my ($marcxml) = $sth->fetchrow();
226     $sth->finish();
227     return $marcxml;
228
229 }
230
231 =head2 AddImportBatch
232
233   my $batch_id = AddImportBatch($params_hash);
234
235 =cut
236
237 sub AddImportBatch {
238     my ($params) = @_;
239
240     my (@fields, @vals);
241     foreach (qw( matcher_id template_id branchcode
242                  overlay_action nomatch_action item_action
243                  import_status batch_type file_name comments record_type )) {
244         if (exists $params->{$_}) {
245             push @fields, $_;
246             push @vals, $params->{$_};
247         }
248     }
249     my $dbh = C4::Context->dbh;
250     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
251                                   VALUES (".join( ',', map '?', @fields).")",
252              undef,
253              @vals);
254     return $dbh->{'mysql_insertid'};
255 }
256
257 =head2 GetImportBatch 
258
259   my $row = GetImportBatch($batch_id);
260
261 Retrieve a hashref of an import_batches row.
262
263 =cut
264
265 sub GetImportBatch {
266     my ($batch_id) = @_;
267
268     my $dbh = C4::Context->dbh;
269     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
270     $sth->bind_param(1, $batch_id);
271     $sth->execute();
272     my $result = $sth->fetchrow_hashref;
273     $sth->finish();
274     return $result;
275
276 }
277
278 =head2 AddBiblioToBatch 
279
280   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
281                 $marc_record, $encoding, $z3950random, $update_counts);
282
283 =cut
284
285 sub AddBiblioToBatch {
286     my $batch_id = shift;
287     my $record_sequence = shift;
288     my $marc_record = shift;
289     my $encoding = shift;
290     my $z3950random = shift;
291     my $update_counts = @_ ? shift : 1;
292
293     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
294     _add_biblio_fields($import_record_id, $marc_record);
295     _update_batch_record_counts($batch_id) if $update_counts;
296     return $import_record_id;
297 }
298
299 =head2 ModBiblioInBatch
300
301   ModBiblioInBatch($import_record_id, $marc_record);
302
303 =cut
304
305 sub ModBiblioInBatch {
306     my ($import_record_id, $marc_record) = @_;
307
308     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
309     _update_biblio_fields($import_record_id, $marc_record);
310
311 }
312
313 =head2 AddAuthToBatch
314
315   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
316                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
317
318 =cut
319
320 sub AddAuthToBatch {
321     my $batch_id = shift;
322     my $record_sequence = shift;
323     my $marc_record = shift;
324     my $encoding = shift;
325     my $z3950random = shift;
326     my $update_counts = @_ ? shift : 1;
327     my $marc_type = shift || C4::Context->preference('marcflavour');
328
329     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
330
331     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
332     _add_auth_fields($import_record_id, $marc_record);
333     _update_batch_record_counts($batch_id) if $update_counts;
334     return $import_record_id;
335 }
336
337 =head2 ModAuthInBatch
338
339   ModAuthInBatch($import_record_id, $marc_record);
340
341 =cut
342
343 sub ModAuthInBatch {
344     my ($import_record_id, $marc_record) = @_;
345
346     my $marcflavour = C4::Context->preference('marcflavour');
347     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
348
349 }
350
351 =head2 BatchStageMarcRecords
352
353 ( $batch_id, $num_records, $num_items, @invalid_records ) =
354   BatchStageMarcRecords(
355     $encoding,                   $marc_records,
356     $file_name,                  $to_marc_plugin,
357     $marc_modification_template, $comments,
358     $branch_code,                $parse_items,
359     $leave_as_staging,           $progress_interval,
360     $progress_callback
361   );
362
363 =cut
364
365 sub BatchStageMarcRecords {
366     my $record_type = shift;
367     my $encoding = shift;
368     my $marc_records = shift;
369     my $file_name = shift;
370     my $to_marc_plugin = shift;
371     my $marc_modification_template = shift;
372     my $comments = shift;
373     my $branch_code = shift;
374     my $parse_items = shift;
375     my $leave_as_staging = shift;
376
377     # optional callback to monitor status 
378     # of job
379     my $progress_interval = 0;
380     my $progress_callback = undef;
381     if ($#_ == 1) {
382         $progress_interval = shift;
383         $progress_callback = shift;
384         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
385         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
386     } 
387     
388     my $batch_id = AddImportBatch( {
389             overlay_action => 'create_new',
390             import_status => 'staging',
391             batch_type => 'batch',
392             file_name => $file_name,
393             comments => $comments,
394             record_type => $record_type,
395         } );
396     if ($parse_items) {
397         SetImportBatchItemAction($batch_id, 'always_add');
398     } else {
399         SetImportBatchItemAction($batch_id, 'ignore');
400     }
401
402     $marc_records = Koha::Plugins::Handler->run(
403         {
404             class  => $to_marc_plugin,
405             method => 'to_marc',
406             params => { data => $marc_records }
407         }
408     ) if $to_marc_plugin;
409
410     my $marc_type = C4::Context->preference('marcflavour');
411     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
412     my @invalid_records = ();
413     my $num_valid = 0;
414     my $num_items = 0;
415     # FIXME - for now, we're dealing only with bibs
416     my $rec_num = 0;
417     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
418         $marc_blob =~ s/^\s+//g;
419         $marc_blob =~ s/\s+$//g;
420         next unless $marc_blob;
421         $rec_num++;
422         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
423             &$progress_callback($rec_num);
424         }
425         my ($marc_record, $charset_guessed, $char_errors) =
426             MarcToUTF8Record($marc_blob, $marc_type, $encoding);
427
428         $encoding = $charset_guessed unless $encoding;
429
430         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
431
432         my $import_record_id;
433         if (scalar($marc_record->fields()) == 0) {
434             push @invalid_records, $marc_blob;
435         } else {
436
437             # Normalize the record so it doesn't have separated diacritics
438             SetUTF8Flag($marc_record);
439
440             $num_valid++;
441             if ($record_type eq 'biblio') {
442                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
443                 if ($parse_items) {
444                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
445                     $num_items += scalar(@import_items_ids);
446                 }
447             } elsif ($record_type eq 'auth') {
448                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
449             }
450         }
451     }
452     unless ($leave_as_staging) {
453         SetImportBatchStatus($batch_id, 'staged');
454     }
455     # FIXME branch_code, number of bibs, number of items
456     _update_batch_record_counts($batch_id);
457     return ($batch_id, $num_valid, $num_items, @invalid_records);
458 }
459
460 =head2 AddItemsToImportBiblio
461
462   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
463                 $import_record_id, $marc_record, $update_counts);
464
465 =cut
466
467 sub AddItemsToImportBiblio {
468     my $batch_id = shift;
469     my $import_record_id = shift;
470     my $marc_record = shift;
471     my $update_counts = @_ ? shift : 0;
472
473     my @import_items_ids = ();
474    
475     my $dbh = C4::Context->dbh; 
476     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
477     foreach my $item_field ($marc_record->field($item_tag)) {
478         my $item_marc = MARC::Record->new();
479         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
480         $item_marc->append_fields($item_field);
481         $marc_record->delete_field($item_field);
482         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
483                                         VALUES (?, ?, ?)");
484         $sth->bind_param(1, $import_record_id);
485         $sth->bind_param(2, 'staged');
486         $sth->bind_param(3, $item_marc->as_xml());
487         $sth->execute();
488         push @import_items_ids, $dbh->{'mysql_insertid'};
489         $sth->finish();
490     }
491
492     if ($#import_items_ids > -1) {
493         _update_batch_record_counts($batch_id) if $update_counts;
494         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
495     }
496     return @import_items_ids;
497 }
498
499 =head2 BatchFindDuplicates
500
501   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
502              $max_matches, $progress_interval, $progress_callback);
503
504 Goes through the records loaded in the batch and attempts to 
505 find duplicates for each one.  Sets the matching status 
506 of each record to "no_match" or "auto_match" as appropriate.
507
508 The $max_matches parameter is optional; if it is not supplied,
509 it defaults to 10.
510
511 The $progress_interval and $progress_callback parameters are 
512 optional; if both are supplied, the sub referred to by
513 $progress_callback will be invoked every $progress_interval
514 records using the number of records processed as the 
515 singular argument.
516
517 =cut
518
519 sub BatchFindDuplicates {
520     my $batch_id = shift;
521     my $matcher = shift;
522     my $max_matches = @_ ? shift : 10;
523
524     # optional callback to monitor status 
525     # of job
526     my $progress_interval = 0;
527     my $progress_callback = undef;
528     if ($#_ == 1) {
529         $progress_interval = shift;
530         $progress_callback = shift;
531         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
532         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
533     }
534
535     my $dbh = C4::Context->dbh;
536
537     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
538                              FROM import_records
539                              WHERE import_batch_id = ?");
540     $sth->execute($batch_id);
541     my $num_with_matches = 0;
542     my $rec_num = 0;
543     while (my $rowref = $sth->fetchrow_hashref) {
544         $rec_num++;
545         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
546             &$progress_callback($rec_num);
547         }
548         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
549         my @matches = ();
550         if (defined $matcher) {
551             @matches = $matcher->get_matches($marc_record, $max_matches);
552         }
553         if (scalar(@matches) > 0) {
554             $num_with_matches++;
555             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
556             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
557         } else {
558             SetImportRecordMatches($rowref->{'import_record_id'}, ());
559             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
560         }
561     }
562     $sth->finish();
563     return $num_with_matches;
564 }
565
566 =head2 BatchCommitRecords
567
568   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
569         BatchCommitRecords($batch_id, $framework,
570         $progress_interval, $progress_callback);
571
572 =cut
573
574 sub BatchCommitRecords {
575     my $batch_id = shift;
576     my $framework = shift;
577
578     # optional callback to monitor status 
579     # of job
580     my $progress_interval = 0;
581     my $progress_callback = undef;
582     if ($#_ == 1) {
583         $progress_interval = shift;
584         $progress_callback = shift;
585         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
586         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
587     }
588
589     my $record_type;
590     my $num_added = 0;
591     my $num_updated = 0;
592     my $num_items_added = 0;
593     my $num_items_replaced = 0;
594     my $num_items_errored = 0;
595     my $num_ignored = 0;
596     # commit (i.e., save, all records in the batch)
597     SetImportBatchStatus('importing');
598     my $overlay_action = GetImportBatchOverlayAction($batch_id);
599     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
600     my $item_action = GetImportBatchItemAction($batch_id);
601     my $item_tag;
602     my $item_subfield;
603     my $dbh = C4::Context->dbh;
604     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
605                              FROM import_records
606                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
607                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
608                              WHERE import_batch_id = ?");
609     $sth->execute($batch_id);
610     my $marcflavour = C4::Context->preference('marcflavour');
611     my $rec_num = 0;
612     while (my $rowref = $sth->fetchrow_hashref) {
613         $record_type = $rowref->{'record_type'};
614         $rec_num++;
615         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
616             &$progress_callback($rec_num);
617         }
618         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
619             $num_ignored++;
620             next;
621         }
622
623         my $marc_type;
624         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
625             $marc_type = 'UNIMARCAUTH';
626         } elsif ($marcflavour eq 'UNIMARC') {
627             $marc_type = 'UNIMARC';
628         } else {
629             $marc_type = 'USMARC';
630         }
631         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
632
633         if ($record_type eq 'biblio') {
634             # remove any item tags - rely on BatchCommitItems
635             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
636             foreach my $item_field ($marc_record->field($item_tag)) {
637                 $marc_record->delete_field($item_field);
638             }
639         }
640
641         my ($record_result, $item_result, $record_match) =
642             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
643                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
644
645         my $recordid;
646         my $query;
647         if ($record_result eq 'create_new') {
648             $num_added++;
649             if ($record_type eq 'biblio') {
650                 my $biblioitemnumber;
651                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
652                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
653                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
654                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
655                     $num_items_added += $bib_items_added;
656                     $num_items_replaced += $bib_items_replaced;
657                     $num_items_errored += $bib_items_errored;
658                 }
659             } else {
660                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
661                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
662             }
663             my $sth = $dbh->prepare_cached($query);
664             $sth->execute($recordid, $rowref->{'import_record_id'});
665             $sth->finish();
666             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
667         } elsif ($record_result eq 'replace') {
668             $num_updated++;
669             $recordid = $record_match;
670             my $oldxml;
671             if ($record_type eq 'biblio') {
672                 my $oldbiblio = GetBiblio($recordid);
673                 $oldxml = GetXmlBiblio($recordid);
674
675                 # remove item fields so that they don't get
676                 # added again if record is reverted
677                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
678                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
679                 foreach my $item_field ($old_marc->field($item_tag)) {
680                     $old_marc->delete_field($item_field);
681                 }
682                 $oldxml = $old_marc->as_xml($marc_type);
683
684                 ModBiblio($marc_record, $recordid, $oldbiblio->{'frameworkcode'});
685                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
686
687                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
688                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
689                     $num_items_added += $bib_items_added;
690                     $num_items_replaced += $bib_items_replaced;
691                     $num_items_errored += $bib_items_errored;
692                 }
693             } else {
694                 $oldxml = GetAuthorityXML($recordid);
695
696                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
697                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
698             }
699             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
700             $sth->execute($oldxml, $rowref->{'import_record_id'});
701             $sth->finish();
702             my $sth2 = $dbh->prepare_cached($query);
703             $sth2->execute($recordid, $rowref->{'import_record_id'});
704             $sth2->finish();
705             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
706             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
707         } elsif ($record_result eq 'ignore') {
708             $recordid = $record_match;
709             $num_ignored++;
710             $recordid = $record_match;
711             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
712                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
713                 $num_items_added += $bib_items_added;
714          $num_items_replaced += $bib_items_replaced;
715                 $num_items_errored += $bib_items_errored;
716                 # still need to record the matched biblionumber so that the
717                 # items can be reverted
718                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
719                 $sth2->execute($recordid, $rowref->{'import_record_id'});
720                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
721             }
722             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
723         }
724     }
725     $sth->finish();
726     SetImportBatchStatus($batch_id, 'imported');
727     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
728 }
729
730 =head2 BatchCommitItems
731
732   ($num_items_added, $num_items_errored) = 
733          BatchCommitItems($import_record_id, $biblionumber);
734
735 =cut
736
737 sub BatchCommitItems {
738     my ( $import_record_id, $biblionumber, $action ) = @_;
739
740     my $dbh = C4::Context->dbh;
741
742     my $num_items_added = 0;
743     my $num_items_errored = 0;
744     my $num_items_replaced = 0;
745
746     my $sth = $dbh->prepare( "
747         SELECT import_items_id, import_items.marcxml, encoding
748         FROM import_items
749         JOIN import_records USING (import_record_id)
750         WHERE import_record_id = ?
751         ORDER BY import_items_id
752     " );
753     $sth->bind_param( 1, $import_record_id );
754     $sth->execute();
755
756     while ( my $row = $sth->fetchrow_hashref() ) {
757         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
758
759         # Delete date_due subfield as to not accidentally delete item checkout due dates
760         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
761         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
762
763         my $item = TransformMarcToKoha( $item_marc );
764
765         my $duplicate_barcode = exists( $item->{'barcode'} ) && GetItemnumberFromBarcode( $item->{'barcode'} );
766         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
767
768         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
769         if ( $action eq "replace" && $duplicate_itemnumber ) {
770             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
771             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
772             $updsth->bind_param( 1, 'imported' );
773             $updsth->bind_param( 2, $item->{itemnumber} );
774             $updsth->bind_param( 3, $row->{'import_items_id'} );
775             $updsth->execute();
776             $updsth->finish();
777             $num_items_replaced++;
778         } elsif ( $action eq "replace" && $duplicate_barcode ) {
779             my $itemnumber = GetItemnumberFromBarcode( $item->{'barcode'} );
780             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
781             $updsth->bind_param( 1, 'imported' );
782             $updsth->bind_param( 2, $item->{itemnumber} );
783             $updsth->bind_param( 3, $row->{'import_items_id'} );
784             $updsth->execute();
785             $updsth->finish();
786             $num_items_replaced++;
787         } elsif ($duplicate_barcode) {
788             $updsth->bind_param( 1, 'error' );
789             $updsth->bind_param( 2, 'duplicate item barcode' );
790             $updsth->bind_param( 3, $row->{'import_items_id'} );
791             $updsth->execute();
792             $num_items_errored++;
793         } else {
794             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
795             if( $itemnumber ) {
796                 $updsth->bind_param( 1, 'imported' );
797                 $updsth->bind_param( 2, $itemnumber );
798                 $updsth->bind_param( 3, $row->{'import_items_id'} );
799                 $updsth->execute();
800                 $updsth->finish();
801                 $num_items_added++;
802             }
803         }
804     }
805
806     return ( $num_items_added, $num_items_replaced, $num_items_errored );
807 }
808
809 =head2 BatchRevertRecords
810
811   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
812       $num_ignored) = BatchRevertRecords($batch_id);
813
814 =cut
815
816 sub BatchRevertRecords {
817     my $batch_id = shift;
818
819     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
820
821     my $record_type;
822     my $num_deleted = 0;
823     my $num_errors = 0;
824     my $num_reverted = 0;
825     my $num_ignored = 0;
826     my $num_items_deleted = 0;
827     # commit (i.e., save, all records in the batch)
828     SetImportBatchStatus('reverting');
829     my $overlay_action = GetImportBatchOverlayAction($batch_id);
830     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
831     my $dbh = C4::Context->dbh;
832     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
833                              FROM import_records
834                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
835                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
836                              WHERE import_batch_id = ?");
837     $sth->execute($batch_id);
838     my $marc_type;
839     my $marcflavour = C4::Context->preference('marcflavour');
840     while (my $rowref = $sth->fetchrow_hashref) {
841         $record_type = $rowref->{'record_type'};
842         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
843             $num_ignored++;
844             next;
845         }
846         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
847             $marc_type = 'UNIMARCAUTH';
848         } elsif ($marcflavour eq 'UNIMARC') {
849             $marc_type = 'UNIMARC';
850         } else {
851             $marc_type = 'USMARC';
852         }
853
854         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
855
856         if ($record_result eq 'delete') {
857             my $error = undef;
858             if  ($record_type eq 'biblio') {
859                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
860                 $error = DelBiblio($rowref->{'matched_biblionumber'});
861             } else {
862                 my $deletedauthid = DelAuthority($rowref->{'matched_authid'});
863             }
864             if (defined $error) {
865                 $num_errors++;
866             } else {
867                 $num_deleted++;
868                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
869             }
870         } elsif ($record_result eq 'restore') {
871             $num_reverted++;
872             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
873             if ($record_type eq 'biblio') {
874                 my $biblionumber = $rowref->{'matched_biblionumber'};
875                 my $oldbiblio = GetBiblio($biblionumber);
876
877                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
878                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
879
880                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
881                 ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
882             } else {
883                 my $authid = $rowref->{'matched_authid'};
884                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
885             }
886             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
887         } elsif ($record_result eq 'ignore') {
888             if ($record_type eq 'biblio') {
889                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
890             }
891             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
892         }
893         my $query;
894         if ($record_type eq 'biblio') {
895             # remove matched_biblionumber only if there is no 'imported' item left
896             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
897             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
898         } else {
899             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
900         }
901         my $sth2 = $dbh->prepare_cached($query);
902         $sth2->execute($rowref->{'import_record_id'});
903     }
904
905     $sth->finish();
906     SetImportBatchStatus($batch_id, 'reverted');
907     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
908 }
909
910 =head2 BatchRevertItems
911
912   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
913
914 =cut
915
916 sub BatchRevertItems {
917     my ($import_record_id, $biblionumber) = @_;
918
919     my $dbh = C4::Context->dbh;
920     my $num_items_deleted = 0;
921
922     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
923                                    FROM import_items
924                                    JOIN items USING (itemnumber)
925                                    WHERE import_record_id = ?");
926     $sth->bind_param(1, $import_record_id);
927     $sth->execute();
928     while (my $row = $sth->fetchrow_hashref()) {
929         my $error = DelItemCheck($dbh, $biblionumber, $row->{'itemnumber'});
930         if ($error == 1){
931             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
932             $updsth->bind_param(1, 'reverted');
933             $updsth->bind_param(2, $row->{'import_items_id'});
934             $updsth->execute();
935             $updsth->finish();
936             $num_items_deleted++;
937         }
938         else {
939             next;
940         }
941     }
942     $sth->finish();
943     return $num_items_deleted;
944 }
945
946 =head2 CleanBatch
947
948   CleanBatch($batch_id)
949
950 Deletes all staged records from the import batch
951 and sets the status of the batch to 'cleaned'.  Note
952 that deleting a stage record does *not* affect
953 any record that has been committed to the database.
954
955 =cut
956
957 sub CleanBatch {
958     my $batch_id = shift;
959     return unless defined $batch_id;
960
961     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
962     SetImportBatchStatus($batch_id, 'cleaned');
963 }
964
965 =head2 DeleteBatch
966
967   DeleteBatch($batch_id)
968
969 Deletes the record from the database. This can only be done
970 once the batch has been cleaned.
971
972 =cut
973
974 sub DeleteBatch {
975     my $batch_id = shift;
976     return unless defined $batch_id;
977
978     my $dbh = C4::Context->dbh;
979     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
980     $sth->execute( $batch_id );
981 }
982
983 =head2 GetAllImportBatches
984
985   my $results = GetAllImportBatches();
986
987 Returns a references to an array of hash references corresponding
988 to all import_batches rows (of batch_type 'batch'), sorted in 
989 ascending order by import_batch_id.
990
991 =cut
992
993 sub  GetAllImportBatches {
994     my $dbh = C4::Context->dbh;
995     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
996                                     WHERE batch_type IN ('batch', 'webservice')
997                                     ORDER BY import_batch_id ASC");
998
999     my $results = [];
1000     $sth->execute();
1001     while (my $row = $sth->fetchrow_hashref) {
1002         push @$results, $row;
1003     }
1004     $sth->finish();
1005     return $results;
1006 }
1007
1008 =head2 GetStagedWebserviceBatches
1009
1010   my $batch_ids = GetStagedWebserviceBatches();
1011
1012 Returns a references to an array of batch id's
1013 of batch_type 'webservice' that are not imported
1014
1015 =cut
1016
1017 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1018 SELECT import_batch_id FROM import_batches
1019 WHERE batch_type = 'webservice'
1020 AND import_status = 'staged'
1021 EOQ
1022 sub  GetStagedWebserviceBatches {
1023     my $dbh = C4::Context->dbh;
1024     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1025 }
1026
1027 =head2 GetImportBatchRangeDesc
1028
1029   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1030
1031 Returns a reference to an array of hash references corresponding to
1032 import_batches rows (sorted in descending order by import_batch_id)
1033 start at the given offset.
1034
1035 =cut
1036
1037 sub GetImportBatchRangeDesc {
1038     my ($offset, $results_per_group) = @_;
1039
1040     my $dbh = C4::Context->dbh;
1041     my $query = "SELECT * FROM import_batches
1042                                     WHERE batch_type IN ('batch', 'webservice')
1043                                     ORDER BY import_batch_id DESC";
1044     my @params;
1045     if ($results_per_group){
1046         $query .= " LIMIT ?";
1047         push(@params, $results_per_group);
1048     }
1049     if ($offset){
1050         $query .= " OFFSET ?";
1051         push(@params, $offset);
1052     }
1053     my $sth = $dbh->prepare_cached($query);
1054     $sth->execute(@params);
1055     my $results = $sth->fetchall_arrayref({});
1056     $sth->finish();
1057     return $results;
1058 }
1059
1060 =head2 GetItemNumbersFromImportBatch
1061
1062   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1063
1064 =cut
1065
1066 sub GetItemNumbersFromImportBatch {
1067     my ($batch_id) = @_;
1068     my $dbh = C4::Context->dbh;
1069     my $sql = q|
1070 SELECT itemnumber FROM import_items
1071 INNER JOIN items USING (itemnumber)
1072 INNER JOIN import_records USING (import_record_id)
1073 WHERE import_batch_id = ?|;
1074     my  $sth = $dbh->prepare( $sql );
1075     $sth->execute($batch_id);
1076     my @items ;
1077     while ( my ($itm) = $sth->fetchrow_array ) {
1078         push @items, $itm;
1079     }
1080     return @items;
1081 }
1082
1083 =head2 GetNumberOfImportBatches
1084
1085   my $count = GetNumberOfImportBatches();
1086
1087 =cut
1088
1089 sub GetNumberOfNonZ3950ImportBatches {
1090     my $dbh = C4::Context->dbh;
1091     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1092     $sth->execute();
1093     my ($count) = $sth->fetchrow_array();
1094     $sth->finish();
1095     return $count;
1096 }
1097
1098 =head2 GetImportBiblios
1099
1100   my $results = GetImportBiblios($importid);
1101
1102 =cut
1103
1104 sub GetImportBiblios {
1105     my ($import_record_id) = @_;
1106
1107     my $dbh = C4::Context->dbh;
1108     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1109     return $dbh->selectall_arrayref(
1110         $query,
1111         { Slice => {} },
1112         $import_record_id
1113     );
1114
1115 }
1116
1117 =head2 GetImportRecordsRange
1118
1119   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1120
1121 Returns a reference to an array of hash references corresponding to
1122 import_biblios/import_auths/import_records rows for a given batch
1123 starting at the given offset.
1124
1125 =cut
1126
1127 sub GetImportRecordsRange {
1128     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1129
1130     my $dbh = C4::Context->dbh;
1131
1132     my $order_by = $parameters->{order_by} || 'import_record_id';
1133     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1134
1135     my $order_by_direction =
1136       uc( $parameters->{order_by_direction} ) eq 'DESC' ? 'DESC' : 'ASC';
1137
1138     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1139
1140     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1141                                            record_sequence, status, overlay_status,
1142                                            matched_biblionumber, matched_authid, record_type
1143                                     FROM   import_records
1144                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1145                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1146                                     WHERE  import_batch_id = ?";
1147     my @params;
1148     push(@params, $batch_id);
1149     if ($status) {
1150         $query .= " AND status=?";
1151         push(@params,$status);
1152     }
1153
1154     $query.=" ORDER BY $order_by $order_by_direction";
1155
1156     if($results_per_group){
1157         $query .= " LIMIT ?";
1158         push(@params, $results_per_group);
1159     }
1160     if($offset){
1161         $query .= " OFFSET ?";
1162         push(@params, $offset);
1163     }
1164     my $sth = $dbh->prepare_cached($query);
1165     $sth->execute(@params);
1166     my $results = $sth->fetchall_arrayref({});
1167     $sth->finish();
1168     return $results;
1169
1170 }
1171
1172 =head2 GetBestRecordMatch
1173
1174   my $record_id = GetBestRecordMatch($import_record_id);
1175
1176 =cut
1177
1178 sub GetBestRecordMatch {
1179     my ($import_record_id) = @_;
1180
1181     my $dbh = C4::Context->dbh;
1182     my $sth = $dbh->prepare("SELECT candidate_match_id
1183                              FROM   import_record_matches
1184                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1185                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1186                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1187                              WHERE  import_record_matches.import_record_id = ? AND
1188                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1189                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1190                              ORDER BY score DESC, candidate_match_id DESC");
1191     $sth->execute($import_record_id);
1192     my ($record_id) = $sth->fetchrow_array();
1193     $sth->finish();
1194     return $record_id;
1195 }
1196
1197 =head2 GetImportBatchStatus
1198
1199   my $status = GetImportBatchStatus($batch_id);
1200
1201 =cut
1202
1203 sub GetImportBatchStatus {
1204     my ($batch_id) = @_;
1205
1206     my $dbh = C4::Context->dbh;
1207     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1208     $sth->execute($batch_id);
1209     my ($status) = $sth->fetchrow_array();
1210     $sth->finish();
1211     return $status;
1212
1213 }
1214
1215 =head2 SetImportBatchStatus
1216
1217   SetImportBatchStatus($batch_id, $new_status);
1218
1219 =cut
1220
1221 sub SetImportBatchStatus {
1222     my ($batch_id, $new_status) = @_;
1223
1224     my $dbh = C4::Context->dbh;
1225     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1226     $sth->execute($new_status, $batch_id);
1227     $sth->finish();
1228
1229 }
1230
1231 =head2 GetImportBatchOverlayAction
1232
1233   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1234
1235 =cut
1236
1237 sub GetImportBatchOverlayAction {
1238     my ($batch_id) = @_;
1239
1240     my $dbh = C4::Context->dbh;
1241     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1242     $sth->execute($batch_id);
1243     my ($overlay_action) = $sth->fetchrow_array();
1244     $sth->finish();
1245     return $overlay_action;
1246
1247 }
1248
1249
1250 =head2 SetImportBatchOverlayAction
1251
1252   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1253
1254 =cut
1255
1256 sub SetImportBatchOverlayAction {
1257     my ($batch_id, $new_overlay_action) = @_;
1258
1259     my $dbh = C4::Context->dbh;
1260     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1261     $sth->execute($new_overlay_action, $batch_id);
1262     $sth->finish();
1263
1264 }
1265
1266 =head2 GetImportBatchNoMatchAction
1267
1268   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1269
1270 =cut
1271
1272 sub GetImportBatchNoMatchAction {
1273     my ($batch_id) = @_;
1274
1275     my $dbh = C4::Context->dbh;
1276     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1277     $sth->execute($batch_id);
1278     my ($nomatch_action) = $sth->fetchrow_array();
1279     $sth->finish();
1280     return $nomatch_action;
1281
1282 }
1283
1284
1285 =head2 SetImportBatchNoMatchAction
1286
1287   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1288
1289 =cut
1290
1291 sub SetImportBatchNoMatchAction {
1292     my ($batch_id, $new_nomatch_action) = @_;
1293
1294     my $dbh = C4::Context->dbh;
1295     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1296     $sth->execute($new_nomatch_action, $batch_id);
1297     $sth->finish();
1298
1299 }
1300
1301 =head2 GetImportBatchItemAction
1302
1303   my $item_action = GetImportBatchItemAction($batch_id);
1304
1305 =cut
1306
1307 sub GetImportBatchItemAction {
1308     my ($batch_id) = @_;
1309
1310     my $dbh = C4::Context->dbh;
1311     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1312     $sth->execute($batch_id);
1313     my ($item_action) = $sth->fetchrow_array();
1314     $sth->finish();
1315     return $item_action;
1316
1317 }
1318
1319
1320 =head2 SetImportBatchItemAction
1321
1322   SetImportBatchItemAction($batch_id, $new_item_action);
1323
1324 =cut
1325
1326 sub SetImportBatchItemAction {
1327     my ($batch_id, $new_item_action) = @_;
1328
1329     my $dbh = C4::Context->dbh;
1330     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1331     $sth->execute($new_item_action, $batch_id);
1332     $sth->finish();
1333
1334 }
1335
1336 =head2 GetImportBatchMatcher
1337
1338   my $matcher_id = GetImportBatchMatcher($batch_id);
1339
1340 =cut
1341
1342 sub GetImportBatchMatcher {
1343     my ($batch_id) = @_;
1344
1345     my $dbh = C4::Context->dbh;
1346     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1347     $sth->execute($batch_id);
1348     my ($matcher_id) = $sth->fetchrow_array();
1349     $sth->finish();
1350     return $matcher_id;
1351
1352 }
1353
1354
1355 =head2 SetImportBatchMatcher
1356
1357   SetImportBatchMatcher($batch_id, $new_matcher_id);
1358
1359 =cut
1360
1361 sub SetImportBatchMatcher {
1362     my ($batch_id, $new_matcher_id) = @_;
1363
1364     my $dbh = C4::Context->dbh;
1365     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1366     $sth->execute($new_matcher_id, $batch_id);
1367     $sth->finish();
1368
1369 }
1370
1371 =head2 GetImportRecordOverlayStatus
1372
1373   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1374
1375 =cut
1376
1377 sub GetImportRecordOverlayStatus {
1378     my ($import_record_id) = @_;
1379
1380     my $dbh = C4::Context->dbh;
1381     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1382     $sth->execute($import_record_id);
1383     my ($overlay_status) = $sth->fetchrow_array();
1384     $sth->finish();
1385     return $overlay_status;
1386
1387 }
1388
1389
1390 =head2 SetImportRecordOverlayStatus
1391
1392   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1393
1394 =cut
1395
1396 sub SetImportRecordOverlayStatus {
1397     my ($import_record_id, $new_overlay_status) = @_;
1398
1399     my $dbh = C4::Context->dbh;
1400     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1401     $sth->execute($new_overlay_status, $import_record_id);
1402     $sth->finish();
1403
1404 }
1405
1406 =head2 GetImportRecordStatus
1407
1408   my $status = GetImportRecordStatus($import_record_id);
1409
1410 =cut
1411
1412 sub GetImportRecordStatus {
1413     my ($import_record_id) = @_;
1414
1415     my $dbh = C4::Context->dbh;
1416     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1417     $sth->execute($import_record_id);
1418     my ($status) = $sth->fetchrow_array();
1419     $sth->finish();
1420     return $status;
1421
1422 }
1423
1424
1425 =head2 SetImportRecordStatus
1426
1427   SetImportRecordStatus($import_record_id, $new_status);
1428
1429 =cut
1430
1431 sub SetImportRecordStatus {
1432     my ($import_record_id, $new_status) = @_;
1433
1434     my $dbh = C4::Context->dbh;
1435     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1436     $sth->execute($new_status, $import_record_id);
1437     $sth->finish();
1438
1439 }
1440
1441 =head2 GetImportRecordMatches
1442
1443   my $results = GetImportRecordMatches($import_record_id, $best_only);
1444
1445 =cut
1446
1447 sub GetImportRecordMatches {
1448     my $import_record_id = shift;
1449     my $best_only = @_ ? shift : 0;
1450
1451     my $dbh = C4::Context->dbh;
1452     # FIXME currently biblio only
1453     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1454                                     candidate_match_id, score, record_type
1455                                     FROM import_records
1456                                     JOIN import_record_matches USING (import_record_id)
1457                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1458                                     WHERE import_record_id = ?
1459                                     ORDER BY score DESC, biblionumber DESC");
1460     $sth->bind_param(1, $import_record_id);
1461     my $results = [];
1462     $sth->execute();
1463     while (my $row = $sth->fetchrow_hashref) {
1464         if ($row->{'record_type'} eq 'auth') {
1465             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1466         }
1467         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1468         push @$results, $row;
1469         last if $best_only;
1470     }
1471     $sth->finish();
1472
1473     return $results;
1474     
1475 }
1476
1477
1478 =head2 SetImportRecordMatches
1479
1480   SetImportRecordMatches($import_record_id, @matches);
1481
1482 =cut
1483
1484 sub SetImportRecordMatches {
1485     my $import_record_id = shift;
1486     my @matches = @_;
1487
1488     my $dbh = C4::Context->dbh;
1489     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1490     $delsth->execute($import_record_id);
1491     $delsth->finish();
1492
1493     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1494                                     VALUES (?, ?, ?)");
1495     foreach my $match (@matches) {
1496         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1497     }
1498 }
1499
1500
1501 # internal functions
1502
1503 sub _create_import_record {
1504     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1505
1506     my $dbh = C4::Context->dbh;
1507     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1508                                                          record_type, encoding, z3950random)
1509                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1510     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1511                   $record_type, $encoding, $z3950random);
1512     my $import_record_id = $dbh->{'mysql_insertid'};
1513     $sth->finish();
1514     return $import_record_id;
1515 }
1516
1517 sub _update_import_record_marc {
1518     my ($import_record_id, $marc_record, $marc_type) = @_;
1519
1520     my $dbh = C4::Context->dbh;
1521     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1522                              WHERE  import_record_id = ?");
1523     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1524     $sth->finish();
1525 }
1526
1527 sub _add_auth_fields {
1528     my ($import_record_id, $marc_record) = @_;
1529
1530     my $controlnumber;
1531     if ($marc_record->field('001')) {
1532         $controlnumber = $marc_record->field('001')->data();
1533     }
1534     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1535     my $dbh = C4::Context->dbh;
1536     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1537     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1538     $sth->finish();
1539 }
1540
1541 sub _add_biblio_fields {
1542     my ($import_record_id, $marc_record) = @_;
1543
1544     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1545     my $dbh = C4::Context->dbh;
1546     # FIXME no controlnumber, originalsource
1547     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1548     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1549     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1550     $sth->finish();
1551                 
1552 }
1553
1554 sub _update_biblio_fields {
1555     my ($import_record_id, $marc_record) = @_;
1556
1557     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1558     my $dbh = C4::Context->dbh;
1559     # FIXME no controlnumber, originalsource
1560     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1561     $isbn =~ s/\(.*$//;
1562     $isbn =~ tr/ -_//;
1563     $isbn = uc $isbn;
1564     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1565                              WHERE  import_record_id = ?");
1566     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1567     $sth->finish();
1568 }
1569
1570 sub _parse_biblio_fields {
1571     my ($marc_record) = @_;
1572
1573     my $dbh = C4::Context->dbh;
1574     my $bibliofields = TransformMarcToKoha($marc_record, '');
1575     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1576
1577 }
1578
1579 sub _update_batch_record_counts {
1580     my ($batch_id) = @_;
1581
1582     my $dbh = C4::Context->dbh;
1583     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1584                                         num_records = (
1585                                             SELECT COUNT(*)
1586                                             FROM import_records
1587                                             WHERE import_batch_id = import_batches.import_batch_id),
1588                                         num_items = (
1589                                             SELECT COUNT(*)
1590                                             FROM import_records
1591                                             JOIN import_items USING (import_record_id)
1592                                             WHERE import_batch_id = import_batches.import_batch_id
1593                                             AND record_type = 'biblio')
1594                                     WHERE import_batch_id = ?");
1595     $sth->bind_param(1, $batch_id);
1596     $sth->execute();
1597     $sth->finish();
1598 }
1599
1600 sub _get_commit_action {
1601     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1602     
1603     if ($record_type eq 'biblio') {
1604         my ($bib_result, $bib_match, $item_result);
1605
1606         if ($overlay_status ne 'no_match') {
1607             $bib_match = GetBestRecordMatch($import_record_id);
1608             if ($overlay_action eq 'replace') {
1609                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1610             } elsif ($overlay_action eq 'create_new') {
1611                 $bib_result  = 'create_new';
1612             } elsif ($overlay_action eq 'ignore') {
1613                 $bib_result  = 'ignore';
1614             }
1615          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1616                 $item_result = 'create_new';
1617        }
1618       elsif($item_action eq 'replace'){
1619           $item_result = 'replace';
1620           }
1621       else {
1622              $item_result = 'ignore';
1623            }
1624         } else {
1625             $bib_result = $nomatch_action;
1626             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1627         }
1628         return ($bib_result, $item_result, $bib_match);
1629     } else { # must be auths
1630         my ($auth_result, $auth_match);
1631
1632         if ($overlay_status ne 'no_match') {
1633             $auth_match = GetBestRecordMatch($import_record_id);
1634             if ($overlay_action eq 'replace') {
1635                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1636             } elsif ($overlay_action eq 'create_new') {
1637                 $auth_result  = 'create_new';
1638             } elsif ($overlay_action eq 'ignore') {
1639                 $auth_result  = 'ignore';
1640             }
1641         } else {
1642             $auth_result = $nomatch_action;
1643         }
1644
1645         return ($auth_result, undef, $auth_match);
1646
1647     }
1648 }
1649
1650 sub _get_revert_action {
1651     my ($overlay_action, $overlay_status, $status) = @_;
1652
1653     my $bib_result;
1654
1655     if ($status eq 'ignored') {
1656         $bib_result = 'ignore';
1657     } else {
1658         if ($overlay_action eq 'create_new') {
1659             $bib_result = 'delete';
1660         } else {
1661             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1662         }
1663     }
1664     return $bib_result;
1665 }
1666
1667 1;
1668 __END__
1669
1670 =head1 AUTHOR
1671
1672 Koha Development Team <http://koha-community.org/>
1673
1674 Galen Charlton <galen.charlton@liblime.com>
1675
1676 =cut