Bug 12478 - add some base objects that the ES code will depend on
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Plugins::Handler;
31 use Koha::Logger;
32
33 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
34
35 BEGIN {
36         require Exporter;
37         @ISA    = qw(Exporter);
38         @EXPORT = qw(
39     GetZ3950BatchId
40     GetWebserviceBatchId
41     GetImportRecordMarc
42     GetImportRecordMarcXML
43     AddImportBatch
44     GetImportBatch
45     AddAuthToBatch
46     AddBiblioToBatch
47     AddItemsToImportBiblio
48     ModAuthorityInBatch
49     ModBiblioInBatch
50
51     BatchStageMarcRecords
52     BatchFindDuplicates
53     BatchCommitRecords
54     BatchRevertRecords
55     CleanBatch
56
57     GetAllImportBatches
58     GetStagedWebserviceBatches
59     GetImportBatchRangeDesc
60     GetNumberOfNonZ3950ImportBatches
61     GetImportBiblios
62     GetImportRecordsRange
63         GetItemNumbersFromImportBatch
64     
65     GetImportBatchStatus
66     SetImportBatchStatus
67     GetImportBatchOverlayAction
68     SetImportBatchOverlayAction
69     GetImportBatchNoMatchAction
70     SetImportBatchNoMatchAction
71     GetImportBatchItemAction
72     SetImportBatchItemAction
73     GetImportBatchMatcher
74     SetImportBatchMatcher
75     GetImportRecordOverlayStatus
76     SetImportRecordOverlayStatus
77     GetImportRecordStatus
78     SetImportRecordStatus
79     GetImportRecordMatches
80     SetImportRecordMatches
81         );
82 }
83
84 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
85
86 =head1 NAME
87
88 C4::ImportBatch - manage batches of imported MARC records
89
90 =head1 SYNOPSIS
91
92 use C4::ImportBatch;
93
94 =head1 FUNCTIONS
95
96 =head2 GetZ3950BatchId
97
98   my $batchid = GetZ3950BatchId($z3950server);
99
100 Retrieves the ID of the import batch for the Z39.50
101 reservoir for the given target.  If necessary,
102 creates the import batch.
103
104 =cut
105
106 sub GetZ3950BatchId {
107     my ($z3950server) = @_;
108
109     my $dbh = C4::Context->dbh;
110     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
111                              WHERE  batch_type = 'z3950'
112                              AND    file_name = ?");
113     $sth->execute($z3950server);
114     my $rowref = $sth->fetchrow_arrayref();
115     $sth->finish();
116     if (defined $rowref) {
117         return $rowref->[0];
118     } else {
119         my $batch_id = AddImportBatch( {
120                 overlay_action => 'create_new',
121                 import_status => 'staged',
122                 batch_type => 'z3950',
123                 file_name => $z3950server,
124             } );
125         return $batch_id;
126     }
127     
128 }
129
130 =head2 GetWebserviceBatchId
131
132   my $batchid = GetWebserviceBatchId();
133
134 Retrieves the ID of the import batch for webservice.
135 If necessary, creates the import batch.
136
137 =cut
138
139 my $WEBSERVICE_BASE_QRY = <<EOQ;
140 SELECT import_batch_id FROM import_batches
141 WHERE  batch_type = 'webservice'
142 AND    import_status = 'staged'
143 EOQ
144 sub GetWebserviceBatchId {
145     my ($params) = @_;
146
147     my $dbh = C4::Context->dbh;
148     my $sql = $WEBSERVICE_BASE_QRY;
149     my @args;
150     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
151         if (my $val = $params->{$field}) {
152             $sql .= " AND $field = ?";
153             push @args, $val;
154         }
155     }
156     my $id = $dbh->selectrow_array($sql, undef, @args);
157     return $id if $id;
158
159     $params->{batch_type} = 'webservice';
160     $params->{import_status} = 'staged';
161     return AddImportBatch($params);
162 }
163
164 =head2 GetImportRecordMarc
165
166   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
167
168 =cut
169
170 sub GetImportRecordMarc {
171     my ($import_record_id) = @_;
172
173     my $dbh = C4::Context->dbh;
174     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
175         SELECT marc, encoding
176         FROM import_records
177         WHERE import_record_id = ?
178     |, undef, $import_record_id );
179
180     return $marc, $encoding;
181 }
182
183 sub GetRecordFromImportBiblio {
184     my ( $import_record_id, $embed_items ) = @_;
185
186     my ($marc) = GetImportRecordMarc($import_record_id);
187     my $record = MARC::Record->new_from_usmarc($marc);
188
189     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
190
191     return $record;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 GetImportRecordMarcXML
213
214   my $marcxml = GetImportRecordMarcXML($import_record_id);
215
216 =cut
217
218 sub GetImportRecordMarcXML {
219     my ($import_record_id) = @_;
220
221     my $dbh = C4::Context->dbh;
222     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
223     $sth->execute($import_record_id);
224     my ($marcxml) = $sth->fetchrow();
225     $sth->finish();
226     return $marcxml;
227
228 }
229
230 =head2 AddImportBatch
231
232   my $batch_id = AddImportBatch($params_hash);
233
234 =cut
235
236 sub AddImportBatch {
237     my ($params) = @_;
238
239     my (@fields, @vals);
240     foreach (qw( matcher_id template_id branchcode
241                  overlay_action nomatch_action item_action
242                  import_status batch_type file_name comments record_type )) {
243         if (exists $params->{$_}) {
244             push @fields, $_;
245             push @vals, $params->{$_};
246         }
247     }
248     my $dbh = C4::Context->dbh;
249     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
250                                   VALUES (".join( ',', map '?', @fields).")",
251              undef,
252              @vals);
253     return $dbh->{'mysql_insertid'};
254 }
255
256 =head2 GetImportBatch 
257
258   my $row = GetImportBatch($batch_id);
259
260 Retrieve a hashref of an import_batches row.
261
262 =cut
263
264 sub GetImportBatch {
265     my ($batch_id) = @_;
266
267     my $dbh = C4::Context->dbh;
268     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
269     $sth->bind_param(1, $batch_id);
270     $sth->execute();
271     my $result = $sth->fetchrow_hashref;
272     $sth->finish();
273     return $result;
274
275 }
276
277 =head2 AddBiblioToBatch 
278
279   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
280                 $marc_record, $encoding, $z3950random, $update_counts);
281
282 =cut
283
284 sub AddBiblioToBatch {
285     my $batch_id = shift;
286     my $record_sequence = shift;
287     my $marc_record = shift;
288     my $encoding = shift;
289     my $z3950random = shift;
290     my $update_counts = @_ ? shift : 1;
291
292     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
293     _add_biblio_fields($import_record_id, $marc_record);
294     _update_batch_record_counts($batch_id) if $update_counts;
295     return $import_record_id;
296 }
297
298 =head2 ModBiblioInBatch
299
300   ModBiblioInBatch($import_record_id, $marc_record);
301
302 =cut
303
304 sub ModBiblioInBatch {
305     my ($import_record_id, $marc_record) = @_;
306
307     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
308     _update_biblio_fields($import_record_id, $marc_record);
309
310 }
311
312 =head2 AddAuthToBatch
313
314   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
315                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
316
317 =cut
318
319 sub AddAuthToBatch {
320     my $batch_id = shift;
321     my $record_sequence = shift;
322     my $marc_record = shift;
323     my $encoding = shift;
324     my $z3950random = shift;
325     my $update_counts = @_ ? shift : 1;
326     my $marc_type = shift || C4::Context->preference('marcflavour');
327
328     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
329
330     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
331     _add_auth_fields($import_record_id, $marc_record);
332     _update_batch_record_counts($batch_id) if $update_counts;
333     return $import_record_id;
334 }
335
336 =head2 ModAuthInBatch
337
338   ModAuthInBatch($import_record_id, $marc_record);
339
340 =cut
341
342 sub ModAuthInBatch {
343     my ($import_record_id, $marc_record) = @_;
344
345     my $marcflavour = C4::Context->preference('marcflavour');
346     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
347
348 }
349
350 =head2 BatchStageMarcRecords
351
352 ( $batch_id, $num_records, $num_items, @invalid_records ) =
353   BatchStageMarcRecords(
354     $encoding,                   $marc_records,
355     $file_name,                  $to_marc_plugin,
356     $marc_modification_template, $comments,
357     $branch_code,                $parse_items,
358     $leave_as_staging,           $progress_interval,
359     $progress_callback
360   );
361
362 =cut
363
364 sub BatchStageMarcRecords {
365     my $record_type = shift;
366     my $encoding = shift;
367     my $marc_records = shift;
368     my $file_name = shift;
369     my $to_marc_plugin = shift;
370     my $marc_modification_template = shift;
371     my $comments = shift;
372     my $branch_code = shift;
373     my $parse_items = shift;
374     my $leave_as_staging = shift;
375
376     # optional callback to monitor status 
377     # of job
378     my $progress_interval = 0;
379     my $progress_callback = undef;
380     if ($#_ == 1) {
381         $progress_interval = shift;
382         $progress_callback = shift;
383         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
384         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
385     } 
386     
387     my $batch_id = AddImportBatch( {
388             overlay_action => 'create_new',
389             import_status => 'staging',
390             batch_type => 'batch',
391             file_name => $file_name,
392             comments => $comments,
393             record_type => $record_type,
394         } );
395     if ($parse_items) {
396         SetImportBatchItemAction($batch_id, 'always_add');
397     } else {
398         SetImportBatchItemAction($batch_id, 'ignore');
399     }
400
401     $marc_records = Koha::Plugins::Handler->run(
402         {
403             class  => $to_marc_plugin,
404             method => 'to_marc',
405             params => { data => $marc_records }
406         }
407     ) if $to_marc_plugin;
408
409     my $marc_type = C4::Context->preference('marcflavour');
410     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
411     my @invalid_records = ();
412     my $num_valid = 0;
413     my $num_items = 0;
414     # FIXME - for now, we're dealing only with bibs
415     my $rec_num = 0;
416     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
417         $marc_blob =~ s/^\s+//g;
418         $marc_blob =~ s/\s+$//g;
419         next unless $marc_blob;
420         $rec_num++;
421         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
422             &$progress_callback($rec_num);
423         }
424         my ($marc_record, $charset_guessed, $char_errors) =
425             MarcToUTF8Record($marc_blob, $marc_type, $encoding);
426
427         $encoding = $charset_guessed unless $encoding;
428
429         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
430
431         my $import_record_id;
432         if (scalar($marc_record->fields()) == 0) {
433             push @invalid_records, $marc_blob;
434         } else {
435
436             # Normalize the record so it doesn't have separated diacritics
437             SetUTF8Flag($marc_record);
438
439             $num_valid++;
440             if ($record_type eq 'biblio') {
441                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
442                 if ($parse_items) {
443                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
444                     $num_items += scalar(@import_items_ids);
445                 }
446             } elsif ($record_type eq 'auth') {
447                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
448             }
449         }
450     }
451     unless ($leave_as_staging) {
452         SetImportBatchStatus($batch_id, 'staged');
453     }
454     # FIXME branch_code, number of bibs, number of items
455     _update_batch_record_counts($batch_id);
456     return ($batch_id, $num_valid, $num_items, @invalid_records);
457 }
458
459 =head2 AddItemsToImportBiblio
460
461   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
462                 $import_record_id, $marc_record, $update_counts);
463
464 =cut
465
466 sub AddItemsToImportBiblio {
467     my $batch_id = shift;
468     my $import_record_id = shift;
469     my $marc_record = shift;
470     my $update_counts = @_ ? shift : 0;
471
472     my @import_items_ids = ();
473    
474     my $dbh = C4::Context->dbh; 
475     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
476     foreach my $item_field ($marc_record->field($item_tag)) {
477         my $item_marc = MARC::Record->new();
478         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
479         $item_marc->append_fields($item_field);
480         $marc_record->delete_field($item_field);
481         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
482                                         VALUES (?, ?, ?)");
483         $sth->bind_param(1, $import_record_id);
484         $sth->bind_param(2, 'staged');
485         $sth->bind_param(3, $item_marc->as_xml());
486         $sth->execute();
487         push @import_items_ids, $dbh->{'mysql_insertid'};
488         $sth->finish();
489     }
490
491     if ($#import_items_ids > -1) {
492         _update_batch_record_counts($batch_id) if $update_counts;
493         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
494     }
495     return @import_items_ids;
496 }
497
498 =head2 BatchFindDuplicates
499
500   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
501              $max_matches, $progress_interval, $progress_callback);
502
503 Goes through the records loaded in the batch and attempts to 
504 find duplicates for each one.  Sets the matching status 
505 of each record to "no_match" or "auto_match" as appropriate.
506
507 The $max_matches parameter is optional; if it is not supplied,
508 it defaults to 10.
509
510 The $progress_interval and $progress_callback parameters are 
511 optional; if both are supplied, the sub referred to by
512 $progress_callback will be invoked every $progress_interval
513 records using the number of records processed as the 
514 singular argument.
515
516 =cut
517
518 sub BatchFindDuplicates {
519     my $batch_id = shift;
520     my $matcher = shift;
521     my $max_matches = @_ ? shift : 10;
522
523     # optional callback to monitor status 
524     # of job
525     my $progress_interval = 0;
526     my $progress_callback = undef;
527     if ($#_ == 1) {
528         $progress_interval = shift;
529         $progress_callback = shift;
530         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
531         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
532     }
533
534     my $dbh = C4::Context->dbh;
535
536     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
537                              FROM import_records
538                              WHERE import_batch_id = ?");
539     $sth->execute($batch_id);
540     my $num_with_matches = 0;
541     my $rec_num = 0;
542     while (my $rowref = $sth->fetchrow_hashref) {
543         $rec_num++;
544         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
545             &$progress_callback($rec_num);
546         }
547         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
548         my @matches = ();
549         if (defined $matcher) {
550             @matches = $matcher->get_matches($marc_record, $max_matches);
551         }
552         if (scalar(@matches) > 0) {
553             $num_with_matches++;
554             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
555             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
556         } else {
557             SetImportRecordMatches($rowref->{'import_record_id'}, ());
558             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
559         }
560     }
561     $sth->finish();
562     return $num_with_matches;
563 }
564
565 =head2 BatchCommitRecords
566
567   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
568         BatchCommitRecords($batch_id, $framework,
569         $progress_interval, $progress_callback);
570
571 =cut
572
573 sub BatchCommitRecords {
574     my $batch_id = shift;
575     my $framework = shift;
576
577     # optional callback to monitor status 
578     # of job
579     my $progress_interval = 0;
580     my $progress_callback = undef;
581     if ($#_ == 1) {
582         $progress_interval = shift;
583         $progress_callback = shift;
584         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
585         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
586     }
587
588     my $record_type;
589     my $num_added = 0;
590     my $num_updated = 0;
591     my $num_items_added = 0;
592     my $num_items_replaced = 0;
593     my $num_items_errored = 0;
594     my $num_ignored = 0;
595     # commit (i.e., save, all records in the batch)
596     SetImportBatchStatus('importing');
597     my $overlay_action = GetImportBatchOverlayAction($batch_id);
598     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
599     my $item_action = GetImportBatchItemAction($batch_id);
600     my $item_tag;
601     my $item_subfield;
602     my $dbh = C4::Context->dbh;
603     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
604                              FROM import_records
605                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
606                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
607                              WHERE import_batch_id = ?");
608     $sth->execute($batch_id);
609     my $marcflavour = C4::Context->preference('marcflavour');
610     my $rec_num = 0;
611     while (my $rowref = $sth->fetchrow_hashref) {
612         $record_type = $rowref->{'record_type'};
613         $rec_num++;
614         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
615             &$progress_callback($rec_num);
616         }
617         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
618             $num_ignored++;
619             next;
620         }
621
622         my $marc_type;
623         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
624             $marc_type = 'UNIMARCAUTH';
625         } elsif ($marcflavour eq 'UNIMARC') {
626             $marc_type = 'UNIMARC';
627         } else {
628             $marc_type = 'USMARC';
629         }
630         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
631
632         if ($record_type eq 'biblio') {
633             # remove any item tags - rely on BatchCommitItems
634             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
635             foreach my $item_field ($marc_record->field($item_tag)) {
636                 $marc_record->delete_field($item_field);
637             }
638         }
639
640         my ($record_result, $item_result, $record_match) =
641             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
642                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
643
644         my $recordid;
645         my $query;
646         if ($record_result eq 'create_new') {
647             $num_added++;
648             if ($record_type eq 'biblio') {
649                 my $biblioitemnumber;
650                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
651                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
652                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
653                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
654                     $num_items_added += $bib_items_added;
655                     $num_items_replaced += $bib_items_replaced;
656                     $num_items_errored += $bib_items_errored;
657                 }
658             } else {
659                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
660                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
661             }
662             my $sth = $dbh->prepare_cached($query);
663             $sth->execute($recordid, $rowref->{'import_record_id'});
664             $sth->finish();
665             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
666         } elsif ($record_result eq 'replace') {
667             $num_updated++;
668             $recordid = $record_match;
669             my $oldxml;
670             if ($record_type eq 'biblio') {
671                 my $oldbiblio = GetBiblio($recordid);
672                 $oldxml = GetXmlBiblio($recordid);
673
674                 # remove item fields so that they don't get
675                 # added again if record is reverted
676                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
677                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
678                 foreach my $item_field ($old_marc->field($item_tag)) {
679                     $old_marc->delete_field($item_field);
680                 }
681                 $oldxml = $old_marc->as_xml($marc_type);
682
683                 ModBiblio($marc_record, $recordid, $oldbiblio->{'frameworkcode'});
684                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
685
686                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
687                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
688                     $num_items_added += $bib_items_added;
689                     $num_items_replaced += $bib_items_replaced;
690                     $num_items_errored += $bib_items_errored;
691                 }
692             } else {
693                 $oldxml = GetAuthorityXML($recordid);
694
695                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
696                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
697             }
698             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
699             $sth->execute($oldxml, $rowref->{'import_record_id'});
700             $sth->finish();
701             my $sth2 = $dbh->prepare_cached($query);
702             $sth2->execute($recordid, $rowref->{'import_record_id'});
703             $sth2->finish();
704             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
705             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
706         } elsif ($record_result eq 'ignore') {
707             $recordid = $record_match;
708             $num_ignored++;
709             $recordid = $record_match;
710             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
711                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
712                 $num_items_added += $bib_items_added;
713          $num_items_replaced += $bib_items_replaced;
714                 $num_items_errored += $bib_items_errored;
715                 # still need to record the matched biblionumber so that the
716                 # items can be reverted
717                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
718                 $sth2->execute($recordid, $rowref->{'import_record_id'});
719                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
720             }
721             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
722         }
723     }
724     $sth->finish();
725     SetImportBatchStatus($batch_id, 'imported');
726     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
727 }
728
729 =head2 BatchCommitItems
730
731   ($num_items_added, $num_items_errored) = 
732          BatchCommitItems($import_record_id, $biblionumber);
733
734 =cut
735
736 sub BatchCommitItems {
737     my ( $import_record_id, $biblionumber, $action ) = @_;
738
739     my $dbh = C4::Context->dbh;
740
741     my $num_items_added = 0;
742     my $num_items_errored = 0;
743     my $num_items_replaced = 0;
744
745     my $sth = $dbh->prepare( "
746         SELECT import_items_id, import_items.marcxml, encoding
747         FROM import_items
748         JOIN import_records USING (import_record_id)
749         WHERE import_record_id = ?
750         ORDER BY import_items_id
751     " );
752     $sth->bind_param( 1, $import_record_id );
753     $sth->execute();
754
755     while ( my $row = $sth->fetchrow_hashref() ) {
756         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
757
758         # Delete date_due subfield as to not accidentally delete item checkout due dates
759         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
760         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
761
762         my $item = TransformMarcToKoha( $item_marc );
763
764         my $duplicate_barcode = exists( $item->{'barcode'} ) && GetItemnumberFromBarcode( $item->{'barcode'} );
765         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
766
767         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
768         if ( $action eq "replace" && $duplicate_itemnumber ) {
769             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
770             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
771             $updsth->bind_param( 1, 'imported' );
772             $updsth->bind_param( 2, $item->{itemnumber} );
773             $updsth->bind_param( 3, $row->{'import_items_id'} );
774             $updsth->execute();
775             $updsth->finish();
776             $num_items_replaced++;
777         } elsif ( $action eq "replace" && $duplicate_barcode ) {
778             my $itemnumber = GetItemnumberFromBarcode( $item->{'barcode'} );
779             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
780             $updsth->bind_param( 1, 'imported' );
781             $updsth->bind_param( 2, $item->{itemnumber} );
782             $updsth->bind_param( 3, $row->{'import_items_id'} );
783             $updsth->execute();
784             $updsth->finish();
785             $num_items_replaced++;
786         } elsif ($duplicate_barcode) {
787             $updsth->bind_param( 1, 'error' );
788             $updsth->bind_param( 2, 'duplicate item barcode' );
789             $updsth->bind_param( 3, $row->{'import_items_id'} );
790             $updsth->execute();
791             $num_items_errored++;
792         } else {
793             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
794             $updsth->bind_param( 1, 'imported' );
795             $updsth->bind_param( 2, $itemnumber );
796             $updsth->bind_param( 3, $row->{'import_items_id'} );
797             $updsth->execute();
798             $updsth->finish();
799             $num_items_added++;
800         }
801     }
802
803     return ( $num_items_added, $num_items_replaced, $num_items_errored );
804 }
805
806 =head2 BatchRevertRecords
807
808   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
809       $num_ignored) = BatchRevertRecords($batch_id);
810
811 =cut
812
813 sub BatchRevertRecords {
814     my $batch_id = shift;
815
816     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
817
818     my $record_type;
819     my $num_deleted = 0;
820     my $num_errors = 0;
821     my $num_reverted = 0;
822     my $num_ignored = 0;
823     my $num_items_deleted = 0;
824     # commit (i.e., save, all records in the batch)
825     SetImportBatchStatus('reverting');
826     my $overlay_action = GetImportBatchOverlayAction($batch_id);
827     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
828     my $dbh = C4::Context->dbh;
829     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
830                              FROM import_records
831                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
832                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
833                              WHERE import_batch_id = ?");
834     $sth->execute($batch_id);
835     my $marc_type;
836     my $marcflavour = C4::Context->preference('marcflavour');
837     while (my $rowref = $sth->fetchrow_hashref) {
838         $record_type = $rowref->{'record_type'};
839         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
840             $num_ignored++;
841             next;
842         }
843         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
844             $marc_type = 'UNIMARCAUTH';
845         } elsif ($marcflavour eq 'UNIMARC') {
846             $marc_type = 'UNIMARC';
847         } else {
848             $marc_type = 'USMARC';
849         }
850
851         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
852
853         if ($record_result eq 'delete') {
854             my $error = undef;
855             if  ($record_type eq 'biblio') {
856                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
857                 $error = DelBiblio($rowref->{'matched_biblionumber'});
858             } else {
859                 my $deletedauthid = DelAuthority($rowref->{'matched_authid'});
860             }
861             if (defined $error) {
862                 $num_errors++;
863             } else {
864                 $num_deleted++;
865                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
866             }
867         } elsif ($record_result eq 'restore') {
868             $num_reverted++;
869             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
870             if ($record_type eq 'biblio') {
871                 my $biblionumber = $rowref->{'matched_biblionumber'};
872                 my $oldbiblio = GetBiblio($biblionumber);
873
874                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
875                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
876
877                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
878                 ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
879             } else {
880                 my $authid = $rowref->{'matched_authid'};
881                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
882             }
883             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
884         } elsif ($record_result eq 'ignore') {
885             if ($record_type eq 'biblio') {
886                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
887             }
888             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
889         }
890         my $query;
891         if ($record_type eq 'biblio') {
892             # remove matched_biblionumber only if there is no 'imported' item left
893             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
894             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
895         } else {
896             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
897         }
898         my $sth2 = $dbh->prepare_cached($query);
899         $sth2->execute($rowref->{'import_record_id'});
900     }
901
902     $sth->finish();
903     SetImportBatchStatus($batch_id, 'reverted');
904     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
905 }
906
907 =head2 BatchRevertItems
908
909   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
910
911 =cut
912
913 sub BatchRevertItems {
914     my ($import_record_id, $biblionumber) = @_;
915
916     my $dbh = C4::Context->dbh;
917     my $num_items_deleted = 0;
918
919     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
920                                    FROM import_items
921                                    JOIN items USING (itemnumber)
922                                    WHERE import_record_id = ?");
923     $sth->bind_param(1, $import_record_id);
924     $sth->execute();
925     while (my $row = $sth->fetchrow_hashref()) {
926         my $error = DelItemCheck($dbh, $biblionumber, $row->{'itemnumber'});
927         if ($error == 1){
928             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
929             $updsth->bind_param(1, 'reverted');
930             $updsth->bind_param(2, $row->{'import_items_id'});
931             $updsth->execute();
932             $updsth->finish();
933             $num_items_deleted++;
934         }
935         else {
936             next;
937         }
938     }
939     $sth->finish();
940     return $num_items_deleted;
941 }
942
943 =head2 CleanBatch
944
945   CleanBatch($batch_id)
946
947 Deletes all staged records from the import batch
948 and sets the status of the batch to 'cleaned'.  Note
949 that deleting a stage record does *not* affect
950 any record that has been committed to the database.
951
952 =cut
953
954 sub CleanBatch {
955     my $batch_id = shift;
956     return unless defined $batch_id;
957
958     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
959     SetImportBatchStatus($batch_id, 'cleaned');
960 }
961
962 =head2 GetAllImportBatches
963
964   my $results = GetAllImportBatches();
965
966 Returns a references to an array of hash references corresponding
967 to all import_batches rows (of batch_type 'batch'), sorted in 
968 ascending order by import_batch_id.
969
970 =cut
971
972 sub  GetAllImportBatches {
973     my $dbh = C4::Context->dbh;
974     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
975                                     WHERE batch_type IN ('batch', 'webservice')
976                                     ORDER BY import_batch_id ASC");
977
978     my $results = [];
979     $sth->execute();
980     while (my $row = $sth->fetchrow_hashref) {
981         push @$results, $row;
982     }
983     $sth->finish();
984     return $results;
985 }
986
987 =head2 GetStagedWebserviceBatches
988
989   my $batch_ids = GetStagedWebserviceBatches();
990
991 Returns a references to an array of batch id's
992 of batch_type 'webservice' that are not imported
993
994 =cut
995
996 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
997 SELECT import_batch_id FROM import_batches
998 WHERE batch_type = 'webservice'
999 AND import_status = 'staged'
1000 EOQ
1001 sub  GetStagedWebserviceBatches {
1002     my $dbh = C4::Context->dbh;
1003     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1004 }
1005
1006 =head2 GetImportBatchRangeDesc
1007
1008   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1009
1010 Returns a reference to an array of hash references corresponding to
1011 import_batches rows (sorted in descending order by import_batch_id)
1012 start at the given offset.
1013
1014 =cut
1015
1016 sub GetImportBatchRangeDesc {
1017     my ($offset, $results_per_group) = @_;
1018
1019     my $dbh = C4::Context->dbh;
1020     my $query = "SELECT * FROM import_batches
1021                                     WHERE batch_type IN ('batch', 'webservice')
1022                                     ORDER BY import_batch_id DESC";
1023     my @params;
1024     if ($results_per_group){
1025         $query .= " LIMIT ?";
1026         push(@params, $results_per_group);
1027     }
1028     if ($offset){
1029         $query .= " OFFSET ?";
1030         push(@params, $offset);
1031     }
1032     my $sth = $dbh->prepare_cached($query);
1033     $sth->execute(@params);
1034     my $results = $sth->fetchall_arrayref({});
1035     $sth->finish();
1036     return $results;
1037 }
1038
1039 =head2 GetItemNumbersFromImportBatch
1040
1041   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1042
1043 =cut
1044
1045 sub GetItemNumbersFromImportBatch {
1046         my ($batch_id) = @_;
1047         my $dbh = C4::Context->dbh;
1048         my $sth = $dbh->prepare("SELECT itemnumber FROM import_batches,import_records,import_items WHERE import_batches.import_batch_id=import_records.import_batch_id AND import_records.import_record_id=import_items.import_record_id AND import_batches.import_batch_id=?");
1049         $sth->execute($batch_id);
1050         my @items ;
1051         while ( my ($itm) = $sth->fetchrow_array ) {
1052                 push @items, $itm;
1053         }
1054         return @items;
1055 }
1056
1057 =head2 GetNumberOfImportBatches 
1058
1059   my $count = GetNumberOfImportBatches();
1060
1061 =cut
1062
1063 sub GetNumberOfNonZ3950ImportBatches {
1064     my $dbh = C4::Context->dbh;
1065     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1066     $sth->execute();
1067     my ($count) = $sth->fetchrow_array();
1068     $sth->finish();
1069     return $count;
1070 }
1071
1072 =head2 GetImportBiblios
1073
1074   my $results = GetImportBiblios($importid);
1075
1076 =cut
1077
1078 sub GetImportBiblios {
1079     my ($import_record_id) = @_;
1080
1081     my $dbh = C4::Context->dbh;
1082     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1083     return $dbh->selectall_arrayref(
1084         $query,
1085         { Slice => {} },
1086         $import_record_id
1087     );
1088
1089 }
1090
1091 =head2 GetImportRecordsRange
1092
1093   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1094
1095 Returns a reference to an array of hash references corresponding to
1096 import_biblios/import_auths/import_records rows for a given batch
1097 starting at the given offset.
1098
1099 =cut
1100
1101 sub GetImportRecordsRange {
1102     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1103
1104     my $dbh = C4::Context->dbh;
1105
1106     my $order_by = $parameters->{order_by} || 'import_record_id';
1107     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1108
1109     my $order_by_direction =
1110       uc( $parameters->{order_by_direction} ) eq 'DESC' ? 'DESC' : 'ASC';
1111
1112     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1113
1114     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1115                                            record_sequence, status, overlay_status,
1116                                            matched_biblionumber, matched_authid, record_type
1117                                     FROM   import_records
1118                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1119                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1120                                     WHERE  import_batch_id = ?";
1121     my @params;
1122     push(@params, $batch_id);
1123     if ($status) {
1124         $query .= " AND status=?";
1125         push(@params,$status);
1126     }
1127
1128     $query.=" ORDER BY $order_by $order_by_direction";
1129
1130     if($results_per_group){
1131         $query .= " LIMIT ?";
1132         push(@params, $results_per_group);
1133     }
1134     if($offset){
1135         $query .= " OFFSET ?";
1136         push(@params, $offset);
1137     }
1138     my $sth = $dbh->prepare_cached($query);
1139     $sth->execute(@params);
1140     my $results = $sth->fetchall_arrayref({});
1141     $sth->finish();
1142     return $results;
1143
1144 }
1145
1146 =head2 GetBestRecordMatch
1147
1148   my $record_id = GetBestRecordMatch($import_record_id);
1149
1150 =cut
1151
1152 sub GetBestRecordMatch {
1153     my ($import_record_id) = @_;
1154
1155     my $dbh = C4::Context->dbh;
1156     my $sth = $dbh->prepare("SELECT candidate_match_id
1157                              FROM   import_record_matches
1158                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1159                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1160                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1161                              WHERE  import_record_matches.import_record_id = ? AND
1162                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1163                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1164                              ORDER BY score DESC, candidate_match_id DESC");
1165     $sth->execute($import_record_id);
1166     my ($record_id) = $sth->fetchrow_array();
1167     $sth->finish();
1168     return $record_id;
1169 }
1170
1171 =head2 GetImportBatchStatus
1172
1173   my $status = GetImportBatchStatus($batch_id);
1174
1175 =cut
1176
1177 sub GetImportBatchStatus {
1178     my ($batch_id) = @_;
1179
1180     my $dbh = C4::Context->dbh;
1181     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1182     $sth->execute($batch_id);
1183     my ($status) = $sth->fetchrow_array();
1184     $sth->finish();
1185     return $status;
1186
1187 }
1188
1189 =head2 SetImportBatchStatus
1190
1191   SetImportBatchStatus($batch_id, $new_status);
1192
1193 =cut
1194
1195 sub SetImportBatchStatus {
1196     my ($batch_id, $new_status) = @_;
1197
1198     my $dbh = C4::Context->dbh;
1199     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1200     $sth->execute($new_status, $batch_id);
1201     $sth->finish();
1202
1203 }
1204
1205 =head2 GetImportBatchOverlayAction
1206
1207   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1208
1209 =cut
1210
1211 sub GetImportBatchOverlayAction {
1212     my ($batch_id) = @_;
1213
1214     my $dbh = C4::Context->dbh;
1215     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1216     $sth->execute($batch_id);
1217     my ($overlay_action) = $sth->fetchrow_array();
1218     $sth->finish();
1219     return $overlay_action;
1220
1221 }
1222
1223
1224 =head2 SetImportBatchOverlayAction
1225
1226   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1227
1228 =cut
1229
1230 sub SetImportBatchOverlayAction {
1231     my ($batch_id, $new_overlay_action) = @_;
1232
1233     my $dbh = C4::Context->dbh;
1234     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1235     $sth->execute($new_overlay_action, $batch_id);
1236     $sth->finish();
1237
1238 }
1239
1240 =head2 GetImportBatchNoMatchAction
1241
1242   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1243
1244 =cut
1245
1246 sub GetImportBatchNoMatchAction {
1247     my ($batch_id) = @_;
1248
1249     my $dbh = C4::Context->dbh;
1250     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1251     $sth->execute($batch_id);
1252     my ($nomatch_action) = $sth->fetchrow_array();
1253     $sth->finish();
1254     return $nomatch_action;
1255
1256 }
1257
1258
1259 =head2 SetImportBatchNoMatchAction
1260
1261   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1262
1263 =cut
1264
1265 sub SetImportBatchNoMatchAction {
1266     my ($batch_id, $new_nomatch_action) = @_;
1267
1268     my $dbh = C4::Context->dbh;
1269     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1270     $sth->execute($new_nomatch_action, $batch_id);
1271     $sth->finish();
1272
1273 }
1274
1275 =head2 GetImportBatchItemAction
1276
1277   my $item_action = GetImportBatchItemAction($batch_id);
1278
1279 =cut
1280
1281 sub GetImportBatchItemAction {
1282     my ($batch_id) = @_;
1283
1284     my $dbh = C4::Context->dbh;
1285     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1286     $sth->execute($batch_id);
1287     my ($item_action) = $sth->fetchrow_array();
1288     $sth->finish();
1289     return $item_action;
1290
1291 }
1292
1293
1294 =head2 SetImportBatchItemAction
1295
1296   SetImportBatchItemAction($batch_id, $new_item_action);
1297
1298 =cut
1299
1300 sub SetImportBatchItemAction {
1301     my ($batch_id, $new_item_action) = @_;
1302
1303     my $dbh = C4::Context->dbh;
1304     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1305     $sth->execute($new_item_action, $batch_id);
1306     $sth->finish();
1307
1308 }
1309
1310 =head2 GetImportBatchMatcher
1311
1312   my $matcher_id = GetImportBatchMatcher($batch_id);
1313
1314 =cut
1315
1316 sub GetImportBatchMatcher {
1317     my ($batch_id) = @_;
1318
1319     my $dbh = C4::Context->dbh;
1320     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1321     $sth->execute($batch_id);
1322     my ($matcher_id) = $sth->fetchrow_array();
1323     $sth->finish();
1324     return $matcher_id;
1325
1326 }
1327
1328
1329 =head2 SetImportBatchMatcher
1330
1331   SetImportBatchMatcher($batch_id, $new_matcher_id);
1332
1333 =cut
1334
1335 sub SetImportBatchMatcher {
1336     my ($batch_id, $new_matcher_id) = @_;
1337
1338     my $dbh = C4::Context->dbh;
1339     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1340     $sth->execute($new_matcher_id, $batch_id);
1341     $sth->finish();
1342
1343 }
1344
1345 =head2 GetImportRecordOverlayStatus
1346
1347   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1348
1349 =cut
1350
1351 sub GetImportRecordOverlayStatus {
1352     my ($import_record_id) = @_;
1353
1354     my $dbh = C4::Context->dbh;
1355     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1356     $sth->execute($import_record_id);
1357     my ($overlay_status) = $sth->fetchrow_array();
1358     $sth->finish();
1359     return $overlay_status;
1360
1361 }
1362
1363
1364 =head2 SetImportRecordOverlayStatus
1365
1366   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1367
1368 =cut
1369
1370 sub SetImportRecordOverlayStatus {
1371     my ($import_record_id, $new_overlay_status) = @_;
1372
1373     my $dbh = C4::Context->dbh;
1374     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1375     $sth->execute($new_overlay_status, $import_record_id);
1376     $sth->finish();
1377
1378 }
1379
1380 =head2 GetImportRecordStatus
1381
1382   my $status = GetImportRecordStatus($import_record_id);
1383
1384 =cut
1385
1386 sub GetImportRecordStatus {
1387     my ($import_record_id) = @_;
1388
1389     my $dbh = C4::Context->dbh;
1390     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1391     $sth->execute($import_record_id);
1392     my ($status) = $sth->fetchrow_array();
1393     $sth->finish();
1394     return $status;
1395
1396 }
1397
1398
1399 =head2 SetImportRecordStatus
1400
1401   SetImportRecordStatus($import_record_id, $new_status);
1402
1403 =cut
1404
1405 sub SetImportRecordStatus {
1406     my ($import_record_id, $new_status) = @_;
1407
1408     my $dbh = C4::Context->dbh;
1409     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1410     $sth->execute($new_status, $import_record_id);
1411     $sth->finish();
1412
1413 }
1414
1415 =head2 GetImportRecordMatches
1416
1417   my $results = GetImportRecordMatches($import_record_id, $best_only);
1418
1419 =cut
1420
1421 sub GetImportRecordMatches {
1422     my $import_record_id = shift;
1423     my $best_only = @_ ? shift : 0;
1424
1425     my $dbh = C4::Context->dbh;
1426     # FIXME currently biblio only
1427     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1428                                     candidate_match_id, score, record_type
1429                                     FROM import_records
1430                                     JOIN import_record_matches USING (import_record_id)
1431                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1432                                     WHERE import_record_id = ?
1433                                     ORDER BY score DESC, biblionumber DESC");
1434     $sth->bind_param(1, $import_record_id);
1435     my $results = [];
1436     $sth->execute();
1437     while (my $row = $sth->fetchrow_hashref) {
1438         if ($row->{'record_type'} eq 'auth') {
1439             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1440         }
1441         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1442         push @$results, $row;
1443         last if $best_only;
1444     }
1445     $sth->finish();
1446
1447     return $results;
1448     
1449 }
1450
1451
1452 =head2 SetImportRecordMatches
1453
1454   SetImportRecordMatches($import_record_id, @matches);
1455
1456 =cut
1457
1458 sub SetImportRecordMatches {
1459     my $import_record_id = shift;
1460     my @matches = @_;
1461
1462     my $dbh = C4::Context->dbh;
1463     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1464     $delsth->execute($import_record_id);
1465     $delsth->finish();
1466
1467     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1468                                     VALUES (?, ?, ?)");
1469     foreach my $match (@matches) {
1470         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1471     }
1472 }
1473
1474
1475 # internal functions
1476
1477 sub _create_import_record {
1478     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1479
1480     my $dbh = C4::Context->dbh;
1481     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1482                                                          record_type, encoding, z3950random)
1483                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1484     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1485                   $record_type, $encoding, $z3950random);
1486     my $import_record_id = $dbh->{'mysql_insertid'};
1487     $sth->finish();
1488     return $import_record_id;
1489 }
1490
1491 sub _update_import_record_marc {
1492     my ($import_record_id, $marc_record, $marc_type) = @_;
1493
1494     my $dbh = C4::Context->dbh;
1495     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1496                              WHERE  import_record_id = ?");
1497     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1498     $sth->finish();
1499 }
1500
1501 sub _add_auth_fields {
1502     my ($import_record_id, $marc_record) = @_;
1503
1504     my $controlnumber;
1505     if ($marc_record->field('001')) {
1506         $controlnumber = $marc_record->field('001')->data();
1507     }
1508     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1509     my $dbh = C4::Context->dbh;
1510     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1511     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1512     $sth->finish();
1513 }
1514
1515 sub _add_biblio_fields {
1516     my ($import_record_id, $marc_record) = @_;
1517
1518     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1519     my $dbh = C4::Context->dbh;
1520     # FIXME no controlnumber, originalsource
1521     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1522     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1523     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1524     $sth->finish();
1525                 
1526 }
1527
1528 sub _update_biblio_fields {
1529     my ($import_record_id, $marc_record) = @_;
1530
1531     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1532     my $dbh = C4::Context->dbh;
1533     # FIXME no controlnumber, originalsource
1534     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1535     $isbn =~ s/\(.*$//;
1536     $isbn =~ tr/ -_//;
1537     $isbn = uc $isbn;
1538     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1539                              WHERE  import_record_id = ?");
1540     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1541     $sth->finish();
1542 }
1543
1544 sub _parse_biblio_fields {
1545     my ($marc_record) = @_;
1546
1547     my $dbh = C4::Context->dbh;
1548     my $bibliofields = TransformMarcToKoha($marc_record, '');
1549     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1550
1551 }
1552
1553 sub _update_batch_record_counts {
1554     my ($batch_id) = @_;
1555
1556     my $dbh = C4::Context->dbh;
1557     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1558                                         num_records = (
1559                                             SELECT COUNT(*)
1560                                             FROM import_records
1561                                             WHERE import_batch_id = import_batches.import_batch_id),
1562                                         num_items = (
1563                                             SELECT COUNT(*)
1564                                             FROM import_records
1565                                             JOIN import_items USING (import_record_id)
1566                                             WHERE import_batch_id = import_batches.import_batch_id
1567                                             AND record_type = 'biblio')
1568                                     WHERE import_batch_id = ?");
1569     $sth->bind_param(1, $batch_id);
1570     $sth->execute();
1571     $sth->finish();
1572 }
1573
1574 sub _get_commit_action {
1575     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1576     
1577     if ($record_type eq 'biblio') {
1578         my ($bib_result, $bib_match, $item_result);
1579
1580         if ($overlay_status ne 'no_match') {
1581             $bib_match = GetBestRecordMatch($import_record_id);
1582             if ($overlay_action eq 'replace') {
1583                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1584             } elsif ($overlay_action eq 'create_new') {
1585                 $bib_result  = 'create_new';
1586             } elsif ($overlay_action eq 'ignore') {
1587                 $bib_result  = 'ignore';
1588             }
1589          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1590                 $item_result = 'create_new';
1591        }
1592       elsif($item_action eq 'replace'){
1593           $item_result = 'replace';
1594           }
1595       else {
1596              $item_result = 'ignore';
1597            }
1598         } else {
1599             $bib_result = $nomatch_action;
1600             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1601         }
1602         return ($bib_result, $item_result, $bib_match);
1603     } else { # must be auths
1604         my ($auth_result, $auth_match);
1605
1606         if ($overlay_status ne 'no_match') {
1607             $auth_match = GetBestRecordMatch($import_record_id);
1608             if ($overlay_action eq 'replace') {
1609                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1610             } elsif ($overlay_action eq 'create_new') {
1611                 $auth_result  = 'create_new';
1612             } elsif ($overlay_action eq 'ignore') {
1613                 $auth_result  = 'ignore';
1614             }
1615         } else {
1616             $auth_result = $nomatch_action;
1617         }
1618
1619         return ($auth_result, undef, $auth_match);
1620
1621     }
1622 }
1623
1624 sub _get_revert_action {
1625     my ($overlay_action, $overlay_status, $status) = @_;
1626
1627     my $bib_result;
1628
1629     if ($status eq 'ignored') {
1630         $bib_result = 'ignore';
1631     } else {
1632         if ($overlay_action eq 'create_new') {
1633             $bib_result = 'delete';
1634         } else {
1635             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1636         }
1637     }
1638     return $bib_result;
1639 }
1640
1641 1;
1642 __END__
1643
1644 =head1 AUTHOR
1645
1646 Koha Development Team <http://koha-community.org/>
1647
1648 Galen Charlton <galen.charlton@liblime.com>
1649
1650 =cut