Bug 30822: Make BatchCommitRecords update the index in one request
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       GetImportRecordMarcXML
52       GetRecordFromImportBiblio
53       AddImportBatch
54       GetImportBatch
55       AddAuthToBatch
56       AddBiblioToBatch
57       AddItemsToImportBiblio
58       ModAuthorityInBatch
59
60       BatchStageMarcRecords
61       BatchFindDuplicates
62       BatchCommitRecords
63       BatchRevertRecords
64       CleanBatch
65       DeleteBatch
66
67       GetAllImportBatches
68       GetStagedWebserviceBatches
69       GetImportBatchRangeDesc
70       GetNumberOfNonZ3950ImportBatches
71       GetImportBiblios
72       GetImportRecordsRange
73       GetItemNumbersFromImportBatch
74
75       GetImportBatchStatus
76       SetImportBatchStatus
77       GetImportBatchOverlayAction
78       SetImportBatchOverlayAction
79       GetImportBatchNoMatchAction
80       SetImportBatchNoMatchAction
81       GetImportBatchItemAction
82       SetImportBatchItemAction
83       GetImportBatchMatcher
84       SetImportBatchMatcher
85       GetImportRecordOverlayStatus
86       SetImportRecordOverlayStatus
87       GetImportRecordStatus
88       SetImportRecordStatus
89       SetMatchedBiblionumber
90       GetImportRecordMatches
91       SetImportRecordMatches
92
93       RecordsFromMARCXMLFile
94       RecordsFromISO2709File
95       RecordsFromMarcPlugin
96     );
97 }
98
99 =head1 NAME
100
101 C4::ImportBatch - manage batches of imported MARC records
102
103 =head1 SYNOPSIS
104
105 use C4::ImportBatch;
106
107 =head1 FUNCTIONS
108
109 =head2 GetZ3950BatchId
110
111   my $batchid = GetZ3950BatchId($z3950server);
112
113 Retrieves the ID of the import batch for the Z39.50
114 reservoir for the given target.  If necessary,
115 creates the import batch.
116
117 =cut
118
119 sub GetZ3950BatchId {
120     my ($z3950server) = @_;
121
122     my $dbh = C4::Context->dbh;
123     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
124                              WHERE  batch_type = 'z3950'
125                              AND    file_name = ?");
126     $sth->execute($z3950server);
127     my $rowref = $sth->fetchrow_arrayref();
128     $sth->finish();
129     if (defined $rowref) {
130         return $rowref->[0];
131     } else {
132         my $batch_id = AddImportBatch( {
133                 overlay_action => 'create_new',
134                 import_status => 'staged',
135                 batch_type => 'z3950',
136                 file_name => $z3950server,
137             } );
138         return $batch_id;
139     }
140     
141 }
142
143 =head2 GetWebserviceBatchId
144
145   my $batchid = GetWebserviceBatchId();
146
147 Retrieves the ID of the import batch for webservice.
148 If necessary, creates the import batch.
149
150 =cut
151
152 my $WEBSERVICE_BASE_QRY = <<EOQ;
153 SELECT import_batch_id FROM import_batches
154 WHERE  batch_type = 'webservice'
155 AND    import_status = 'staged'
156 EOQ
157 sub GetWebserviceBatchId {
158     my ($params) = @_;
159
160     my $dbh = C4::Context->dbh;
161     my $sql = $WEBSERVICE_BASE_QRY;
162     my @args;
163     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
164         if (my $val = $params->{$field}) {
165             $sql .= " AND $field = ?";
166             push @args, $val;
167         }
168     }
169     my $id = $dbh->selectrow_array($sql, undef, @args);
170     return $id if $id;
171
172     $params->{batch_type} = 'webservice';
173     $params->{import_status} = 'staged';
174     return AddImportBatch($params);
175 }
176
177 =head2 GetImportRecordMarc
178
179   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
180
181 =cut
182
183 sub GetImportRecordMarc {
184     my ($import_record_id) = @_;
185
186     my $dbh = C4::Context->dbh;
187     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
188         SELECT marc, encoding
189         FROM import_records
190         WHERE import_record_id = ?
191     |, undef, $import_record_id );
192
193     return $marc, $encoding;
194 }
195
196 sub GetRecordFromImportBiblio {
197     my ( $import_record_id, $embed_items ) = @_;
198
199     my ($marc) = GetImportRecordMarc($import_record_id);
200     my $record = MARC::Record->new_from_usmarc($marc);
201
202     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
203
204     return $record;
205 }
206
207 sub EmbedItemsInImportBiblio {
208     my ( $record, $import_record_id ) = @_;
209     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
210     my $dbh = C4::Context->dbh;
211     my $import_items = $dbh->selectall_arrayref(q|
212         SELECT import_items.marcxml
213         FROM import_items
214         WHERE import_record_id = ?
215     |, { Slice => {} }, $import_record_id );
216     my @item_fields;
217     for my $import_item ( @$import_items ) {
218         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
219         push @item_fields, $item_marc->field($itemtag);
220     }
221     $record->append_fields(@item_fields);
222     return $record;
223 }
224
225 =head2 GetImportRecordMarcXML
226
227   my $marcxml = GetImportRecordMarcXML($import_record_id);
228
229 =cut
230
231 sub GetImportRecordMarcXML {
232     my ($import_record_id) = @_;
233
234     my $dbh = C4::Context->dbh;
235     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
236     $sth->execute($import_record_id);
237     my ($marcxml) = $sth->fetchrow();
238     $sth->finish();
239     return $marcxml;
240
241 }
242
243 =head2 AddImportBatch
244
245   my $batch_id = AddImportBatch($params_hash);
246
247 =cut
248
249 sub AddImportBatch {
250     my ($params) = @_;
251
252     my (@fields, @vals);
253     foreach (qw( matcher_id template_id branchcode
254                  overlay_action nomatch_action item_action
255                  import_status batch_type file_name comments record_type )) {
256         if (exists $params->{$_}) {
257             push @fields, $_;
258             push @vals, $params->{$_};
259         }
260     }
261     my $dbh = C4::Context->dbh;
262     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
263                                   VALUES (".join( ',', map '?', @fields).")",
264              undef,
265              @vals);
266     return $dbh->{'mysql_insertid'};
267 }
268
269 =head2 GetImportBatch 
270
271   my $row = GetImportBatch($batch_id);
272
273 Retrieve a hashref of an import_batches row.
274
275 =cut
276
277 sub GetImportBatch {
278     my ($batch_id) = @_;
279
280     my $dbh = C4::Context->dbh;
281     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
282     $sth->bind_param(1, $batch_id);
283     $sth->execute();
284     my $result = $sth->fetchrow_hashref;
285     $sth->finish();
286     return $result;
287
288 }
289
290 =head2 AddBiblioToBatch 
291
292   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
293                 $marc_record, $encoding, $update_counts);
294
295 =cut
296
297 sub AddBiblioToBatch {
298     my $batch_id = shift;
299     my $record_sequence = shift;
300     my $marc_record = shift;
301     my $encoding = shift;
302     my $update_counts = @_ ? shift : 1;
303
304     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
305     _add_biblio_fields($import_record_id, $marc_record);
306     _update_batch_record_counts($batch_id) if $update_counts;
307     return $import_record_id;
308 }
309
310 =head2 AddAuthToBatch
311
312   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
313                 $marc_record, $encoding, $update_counts, [$marc_type]);
314
315 =cut
316
317 sub AddAuthToBatch {
318     my $batch_id = shift;
319     my $record_sequence = shift;
320     my $marc_record = shift;
321     my $encoding = shift;
322     my $update_counts = @_ ? shift : 1;
323     my $marc_type = shift || C4::Context->preference('marcflavour');
324
325     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
326
327     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
328     _add_auth_fields($import_record_id, $marc_record);
329     _update_batch_record_counts($batch_id) if $update_counts;
330     return $import_record_id;
331 }
332
333 =head2 BatchStageMarcRecords
334
335 ( $batch_id, $num_records, $num_items, @invalid_records ) =
336   BatchStageMarcRecords(
337     $record_type,                $encoding,
338     $marc_records,               $file_name,
339     $marc_modification_template, $comments,
340     $branch_code,                $parse_items,
341     $leave_as_staging,           $progress_interval,
342     $progress_callback
343   );
344
345 =cut
346
347 sub BatchStageMarcRecords {
348     my $record_type = shift;
349     my $encoding = shift;
350     my $marc_records = shift;
351     my $file_name = shift;
352     my $marc_modification_template = shift;
353     my $comments = shift;
354     my $branch_code = shift;
355     my $parse_items = shift;
356     my $leave_as_staging = shift;
357
358     # optional callback to monitor status 
359     # of job
360     my $progress_interval = 0;
361     my $progress_callback = undef;
362     if ($#_ == 1) {
363         $progress_interval = shift;
364         $progress_callback = shift;
365         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
366         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
367     } 
368     
369     my $batch_id = AddImportBatch( {
370             overlay_action => 'create_new',
371             import_status => 'staging',
372             batch_type => 'batch',
373             file_name => $file_name,
374             comments => $comments,
375             record_type => $record_type,
376         } );
377     if ($parse_items) {
378         SetImportBatchItemAction($batch_id, 'always_add');
379     } else {
380         SetImportBatchItemAction($batch_id, 'ignore');
381     }
382
383
384     my $marc_type = C4::Context->preference('marcflavour');
385     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
386     my @invalid_records = ();
387     my $num_valid = 0;
388     my $num_items = 0;
389     # FIXME - for now, we're dealing only with bibs
390     my $rec_num = 0;
391     foreach my $marc_record (@$marc_records) {
392         $rec_num++;
393         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
394             &$progress_callback($rec_num);
395         }
396
397         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
398
399         my $import_record_id;
400         if (scalar($marc_record->fields()) == 0) {
401             push @invalid_records, $marc_record;
402         } else {
403
404             # Normalize the record so it doesn't have separated diacritics
405             SetUTF8Flag($marc_record);
406
407             $num_valid++;
408             if ($record_type eq 'biblio') {
409                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
410                 if ($parse_items) {
411                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
412                     $num_items += scalar(@import_items_ids);
413                 }
414             } elsif ($record_type eq 'auth') {
415                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
416             }
417         }
418     }
419     unless ($leave_as_staging) {
420         SetImportBatchStatus($batch_id, 'staged');
421     }
422     # FIXME branch_code, number of bibs, number of items
423     _update_batch_record_counts($batch_id);
424     return ($batch_id, $num_valid, $num_items, @invalid_records);
425 }
426
427 =head2 AddItemsToImportBiblio
428
429   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
430                 $import_record_id, $marc_record, $update_counts);
431
432 =cut
433
434 sub AddItemsToImportBiblio {
435     my $batch_id = shift;
436     my $import_record_id = shift;
437     my $marc_record = shift;
438     my $update_counts = @_ ? shift : 0;
439
440     my @import_items_ids = ();
441    
442     my $dbh = C4::Context->dbh; 
443     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
444     foreach my $item_field ($marc_record->field($item_tag)) {
445         my $item_marc = MARC::Record->new();
446         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
447         $item_marc->append_fields($item_field);
448         $marc_record->delete_field($item_field);
449         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
450                                         VALUES (?, ?, ?)");
451         $sth->bind_param(1, $import_record_id);
452         $sth->bind_param(2, 'staged');
453         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
454         $sth->execute();
455         push @import_items_ids, $dbh->{'mysql_insertid'};
456         $sth->finish();
457     }
458
459     if ($#import_items_ids > -1) {
460         _update_batch_record_counts($batch_id) if $update_counts;
461         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
462     }
463     return @import_items_ids;
464 }
465
466 =head2 BatchFindDuplicates
467
468   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
469              $max_matches, $progress_interval, $progress_callback);
470
471 Goes through the records loaded in the batch and attempts to 
472 find duplicates for each one.  Sets the matching status 
473 of each record to "no_match" or "auto_match" as appropriate.
474
475 The $max_matches parameter is optional; if it is not supplied,
476 it defaults to 10.
477
478 The $progress_interval and $progress_callback parameters are 
479 optional; if both are supplied, the sub referred to by
480 $progress_callback will be invoked every $progress_interval
481 records using the number of records processed as the 
482 singular argument.
483
484 =cut
485
486 sub BatchFindDuplicates {
487     my $batch_id = shift;
488     my $matcher = shift;
489     my $max_matches = @_ ? shift : 10;
490
491     # optional callback to monitor status 
492     # of job
493     my $progress_interval = 0;
494     my $progress_callback = undef;
495     if ($#_ == 1) {
496         $progress_interval = shift;
497         $progress_callback = shift;
498         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
499         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
500     }
501
502     my $dbh = C4::Context->dbh;
503
504     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
505                              FROM import_records
506                              WHERE import_batch_id = ?");
507     $sth->execute($batch_id);
508     my $num_with_matches = 0;
509     my $rec_num = 0;
510     while (my $rowref = $sth->fetchrow_hashref) {
511         $rec_num++;
512         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
513             &$progress_callback($rec_num);
514         }
515         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
516         my @matches = ();
517         if (defined $matcher) {
518             @matches = $matcher->get_matches($marc_record, $max_matches);
519         }
520         if (scalar(@matches) > 0) {
521             $num_with_matches++;
522             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
523             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
524         } else {
525             SetImportRecordMatches($rowref->{'import_record_id'}, ());
526             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
527         }
528     }
529     $sth->finish();
530     return $num_with_matches;
531 }
532
533 =head2 BatchCommitRecords
534
535   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
536         BatchCommitRecords($batch_id, $framework,
537         $progress_interval, $progress_callback);
538
539 =cut
540
541 sub BatchCommitRecords {
542     my $batch_id = shift;
543     my $framework = shift;
544
545     # optional callback to monitor status 
546     # of job
547     my $progress_interval = 0;
548     my $progress_callback = undef;
549     if ($#_ == 1) {
550         $progress_interval = shift;
551         $progress_callback = shift;
552         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
553         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
554     }
555
556     my $record_type;
557     my $num_added = 0;
558     my $num_updated = 0;
559     my $num_items_added = 0;
560     my $num_items_replaced = 0;
561     my $num_items_errored = 0;
562     my $num_ignored = 0;
563     # commit (i.e., save, all records in the batch)
564     SetImportBatchStatus($batch_id, 'importing');
565     my $overlay_action = GetImportBatchOverlayAction($batch_id);
566     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
567     my $item_action = GetImportBatchItemAction($batch_id);
568     my $item_tag;
569     my $item_subfield;
570     my $dbh = C4::Context->dbh;
571     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
572                              FROM import_records
573                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
574                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
575                              WHERE import_batch_id = ?");
576     $sth->execute($batch_id);
577     my $marcflavour = C4::Context->preference('marcflavour');
578
579     my $userenv = C4::Context->userenv;
580     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
581
582     my $rec_num = 0;
583     my @biblio_ids;
584     while (my $rowref = $sth->fetchrow_hashref) {
585         $record_type = $rowref->{'record_type'};
586         $rec_num++;
587         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
588             &$progress_callback($rec_num);
589         }
590         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
591             $num_ignored++;
592             next;
593         }
594
595         my $marc_type;
596         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
597             $marc_type = 'UNIMARCAUTH';
598         } elsif ($marcflavour eq 'UNIMARC') {
599             $marc_type = 'UNIMARC';
600         } else {
601             $marc_type = 'USMARC';
602         }
603         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
604
605         if ($record_type eq 'biblio') {
606             # remove any item tags - rely on BatchCommitItems
607             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
608             foreach my $item_field ($marc_record->field($item_tag)) {
609                 $marc_record->delete_field($item_field);
610             }
611         }
612
613         my ($record_result, $item_result, $record_match) =
614             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
615                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
616
617         my $recordid;
618         my $query;
619         if ($record_result eq 'create_new') {
620             $num_added++;
621             if ($record_type eq 'biblio') {
622                 my $biblioitemnumber;
623                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
624                 push @biblio_ids, $recordid;
625                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
626                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
627                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
628                     $num_items_added += $bib_items_added;
629                     $num_items_replaced += $bib_items_replaced;
630                     $num_items_errored += $bib_items_errored;
631                 }
632             } else {
633                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
634                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
635             }
636             my $sth = $dbh->prepare_cached($query);
637             $sth->execute($recordid, $rowref->{'import_record_id'});
638             $sth->finish();
639             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
640         } elsif ($record_result eq 'replace') {
641             $num_updated++;
642             $recordid = $record_match;
643             my $oldxml;
644             if ($record_type eq 'biblio') {
645                 my $oldbiblio = Koha::Biblios->find( $recordid );
646                 $oldxml = GetXmlBiblio($recordid);
647
648                 # remove item fields so that they don't get
649                 # added again if record is reverted
650                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
651                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
652                 foreach my $item_field ($old_marc->field($item_tag)) {
653                     $old_marc->delete_field($item_field);
654                 }
655                 $oldxml = $old_marc->as_xml($marc_type);
656
657                 my $context = { source => 'batchimport' };
658                 if ($logged_in_patron) {
659                     $context->{categorycode} = $logged_in_patron->categorycode;
660                     $context->{userid} = $logged_in_patron->userid;
661                 }
662
663                 ModBiblio(
664                     $marc_record,
665                     $recordid,
666                     $oldbiblio->frameworkcode,
667                     {
668                         overlay_context   => $context,
669                         skip_record_index => 1
670                     }
671                 );
672                 push @biblio_ids, $recordid;
673                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
674
675                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
676                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
677                     $num_items_added += $bib_items_added;
678                     $num_items_replaced += $bib_items_replaced;
679                     $num_items_errored += $bib_items_errored;
680                 }
681             } else {
682                 $oldxml = GetAuthorityXML($recordid);
683
684                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
685                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
686             }
687             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
688             $sth->execute($oldxml, $rowref->{'import_record_id'});
689             $sth->finish();
690             my $sth2 = $dbh->prepare_cached($query);
691             $sth2->execute($recordid, $rowref->{'import_record_id'});
692             $sth2->finish();
693             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
694             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
695         } elsif ($record_result eq 'ignore') {
696             $recordid = $record_match;
697             $num_ignored++;
698             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
699                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
700                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
701                 $num_items_added += $bib_items_added;
702          $num_items_replaced += $bib_items_replaced;
703                 $num_items_errored += $bib_items_errored;
704                 # still need to record the matched biblionumber so that the
705                 # items can be reverted
706                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
707                 $sth2->execute($recordid, $rowref->{'import_record_id'});
708                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
709             }
710             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
711         }
712     }
713     $sth->finish();
714
715     if ( @biblio_ids ) {
716         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
717         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
718     }
719
720     SetImportBatchStatus($batch_id, 'imported');
721     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
722 }
723
724 =head2 BatchCommitItems
725
726   ($num_items_added, $num_items_errored) = 
727          BatchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
728
729 =cut
730
731 sub BatchCommitItems {
732     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
733
734     my $dbh = C4::Context->dbh;
735
736     my $num_items_added = 0;
737     my $num_items_errored = 0;
738     my $num_items_replaced = 0;
739
740     my $sth = $dbh->prepare( "
741         SELECT import_items_id, import_items.marcxml, encoding
742         FROM import_items
743         JOIN import_records USING (import_record_id)
744         WHERE import_record_id = ?
745         ORDER BY import_items_id
746     " );
747     $sth->bind_param( 1, $import_record_id );
748     $sth->execute();
749
750     while ( my $row = $sth->fetchrow_hashref() ) {
751         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
752
753         # Delete date_due subfield as to not accidentally delete item checkout due dates
754         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
755         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
756
757         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
758
759         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
760         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
761
762         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
763         if ( $action eq "replace" && $duplicate_itemnumber ) {
764             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
765             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
766             $updsth->bind_param( 1, 'imported' );
767             $updsth->bind_param( 2, $item->{itemnumber} );
768             $updsth->bind_param( 3, undef );
769             $updsth->bind_param( 4, $row->{'import_items_id'} );
770             $updsth->execute();
771             $updsth->finish();
772             $num_items_replaced++;
773         } elsif ( $action eq "replace" && $duplicate_barcode ) {
774             my $itemnumber = $duplicate_barcode->itemnumber;
775             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
776             $updsth->bind_param( 1, 'imported' );
777             $updsth->bind_param( 2, $item->{itemnumber} );
778             $updsth->bind_param( 3, undef );
779             $updsth->bind_param( 4, $row->{'import_items_id'} );
780             $updsth->execute();
781             $updsth->finish();
782             $num_items_replaced++;
783         } elsif ($duplicate_barcode) {
784             $updsth->bind_param( 1, 'error' );
785             $updsth->bind_param( 2, undef );
786             $updsth->bind_param( 3, 'duplicate item barcode' );
787             $updsth->bind_param( 4, $row->{'import_items_id'} );
788             $updsth->execute();
789             $num_items_errored++;
790         } else {
791             # Remove the itemnumber if it exists, we want to create a new item
792             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
793             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
794
795             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
796             if( $itemnumber ) {
797                 $updsth->bind_param( 1, 'imported' );
798                 $updsth->bind_param( 2, $itemnumber );
799                 $updsth->bind_param( 3, undef );
800                 $updsth->bind_param( 4, $row->{'import_items_id'} );
801                 $updsth->execute();
802                 $updsth->finish();
803                 $num_items_added++;
804             }
805         }
806     }
807
808     return ( $num_items_added, $num_items_replaced, $num_items_errored );
809 }
810
811 =head2 BatchRevertRecords
812
813   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
814       $num_ignored) = BatchRevertRecords($batch_id);
815
816 =cut
817
818 sub BatchRevertRecords {
819     my $batch_id = shift;
820
821     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
822
823     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
824
825     my $record_type;
826     my $num_deleted = 0;
827     my $num_errors = 0;
828     my $num_reverted = 0;
829     my $num_ignored = 0;
830     my $num_items_deleted = 0;
831     # commit (i.e., save, all records in the batch)
832     SetImportBatchStatus($batch_id, 'reverting');
833     my $overlay_action = GetImportBatchOverlayAction($batch_id);
834     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
835     my $dbh = C4::Context->dbh;
836     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
837                              FROM import_records
838                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
839                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
840                              WHERE import_batch_id = ?");
841     $sth->execute($batch_id);
842     my $marc_type;
843     my $marcflavour = C4::Context->preference('marcflavour');
844     while (my $rowref = $sth->fetchrow_hashref) {
845         $record_type = $rowref->{'record_type'};
846         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
847             $num_ignored++;
848             next;
849         }
850         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
851             $marc_type = 'UNIMARCAUTH';
852         } elsif ($marcflavour eq 'UNIMARC') {
853             $marc_type = 'UNIMARC';
854         } else {
855             $marc_type = 'USMARC';
856         }
857
858         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
859
860         if ($record_result eq 'delete') {
861             my $error = undef;
862             if  ($record_type eq 'biblio') {
863                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
864                 $error = DelBiblio($rowref->{'matched_biblionumber'});
865             } else {
866                 DelAuthority({ authid => $rowref->{'matched_authid'} });
867             }
868             if (defined $error) {
869                 $num_errors++;
870             } else {
871                 $num_deleted++;
872                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
873             }
874         } elsif ($record_result eq 'restore') {
875             $num_reverted++;
876             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
877             if ($record_type eq 'biblio') {
878                 my $biblionumber = $rowref->{'matched_biblionumber'};
879                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
880
881                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
882                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
883
884                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
885                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
886             } else {
887                 my $authid = $rowref->{'matched_authid'};
888                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
889             }
890             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
891         } elsif ($record_result eq 'ignore') {
892             if ($record_type eq 'biblio') {
893                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
894             }
895             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
896         }
897         my $query;
898         if ($record_type eq 'biblio') {
899             # remove matched_biblionumber only if there is no 'imported' item left
900             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
901             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
902         } else {
903             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
904         }
905         my $sth2 = $dbh->prepare_cached($query);
906         $sth2->execute($rowref->{'import_record_id'});
907     }
908
909     $sth->finish();
910     SetImportBatchStatus($batch_id, 'reverted');
911     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
912 }
913
914 =head2 BatchRevertItems
915
916   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
917
918 =cut
919
920 sub BatchRevertItems {
921     my ($import_record_id, $biblionumber) = @_;
922
923     my $dbh = C4::Context->dbh;
924     my $num_items_deleted = 0;
925
926     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
927                                    FROM import_items
928                                    JOIN items USING (itemnumber)
929                                    WHERE import_record_id = ?");
930     $sth->bind_param(1, $import_record_id);
931     $sth->execute();
932     while (my $row = $sth->fetchrow_hashref()) {
933         my $item = Koha::Items->find($row->{itemnumber});
934         if ($item->safe_delete){
935             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
936             $updsth->bind_param(1, 'reverted');
937             $updsth->bind_param(2, $row->{'import_items_id'});
938             $updsth->execute();
939             $updsth->finish();
940             $num_items_deleted++;
941         }
942         else {
943             next;
944         }
945     }
946     $sth->finish();
947     return $num_items_deleted;
948 }
949
950 =head2 CleanBatch
951
952   CleanBatch($batch_id)
953
954 Deletes all staged records from the import batch
955 and sets the status of the batch to 'cleaned'.  Note
956 that deleting a stage record does *not* affect
957 any record that has been committed to the database.
958
959 =cut
960
961 sub CleanBatch {
962     my $batch_id = shift;
963     return unless defined $batch_id;
964
965     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
966     SetImportBatchStatus($batch_id, 'cleaned');
967 }
968
969 =head2 DeleteBatch
970
971   DeleteBatch($batch_id)
972
973 Deletes the record from the database. This can only be done
974 once the batch has been cleaned.
975
976 =cut
977
978 sub DeleteBatch {
979     my $batch_id = shift;
980     return unless defined $batch_id;
981
982     my $dbh = C4::Context->dbh;
983     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
984     $sth->execute( $batch_id );
985 }
986
987 =head2 GetAllImportBatches
988
989   my $results = GetAllImportBatches();
990
991 Returns a references to an array of hash references corresponding
992 to all import_batches rows (of batch_type 'batch'), sorted in 
993 ascending order by import_batch_id.
994
995 =cut
996
997 sub  GetAllImportBatches {
998     my $dbh = C4::Context->dbh;
999     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1000                                     WHERE batch_type IN ('batch', 'webservice')
1001                                     ORDER BY import_batch_id ASC");
1002
1003     my $results = [];
1004     $sth->execute();
1005     while (my $row = $sth->fetchrow_hashref) {
1006         push @$results, $row;
1007     }
1008     $sth->finish();
1009     return $results;
1010 }
1011
1012 =head2 GetStagedWebserviceBatches
1013
1014   my $batch_ids = GetStagedWebserviceBatches();
1015
1016 Returns a references to an array of batch id's
1017 of batch_type 'webservice' that are not imported
1018
1019 =cut
1020
1021 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1022 SELECT import_batch_id FROM import_batches
1023 WHERE batch_type = 'webservice'
1024 AND import_status = 'staged'
1025 EOQ
1026 sub  GetStagedWebserviceBatches {
1027     my $dbh = C4::Context->dbh;
1028     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1029 }
1030
1031 =head2 GetImportBatchRangeDesc
1032
1033   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1034
1035 Returns a reference to an array of hash references corresponding to
1036 import_batches rows (sorted in descending order by import_batch_id)
1037 start at the given offset.
1038
1039 =cut
1040
1041 sub GetImportBatchRangeDesc {
1042     my ($offset, $results_per_group) = @_;
1043
1044     my $dbh = C4::Context->dbh;
1045     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1046                                     LEFT JOIN import_batch_profiles p
1047                                     ON b.profile_id = p.id
1048                                     WHERE b.batch_type IN ('batch', 'webservice')
1049                                     ORDER BY b.import_batch_id DESC";
1050     my @params;
1051     if ($results_per_group){
1052         $query .= " LIMIT ?";
1053         push(@params, $results_per_group);
1054     }
1055     if ($offset){
1056         $query .= " OFFSET ?";
1057         push(@params, $offset);
1058     }
1059     my $sth = $dbh->prepare_cached($query);
1060     $sth->execute(@params);
1061     my $results = $sth->fetchall_arrayref({});
1062     $sth->finish();
1063     return $results;
1064 }
1065
1066 =head2 GetItemNumbersFromImportBatch
1067
1068   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1069
1070 =cut
1071
1072 sub GetItemNumbersFromImportBatch {
1073     my ($batch_id) = @_;
1074     my $dbh = C4::Context->dbh;
1075     my $sql = q|
1076 SELECT itemnumber FROM import_items
1077 INNER JOIN items USING (itemnumber)
1078 INNER JOIN import_records USING (import_record_id)
1079 WHERE import_batch_id = ?|;
1080     my  $sth = $dbh->prepare( $sql );
1081     $sth->execute($batch_id);
1082     my @items ;
1083     while ( my ($itm) = $sth->fetchrow_array ) {
1084         push @items, $itm;
1085     }
1086     return @items;
1087 }
1088
1089 =head2 GetNumberOfImportBatches
1090
1091   my $count = GetNumberOfImportBatches();
1092
1093 =cut
1094
1095 sub GetNumberOfNonZ3950ImportBatches {
1096     my $dbh = C4::Context->dbh;
1097     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1098     $sth->execute();
1099     my ($count) = $sth->fetchrow_array();
1100     $sth->finish();
1101     return $count;
1102 }
1103
1104 =head2 GetImportBiblios
1105
1106   my $results = GetImportBiblios($importid);
1107
1108 =cut
1109
1110 sub GetImportBiblios {
1111     my ($import_record_id) = @_;
1112
1113     my $dbh = C4::Context->dbh;
1114     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1115     return $dbh->selectall_arrayref(
1116         $query,
1117         { Slice => {} },
1118         $import_record_id
1119     );
1120
1121 }
1122
1123 =head2 GetImportRecordsRange
1124
1125   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1126
1127 Returns a reference to an array of hash references corresponding to
1128 import_biblios/import_auths/import_records rows for a given batch
1129 starting at the given offset.
1130
1131 =cut
1132
1133 sub GetImportRecordsRange {
1134     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1135
1136     my $dbh = C4::Context->dbh;
1137
1138     my $order_by = $parameters->{order_by} || 'import_record_id';
1139     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1140
1141     my $order_by_direction =
1142       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1143
1144     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1145
1146     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1147                                            record_sequence, status, overlay_status,
1148                                            matched_biblionumber, matched_authid, record_type
1149                                     FROM   import_records
1150                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1151                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1152                                     WHERE  import_batch_id = ?";
1153     my @params;
1154     push(@params, $batch_id);
1155     if ($status) {
1156         $query .= " AND status=?";
1157         push(@params,$status);
1158     }
1159
1160     $query.=" ORDER BY $order_by $order_by_direction";
1161
1162     if($results_per_group){
1163         $query .= " LIMIT ?";
1164         push(@params, $results_per_group);
1165     }
1166     if($offset){
1167         $query .= " OFFSET ?";
1168         push(@params, $offset);
1169     }
1170     my $sth = $dbh->prepare_cached($query);
1171     $sth->execute(@params);
1172     my $results = $sth->fetchall_arrayref({});
1173     $sth->finish();
1174     return $results;
1175
1176 }
1177
1178 =head2 GetBestRecordMatch
1179
1180   my $record_id = GetBestRecordMatch($import_record_id);
1181
1182 =cut
1183
1184 sub GetBestRecordMatch {
1185     my ($import_record_id) = @_;
1186
1187     my $dbh = C4::Context->dbh;
1188     my $sth = $dbh->prepare("SELECT candidate_match_id
1189                              FROM   import_record_matches
1190                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1191                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1192                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1193                              WHERE  import_record_matches.import_record_id = ? AND
1194                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1195                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1196                              AND chosen = 1
1197                              ORDER BY score DESC, candidate_match_id DESC");
1198     $sth->execute($import_record_id);
1199     my ($record_id) = $sth->fetchrow_array();
1200     $sth->finish();
1201     return $record_id;
1202 }
1203
1204 =head2 GetImportBatchStatus
1205
1206   my $status = GetImportBatchStatus($batch_id);
1207
1208 =cut
1209
1210 sub GetImportBatchStatus {
1211     my ($batch_id) = @_;
1212
1213     my $dbh = C4::Context->dbh;
1214     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1215     $sth->execute($batch_id);
1216     my ($status) = $sth->fetchrow_array();
1217     $sth->finish();
1218     return $status;
1219
1220 }
1221
1222 =head2 SetImportBatchStatus
1223
1224   SetImportBatchStatus($batch_id, $new_status);
1225
1226 =cut
1227
1228 sub SetImportBatchStatus {
1229     my ($batch_id, $new_status) = @_;
1230
1231     my $dbh = C4::Context->dbh;
1232     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1233     $sth->execute($new_status, $batch_id);
1234     $sth->finish();
1235
1236 }
1237
1238 =head2 SetMatchedBiblionumber
1239
1240   SetMatchedBiblionumber($import_record_id, $biblionumber);
1241
1242 =cut
1243
1244 sub SetMatchedBiblionumber {
1245     my ($import_record_id, $biblionumber) = @_;
1246
1247     my $dbh = C4::Context->dbh;
1248     $dbh->do(
1249         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1250         undef, $biblionumber, $import_record_id
1251     );
1252 }
1253
1254 =head2 GetImportBatchOverlayAction
1255
1256   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1257
1258 =cut
1259
1260 sub GetImportBatchOverlayAction {
1261     my ($batch_id) = @_;
1262
1263     my $dbh = C4::Context->dbh;
1264     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1265     $sth->execute($batch_id);
1266     my ($overlay_action) = $sth->fetchrow_array();
1267     $sth->finish();
1268     return $overlay_action;
1269
1270 }
1271
1272
1273 =head2 SetImportBatchOverlayAction
1274
1275   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1276
1277 =cut
1278
1279 sub SetImportBatchOverlayAction {
1280     my ($batch_id, $new_overlay_action) = @_;
1281
1282     my $dbh = C4::Context->dbh;
1283     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1284     $sth->execute($new_overlay_action, $batch_id);
1285     $sth->finish();
1286
1287 }
1288
1289 =head2 GetImportBatchNoMatchAction
1290
1291   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1292
1293 =cut
1294
1295 sub GetImportBatchNoMatchAction {
1296     my ($batch_id) = @_;
1297
1298     my $dbh = C4::Context->dbh;
1299     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1300     $sth->execute($batch_id);
1301     my ($nomatch_action) = $sth->fetchrow_array();
1302     $sth->finish();
1303     return $nomatch_action;
1304
1305 }
1306
1307
1308 =head2 SetImportBatchNoMatchAction
1309
1310   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1311
1312 =cut
1313
1314 sub SetImportBatchNoMatchAction {
1315     my ($batch_id, $new_nomatch_action) = @_;
1316
1317     my $dbh = C4::Context->dbh;
1318     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1319     $sth->execute($new_nomatch_action, $batch_id);
1320     $sth->finish();
1321
1322 }
1323
1324 =head2 GetImportBatchItemAction
1325
1326   my $item_action = GetImportBatchItemAction($batch_id);
1327
1328 =cut
1329
1330 sub GetImportBatchItemAction {
1331     my ($batch_id) = @_;
1332
1333     my $dbh = C4::Context->dbh;
1334     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1335     $sth->execute($batch_id);
1336     my ($item_action) = $sth->fetchrow_array();
1337     $sth->finish();
1338     return $item_action;
1339
1340 }
1341
1342
1343 =head2 SetImportBatchItemAction
1344
1345   SetImportBatchItemAction($batch_id, $new_item_action);
1346
1347 =cut
1348
1349 sub SetImportBatchItemAction {
1350     my ($batch_id, $new_item_action) = @_;
1351
1352     my $dbh = C4::Context->dbh;
1353     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1354     $sth->execute($new_item_action, $batch_id);
1355     $sth->finish();
1356
1357 }
1358
1359 =head2 GetImportBatchMatcher
1360
1361   my $matcher_id = GetImportBatchMatcher($batch_id);
1362
1363 =cut
1364
1365 sub GetImportBatchMatcher {
1366     my ($batch_id) = @_;
1367
1368     my $dbh = C4::Context->dbh;
1369     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1370     $sth->execute($batch_id);
1371     my ($matcher_id) = $sth->fetchrow_array();
1372     $sth->finish();
1373     return $matcher_id;
1374
1375 }
1376
1377
1378 =head2 SetImportBatchMatcher
1379
1380   SetImportBatchMatcher($batch_id, $new_matcher_id);
1381
1382 =cut
1383
1384 sub SetImportBatchMatcher {
1385     my ($batch_id, $new_matcher_id) = @_;
1386
1387     my $dbh = C4::Context->dbh;
1388     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1389     $sth->execute($new_matcher_id, $batch_id);
1390     $sth->finish();
1391
1392 }
1393
1394 =head2 GetImportRecordOverlayStatus
1395
1396   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1397
1398 =cut
1399
1400 sub GetImportRecordOverlayStatus {
1401     my ($import_record_id) = @_;
1402
1403     my $dbh = C4::Context->dbh;
1404     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1405     $sth->execute($import_record_id);
1406     my ($overlay_status) = $sth->fetchrow_array();
1407     $sth->finish();
1408     return $overlay_status;
1409
1410 }
1411
1412
1413 =head2 SetImportRecordOverlayStatus
1414
1415   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1416
1417 =cut
1418
1419 sub SetImportRecordOverlayStatus {
1420     my ($import_record_id, $new_overlay_status) = @_;
1421
1422     my $dbh = C4::Context->dbh;
1423     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1424     $sth->execute($new_overlay_status, $import_record_id);
1425     $sth->finish();
1426
1427 }
1428
1429 =head2 GetImportRecordStatus
1430
1431   my $status = GetImportRecordStatus($import_record_id);
1432
1433 =cut
1434
1435 sub GetImportRecordStatus {
1436     my ($import_record_id) = @_;
1437
1438     my $dbh = C4::Context->dbh;
1439     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1440     $sth->execute($import_record_id);
1441     my ($status) = $sth->fetchrow_array();
1442     $sth->finish();
1443     return $status;
1444
1445 }
1446
1447
1448 =head2 SetImportRecordStatus
1449
1450   SetImportRecordStatus($import_record_id, $new_status);
1451
1452 =cut
1453
1454 sub SetImportRecordStatus {
1455     my ($import_record_id, $new_status) = @_;
1456
1457     my $dbh = C4::Context->dbh;
1458     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1459     $sth->execute($new_status, $import_record_id);
1460     $sth->finish();
1461
1462 }
1463
1464 =head2 GetImportRecordMatches
1465
1466   my $results = GetImportRecordMatches($import_record_id, $best_only);
1467
1468 =cut
1469
1470 sub GetImportRecordMatches {
1471     my $import_record_id = shift;
1472     my $best_only = @_ ? shift : 0;
1473
1474     my $dbh = C4::Context->dbh;
1475     # FIXME currently biblio only
1476     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1477                                     candidate_match_id, score, record_type,
1478                                     chosen
1479                                     FROM import_records
1480                                     JOIN import_record_matches USING (import_record_id)
1481                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1482                                     WHERE import_record_id = ?
1483                                     ORDER BY score DESC, biblionumber DESC");
1484     $sth->bind_param(1, $import_record_id);
1485     my $results = [];
1486     $sth->execute();
1487     while (my $row = $sth->fetchrow_hashref) {
1488         if ($row->{'record_type'} eq 'auth') {
1489             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1490         }
1491         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1492         push @$results, $row;
1493         last if $best_only;
1494     }
1495     $sth->finish();
1496
1497     return $results;
1498     
1499 }
1500
1501 =head2 SetImportRecordMatches
1502
1503   SetImportRecordMatches($import_record_id, @matches);
1504
1505 =cut
1506
1507 sub SetImportRecordMatches {
1508     my $import_record_id = shift;
1509     my @matches = @_;
1510
1511     my $dbh = C4::Context->dbh;
1512     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1513     $delsth->execute($import_record_id);
1514     $delsth->finish();
1515
1516     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1517                                     VALUES (?, ?, ?, ?)");
1518     my $chosen = 1; #The first match is defaulted to be chosen
1519     foreach my $match (@matches) {
1520         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1521         $chosen = 0; #After the first we do not default to other matches
1522     }
1523 }
1524
1525 =head2 RecordsFromISO2709File
1526
1527     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1528
1529 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1530
1531 @PARAM1, String, absolute path to the ISO2709 file.
1532 @PARAM2, String, see stage_file.pl
1533 @PARAM3, String, should be utf8
1534
1535 Returns two array refs.
1536
1537 =cut
1538
1539 sub RecordsFromISO2709File {
1540     my ($input_file, $record_type, $encoding) = @_;
1541     my @errors;
1542
1543     my $marc_type = C4::Context->preference('marcflavour');
1544     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1545
1546     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1547     my @marc_records;
1548     $/ = "\035";
1549     while (<$fh>) {
1550         s/^\s+//;
1551         s/\s+$//;
1552         next unless $_; # skip if record has only whitespace, as might occur
1553                         # if file includes newlines between each MARC record
1554         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1555         push @marc_records, $marc_record;
1556         if ($charset_guessed ne $encoding) {
1557             push @errors,
1558                 "Unexpected charset $charset_guessed, expecting $encoding";
1559         }
1560     }
1561     close $fh;
1562     return ( \@errors, \@marc_records );
1563 }
1564
1565 =head2 RecordsFromMARCXMLFile
1566
1567     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1568
1569 Creates MARC::Record-objects out of the given MARCXML-file.
1570
1571 @PARAM1, String, absolute path to the ISO2709 file.
1572 @PARAM2, String, should be utf8
1573
1574 Returns two array refs.
1575
1576 =cut
1577
1578 sub RecordsFromMARCXMLFile {
1579     my ( $filename, $encoding ) = @_;
1580     my $batch = MARC::File::XML->in( $filename );
1581     my ( @marcRecords, @errors, $record );
1582     do {
1583         eval { $record = $batch->next( $encoding ); };
1584         if ($@) {
1585             push @errors, $@;
1586         }
1587         push @marcRecords, $record if $record;
1588     } while( $record );
1589     return (\@errors, \@marcRecords);
1590 }
1591
1592 =head2 RecordsFromMarcPlugin
1593
1594     Converts text of input_file into array of MARC records with to_marc plugin
1595
1596 =cut
1597
1598 sub RecordsFromMarcPlugin {
1599     my ($input_file, $plugin_class, $encoding) = @_;
1600     my ( $text, @return );
1601     return \@return if !$input_file || !$plugin_class;
1602
1603     # Read input file
1604     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1605     $/ = "\035";
1606     while (<$fh>) {
1607         s/^\s+//;
1608         s/\s+$//;
1609         next unless $_;
1610         $text .= $_;
1611     }
1612     close $fh;
1613
1614     # Convert to large MARC blob with plugin
1615     $text = Koha::Plugins::Handler->run({
1616         class  => $plugin_class,
1617         method => 'to_marc',
1618         params => { data => $text },
1619     }) if $text;
1620
1621     # Convert to array of MARC records
1622     if( $text ) {
1623         my $marc_type = C4::Context->preference('marcflavour');
1624         foreach my $blob ( split(/\x1D/, $text) ) {
1625             next if $blob =~ /^\s*$/;
1626             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1627             push @return, $marcrecord;
1628         }
1629     }
1630     return \@return;
1631 }
1632
1633 # internal functions
1634
1635 sub _create_import_record {
1636     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1637
1638     my $dbh = C4::Context->dbh;
1639     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1640                                                          record_type, encoding)
1641                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1642     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1643                   $record_type, $encoding);
1644     my $import_record_id = $dbh->{'mysql_insertid'};
1645     $sth->finish();
1646     return $import_record_id;
1647 }
1648
1649 sub _update_import_record_marc {
1650     my ($import_record_id, $marc_record, $marc_type) = @_;
1651
1652     my $dbh = C4::Context->dbh;
1653     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1654                              WHERE  import_record_id = ?");
1655     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1656     $sth->finish();
1657 }
1658
1659 sub _add_auth_fields {
1660     my ($import_record_id, $marc_record) = @_;
1661
1662     my $controlnumber;
1663     if ($marc_record->field('001')) {
1664         $controlnumber = $marc_record->field('001')->data();
1665     }
1666     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1667     my $dbh = C4::Context->dbh;
1668     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1669     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1670     $sth->finish();
1671 }
1672
1673 sub _add_biblio_fields {
1674     my ($import_record_id, $marc_record) = @_;
1675
1676     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1677     my $dbh = C4::Context->dbh;
1678     # FIXME no controlnumber, originalsource
1679     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1680     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1681     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1682     $sth->finish();
1683                 
1684 }
1685
1686 sub _update_biblio_fields {
1687     my ($import_record_id, $marc_record) = @_;
1688
1689     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1690     my $dbh = C4::Context->dbh;
1691     # FIXME no controlnumber, originalsource
1692     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1693     $isbn =~ s/\(.*$//;
1694     $isbn =~ tr/ -_//;
1695     $isbn = uc $isbn;
1696     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1697                              WHERE  import_record_id = ?");
1698     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1699     $sth->finish();
1700 }
1701
1702 sub _parse_biblio_fields {
1703     my ($marc_record) = @_;
1704
1705     my $dbh = C4::Context->dbh;
1706     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1707     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1708
1709 }
1710
1711 sub _update_batch_record_counts {
1712     my ($batch_id) = @_;
1713
1714     my $dbh = C4::Context->dbh;
1715     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1716                                         num_records = (
1717                                             SELECT COUNT(*)
1718                                             FROM import_records
1719                                             WHERE import_batch_id = import_batches.import_batch_id),
1720                                         num_items = (
1721                                             SELECT COUNT(*)
1722                                             FROM import_records
1723                                             JOIN import_items USING (import_record_id)
1724                                             WHERE import_batch_id = import_batches.import_batch_id
1725                                             AND record_type = 'biblio')
1726                                     WHERE import_batch_id = ?");
1727     $sth->bind_param(1, $batch_id);
1728     $sth->execute();
1729     $sth->finish();
1730 }
1731
1732 sub _get_commit_action {
1733     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1734     
1735     if ($record_type eq 'biblio') {
1736         my ($bib_result, $bib_match, $item_result);
1737
1738         $bib_match = GetBestRecordMatch($import_record_id);
1739         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1740
1741             $bib_result = $overlay_action;
1742
1743             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1744                 $item_result = 'create_new';
1745             } elsif($item_action eq 'replace'){
1746                 $item_result = 'replace';
1747             } else {
1748                 $item_result = 'ignore';
1749             }
1750
1751         } else {
1752             $bib_result = $nomatch_action;
1753             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1754         }
1755         return ($bib_result, $item_result, $bib_match);
1756     } else { # must be auths
1757         my ($auth_result, $auth_match);
1758
1759         $auth_match = GetBestRecordMatch($import_record_id);
1760         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1761             $auth_result = $overlay_action;
1762         } else {
1763             $auth_result = $nomatch_action;
1764         }
1765
1766         return ($auth_result, undef, $auth_match);
1767
1768     }
1769 }
1770
1771 sub _get_revert_action {
1772     my ($overlay_action, $overlay_status, $status) = @_;
1773
1774     my $bib_result;
1775
1776     if ($status eq 'ignored') {
1777         $bib_result = 'ignore';
1778     } else {
1779         if ($overlay_action eq 'create_new') {
1780             $bib_result = 'delete';
1781         } else {
1782             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1783         }
1784     }
1785     return $bib_result;
1786 }
1787
1788 1;
1789 __END__
1790
1791 =head1 AUTHOR
1792
1793 Koha Development Team <http://koha-community.org/>
1794
1795 Galen Charlton <galen.charlton@liblime.com>
1796
1797 =cut