Bug 28152: Log the "duplicate item barcode" error
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57       ModBiblioInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub GetRecordFromImportBiblio {
196     my ( $import_record_id, $embed_items ) = @_;
197
198     my ($marc) = GetImportRecordMarc($import_record_id);
199     my $record = MARC::Record->new_from_usmarc($marc);
200
201     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
202
203     return $record;
204 }
205
206 sub EmbedItemsInImportBiblio {
207     my ( $record, $import_record_id ) = @_;
208     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
209     my $dbh = C4::Context->dbh;
210     my $import_items = $dbh->selectall_arrayref(q|
211         SELECT import_items.marcxml
212         FROM import_items
213         WHERE import_record_id = ?
214     |, { Slice => {} }, $import_record_id );
215     my @item_fields;
216     for my $import_item ( @$import_items ) {
217         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
218         push @item_fields, $item_marc->field($itemtag);
219     }
220     $record->append_fields(@item_fields);
221     return $record;
222 }
223
224 =head2 GetImportRecordMarcXML
225
226   my $marcxml = GetImportRecordMarcXML($import_record_id);
227
228 =cut
229
230 sub GetImportRecordMarcXML {
231     my ($import_record_id) = @_;
232
233     my $dbh = C4::Context->dbh;
234     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
235     $sth->execute($import_record_id);
236     my ($marcxml) = $sth->fetchrow();
237     $sth->finish();
238     return $marcxml;
239
240 }
241
242 =head2 AddImportBatch
243
244   my $batch_id = AddImportBatch($params_hash);
245
246 =cut
247
248 sub AddImportBatch {
249     my ($params) = @_;
250
251     my (@fields, @vals);
252     foreach (qw( matcher_id template_id branchcode
253                  overlay_action nomatch_action item_action
254                  import_status batch_type file_name comments record_type )) {
255         if (exists $params->{$_}) {
256             push @fields, $_;
257             push @vals, $params->{$_};
258         }
259     }
260     my $dbh = C4::Context->dbh;
261     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
262                                   VALUES (".join( ',', map '?', @fields).")",
263              undef,
264              @vals);
265     return $dbh->{'mysql_insertid'};
266 }
267
268 =head2 GetImportBatch 
269
270   my $row = GetImportBatch($batch_id);
271
272 Retrieve a hashref of an import_batches row.
273
274 =cut
275
276 sub GetImportBatch {
277     my ($batch_id) = @_;
278
279     my $dbh = C4::Context->dbh;
280     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
281     $sth->bind_param(1, $batch_id);
282     $sth->execute();
283     my $result = $sth->fetchrow_hashref;
284     $sth->finish();
285     return $result;
286
287 }
288
289 =head2 AddBiblioToBatch 
290
291   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
292                 $marc_record, $encoding, $update_counts);
293
294 =cut
295
296 sub AddBiblioToBatch {
297     my $batch_id = shift;
298     my $record_sequence = shift;
299     my $marc_record = shift;
300     my $encoding = shift;
301     my $update_counts = @_ ? shift : 1;
302
303     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
304     _add_biblio_fields($import_record_id, $marc_record);
305     _update_batch_record_counts($batch_id) if $update_counts;
306     return $import_record_id;
307 }
308
309 =head2 ModBiblioInBatch
310
311   ModBiblioInBatch($import_record_id, $marc_record);
312
313 =cut
314
315 sub ModBiblioInBatch {
316     my ($import_record_id, $marc_record) = @_;
317
318     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
319     _update_biblio_fields($import_record_id, $marc_record);
320
321 }
322
323 =head2 AddAuthToBatch
324
325   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
326                 $marc_record, $encoding, $update_counts, [$marc_type]);
327
328 =cut
329
330 sub AddAuthToBatch {
331     my $batch_id = shift;
332     my $record_sequence = shift;
333     my $marc_record = shift;
334     my $encoding = shift;
335     my $update_counts = @_ ? shift : 1;
336     my $marc_type = shift || C4::Context->preference('marcflavour');
337
338     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
339
340     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
341     _add_auth_fields($import_record_id, $marc_record);
342     _update_batch_record_counts($batch_id) if $update_counts;
343     return $import_record_id;
344 }
345
346 =head2 ModAuthInBatch
347
348   ModAuthInBatch($import_record_id, $marc_record);
349
350 =cut
351
352 sub ModAuthInBatch {
353     my ($import_record_id, $marc_record) = @_;
354
355     my $marcflavour = C4::Context->preference('marcflavour');
356     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
357
358 }
359
360 =head2 BatchStageMarcRecords
361
362 ( $batch_id, $num_records, $num_items, @invalid_records ) =
363   BatchStageMarcRecords(
364     $record_type,                $encoding,
365     $marc_records,               $file_name,
366     $marc_modification_template, $comments,
367     $branch_code,                $parse_items,
368     $leave_as_staging,           $progress_interval,
369     $progress_callback
370   );
371
372 =cut
373
374 sub BatchStageMarcRecords {
375     my $record_type = shift;
376     my $encoding = shift;
377     my $marc_records = shift;
378     my $file_name = shift;
379     my $marc_modification_template = shift;
380     my $comments = shift;
381     my $branch_code = shift;
382     my $parse_items = shift;
383     my $leave_as_staging = shift;
384
385     # optional callback to monitor status 
386     # of job
387     my $progress_interval = 0;
388     my $progress_callback = undef;
389     if ($#_ == 1) {
390         $progress_interval = shift;
391         $progress_callback = shift;
392         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
393         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
394     } 
395     
396     my $batch_id = AddImportBatch( {
397             overlay_action => 'create_new',
398             import_status => 'staging',
399             batch_type => 'batch',
400             file_name => $file_name,
401             comments => $comments,
402             record_type => $record_type,
403         } );
404     if ($parse_items) {
405         SetImportBatchItemAction($batch_id, 'always_add');
406     } else {
407         SetImportBatchItemAction($batch_id, 'ignore');
408     }
409
410
411     my $marc_type = C4::Context->preference('marcflavour');
412     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
413     my @invalid_records = ();
414     my $num_valid = 0;
415     my $num_items = 0;
416     # FIXME - for now, we're dealing only with bibs
417     my $rec_num = 0;
418     foreach my $marc_record (@$marc_records) {
419         $rec_num++;
420         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
421             &$progress_callback($rec_num);
422         }
423
424         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
425
426         my $import_record_id;
427         if (scalar($marc_record->fields()) == 0) {
428             push @invalid_records, $marc_record;
429         } else {
430
431             # Normalize the record so it doesn't have separated diacritics
432             SetUTF8Flag($marc_record);
433
434             $num_valid++;
435             if ($record_type eq 'biblio') {
436                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
437                 if ($parse_items) {
438                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
439                     $num_items += scalar(@import_items_ids);
440                 }
441             } elsif ($record_type eq 'auth') {
442                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
443             }
444         }
445     }
446     unless ($leave_as_staging) {
447         SetImportBatchStatus($batch_id, 'staged');
448     }
449     # FIXME branch_code, number of bibs, number of items
450     _update_batch_record_counts($batch_id);
451     return ($batch_id, $num_valid, $num_items, @invalid_records);
452 }
453
454 =head2 AddItemsToImportBiblio
455
456   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
457                 $import_record_id, $marc_record, $update_counts);
458
459 =cut
460
461 sub AddItemsToImportBiblio {
462     my $batch_id = shift;
463     my $import_record_id = shift;
464     my $marc_record = shift;
465     my $update_counts = @_ ? shift : 0;
466
467     my @import_items_ids = ();
468    
469     my $dbh = C4::Context->dbh; 
470     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
471     foreach my $item_field ($marc_record->field($item_tag)) {
472         my $item_marc = MARC::Record->new();
473         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
474         $item_marc->append_fields($item_field);
475         $marc_record->delete_field($item_field);
476         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
477                                         VALUES (?, ?, ?)");
478         $sth->bind_param(1, $import_record_id);
479         $sth->bind_param(2, 'staged');
480         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
481         $sth->execute();
482         push @import_items_ids, $dbh->{'mysql_insertid'};
483         $sth->finish();
484     }
485
486     if ($#import_items_ids > -1) {
487         _update_batch_record_counts($batch_id) if $update_counts;
488         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
489     }
490     return @import_items_ids;
491 }
492
493 =head2 BatchFindDuplicates
494
495   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
496              $max_matches, $progress_interval, $progress_callback);
497
498 Goes through the records loaded in the batch and attempts to 
499 find duplicates for each one.  Sets the matching status 
500 of each record to "no_match" or "auto_match" as appropriate.
501
502 The $max_matches parameter is optional; if it is not supplied,
503 it defaults to 10.
504
505 The $progress_interval and $progress_callback parameters are 
506 optional; if both are supplied, the sub referred to by
507 $progress_callback will be invoked every $progress_interval
508 records using the number of records processed as the 
509 singular argument.
510
511 =cut
512
513 sub BatchFindDuplicates {
514     my $batch_id = shift;
515     my $matcher = shift;
516     my $max_matches = @_ ? shift : 10;
517
518     # optional callback to monitor status 
519     # of job
520     my $progress_interval = 0;
521     my $progress_callback = undef;
522     if ($#_ == 1) {
523         $progress_interval = shift;
524         $progress_callback = shift;
525         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
526         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
527     }
528
529     my $dbh = C4::Context->dbh;
530
531     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
532                              FROM import_records
533                              WHERE import_batch_id = ?");
534     $sth->execute($batch_id);
535     my $num_with_matches = 0;
536     my $rec_num = 0;
537     while (my $rowref = $sth->fetchrow_hashref) {
538         $rec_num++;
539         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
540             &$progress_callback($rec_num);
541         }
542         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
543         my @matches = ();
544         if (defined $matcher) {
545             @matches = $matcher->get_matches($marc_record, $max_matches);
546         }
547         if (scalar(@matches) > 0) {
548             $num_with_matches++;
549             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
550             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
551         } else {
552             SetImportRecordMatches($rowref->{'import_record_id'}, ());
553             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
554         }
555     }
556     $sth->finish();
557     return $num_with_matches;
558 }
559
560 =head2 BatchCommitRecords
561
562   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
563         BatchCommitRecords($batch_id, $framework,
564         $progress_interval, $progress_callback);
565
566 =cut
567
568 sub BatchCommitRecords {
569     my $batch_id = shift;
570     my $framework = shift;
571
572     # optional callback to monitor status 
573     # of job
574     my $progress_interval = 0;
575     my $progress_callback = undef;
576     if ($#_ == 1) {
577         $progress_interval = shift;
578         $progress_callback = shift;
579         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
580         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
581     }
582
583     my $record_type;
584     my $num_added = 0;
585     my $num_updated = 0;
586     my $num_items_added = 0;
587     my $num_items_replaced = 0;
588     my $num_items_errored = 0;
589     my $num_ignored = 0;
590     # commit (i.e., save, all records in the batch)
591     SetImportBatchStatus($batch_id, 'importing');
592     my $overlay_action = GetImportBatchOverlayAction($batch_id);
593     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
594     my $item_action = GetImportBatchItemAction($batch_id);
595     my $item_tag;
596     my $item_subfield;
597     my $dbh = C4::Context->dbh;
598     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
599                              FROM import_records
600                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
601                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
602                              WHERE import_batch_id = ?");
603     $sth->execute($batch_id);
604     my $marcflavour = C4::Context->preference('marcflavour');
605
606     my $userenv = C4::Context->userenv;
607     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
608
609     my $rec_num = 0;
610     while (my $rowref = $sth->fetchrow_hashref) {
611         $record_type = $rowref->{'record_type'};
612         $rec_num++;
613         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
614             &$progress_callback($rec_num);
615         }
616         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
617             $num_ignored++;
618             next;
619         }
620
621         my $marc_type;
622         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
623             $marc_type = 'UNIMARCAUTH';
624         } elsif ($marcflavour eq 'UNIMARC') {
625             $marc_type = 'UNIMARC';
626         } else {
627             $marc_type = 'USMARC';
628         }
629         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
630
631         if ($record_type eq 'biblio') {
632             # remove any item tags - rely on BatchCommitItems
633             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
634             foreach my $item_field ($marc_record->field($item_tag)) {
635                 $marc_record->delete_field($item_field);
636             }
637         }
638
639         my ($record_result, $item_result, $record_match) =
640             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
641                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
642
643         my $recordid;
644         my $query;
645         if ($record_result eq 'create_new') {
646             $num_added++;
647             if ($record_type eq 'biblio') {
648                 my $biblioitemnumber;
649                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
650                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
651                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
652                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
653                     $num_items_added += $bib_items_added;
654                     $num_items_replaced += $bib_items_replaced;
655                     $num_items_errored += $bib_items_errored;
656                 }
657             } else {
658                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
659                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
660             }
661             my $sth = $dbh->prepare_cached($query);
662             $sth->execute($recordid, $rowref->{'import_record_id'});
663             $sth->finish();
664             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
665         } elsif ($record_result eq 'replace') {
666             $num_updated++;
667             $recordid = $record_match;
668             my $oldxml;
669             if ($record_type eq 'biblio') {
670                 my $oldbiblio = Koha::Biblios->find( $recordid );
671                 $oldxml = GetXmlBiblio($recordid);
672
673                 # remove item fields so that they don't get
674                 # added again if record is reverted
675                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
676                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
677                 foreach my $item_field ($old_marc->field($item_tag)) {
678                     $old_marc->delete_field($item_field);
679                 }
680                 $oldxml = $old_marc->as_xml($marc_type);
681
682                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
683                     overlay_context => {
684                         source => 'batchimport',
685                         categorycode => $logged_in_patron->categorycode,
686                         userid => $logged_in_patron->userid
687                     },
688                 });
689                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
690
691                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
692                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
693                     $num_items_added += $bib_items_added;
694                     $num_items_replaced += $bib_items_replaced;
695                     $num_items_errored += $bib_items_errored;
696                 }
697             } else {
698                 $oldxml = GetAuthorityXML($recordid);
699
700                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
701                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
702             }
703             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
704             $sth->execute($oldxml, $rowref->{'import_record_id'});
705             $sth->finish();
706             my $sth2 = $dbh->prepare_cached($query);
707             $sth2->execute($recordid, $rowref->{'import_record_id'});
708             $sth2->finish();
709             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
710             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
711         } elsif ($record_result eq 'ignore') {
712             $recordid = $record_match;
713             $num_ignored++;
714             $recordid = $record_match;
715             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
716                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
717                 $num_items_added += $bib_items_added;
718          $num_items_replaced += $bib_items_replaced;
719                 $num_items_errored += $bib_items_errored;
720                 # still need to record the matched biblionumber so that the
721                 # items can be reverted
722                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
723                 $sth2->execute($recordid, $rowref->{'import_record_id'});
724                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
725             }
726             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
727         }
728     }
729     $sth->finish();
730     SetImportBatchStatus($batch_id, 'imported');
731     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
732 }
733
734 =head2 BatchCommitItems
735
736   ($num_items_added, $num_items_errored) = 
737          BatchCommitItems($import_record_id, $biblionumber);
738
739 =cut
740
741 sub BatchCommitItems {
742     my ( $import_record_id, $biblionumber, $action ) = @_;
743
744     my $dbh = C4::Context->dbh;
745
746     my $num_items_added = 0;
747     my $num_items_errored = 0;
748     my $num_items_replaced = 0;
749
750     my $sth = $dbh->prepare( "
751         SELECT import_items_id, import_items.marcxml, encoding
752         FROM import_items
753         JOIN import_records USING (import_record_id)
754         WHERE import_record_id = ?
755         ORDER BY import_items_id
756     " );
757     $sth->bind_param( 1, $import_record_id );
758     $sth->execute();
759
760     while ( my $row = $sth->fetchrow_hashref() ) {
761         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
762
763         # Delete date_due subfield as to not accidentally delete item checkout due dates
764         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
765         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
766
767         my $item = TransformMarcToKoha( $item_marc );
768
769         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
770         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
771
772         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
773         if ( $action eq "replace" && $duplicate_itemnumber ) {
774             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
775             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
776             $updsth->bind_param( 1, 'imported' );
777             $updsth->bind_param( 2, $item->{itemnumber} );
778             $updsth->bind_param( 3, undef );
779             $updsth->bind_param( 4, $row->{'import_items_id'} );
780             $updsth->execute();
781             $updsth->finish();
782             $num_items_replaced++;
783         } elsif ( $action eq "replace" && $duplicate_barcode ) {
784             my $itemnumber = $duplicate_barcode->itemnumber;
785             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
786             $updsth->bind_param( 1, 'imported' );
787             $updsth->bind_param( 2, $item->{itemnumber} );
788             $updsth->bind_param( 3, undef );
789             $updsth->bind_param( 4, $row->{'import_items_id'} );
790             $updsth->execute();
791             $updsth->finish();
792             $num_items_replaced++;
793         } elsif ($duplicate_barcode) {
794             $updsth->bind_param( 1, 'error' );
795             $updsth->bind_param( 2, undef );
796             $updsth->bind_param( 3, 'duplicate item barcode' );
797             $updsth->bind_param( 4, $row->{'import_items_id'} );
798             $updsth->execute();
799             $num_items_errored++;
800         } else {
801             # Remove the itemnumber if it exists, we want to create a new item
802             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
803             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
804
805             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
806             if( $itemnumber ) {
807                 $updsth->bind_param( 1, 'imported' );
808                 $updsth->bind_param( 2, $itemnumber );
809                 $updsth->bind_param( 3, undef );
810                 $updsth->bind_param( 4, $row->{'import_items_id'} );
811                 $updsth->execute();
812                 $updsth->finish();
813                 $num_items_added++;
814             }
815         }
816     }
817
818     return ( $num_items_added, $num_items_replaced, $num_items_errored );
819 }
820
821 =head2 BatchRevertRecords
822
823   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
824       $num_ignored) = BatchRevertRecords($batch_id);
825
826 =cut
827
828 sub BatchRevertRecords {
829     my $batch_id = shift;
830
831     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
832
833     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
834
835     my $record_type;
836     my $num_deleted = 0;
837     my $num_errors = 0;
838     my $num_reverted = 0;
839     my $num_ignored = 0;
840     my $num_items_deleted = 0;
841     # commit (i.e., save, all records in the batch)
842     SetImportBatchStatus($batch_id, 'reverting');
843     my $overlay_action = GetImportBatchOverlayAction($batch_id);
844     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
845     my $dbh = C4::Context->dbh;
846     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
847                              FROM import_records
848                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
849                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
850                              WHERE import_batch_id = ?");
851     $sth->execute($batch_id);
852     my $marc_type;
853     my $marcflavour = C4::Context->preference('marcflavour');
854     while (my $rowref = $sth->fetchrow_hashref) {
855         $record_type = $rowref->{'record_type'};
856         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
857             $num_ignored++;
858             next;
859         }
860         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
861             $marc_type = 'UNIMARCAUTH';
862         } elsif ($marcflavour eq 'UNIMARC') {
863             $marc_type = 'UNIMARC';
864         } else {
865             $marc_type = 'USMARC';
866         }
867
868         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
869
870         if ($record_result eq 'delete') {
871             my $error = undef;
872             if  ($record_type eq 'biblio') {
873                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
874                 $error = DelBiblio($rowref->{'matched_biblionumber'});
875             } else {
876                 DelAuthority({ authid => $rowref->{'matched_authid'} });
877             }
878             if (defined $error) {
879                 $num_errors++;
880             } else {
881                 $num_deleted++;
882                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
883             }
884         } elsif ($record_result eq 'restore') {
885             $num_reverted++;
886             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
887             if ($record_type eq 'biblio') {
888                 my $biblionumber = $rowref->{'matched_biblionumber'};
889                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
890
891                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
892                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
893
894                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
895                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
896             } else {
897                 my $authid = $rowref->{'matched_authid'};
898                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
899             }
900             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
901         } elsif ($record_result eq 'ignore') {
902             if ($record_type eq 'biblio') {
903                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
904             }
905             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
906         }
907         my $query;
908         if ($record_type eq 'biblio') {
909             # remove matched_biblionumber only if there is no 'imported' item left
910             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
911             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
912         } else {
913             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
914         }
915         my $sth2 = $dbh->prepare_cached($query);
916         $sth2->execute($rowref->{'import_record_id'});
917     }
918
919     $sth->finish();
920     SetImportBatchStatus($batch_id, 'reverted');
921     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
922 }
923
924 =head2 BatchRevertItems
925
926   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
927
928 =cut
929
930 sub BatchRevertItems {
931     my ($import_record_id, $biblionumber) = @_;
932
933     my $dbh = C4::Context->dbh;
934     my $num_items_deleted = 0;
935
936     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
937                                    FROM import_items
938                                    JOIN items USING (itemnumber)
939                                    WHERE import_record_id = ?");
940     $sth->bind_param(1, $import_record_id);
941     $sth->execute();
942     while (my $row = $sth->fetchrow_hashref()) {
943         my $item = Koha::Items->find($row->{itemnumber});
944         my $error = $item->safe_delete;
945         if ($error eq '1'){
946             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
947             $updsth->bind_param(1, 'reverted');
948             $updsth->bind_param(2, $row->{'import_items_id'});
949             $updsth->execute();
950             $updsth->finish();
951             $num_items_deleted++;
952         }
953         else {
954             next;
955         }
956     }
957     $sth->finish();
958     return $num_items_deleted;
959 }
960
961 =head2 CleanBatch
962
963   CleanBatch($batch_id)
964
965 Deletes all staged records from the import batch
966 and sets the status of the batch to 'cleaned'.  Note
967 that deleting a stage record does *not* affect
968 any record that has been committed to the database.
969
970 =cut
971
972 sub CleanBatch {
973     my $batch_id = shift;
974     return unless defined $batch_id;
975
976     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
977     SetImportBatchStatus($batch_id, 'cleaned');
978 }
979
980 =head2 DeleteBatch
981
982   DeleteBatch($batch_id)
983
984 Deletes the record from the database. This can only be done
985 once the batch has been cleaned.
986
987 =cut
988
989 sub DeleteBatch {
990     my $batch_id = shift;
991     return unless defined $batch_id;
992
993     my $dbh = C4::Context->dbh;
994     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
995     $sth->execute( $batch_id );
996 }
997
998 =head2 GetAllImportBatches
999
1000   my $results = GetAllImportBatches();
1001
1002 Returns a references to an array of hash references corresponding
1003 to all import_batches rows (of batch_type 'batch'), sorted in 
1004 ascending order by import_batch_id.
1005
1006 =cut
1007
1008 sub  GetAllImportBatches {
1009     my $dbh = C4::Context->dbh;
1010     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1011                                     WHERE batch_type IN ('batch', 'webservice')
1012                                     ORDER BY import_batch_id ASC");
1013
1014     my $results = [];
1015     $sth->execute();
1016     while (my $row = $sth->fetchrow_hashref) {
1017         push @$results, $row;
1018     }
1019     $sth->finish();
1020     return $results;
1021 }
1022
1023 =head2 GetStagedWebserviceBatches
1024
1025   my $batch_ids = GetStagedWebserviceBatches();
1026
1027 Returns a references to an array of batch id's
1028 of batch_type 'webservice' that are not imported
1029
1030 =cut
1031
1032 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1033 SELECT import_batch_id FROM import_batches
1034 WHERE batch_type = 'webservice'
1035 AND import_status = 'staged'
1036 EOQ
1037 sub  GetStagedWebserviceBatches {
1038     my $dbh = C4::Context->dbh;
1039     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1040 }
1041
1042 =head2 GetImportBatchRangeDesc
1043
1044   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1045
1046 Returns a reference to an array of hash references corresponding to
1047 import_batches rows (sorted in descending order by import_batch_id)
1048 start at the given offset.
1049
1050 =cut
1051
1052 sub GetImportBatchRangeDesc {
1053     my ($offset, $results_per_group) = @_;
1054
1055     my $dbh = C4::Context->dbh;
1056     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1057                                     LEFT JOIN import_batch_profiles p
1058                                     ON b.profile_id = p.id
1059                                     WHERE b.batch_type IN ('batch', 'webservice')
1060                                     ORDER BY b.import_batch_id DESC";
1061     my @params;
1062     if ($results_per_group){
1063         $query .= " LIMIT ?";
1064         push(@params, $results_per_group);
1065     }
1066     if ($offset){
1067         $query .= " OFFSET ?";
1068         push(@params, $offset);
1069     }
1070     my $sth = $dbh->prepare_cached($query);
1071     $sth->execute(@params);
1072     my $results = $sth->fetchall_arrayref({});
1073     $sth->finish();
1074     return $results;
1075 }
1076
1077 =head2 GetItemNumbersFromImportBatch
1078
1079   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1080
1081 =cut
1082
1083 sub GetItemNumbersFromImportBatch {
1084     my ($batch_id) = @_;
1085     my $dbh = C4::Context->dbh;
1086     my $sql = q|
1087 SELECT itemnumber FROM import_items
1088 INNER JOIN items USING (itemnumber)
1089 INNER JOIN import_records USING (import_record_id)
1090 WHERE import_batch_id = ?|;
1091     my  $sth = $dbh->prepare( $sql );
1092     $sth->execute($batch_id);
1093     my @items ;
1094     while ( my ($itm) = $sth->fetchrow_array ) {
1095         push @items, $itm;
1096     }
1097     return @items;
1098 }
1099
1100 =head2 GetNumberOfImportBatches
1101
1102   my $count = GetNumberOfImportBatches();
1103
1104 =cut
1105
1106 sub GetNumberOfNonZ3950ImportBatches {
1107     my $dbh = C4::Context->dbh;
1108     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1109     $sth->execute();
1110     my ($count) = $sth->fetchrow_array();
1111     $sth->finish();
1112     return $count;
1113 }
1114
1115 =head2 GetImportBiblios
1116
1117   my $results = GetImportBiblios($importid);
1118
1119 =cut
1120
1121 sub GetImportBiblios {
1122     my ($import_record_id) = @_;
1123
1124     my $dbh = C4::Context->dbh;
1125     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1126     return $dbh->selectall_arrayref(
1127         $query,
1128         { Slice => {} },
1129         $import_record_id
1130     );
1131
1132 }
1133
1134 =head2 GetImportRecordsRange
1135
1136   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1137
1138 Returns a reference to an array of hash references corresponding to
1139 import_biblios/import_auths/import_records rows for a given batch
1140 starting at the given offset.
1141
1142 =cut
1143
1144 sub GetImportRecordsRange {
1145     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1146
1147     my $dbh = C4::Context->dbh;
1148
1149     my $order_by = $parameters->{order_by} || 'import_record_id';
1150     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1151
1152     my $order_by_direction =
1153       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1154
1155     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1156
1157     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1158                                            record_sequence, status, overlay_status,
1159                                            matched_biblionumber, matched_authid, record_type
1160                                     FROM   import_records
1161                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1162                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1163                                     WHERE  import_batch_id = ?";
1164     my @params;
1165     push(@params, $batch_id);
1166     if ($status) {
1167         $query .= " AND status=?";
1168         push(@params,$status);
1169     }
1170
1171     $query.=" ORDER BY $order_by $order_by_direction";
1172
1173     if($results_per_group){
1174         $query .= " LIMIT ?";
1175         push(@params, $results_per_group);
1176     }
1177     if($offset){
1178         $query .= " OFFSET ?";
1179         push(@params, $offset);
1180     }
1181     my $sth = $dbh->prepare_cached($query);
1182     $sth->execute(@params);
1183     my $results = $sth->fetchall_arrayref({});
1184     $sth->finish();
1185     return $results;
1186
1187 }
1188
1189 =head2 GetBestRecordMatch
1190
1191   my $record_id = GetBestRecordMatch($import_record_id);
1192
1193 =cut
1194
1195 sub GetBestRecordMatch {
1196     my ($import_record_id) = @_;
1197
1198     my $dbh = C4::Context->dbh;
1199     my $sth = $dbh->prepare("SELECT candidate_match_id
1200                              FROM   import_record_matches
1201                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1202                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1203                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1204                              WHERE  import_record_matches.import_record_id = ? AND
1205                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1206                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1207                              ORDER BY score DESC, candidate_match_id DESC");
1208     $sth->execute($import_record_id);
1209     my ($record_id) = $sth->fetchrow_array();
1210     $sth->finish();
1211     return $record_id;
1212 }
1213
1214 =head2 GetImportBatchStatus
1215
1216   my $status = GetImportBatchStatus($batch_id);
1217
1218 =cut
1219
1220 sub GetImportBatchStatus {
1221     my ($batch_id) = @_;
1222
1223     my $dbh = C4::Context->dbh;
1224     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1225     $sth->execute($batch_id);
1226     my ($status) = $sth->fetchrow_array();
1227     $sth->finish();
1228     return $status;
1229
1230 }
1231
1232 =head2 SetImportBatchStatus
1233
1234   SetImportBatchStatus($batch_id, $new_status);
1235
1236 =cut
1237
1238 sub SetImportBatchStatus {
1239     my ($batch_id, $new_status) = @_;
1240
1241     my $dbh = C4::Context->dbh;
1242     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1243     $sth->execute($new_status, $batch_id);
1244     $sth->finish();
1245
1246 }
1247
1248 =head2 SetMatchedBiblionumber
1249
1250   SetMatchedBiblionumber($import_record_id, $biblionumber);
1251
1252 =cut
1253
1254 sub SetMatchedBiblionumber {
1255     my ($import_record_id, $biblionumber) = @_;
1256
1257     my $dbh = C4::Context->dbh;
1258     $dbh->do(
1259         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1260         undef, $biblionumber, $import_record_id
1261     );
1262 }
1263
1264 =head2 GetImportBatchOverlayAction
1265
1266   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1267
1268 =cut
1269
1270 sub GetImportBatchOverlayAction {
1271     my ($batch_id) = @_;
1272
1273     my $dbh = C4::Context->dbh;
1274     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1275     $sth->execute($batch_id);
1276     my ($overlay_action) = $sth->fetchrow_array();
1277     $sth->finish();
1278     return $overlay_action;
1279
1280 }
1281
1282
1283 =head2 SetImportBatchOverlayAction
1284
1285   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1286
1287 =cut
1288
1289 sub SetImportBatchOverlayAction {
1290     my ($batch_id, $new_overlay_action) = @_;
1291
1292     my $dbh = C4::Context->dbh;
1293     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1294     $sth->execute($new_overlay_action, $batch_id);
1295     $sth->finish();
1296
1297 }
1298
1299 =head2 GetImportBatchNoMatchAction
1300
1301   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1302
1303 =cut
1304
1305 sub GetImportBatchNoMatchAction {
1306     my ($batch_id) = @_;
1307
1308     my $dbh = C4::Context->dbh;
1309     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1310     $sth->execute($batch_id);
1311     my ($nomatch_action) = $sth->fetchrow_array();
1312     $sth->finish();
1313     return $nomatch_action;
1314
1315 }
1316
1317
1318 =head2 SetImportBatchNoMatchAction
1319
1320   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1321
1322 =cut
1323
1324 sub SetImportBatchNoMatchAction {
1325     my ($batch_id, $new_nomatch_action) = @_;
1326
1327     my $dbh = C4::Context->dbh;
1328     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1329     $sth->execute($new_nomatch_action, $batch_id);
1330     $sth->finish();
1331
1332 }
1333
1334 =head2 GetImportBatchItemAction
1335
1336   my $item_action = GetImportBatchItemAction($batch_id);
1337
1338 =cut
1339
1340 sub GetImportBatchItemAction {
1341     my ($batch_id) = @_;
1342
1343     my $dbh = C4::Context->dbh;
1344     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1345     $sth->execute($batch_id);
1346     my ($item_action) = $sth->fetchrow_array();
1347     $sth->finish();
1348     return $item_action;
1349
1350 }
1351
1352
1353 =head2 SetImportBatchItemAction
1354
1355   SetImportBatchItemAction($batch_id, $new_item_action);
1356
1357 =cut
1358
1359 sub SetImportBatchItemAction {
1360     my ($batch_id, $new_item_action) = @_;
1361
1362     my $dbh = C4::Context->dbh;
1363     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1364     $sth->execute($new_item_action, $batch_id);
1365     $sth->finish();
1366
1367 }
1368
1369 =head2 GetImportBatchMatcher
1370
1371   my $matcher_id = GetImportBatchMatcher($batch_id);
1372
1373 =cut
1374
1375 sub GetImportBatchMatcher {
1376     my ($batch_id) = @_;
1377
1378     my $dbh = C4::Context->dbh;
1379     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1380     $sth->execute($batch_id);
1381     my ($matcher_id) = $sth->fetchrow_array();
1382     $sth->finish();
1383     return $matcher_id;
1384
1385 }
1386
1387
1388 =head2 SetImportBatchMatcher
1389
1390   SetImportBatchMatcher($batch_id, $new_matcher_id);
1391
1392 =cut
1393
1394 sub SetImportBatchMatcher {
1395     my ($batch_id, $new_matcher_id) = @_;
1396
1397     my $dbh = C4::Context->dbh;
1398     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1399     $sth->execute($new_matcher_id, $batch_id);
1400     $sth->finish();
1401
1402 }
1403
1404 =head2 GetImportRecordOverlayStatus
1405
1406   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1407
1408 =cut
1409
1410 sub GetImportRecordOverlayStatus {
1411     my ($import_record_id) = @_;
1412
1413     my $dbh = C4::Context->dbh;
1414     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1415     $sth->execute($import_record_id);
1416     my ($overlay_status) = $sth->fetchrow_array();
1417     $sth->finish();
1418     return $overlay_status;
1419
1420 }
1421
1422
1423 =head2 SetImportRecordOverlayStatus
1424
1425   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1426
1427 =cut
1428
1429 sub SetImportRecordOverlayStatus {
1430     my ($import_record_id, $new_overlay_status) = @_;
1431
1432     my $dbh = C4::Context->dbh;
1433     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1434     $sth->execute($new_overlay_status, $import_record_id);
1435     $sth->finish();
1436
1437 }
1438
1439 =head2 GetImportRecordStatus
1440
1441   my $status = GetImportRecordStatus($import_record_id);
1442
1443 =cut
1444
1445 sub GetImportRecordStatus {
1446     my ($import_record_id) = @_;
1447
1448     my $dbh = C4::Context->dbh;
1449     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1450     $sth->execute($import_record_id);
1451     my ($status) = $sth->fetchrow_array();
1452     $sth->finish();
1453     return $status;
1454
1455 }
1456
1457
1458 =head2 SetImportRecordStatus
1459
1460   SetImportRecordStatus($import_record_id, $new_status);
1461
1462 =cut
1463
1464 sub SetImportRecordStatus {
1465     my ($import_record_id, $new_status) = @_;
1466
1467     my $dbh = C4::Context->dbh;
1468     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1469     $sth->execute($new_status, $import_record_id);
1470     $sth->finish();
1471
1472 }
1473
1474 =head2 GetImportRecordMatches
1475
1476   my $results = GetImportRecordMatches($import_record_id, $best_only);
1477
1478 =cut
1479
1480 sub GetImportRecordMatches {
1481     my $import_record_id = shift;
1482     my $best_only = @_ ? shift : 0;
1483
1484     my $dbh = C4::Context->dbh;
1485     # FIXME currently biblio only
1486     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1487                                     candidate_match_id, score, record_type
1488                                     FROM import_records
1489                                     JOIN import_record_matches USING (import_record_id)
1490                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1491                                     WHERE import_record_id = ?
1492                                     ORDER BY score DESC, biblionumber DESC");
1493     $sth->bind_param(1, $import_record_id);
1494     my $results = [];
1495     $sth->execute();
1496     while (my $row = $sth->fetchrow_hashref) {
1497         if ($row->{'record_type'} eq 'auth') {
1498             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1499         }
1500         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1501         push @$results, $row;
1502         last if $best_only;
1503     }
1504     $sth->finish();
1505
1506     return $results;
1507     
1508 }
1509
1510 =head2 SetImportRecordMatches
1511
1512   SetImportRecordMatches($import_record_id, @matches);
1513
1514 =cut
1515
1516 sub SetImportRecordMatches {
1517     my $import_record_id = shift;
1518     my @matches = @_;
1519
1520     my $dbh = C4::Context->dbh;
1521     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1522     $delsth->execute($import_record_id);
1523     $delsth->finish();
1524
1525     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1526                                     VALUES (?, ?, ?)");
1527     foreach my $match (@matches) {
1528         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1529     }
1530 }
1531
1532 =head2 RecordsFromISO2709File
1533
1534     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1535
1536 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1537
1538 @PARAM1, String, absolute path to the ISO2709 file.
1539 @PARAM2, String, see stage_file.pl
1540 @PARAM3, String, should be utf8
1541
1542 Returns two array refs.
1543
1544 =cut
1545
1546 sub RecordsFromISO2709File {
1547     my ($input_file, $record_type, $encoding) = @_;
1548     my @errors;
1549
1550     my $marc_type = C4::Context->preference('marcflavour');
1551     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1552
1553     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1554     my @marc_records;
1555     $/ = "\035";
1556     while (<$fh>) {
1557         s/^\s+//;
1558         s/\s+$//;
1559         next unless $_; # skip if record has only whitespace, as might occur
1560                         # if file includes newlines between each MARC record
1561         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1562         push @marc_records, $marc_record;
1563         if ($charset_guessed ne $encoding) {
1564             push @errors,
1565                 "Unexpected charset $charset_guessed, expecting $encoding";
1566         }
1567     }
1568     close $fh;
1569     return ( \@errors, \@marc_records );
1570 }
1571
1572 =head2 RecordsFromMARCXMLFile
1573
1574     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1575
1576 Creates MARC::Record-objects out of the given MARCXML-file.
1577
1578 @PARAM1, String, absolute path to the ISO2709 file.
1579 @PARAM2, String, should be utf8
1580
1581 Returns two array refs.
1582
1583 =cut
1584
1585 sub RecordsFromMARCXMLFile {
1586     my ( $filename, $encoding ) = @_;
1587     my $batch = MARC::File::XML->in( $filename );
1588     my ( @marcRecords, @errors, $record );
1589     do {
1590         eval { $record = $batch->next( $encoding ); };
1591         if ($@) {
1592             push @errors, $@;
1593         }
1594         push @marcRecords, $record if $record;
1595     } while( $record );
1596     return (\@errors, \@marcRecords);
1597 }
1598
1599 =head2 RecordsFromMarcPlugin
1600
1601     Converts text of input_file into array of MARC records with to_marc plugin
1602
1603 =cut
1604
1605 sub RecordsFromMarcPlugin {
1606     my ($input_file, $plugin_class, $encoding) = @_;
1607     my ( $text, @return );
1608     return \@return if !$input_file || !$plugin_class;
1609
1610     # Read input file
1611     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1612     $/ = "\035";
1613     while (<$fh>) {
1614         s/^\s+//;
1615         s/\s+$//;
1616         next unless $_;
1617         $text .= $_;
1618     }
1619     close $fh;
1620
1621     # Convert to large MARC blob with plugin
1622     $text = Koha::Plugins::Handler->run({
1623         class  => $plugin_class,
1624         method => 'to_marc',
1625         params => { data => $text },
1626     }) if $text;
1627
1628     # Convert to array of MARC records
1629     if( $text ) {
1630         my $marc_type = C4::Context->preference('marcflavour');
1631         foreach my $blob ( split(/\x1D/, $text) ) {
1632             next if $blob =~ /^\s*$/;
1633             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1634             push @return, $marcrecord;
1635         }
1636     }
1637     return \@return;
1638 }
1639
1640 # internal functions
1641
1642 sub _create_import_record {
1643     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1644
1645     my $dbh = C4::Context->dbh;
1646     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1647                                                          record_type, encoding)
1648                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1649     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1650                   $record_type, $encoding);
1651     my $import_record_id = $dbh->{'mysql_insertid'};
1652     $sth->finish();
1653     return $import_record_id;
1654 }
1655
1656 sub _update_import_record_marc {
1657     my ($import_record_id, $marc_record, $marc_type) = @_;
1658
1659     my $dbh = C4::Context->dbh;
1660     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1661                              WHERE  import_record_id = ?");
1662     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1663     $sth->finish();
1664 }
1665
1666 sub _add_auth_fields {
1667     my ($import_record_id, $marc_record) = @_;
1668
1669     my $controlnumber;
1670     if ($marc_record->field('001')) {
1671         $controlnumber = $marc_record->field('001')->data();
1672     }
1673     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1674     my $dbh = C4::Context->dbh;
1675     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1676     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1677     $sth->finish();
1678 }
1679
1680 sub _add_biblio_fields {
1681     my ($import_record_id, $marc_record) = @_;
1682
1683     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1684     my $dbh = C4::Context->dbh;
1685     # FIXME no controlnumber, originalsource
1686     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1687     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1688     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1689     $sth->finish();
1690                 
1691 }
1692
1693 sub _update_biblio_fields {
1694     my ($import_record_id, $marc_record) = @_;
1695
1696     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1697     my $dbh = C4::Context->dbh;
1698     # FIXME no controlnumber, originalsource
1699     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1700     $isbn =~ s/\(.*$//;
1701     $isbn =~ tr/ -_//;
1702     $isbn = uc $isbn;
1703     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1704                              WHERE  import_record_id = ?");
1705     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1706     $sth->finish();
1707 }
1708
1709 sub _parse_biblio_fields {
1710     my ($marc_record) = @_;
1711
1712     my $dbh = C4::Context->dbh;
1713     my $bibliofields = TransformMarcToKoha($marc_record, '');
1714     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1715
1716 }
1717
1718 sub _update_batch_record_counts {
1719     my ($batch_id) = @_;
1720
1721     my $dbh = C4::Context->dbh;
1722     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1723                                         num_records = (
1724                                             SELECT COUNT(*)
1725                                             FROM import_records
1726                                             WHERE import_batch_id = import_batches.import_batch_id),
1727                                         num_items = (
1728                                             SELECT COUNT(*)
1729                                             FROM import_records
1730                                             JOIN import_items USING (import_record_id)
1731                                             WHERE import_batch_id = import_batches.import_batch_id
1732                                             AND record_type = 'biblio')
1733                                     WHERE import_batch_id = ?");
1734     $sth->bind_param(1, $batch_id);
1735     $sth->execute();
1736     $sth->finish();
1737 }
1738
1739 sub _get_commit_action {
1740     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1741     
1742     if ($record_type eq 'biblio') {
1743         my ($bib_result, $bib_match, $item_result);
1744
1745         if ($overlay_status ne 'no_match') {
1746             $bib_match = GetBestRecordMatch($import_record_id);
1747             if ($overlay_action eq 'replace') {
1748                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1749             } elsif ($overlay_action eq 'create_new') {
1750                 $bib_result  = 'create_new';
1751             } elsif ($overlay_action eq 'ignore') {
1752                 $bib_result  = 'ignore';
1753             }
1754          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1755                 $item_result = 'create_new';
1756        }
1757       elsif($item_action eq 'replace'){
1758           $item_result = 'replace';
1759           }
1760       else {
1761              $item_result = 'ignore';
1762            }
1763         } else {
1764             $bib_result = $nomatch_action;
1765             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1766         }
1767         return ($bib_result, $item_result, $bib_match);
1768     } else { # must be auths
1769         my ($auth_result, $auth_match);
1770
1771         if ($overlay_status ne 'no_match') {
1772             $auth_match = GetBestRecordMatch($import_record_id);
1773             if ($overlay_action eq 'replace') {
1774                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1775             } elsif ($overlay_action eq 'create_new') {
1776                 $auth_result  = 'create_new';
1777             } elsif ($overlay_action eq 'ignore') {
1778                 $auth_result  = 'ignore';
1779             }
1780         } else {
1781             $auth_result = $nomatch_action;
1782         }
1783
1784         return ($auth_result, undef, $auth_match);
1785
1786     }
1787 }
1788
1789 sub _get_revert_action {
1790     my ($overlay_action, $overlay_status, $status) = @_;
1791
1792     my $bib_result;
1793
1794     if ($status eq 'ignored') {
1795         $bib_result = 'ignore';
1796     } else {
1797         if ($overlay_action eq 'create_new') {
1798             $bib_result = 'delete';
1799         } else {
1800             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1801         }
1802     }
1803     return $bib_result;
1804 }
1805
1806 1;
1807 __END__
1808
1809 =head1 AUTHOR
1810
1811 Koha Development Team <http://koha-community.org/>
1812
1813 Galen Charlton <galen.charlton@liblime.com>
1814
1815 =cut