Bug 7272 setting NULL to debarred field, to avoid having 0000-00-00
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28
29 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
30
31 BEGIN {
32         # set the version for version checking
33         $VERSION = 3.01;
34         require Exporter;
35         @ISA    = qw(Exporter);
36         @EXPORT = qw(
37     GetZ3950BatchId
38     GetImportRecordMarc
39     AddImportBatch
40     GetImportBatch
41     AddBiblioToBatch
42     ModBiblioInBatch
43
44     BatchStageMarcRecords
45     BatchFindBibDuplicates
46     BatchCommitBibRecords
47     BatchRevertBibRecords
48     CleanBatch
49
50     GetAllImportBatches
51     GetImportBatchRangeDesc
52     GetNumberOfNonZ3950ImportBatches
53     GetImportBibliosRange
54         GetItemNumbersFromImportBatch
55     
56     GetImportBatchStatus
57     SetImportBatchStatus
58     GetImportBatchOverlayAction
59     SetImportBatchOverlayAction
60     GetImportBatchNoMatchAction
61     SetImportBatchNoMatchAction
62     GetImportBatchItemAction
63     SetImportBatchItemAction
64     GetImportBatchMatcher
65     SetImportBatchMatcher
66     GetImportRecordOverlayStatus
67     SetImportRecordOverlayStatus
68     GetImportRecordStatus
69     SetImportRecordStatus
70     GetImportRecordMatches
71     SetImportRecordMatches
72         );
73 }
74
75 =head1 NAME
76
77 C4::ImportBatch - manage batches of imported MARC records
78
79 =head1 SYNOPSIS
80
81 use C4::ImportBatch;
82
83 =head1 FUNCTIONS
84
85 =head2 GetZ3950BatchId
86
87   my $batchid = GetZ3950BatchId($z3950server);
88
89 Retrieves the ID of the import batch for the Z39.50
90 reservoir for the given target.  If necessary,
91 creates the import batch.
92
93 =cut
94
95 sub GetZ3950BatchId {
96     my ($z3950server) = @_;
97
98     my $dbh = C4::Context->dbh;
99     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
100                              WHERE  batch_type = 'z3950'
101                              AND    file_name = ?");
102     $sth->execute($z3950server);
103     my $rowref = $sth->fetchrow_arrayref();
104     $sth->finish();
105     if (defined $rowref) {
106         return $rowref->[0];
107     } else {
108         my $batch_id = AddImportBatch('create_new', 'staged', 'z3950', $z3950server, '');
109         return $batch_id;
110     }
111     
112 }
113
114 =head2 GetImportRecordMarc
115
116   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
117
118 =cut
119
120 sub GetImportRecordMarc {
121     my ($import_record_id) = @_;
122
123     my $dbh = C4::Context->dbh;
124     my $sth = $dbh->prepare("SELECT marc, encoding FROM import_records WHERE import_record_id = ?");
125     $sth->execute($import_record_id);
126     my ($marc, $encoding) = $sth->fetchrow();
127     $sth->finish();
128     return $marc, $encoding;
129
130 }
131
132 =head2 AddImportBatch
133
134   my $batch_id = AddImportBatch($overlay_action, $import_status, $type, 
135                                 $file_name, $comments);
136
137 =cut
138
139 sub AddImportBatch {
140     my ($overlay_action, $import_status, $type, $file_name, $comments) = @_;
141
142     my $dbh = C4::Context->dbh;
143     my $sth = $dbh->prepare("INSERT INTO import_batches (overlay_action, import_status, batch_type,
144                                                          file_name, comments)
145                                     VALUES (?, ?, ?, ?, ?)");
146     $sth->execute($overlay_action, $import_status, $type, $file_name, $comments);
147     my $batch_id = $dbh->{'mysql_insertid'};
148     $sth->finish();
149
150     return $batch_id;
151
152 }
153
154 =head2 GetImportBatch 
155
156   my $row = GetImportBatch($batch_id);
157
158 Retrieve a hashref of an import_batches row.
159
160 =cut
161
162 sub GetImportBatch {
163     my ($batch_id) = @_;
164
165     my $dbh = C4::Context->dbh;
166     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
167     $sth->bind_param(1, $batch_id);
168     $sth->execute();
169     my $result = $sth->fetchrow_hashref;
170     $sth->finish();
171     return $result;
172
173 }
174
175 =head2 AddBiblioToBatch 
176
177   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
178                 $marc_record, $encoding, $z3950random, $update_counts);
179
180 =cut
181
182 sub AddBiblioToBatch {
183     my $batch_id = shift;
184     my $record_sequence = shift;
185     my $marc_record = shift;
186     my $encoding = shift;
187     my $z3950random = shift;
188     my $update_counts = @_ ? shift : 1;
189
190     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random);
191     _add_biblio_fields($import_record_id, $marc_record);
192     _update_batch_record_counts($batch_id) if $update_counts;
193     return $import_record_id;
194 }
195
196 =head2 ModBiblioInBatch
197
198   ModBiblioInBatch($import_record_id, $marc_record);
199
200 =cut
201
202 sub ModBiblioInBatch {
203     my ($import_record_id, $marc_record) = @_;
204
205     _update_import_record_marc($import_record_id, $marc_record);
206     _update_biblio_fields($import_record_id, $marc_record);
207
208 }
209
210 =head2 BatchStageMarcRecords
211
212   ($batch_id, $num_records, $num_items, @invalid_records) = 
213     BatchStageMarcRecords($encoding, $marc_records, $file_name, 
214                           $comments, $branch_code, $parse_items,
215                           $leave_as_staging, 
216                           $progress_interval, $progress_callback);
217
218 =cut
219
220 sub  BatchStageMarcRecords {
221     my $encoding = shift;
222     my $marc_records = shift;
223     my $file_name = shift;
224     my $comments = shift;
225     my $branch_code = shift;
226     my $parse_items = shift;
227     my $leave_as_staging = shift;
228    
229     # optional callback to monitor status 
230     # of job
231     my $progress_interval = 0;
232     my $progress_callback = undef;
233     if ($#_ == 1) {
234         $progress_interval = shift;
235         $progress_callback = shift;
236         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
237         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
238     } 
239     
240     my $batch_id = AddImportBatch('create_new', 'staging', 'batch', $file_name, $comments);
241     if ($parse_items) {
242         SetImportBatchItemAction($batch_id, 'always_add');
243     } else {
244         SetImportBatchItemAction($batch_id, 'ignore');
245     }
246
247     my @invalid_records = ();
248     my $num_valid = 0;
249     my $num_items = 0;
250     # FIXME - for now, we're dealing only with bibs
251     my $rec_num = 0;
252     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
253         $marc_blob =~ s/^\s+//g;
254         $marc_blob =~ s/\s+$//g;
255         next unless $marc_blob;
256         $rec_num++;
257         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
258             &$progress_callback($rec_num);
259         }
260         my ($marc_record, $charset_guessed, $char_errors) =
261             MarcToUTF8Record($marc_blob, C4::Context->preference("marcflavour"), $encoding);
262
263         $encoding = $charset_guessed unless $encoding;
264
265         my $import_record_id;
266         if (scalar($marc_record->fields()) == 0) {
267             push @invalid_records, $marc_blob;
268         } else {
269             $num_valid++;
270             $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
271             if ($parse_items) {
272                 my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
273                 $num_items += scalar(@import_items_ids);
274             }
275         }
276     }
277     unless ($leave_as_staging) {
278         SetImportBatchStatus($batch_id, 'staged');
279     }
280     # FIXME branch_code, number of bibs, number of items
281     _update_batch_record_counts($batch_id);
282     return ($batch_id, $num_valid, $num_items, @invalid_records);
283 }
284
285 =head2 AddItemsToImportBiblio
286
287   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
288                 $import_record_id, $marc_record, $update_counts);
289
290 =cut
291
292 sub AddItemsToImportBiblio {
293     my $batch_id = shift;
294     my $import_record_id = shift;
295     my $marc_record = shift;
296     my $update_counts = @_ ? shift : 0;
297
298     my @import_items_ids = ();
299    
300     my $dbh = C4::Context->dbh; 
301     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
302     foreach my $item_field ($marc_record->field($item_tag)) {
303         my $item_marc = MARC::Record->new();
304         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
305         $item_marc->append_fields($item_field);
306         $marc_record->delete_field($item_field);
307         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
308                                         VALUES (?, ?, ?)");
309         $sth->bind_param(1, $import_record_id);
310         $sth->bind_param(2, 'staged');
311         $sth->bind_param(3, $item_marc->as_xml());
312         $sth->execute();
313         push @import_items_ids, $dbh->{'mysql_insertid'};
314         $sth->finish();
315     }
316
317     if ($#import_items_ids > -1) {
318         _update_batch_record_counts($batch_id) if $update_counts;
319         _update_import_record_marc($import_record_id, $marc_record);
320     }
321     return @import_items_ids;
322 }
323
324 =head2 BatchFindBibDuplicates
325
326   my $num_with_matches = BatchFindBibDuplicates($batch_id, $matcher, 
327              $max_matches, $progress_interval, $progress_callback);
328
329 Goes through the records loaded in the batch and attempts to 
330 find duplicates for each one.  Sets the matching status 
331 of each record to "no_match" or "auto_match" as appropriate.
332
333 The $max_matches parameter is optional; if it is not supplied,
334 it defaults to 10.
335
336 The $progress_interval and $progress_callback parameters are 
337 optional; if both are supplied, the sub referred to by
338 $progress_callback will be invoked every $progress_interval
339 records using the number of records processed as the 
340 singular argument.
341
342 =cut
343
344 sub BatchFindBibDuplicates {
345     my $batch_id = shift;
346     my $matcher = shift;
347     my $max_matches = @_ ? shift : 10;
348
349     # optional callback to monitor status 
350     # of job
351     my $progress_interval = 0;
352     my $progress_callback = undef;
353     if ($#_ == 1) {
354         $progress_interval = shift;
355         $progress_callback = shift;
356         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
357         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
358     }
359
360     my $dbh = C4::Context->dbh;
361
362     my $sth = $dbh->prepare("SELECT import_record_id, marc
363                              FROM import_records
364                              JOIN import_biblios USING (import_record_id)
365                              WHERE import_batch_id = ?");
366     $sth->execute($batch_id);
367     my $num_with_matches = 0;
368     my $rec_num = 0;
369     while (my $rowref = $sth->fetchrow_hashref) {
370         $rec_num++;
371         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
372             &$progress_callback($rec_num);
373         }
374         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
375         my @matches = ();
376         if (defined $matcher) {
377             @matches = $matcher->get_matches($marc_record, $max_matches);
378         }
379         if (scalar(@matches) > 0) {
380             $num_with_matches++;
381             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
382             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
383         } else {
384             SetImportRecordMatches($rowref->{'import_record_id'}, ());
385             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
386         }
387     }
388     $sth->finish();
389     return $num_with_matches;
390 }
391
392 =head2 BatchCommitBibRecords
393
394   my ($num_added, $num_updated, $num_items_added, $num_items_errored, 
395       $num_ignored) = BatchCommitBibRecords($batch_id, $framework,
396                       $progress_interval, $progress_callback);
397
398 =cut
399
400 sub BatchCommitBibRecords {
401     my $batch_id = shift;
402     my $framework = shift;
403
404     # optional callback to monitor status 
405     # of job
406     my $progress_interval = 0;
407     my $progress_callback = undef;
408     if ($#_ == 1) {
409         $progress_interval = shift;
410         $progress_callback = shift;
411         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
412         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
413     }
414
415     my $num_added = 0;
416     my $num_updated = 0;
417     my $num_items_added = 0;
418     my $num_items_errored = 0;
419     my $num_ignored = 0;
420     # commit (i.e., save, all records in the batch)
421     # FIXME biblio only at the moment
422     SetImportBatchStatus('importing');
423     my $overlay_action = GetImportBatchOverlayAction($batch_id);
424     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
425     my $item_action = GetImportBatchItemAction($batch_id);
426     my $dbh = C4::Context->dbh;
427     my $sth = $dbh->prepare("SELECT import_record_id, status, overlay_status, marc, encoding
428                              FROM import_records
429                              JOIN import_biblios USING (import_record_id)
430                              WHERE import_batch_id = ?");
431     $sth->execute($batch_id);
432     my $rec_num = 0;
433     while (my $rowref = $sth->fetchrow_hashref) {
434         $rec_num++;
435         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
436             &$progress_callback($rec_num);
437         }
438         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
439             $num_ignored++;
440             next;
441         }
442
443         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
444
445         # remove any item tags - rely on BatchCommitItems
446         my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
447         foreach my $item_field ($marc_record->field($item_tag)) {
448             $marc_record->delete_field($item_field);
449         }
450
451         # decide what what to do with the bib and item records
452         my ($bib_result, $item_result, $bib_match) = 
453             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
454                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'});
455
456         if ($bib_result eq 'create_new') {
457             $num_added++;
458             my ($biblionumber, $biblioitemnumber) = AddBiblio($marc_record, $framework);
459             my $sth = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
460             $sth->execute($biblionumber, $rowref->{'import_record_id'});
461             $sth->finish();
462             if ($item_result eq 'create_new') {
463                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
464                 $num_items_added += $bib_items_added;
465                 $num_items_errored += $bib_items_errored;
466             }
467             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
468         } elsif ($bib_result eq 'replace') {
469             $num_updated++;
470             my $biblionumber = $bib_match;
471             my ($count, $oldbiblio) = GetBiblio($biblionumber);
472             my $oldxml = GetXmlBiblio($biblionumber);
473
474             # remove item fields so that they don't get
475             # added again if record is reverted
476             my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'});
477             foreach my $item_field ($old_marc->field($item_tag)) {
478                 $old_marc->delete_field($item_field);
479             }
480
481             ModBiblio($marc_record, $biblionumber, $oldbiblio->{'frameworkcode'});
482             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
483             $sth->execute($old_marc->as_xml(), $rowref->{'import_record_id'});
484             $sth->finish();
485             my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
486             $sth2->execute($biblionumber, $rowref->{'import_record_id'});
487             $sth2->finish();
488             if ($item_result eq 'create_new') {
489                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
490                 $num_items_added += $bib_items_added;
491                 $num_items_errored += $bib_items_errored;
492             }
493             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
494             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
495         } elsif ($bib_result eq 'ignore') {
496             $num_ignored++;
497             my $biblionumber = $bib_match;
498             if (defined $biblionumber and $item_result eq 'create_new') {
499                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
500                 $num_items_added += $bib_items_added;
501                 $num_items_errored += $bib_items_errored;
502                 # still need to record the matched biblionumber so that the
503                 # items can be reverted
504                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
505                 $sth2->execute($biblionumber, $rowref->{'import_record_id'});
506                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
507             }
508             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
509         }
510     }
511     $sth->finish();
512     SetImportBatchStatus($batch_id, 'imported');
513     return ($num_added, $num_updated, $num_items_added, $num_items_errored, $num_ignored);
514 }
515
516 =head2 BatchCommitItems
517
518   ($num_items_added, $num_items_errored) = 
519          BatchCommitItems($import_record_id, $biblionumber);
520
521 =cut
522
523 sub BatchCommitItems {
524     my ($import_record_id, $biblionumber) = @_;
525
526     my $dbh = C4::Context->dbh;
527
528     my $num_items_added = 0;
529     my $num_items_errored = 0;
530     my $sth = $dbh->prepare("SELECT import_items_id, import_items.marcxml, encoding
531                              FROM import_items
532                              JOIN import_records USING (import_record_id)
533                              WHERE import_record_id = ?
534                              ORDER BY import_items_id");
535     $sth->bind_param(1, $import_record_id);
536     $sth->execute();
537     while (my $row = $sth->fetchrow_hashref()) {
538         my $item_marc = MARC::Record->new_from_xml(StripNonXmlChars($row->{'marcxml'}), 'UTF-8', $row->{'encoding'});
539         # FIXME - duplicate barcode check needs to become part of AddItemFromMarc()
540         my $item = TransformMarcToKoha($dbh, $item_marc);
541         my $duplicate_barcode = exists($item->{'barcode'}) && GetItemnumberFromBarcode($item->{'barcode'});
542         if ($duplicate_barcode) {
543             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, import_error = ? WHERE import_items_id = ?");
544             $updsth->bind_param(1, 'error');
545             $updsth->bind_param(2, 'duplicate item barcode');
546             $updsth->bind_param(3, $row->{'import_items_id'});
547             $updsth->execute();
548             $num_items_errored++;
549         } else {
550             my ($item_biblionumber, $biblioitemnumber, $itemnumber) = AddItemFromMarc($item_marc, $biblionumber);
551             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
552             $updsth->bind_param(1, 'imported');
553             $updsth->bind_param(2, $itemnumber);
554             $updsth->bind_param(3, $row->{'import_items_id'});
555             $updsth->execute();
556             $updsth->finish();
557             $num_items_added++;
558         }
559     }
560     $sth->finish();
561     return ($num_items_added, $num_items_errored);
562 }
563
564 =head2 BatchRevertBibRecords
565
566   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
567       $num_ignored) = BatchRevertBibRecords($batch_id);
568
569 =cut
570
571 sub BatchRevertBibRecords {
572     my $batch_id = shift;
573
574     my $num_deleted = 0;
575     my $num_errors = 0;
576     my $num_reverted = 0;
577     my $num_items_deleted = 0;
578     my $num_ignored = 0;
579     # commit (i.e., save, all records in the batch)
580     # FIXME biblio only at the moment
581     SetImportBatchStatus('reverting');
582     my $overlay_action = GetImportBatchOverlayAction($batch_id);
583     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
584     my $dbh = C4::Context->dbh;
585     my $sth = $dbh->prepare("SELECT import_record_id, status, overlay_status, marcxml_old, encoding, matched_biblionumber
586                              FROM import_records
587                              JOIN import_biblios USING (import_record_id)
588                              WHERE import_batch_id = ?");
589     $sth->execute($batch_id);
590     while (my $rowref = $sth->fetchrow_hashref) {
591         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
592             $num_ignored++;
593             next;
594         }
595
596         my $bib_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
597
598         if ($bib_result eq 'delete') {
599             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
600             my $error = DelBiblio($rowref->{'matched_biblionumber'});
601             if (defined $error) {
602                 $num_errors++;
603             } else {
604                 $num_deleted++;
605                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
606             }
607         } elsif ($bib_result eq 'restore') {
608             $num_reverted++;
609             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'});
610             my $biblionumber = $rowref->{'matched_biblionumber'};
611             my ($count, $oldbiblio) = GetBiblio($biblionumber);
612             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
613             ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
614             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
615         } elsif ($bib_result eq 'ignore') {
616             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
617             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
618         }
619         my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?");
620         $sth2->execute($rowref->{'import_record_id'});
621     }
622
623     $sth->finish();
624     SetImportBatchStatus($batch_id, 'reverted');
625     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
626 }
627
628 =head2 BatchRevertItems
629
630   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
631
632 =cut
633
634 sub BatchRevertItems {
635     my ($import_record_id, $biblionumber) = @_;
636
637     my $dbh = C4::Context->dbh;
638     my $num_items_deleted = 0;
639
640     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
641                                    FROM import_items
642                                    JOIN items USING (itemnumber)
643                                    WHERE import_record_id = ?");
644     $sth->bind_param(1, $import_record_id);
645     $sth->execute();
646     while (my $row = $sth->fetchrow_hashref()) {
647         DelItem($dbh, $biblionumber, $row->{'itemnumber'});
648         my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
649         $updsth->bind_param(1, 'reverted');
650         $updsth->bind_param(2, $row->{'import_items_id'});
651         $updsth->execute();
652         $updsth->finish();
653         $num_items_deleted++;
654     }
655     $sth->finish();
656     return $num_items_deleted;
657 }
658
659 =head2 CleanBatch
660
661   CleanBatch($batch_id)
662
663 Deletes all staged records from the import batch
664 and sets the status of the batch to 'cleaned'.  Note
665 that deleting a stage record does *not* affect
666 any record that has been committed to the database.
667
668 =cut
669
670 sub CleanBatch {
671     my $batch_id = shift;
672     return unless defined $batch_id;
673
674     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
675     SetImportBatchStatus($batch_id, 'cleaned');
676 }
677
678 =head2 GetAllImportBatches
679
680   my $results = GetAllImportBatches();
681
682 Returns a references to an array of hash references corresponding
683 to all import_batches rows (of batch_type 'batch'), sorted in 
684 ascending order by import_batch_id.
685
686 =cut
687
688 sub  GetAllImportBatches {
689     my $dbh = C4::Context->dbh;
690     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
691                                     WHERE batch_type = 'batch'
692                                     ORDER BY import_batch_id ASC");
693
694     my $results = [];
695     $sth->execute();
696     while (my $row = $sth->fetchrow_hashref) {
697         push @$results, $row;
698     }
699     $sth->finish();
700     return $results;
701 }
702
703 =head2 GetImportBatchRangeDesc
704
705   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
706
707 Returns a reference to an array of hash references corresponding to
708 import_batches rows (sorted in descending order by import_batch_id)
709 start at the given offset.
710
711 =cut
712
713 sub GetImportBatchRangeDesc {
714     my ($offset, $results_per_group) = @_;
715
716     my $dbh = C4::Context->dbh;
717     my $query = "SELECT * FROM import_batches
718                                     WHERE batch_type = 'batch'
719                                     ORDER BY import_batch_id DESC";
720     my @params;
721     if ($results_per_group){
722         $query .= " LIMIT ?";
723         push(@params, $results_per_group);
724     }
725     if ($offset){
726         $query .= " OFFSET ?";
727         push(@params, $offset);
728     }
729     my $sth = $dbh->prepare_cached($query);
730     $sth->execute(@params);
731     my $results = $sth->fetchall_arrayref({});
732     $sth->finish();
733     return $results;
734 }
735
736 =head2 GetItemNumbersFromImportBatch
737
738   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
739
740 =cut
741
742 sub GetItemNumbersFromImportBatch {
743         my ($batch_id) = @_;
744         my $dbh = C4::Context->dbh;
745         my $sth = $dbh->prepare("SELECT itemnumber FROM import_batches,import_records,import_items WHERE import_batches.import_batch_id=import_records.import_batch_id AND import_records.import_record_id=import_items.import_record_id AND import_batches.import_batch_id=?");
746         $sth->execute($batch_id);
747         my @items ;
748         while ( my ($itm) = $sth->fetchrow_array ) {
749                 push @items, $itm;
750         }
751         return @items;
752 }
753
754 =head2 GetNumberOfImportBatches 
755
756   my $count = GetNumberOfImportBatches();
757
758 =cut
759
760 sub GetNumberOfNonZ3950ImportBatches {
761     my $dbh = C4::Context->dbh;
762     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type='batch'");
763     $sth->execute();
764     my ($count) = $sth->fetchrow_array();
765     $sth->finish();
766     return $count;
767 }
768
769 =head2 GetImportBibliosRange
770
771   my $results = GetImportBibliosRange($batch_id, $offset, $results_per_group);
772
773 Returns a reference to an array of hash references corresponding to
774 import_biblios/import_records rows for a given batch
775 starting at the given offset.
776
777 =cut
778
779 sub GetImportBibliosRange {
780     my ($batch_id, $offset, $results_per_group, $status) = @_;
781
782     my $dbh = C4::Context->dbh;
783     my $query = "SELECT title, author, isbn, issn, import_record_id, record_sequence,
784                                            status, overlay_status, matched_biblionumber
785                                     FROM   import_records
786                                     JOIN   import_biblios USING (import_record_id)
787                                     WHERE  import_batch_id = ?";
788     my @params;
789     push(@params, $batch_id);
790     if ($status) {
791         $query .= " AND status=?";
792         push(@params,$status);
793     }
794     $query.=" ORDER BY import_record_id";
795
796     if($results_per_group){
797         $query .= " LIMIT ?";
798         push(@params, $results_per_group);
799     }
800     if($offset){
801         $query .= " OFFSET ?";
802         push(@params, $offset);
803     }
804     my $sth = $dbh->prepare_cached($query);
805     $sth->execute(@params);
806     my $results = $sth->fetchall_arrayref({});
807     $sth->finish();
808     return $results;
809
810 }
811
812 =head2 GetBestRecordMatch
813
814   my $record_id = GetBestRecordMatch($import_record_id);
815
816 =cut
817
818 sub GetBestRecordMatch {
819     my ($import_record_id) = @_;
820
821     my $dbh = C4::Context->dbh;
822     my $sth = $dbh->prepare("SELECT candidate_match_id
823                              FROM   import_record_matches
824                              WHERE  import_record_id = ?
825                              ORDER BY score DESC, candidate_match_id DESC");
826     $sth->execute($import_record_id);
827     my ($record_id) = $sth->fetchrow_array();
828     $sth->finish();
829     return $record_id;
830 }
831
832 =head2 GetImportBatchStatus
833
834   my $status = GetImportBatchStatus($batch_id);
835
836 =cut
837
838 sub GetImportBatchStatus {
839     my ($batch_id) = @_;
840
841     my $dbh = C4::Context->dbh;
842     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
843     $sth->execute($batch_id);
844     my ($status) = $sth->fetchrow_array();
845     $sth->finish();
846     return $status;
847
848 }
849
850 =head2 SetImportBatchStatus
851
852   SetImportBatchStatus($batch_id, $new_status);
853
854 =cut
855
856 sub SetImportBatchStatus {
857     my ($batch_id, $new_status) = @_;
858
859     my $dbh = C4::Context->dbh;
860     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
861     $sth->execute($new_status, $batch_id);
862     $sth->finish();
863
864 }
865
866 =head2 GetImportBatchOverlayAction
867
868   my $overlay_action = GetImportBatchOverlayAction($batch_id);
869
870 =cut
871
872 sub GetImportBatchOverlayAction {
873     my ($batch_id) = @_;
874
875     my $dbh = C4::Context->dbh;
876     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
877     $sth->execute($batch_id);
878     my ($overlay_action) = $sth->fetchrow_array();
879     $sth->finish();
880     return $overlay_action;
881
882 }
883
884
885 =head2 SetImportBatchOverlayAction
886
887   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
888
889 =cut
890
891 sub SetImportBatchOverlayAction {
892     my ($batch_id, $new_overlay_action) = @_;
893
894     my $dbh = C4::Context->dbh;
895     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
896     $sth->execute($new_overlay_action, $batch_id);
897     $sth->finish();
898
899 }
900
901 =head2 GetImportBatchNoMatchAction
902
903   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
904
905 =cut
906
907 sub GetImportBatchNoMatchAction {
908     my ($batch_id) = @_;
909
910     my $dbh = C4::Context->dbh;
911     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
912     $sth->execute($batch_id);
913     my ($nomatch_action) = $sth->fetchrow_array();
914     $sth->finish();
915     return $nomatch_action;
916
917 }
918
919
920 =head2 SetImportBatchNoMatchAction
921
922   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
923
924 =cut
925
926 sub SetImportBatchNoMatchAction {
927     my ($batch_id, $new_nomatch_action) = @_;
928
929     my $dbh = C4::Context->dbh;
930     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
931     $sth->execute($new_nomatch_action, $batch_id);
932     $sth->finish();
933
934 }
935
936 =head2 GetImportBatchItemAction
937
938   my $item_action = GetImportBatchItemAction($batch_id);
939
940 =cut
941
942 sub GetImportBatchItemAction {
943     my ($batch_id) = @_;
944
945     my $dbh = C4::Context->dbh;
946     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
947     $sth->execute($batch_id);
948     my ($item_action) = $sth->fetchrow_array();
949     $sth->finish();
950     return $item_action;
951
952 }
953
954
955 =head2 SetImportBatchItemAction
956
957   SetImportBatchItemAction($batch_id, $new_item_action);
958
959 =cut
960
961 sub SetImportBatchItemAction {
962     my ($batch_id, $new_item_action) = @_;
963
964     my $dbh = C4::Context->dbh;
965     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
966     $sth->execute($new_item_action, $batch_id);
967     $sth->finish();
968
969 }
970
971 =head2 GetImportBatchMatcher
972
973   my $matcher_id = GetImportBatchMatcher($batch_id);
974
975 =cut
976
977 sub GetImportBatchMatcher {
978     my ($batch_id) = @_;
979
980     my $dbh = C4::Context->dbh;
981     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
982     $sth->execute($batch_id);
983     my ($matcher_id) = $sth->fetchrow_array();
984     $sth->finish();
985     return $matcher_id;
986
987 }
988
989
990 =head2 SetImportBatchMatcher
991
992   SetImportBatchMatcher($batch_id, $new_matcher_id);
993
994 =cut
995
996 sub SetImportBatchMatcher {
997     my ($batch_id, $new_matcher_id) = @_;
998
999     my $dbh = C4::Context->dbh;
1000     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1001     $sth->execute($new_matcher_id, $batch_id);
1002     $sth->finish();
1003
1004 }
1005
1006 =head2 GetImportRecordOverlayStatus
1007
1008   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1009
1010 =cut
1011
1012 sub GetImportRecordOverlayStatus {
1013     my ($import_record_id) = @_;
1014
1015     my $dbh = C4::Context->dbh;
1016     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1017     $sth->execute($import_record_id);
1018     my ($overlay_status) = $sth->fetchrow_array();
1019     $sth->finish();
1020     return $overlay_status;
1021
1022 }
1023
1024
1025 =head2 SetImportRecordOverlayStatus
1026
1027   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1028
1029 =cut
1030
1031 sub SetImportRecordOverlayStatus {
1032     my ($import_record_id, $new_overlay_status) = @_;
1033
1034     my $dbh = C4::Context->dbh;
1035     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1036     $sth->execute($new_overlay_status, $import_record_id);
1037     $sth->finish();
1038
1039 }
1040
1041 =head2 GetImportRecordStatus
1042
1043   my $overlay_status = GetImportRecordStatus($import_record_id);
1044
1045 =cut
1046
1047 sub GetImportRecordStatus {
1048     my ($import_record_id) = @_;
1049
1050     my $dbh = C4::Context->dbh;
1051     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1052     $sth->execute($import_record_id);
1053     my ($overlay_status) = $sth->fetchrow_array();
1054     $sth->finish();
1055     return $overlay_status;
1056
1057 }
1058
1059
1060 =head2 SetImportRecordStatus
1061
1062   SetImportRecordStatus($import_record_id, $new_overlay_status);
1063
1064 =cut
1065
1066 sub SetImportRecordStatus {
1067     my ($import_record_id, $new_overlay_status) = @_;
1068
1069     my $dbh = C4::Context->dbh;
1070     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1071     $sth->execute($new_overlay_status, $import_record_id);
1072     $sth->finish();
1073
1074 }
1075
1076 =head2 GetImportRecordMatches
1077
1078   my $results = GetImportRecordMatches($import_record_id, $best_only);
1079
1080 =cut
1081
1082 sub GetImportRecordMatches {
1083     my $import_record_id = shift;
1084     my $best_only = @_ ? shift : 0;
1085
1086     my $dbh = C4::Context->dbh;
1087     # FIXME currently biblio only
1088     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber, score
1089                                     FROM import_records
1090                                     JOIN import_record_matches USING (import_record_id)
1091                                     JOIN biblio ON (biblionumber = candidate_match_id)
1092                                     WHERE import_record_id = ?
1093                                     ORDER BY score DESC, biblionumber DESC");
1094     $sth->bind_param(1, $import_record_id);
1095     my $results = [];
1096     $sth->execute();
1097     while (my $row = $sth->fetchrow_hashref) {
1098         push @$results, $row;
1099         last if $best_only;
1100     }
1101     $sth->finish();
1102
1103     return $results;
1104     
1105 }
1106
1107
1108 =head2 SetImportRecordMatches
1109
1110   SetImportRecordMatches($import_record_id, @matches);
1111
1112 =cut
1113
1114 sub SetImportRecordMatches {
1115     my $import_record_id = shift;
1116     my @matches = @_;
1117
1118     my $dbh = C4::Context->dbh;
1119     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1120     $delsth->execute($import_record_id);
1121     $delsth->finish();
1122
1123     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1124                                     VALUES (?, ?, ?)");
1125     foreach my $match (@matches) {
1126         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1127     }
1128 }
1129
1130
1131 # internal functions
1132
1133 sub _create_import_record {
1134     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random) = @_;
1135
1136     my $dbh = C4::Context->dbh;
1137     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1138                                                          record_type, encoding, z3950random)
1139                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1140     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml(),
1141                   $record_type, $encoding, $z3950random);
1142     my $import_record_id = $dbh->{'mysql_insertid'};
1143     $sth->finish();
1144     return $import_record_id;
1145 }
1146
1147 sub _update_import_record_marc {
1148     my ($import_record_id, $marc_record) = @_;
1149
1150     my $dbh = C4::Context->dbh;
1151     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1152                              WHERE  import_record_id = ?");
1153     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml(), $import_record_id);
1154     $sth->finish();
1155 }
1156
1157 sub _add_biblio_fields {
1158     my ($import_record_id, $marc_record) = @_;
1159
1160     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1161     my $dbh = C4::Context->dbh;
1162     # FIXME no controlnumber, originalsource
1163     $isbn = C4::Koha::_isbn_cleanup($isbn); # FIXME C4::Koha::_isbn_cleanup should be made public
1164     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1165     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1166     $sth->finish();
1167                 
1168 }
1169
1170 sub _update_biblio_fields {
1171     my ($import_record_id, $marc_record) = @_;
1172
1173     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1174     my $dbh = C4::Context->dbh;
1175     # FIXME no controlnumber, originalsource
1176     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1177     $isbn =~ s/\(.*$//;
1178     $isbn =~ tr/ -_//;
1179     $isbn = uc $isbn;
1180     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1181                              WHERE  import_record_id = ?");
1182     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1183     $sth->finish();
1184 }
1185
1186 sub _parse_biblio_fields {
1187     my ($marc_record) = @_;
1188
1189     my $dbh = C4::Context->dbh;
1190     my $bibliofields = TransformMarcToKoha($dbh, $marc_record, '');
1191     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1192
1193 }
1194
1195 sub _update_batch_record_counts {
1196     my ($batch_id) = @_;
1197
1198     my $dbh = C4::Context->dbh;
1199     my $sth = $dbh->prepare_cached("UPDATE import_batches SET num_biblios = (
1200                                     SELECT COUNT(*)
1201                                     FROM import_records
1202                                     WHERE import_batch_id = import_batches.import_batch_id
1203                                     AND record_type = 'biblio')
1204                                     WHERE import_batch_id = ?");
1205     $sth->bind_param(1, $batch_id);
1206     $sth->execute();
1207     $sth->finish();
1208     $sth = $dbh->prepare_cached("UPDATE import_batches SET num_items = (
1209                                     SELECT COUNT(*)
1210                                     FROM import_records
1211                                     JOIN import_items USING (import_record_id)
1212                                     WHERE import_batch_id = import_batches.import_batch_id
1213                                     AND record_type = 'biblio')
1214                                     WHERE import_batch_id = ?");
1215     $sth->bind_param(1, $batch_id);
1216     $sth->execute();
1217     $sth->finish();
1218
1219 }
1220
1221 sub _get_commit_action {
1222     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id) = @_;
1223     
1224     my ($bib_result, $bib_match, $item_result);
1225
1226     if ($overlay_status ne 'no_match') {
1227         $bib_match = GetBestRecordMatch($import_record_id);
1228         if ($overlay_action eq 'replace') {
1229             $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1230         } elsif ($overlay_action eq 'create_new') {
1231             $bib_result  = 'create_new';
1232         } elsif ($overlay_action eq 'ignore') {
1233             $bib_result  = 'ignore';
1234         } 
1235         $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_matches') ? 'create_new' : 'ignore';
1236     } else {
1237         $bib_result = $nomatch_action;
1238         $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1239     }
1240
1241     return ($bib_result, $item_result, $bib_match);
1242 }
1243
1244 sub _get_revert_action {
1245     my ($overlay_action, $overlay_status, $status) = @_;
1246
1247     my $bib_result;
1248
1249     if ($status eq 'ignored') {
1250         $bib_result = 'ignore';
1251     } else {
1252         if ($overlay_action eq 'create_new') {
1253             $bib_result = 'delete';
1254         } else {
1255             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1256         }
1257     }
1258     return $bib_result;
1259 }
1260
1261 1;
1262 __END__
1263
1264 =head1 AUTHOR
1265
1266 Koha Development Team <http://koha-community.org/>
1267
1268 Galen Charlton <galen.charlton@liblime.com>
1269
1270 =cut