Update release notes for 3.22.20
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Plugins::Handler;
31 use Koha::Logger;
32
33 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
34
35 BEGIN {
36         # set the version for version checking
37     $VERSION = 3.07.00.049;
38         require Exporter;
39         @ISA    = qw(Exporter);
40         @EXPORT = qw(
41     GetZ3950BatchId
42     GetWebserviceBatchId
43     GetImportRecordMarc
44     GetImportRecordMarcXML
45     AddImportBatch
46     GetImportBatch
47     AddAuthToBatch
48     AddBiblioToBatch
49     AddItemsToImportBiblio
50     ModAuthorityInBatch
51     ModBiblioInBatch
52
53     BatchStageMarcRecords
54     BatchFindDuplicates
55     BatchCommitRecords
56     BatchRevertRecords
57     CleanBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     GetImportRecordMatches
82     SetImportRecordMatches
83         );
84 }
85
86 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
87
88 =head1 NAME
89
90 C4::ImportBatch - manage batches of imported MARC records
91
92 =head1 SYNOPSIS
93
94 use C4::ImportBatch;
95
96 =head1 FUNCTIONS
97
98 =head2 GetZ3950BatchId
99
100   my $batchid = GetZ3950BatchId($z3950server);
101
102 Retrieves the ID of the import batch for the Z39.50
103 reservoir for the given target.  If necessary,
104 creates the import batch.
105
106 =cut
107
108 sub GetZ3950BatchId {
109     my ($z3950server) = @_;
110
111     my $dbh = C4::Context->dbh;
112     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
113                              WHERE  batch_type = 'z3950'
114                              AND    file_name = ?");
115     $sth->execute($z3950server);
116     my $rowref = $sth->fetchrow_arrayref();
117     $sth->finish();
118     if (defined $rowref) {
119         return $rowref->[0];
120     } else {
121         my $batch_id = AddImportBatch( {
122                 overlay_action => 'create_new',
123                 import_status => 'staged',
124                 batch_type => 'z3950',
125                 file_name => $z3950server,
126             } );
127         return $batch_id;
128     }
129     
130 }
131
132 =head2 GetWebserviceBatchId
133
134   my $batchid = GetWebserviceBatchId();
135
136 Retrieves the ID of the import batch for webservice.
137 If necessary, creates the import batch.
138
139 =cut
140
141 my $WEBSERVICE_BASE_QRY = <<EOQ;
142 SELECT import_batch_id FROM import_batches
143 WHERE  batch_type = 'webservice'
144 AND    import_status = 'staged'
145 EOQ
146 sub GetWebserviceBatchId {
147     my ($params) = @_;
148
149     my $dbh = C4::Context->dbh;
150     my $sql = $WEBSERVICE_BASE_QRY;
151     my @args;
152     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
153         if (my $val = $params->{$field}) {
154             $sql .= " AND $field = ?";
155             push @args, $val;
156         }
157     }
158     my $id = $dbh->selectrow_array($sql, undef, @args);
159     return $id if $id;
160
161     $params->{batch_type} = 'webservice';
162     $params->{import_status} = 'staged';
163     return AddImportBatch($params);
164 }
165
166 =head2 GetImportRecordMarc
167
168   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
169
170 =cut
171
172 sub GetImportRecordMarc {
173     my ($import_record_id) = @_;
174
175     my $dbh = C4::Context->dbh;
176     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
177         SELECT marc, encoding
178         FROM import_records
179         WHERE import_record_id = ?
180     |, undef, $import_record_id );
181
182     return $marc, $encoding;
183 }
184
185 sub GetRecordFromImportBiblio {
186     my ( $import_record_id, $embed_items ) = @_;
187
188     my ($marc) = GetImportRecordMarc($import_record_id);
189     my $record = MARC::Record->new_from_usmarc($marc);
190
191     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
192
193     return $record;
194 }
195
196 sub EmbedItemsInImportBiblio {
197     my ( $record, $import_record_id ) = @_;
198     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
199     my $dbh = C4::Context->dbh;
200     my $import_items = $dbh->selectall_arrayref(q|
201         SELECT import_items.marcxml
202         FROM import_items
203         WHERE import_record_id = ?
204     |, { Slice => {} }, $import_record_id );
205     my @item_fields;
206     for my $import_item ( @$import_items ) {
207         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
208         push @item_fields, $item_marc->field($itemtag);
209     }
210     $record->append_fields(@item_fields);
211     return $record;
212 }
213
214 =head2 GetImportRecordMarcXML
215
216   my $marcxml = GetImportRecordMarcXML($import_record_id);
217
218 =cut
219
220 sub GetImportRecordMarcXML {
221     my ($import_record_id) = @_;
222
223     my $dbh = C4::Context->dbh;
224     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
225     $sth->execute($import_record_id);
226     my ($marcxml) = $sth->fetchrow();
227     $sth->finish();
228     return $marcxml;
229
230 }
231
232 =head2 AddImportBatch
233
234   my $batch_id = AddImportBatch($params_hash);
235
236 =cut
237
238 sub AddImportBatch {
239     my ($params) = @_;
240
241     my (@fields, @vals);
242     foreach (qw( matcher_id template_id branchcode
243                  overlay_action nomatch_action item_action
244                  import_status batch_type file_name comments record_type )) {
245         if (exists $params->{$_}) {
246             push @fields, $_;
247             push @vals, $params->{$_};
248         }
249     }
250     my $dbh = C4::Context->dbh;
251     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
252                                   VALUES (".join( ',', map '?', @fields).")",
253              undef,
254              @vals);
255     return $dbh->{'mysql_insertid'};
256 }
257
258 =head2 GetImportBatch 
259
260   my $row = GetImportBatch($batch_id);
261
262 Retrieve a hashref of an import_batches row.
263
264 =cut
265
266 sub GetImportBatch {
267     my ($batch_id) = @_;
268
269     my $dbh = C4::Context->dbh;
270     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
271     $sth->bind_param(1, $batch_id);
272     $sth->execute();
273     my $result = $sth->fetchrow_hashref;
274     $sth->finish();
275     return $result;
276
277 }
278
279 =head2 AddBiblioToBatch 
280
281   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
282                 $marc_record, $encoding, $z3950random, $update_counts);
283
284 =cut
285
286 sub AddBiblioToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $z3950random = shift;
292     my $update_counts = @_ ? shift : 1;
293
294     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
295     _add_biblio_fields($import_record_id, $marc_record);
296     _update_batch_record_counts($batch_id) if $update_counts;
297     return $import_record_id;
298 }
299
300 =head2 ModBiblioInBatch
301
302   ModBiblioInBatch($import_record_id, $marc_record);
303
304 =cut
305
306 sub ModBiblioInBatch {
307     my ($import_record_id, $marc_record) = @_;
308
309     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
310     _update_biblio_fields($import_record_id, $marc_record);
311
312 }
313
314 =head2 AddAuthToBatch
315
316   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
317                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
318
319 =cut
320
321 sub AddAuthToBatch {
322     my $batch_id = shift;
323     my $record_sequence = shift;
324     my $marc_record = shift;
325     my $encoding = shift;
326     my $z3950random = shift;
327     my $update_counts = @_ ? shift : 1;
328     my $marc_type = shift || C4::Context->preference('marcflavour');
329
330     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
331
332     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
333     _add_auth_fields($import_record_id, $marc_record);
334     _update_batch_record_counts($batch_id) if $update_counts;
335     return $import_record_id;
336 }
337
338 =head2 ModAuthInBatch
339
340   ModAuthInBatch($import_record_id, $marc_record);
341
342 =cut
343
344 sub ModAuthInBatch {
345     my ($import_record_id, $marc_record) = @_;
346
347     my $marcflavour = C4::Context->preference('marcflavour');
348     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
349
350 }
351
352 =head2 BatchStageMarcRecords
353
354 ( $batch_id, $num_records, $num_items, @invalid_records ) =
355   BatchStageMarcRecords(
356     $encoding,                   $marc_records,
357     $file_name,                  $to_marc_plugin,
358     $marc_modification_template, $comments,
359     $branch_code,                $parse_items,
360     $leave_as_staging,           $progress_interval,
361     $progress_callback
362   );
363
364 =cut
365
366 sub BatchStageMarcRecords {
367     my $record_type = shift;
368     my $encoding = shift;
369     my $marc_records = shift;
370     my $file_name = shift;
371     my $to_marc_plugin = shift;
372     my $marc_modification_template = shift;
373     my $comments = shift;
374     my $branch_code = shift;
375     my $parse_items = shift;
376     my $leave_as_staging = shift;
377
378     # optional callback to monitor status 
379     # of job
380     my $progress_interval = 0;
381     my $progress_callback = undef;
382     if ($#_ == 1) {
383         $progress_interval = shift;
384         $progress_callback = shift;
385         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
386         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
387     } 
388     
389     my $batch_id = AddImportBatch( {
390             overlay_action => 'create_new',
391             import_status => 'staging',
392             batch_type => 'batch',
393             file_name => $file_name,
394             comments => $comments,
395             record_type => $record_type,
396         } );
397     if ($parse_items) {
398         SetImportBatchItemAction($batch_id, 'always_add');
399     } else {
400         SetImportBatchItemAction($batch_id, 'ignore');
401     }
402
403     $marc_records = Koha::Plugins::Handler->run(
404         {
405             class  => $to_marc_plugin,
406             method => 'to_marc',
407             params => { data => $marc_records }
408         }
409     ) if $to_marc_plugin;
410
411     my $marc_type = C4::Context->preference('marcflavour');
412     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
413     my @invalid_records = ();
414     my $num_valid = 0;
415     my $num_items = 0;
416     # FIXME - for now, we're dealing only with bibs
417     my $rec_num = 0;
418     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
419         $marc_blob =~ s/^\s+//g;
420         $marc_blob =~ s/\s+$//g;
421         next unless $marc_blob;
422         $rec_num++;
423         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
424             &$progress_callback($rec_num);
425         }
426         my ($marc_record, $charset_guessed, $char_errors) =
427             MarcToUTF8Record($marc_blob, $marc_type, $encoding);
428
429         $encoding = $charset_guessed unless $encoding;
430
431         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
432
433         my $import_record_id;
434         if (scalar($marc_record->fields()) == 0) {
435             push @invalid_records, $marc_blob;
436         } else {
437
438             # Normalize the record so it doesn't have separated diacritics
439             SetUTF8Flag($marc_record);
440
441             $num_valid++;
442             if ($record_type eq 'biblio') {
443                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
444                 if ($parse_items) {
445                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
446                     $num_items += scalar(@import_items_ids);
447                 }
448             } elsif ($record_type eq 'auth') {
449                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
450             }
451         }
452     }
453     unless ($leave_as_staging) {
454         SetImportBatchStatus($batch_id, 'staged');
455     }
456     # FIXME branch_code, number of bibs, number of items
457     _update_batch_record_counts($batch_id);
458     return ($batch_id, $num_valid, $num_items, @invalid_records);
459 }
460
461 =head2 AddItemsToImportBiblio
462
463   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
464                 $import_record_id, $marc_record, $update_counts);
465
466 =cut
467
468 sub AddItemsToImportBiblio {
469     my $batch_id = shift;
470     my $import_record_id = shift;
471     my $marc_record = shift;
472     my $update_counts = @_ ? shift : 0;
473
474     my @import_items_ids = ();
475    
476     my $dbh = C4::Context->dbh; 
477     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
478     foreach my $item_field ($marc_record->field($item_tag)) {
479         my $item_marc = MARC::Record->new();
480         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
481         $item_marc->append_fields($item_field);
482         $marc_record->delete_field($item_field);
483         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
484                                         VALUES (?, ?, ?)");
485         $sth->bind_param(1, $import_record_id);
486         $sth->bind_param(2, 'staged');
487         $sth->bind_param(3, $item_marc->as_xml());
488         $sth->execute();
489         push @import_items_ids, $dbh->{'mysql_insertid'};
490         $sth->finish();
491     }
492
493     if ($#import_items_ids > -1) {
494         _update_batch_record_counts($batch_id) if $update_counts;
495         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
496     }
497     return @import_items_ids;
498 }
499
500 =head2 BatchFindDuplicates
501
502   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
503              $max_matches, $progress_interval, $progress_callback);
504
505 Goes through the records loaded in the batch and attempts to 
506 find duplicates for each one.  Sets the matching status 
507 of each record to "no_match" or "auto_match" as appropriate.
508
509 The $max_matches parameter is optional; if it is not supplied,
510 it defaults to 10.
511
512 The $progress_interval and $progress_callback parameters are 
513 optional; if both are supplied, the sub referred to by
514 $progress_callback will be invoked every $progress_interval
515 records using the number of records processed as the 
516 singular argument.
517
518 =cut
519
520 sub BatchFindDuplicates {
521     my $batch_id = shift;
522     my $matcher = shift;
523     my $max_matches = @_ ? shift : 10;
524
525     # optional callback to monitor status 
526     # of job
527     my $progress_interval = 0;
528     my $progress_callback = undef;
529     if ($#_ == 1) {
530         $progress_interval = shift;
531         $progress_callback = shift;
532         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
533         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
534     }
535
536     my $dbh = C4::Context->dbh;
537
538     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
539                              FROM import_records
540                              WHERE import_batch_id = ?");
541     $sth->execute($batch_id);
542     my $num_with_matches = 0;
543     my $rec_num = 0;
544     while (my $rowref = $sth->fetchrow_hashref) {
545         $rec_num++;
546         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
547             &$progress_callback($rec_num);
548         }
549         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
550         my @matches = ();
551         if (defined $matcher) {
552             @matches = $matcher->get_matches($marc_record, $max_matches);
553         }
554         if (scalar(@matches) > 0) {
555             $num_with_matches++;
556             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
557             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
558         } else {
559             SetImportRecordMatches($rowref->{'import_record_id'}, ());
560             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
561         }
562     }
563     $sth->finish();
564     return $num_with_matches;
565 }
566
567 =head2 BatchCommitRecords
568
569   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
570         BatchCommitRecords($batch_id, $framework,
571         $progress_interval, $progress_callback);
572
573 =cut
574
575 sub BatchCommitRecords {
576     my $batch_id = shift;
577     my $framework = shift;
578
579     # optional callback to monitor status 
580     # of job
581     my $progress_interval = 0;
582     my $progress_callback = undef;
583     if ($#_ == 1) {
584         $progress_interval = shift;
585         $progress_callback = shift;
586         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
587         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
588     }
589
590     my $record_type;
591     my $num_added = 0;
592     my $num_updated = 0;
593     my $num_items_added = 0;
594     my $num_items_replaced = 0;
595     my $num_items_errored = 0;
596     my $num_ignored = 0;
597     # commit (i.e., save, all records in the batch)
598     SetImportBatchStatus('importing');
599     my $overlay_action = GetImportBatchOverlayAction($batch_id);
600     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
601     my $item_action = GetImportBatchItemAction($batch_id);
602     my $item_tag;
603     my $item_subfield;
604     my $dbh = C4::Context->dbh;
605     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
606                              FROM import_records
607                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
608                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
609                              WHERE import_batch_id = ?");
610     $sth->execute($batch_id);
611     my $marcflavour = C4::Context->preference('marcflavour');
612     my $rec_num = 0;
613     while (my $rowref = $sth->fetchrow_hashref) {
614         $record_type = $rowref->{'record_type'};
615         $rec_num++;
616         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
617             &$progress_callback($rec_num);
618         }
619         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
620             $num_ignored++;
621             next;
622         }
623
624         my $marc_type;
625         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
626             $marc_type = 'UNIMARCAUTH';
627         } elsif ($marcflavour eq 'UNIMARC') {
628             $marc_type = 'UNIMARC';
629         } else {
630             $marc_type = 'USMARC';
631         }
632         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
633
634         if ($record_type eq 'biblio') {
635             # remove any item tags - rely on BatchCommitItems
636             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
637             foreach my $item_field ($marc_record->field($item_tag)) {
638                 $marc_record->delete_field($item_field);
639             }
640         }
641
642         my ($record_result, $item_result, $record_match) =
643             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
644                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
645
646         my $recordid;
647         my $query;
648         if ($record_result eq 'create_new') {
649             $num_added++;
650             if ($record_type eq 'biblio') {
651                 my $biblioitemnumber;
652                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
653                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
654                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
655                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
656                     $num_items_added += $bib_items_added;
657                     $num_items_replaced += $bib_items_replaced;
658                     $num_items_errored += $bib_items_errored;
659                 }
660             } else {
661                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
662                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
663             }
664             my $sth = $dbh->prepare_cached($query);
665             $sth->execute($recordid, $rowref->{'import_record_id'});
666             $sth->finish();
667             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
668         } elsif ($record_result eq 'replace') {
669             $num_updated++;
670             $recordid = $record_match;
671             my $oldxml;
672             if ($record_type eq 'biblio') {
673                 my $oldbiblio = GetBiblio($recordid);
674                 $oldxml = GetXmlBiblio($recordid);
675
676                 # remove item fields so that they don't get
677                 # added again if record is reverted
678                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
679                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
680                 foreach my $item_field ($old_marc->field($item_tag)) {
681                     $old_marc->delete_field($item_field);
682                 }
683                 $oldxml = $old_marc->as_xml($marc_type);
684
685                 ModBiblio($marc_record, $recordid, $oldbiblio->{'frameworkcode'});
686                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
687
688                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
689                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
690                     $num_items_added += $bib_items_added;
691                     $num_items_replaced += $bib_items_replaced;
692                     $num_items_errored += $bib_items_errored;
693                 }
694             } else {
695                 $oldxml = GetAuthorityXML($recordid);
696
697                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
698                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
699             }
700             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
701             $sth->execute($oldxml, $rowref->{'import_record_id'});
702             $sth->finish();
703             my $sth2 = $dbh->prepare_cached($query);
704             $sth2->execute($recordid, $rowref->{'import_record_id'});
705             $sth2->finish();
706             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
707             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
708         } elsif ($record_result eq 'ignore') {
709             $recordid = $record_match;
710             $num_ignored++;
711             $recordid = $record_match;
712             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
713                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
714                 $num_items_added += $bib_items_added;
715          $num_items_replaced += $bib_items_replaced;
716                 $num_items_errored += $bib_items_errored;
717                 # still need to record the matched biblionumber so that the
718                 # items can be reverted
719                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
720                 $sth2->execute($recordid, $rowref->{'import_record_id'});
721                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
722             }
723             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
724         }
725     }
726     $sth->finish();
727     SetImportBatchStatus($batch_id, 'imported');
728     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
729 }
730
731 =head2 BatchCommitItems
732
733   ($num_items_added, $num_items_errored) = 
734          BatchCommitItems($import_record_id, $biblionumber);
735
736 =cut
737
738 sub BatchCommitItems {
739     my ( $import_record_id, $biblionumber, $action ) = @_;
740
741     my $dbh = C4::Context->dbh;
742
743     my $num_items_added = 0;
744     my $num_items_errored = 0;
745     my $num_items_replaced = 0;
746
747     my $sth = $dbh->prepare( "
748         SELECT import_items_id, import_items.marcxml, encoding
749         FROM import_items
750         JOIN import_records USING (import_record_id)
751         WHERE import_record_id = ?
752         ORDER BY import_items_id
753     " );
754     $sth->bind_param( 1, $import_record_id );
755     $sth->execute();
756
757     while ( my $row = $sth->fetchrow_hashref() ) {
758         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
759
760         # Delete date_due subfield as to not accidentally delete item checkout due dates
761         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
762         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
763
764         my $item = TransformMarcToKoha( $dbh, $item_marc );
765
766         my $duplicate_barcode = exists( $item->{'barcode'} ) && GetItemnumberFromBarcode( $item->{'barcode'} );
767         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
768
769         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
770         if ( $action eq "replace" && $duplicate_itemnumber ) {
771             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
772             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
773             $updsth->bind_param( 1, 'imported' );
774             $updsth->bind_param( 2, $item->{itemnumber} );
775             $updsth->bind_param( 3, $row->{'import_items_id'} );
776             $updsth->execute();
777             $updsth->finish();
778             $num_items_replaced++;
779         } elsif ( $action eq "replace" && $duplicate_barcode ) {
780             my $itemnumber = GetItemnumberFromBarcode( $item->{'barcode'} );
781             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
782             $updsth->bind_param( 1, 'imported' );
783             $updsth->bind_param( 2, $item->{itemnumber} );
784             $updsth->bind_param( 3, $row->{'import_items_id'} );
785             $updsth->execute();
786             $updsth->finish();
787             $num_items_replaced++;
788         } elsif ($duplicate_barcode) {
789             $updsth->bind_param( 1, 'error' );
790             $updsth->bind_param( 2, 'duplicate item barcode' );
791             $updsth->bind_param( 3, $row->{'import_items_id'} );
792             $updsth->execute();
793             $num_items_errored++;
794         } else {
795             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
796             if( $itemnumber ) {
797                 $updsth->bind_param( 1, 'imported' );
798                 $updsth->bind_param( 2, $itemnumber );
799                 $updsth->bind_param( 3, $row->{'import_items_id'} );
800                 $updsth->execute();
801                 $updsth->finish();
802                 $num_items_added++;
803             }
804         }
805     }
806
807     return ( $num_items_added, $num_items_replaced, $num_items_errored );
808 }
809
810 =head2 BatchRevertRecords
811
812   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
813       $num_ignored) = BatchRevertRecords($batch_id);
814
815 =cut
816
817 sub BatchRevertRecords {
818     my $batch_id = shift;
819
820     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
821
822     my $record_type;
823     my $num_deleted = 0;
824     my $num_errors = 0;
825     my $num_reverted = 0;
826     my $num_ignored = 0;
827     my $num_items_deleted = 0;
828     # commit (i.e., save, all records in the batch)
829     SetImportBatchStatus('reverting');
830     my $overlay_action = GetImportBatchOverlayAction($batch_id);
831     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
832     my $dbh = C4::Context->dbh;
833     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
834                              FROM import_records
835                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
836                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
837                              WHERE import_batch_id = ?");
838     $sth->execute($batch_id);
839     my $marc_type;
840     my $marcflavour = C4::Context->preference('marcflavour');
841     while (my $rowref = $sth->fetchrow_hashref) {
842         $record_type = $rowref->{'record_type'};
843         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
844             $num_ignored++;
845             next;
846         }
847         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
848             $marc_type = 'UNIMARCAUTH';
849         } elsif ($marcflavour eq 'UNIMARC') {
850             $marc_type = 'UNIMARC';
851         } else {
852             $marc_type = 'USMARC';
853         }
854
855         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
856
857         if ($record_result eq 'delete') {
858             my $error = undef;
859             if  ($record_type eq 'biblio') {
860                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
861                 $error = DelBiblio($rowref->{'matched_biblionumber'});
862             } else {
863                 my $deletedauthid = DelAuthority($rowref->{'matched_authid'});
864             }
865             if (defined $error) {
866                 $num_errors++;
867             } else {
868                 $num_deleted++;
869                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
870             }
871         } elsif ($record_result eq 'restore') {
872             $num_reverted++;
873             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
874             if ($record_type eq 'biblio') {
875                 my $biblionumber = $rowref->{'matched_biblionumber'};
876                 my $oldbiblio = GetBiblio($biblionumber);
877
878                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
879                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
880
881                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
882                 ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
883             } else {
884                 my $authid = $rowref->{'matched_authid'};
885                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
886             }
887             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
888         } elsif ($record_result eq 'ignore') {
889             if ($record_type eq 'biblio') {
890                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
891             }
892             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
893         }
894         my $query;
895         if ($record_type eq 'biblio') {
896             # remove matched_biblionumber only if there is no 'imported' item left
897             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
898             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
899         } else {
900             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
901         }
902         my $sth2 = $dbh->prepare_cached($query);
903         $sth2->execute($rowref->{'import_record_id'});
904     }
905
906     $sth->finish();
907     SetImportBatchStatus($batch_id, 'reverted');
908     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
909 }
910
911 =head2 BatchRevertItems
912
913   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
914
915 =cut
916
917 sub BatchRevertItems {
918     my ($import_record_id, $biblionumber) = @_;
919
920     my $dbh = C4::Context->dbh;
921     my $num_items_deleted = 0;
922
923     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
924                                    FROM import_items
925                                    JOIN items USING (itemnumber)
926                                    WHERE import_record_id = ?");
927     $sth->bind_param(1, $import_record_id);
928     $sth->execute();
929     while (my $row = $sth->fetchrow_hashref()) {
930         my $error = DelItemCheck($dbh, $biblionumber, $row->{'itemnumber'});
931         if ($error == 1){
932             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
933             $updsth->bind_param(1, 'reverted');
934             $updsth->bind_param(2, $row->{'import_items_id'});
935             $updsth->execute();
936             $updsth->finish();
937             $num_items_deleted++;
938         }
939         else {
940             next;
941         }
942     }
943     $sth->finish();
944     return $num_items_deleted;
945 }
946
947 =head2 CleanBatch
948
949   CleanBatch($batch_id)
950
951 Deletes all staged records from the import batch
952 and sets the status of the batch to 'cleaned'.  Note
953 that deleting a stage record does *not* affect
954 any record that has been committed to the database.
955
956 =cut
957
958 sub CleanBatch {
959     my $batch_id = shift;
960     return unless defined $batch_id;
961
962     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
963     SetImportBatchStatus($batch_id, 'cleaned');
964 }
965
966 =head2 GetAllImportBatches
967
968   my $results = GetAllImportBatches();
969
970 Returns a references to an array of hash references corresponding
971 to all import_batches rows (of batch_type 'batch'), sorted in 
972 ascending order by import_batch_id.
973
974 =cut
975
976 sub  GetAllImportBatches {
977     my $dbh = C4::Context->dbh;
978     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
979                                     WHERE batch_type IN ('batch', 'webservice')
980                                     ORDER BY import_batch_id ASC");
981
982     my $results = [];
983     $sth->execute();
984     while (my $row = $sth->fetchrow_hashref) {
985         push @$results, $row;
986     }
987     $sth->finish();
988     return $results;
989 }
990
991 =head2 GetStagedWebserviceBatches
992
993   my $batch_ids = GetStagedWebserviceBatches();
994
995 Returns a references to an array of batch id's
996 of batch_type 'webservice' that are not imported
997
998 =cut
999
1000 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1001 SELECT import_batch_id FROM import_batches
1002 WHERE batch_type = 'webservice'
1003 AND import_status = 'staged'
1004 EOQ
1005 sub  GetStagedWebserviceBatches {
1006     my $dbh = C4::Context->dbh;
1007     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1008 }
1009
1010 =head2 GetImportBatchRangeDesc
1011
1012   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1013
1014 Returns a reference to an array of hash references corresponding to
1015 import_batches rows (sorted in descending order by import_batch_id)
1016 start at the given offset.
1017
1018 =cut
1019
1020 sub GetImportBatchRangeDesc {
1021     my ($offset, $results_per_group) = @_;
1022
1023     my $dbh = C4::Context->dbh;
1024     my $query = "SELECT * FROM import_batches
1025                                     WHERE batch_type IN ('batch', 'webservice')
1026                                     ORDER BY import_batch_id DESC";
1027     my @params;
1028     if ($results_per_group){
1029         $query .= " LIMIT ?";
1030         push(@params, $results_per_group);
1031     }
1032     if ($offset){
1033         $query .= " OFFSET ?";
1034         push(@params, $offset);
1035     }
1036     my $sth = $dbh->prepare_cached($query);
1037     $sth->execute(@params);
1038     my $results = $sth->fetchall_arrayref({});
1039     $sth->finish();
1040     return $results;
1041 }
1042
1043 =head2 GetItemNumbersFromImportBatch
1044
1045   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1046
1047 =cut
1048
1049 sub GetItemNumbersFromImportBatch {
1050         my ($batch_id) = @_;
1051         my $dbh = C4::Context->dbh;
1052         my $sth = $dbh->prepare("SELECT itemnumber FROM import_batches,import_records,import_items WHERE import_batches.import_batch_id=import_records.import_batch_id AND import_records.import_record_id=import_items.import_record_id AND import_batches.import_batch_id=?");
1053         $sth->execute($batch_id);
1054         my @items ;
1055         while ( my ($itm) = $sth->fetchrow_array ) {
1056                 push @items, $itm;
1057         }
1058         return @items;
1059 }
1060
1061 =head2 GetNumberOfImportBatches 
1062
1063   my $count = GetNumberOfImportBatches();
1064
1065 =cut
1066
1067 sub GetNumberOfNonZ3950ImportBatches {
1068     my $dbh = C4::Context->dbh;
1069     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1070     $sth->execute();
1071     my ($count) = $sth->fetchrow_array();
1072     $sth->finish();
1073     return $count;
1074 }
1075
1076 =head2 GetImportBiblios
1077
1078   my $results = GetImportBiblios($importid);
1079
1080 =cut
1081
1082 sub GetImportBiblios {
1083     my ($import_record_id) = @_;
1084
1085     my $dbh = C4::Context->dbh;
1086     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1087     return $dbh->selectall_arrayref(
1088         $query,
1089         { Slice => {} },
1090         $import_record_id
1091     );
1092
1093 }
1094
1095 =head2 GetImportRecordsRange
1096
1097   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1098
1099 Returns a reference to an array of hash references corresponding to
1100 import_biblios/import_auths/import_records rows for a given batch
1101 starting at the given offset.
1102
1103 =cut
1104
1105 sub GetImportRecordsRange {
1106     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1107
1108     my $dbh = C4::Context->dbh;
1109
1110     my $order_by = $parameters->{order_by} || 'import_record_id';
1111     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1112
1113     my $order_by_direction =
1114       uc( $parameters->{order_by_direction} ) eq 'DESC' ? 'DESC' : 'ASC';
1115
1116     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1117
1118     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1119                                            record_sequence, status, overlay_status,
1120                                            matched_biblionumber, matched_authid, record_type
1121                                     FROM   import_records
1122                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1123                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1124                                     WHERE  import_batch_id = ?";
1125     my @params;
1126     push(@params, $batch_id);
1127     if ($status) {
1128         $query .= " AND status=?";
1129         push(@params,$status);
1130     }
1131
1132     $query.=" ORDER BY $order_by $order_by_direction";
1133
1134     if($results_per_group){
1135         $query .= " LIMIT ?";
1136         push(@params, $results_per_group);
1137     }
1138     if($offset){
1139         $query .= " OFFSET ?";
1140         push(@params, $offset);
1141     }
1142     my $sth = $dbh->prepare_cached($query);
1143     $sth->execute(@params);
1144     my $results = $sth->fetchall_arrayref({});
1145     $sth->finish();
1146     return $results;
1147
1148 }
1149
1150 =head2 GetBestRecordMatch
1151
1152   my $record_id = GetBestRecordMatch($import_record_id);
1153
1154 =cut
1155
1156 sub GetBestRecordMatch {
1157     my ($import_record_id) = @_;
1158
1159     my $dbh = C4::Context->dbh;
1160     my $sth = $dbh->prepare("SELECT candidate_match_id
1161                              FROM   import_record_matches
1162                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1163                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1164                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1165                              WHERE  import_record_matches.import_record_id = ? AND
1166                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1167                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1168                              ORDER BY score DESC, candidate_match_id DESC");
1169     $sth->execute($import_record_id);
1170     my ($record_id) = $sth->fetchrow_array();
1171     $sth->finish();
1172     return $record_id;
1173 }
1174
1175 =head2 GetImportBatchStatus
1176
1177   my $status = GetImportBatchStatus($batch_id);
1178
1179 =cut
1180
1181 sub GetImportBatchStatus {
1182     my ($batch_id) = @_;
1183
1184     my $dbh = C4::Context->dbh;
1185     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1186     $sth->execute($batch_id);
1187     my ($status) = $sth->fetchrow_array();
1188     $sth->finish();
1189     return $status;
1190
1191 }
1192
1193 =head2 SetImportBatchStatus
1194
1195   SetImportBatchStatus($batch_id, $new_status);
1196
1197 =cut
1198
1199 sub SetImportBatchStatus {
1200     my ($batch_id, $new_status) = @_;
1201
1202     my $dbh = C4::Context->dbh;
1203     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1204     $sth->execute($new_status, $batch_id);
1205     $sth->finish();
1206
1207 }
1208
1209 =head2 GetImportBatchOverlayAction
1210
1211   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1212
1213 =cut
1214
1215 sub GetImportBatchOverlayAction {
1216     my ($batch_id) = @_;
1217
1218     my $dbh = C4::Context->dbh;
1219     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1220     $sth->execute($batch_id);
1221     my ($overlay_action) = $sth->fetchrow_array();
1222     $sth->finish();
1223     return $overlay_action;
1224
1225 }
1226
1227
1228 =head2 SetImportBatchOverlayAction
1229
1230   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1231
1232 =cut
1233
1234 sub SetImportBatchOverlayAction {
1235     my ($batch_id, $new_overlay_action) = @_;
1236
1237     my $dbh = C4::Context->dbh;
1238     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1239     $sth->execute($new_overlay_action, $batch_id);
1240     $sth->finish();
1241
1242 }
1243
1244 =head2 GetImportBatchNoMatchAction
1245
1246   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1247
1248 =cut
1249
1250 sub GetImportBatchNoMatchAction {
1251     my ($batch_id) = @_;
1252
1253     my $dbh = C4::Context->dbh;
1254     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1255     $sth->execute($batch_id);
1256     my ($nomatch_action) = $sth->fetchrow_array();
1257     $sth->finish();
1258     return $nomatch_action;
1259
1260 }
1261
1262
1263 =head2 SetImportBatchNoMatchAction
1264
1265   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1266
1267 =cut
1268
1269 sub SetImportBatchNoMatchAction {
1270     my ($batch_id, $new_nomatch_action) = @_;
1271
1272     my $dbh = C4::Context->dbh;
1273     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1274     $sth->execute($new_nomatch_action, $batch_id);
1275     $sth->finish();
1276
1277 }
1278
1279 =head2 GetImportBatchItemAction
1280
1281   my $item_action = GetImportBatchItemAction($batch_id);
1282
1283 =cut
1284
1285 sub GetImportBatchItemAction {
1286     my ($batch_id) = @_;
1287
1288     my $dbh = C4::Context->dbh;
1289     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1290     $sth->execute($batch_id);
1291     my ($item_action) = $sth->fetchrow_array();
1292     $sth->finish();
1293     return $item_action;
1294
1295 }
1296
1297
1298 =head2 SetImportBatchItemAction
1299
1300   SetImportBatchItemAction($batch_id, $new_item_action);
1301
1302 =cut
1303
1304 sub SetImportBatchItemAction {
1305     my ($batch_id, $new_item_action) = @_;
1306
1307     my $dbh = C4::Context->dbh;
1308     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1309     $sth->execute($new_item_action, $batch_id);
1310     $sth->finish();
1311
1312 }
1313
1314 =head2 GetImportBatchMatcher
1315
1316   my $matcher_id = GetImportBatchMatcher($batch_id);
1317
1318 =cut
1319
1320 sub GetImportBatchMatcher {
1321     my ($batch_id) = @_;
1322
1323     my $dbh = C4::Context->dbh;
1324     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1325     $sth->execute($batch_id);
1326     my ($matcher_id) = $sth->fetchrow_array();
1327     $sth->finish();
1328     return $matcher_id;
1329
1330 }
1331
1332
1333 =head2 SetImportBatchMatcher
1334
1335   SetImportBatchMatcher($batch_id, $new_matcher_id);
1336
1337 =cut
1338
1339 sub SetImportBatchMatcher {
1340     my ($batch_id, $new_matcher_id) = @_;
1341
1342     my $dbh = C4::Context->dbh;
1343     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1344     $sth->execute($new_matcher_id, $batch_id);
1345     $sth->finish();
1346
1347 }
1348
1349 =head2 GetImportRecordOverlayStatus
1350
1351   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1352
1353 =cut
1354
1355 sub GetImportRecordOverlayStatus {
1356     my ($import_record_id) = @_;
1357
1358     my $dbh = C4::Context->dbh;
1359     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1360     $sth->execute($import_record_id);
1361     my ($overlay_status) = $sth->fetchrow_array();
1362     $sth->finish();
1363     return $overlay_status;
1364
1365 }
1366
1367
1368 =head2 SetImportRecordOverlayStatus
1369
1370   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1371
1372 =cut
1373
1374 sub SetImportRecordOverlayStatus {
1375     my ($import_record_id, $new_overlay_status) = @_;
1376
1377     my $dbh = C4::Context->dbh;
1378     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1379     $sth->execute($new_overlay_status, $import_record_id);
1380     $sth->finish();
1381
1382 }
1383
1384 =head2 GetImportRecordStatus
1385
1386   my $status = GetImportRecordStatus($import_record_id);
1387
1388 =cut
1389
1390 sub GetImportRecordStatus {
1391     my ($import_record_id) = @_;
1392
1393     my $dbh = C4::Context->dbh;
1394     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1395     $sth->execute($import_record_id);
1396     my ($status) = $sth->fetchrow_array();
1397     $sth->finish();
1398     return $status;
1399
1400 }
1401
1402
1403 =head2 SetImportRecordStatus
1404
1405   SetImportRecordStatus($import_record_id, $new_status);
1406
1407 =cut
1408
1409 sub SetImportRecordStatus {
1410     my ($import_record_id, $new_status) = @_;
1411
1412     my $dbh = C4::Context->dbh;
1413     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1414     $sth->execute($new_status, $import_record_id);
1415     $sth->finish();
1416
1417 }
1418
1419 =head2 GetImportRecordMatches
1420
1421   my $results = GetImportRecordMatches($import_record_id, $best_only);
1422
1423 =cut
1424
1425 sub GetImportRecordMatches {
1426     my $import_record_id = shift;
1427     my $best_only = @_ ? shift : 0;
1428
1429     my $dbh = C4::Context->dbh;
1430     # FIXME currently biblio only
1431     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1432                                     candidate_match_id, score, record_type
1433                                     FROM import_records
1434                                     JOIN import_record_matches USING (import_record_id)
1435                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1436                                     WHERE import_record_id = ?
1437                                     ORDER BY score DESC, biblionumber DESC");
1438     $sth->bind_param(1, $import_record_id);
1439     my $results = [];
1440     $sth->execute();
1441     while (my $row = $sth->fetchrow_hashref) {
1442         if ($row->{'record_type'} eq 'auth') {
1443             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1444         }
1445         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1446         push @$results, $row;
1447         last if $best_only;
1448     }
1449     $sth->finish();
1450
1451     return $results;
1452     
1453 }
1454
1455
1456 =head2 SetImportRecordMatches
1457
1458   SetImportRecordMatches($import_record_id, @matches);
1459
1460 =cut
1461
1462 sub SetImportRecordMatches {
1463     my $import_record_id = shift;
1464     my @matches = @_;
1465
1466     my $dbh = C4::Context->dbh;
1467     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1468     $delsth->execute($import_record_id);
1469     $delsth->finish();
1470
1471     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1472                                     VALUES (?, ?, ?)");
1473     foreach my $match (@matches) {
1474         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1475     }
1476 }
1477
1478
1479 # internal functions
1480
1481 sub _create_import_record {
1482     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1483
1484     my $dbh = C4::Context->dbh;
1485     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1486                                                          record_type, encoding, z3950random)
1487                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1488     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1489                   $record_type, $encoding, $z3950random);
1490     my $import_record_id = $dbh->{'mysql_insertid'};
1491     $sth->finish();
1492     return $import_record_id;
1493 }
1494
1495 sub _update_import_record_marc {
1496     my ($import_record_id, $marc_record, $marc_type) = @_;
1497
1498     my $dbh = C4::Context->dbh;
1499     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1500                              WHERE  import_record_id = ?");
1501     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1502     $sth->finish();
1503 }
1504
1505 sub _add_auth_fields {
1506     my ($import_record_id, $marc_record) = @_;
1507
1508     my $controlnumber;
1509     if ($marc_record->field('001')) {
1510         $controlnumber = $marc_record->field('001')->data();
1511     }
1512     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1513     my $dbh = C4::Context->dbh;
1514     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1515     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1516     $sth->finish();
1517 }
1518
1519 sub _add_biblio_fields {
1520     my ($import_record_id, $marc_record) = @_;
1521
1522     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1523     my $dbh = C4::Context->dbh;
1524     # FIXME no controlnumber, originalsource
1525     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1526     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1527     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1528     $sth->finish();
1529                 
1530 }
1531
1532 sub _update_biblio_fields {
1533     my ($import_record_id, $marc_record) = @_;
1534
1535     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1536     my $dbh = C4::Context->dbh;
1537     # FIXME no controlnumber, originalsource
1538     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1539     $isbn =~ s/\(.*$//;
1540     $isbn =~ tr/ -_//;
1541     $isbn = uc $isbn;
1542     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1543                              WHERE  import_record_id = ?");
1544     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1545     $sth->finish();
1546 }
1547
1548 sub _parse_biblio_fields {
1549     my ($marc_record) = @_;
1550
1551     my $dbh = C4::Context->dbh;
1552     my $bibliofields = TransformMarcToKoha($dbh, $marc_record, '');
1553     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1554
1555 }
1556
1557 sub _update_batch_record_counts {
1558     my ($batch_id) = @_;
1559
1560     my $dbh = C4::Context->dbh;
1561     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1562                                         num_records = (
1563                                             SELECT COUNT(*)
1564                                             FROM import_records
1565                                             WHERE import_batch_id = import_batches.import_batch_id),
1566                                         num_items = (
1567                                             SELECT COUNT(*)
1568                                             FROM import_records
1569                                             JOIN import_items USING (import_record_id)
1570                                             WHERE import_batch_id = import_batches.import_batch_id
1571                                             AND record_type = 'biblio')
1572                                     WHERE import_batch_id = ?");
1573     $sth->bind_param(1, $batch_id);
1574     $sth->execute();
1575     $sth->finish();
1576 }
1577
1578 sub _get_commit_action {
1579     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1580     
1581     if ($record_type eq 'biblio') {
1582         my ($bib_result, $bib_match, $item_result);
1583
1584         if ($overlay_status ne 'no_match') {
1585             $bib_match = GetBestRecordMatch($import_record_id);
1586             if ($overlay_action eq 'replace') {
1587                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1588             } elsif ($overlay_action eq 'create_new') {
1589                 $bib_result  = 'create_new';
1590             } elsif ($overlay_action eq 'ignore') {
1591                 $bib_result  = 'ignore';
1592             }
1593          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1594                 $item_result = 'create_new';
1595        }
1596       elsif($item_action eq 'replace'){
1597           $item_result = 'replace';
1598           }
1599       else {
1600              $item_result = 'ignore';
1601            }
1602         } else {
1603             $bib_result = $nomatch_action;
1604             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1605         }
1606         return ($bib_result, $item_result, $bib_match);
1607     } else { # must be auths
1608         my ($auth_result, $auth_match);
1609
1610         if ($overlay_status ne 'no_match') {
1611             $auth_match = GetBestRecordMatch($import_record_id);
1612             if ($overlay_action eq 'replace') {
1613                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1614             } elsif ($overlay_action eq 'create_new') {
1615                 $auth_result  = 'create_new';
1616             } elsif ($overlay_action eq 'ignore') {
1617                 $auth_result  = 'ignore';
1618             }
1619         } else {
1620             $auth_result = $nomatch_action;
1621         }
1622
1623         return ($auth_result, undef, $auth_match);
1624
1625     }
1626 }
1627
1628 sub _get_revert_action {
1629     my ($overlay_action, $overlay_status, $status) = @_;
1630
1631     my $bib_result;
1632
1633     if ($status eq 'ignored') {
1634         $bib_result = 'ignore';
1635     } else {
1636         if ($overlay_action eq 'create_new') {
1637             $bib_result = 'delete';
1638         } else {
1639             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1640         }
1641     }
1642     return $bib_result;
1643 }
1644
1645 1;
1646 __END__
1647
1648 =head1 AUTHOR
1649
1650 Koha Development Team <http://koha-community.org/>
1651
1652 Galen Charlton <galen.charlton@liblime.com>
1653
1654 =cut