Bug 30779: Remove _update_import_record_marc and update tests
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       AddImportBatch
50       GetImportBatch
51       AddAuthToBatch
52       AddBiblioToBatch
53       AddItemsToImportBiblio
54       ModAuthorityInBatch
55       ModBiblioInBatch
56
57       BatchStageMarcRecords
58       BatchFindDuplicates
59       BatchCommitRecords
60       BatchRevertRecords
61       CleanBatch
62       DeleteBatch
63
64       GetAllImportBatches
65       GetStagedWebserviceBatches
66       GetImportBatchRangeDesc
67       GetNumberOfNonZ3950ImportBatches
68       GetImportBiblios
69       GetImportRecordsRange
70       GetItemNumbersFromImportBatch
71
72       GetImportBatchStatus
73       SetImportBatchStatus
74       GetImportBatchOverlayAction
75       SetImportBatchOverlayAction
76       GetImportBatchNoMatchAction
77       SetImportBatchNoMatchAction
78       GetImportBatchItemAction
79       SetImportBatchItemAction
80       GetImportBatchMatcher
81       SetImportBatchMatcher
82       GetImportRecordOverlayStatus
83       SetImportRecordOverlayStatus
84       GetImportRecordStatus
85       SetImportRecordStatus
86       SetMatchedBiblionumber
87       GetImportRecordMatches
88       SetImportRecordMatches
89
90       RecordsFromMARCXMLFile
91       RecordsFromISO2709File
92       RecordsFromMarcPlugin
93     );
94 }
95
96 =head1 NAME
97
98 C4::ImportBatch - manage batches of imported MARC records
99
100 =head1 SYNOPSIS
101
102 use C4::ImportBatch;
103
104 =head1 FUNCTIONS
105
106 =head2 GetZ3950BatchId
107
108   my $batchid = GetZ3950BatchId($z3950server);
109
110 Retrieves the ID of the import batch for the Z39.50
111 reservoir for the given target.  If necessary,
112 creates the import batch.
113
114 =cut
115
116 sub GetZ3950BatchId {
117     my ($z3950server) = @_;
118
119     my $dbh = C4::Context->dbh;
120     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
121                              WHERE  batch_type = 'z3950'
122                              AND    file_name = ?");
123     $sth->execute($z3950server);
124     my $rowref = $sth->fetchrow_arrayref();
125     $sth->finish();
126     if (defined $rowref) {
127         return $rowref->[0];
128     } else {
129         my $batch_id = AddImportBatch( {
130                 overlay_action => 'create_new',
131                 import_status => 'staged',
132                 batch_type => 'z3950',
133                 file_name => $z3950server,
134             } );
135         return $batch_id;
136     }
137     
138 }
139
140 =head2 GetWebserviceBatchId
141
142   my $batchid = GetWebserviceBatchId();
143
144 Retrieves the ID of the import batch for webservice.
145 If necessary, creates the import batch.
146
147 =cut
148
149 my $WEBSERVICE_BASE_QRY = <<EOQ;
150 SELECT import_batch_id FROM import_batches
151 WHERE  batch_type = 'webservice'
152 AND    import_status = 'staged'
153 EOQ
154 sub GetWebserviceBatchId {
155     my ($params) = @_;
156
157     my $dbh = C4::Context->dbh;
158     my $sql = $WEBSERVICE_BASE_QRY;
159     my @args;
160     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
161         if (my $val = $params->{$field}) {
162             $sql .= " AND $field = ?";
163             push @args, $val;
164         }
165     }
166     my $id = $dbh->selectrow_array($sql, undef, @args);
167     return $id if $id;
168
169     $params->{batch_type} = 'webservice';
170     $params->{import_status} = 'staged';
171     return AddImportBatch($params);
172 }
173
174 =head2 GetImportRecordMarc
175
176   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
177
178 =cut
179
180 sub GetImportRecordMarc {
181     my ($import_record_id) = @_;
182
183     my $dbh = C4::Context->dbh;
184     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
185         SELECT marc, encoding
186         FROM import_records
187         WHERE import_record_id = ?
188     |, undef, $import_record_id );
189
190     return $marc, $encoding;
191 }
192
193 sub EmbedItemsInImportBiblio {
194     my ( $record, $import_record_id ) = @_;
195     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
196     my $dbh = C4::Context->dbh;
197     my $import_items = $dbh->selectall_arrayref(q|
198         SELECT import_items.marcxml
199         FROM import_items
200         WHERE import_record_id = ?
201     |, { Slice => {} }, $import_record_id );
202     my @item_fields;
203     for my $import_item ( @$import_items ) {
204         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
205         push @item_fields, $item_marc->field($itemtag);
206     }
207     $record->append_fields(@item_fields);
208     return $record;
209 }
210
211 =head2 AddImportBatch
212
213   my $batch_id = AddImportBatch($params_hash);
214
215 =cut
216
217 sub AddImportBatch {
218     my ($params) = @_;
219
220     my (@fields, @vals);
221     foreach (qw( matcher_id template_id branchcode
222                  overlay_action nomatch_action item_action
223                  import_status batch_type file_name comments record_type )) {
224         if (exists $params->{$_}) {
225             push @fields, $_;
226             push @vals, $params->{$_};
227         }
228     }
229     my $dbh = C4::Context->dbh;
230     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
231                                   VALUES (".join( ',', map '?', @fields).")",
232              undef,
233              @vals);
234     return $dbh->{'mysql_insertid'};
235 }
236
237 =head2 GetImportBatch 
238
239   my $row = GetImportBatch($batch_id);
240
241 Retrieve a hashref of an import_batches row.
242
243 =cut
244
245 sub GetImportBatch {
246     my ($batch_id) = @_;
247
248     my $dbh = C4::Context->dbh;
249     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
250     $sth->bind_param(1, $batch_id);
251     $sth->execute();
252     my $result = $sth->fetchrow_hashref;
253     $sth->finish();
254     return $result;
255
256 }
257
258 =head2 AddBiblioToBatch 
259
260   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
261                 $marc_record, $encoding, $update_counts);
262
263 =cut
264
265 sub AddBiblioToBatch {
266     my $batch_id = shift;
267     my $record_sequence = shift;
268     my $marc_record = shift;
269     my $encoding = shift;
270     my $update_counts = @_ ? shift : 1;
271
272     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
273     _add_biblio_fields($import_record_id, $marc_record);
274     _update_batch_record_counts($batch_id) if $update_counts;
275     return $import_record_id;
276 }
277
278 =head2 ModBiblioInBatch
279
280   ModBiblioInBatch($import_record_id, $marc_record);
281
282 =cut
283
284 sub ModBiblioInBatch {
285     my ($import_record_id, $marc_record) = @_;
286
287     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
288     _update_biblio_fields($import_record_id, $marc_record);
289
290 }
291
292 =head2 AddAuthToBatch
293
294   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
295                 $marc_record, $encoding, $update_counts, [$marc_type]);
296
297 =cut
298
299 sub AddAuthToBatch {
300     my $batch_id = shift;
301     my $record_sequence = shift;
302     my $marc_record = shift;
303     my $encoding = shift;
304     my $update_counts = @_ ? shift : 1;
305     my $marc_type = shift || C4::Context->preference('marcflavour');
306
307     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
308
309     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
310     _add_auth_fields($import_record_id, $marc_record);
311     _update_batch_record_counts($batch_id) if $update_counts;
312     return $import_record_id;
313 }
314
315 =head2 ModAuthInBatch
316
317   ModAuthInBatch($import_record_id, $marc_record);
318
319 =cut
320
321 sub ModAuthInBatch {
322     my ($import_record_id, $marc_record) = @_;
323
324     my $marcflavour = C4::Context->preference('marcflavour');
325     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
326
327 }
328
329 =head2 BatchStageMarcRecords
330
331 ( $batch_id, $num_records, $num_items, @invalid_records ) =
332   BatchStageMarcRecords(
333     $record_type,                $encoding,
334     $marc_records,               $file_name,
335     $marc_modification_template, $comments,
336     $branch_code,                $parse_items,
337     $leave_as_staging,           $progress_interval,
338     $progress_callback
339   );
340
341 =cut
342
343 sub BatchStageMarcRecords {
344     my $record_type = shift;
345     my $encoding = shift;
346     my $marc_records = shift;
347     my $file_name = shift;
348     my $marc_modification_template = shift;
349     my $comments = shift;
350     my $branch_code = shift;
351     my $parse_items = shift;
352     my $leave_as_staging = shift;
353
354     # optional callback to monitor status 
355     # of job
356     my $progress_interval = 0;
357     my $progress_callback = undef;
358     if ($#_ == 1) {
359         $progress_interval = shift;
360         $progress_callback = shift;
361         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
362         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
363     } 
364     
365     my $batch_id = AddImportBatch( {
366             overlay_action => 'create_new',
367             import_status => 'staging',
368             batch_type => 'batch',
369             file_name => $file_name,
370             comments => $comments,
371             record_type => $record_type,
372         } );
373     if ($parse_items) {
374         SetImportBatchItemAction($batch_id, 'always_add');
375     } else {
376         SetImportBatchItemAction($batch_id, 'ignore');
377     }
378
379
380     my $marc_type = C4::Context->preference('marcflavour');
381     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
382     my @invalid_records = ();
383     my $num_valid = 0;
384     my $num_items = 0;
385     # FIXME - for now, we're dealing only with bibs
386     my $rec_num = 0;
387     foreach my $marc_record (@$marc_records) {
388         $rec_num++;
389         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
390             &$progress_callback($rec_num);
391         }
392
393         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
394
395         my $import_record_id;
396         if (scalar($marc_record->fields()) == 0) {
397             push @invalid_records, $marc_record;
398         } else {
399
400             # Normalize the record so it doesn't have separated diacritics
401             SetUTF8Flag($marc_record);
402
403             $num_valid++;
404             if ($record_type eq 'biblio') {
405                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
406                 if ($parse_items) {
407                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
408                     $num_items += scalar(@import_items_ids);
409                 }
410             } elsif ($record_type eq 'auth') {
411                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
412             }
413         }
414     }
415     unless ($leave_as_staging) {
416         SetImportBatchStatus($batch_id, 'staged');
417     }
418     # FIXME branch_code, number of bibs, number of items
419     _update_batch_record_counts($batch_id);
420     return ($batch_id, $num_valid, $num_items, @invalid_records);
421 }
422
423 =head2 AddItemsToImportBiblio
424
425   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
426                 $import_record_id, $marc_record, $update_counts);
427
428 =cut
429
430 sub AddItemsToImportBiblio {
431     my $batch_id = shift;
432     my $import_record_id = shift;
433     my $marc_record = shift;
434     my $update_counts = @_ ? shift : 0;
435
436     my @import_items_ids = ();
437    
438     my $dbh = C4::Context->dbh; 
439     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
440     foreach my $item_field ($marc_record->field($item_tag)) {
441         my $item_marc = MARC::Record->new();
442         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
443         $item_marc->append_fields($item_field);
444         $marc_record->delete_field($item_field);
445         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
446                                         VALUES (?, ?, ?)");
447         $sth->bind_param(1, $import_record_id);
448         $sth->bind_param(2, 'staged');
449         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
450         $sth->execute();
451         push @import_items_ids, $dbh->{'mysql_insertid'};
452         $sth->finish();
453     }
454
455     if ($#import_items_ids > -1) {
456         _update_batch_record_counts($batch_id) if $update_counts;
457     }
458     return @import_items_ids;
459 }
460
461 =head2 BatchFindDuplicates
462
463   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
464              $max_matches, $progress_interval, $progress_callback);
465
466 Goes through the records loaded in the batch and attempts to 
467 find duplicates for each one.  Sets the matching status 
468 of each record to "no_match" or "auto_match" as appropriate.
469
470 The $max_matches parameter is optional; if it is not supplied,
471 it defaults to 10.
472
473 The $progress_interval and $progress_callback parameters are 
474 optional; if both are supplied, the sub referred to by
475 $progress_callback will be invoked every $progress_interval
476 records using the number of records processed as the 
477 singular argument.
478
479 =cut
480
481 sub BatchFindDuplicates {
482     my $batch_id = shift;
483     my $matcher = shift;
484     my $max_matches = @_ ? shift : 10;
485
486     # optional callback to monitor status 
487     # of job
488     my $progress_interval = 0;
489     my $progress_callback = undef;
490     if ($#_ == 1) {
491         $progress_interval = shift;
492         $progress_callback = shift;
493         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
494         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
495     }
496
497     my $dbh = C4::Context->dbh;
498
499     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
500                              FROM import_records
501                              WHERE import_batch_id = ?");
502     $sth->execute($batch_id);
503     my $num_with_matches = 0;
504     my $rec_num = 0;
505     while (my $rowref = $sth->fetchrow_hashref) {
506         $rec_num++;
507         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
508             &$progress_callback($rec_num);
509         }
510         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
511         my @matches = ();
512         if (defined $matcher) {
513             @matches = $matcher->get_matches($marc_record, $max_matches);
514         }
515         if (scalar(@matches) > 0) {
516             $num_with_matches++;
517             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
518             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
519         } else {
520             SetImportRecordMatches($rowref->{'import_record_id'}, ());
521             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
522         }
523     }
524     $sth->finish();
525     return $num_with_matches;
526 }
527
528 =head2 BatchCommitRecords
529
530   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
531         BatchCommitRecords($batch_id, $framework,
532         $progress_interval, $progress_callback);
533
534 =cut
535
536 sub BatchCommitRecords {
537     my $batch_id = shift;
538     my $framework = shift;
539
540     # optional callback to monitor status 
541     # of job
542     my $progress_interval = 0;
543     my $progress_callback = undef;
544     if ($#_ == 1) {
545         $progress_interval = shift;
546         $progress_callback = shift;
547         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
548         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
549     }
550
551     my $record_type;
552     my $num_added = 0;
553     my $num_updated = 0;
554     my $num_items_added = 0;
555     my $num_items_replaced = 0;
556     my $num_items_errored = 0;
557     my $num_ignored = 0;
558     # commit (i.e., save, all records in the batch)
559     SetImportBatchStatus($batch_id, 'importing');
560     my $overlay_action = GetImportBatchOverlayAction($batch_id);
561     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
562     my $item_action = GetImportBatchItemAction($batch_id);
563     my $item_tag;
564     my $item_subfield;
565     my $dbh = C4::Context->dbh;
566     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
567                              FROM import_records
568                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
569                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
570                              WHERE import_batch_id = ?");
571     $sth->execute($batch_id);
572     my $marcflavour = C4::Context->preference('marcflavour');
573
574     my $userenv = C4::Context->userenv;
575     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
576
577     my $rec_num = 0;
578     while (my $rowref = $sth->fetchrow_hashref) {
579         $record_type = $rowref->{'record_type'};
580         $rec_num++;
581         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
582             &$progress_callback($rec_num);
583         }
584         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
585             $num_ignored++;
586             next;
587         }
588
589         my $marc_type;
590         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
591             $marc_type = 'UNIMARCAUTH';
592         } elsif ($marcflavour eq 'UNIMARC') {
593             $marc_type = 'UNIMARC';
594         } else {
595             $marc_type = 'USMARC';
596         }
597         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
598
599         if ($record_type eq 'biblio') {
600             # remove any item tags - rely on BatchCommitItems
601             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
602             foreach my $item_field ($marc_record->field($item_tag)) {
603                 $marc_record->delete_field($item_field);
604             }
605         }
606
607         my ($record_result, $item_result, $record_match) =
608             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
609                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
610
611         my $recordid;
612         my $query;
613         if ($record_result eq 'create_new') {
614             $num_added++;
615             if ($record_type eq 'biblio') {
616                 my $biblioitemnumber;
617                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
618                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
619                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
620                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
621                     $num_items_added += $bib_items_added;
622                     $num_items_replaced += $bib_items_replaced;
623                     $num_items_errored += $bib_items_errored;
624                 }
625             } else {
626                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
627                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
628             }
629             my $sth = $dbh->prepare_cached($query);
630             $sth->execute($recordid, $rowref->{'import_record_id'});
631             $sth->finish();
632             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
633         } elsif ($record_result eq 'replace') {
634             $num_updated++;
635             $recordid = $record_match;
636             my $oldxml;
637             if ($record_type eq 'biblio') {
638                 my $oldbiblio = Koha::Biblios->find( $recordid );
639                 $oldxml = GetXmlBiblio($recordid);
640
641                 # remove item fields so that they don't get
642                 # added again if record is reverted
643                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
644                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
645                 foreach my $item_field ($old_marc->field($item_tag)) {
646                     $old_marc->delete_field($item_field);
647                 }
648                 $oldxml = $old_marc->as_xml($marc_type);
649
650                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
651                     overlay_context => {
652                         source => 'batchimport',
653                         categorycode => $logged_in_patron->categorycode,
654                         userid => $logged_in_patron->userid
655                     },
656                 });
657                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
658
659                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
660                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
661                     $num_items_added += $bib_items_added;
662                     $num_items_replaced += $bib_items_replaced;
663                     $num_items_errored += $bib_items_errored;
664                 }
665             } else {
666                 $oldxml = GetAuthorityXML($recordid);
667
668                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
669                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
670             }
671             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
672             $sth->execute($oldxml, $rowref->{'import_record_id'});
673             $sth->finish();
674             my $sth2 = $dbh->prepare_cached($query);
675             $sth2->execute($recordid, $rowref->{'import_record_id'});
676             $sth2->finish();
677             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
678             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
679         } elsif ($record_result eq 'ignore') {
680             $recordid = $record_match;
681             $num_ignored++;
682             $recordid = $record_match;
683             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
684                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
685                 $num_items_added += $bib_items_added;
686          $num_items_replaced += $bib_items_replaced;
687                 $num_items_errored += $bib_items_errored;
688                 # still need to record the matched biblionumber so that the
689                 # items can be reverted
690                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
691                 $sth2->execute($recordid, $rowref->{'import_record_id'});
692                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
693             }
694             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
695         }
696     }
697     $sth->finish();
698     SetImportBatchStatus($batch_id, 'imported');
699     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
700 }
701
702 =head2 BatchCommitItems
703
704   ($num_items_added, $num_items_errored) = 
705          BatchCommitItems($import_record_id, $biblionumber);
706
707 =cut
708
709 sub BatchCommitItems {
710     my ( $import_record_id, $biblionumber, $action ) = @_;
711
712     my $dbh = C4::Context->dbh;
713
714     my $num_items_added = 0;
715     my $num_items_errored = 0;
716     my $num_items_replaced = 0;
717
718     my $sth = $dbh->prepare( "
719         SELECT import_items_id, import_items.marcxml, encoding
720         FROM import_items
721         JOIN import_records USING (import_record_id)
722         WHERE import_record_id = ?
723         ORDER BY import_items_id
724     " );
725     $sth->bind_param( 1, $import_record_id );
726     $sth->execute();
727
728     while ( my $row = $sth->fetchrow_hashref() ) {
729         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
730
731         # Delete date_due subfield as to not accidentally delete item checkout due dates
732         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
733         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
734
735         my $item = TransformMarcToKoha( $item_marc );
736
737         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
738         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
739
740         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
741         if ( $action eq "replace" && $duplicate_itemnumber ) {
742             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
743             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
744             $updsth->bind_param( 1, 'imported' );
745             $updsth->bind_param( 2, $item->{itemnumber} );
746             $updsth->bind_param( 3, undef );
747             $updsth->bind_param( 4, $row->{'import_items_id'} );
748             $updsth->execute();
749             $updsth->finish();
750             $num_items_replaced++;
751         } elsif ( $action eq "replace" && $duplicate_barcode ) {
752             my $itemnumber = $duplicate_barcode->itemnumber;
753             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
754             $updsth->bind_param( 1, 'imported' );
755             $updsth->bind_param( 2, $item->{itemnumber} );
756             $updsth->bind_param( 3, undef );
757             $updsth->bind_param( 4, $row->{'import_items_id'} );
758             $updsth->execute();
759             $updsth->finish();
760             $num_items_replaced++;
761         } elsif ($duplicate_barcode) {
762             $updsth->bind_param( 1, 'error' );
763             $updsth->bind_param( 2, undef );
764             $updsth->bind_param( 3, 'duplicate item barcode' );
765             $updsth->bind_param( 4, $row->{'import_items_id'} );
766             $updsth->execute();
767             $num_items_errored++;
768         } else {
769             # Remove the itemnumber if it exists, we want to create a new item
770             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
771             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
772
773             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
774             if( $itemnumber ) {
775                 $updsth->bind_param( 1, 'imported' );
776                 $updsth->bind_param( 2, $itemnumber );
777                 $updsth->bind_param( 3, undef );
778                 $updsth->bind_param( 4, $row->{'import_items_id'} );
779                 $updsth->execute();
780                 $updsth->finish();
781                 $num_items_added++;
782             }
783         }
784     }
785
786     return ( $num_items_added, $num_items_replaced, $num_items_errored );
787 }
788
789 =head2 BatchRevertRecords
790
791   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
792       $num_ignored) = BatchRevertRecords($batch_id);
793
794 =cut
795
796 sub BatchRevertRecords {
797     my $batch_id = shift;
798
799     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
800
801     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
802
803     my $record_type;
804     my $num_deleted = 0;
805     my $num_errors = 0;
806     my $num_reverted = 0;
807     my $num_ignored = 0;
808     my $num_items_deleted = 0;
809     # commit (i.e., save, all records in the batch)
810     SetImportBatchStatus($batch_id, 'reverting');
811     my $overlay_action = GetImportBatchOverlayAction($batch_id);
812     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
813     my $dbh = C4::Context->dbh;
814     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
815                              FROM import_records
816                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
817                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
818                              WHERE import_batch_id = ?");
819     $sth->execute($batch_id);
820     my $marc_type;
821     my $marcflavour = C4::Context->preference('marcflavour');
822     while (my $rowref = $sth->fetchrow_hashref) {
823         $record_type = $rowref->{'record_type'};
824         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
825             $num_ignored++;
826             next;
827         }
828         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
829             $marc_type = 'UNIMARCAUTH';
830         } elsif ($marcflavour eq 'UNIMARC') {
831             $marc_type = 'UNIMARC';
832         } else {
833             $marc_type = 'USMARC';
834         }
835
836         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
837
838         if ($record_result eq 'delete') {
839             my $error = undef;
840             if  ($record_type eq 'biblio') {
841                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
842                 $error = DelBiblio($rowref->{'matched_biblionumber'});
843             } else {
844                 DelAuthority({ authid => $rowref->{'matched_authid'} });
845             }
846             if (defined $error) {
847                 $num_errors++;
848             } else {
849                 $num_deleted++;
850                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
851             }
852         } elsif ($record_result eq 'restore') {
853             $num_reverted++;
854             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
855             if ($record_type eq 'biblio') {
856                 my $biblionumber = $rowref->{'matched_biblionumber'};
857                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
858
859                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
860                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
861
862                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
863                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
864             } else {
865                 my $authid = $rowref->{'matched_authid'};
866                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
867             }
868             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
869         } elsif ($record_result eq 'ignore') {
870             if ($record_type eq 'biblio') {
871                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
872             }
873             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
874         }
875         my $query;
876         if ($record_type eq 'biblio') {
877             # remove matched_biblionumber only if there is no 'imported' item left
878             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
879             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
880         } else {
881             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
882         }
883         my $sth2 = $dbh->prepare_cached($query);
884         $sth2->execute($rowref->{'import_record_id'});
885     }
886
887     $sth->finish();
888     SetImportBatchStatus($batch_id, 'reverted');
889     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
890 }
891
892 =head2 BatchRevertItems
893
894   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
895
896 =cut
897
898 sub BatchRevertItems {
899     my ($import_record_id, $biblionumber) = @_;
900
901     my $dbh = C4::Context->dbh;
902     my $num_items_deleted = 0;
903
904     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
905                                    FROM import_items
906                                    JOIN items USING (itemnumber)
907                                    WHERE import_record_id = ?");
908     $sth->bind_param(1, $import_record_id);
909     $sth->execute();
910     while (my $row = $sth->fetchrow_hashref()) {
911         my $item = Koha::Items->find($row->{itemnumber});
912         my $error = $item->safe_delete;
913         if ($error eq '1'){
914             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
915             $updsth->bind_param(1, 'reverted');
916             $updsth->bind_param(2, $row->{'import_items_id'});
917             $updsth->execute();
918             $updsth->finish();
919             $num_items_deleted++;
920         }
921         else {
922             next;
923         }
924     }
925     $sth->finish();
926     return $num_items_deleted;
927 }
928
929 =head2 CleanBatch
930
931   CleanBatch($batch_id)
932
933 Deletes all staged records from the import batch
934 and sets the status of the batch to 'cleaned'.  Note
935 that deleting a stage record does *not* affect
936 any record that has been committed to the database.
937
938 =cut
939
940 sub CleanBatch {
941     my $batch_id = shift;
942     return unless defined $batch_id;
943
944     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
945     SetImportBatchStatus($batch_id, 'cleaned');
946 }
947
948 =head2 DeleteBatch
949
950   DeleteBatch($batch_id)
951
952 Deletes the record from the database. This can only be done
953 once the batch has been cleaned.
954
955 =cut
956
957 sub DeleteBatch {
958     my $batch_id = shift;
959     return unless defined $batch_id;
960
961     my $dbh = C4::Context->dbh;
962     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
963     $sth->execute( $batch_id );
964 }
965
966 =head2 GetAllImportBatches
967
968   my $results = GetAllImportBatches();
969
970 Returns a references to an array of hash references corresponding
971 to all import_batches rows (of batch_type 'batch'), sorted in 
972 ascending order by import_batch_id.
973
974 =cut
975
976 sub  GetAllImportBatches {
977     my $dbh = C4::Context->dbh;
978     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
979                                     WHERE batch_type IN ('batch', 'webservice')
980                                     ORDER BY import_batch_id ASC");
981
982     my $results = [];
983     $sth->execute();
984     while (my $row = $sth->fetchrow_hashref) {
985         push @$results, $row;
986     }
987     $sth->finish();
988     return $results;
989 }
990
991 =head2 GetStagedWebserviceBatches
992
993   my $batch_ids = GetStagedWebserviceBatches();
994
995 Returns a references to an array of batch id's
996 of batch_type 'webservice' that are not imported
997
998 =cut
999
1000 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1001 SELECT import_batch_id FROM import_batches
1002 WHERE batch_type = 'webservice'
1003 AND import_status = 'staged'
1004 EOQ
1005 sub  GetStagedWebserviceBatches {
1006     my $dbh = C4::Context->dbh;
1007     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1008 }
1009
1010 =head2 GetImportBatchRangeDesc
1011
1012   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1013
1014 Returns a reference to an array of hash references corresponding to
1015 import_batches rows (sorted in descending order by import_batch_id)
1016 start at the given offset.
1017
1018 =cut
1019
1020 sub GetImportBatchRangeDesc {
1021     my ($offset, $results_per_group) = @_;
1022
1023     my $dbh = C4::Context->dbh;
1024     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1025                                     LEFT JOIN import_batch_profiles p
1026                                     ON b.profile_id = p.id
1027                                     WHERE b.batch_type IN ('batch', 'webservice')
1028                                     ORDER BY b.import_batch_id DESC";
1029     my @params;
1030     if ($results_per_group){
1031         $query .= " LIMIT ?";
1032         push(@params, $results_per_group);
1033     }
1034     if ($offset){
1035         $query .= " OFFSET ?";
1036         push(@params, $offset);
1037     }
1038     my $sth = $dbh->prepare_cached($query);
1039     $sth->execute(@params);
1040     my $results = $sth->fetchall_arrayref({});
1041     $sth->finish();
1042     return $results;
1043 }
1044
1045 =head2 GetItemNumbersFromImportBatch
1046
1047   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1048
1049 =cut
1050
1051 sub GetItemNumbersFromImportBatch {
1052     my ($batch_id) = @_;
1053     my $dbh = C4::Context->dbh;
1054     my $sql = q|
1055 SELECT itemnumber FROM import_items
1056 INNER JOIN items USING (itemnumber)
1057 INNER JOIN import_records USING (import_record_id)
1058 WHERE import_batch_id = ?|;
1059     my  $sth = $dbh->prepare( $sql );
1060     $sth->execute($batch_id);
1061     my @items ;
1062     while ( my ($itm) = $sth->fetchrow_array ) {
1063         push @items, $itm;
1064     }
1065     return @items;
1066 }
1067
1068 =head2 GetNumberOfImportBatches
1069
1070   my $count = GetNumberOfImportBatches();
1071
1072 =cut
1073
1074 sub GetNumberOfNonZ3950ImportBatches {
1075     my $dbh = C4::Context->dbh;
1076     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1077     $sth->execute();
1078     my ($count) = $sth->fetchrow_array();
1079     $sth->finish();
1080     return $count;
1081 }
1082
1083 =head2 GetImportBiblios
1084
1085   my $results = GetImportBiblios($importid);
1086
1087 =cut
1088
1089 sub GetImportBiblios {
1090     my ($import_record_id) = @_;
1091
1092     my $dbh = C4::Context->dbh;
1093     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1094     return $dbh->selectall_arrayref(
1095         $query,
1096         { Slice => {} },
1097         $import_record_id
1098     );
1099
1100 }
1101
1102 =head2 GetImportRecordsRange
1103
1104   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1105
1106 Returns a reference to an array of hash references corresponding to
1107 import_biblios/import_auths/import_records rows for a given batch
1108 starting at the given offset.
1109
1110 =cut
1111
1112 sub GetImportRecordsRange {
1113     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1114
1115     my $dbh = C4::Context->dbh;
1116
1117     my $order_by = $parameters->{order_by} || 'import_record_id';
1118     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1119
1120     my $order_by_direction =
1121       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1122
1123     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1124
1125     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1126                                            record_sequence, status, overlay_status,
1127                                            matched_biblionumber, matched_authid, record_type
1128                                     FROM   import_records
1129                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1130                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1131                                     WHERE  import_batch_id = ?";
1132     my @params;
1133     push(@params, $batch_id);
1134     if ($status) {
1135         $query .= " AND status=?";
1136         push(@params,$status);
1137     }
1138
1139     $query.=" ORDER BY $order_by $order_by_direction";
1140
1141     if($results_per_group){
1142         $query .= " LIMIT ?";
1143         push(@params, $results_per_group);
1144     }
1145     if($offset){
1146         $query .= " OFFSET ?";
1147         push(@params, $offset);
1148     }
1149     my $sth = $dbh->prepare_cached($query);
1150     $sth->execute(@params);
1151     my $results = $sth->fetchall_arrayref({});
1152     $sth->finish();
1153     return $results;
1154
1155 }
1156
1157 =head2 GetBestRecordMatch
1158
1159   my $record_id = GetBestRecordMatch($import_record_id);
1160
1161 =cut
1162
1163 sub GetBestRecordMatch {
1164     my ($import_record_id) = @_;
1165
1166     my $dbh = C4::Context->dbh;
1167     my $sth = $dbh->prepare("SELECT candidate_match_id
1168                              FROM   import_record_matches
1169                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1170                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1171                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1172                              WHERE  import_record_matches.import_record_id = ? AND
1173                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1174                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1175                              ORDER BY score DESC, candidate_match_id DESC");
1176     $sth->execute($import_record_id);
1177     my ($record_id) = $sth->fetchrow_array();
1178     $sth->finish();
1179     return $record_id;
1180 }
1181
1182 =head2 GetImportBatchStatus
1183
1184   my $status = GetImportBatchStatus($batch_id);
1185
1186 =cut
1187
1188 sub GetImportBatchStatus {
1189     my ($batch_id) = @_;
1190
1191     my $dbh = C4::Context->dbh;
1192     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1193     $sth->execute($batch_id);
1194     my ($status) = $sth->fetchrow_array();
1195     $sth->finish();
1196     return $status;
1197
1198 }
1199
1200 =head2 SetImportBatchStatus
1201
1202   SetImportBatchStatus($batch_id, $new_status);
1203
1204 =cut
1205
1206 sub SetImportBatchStatus {
1207     my ($batch_id, $new_status) = @_;
1208
1209     my $dbh = C4::Context->dbh;
1210     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1211     $sth->execute($new_status, $batch_id);
1212     $sth->finish();
1213
1214 }
1215
1216 =head2 SetMatchedBiblionumber
1217
1218   SetMatchedBiblionumber($import_record_id, $biblionumber);
1219
1220 =cut
1221
1222 sub SetMatchedBiblionumber {
1223     my ($import_record_id, $biblionumber) = @_;
1224
1225     my $dbh = C4::Context->dbh;
1226     $dbh->do(
1227         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1228         undef, $biblionumber, $import_record_id
1229     );
1230 }
1231
1232 =head2 GetImportBatchOverlayAction
1233
1234   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1235
1236 =cut
1237
1238 sub GetImportBatchOverlayAction {
1239     my ($batch_id) = @_;
1240
1241     my $dbh = C4::Context->dbh;
1242     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1243     $sth->execute($batch_id);
1244     my ($overlay_action) = $sth->fetchrow_array();
1245     $sth->finish();
1246     return $overlay_action;
1247
1248 }
1249
1250
1251 =head2 SetImportBatchOverlayAction
1252
1253   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1254
1255 =cut
1256
1257 sub SetImportBatchOverlayAction {
1258     my ($batch_id, $new_overlay_action) = @_;
1259
1260     my $dbh = C4::Context->dbh;
1261     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1262     $sth->execute($new_overlay_action, $batch_id);
1263     $sth->finish();
1264
1265 }
1266
1267 =head2 GetImportBatchNoMatchAction
1268
1269   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1270
1271 =cut
1272
1273 sub GetImportBatchNoMatchAction {
1274     my ($batch_id) = @_;
1275
1276     my $dbh = C4::Context->dbh;
1277     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1278     $sth->execute($batch_id);
1279     my ($nomatch_action) = $sth->fetchrow_array();
1280     $sth->finish();
1281     return $nomatch_action;
1282
1283 }
1284
1285
1286 =head2 SetImportBatchNoMatchAction
1287
1288   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1289
1290 =cut
1291
1292 sub SetImportBatchNoMatchAction {
1293     my ($batch_id, $new_nomatch_action) = @_;
1294
1295     my $dbh = C4::Context->dbh;
1296     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1297     $sth->execute($new_nomatch_action, $batch_id);
1298     $sth->finish();
1299
1300 }
1301
1302 =head2 GetImportBatchItemAction
1303
1304   my $item_action = GetImportBatchItemAction($batch_id);
1305
1306 =cut
1307
1308 sub GetImportBatchItemAction {
1309     my ($batch_id) = @_;
1310
1311     my $dbh = C4::Context->dbh;
1312     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1313     $sth->execute($batch_id);
1314     my ($item_action) = $sth->fetchrow_array();
1315     $sth->finish();
1316     return $item_action;
1317
1318 }
1319
1320
1321 =head2 SetImportBatchItemAction
1322
1323   SetImportBatchItemAction($batch_id, $new_item_action);
1324
1325 =cut
1326
1327 sub SetImportBatchItemAction {
1328     my ($batch_id, $new_item_action) = @_;
1329
1330     my $dbh = C4::Context->dbh;
1331     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1332     $sth->execute($new_item_action, $batch_id);
1333     $sth->finish();
1334
1335 }
1336
1337 =head2 GetImportBatchMatcher
1338
1339   my $matcher_id = GetImportBatchMatcher($batch_id);
1340
1341 =cut
1342
1343 sub GetImportBatchMatcher {
1344     my ($batch_id) = @_;
1345
1346     my $dbh = C4::Context->dbh;
1347     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1348     $sth->execute($batch_id);
1349     my ($matcher_id) = $sth->fetchrow_array();
1350     $sth->finish();
1351     return $matcher_id;
1352
1353 }
1354
1355
1356 =head2 SetImportBatchMatcher
1357
1358   SetImportBatchMatcher($batch_id, $new_matcher_id);
1359
1360 =cut
1361
1362 sub SetImportBatchMatcher {
1363     my ($batch_id, $new_matcher_id) = @_;
1364
1365     my $dbh = C4::Context->dbh;
1366     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1367     $sth->execute($new_matcher_id, $batch_id);
1368     $sth->finish();
1369
1370 }
1371
1372 =head2 GetImportRecordOverlayStatus
1373
1374   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1375
1376 =cut
1377
1378 sub GetImportRecordOverlayStatus {
1379     my ($import_record_id) = @_;
1380
1381     my $dbh = C4::Context->dbh;
1382     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1383     $sth->execute($import_record_id);
1384     my ($overlay_status) = $sth->fetchrow_array();
1385     $sth->finish();
1386     return $overlay_status;
1387
1388 }
1389
1390
1391 =head2 SetImportRecordOverlayStatus
1392
1393   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1394
1395 =cut
1396
1397 sub SetImportRecordOverlayStatus {
1398     my ($import_record_id, $new_overlay_status) = @_;
1399
1400     my $dbh = C4::Context->dbh;
1401     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1402     $sth->execute($new_overlay_status, $import_record_id);
1403     $sth->finish();
1404
1405 }
1406
1407 =head2 GetImportRecordStatus
1408
1409   my $status = GetImportRecordStatus($import_record_id);
1410
1411 =cut
1412
1413 sub GetImportRecordStatus {
1414     my ($import_record_id) = @_;
1415
1416     my $dbh = C4::Context->dbh;
1417     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1418     $sth->execute($import_record_id);
1419     my ($status) = $sth->fetchrow_array();
1420     $sth->finish();
1421     return $status;
1422
1423 }
1424
1425
1426 =head2 SetImportRecordStatus
1427
1428   SetImportRecordStatus($import_record_id, $new_status);
1429
1430 =cut
1431
1432 sub SetImportRecordStatus {
1433     my ($import_record_id, $new_status) = @_;
1434
1435     my $dbh = C4::Context->dbh;
1436     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1437     $sth->execute($new_status, $import_record_id);
1438     $sth->finish();
1439
1440 }
1441
1442 =head2 GetImportRecordMatches
1443
1444   my $results = GetImportRecordMatches($import_record_id, $best_only);
1445
1446 =cut
1447
1448 sub GetImportRecordMatches {
1449     my $import_record_id = shift;
1450     my $best_only = @_ ? shift : 0;
1451
1452     my $dbh = C4::Context->dbh;
1453     # FIXME currently biblio only
1454     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1455                                     candidate_match_id, score, record_type
1456                                     FROM import_records
1457                                     JOIN import_record_matches USING (import_record_id)
1458                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1459                                     WHERE import_record_id = ?
1460                                     ORDER BY score DESC, biblionumber DESC");
1461     $sth->bind_param(1, $import_record_id);
1462     my $results = [];
1463     $sth->execute();
1464     while (my $row = $sth->fetchrow_hashref) {
1465         if ($row->{'record_type'} eq 'auth') {
1466             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1467         }
1468         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1469         push @$results, $row;
1470         last if $best_only;
1471     }
1472     $sth->finish();
1473
1474     return $results;
1475     
1476 }
1477
1478 =head2 SetImportRecordMatches
1479
1480   SetImportRecordMatches($import_record_id, @matches);
1481
1482 =cut
1483
1484 sub SetImportRecordMatches {
1485     my $import_record_id = shift;
1486     my @matches = @_;
1487
1488     my $dbh = C4::Context->dbh;
1489     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1490     $delsth->execute($import_record_id);
1491     $delsth->finish();
1492
1493     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1494                                     VALUES (?, ?, ?)");
1495     foreach my $match (@matches) {
1496         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1497     }
1498 }
1499
1500 =head2 RecordsFromISO2709File
1501
1502     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1503
1504 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1505
1506 @PARAM1, String, absolute path to the ISO2709 file.
1507 @PARAM2, String, see stage_file.pl
1508 @PARAM3, String, should be utf8
1509
1510 Returns two array refs.
1511
1512 =cut
1513
1514 sub RecordsFromISO2709File {
1515     my ($input_file, $record_type, $encoding) = @_;
1516     my @errors;
1517
1518     my $marc_type = C4::Context->preference('marcflavour');
1519     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1520
1521     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1522     my @marc_records;
1523     $/ = "\035";
1524     while (<$fh>) {
1525         s/^\s+//;
1526         s/\s+$//;
1527         next unless $_; # skip if record has only whitespace, as might occur
1528                         # if file includes newlines between each MARC record
1529         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1530         push @marc_records, $marc_record;
1531         if ($charset_guessed ne $encoding) {
1532             push @errors,
1533                 "Unexpected charset $charset_guessed, expecting $encoding";
1534         }
1535     }
1536     close $fh;
1537     return ( \@errors, \@marc_records );
1538 }
1539
1540 =head2 RecordsFromMARCXMLFile
1541
1542     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1543
1544 Creates MARC::Record-objects out of the given MARCXML-file.
1545
1546 @PARAM1, String, absolute path to the ISO2709 file.
1547 @PARAM2, String, should be utf8
1548
1549 Returns two array refs.
1550
1551 =cut
1552
1553 sub RecordsFromMARCXMLFile {
1554     my ( $filename, $encoding ) = @_;
1555     my $batch = MARC::File::XML->in( $filename );
1556     my ( @marcRecords, @errors, $record );
1557     do {
1558         eval { $record = $batch->next( $encoding ); };
1559         if ($@) {
1560             push @errors, $@;
1561         }
1562         push @marcRecords, $record if $record;
1563     } while( $record );
1564     return (\@errors, \@marcRecords);
1565 }
1566
1567 =head2 RecordsFromMarcPlugin
1568
1569     Converts text of input_file into array of MARC records with to_marc plugin
1570
1571 =cut
1572
1573 sub RecordsFromMarcPlugin {
1574     my ($input_file, $plugin_class, $encoding) = @_;
1575     my ( $text, @return );
1576     return \@return if !$input_file || !$plugin_class;
1577
1578     # Read input file
1579     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1580     $/ = "\035";
1581     while (<$fh>) {
1582         s/^\s+//;
1583         s/\s+$//;
1584         next unless $_;
1585         $text .= $_;
1586     }
1587     close $fh;
1588
1589     # Convert to large MARC blob with plugin
1590     $text = Koha::Plugins::Handler->run({
1591         class  => $plugin_class,
1592         method => 'to_marc',
1593         params => { data => $text },
1594     }) if $text;
1595
1596     # Convert to array of MARC records
1597     if( $text ) {
1598         my $marc_type = C4::Context->preference('marcflavour');
1599         foreach my $blob ( split(/\x1D/, $text) ) {
1600             next if $blob =~ /^\s*$/;
1601             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1602             push @return, $marcrecord;
1603         }
1604     }
1605     return \@return;
1606 }
1607
1608 # internal functions
1609
1610 sub _create_import_record {
1611     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1612
1613     my $dbh = C4::Context->dbh;
1614     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1615                                                          record_type, encoding)
1616                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1617     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1618                   $record_type, $encoding);
1619     my $import_record_id = $dbh->{'mysql_insertid'};
1620     $sth->finish();
1621     return $import_record_id;
1622 }
1623
1624 sub _add_auth_fields {
1625     my ($import_record_id, $marc_record) = @_;
1626
1627     my $controlnumber;
1628     if ($marc_record->field('001')) {
1629         $controlnumber = $marc_record->field('001')->data();
1630     }
1631     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1632     my $dbh = C4::Context->dbh;
1633     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1634     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1635     $sth->finish();
1636 }
1637
1638 sub _add_biblio_fields {
1639     my ($import_record_id, $marc_record) = @_;
1640
1641     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1642     my $dbh = C4::Context->dbh;
1643     # FIXME no controlnumber, originalsource
1644     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1645     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1646     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1647     $sth->finish();
1648                 
1649 }
1650
1651 sub _update_biblio_fields {
1652     my ($import_record_id, $marc_record) = @_;
1653
1654     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1655     my $dbh = C4::Context->dbh;
1656     # FIXME no controlnumber, originalsource
1657     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1658     $isbn =~ s/\(.*$//;
1659     $isbn =~ tr/ -_//;
1660     $isbn = uc $isbn;
1661     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1662                              WHERE  import_record_id = ?");
1663     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1664     $sth->finish();
1665 }
1666
1667 sub _parse_biblio_fields {
1668     my ($marc_record) = @_;
1669
1670     my $dbh = C4::Context->dbh;
1671     my $bibliofields = TransformMarcToKoha($marc_record, '');
1672     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1673
1674 }
1675
1676 sub _update_batch_record_counts {
1677     my ($batch_id) = @_;
1678
1679     my $dbh = C4::Context->dbh;
1680     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1681                                         num_records = (
1682                                             SELECT COUNT(*)
1683                                             FROM import_records
1684                                             WHERE import_batch_id = import_batches.import_batch_id),
1685                                         num_items = (
1686                                             SELECT COUNT(*)
1687                                             FROM import_records
1688                                             JOIN import_items USING (import_record_id)
1689                                             WHERE import_batch_id = import_batches.import_batch_id
1690                                             AND record_type = 'biblio')
1691                                     WHERE import_batch_id = ?");
1692     $sth->bind_param(1, $batch_id);
1693     $sth->execute();
1694     $sth->finish();
1695 }
1696
1697 sub _get_commit_action {
1698     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1699     
1700     if ($record_type eq 'biblio') {
1701         my ($bib_result, $bib_match, $item_result);
1702
1703         if ($overlay_status ne 'no_match') {
1704             $bib_match = GetBestRecordMatch($import_record_id);
1705             if ($overlay_action eq 'replace') {
1706                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1707             } elsif ($overlay_action eq 'create_new') {
1708                 $bib_result  = 'create_new';
1709             } elsif ($overlay_action eq 'ignore') {
1710                 $bib_result  = 'ignore';
1711             }
1712          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1713                 $item_result = 'create_new';
1714        }
1715       elsif($item_action eq 'replace'){
1716           $item_result = 'replace';
1717           }
1718       else {
1719              $item_result = 'ignore';
1720            }
1721         } else {
1722             $bib_result = $nomatch_action;
1723             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1724         }
1725         return ($bib_result, $item_result, $bib_match);
1726     } else { # must be auths
1727         my ($auth_result, $auth_match);
1728
1729         if ($overlay_status ne 'no_match') {
1730             $auth_match = GetBestRecordMatch($import_record_id);
1731             if ($overlay_action eq 'replace') {
1732                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1733             } elsif ($overlay_action eq 'create_new') {
1734                 $auth_result  = 'create_new';
1735             } elsif ($overlay_action eq 'ignore') {
1736                 $auth_result  = 'ignore';
1737             }
1738         } else {
1739             $auth_result = $nomatch_action;
1740         }
1741
1742         return ($auth_result, undef, $auth_match);
1743
1744     }
1745 }
1746
1747 sub _get_revert_action {
1748     my ($overlay_action, $overlay_status, $status) = @_;
1749
1750     my $bib_result;
1751
1752     if ($status eq 'ignored') {
1753         $bib_result = 'ignore';
1754     } else {
1755         if ($overlay_action eq 'create_new') {
1756             $bib_result = 'delete';
1757         } else {
1758             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1759         }
1760     }
1761     return $bib_result;
1762 }
1763
1764 1;
1765 __END__
1766
1767 =head1 AUTHOR
1768
1769 Koha Development Team <http://koha-community.org/>
1770
1771 Galen Charlton <galen.charlton@liblime.com>
1772
1773 =cut