Bug 31276: Report results are limited to 999,999 no matter the actual number of results
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       AddImportBatch
50       GetImportBatch
51       AddAuthToBatch
52       AddBiblioToBatch
53       AddItemsToImportBiblio
54       ModAuthorityInBatch
55       ModBiblioInBatch
56
57       BatchStageMarcRecords
58       BatchFindDuplicates
59       BatchCommitRecords
60       BatchRevertRecords
61       CleanBatch
62       DeleteBatch
63
64       GetAllImportBatches
65       GetStagedWebserviceBatches
66       GetImportBatchRangeDesc
67       GetNumberOfNonZ3950ImportBatches
68       GetImportBiblios
69       GetImportRecordsRange
70       GetItemNumbersFromImportBatch
71
72       GetImportBatchStatus
73       SetImportBatchStatus
74       GetImportBatchOverlayAction
75       SetImportBatchOverlayAction
76       GetImportBatchNoMatchAction
77       SetImportBatchNoMatchAction
78       GetImportBatchItemAction
79       SetImportBatchItemAction
80       GetImportBatchMatcher
81       SetImportBatchMatcher
82       GetImportRecordOverlayStatus
83       SetImportRecordOverlayStatus
84       GetImportRecordStatus
85       SetImportRecordStatus
86       SetMatchedBiblionumber
87       GetImportRecordMatches
88       SetImportRecordMatches
89
90       RecordsFromMARCXMLFile
91       RecordsFromISO2709File
92       RecordsFromMarcPlugin
93     );
94 }
95
96 =head1 NAME
97
98 C4::ImportBatch - manage batches of imported MARC records
99
100 =head1 SYNOPSIS
101
102 use C4::ImportBatch;
103
104 =head1 FUNCTIONS
105
106 =head2 GetZ3950BatchId
107
108   my $batchid = GetZ3950BatchId($z3950server);
109
110 Retrieves the ID of the import batch for the Z39.50
111 reservoir for the given target.  If necessary,
112 creates the import batch.
113
114 =cut
115
116 sub GetZ3950BatchId {
117     my ($z3950server) = @_;
118
119     my $dbh = C4::Context->dbh;
120     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
121                              WHERE  batch_type = 'z3950'
122                              AND    file_name = ?");
123     $sth->execute($z3950server);
124     my $rowref = $sth->fetchrow_arrayref();
125     $sth->finish();
126     if (defined $rowref) {
127         return $rowref->[0];
128     } else {
129         my $batch_id = AddImportBatch( {
130                 overlay_action => 'create_new',
131                 import_status => 'staged',
132                 batch_type => 'z3950',
133                 file_name => $z3950server,
134             } );
135         return $batch_id;
136     }
137     
138 }
139
140 =head2 GetWebserviceBatchId
141
142   my $batchid = GetWebserviceBatchId();
143
144 Retrieves the ID of the import batch for webservice.
145 If necessary, creates the import batch.
146
147 =cut
148
149 my $WEBSERVICE_BASE_QRY = <<EOQ;
150 SELECT import_batch_id FROM import_batches
151 WHERE  batch_type = 'webservice'
152 AND    import_status = 'staged'
153 EOQ
154 sub GetWebserviceBatchId {
155     my ($params) = @_;
156
157     my $dbh = C4::Context->dbh;
158     my $sql = $WEBSERVICE_BASE_QRY;
159     my @args;
160     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
161         if (my $val = $params->{$field}) {
162             $sql .= " AND $field = ?";
163             push @args, $val;
164         }
165     }
166     my $id = $dbh->selectrow_array($sql, undef, @args);
167     return $id if $id;
168
169     $params->{batch_type} = 'webservice';
170     $params->{import_status} = 'staged';
171     return AddImportBatch($params);
172 }
173
174 =head2 GetImportRecordMarc
175
176   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
177
178 =cut
179
180 sub GetImportRecordMarc {
181     my ($import_record_id) = @_;
182
183     my $dbh = C4::Context->dbh;
184     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
185         SELECT marc, encoding
186         FROM import_records
187         WHERE import_record_id = ?
188     |, undef, $import_record_id );
189
190     return $marc, $encoding;
191 }
192
193 sub EmbedItemsInImportBiblio {
194     my ( $record, $import_record_id ) = @_;
195     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
196     my $dbh = C4::Context->dbh;
197     my $import_items = $dbh->selectall_arrayref(q|
198         SELECT import_items.marcxml
199         FROM import_items
200         WHERE import_record_id = ?
201     |, { Slice => {} }, $import_record_id );
202     my @item_fields;
203     for my $import_item ( @$import_items ) {
204         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
205         push @item_fields, $item_marc->field($itemtag);
206     }
207     $record->append_fields(@item_fields);
208     return $record;
209 }
210
211 =head2 AddImportBatch
212
213   my $batch_id = AddImportBatch($params_hash);
214
215 =cut
216
217 sub AddImportBatch {
218     my ($params) = @_;
219
220     my (@fields, @vals);
221     foreach (qw( matcher_id template_id branchcode
222                  overlay_action nomatch_action item_action
223                  import_status batch_type file_name comments record_type )) {
224         if (exists $params->{$_}) {
225             push @fields, $_;
226             push @vals, $params->{$_};
227         }
228     }
229     my $dbh = C4::Context->dbh;
230     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
231                                   VALUES (".join( ',', map '?', @fields).")",
232              undef,
233              @vals);
234     return $dbh->{'mysql_insertid'};
235 }
236
237 =head2 GetImportBatch 
238
239   my $row = GetImportBatch($batch_id);
240
241 Retrieve a hashref of an import_batches row.
242
243 =cut
244
245 sub GetImportBatch {
246     my ($batch_id) = @_;
247
248     my $dbh = C4::Context->dbh;
249     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
250     $sth->bind_param(1, $batch_id);
251     $sth->execute();
252     my $result = $sth->fetchrow_hashref;
253     $sth->finish();
254     return $result;
255
256 }
257
258 =head2 AddBiblioToBatch 
259
260   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
261                 $marc_record, $encoding, $update_counts);
262
263 =cut
264
265 sub AddBiblioToBatch {
266     my $batch_id = shift;
267     my $record_sequence = shift;
268     my $marc_record = shift;
269     my $encoding = shift;
270     my $update_counts = @_ ? shift : 1;
271
272     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
273     _add_biblio_fields($import_record_id, $marc_record);
274     _update_batch_record_counts($batch_id) if $update_counts;
275     return $import_record_id;
276 }
277
278 =head2 ModBiblioInBatch
279
280   ModBiblioInBatch($import_record_id, $marc_record);
281
282 =cut
283
284 sub ModBiblioInBatch {
285     my ($import_record_id, $marc_record) = @_;
286
287     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
288     _update_biblio_fields($import_record_id, $marc_record);
289
290 }
291
292 =head2 AddAuthToBatch
293
294   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
295                 $marc_record, $encoding, $update_counts, [$marc_type]);
296
297 =cut
298
299 sub AddAuthToBatch {
300     my $batch_id = shift;
301     my $record_sequence = shift;
302     my $marc_record = shift;
303     my $encoding = shift;
304     my $update_counts = @_ ? shift : 1;
305     my $marc_type = shift || C4::Context->preference('marcflavour');
306
307     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
308
309     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
310     _add_auth_fields($import_record_id, $marc_record);
311     _update_batch_record_counts($batch_id) if $update_counts;
312     return $import_record_id;
313 }
314
315 =head2 ModAuthInBatch
316
317   ModAuthInBatch($import_record_id, $marc_record);
318
319 =cut
320
321 sub ModAuthInBatch {
322     my ($import_record_id, $marc_record) = @_;
323
324     my $marcflavour = C4::Context->preference('marcflavour');
325     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
326
327 }
328
329 =head2 BatchStageMarcRecords
330
331 ( $batch_id, $num_records, $num_items, @invalid_records ) =
332   BatchStageMarcRecords(
333     $record_type,                $encoding,
334     $marc_records,               $file_name,
335     $marc_modification_template, $comments,
336     $branch_code,                $parse_items,
337     $leave_as_staging,           $progress_interval,
338     $progress_callback
339   );
340
341 =cut
342
343 sub BatchStageMarcRecords {
344     my $record_type = shift;
345     my $encoding = shift;
346     my $marc_records = shift;
347     my $file_name = shift;
348     my $marc_modification_template = shift;
349     my $comments = shift;
350     my $branch_code = shift;
351     my $parse_items = shift;
352     my $leave_as_staging = shift;
353
354     # optional callback to monitor status 
355     # of job
356     my $progress_interval = 0;
357     my $progress_callback = undef;
358     if ($#_ == 1) {
359         $progress_interval = shift;
360         $progress_callback = shift;
361         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
362         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
363     } 
364     
365     my $batch_id = AddImportBatch( {
366             overlay_action => 'create_new',
367             import_status => 'staging',
368             batch_type => 'batch',
369             file_name => $file_name,
370             comments => $comments,
371             record_type => $record_type,
372         } );
373     if ($parse_items) {
374         SetImportBatchItemAction($batch_id, 'always_add');
375     } else {
376         SetImportBatchItemAction($batch_id, 'ignore');
377     }
378
379
380     my $marc_type = C4::Context->preference('marcflavour');
381     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
382     my @invalid_records = ();
383     my $num_valid = 0;
384     my $num_items = 0;
385     # FIXME - for now, we're dealing only with bibs
386     my $rec_num = 0;
387     foreach my $marc_record (@$marc_records) {
388         $rec_num++;
389         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
390             &$progress_callback($rec_num);
391         }
392
393         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
394
395         my $import_record_id;
396         if (scalar($marc_record->fields()) == 0) {
397             push @invalid_records, $marc_record;
398         } else {
399
400             # Normalize the record so it doesn't have separated diacritics
401             SetUTF8Flag($marc_record);
402
403             $num_valid++;
404             if ($record_type eq 'biblio') {
405                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
406                 if ($parse_items) {
407                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
408                     $num_items += scalar(@import_items_ids);
409                 }
410             } elsif ($record_type eq 'auth') {
411                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
412             }
413         }
414     }
415     unless ($leave_as_staging) {
416         SetImportBatchStatus($batch_id, 'staged');
417     }
418     # FIXME branch_code, number of bibs, number of items
419     _update_batch_record_counts($batch_id);
420     return ($batch_id, $num_valid, $num_items, @invalid_records);
421 }
422
423 =head2 AddItemsToImportBiblio
424
425   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
426                 $import_record_id, $marc_record, $update_counts);
427
428 =cut
429
430 sub AddItemsToImportBiblio {
431     my $batch_id = shift;
432     my $import_record_id = shift;
433     my $marc_record = shift;
434     my $update_counts = @_ ? shift : 0;
435
436     my @import_items_ids = ();
437    
438     my $dbh = C4::Context->dbh; 
439     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
440     foreach my $item_field ($marc_record->field($item_tag)) {
441         my $item_marc = MARC::Record->new();
442         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
443         $item_marc->append_fields($item_field);
444         $marc_record->delete_field($item_field);
445         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
446                                         VALUES (?, ?, ?)");
447         $sth->bind_param(1, $import_record_id);
448         $sth->bind_param(2, 'staged');
449         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
450         $sth->execute();
451         push @import_items_ids, $dbh->{'mysql_insertid'};
452         $sth->finish();
453     }
454
455     if ($#import_items_ids > -1) {
456         _update_batch_record_counts($batch_id) if $update_counts;
457         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
458     }
459     return @import_items_ids;
460 }
461
462 =head2 BatchFindDuplicates
463
464   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
465              $max_matches, $progress_interval, $progress_callback);
466
467 Goes through the records loaded in the batch and attempts to 
468 find duplicates for each one.  Sets the matching status 
469 of each record to "no_match" or "auto_match" as appropriate.
470
471 The $max_matches parameter is optional; if it is not supplied,
472 it defaults to 10.
473
474 The $progress_interval and $progress_callback parameters are 
475 optional; if both are supplied, the sub referred to by
476 $progress_callback will be invoked every $progress_interval
477 records using the number of records processed as the 
478 singular argument.
479
480 =cut
481
482 sub BatchFindDuplicates {
483     my $batch_id = shift;
484     my $matcher = shift;
485     my $max_matches = @_ ? shift : 10;
486
487     # optional callback to monitor status 
488     # of job
489     my $progress_interval = 0;
490     my $progress_callback = undef;
491     if ($#_ == 1) {
492         $progress_interval = shift;
493         $progress_callback = shift;
494         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
495         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
496     }
497
498     my $dbh = C4::Context->dbh;
499
500     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
501                              FROM import_records
502                              WHERE import_batch_id = ?");
503     $sth->execute($batch_id);
504     my $num_with_matches = 0;
505     my $rec_num = 0;
506     while (my $rowref = $sth->fetchrow_hashref) {
507         $rec_num++;
508         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
509             &$progress_callback($rec_num);
510         }
511         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
512         my @matches = ();
513         if (defined $matcher) {
514             @matches = $matcher->get_matches($marc_record, $max_matches);
515         }
516         if (scalar(@matches) > 0) {
517             $num_with_matches++;
518             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
519             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
520         } else {
521             SetImportRecordMatches($rowref->{'import_record_id'}, ());
522             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
523         }
524     }
525     $sth->finish();
526     return $num_with_matches;
527 }
528
529 =head2 BatchCommitRecords
530
531   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
532         BatchCommitRecords($batch_id, $framework,
533         $progress_interval, $progress_callback);
534
535 =cut
536
537 sub BatchCommitRecords {
538     my $batch_id = shift;
539     my $framework = shift;
540
541     # optional callback to monitor status 
542     # of job
543     my $progress_interval = 0;
544     my $progress_callback = undef;
545     if ($#_ == 1) {
546         $progress_interval = shift;
547         $progress_callback = shift;
548         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
549         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
550     }
551
552     my $record_type;
553     my $num_added = 0;
554     my $num_updated = 0;
555     my $num_items_added = 0;
556     my $num_items_replaced = 0;
557     my $num_items_errored = 0;
558     my $num_ignored = 0;
559     # commit (i.e., save, all records in the batch)
560     SetImportBatchStatus($batch_id, 'importing');
561     my $overlay_action = GetImportBatchOverlayAction($batch_id);
562     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
563     my $item_action = GetImportBatchItemAction($batch_id);
564     my $item_tag;
565     my $item_subfield;
566     my $dbh = C4::Context->dbh;
567     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
568                              FROM import_records
569                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
570                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
571                              WHERE import_batch_id = ?");
572     $sth->execute($batch_id);
573     my $marcflavour = C4::Context->preference('marcflavour');
574
575     my $userenv = C4::Context->userenv;
576     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
577
578     my $rec_num = 0;
579     while (my $rowref = $sth->fetchrow_hashref) {
580         $record_type = $rowref->{'record_type'};
581         $rec_num++;
582         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
583             &$progress_callback($rec_num);
584         }
585         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
586             $num_ignored++;
587             next;
588         }
589
590         my $marc_type;
591         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
592             $marc_type = 'UNIMARCAUTH';
593         } elsif ($marcflavour eq 'UNIMARC') {
594             $marc_type = 'UNIMARC';
595         } else {
596             $marc_type = 'USMARC';
597         }
598         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
599
600         if ($record_type eq 'biblio') {
601             # remove any item tags - rely on BatchCommitItems
602             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
603             foreach my $item_field ($marc_record->field($item_tag)) {
604                 $marc_record->delete_field($item_field);
605             }
606         }
607
608         my ($record_result, $item_result, $record_match) =
609             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
610                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
611
612         my $recordid;
613         my $query;
614         if ($record_result eq 'create_new') {
615             $num_added++;
616             if ($record_type eq 'biblio') {
617                 my $biblioitemnumber;
618                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
619                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
620                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
621                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
622                     $num_items_added += $bib_items_added;
623                     $num_items_replaced += $bib_items_replaced;
624                     $num_items_errored += $bib_items_errored;
625                 }
626             } else {
627                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
628                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
629             }
630             my $sth = $dbh->prepare_cached($query);
631             $sth->execute($recordid, $rowref->{'import_record_id'});
632             $sth->finish();
633             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
634         } elsif ($record_result eq 'replace') {
635             $num_updated++;
636             $recordid = $record_match;
637             my $oldxml;
638             if ($record_type eq 'biblio') {
639                 my $oldbiblio = Koha::Biblios->find( $recordid );
640                 $oldxml = GetXmlBiblio($recordid);
641
642                 # remove item fields so that they don't get
643                 # added again if record is reverted
644                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
645                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
646                 foreach my $item_field ($old_marc->field($item_tag)) {
647                     $old_marc->delete_field($item_field);
648                 }
649                 $oldxml = $old_marc->as_xml($marc_type);
650
651                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
652                     overlay_context => {
653                         source => 'batchimport',
654                         categorycode => $logged_in_patron->categorycode,
655                         userid => $logged_in_patron->userid
656                     },
657                 });
658                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
659
660                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
661                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
662                     $num_items_added += $bib_items_added;
663                     $num_items_replaced += $bib_items_replaced;
664                     $num_items_errored += $bib_items_errored;
665                 }
666             } else {
667                 $oldxml = GetAuthorityXML($recordid);
668
669                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
670                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
671             }
672             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
673             $sth->execute($oldxml, $rowref->{'import_record_id'});
674             $sth->finish();
675             my $sth2 = $dbh->prepare_cached($query);
676             $sth2->execute($recordid, $rowref->{'import_record_id'});
677             $sth2->finish();
678             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
679             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
680         } elsif ($record_result eq 'ignore') {
681             $recordid = $record_match;
682             $num_ignored++;
683             $recordid = $record_match;
684             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
685                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
686                 $num_items_added += $bib_items_added;
687          $num_items_replaced += $bib_items_replaced;
688                 $num_items_errored += $bib_items_errored;
689                 # still need to record the matched biblionumber so that the
690                 # items can be reverted
691                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
692                 $sth2->execute($recordid, $rowref->{'import_record_id'});
693                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
694             }
695             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
696         }
697     }
698     $sth->finish();
699     SetImportBatchStatus($batch_id, 'imported');
700     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
701 }
702
703 =head2 BatchCommitItems
704
705   ($num_items_added, $num_items_errored) = 
706          BatchCommitItems($import_record_id, $biblionumber);
707
708 =cut
709
710 sub BatchCommitItems {
711     my ( $import_record_id, $biblionumber, $action ) = @_;
712
713     my $dbh = C4::Context->dbh;
714
715     my $num_items_added = 0;
716     my $num_items_errored = 0;
717     my $num_items_replaced = 0;
718
719     my $sth = $dbh->prepare( "
720         SELECT import_items_id, import_items.marcxml, encoding
721         FROM import_items
722         JOIN import_records USING (import_record_id)
723         WHERE import_record_id = ?
724         ORDER BY import_items_id
725     " );
726     $sth->bind_param( 1, $import_record_id );
727     $sth->execute();
728
729     while ( my $row = $sth->fetchrow_hashref() ) {
730         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
731
732         # Delete date_due subfield as to not accidentally delete item checkout due dates
733         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
734         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
735
736         my $item = TransformMarcToKoha( $item_marc );
737
738         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
739         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
740
741         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
742         if ( $action eq "replace" && $duplicate_itemnumber ) {
743             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
744             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
745             $updsth->bind_param( 1, 'imported' );
746             $updsth->bind_param( 2, $item->{itemnumber} );
747             $updsth->bind_param( 3, undef );
748             $updsth->bind_param( 4, $row->{'import_items_id'} );
749             $updsth->execute();
750             $updsth->finish();
751             $num_items_replaced++;
752         } elsif ( $action eq "replace" && $duplicate_barcode ) {
753             my $itemnumber = $duplicate_barcode->itemnumber;
754             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
755             $updsth->bind_param( 1, 'imported' );
756             $updsth->bind_param( 2, $item->{itemnumber} );
757             $updsth->bind_param( 3, undef );
758             $updsth->bind_param( 4, $row->{'import_items_id'} );
759             $updsth->execute();
760             $updsth->finish();
761             $num_items_replaced++;
762         } elsif ($duplicate_barcode) {
763             $updsth->bind_param( 1, 'error' );
764             $updsth->bind_param( 2, undef );
765             $updsth->bind_param( 3, 'duplicate item barcode' );
766             $updsth->bind_param( 4, $row->{'import_items_id'} );
767             $updsth->execute();
768             $num_items_errored++;
769         } else {
770             # Remove the itemnumber if it exists, we want to create a new item
771             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
772             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
773
774             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
775             if( $itemnumber ) {
776                 $updsth->bind_param( 1, 'imported' );
777                 $updsth->bind_param( 2, $itemnumber );
778                 $updsth->bind_param( 3, undef );
779                 $updsth->bind_param( 4, $row->{'import_items_id'} );
780                 $updsth->execute();
781                 $updsth->finish();
782                 $num_items_added++;
783             }
784         }
785     }
786
787     return ( $num_items_added, $num_items_replaced, $num_items_errored );
788 }
789
790 =head2 BatchRevertRecords
791
792   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
793       $num_ignored) = BatchRevertRecords($batch_id);
794
795 =cut
796
797 sub BatchRevertRecords {
798     my $batch_id = shift;
799
800     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
801
802     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
803
804     my $record_type;
805     my $num_deleted = 0;
806     my $num_errors = 0;
807     my $num_reverted = 0;
808     my $num_ignored = 0;
809     my $num_items_deleted = 0;
810     # commit (i.e., save, all records in the batch)
811     SetImportBatchStatus($batch_id, 'reverting');
812     my $overlay_action = GetImportBatchOverlayAction($batch_id);
813     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
814     my $dbh = C4::Context->dbh;
815     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
816                              FROM import_records
817                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
818                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
819                              WHERE import_batch_id = ?");
820     $sth->execute($batch_id);
821     my $marc_type;
822     my $marcflavour = C4::Context->preference('marcflavour');
823     while (my $rowref = $sth->fetchrow_hashref) {
824         $record_type = $rowref->{'record_type'};
825         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
826             $num_ignored++;
827             next;
828         }
829         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
830             $marc_type = 'UNIMARCAUTH';
831         } elsif ($marcflavour eq 'UNIMARC') {
832             $marc_type = 'UNIMARC';
833         } else {
834             $marc_type = 'USMARC';
835         }
836
837         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
838
839         if ($record_result eq 'delete') {
840             my $error = undef;
841             if  ($record_type eq 'biblio') {
842                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
843                 $error = DelBiblio($rowref->{'matched_biblionumber'});
844             } else {
845                 DelAuthority({ authid => $rowref->{'matched_authid'} });
846             }
847             if (defined $error) {
848                 $num_errors++;
849             } else {
850                 $num_deleted++;
851                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
852             }
853         } elsif ($record_result eq 'restore') {
854             $num_reverted++;
855             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
856             if ($record_type eq 'biblio') {
857                 my $biblionumber = $rowref->{'matched_biblionumber'};
858                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
859
860                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
861                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
862
863                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
864                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
865             } else {
866                 my $authid = $rowref->{'matched_authid'};
867                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
868             }
869             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
870         } elsif ($record_result eq 'ignore') {
871             if ($record_type eq 'biblio') {
872                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
873             }
874             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
875         }
876         my $query;
877         if ($record_type eq 'biblio') {
878             # remove matched_biblionumber only if there is no 'imported' item left
879             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
880             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
881         } else {
882             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
883         }
884         my $sth2 = $dbh->prepare_cached($query);
885         $sth2->execute($rowref->{'import_record_id'});
886     }
887
888     $sth->finish();
889     SetImportBatchStatus($batch_id, 'reverted');
890     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
891 }
892
893 =head2 BatchRevertItems
894
895   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
896
897 =cut
898
899 sub BatchRevertItems {
900     my ($import_record_id, $biblionumber) = @_;
901
902     my $dbh = C4::Context->dbh;
903     my $num_items_deleted = 0;
904
905     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
906                                    FROM import_items
907                                    JOIN items USING (itemnumber)
908                                    WHERE import_record_id = ?");
909     $sth->bind_param(1, $import_record_id);
910     $sth->execute();
911     while (my $row = $sth->fetchrow_hashref()) {
912         my $item = Koha::Items->find($row->{itemnumber});
913         my $error = $item->safe_delete;
914         if ($error eq '1'){
915             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
916             $updsth->bind_param(1, 'reverted');
917             $updsth->bind_param(2, $row->{'import_items_id'});
918             $updsth->execute();
919             $updsth->finish();
920             $num_items_deleted++;
921         }
922         else {
923             next;
924         }
925     }
926     $sth->finish();
927     return $num_items_deleted;
928 }
929
930 =head2 CleanBatch
931
932   CleanBatch($batch_id)
933
934 Deletes all staged records from the import batch
935 and sets the status of the batch to 'cleaned'.  Note
936 that deleting a stage record does *not* affect
937 any record that has been committed to the database.
938
939 =cut
940
941 sub CleanBatch {
942     my $batch_id = shift;
943     return unless defined $batch_id;
944
945     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
946     SetImportBatchStatus($batch_id, 'cleaned');
947 }
948
949 =head2 DeleteBatch
950
951   DeleteBatch($batch_id)
952
953 Deletes the record from the database. This can only be done
954 once the batch has been cleaned.
955
956 =cut
957
958 sub DeleteBatch {
959     my $batch_id = shift;
960     return unless defined $batch_id;
961
962     my $dbh = C4::Context->dbh;
963     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
964     $sth->execute( $batch_id );
965 }
966
967 =head2 GetAllImportBatches
968
969   my $results = GetAllImportBatches();
970
971 Returns a references to an array of hash references corresponding
972 to all import_batches rows (of batch_type 'batch'), sorted in 
973 ascending order by import_batch_id.
974
975 =cut
976
977 sub  GetAllImportBatches {
978     my $dbh = C4::Context->dbh;
979     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
980                                     WHERE batch_type IN ('batch', 'webservice')
981                                     ORDER BY import_batch_id ASC");
982
983     my $results = [];
984     $sth->execute();
985     while (my $row = $sth->fetchrow_hashref) {
986         push @$results, $row;
987     }
988     $sth->finish();
989     return $results;
990 }
991
992 =head2 GetStagedWebserviceBatches
993
994   my $batch_ids = GetStagedWebserviceBatches();
995
996 Returns a references to an array of batch id's
997 of batch_type 'webservice' that are not imported
998
999 =cut
1000
1001 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1002 SELECT import_batch_id FROM import_batches
1003 WHERE batch_type = 'webservice'
1004 AND import_status = 'staged'
1005 EOQ
1006 sub  GetStagedWebserviceBatches {
1007     my $dbh = C4::Context->dbh;
1008     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1009 }
1010
1011 =head2 GetImportBatchRangeDesc
1012
1013   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1014
1015 Returns a reference to an array of hash references corresponding to
1016 import_batches rows (sorted in descending order by import_batch_id)
1017 start at the given offset.
1018
1019 =cut
1020
1021 sub GetImportBatchRangeDesc {
1022     my ($offset, $results_per_group) = @_;
1023
1024     my $dbh = C4::Context->dbh;
1025     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1026                                     LEFT JOIN import_batch_profiles p
1027                                     ON b.profile_id = p.id
1028                                     WHERE b.batch_type IN ('batch', 'webservice')
1029                                     ORDER BY b.import_batch_id DESC";
1030     my @params;
1031     if ($results_per_group){
1032         $query .= " LIMIT ?";
1033         push(@params, $results_per_group);
1034     }
1035     if ($offset){
1036         $query .= " OFFSET ?";
1037         push(@params, $offset);
1038     }
1039     my $sth = $dbh->prepare_cached($query);
1040     $sth->execute(@params);
1041     my $results = $sth->fetchall_arrayref({});
1042     $sth->finish();
1043     return $results;
1044 }
1045
1046 =head2 GetItemNumbersFromImportBatch
1047
1048   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1049
1050 =cut
1051
1052 sub GetItemNumbersFromImportBatch {
1053     my ($batch_id) = @_;
1054     my $dbh = C4::Context->dbh;
1055     my $sql = q|
1056 SELECT itemnumber FROM import_items
1057 INNER JOIN items USING (itemnumber)
1058 INNER JOIN import_records USING (import_record_id)
1059 WHERE import_batch_id = ?|;
1060     my  $sth = $dbh->prepare( $sql );
1061     $sth->execute($batch_id);
1062     my @items ;
1063     while ( my ($itm) = $sth->fetchrow_array ) {
1064         push @items, $itm;
1065     }
1066     return @items;
1067 }
1068
1069 =head2 GetNumberOfImportBatches
1070
1071   my $count = GetNumberOfImportBatches();
1072
1073 =cut
1074
1075 sub GetNumberOfNonZ3950ImportBatches {
1076     my $dbh = C4::Context->dbh;
1077     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1078     $sth->execute();
1079     my ($count) = $sth->fetchrow_array();
1080     $sth->finish();
1081     return $count;
1082 }
1083
1084 =head2 GetImportBiblios
1085
1086   my $results = GetImportBiblios($importid);
1087
1088 =cut
1089
1090 sub GetImportBiblios {
1091     my ($import_record_id) = @_;
1092
1093     my $dbh = C4::Context->dbh;
1094     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1095     return $dbh->selectall_arrayref(
1096         $query,
1097         { Slice => {} },
1098         $import_record_id
1099     );
1100
1101 }
1102
1103 =head2 GetImportRecordsRange
1104
1105   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1106
1107 Returns a reference to an array of hash references corresponding to
1108 import_biblios/import_auths/import_records rows for a given batch
1109 starting at the given offset.
1110
1111 =cut
1112
1113 sub GetImportRecordsRange {
1114     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1115
1116     my $dbh = C4::Context->dbh;
1117
1118     my $order_by = $parameters->{order_by} || 'import_record_id';
1119     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1120
1121     my $order_by_direction =
1122       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1123
1124     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1125
1126     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1127                                            record_sequence, status, overlay_status,
1128                                            matched_biblionumber, matched_authid, record_type
1129                                     FROM   import_records
1130                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1131                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1132                                     WHERE  import_batch_id = ?";
1133     my @params;
1134     push(@params, $batch_id);
1135     if ($status) {
1136         $query .= " AND status=?";
1137         push(@params,$status);
1138     }
1139
1140     $query.=" ORDER BY $order_by $order_by_direction";
1141
1142     if($results_per_group){
1143         $query .= " LIMIT ?";
1144         push(@params, $results_per_group);
1145     }
1146     if($offset){
1147         $query .= " OFFSET ?";
1148         push(@params, $offset);
1149     }
1150     my $sth = $dbh->prepare_cached($query);
1151     $sth->execute(@params);
1152     my $results = $sth->fetchall_arrayref({});
1153     $sth->finish();
1154     return $results;
1155
1156 }
1157
1158 =head2 GetBestRecordMatch
1159
1160   my $record_id = GetBestRecordMatch($import_record_id);
1161
1162 =cut
1163
1164 sub GetBestRecordMatch {
1165     my ($import_record_id) = @_;
1166
1167     my $dbh = C4::Context->dbh;
1168     my $sth = $dbh->prepare("SELECT candidate_match_id
1169                              FROM   import_record_matches
1170                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1171                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1172                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1173                              WHERE  import_record_matches.import_record_id = ? AND
1174                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1175                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1176                              ORDER BY score DESC, candidate_match_id DESC");
1177     $sth->execute($import_record_id);
1178     my ($record_id) = $sth->fetchrow_array();
1179     $sth->finish();
1180     return $record_id;
1181 }
1182
1183 =head2 GetImportBatchStatus
1184
1185   my $status = GetImportBatchStatus($batch_id);
1186
1187 =cut
1188
1189 sub GetImportBatchStatus {
1190     my ($batch_id) = @_;
1191
1192     my $dbh = C4::Context->dbh;
1193     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1194     $sth->execute($batch_id);
1195     my ($status) = $sth->fetchrow_array();
1196     $sth->finish();
1197     return $status;
1198
1199 }
1200
1201 =head2 SetImportBatchStatus
1202
1203   SetImportBatchStatus($batch_id, $new_status);
1204
1205 =cut
1206
1207 sub SetImportBatchStatus {
1208     my ($batch_id, $new_status) = @_;
1209
1210     my $dbh = C4::Context->dbh;
1211     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1212     $sth->execute($new_status, $batch_id);
1213     $sth->finish();
1214
1215 }
1216
1217 =head2 SetMatchedBiblionumber
1218
1219   SetMatchedBiblionumber($import_record_id, $biblionumber);
1220
1221 =cut
1222
1223 sub SetMatchedBiblionumber {
1224     my ($import_record_id, $biblionumber) = @_;
1225
1226     my $dbh = C4::Context->dbh;
1227     $dbh->do(
1228         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1229         undef, $biblionumber, $import_record_id
1230     );
1231 }
1232
1233 =head2 GetImportBatchOverlayAction
1234
1235   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1236
1237 =cut
1238
1239 sub GetImportBatchOverlayAction {
1240     my ($batch_id) = @_;
1241
1242     my $dbh = C4::Context->dbh;
1243     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1244     $sth->execute($batch_id);
1245     my ($overlay_action) = $sth->fetchrow_array();
1246     $sth->finish();
1247     return $overlay_action;
1248
1249 }
1250
1251
1252 =head2 SetImportBatchOverlayAction
1253
1254   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1255
1256 =cut
1257
1258 sub SetImportBatchOverlayAction {
1259     my ($batch_id, $new_overlay_action) = @_;
1260
1261     my $dbh = C4::Context->dbh;
1262     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1263     $sth->execute($new_overlay_action, $batch_id);
1264     $sth->finish();
1265
1266 }
1267
1268 =head2 GetImportBatchNoMatchAction
1269
1270   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1271
1272 =cut
1273
1274 sub GetImportBatchNoMatchAction {
1275     my ($batch_id) = @_;
1276
1277     my $dbh = C4::Context->dbh;
1278     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1279     $sth->execute($batch_id);
1280     my ($nomatch_action) = $sth->fetchrow_array();
1281     $sth->finish();
1282     return $nomatch_action;
1283
1284 }
1285
1286
1287 =head2 SetImportBatchNoMatchAction
1288
1289   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1290
1291 =cut
1292
1293 sub SetImportBatchNoMatchAction {
1294     my ($batch_id, $new_nomatch_action) = @_;
1295
1296     my $dbh = C4::Context->dbh;
1297     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1298     $sth->execute($new_nomatch_action, $batch_id);
1299     $sth->finish();
1300
1301 }
1302
1303 =head2 GetImportBatchItemAction
1304
1305   my $item_action = GetImportBatchItemAction($batch_id);
1306
1307 =cut
1308
1309 sub GetImportBatchItemAction {
1310     my ($batch_id) = @_;
1311
1312     my $dbh = C4::Context->dbh;
1313     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1314     $sth->execute($batch_id);
1315     my ($item_action) = $sth->fetchrow_array();
1316     $sth->finish();
1317     return $item_action;
1318
1319 }
1320
1321
1322 =head2 SetImportBatchItemAction
1323
1324   SetImportBatchItemAction($batch_id, $new_item_action);
1325
1326 =cut
1327
1328 sub SetImportBatchItemAction {
1329     my ($batch_id, $new_item_action) = @_;
1330
1331     my $dbh = C4::Context->dbh;
1332     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1333     $sth->execute($new_item_action, $batch_id);
1334     $sth->finish();
1335
1336 }
1337
1338 =head2 GetImportBatchMatcher
1339
1340   my $matcher_id = GetImportBatchMatcher($batch_id);
1341
1342 =cut
1343
1344 sub GetImportBatchMatcher {
1345     my ($batch_id) = @_;
1346
1347     my $dbh = C4::Context->dbh;
1348     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1349     $sth->execute($batch_id);
1350     my ($matcher_id) = $sth->fetchrow_array();
1351     $sth->finish();
1352     return $matcher_id;
1353
1354 }
1355
1356
1357 =head2 SetImportBatchMatcher
1358
1359   SetImportBatchMatcher($batch_id, $new_matcher_id);
1360
1361 =cut
1362
1363 sub SetImportBatchMatcher {
1364     my ($batch_id, $new_matcher_id) = @_;
1365
1366     my $dbh = C4::Context->dbh;
1367     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1368     $sth->execute($new_matcher_id, $batch_id);
1369     $sth->finish();
1370
1371 }
1372
1373 =head2 GetImportRecordOverlayStatus
1374
1375   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1376
1377 =cut
1378
1379 sub GetImportRecordOverlayStatus {
1380     my ($import_record_id) = @_;
1381
1382     my $dbh = C4::Context->dbh;
1383     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1384     $sth->execute($import_record_id);
1385     my ($overlay_status) = $sth->fetchrow_array();
1386     $sth->finish();
1387     return $overlay_status;
1388
1389 }
1390
1391
1392 =head2 SetImportRecordOverlayStatus
1393
1394   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1395
1396 =cut
1397
1398 sub SetImportRecordOverlayStatus {
1399     my ($import_record_id, $new_overlay_status) = @_;
1400
1401     my $dbh = C4::Context->dbh;
1402     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1403     $sth->execute($new_overlay_status, $import_record_id);
1404     $sth->finish();
1405
1406 }
1407
1408 =head2 GetImportRecordStatus
1409
1410   my $status = GetImportRecordStatus($import_record_id);
1411
1412 =cut
1413
1414 sub GetImportRecordStatus {
1415     my ($import_record_id) = @_;
1416
1417     my $dbh = C4::Context->dbh;
1418     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1419     $sth->execute($import_record_id);
1420     my ($status) = $sth->fetchrow_array();
1421     $sth->finish();
1422     return $status;
1423
1424 }
1425
1426
1427 =head2 SetImportRecordStatus
1428
1429   SetImportRecordStatus($import_record_id, $new_status);
1430
1431 =cut
1432
1433 sub SetImportRecordStatus {
1434     my ($import_record_id, $new_status) = @_;
1435
1436     my $dbh = C4::Context->dbh;
1437     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1438     $sth->execute($new_status, $import_record_id);
1439     $sth->finish();
1440
1441 }
1442
1443 =head2 GetImportRecordMatches
1444
1445   my $results = GetImportRecordMatches($import_record_id, $best_only);
1446
1447 =cut
1448
1449 sub GetImportRecordMatches {
1450     my $import_record_id = shift;
1451     my $best_only = @_ ? shift : 0;
1452
1453     my $dbh = C4::Context->dbh;
1454     # FIXME currently biblio only
1455     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1456                                     candidate_match_id, score, record_type
1457                                     FROM import_records
1458                                     JOIN import_record_matches USING (import_record_id)
1459                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1460                                     WHERE import_record_id = ?
1461                                     ORDER BY score DESC, biblionumber DESC");
1462     $sth->bind_param(1, $import_record_id);
1463     my $results = [];
1464     $sth->execute();
1465     while (my $row = $sth->fetchrow_hashref) {
1466         if ($row->{'record_type'} eq 'auth') {
1467             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1468         }
1469         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1470         push @$results, $row;
1471         last if $best_only;
1472     }
1473     $sth->finish();
1474
1475     return $results;
1476     
1477 }
1478
1479 =head2 SetImportRecordMatches
1480
1481   SetImportRecordMatches($import_record_id, @matches);
1482
1483 =cut
1484
1485 sub SetImportRecordMatches {
1486     my $import_record_id = shift;
1487     my @matches = @_;
1488
1489     my $dbh = C4::Context->dbh;
1490     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1491     $delsth->execute($import_record_id);
1492     $delsth->finish();
1493
1494     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1495                                     VALUES (?, ?, ?)");
1496     foreach my $match (@matches) {
1497         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1498     }
1499 }
1500
1501 =head2 RecordsFromISO2709File
1502
1503     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1504
1505 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1506
1507 @PARAM1, String, absolute path to the ISO2709 file.
1508 @PARAM2, String, see stage_file.pl
1509 @PARAM3, String, should be utf8
1510
1511 Returns two array refs.
1512
1513 =cut
1514
1515 sub RecordsFromISO2709File {
1516     my ($input_file, $record_type, $encoding) = @_;
1517     my @errors;
1518
1519     my $marc_type = C4::Context->preference('marcflavour');
1520     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1521
1522     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1523     my @marc_records;
1524     $/ = "\035";
1525     while (<$fh>) {
1526         s/^\s+//;
1527         s/\s+$//;
1528         next unless $_; # skip if record has only whitespace, as might occur
1529                         # if file includes newlines between each MARC record
1530         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1531         push @marc_records, $marc_record;
1532         if ($charset_guessed ne $encoding) {
1533             push @errors,
1534                 "Unexpected charset $charset_guessed, expecting $encoding";
1535         }
1536     }
1537     close $fh;
1538     return ( \@errors, \@marc_records );
1539 }
1540
1541 =head2 RecordsFromMARCXMLFile
1542
1543     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1544
1545 Creates MARC::Record-objects out of the given MARCXML-file.
1546
1547 @PARAM1, String, absolute path to the ISO2709 file.
1548 @PARAM2, String, should be utf8
1549
1550 Returns two array refs.
1551
1552 =cut
1553
1554 sub RecordsFromMARCXMLFile {
1555     my ( $filename, $encoding ) = @_;
1556     my $batch = MARC::File::XML->in( $filename );
1557     my ( @marcRecords, @errors, $record );
1558     do {
1559         eval { $record = $batch->next( $encoding ); };
1560         if ($@) {
1561             push @errors, $@;
1562         }
1563         push @marcRecords, $record if $record;
1564     } while( $record );
1565     return (\@errors, \@marcRecords);
1566 }
1567
1568 =head2 RecordsFromMarcPlugin
1569
1570     Converts text of input_file into array of MARC records with to_marc plugin
1571
1572 =cut
1573
1574 sub RecordsFromMarcPlugin {
1575     my ($input_file, $plugin_class, $encoding) = @_;
1576     my ( $text, @return );
1577     return \@return if !$input_file || !$plugin_class;
1578
1579     # Read input file
1580     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1581     $/ = "\035";
1582     while (<$fh>) {
1583         s/^\s+//;
1584         s/\s+$//;
1585         next unless $_;
1586         $text .= $_;
1587     }
1588     close $fh;
1589
1590     # Convert to large MARC blob with plugin
1591     $text = Koha::Plugins::Handler->run({
1592         class  => $plugin_class,
1593         method => 'to_marc',
1594         params => { data => $text },
1595     }) if $text;
1596
1597     # Convert to array of MARC records
1598     if( $text ) {
1599         my $marc_type = C4::Context->preference('marcflavour');
1600         foreach my $blob ( split(/\x1D/, $text) ) {
1601             next if $blob =~ /^\s*$/;
1602             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1603             push @return, $marcrecord;
1604         }
1605     }
1606     return \@return;
1607 }
1608
1609 # internal functions
1610
1611 sub _create_import_record {
1612     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1613
1614     my $dbh = C4::Context->dbh;
1615     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1616                                                          record_type, encoding)
1617                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1618     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1619                   $record_type, $encoding);
1620     my $import_record_id = $dbh->{'mysql_insertid'};
1621     $sth->finish();
1622     return $import_record_id;
1623 }
1624
1625 sub _update_import_record_marc {
1626     my ($import_record_id, $marc_record, $marc_type) = @_;
1627
1628     my $dbh = C4::Context->dbh;
1629     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1630                              WHERE  import_record_id = ?");
1631     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1632     $sth->finish();
1633 }
1634
1635 sub _add_auth_fields {
1636     my ($import_record_id, $marc_record) = @_;
1637
1638     my $controlnumber;
1639     if ($marc_record->field('001')) {
1640         $controlnumber = $marc_record->field('001')->data();
1641     }
1642     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1643     my $dbh = C4::Context->dbh;
1644     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1645     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1646     $sth->finish();
1647 }
1648
1649 sub _add_biblio_fields {
1650     my ($import_record_id, $marc_record) = @_;
1651
1652     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1653     my $dbh = C4::Context->dbh;
1654     # FIXME no controlnumber, originalsource
1655     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1656     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1657     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1658     $sth->finish();
1659                 
1660 }
1661
1662 sub _update_biblio_fields {
1663     my ($import_record_id, $marc_record) = @_;
1664
1665     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1666     my $dbh = C4::Context->dbh;
1667     # FIXME no controlnumber, originalsource
1668     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1669     $isbn =~ s/\(.*$//;
1670     $isbn =~ tr/ -_//;
1671     $isbn = uc $isbn;
1672     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1673                              WHERE  import_record_id = ?");
1674     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1675     $sth->finish();
1676 }
1677
1678 sub _parse_biblio_fields {
1679     my ($marc_record) = @_;
1680
1681     my $dbh = C4::Context->dbh;
1682     my $bibliofields = TransformMarcToKoha($marc_record, '');
1683     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1684
1685 }
1686
1687 sub _update_batch_record_counts {
1688     my ($batch_id) = @_;
1689
1690     my $dbh = C4::Context->dbh;
1691     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1692                                         num_records = (
1693                                             SELECT COUNT(*)
1694                                             FROM import_records
1695                                             WHERE import_batch_id = import_batches.import_batch_id),
1696                                         num_items = (
1697                                             SELECT COUNT(*)
1698                                             FROM import_records
1699                                             JOIN import_items USING (import_record_id)
1700                                             WHERE import_batch_id = import_batches.import_batch_id
1701                                             AND record_type = 'biblio')
1702                                     WHERE import_batch_id = ?");
1703     $sth->bind_param(1, $batch_id);
1704     $sth->execute();
1705     $sth->finish();
1706 }
1707
1708 sub _get_commit_action {
1709     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1710     
1711     if ($record_type eq 'biblio') {
1712         my ($bib_result, $bib_match, $item_result);
1713
1714         if ($overlay_status ne 'no_match') {
1715             $bib_match = GetBestRecordMatch($import_record_id);
1716             if ($overlay_action eq 'replace') {
1717                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1718             } elsif ($overlay_action eq 'create_new') {
1719                 $bib_result  = 'create_new';
1720             } elsif ($overlay_action eq 'ignore') {
1721                 $bib_result  = 'ignore';
1722             }
1723          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1724                 $item_result = 'create_new';
1725        }
1726       elsif($item_action eq 'replace'){
1727           $item_result = 'replace';
1728           }
1729       else {
1730              $item_result = 'ignore';
1731            }
1732         } else {
1733             $bib_result = $nomatch_action;
1734             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1735         }
1736         return ($bib_result, $item_result, $bib_match);
1737     } else { # must be auths
1738         my ($auth_result, $auth_match);
1739
1740         if ($overlay_status ne 'no_match') {
1741             $auth_match = GetBestRecordMatch($import_record_id);
1742             if ($overlay_action eq 'replace') {
1743                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1744             } elsif ($overlay_action eq 'create_new') {
1745                 $auth_result  = 'create_new';
1746             } elsif ($overlay_action eq 'ignore') {
1747                 $auth_result  = 'ignore';
1748             }
1749         } else {
1750             $auth_result = $nomatch_action;
1751         }
1752
1753         return ($auth_result, undef, $auth_match);
1754
1755     }
1756 }
1757
1758 sub _get_revert_action {
1759     my ($overlay_action, $overlay_status, $status) = @_;
1760
1761     my $bib_result;
1762
1763     if ($status eq 'ignored') {
1764         $bib_result = 'ignore';
1765     } else {
1766         if ($overlay_action eq 'create_new') {
1767             $bib_result = 'delete';
1768         } else {
1769             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1770         }
1771     }
1772     return $bib_result;
1773 }
1774
1775 1;
1776 __END__
1777
1778 =head1 AUTHOR
1779
1780 Koha Development Team <http://koha-community.org/>
1781
1782 Galen Charlton <galen.charlton@liblime.com>
1783
1784 =cut