bug 2157: add ability to 'clean' staged record batches
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # Koha; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
18 # Suite 330, Boston, MA  02111-1307 USA
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28
29 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
30
31 BEGIN {
32         # set the version for version checking
33         $VERSION = 3.01;
34         require Exporter;
35         @ISA    = qw(Exporter);
36         @EXPORT = qw(
37     GetZ3950BatchId
38     GetImportRecordMarc
39     AddImportBatch
40     GetImportBatch
41     AddBiblioToBatch
42     ModBiblioInBatch
43
44     BatchStageMarcRecords
45     BatchFindBibDuplicates
46     BatchCommitBibRecords
47     BatchRevertBibRecords
48     CleanBatch
49
50     GetAllImportBatches
51     GetImportBatchRangeDesc
52     GetNumberOfNonZ3950ImportBatches
53     GetImportBibliosRange
54         GetItemNumbersFromImportBatch
55     
56     GetImportBatchStatus
57     SetImportBatchStatus
58     GetImportBatchOverlayAction
59     SetImportBatchOverlayAction
60     GetImportBatchNoMatchAction
61     SetImportBatchNoMatchAction
62     GetImportBatchItemAction
63     SetImportBatchItemAction
64     GetImportBatchMatcher
65     SetImportBatchMatcher
66     GetImportRecordOverlayStatus
67     SetImportRecordOverlayStatus
68     GetImportRecordStatus
69     SetImportRecordStatus
70     GetImportRecordMatches
71     SetImportRecordMatches
72         );
73 }
74
75 =head1 NAME
76
77 C4::ImportBatch - manage batches of imported MARC records
78
79 =head1 SYNOPSIS
80
81 =over 4
82
83 use C4::ImportBatch;
84
85 =back
86
87 =head1 FUNCTIONS
88
89 =head2 GetZ3950BatchId
90
91 =over 4
92
93 my $batchid = GetZ3950BatchId($z3950server);
94
95 =back
96
97 Retrieves the ID of the import batch for the Z39.50
98 reservoir for the given target.  If necessary,
99 creates the import batch.
100
101 =cut
102
103 sub GetZ3950BatchId {
104     my ($z3950server) = @_;
105
106     my $dbh = C4::Context->dbh;
107     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
108                              WHERE  batch_type = 'z3950'
109                              AND    file_name = ?");
110     $sth->execute($z3950server);
111     my $rowref = $sth->fetchrow_arrayref();
112     $sth->finish();
113     if (defined $rowref) {
114         return $rowref->[0];
115     } else {
116         my $batch_id = AddImportBatch('create_new', 'staged', 'z3950', $z3950server, '');
117         return $batch_id;
118     }
119     
120 }
121
122 =head2 GetImportRecordMarc
123
124 =over 4
125
126 my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
127
128 =back
129
130 =cut
131
132 sub GetImportRecordMarc {
133     my ($import_record_id) = @_;
134
135     my $dbh = C4::Context->dbh;
136     my $sth = $dbh->prepare("SELECT marc, encoding FROM import_records WHERE import_record_id = ?");
137     $sth->execute($import_record_id);
138     my ($marc, $encoding) = $sth->fetchrow();
139     $sth->finish();
140     return $marc;
141
142 }
143
144 =head2 AddImportBatch
145
146 =over 4
147
148 my $batch_id = AddImportBatch($overlay_action, $import_status, $type, $file_name, $comments);
149
150 =back
151
152 =cut
153
154 sub AddImportBatch {
155     my ($overlay_action, $import_status, $type, $file_name, $comments) = @_;
156
157     my $dbh = C4::Context->dbh;
158     my $sth = $dbh->prepare("INSERT INTO import_batches (overlay_action, import_status, batch_type,
159                                                          file_name, comments)
160                                     VALUES (?, ?, ?, ?, ?)");
161     $sth->execute($overlay_action, $import_status, $type, $file_name, $comments);
162     my $batch_id = $dbh->{'mysql_insertid'};
163     $sth->finish();
164
165     return $batch_id;
166
167 }
168
169 =head2 GetImportBatch 
170
171 =over 4
172
173 my $row = GetImportBatch($batch_id);
174
175 =back
176
177 Retrieve a hashref of an import_batches row.
178
179 =cut
180
181 sub GetImportBatch {
182     my ($batch_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
186     $sth->bind_param(1, $batch_id);
187     $sth->execute();
188     my $result = $sth->fetchrow_hashref;
189     $sth->finish();
190     return $result;
191
192 }
193
194 =head2 AddBiblioToBatch 
195
196 =over 4
197
198 my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, $marc_record, $encoding, $z3950random, $update_counts);
199
200 =back
201
202 =cut
203
204 sub AddBiblioToBatch {
205     my $batch_id = shift;
206     my $record_sequence = shift;
207     my $marc_record = shift;
208     my $encoding = shift;
209     my $z3950random = shift;
210     my $update_counts = @_ ? shift : 1;
211
212     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random);
213     _add_biblio_fields($import_record_id, $marc_record);
214     _update_batch_record_counts($batch_id) if $update_counts;
215     return $import_record_id;
216 }
217
218 =head2 ModBiblioInBatch
219
220 =over 4
221
222 ModBiblioInBatch($import_record_id, $marc_record);
223
224 =back
225
226 =cut
227
228 sub ModBiblioInBatch {
229     my ($import_record_id, $marc_record) = @_;
230
231     _update_import_record_marc($import_record_id, $marc_record);
232     _update_biblio_fields($import_record_id, $marc_record);
233
234 }
235
236 =head2 BatchStageMarcRecords
237
238 =over 4
239
240 ($batch_id, $num_records, $num_items, @invalid_records) = 
241     BatchStageMarcRecords($marc_flavor, $marc_records, $file_name, 
242                           $comments, $branch_code, $parse_items,
243                           $leave_as_staging, 
244                           $progress_interval, $progress_callback);
245
246 =back
247
248 =cut
249
250 sub  BatchStageMarcRecords {
251     my $marc_flavor = shift;
252     my $marc_records = shift;
253     my $file_name = shift;
254     my $comments = shift;
255     my $branch_code = shift;
256     my $parse_items = shift;
257     my $leave_as_staging = shift;
258    
259     # optional callback to monitor status 
260     # of job
261     my $progress_interval = 0;
262     my $progress_callback = undef;
263     if ($#_ == 1) {
264         $progress_interval = shift;
265         $progress_callback = shift;
266         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
267         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
268     } 
269     
270     my $batch_id = AddImportBatch('create_new', 'staging', 'batch', $file_name, $comments);
271     if ($parse_items) {
272         SetImportBatchItemAction($batch_id, 'always_add');
273     } else {
274         SetImportBatchItemAction($batch_id, 'ignore');
275     }
276
277     my @invalid_records = ();
278     my $num_valid = 0;
279     my $num_items = 0;
280     # FIXME - for now, we're dealing only with bibs
281     my $rec_num = 0;
282     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
283         $marc_blob =~ s/^\s+//g;
284         $marc_blob =~ s/\s+$//g;
285         next unless $marc_blob;
286         $rec_num++;
287         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
288             &$progress_callback($rec_num);
289         }
290         my ($marc_record, $charset_guessed, $char_errors) =
291             MarcToUTF8Record($marc_blob, C4::Context->preference("marcflavour"));
292         my $import_record_id;
293         if (scalar($marc_record->fields()) == 0) {
294             push @invalid_records, $marc_blob;
295         } else {
296             $num_valid++;
297             $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $marc_flavor, int(rand(99999)), 0);
298             if ($parse_items) {
299                 my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
300                 $num_items += scalar(@import_items_ids);
301             }
302         }
303     }
304     unless ($leave_as_staging) {
305         SetImportBatchStatus($batch_id, 'staged');
306     }
307     # FIXME branch_code, number of bibs, number of items
308     _update_batch_record_counts($batch_id);
309     return ($batch_id, $num_valid, $num_items, @invalid_records);
310 }
311
312 =head2 AddItemsToImportBiblio
313
314 =over 4
315
316 my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, $update_counts);
317
318 =back
319
320 =cut
321
322 sub AddItemsToImportBiblio {
323     my $batch_id = shift;
324     my $import_record_id = shift;
325     my $marc_record = shift;
326     my $update_counts = @_ ? shift : 0;
327
328     my @import_items_ids = ();
329    
330     my $dbh = C4::Context->dbh; 
331     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
332     foreach my $item_field ($marc_record->field($item_tag)) {
333         my $item_marc = MARC::Record->new();
334         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
335         $item_marc->append_fields($item_field);
336         $marc_record->delete_field($item_field);
337         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
338                                         VALUES (?, ?, ?)");
339         $sth->bind_param(1, $import_record_id);
340         $sth->bind_param(2, 'staged');
341         $sth->bind_param(3, $item_marc->as_xml());
342         $sth->execute();
343         push @import_items_ids, $dbh->{'mysql_insertid'};
344         $sth->finish();
345     }
346
347     if ($#import_items_ids > -1) {
348         _update_batch_record_counts($batch_id) if $update_counts;
349         _update_import_record_marc($import_record_id, $marc_record);
350     }
351     return @import_items_ids;
352 }
353
354 =head2 BatchFindBibDuplicates
355
356 =over 4
357
358 my $num_with_matches = BatchFindBibDuplicates($batch_id, $matcher, $max_matches, $progress_interval, $progress_callback);
359
360 =back
361
362 Goes through the records loaded in the batch and attempts to 
363 find duplicates for each one.  Sets the matching status 
364 of each record to "no_match" or "auto_match" as appropriate.
365
366 The $max_matches parameter is optional; if it is not supplied,
367 it defaults to 10.
368
369 The $progress_interval and $progress_callback parameters are 
370 optional; if both are supplied, the sub referred to by
371 $progress_callback will be invoked every $progress_interval
372 records using the number of records processed as the 
373 singular argument.
374
375 =cut
376
377 sub BatchFindBibDuplicates {
378     my $batch_id = shift;
379     my $matcher = shift;
380     my $max_matches = @_ ? shift : 10;
381
382     # optional callback to monitor status 
383     # of job
384     my $progress_interval = 0;
385     my $progress_callback = undef;
386     if ($#_ == 1) {
387         $progress_interval = shift;
388         $progress_callback = shift;
389         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
390         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
391     }
392
393     my $dbh = C4::Context->dbh;
394
395     my $sth = $dbh->prepare("SELECT import_record_id, marc
396                              FROM import_records
397                              JOIN import_biblios USING (import_record_id)
398                              WHERE import_batch_id = ?");
399     $sth->execute($batch_id);
400     my $num_with_matches = 0;
401     my $rec_num = 0;
402     while (my $rowref = $sth->fetchrow_hashref) {
403         $rec_num++;
404         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
405             &$progress_callback($rec_num);
406         }
407         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
408         my @matches = ();
409         if (defined $matcher) {
410             @matches = $matcher->get_matches($marc_record, $max_matches);
411         }
412         if (scalar(@matches) > 0) {
413             $num_with_matches++;
414             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
415             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
416         } else {
417             SetImportRecordMatches($rowref->{'import_record_id'}, ());
418             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
419         }
420     }
421     $sth->finish();
422     return $num_with_matches;
423 }
424
425 =head2 BatchCommitBibRecords
426
427 =over 4
428
429 my ($num_added, $num_updated, $num_items_added, $num_items_errored, $num_ignored) = 
430     BatchCommitBibRecords($batch_id, $progress_interval, $progress_callback);
431
432 =back
433
434 =cut
435
436 sub BatchCommitBibRecords {
437     my $batch_id = shift;
438
439     # optional callback to monitor status 
440     # of job
441     my $progress_interval = 0;
442     my $progress_callback = undef;
443     if ($#_ == 1) {
444         $progress_interval = shift;
445         $progress_callback = shift;
446         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
447         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
448     }
449
450     my $num_added = 0;
451     my $num_updated = 0;
452     my $num_items_added = 0;
453     my $num_items_errored = 0;
454     my $num_ignored = 0;
455     # commit (i.e., save, all records in the batch)
456     # FIXME biblio only at the moment
457     SetImportBatchStatus('importing');
458     my $overlay_action = GetImportBatchOverlayAction($batch_id);
459     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
460     my $item_action = GetImportBatchItemAction($batch_id);
461     my $dbh = C4::Context->dbh;
462     my $sth = $dbh->prepare("SELECT import_record_id, status, overlay_status, marc, encoding
463                              FROM import_records
464                              JOIN import_biblios USING (import_record_id)
465                              WHERE import_batch_id = ?");
466     $sth->execute($batch_id);
467     my $rec_num = 0;
468     while (my $rowref = $sth->fetchrow_hashref) {
469         $rec_num++;
470         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
471             &$progress_callback($rec_num);
472         }
473         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
474             $num_ignored++;
475             next;
476         }
477
478         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
479
480         # remove any item tags - rely on BatchCommitItems
481         my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
482         foreach my $item_field ($marc_record->field($item_tag)) {
483             $marc_record->delete_field($item_field);
484         }
485
486         # decide what what to do with the bib and item records
487         my ($bib_result, $item_result, $bib_match) = 
488             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
489                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'});
490
491         if ($bib_result eq 'create_new') {
492             $num_added++;
493             my ($biblionumber, $biblioitemnumber) = AddBiblio($marc_record, '');
494             my $sth = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
495             $sth->execute($biblionumber, $rowref->{'import_record_id'});
496             $sth->finish();
497             if ($item_result eq 'create_new') {
498                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
499                 $num_items_added += $bib_items_added;
500                 $num_items_errored += $bib_items_errored;
501             }
502             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
503         } elsif ($bib_result eq 'replace') {
504             $num_updated++;
505             my $biblionumber = $bib_match;
506             my ($count, $oldbiblio) = GetBiblio($biblionumber);
507             my $oldxml = GetXmlBiblio($biblionumber);
508
509             # remove item fields so that they don't get
510             # added again if record is reverted
511             my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'});
512             foreach my $item_field ($old_marc->field($item_tag)) {
513                 $old_marc->delete_field($item_field);
514             }
515
516             ModBiblio($marc_record, $biblionumber, $oldbiblio->{'frameworkcode'});
517             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
518             $sth->execute($old_marc->as_xml(), $rowref->{'import_record_id'});
519             $sth->finish();
520             my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
521             $sth2->execute($biblionumber, $rowref->{'import_record_id'});
522             $sth2->finish();
523             if ($item_result eq 'create_new') {
524                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
525                 $num_items_added += $bib_items_added;
526                 $num_items_errored += $bib_items_errored;
527             }
528             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
529             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
530         } elsif ($bib_result eq 'ignore') {
531             $num_ignored++;
532             my $biblionumber = $bib_match;
533             if (defined $biblionumber and $item_result eq 'create_new') {
534                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $biblionumber);
535                 $num_items_added += $bib_items_added;
536                 $num_items_errored += $bib_items_errored;
537                 # still need to record the matched biblionumber so that the
538                 # items can be reverted
539                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
540                 $sth2->execute($biblionumber, $rowref->{'import_record_id'});
541                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
542             }
543             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
544         }
545     }
546     $sth->finish();
547     SetImportBatchStatus($batch_id, 'imported');
548     return ($num_added, $num_updated, $num_items_added, $num_items_errored, $num_ignored);
549 }
550
551 =head2 BatchCommitItems
552
553 =over 4
554
555 ($num_items_added, $num_items_errored) = BatchCommitItems($import_record_id, $biblionumber);
556
557 =back
558
559 =cut
560
561 sub BatchCommitItems {
562     my ($import_record_id, $biblionumber) = @_;
563
564     my $dbh = C4::Context->dbh;
565
566     my $num_items_added = 0;
567     my $num_items_errored = 0;
568     my $sth = $dbh->prepare("SELECT import_items_id, import_items.marcxml, encoding
569                              FROM import_items
570                              JOIN import_records USING (import_record_id)
571                              WHERE import_record_id = ?
572                              ORDER BY import_items_id");
573     $sth->bind_param(1, $import_record_id);
574     $sth->execute();
575     while (my $row = $sth->fetchrow_hashref()) {
576         my $item_marc = MARC::Record->new_from_xml(StripNonXmlChars($row->{'marcxml'}), 'UTF-8', $row->{'encoding'});
577         # FIXME - duplicate barcode check needs to become part of AddItemFromMarc()
578         my $item = TransformMarcToKoha($dbh, $item_marc);
579         my $duplicate_barcode = exists($item->{'barcode'}) && GetItemnumberFromBarcode($item->{'barcode'});
580         if ($duplicate_barcode) {
581             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, import_error = ? WHERE import_items_id = ?");
582             $updsth->bind_param(1, 'error');
583             $updsth->bind_param(2, 'duplicate item barcode');
584             $updsth->bind_param(3, $row->{'import_items_id'});
585             $updsth->execute();
586             $num_items_errored++;
587         } else {
588             my ($item_biblionumber, $biblioitemnumber, $itemnumber) = AddItemFromMarc($item_marc, $biblionumber);
589             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
590             $updsth->bind_param(1, 'imported');
591             $updsth->bind_param(2, $itemnumber);
592             $updsth->bind_param(3, $row->{'import_items_id'});
593             $updsth->execute();
594             $updsth->finish();
595             $num_items_added++;
596         }
597     }
598     $sth->finish();
599     return ($num_items_added, $num_items_errored);
600 }
601
602 =head2 BatchRevertBibRecords
603
604 =over 4
605
606 my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored) = BatchRevertBibRecords($batch_id);
607
608 =back
609
610 =cut
611
612 sub BatchRevertBibRecords {
613     my $batch_id = shift;
614
615     my $num_deleted = 0;
616     my $num_errors = 0;
617     my $num_reverted = 0;
618     my $num_items_deleted = 0;
619     my $num_ignored = 0;
620     # commit (i.e., save, all records in the batch)
621     # FIXME biblio only at the moment
622     SetImportBatchStatus('reverting');
623     my $overlay_action = GetImportBatchOverlayAction($batch_id);
624     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
625     my $dbh = C4::Context->dbh;
626     my $sth = $dbh->prepare("SELECT import_record_id, status, overlay_status, marcxml_old, encoding, matched_biblionumber
627                              FROM import_records
628                              JOIN import_biblios USING (import_record_id)
629                              WHERE import_batch_id = ?");
630     $sth->execute($batch_id);
631     while (my $rowref = $sth->fetchrow_hashref) {
632         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
633             $num_ignored++;
634             next;
635         }
636
637         my $bib_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
638
639         if ($bib_result eq 'delete') {
640             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
641             my $error = DelBiblio($rowref->{'matched_biblionumber'});
642             if (defined $error) {
643                 $num_errors++;
644             } else {
645                 $num_deleted++;
646                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
647             }
648         } elsif ($bib_result eq 'restore') {
649             $num_reverted++;
650             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'});
651             my $biblionumber = $rowref->{'matched_biblionumber'};
652             my ($count, $oldbiblio) = GetBiblio($biblionumber);
653             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
654             ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
655             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
656         } elsif ($bib_result eq 'ignore') {
657             $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
658             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
659         }
660         my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?");
661         $sth2->execute($rowref->{'import_record_id'});
662     }
663
664     $sth->finish();
665     SetImportBatchStatus($batch_id, 'reverted');
666     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
667 }
668
669 =head2 BatchRevertItems
670
671 =over 4
672
673 my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
674
675 =back
676
677 =cut
678
679 sub BatchRevertItems {
680     my ($import_record_id, $biblionumber) = @_;
681
682     my $dbh = C4::Context->dbh;
683     my $num_items_deleted = 0;
684
685     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
686                                    FROM import_items
687                                    JOIN items USING (itemnumber)
688                                    WHERE import_record_id = ?");
689     $sth->bind_param(1, $import_record_id);
690     $sth->execute();
691     while (my $row = $sth->fetchrow_hashref()) {
692         DelItem($dbh, $biblionumber, $row->{'itemnumber'});
693         my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
694         $updsth->bind_param(1, 'reverted');
695         $updsth->bind_param(2, $row->{'import_items_id'});
696         $updsth->execute();
697         $updsth->finish();
698         $num_items_deleted++;
699     }
700     $sth->finish();
701     return $num_items_deleted;
702 }
703
704 =head2 CleanBatch
705
706 =over 4
707
708 CleanBatch($batch_id)
709
710 =back
711
712 Deletes all staged records from the import batch
713 and sets the status of the batch to 'cleaned'.  Note
714 that deleting a stage record does *not* affect
715 any record that has been committed to the database.
716
717 =cut
718
719 sub CleanBatch {
720     my $batch_id = shift;
721     return unless defined $batch_id;
722
723     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
724     SetImportBatchStatus($batch_id, 'cleaned');
725 }
726
727 =head2 GetAllImportBatches
728
729 =over 4
730
731 my $results = GetAllImportBatches();
732
733 =back
734
735 Returns a references to an array of hash references corresponding
736 to all import_batches rows (of batch_type 'batch'), sorted in 
737 ascending order by import_batch_id.
738
739 =cut
740
741 sub  GetAllImportBatches {
742     my $dbh = C4::Context->dbh;
743     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
744                                     WHERE batch_type = 'batch'
745                                     ORDER BY import_batch_id ASC");
746
747     my $results = [];
748     $sth->execute();
749     while (my $row = $sth->fetchrow_hashref) {
750         push @$results, $row;
751     }
752     $sth->finish();
753     return $results;
754 }
755
756 =head2 GetImportBatchRangeDesc
757
758 =over 4
759
760 my $results = GetImportBatchRangeDesc($offset, $results_per_group);
761
762 =back
763
764 Returns a reference to an array of hash references corresponding to
765 import_batches rows (sorted in descending order by import_batch_id)
766 start at the given offset.
767
768 =cut
769
770 sub GetImportBatchRangeDesc {
771     my ($offset, $results_per_group) = @_;
772
773     my $dbh = C4::Context->dbh;
774     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
775                                     WHERE batch_type = 'batch'
776                                     ORDER BY import_batch_id DESC
777                                     LIMIT ? OFFSET ?");
778     $sth->bind_param(1, $results_per_group);
779     $sth->bind_param(2, $offset);
780
781     my $results = [];
782     $sth->execute();
783     while (my $row = $sth->fetchrow_hashref) {
784         push @$results, $row;
785     }
786     $sth->finish();
787     return $results;
788 }
789
790 =head2 GetItemNumbersFromImportBatch
791
792 =cut
793
794 sub GetItemNumbersFromImportBatch {
795         my ($batch_id) = @_;
796         my $dbh = C4::Context->dbh;
797         my $sth = $dbh->prepare("select itemnumber from import_batches,import_records,import_items where import_batches.import_batch_id=import_records.import_batch_id and import_records.import_record_id=import_items.import_record_id and import_batches.import_batch_id=?");
798         $sth->execute($batch_id);
799         my @items ;
800         while ( my ($itm) = $sth->fetchrow_array ) {
801                 push @items, $itm;
802         }
803         return @items;
804 }
805
806 =head2 GetNumberOfImportBatches 
807
808 =over 4
809
810 my $count = GetNumberOfImportBatches();
811
812 =back
813
814 =cut
815
816 sub GetNumberOfNonZ3950ImportBatches {
817     my $dbh = C4::Context->dbh;
818     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type='batch'");
819     $sth->execute();
820     my ($count) = $sth->fetchrow_array();
821     $sth->finish();
822     return $count;
823 }
824
825 =head2 GetImportBibliosRange
826
827 =over 4
828
829 my $results = GetImportBibliosRange($batch_id, $offset, $results_per_group);
830
831 =back
832
833 Returns a reference to an array of hash references corresponding to
834 import_biblios/import_records rows for a given batch
835 starting at the given offset.
836
837 =cut
838
839 sub GetImportBibliosRange {
840     my ($batch_id, $offset, $results_per_group) = @_;
841
842     my $dbh = C4::Context->dbh;
843     my $sth = $dbh->prepare_cached("SELECT title, author, isbn, issn, import_record_id, record_sequence,
844                                            matched_biblionumber, status, overlay_status
845                                     FROM   import_records
846                                     JOIN   import_biblios USING (import_record_id)
847                                     WHERE  import_batch_id = ?
848                                     ORDER BY import_record_id LIMIT ? OFFSET ?");
849     $sth->bind_param(1, $batch_id);
850     $sth->bind_param(2, $results_per_group);
851     $sth->bind_param(3, $offset);
852     my $results = [];
853     $sth->execute();
854     while (my $row = $sth->fetchrow_hashref) {
855         push @$results, $row;
856     }
857     $sth->finish();
858     return $results;
859
860 }
861
862 =head2 GetBestRecordMatch
863
864 =over 4
865
866 my $record_id = GetBestRecordMatch($import_record_id);
867
868 =back
869
870 =cut
871
872 sub GetBestRecordMatch {
873     my ($import_record_id) = @_;
874
875     my $dbh = C4::Context->dbh;
876     my $sth = $dbh->prepare("SELECT candidate_match_id
877                              FROM   import_record_matches
878                              WHERE  import_record_id = ?
879                              ORDER BY score DESC, candidate_match_id DESC");
880     $sth->execute($import_record_id);
881     my ($record_id) = $sth->fetchrow_array();
882     $sth->finish();
883     return $record_id;
884 }
885
886 =head2 GetImportBatchStatus
887
888 =over 4
889
890 my $status = GetImportBatchStatus($batch_id);
891
892 =back
893
894 =cut
895
896 sub GetImportBatchStatus {
897     my ($batch_id) = @_;
898
899     my $dbh = C4::Context->dbh;
900     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
901     $sth->execute($batch_id);
902     my ($status) = $sth->fetchrow_array();
903     $sth->finish();
904     return $status;
905
906 }
907
908 =head2 SetImportBatchStatus
909
910 =over 4
911
912 SetImportBatchStatus($batch_id, $new_status);
913
914 =back
915
916 =cut
917
918 sub SetImportBatchStatus {
919     my ($batch_id, $new_status) = @_;
920
921     my $dbh = C4::Context->dbh;
922     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
923     $sth->execute($new_status, $batch_id);
924     $sth->finish();
925
926 }
927
928 =head2 GetImportBatchOverlayAction
929
930 =over 4
931
932 my $overlay_action = GetImportBatchOverlayAction($batch_id);
933
934 =back
935
936 =cut
937
938 sub GetImportBatchOverlayAction {
939     my ($batch_id) = @_;
940
941     my $dbh = C4::Context->dbh;
942     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
943     $sth->execute($batch_id);
944     my ($overlay_action) = $sth->fetchrow_array();
945     $sth->finish();
946     return $overlay_action;
947
948 }
949
950
951 =head2 SetImportBatchOverlayAction
952
953 =over 4
954
955 SetImportBatchOverlayAction($batch_id, $new_overlay_action);
956
957 =back
958
959 =cut
960
961 sub SetImportBatchOverlayAction {
962     my ($batch_id, $new_overlay_action) = @_;
963
964     my $dbh = C4::Context->dbh;
965     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
966     $sth->execute($new_overlay_action, $batch_id);
967     $sth->finish();
968
969 }
970
971 =head2 GetImportBatchNoMatchAction
972
973 =over 4
974
975 my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
976
977 =back
978
979 =cut
980
981 sub GetImportBatchNoMatchAction {
982     my ($batch_id) = @_;
983
984     my $dbh = C4::Context->dbh;
985     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
986     $sth->execute($batch_id);
987     my ($nomatch_action) = $sth->fetchrow_array();
988     $sth->finish();
989     return $nomatch_action;
990
991 }
992
993
994 =head2 SetImportBatchNoMatchAction
995
996 =over 4
997
998 SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
999
1000 =back
1001
1002 =cut
1003
1004 sub SetImportBatchNoMatchAction {
1005     my ($batch_id, $new_nomatch_action) = @_;
1006
1007     my $dbh = C4::Context->dbh;
1008     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1009     $sth->execute($new_nomatch_action, $batch_id);
1010     $sth->finish();
1011
1012 }
1013
1014 =head2 GetImportBatchItemAction
1015
1016 =over 4
1017
1018 my $item_action = GetImportBatchItemAction($batch_id);
1019
1020 =back
1021
1022 =cut
1023
1024 sub GetImportBatchItemAction {
1025     my ($batch_id) = @_;
1026
1027     my $dbh = C4::Context->dbh;
1028     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1029     $sth->execute($batch_id);
1030     my ($item_action) = $sth->fetchrow_array();
1031     $sth->finish();
1032     return $item_action;
1033
1034 }
1035
1036
1037 =head2 SetImportBatchItemAction
1038
1039 =over 4
1040
1041 SetImportBatchItemAction($batch_id, $new_item_action);
1042
1043 =back
1044
1045 =cut
1046
1047 sub SetImportBatchItemAction {
1048     my ($batch_id, $new_item_action) = @_;
1049
1050     my $dbh = C4::Context->dbh;
1051     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1052     $sth->execute($new_item_action, $batch_id);
1053     $sth->finish();
1054
1055 }
1056
1057 =head2 GetImportBatchMatcher
1058
1059 =over 4
1060
1061 my $matcher_id = GetImportBatchMatcher($batch_id);
1062
1063 =back
1064
1065 =cut
1066
1067 sub GetImportBatchMatcher {
1068     my ($batch_id) = @_;
1069
1070     my $dbh = C4::Context->dbh;
1071     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1072     $sth->execute($batch_id);
1073     my ($matcher_id) = $sth->fetchrow_array();
1074     $sth->finish();
1075     return $matcher_id;
1076
1077 }
1078
1079
1080 =head2 SetImportBatchMatcher
1081
1082 =over 4
1083
1084 SetImportBatchMatcher($batch_id, $new_matcher_id);
1085
1086 =back
1087
1088 =cut
1089
1090 sub SetImportBatchMatcher {
1091     my ($batch_id, $new_matcher_id) = @_;
1092
1093     my $dbh = C4::Context->dbh;
1094     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1095     $sth->execute($new_matcher_id, $batch_id);
1096     $sth->finish();
1097
1098 }
1099
1100 =head2 GetImportRecordOverlayStatus
1101
1102 =over 4
1103
1104 my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1105
1106 =back
1107
1108 =cut
1109
1110 sub GetImportRecordOverlayStatus {
1111     my ($import_record_id) = @_;
1112
1113     my $dbh = C4::Context->dbh;
1114     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1115     $sth->execute($import_record_id);
1116     my ($overlay_status) = $sth->fetchrow_array();
1117     $sth->finish();
1118     return $overlay_status;
1119
1120 }
1121
1122
1123 =head2 SetImportRecordOverlayStatus
1124
1125 =over 4
1126
1127 SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1128
1129 =back
1130
1131 =cut
1132
1133 sub SetImportRecordOverlayStatus {
1134     my ($import_record_id, $new_overlay_status) = @_;
1135
1136     my $dbh = C4::Context->dbh;
1137     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1138     $sth->execute($new_overlay_status, $import_record_id);
1139     $sth->finish();
1140
1141 }
1142
1143 =head2 GetImportRecordStatus
1144
1145 =over 4
1146
1147 my $overlay_status = GetImportRecordStatus($import_record_id);
1148
1149 =back
1150
1151 =cut
1152
1153 sub GetImportRecordStatus {
1154     my ($import_record_id) = @_;
1155
1156     my $dbh = C4::Context->dbh;
1157     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1158     $sth->execute($import_record_id);
1159     my ($overlay_status) = $sth->fetchrow_array();
1160     $sth->finish();
1161     return $overlay_status;
1162
1163 }
1164
1165
1166 =head2 SetImportRecordStatus
1167
1168 =over 4
1169
1170 SetImportRecordStatus($import_record_id, $new_overlay_status);
1171
1172 =back
1173
1174 =cut
1175
1176 sub SetImportRecordStatus {
1177     my ($import_record_id, $new_overlay_status) = @_;
1178
1179     my $dbh = C4::Context->dbh;
1180     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1181     $sth->execute($new_overlay_status, $import_record_id);
1182     $sth->finish();
1183
1184 }
1185
1186 =head2 GetImportRecordMatches
1187
1188 =over 4
1189
1190 my $results = GetImportRecordMatches($import_record_id, $best_only);
1191
1192 =back
1193
1194 =cut
1195
1196 sub GetImportRecordMatches {
1197     my $import_record_id = shift;
1198     my $best_only = @_ ? shift : 0;
1199
1200     my $dbh = C4::Context->dbh;
1201     # FIXME currently biblio only
1202     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber, score
1203                                     FROM import_records
1204                                     JOIN import_record_matches USING (import_record_id)
1205                                     JOIN biblio ON (biblionumber = candidate_match_id)
1206                                     WHERE import_record_id = ?
1207                                     ORDER BY score DESC, biblionumber DESC");
1208     $sth->bind_param(1, $import_record_id);
1209     my $results = [];
1210     $sth->execute();
1211     while (my $row = $sth->fetchrow_hashref) {
1212         push @$results, $row;
1213         last if $best_only;
1214     }
1215     $sth->finish();
1216
1217     return $results;
1218     
1219 }
1220
1221
1222 =head2 SetImportRecordMatches
1223
1224 =over 4
1225
1226 SetImportRecordMatches($import_record_id, @matches);
1227
1228 =back
1229
1230 =cut
1231
1232 sub SetImportRecordMatches {
1233     my $import_record_id = shift;
1234     my @matches = @_;
1235
1236     my $dbh = C4::Context->dbh;
1237     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1238     $delsth->execute($import_record_id);
1239     $delsth->finish();
1240
1241     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1242                                     VALUES (?, ?, ?)");
1243     foreach my $match (@matches) {
1244         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1245     }
1246 }
1247
1248
1249 # internal functions
1250
1251 sub _create_import_record {
1252     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random) = @_;
1253
1254     my $dbh = C4::Context->dbh;
1255     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1256                                                          record_type, encoding, z3950random)
1257                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1258     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml(),
1259                   $record_type, $encoding, $z3950random);
1260     my $import_record_id = $dbh->{'mysql_insertid'};
1261     $sth->finish();
1262     return $import_record_id;
1263 }
1264
1265 sub _update_import_record_marc {
1266     my ($import_record_id, $marc_record) = @_;
1267
1268     my $dbh = C4::Context->dbh;
1269     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1270                              WHERE  import_record_id = ?");
1271     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml(), $import_record_id);
1272     $sth->finish();
1273 }
1274
1275 sub _add_biblio_fields {
1276     my ($import_record_id, $marc_record) = @_;
1277
1278     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1279     my $dbh = C4::Context->dbh;
1280     # FIXME no controlnumber, originalsource
1281     $isbn = C4::Koha::_isbn_cleanup($isbn); # FIXME C4::Koha::_isbn_cleanup should be made public
1282     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1283     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1284     $sth->finish();
1285                 
1286 }
1287
1288 sub _update_biblio_fields {
1289     my ($import_record_id, $marc_record) = @_;
1290
1291     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1292     my $dbh = C4::Context->dbh;
1293     # FIXME no controlnumber, originalsource
1294     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1295     $isbn =~ s/\(.*$//;
1296     $isbn =~ tr/ -_//;
1297     $isbn = uc $isbn;
1298     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1299                              WHERE  import_record_id = ?");
1300     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1301     $sth->finish();
1302 }
1303
1304 sub _parse_biblio_fields {
1305     my ($marc_record) = @_;
1306
1307     my $dbh = C4::Context->dbh;
1308     my $bibliofields = TransformMarcToKoha($dbh, $marc_record, '');
1309     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1310
1311 }
1312
1313 sub _update_batch_record_counts {
1314     my ($batch_id) = @_;
1315
1316     my $dbh = C4::Context->dbh;
1317     my $sth = $dbh->prepare_cached("UPDATE import_batches SET num_biblios = (
1318                                     SELECT COUNT(*)
1319                                     FROM import_records
1320                                     WHERE import_batch_id = import_batches.import_batch_id
1321                                     AND record_type = 'biblio')
1322                                     WHERE import_batch_id = ?");
1323     $sth->bind_param(1, $batch_id);
1324     $sth->execute();
1325     $sth->finish();
1326     $sth = $dbh->prepare_cached("UPDATE import_batches SET num_items = (
1327                                     SELECT COUNT(*)
1328                                     FROM import_records
1329                                     JOIN import_items USING (import_record_id)
1330                                     WHERE import_batch_id = import_batches.import_batch_id
1331                                     AND record_type = 'biblio')
1332                                     WHERE import_batch_id = ?");
1333     $sth->bind_param(1, $batch_id);
1334     $sth->execute();
1335     $sth->finish();
1336
1337 }
1338
1339 sub _get_commit_action {
1340     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id) = @_;
1341     
1342     my ($bib_result, $bib_match, $item_result);
1343
1344     if ($overlay_status ne 'no_match') {
1345         $bib_match = GetBestRecordMatch($import_record_id);
1346         if ($overlay_action eq 'replace') {
1347             $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1348         } elsif ($overlay_action eq 'create_new') {
1349             $bib_result  = 'create_new';
1350         } elsif ($overlay_action eq 'ignore') {
1351             $bib_result  = 'ignore';
1352         } 
1353         $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_matches') ? 'create_new' : 'ignore';
1354     } else {
1355         $bib_result = $nomatch_action;
1356         $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1357     }
1358
1359     return ($bib_result, $item_result, $bib_match);
1360 }
1361
1362 sub _get_revert_action {
1363     my ($overlay_action, $overlay_status, $status) = @_;
1364
1365     my $bib_result;
1366
1367     if ($status eq 'ignored') {
1368         $bib_result = 'ignore';
1369     } else {
1370         if ($overlay_action eq 'create_new') {
1371             $bib_result = 'delete';
1372         } else {
1373             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1374         }
1375     }
1376     return $bib_result;
1377 }
1378
1379 1;
1380 __END__
1381
1382 =head1 AUTHOR
1383
1384 Koha Development Team <info@koha.org>
1385
1386 Galen Charlton <galen.charlton@liblime.com>
1387
1388 =cut