Bug 27981: Adjust imported records, svc/import_bibs, records from Z3950
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     return ($batch_id, $num_valid, $num_items, @invalid_records);
394 }
395
396 =head2 AddItemsToImportBiblio
397
398   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
399                 $import_record_id, $marc_record, $update_counts);
400
401 =cut
402
403 sub AddItemsToImportBiblio {
404     my $batch_id = shift;
405     my $import_record_id = shift;
406     my $marc_record = shift;
407     my $update_counts = @_ ? shift : 0;
408
409     my @import_items_ids = ();
410    
411     my $dbh = C4::Context->dbh; 
412     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
413     foreach my $item_field ($marc_record->field($item_tag)) {
414         my $item_marc = MARC::Record->new();
415         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
416         $item_marc->append_fields($item_field);
417         $marc_record->delete_field($item_field);
418         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
419                                         VALUES (?, ?, ?)");
420         $sth->bind_param(1, $import_record_id);
421         $sth->bind_param(2, 'staged');
422         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
423         $sth->execute();
424         push @import_items_ids, $dbh->{'mysql_insertid'};
425         $sth->finish();
426     }
427
428     if ($#import_items_ids > -1) {
429         _update_batch_record_counts($batch_id) if $update_counts;
430     }
431     return @import_items_ids;
432 }
433
434 =head2 BatchFindDuplicates
435
436   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
437              $max_matches, $progress_interval, $progress_callback);
438
439 Goes through the records loaded in the batch and attempts to 
440 find duplicates for each one.  Sets the matching status 
441 of each record to "no_match" or "auto_match" as appropriate.
442
443 The $max_matches parameter is optional; if it is not supplied,
444 it defaults to 10.
445
446 The $progress_interval and $progress_callback parameters are 
447 optional; if both are supplied, the sub referred to by
448 $progress_callback will be invoked every $progress_interval
449 records using the number of records processed as the 
450 singular argument.
451
452 =cut
453
454 sub BatchFindDuplicates {
455     my $batch_id = shift;
456     my $matcher = shift;
457     my $max_matches = @_ ? shift : 10;
458
459     # optional callback to monitor status 
460     # of job
461     my $progress_interval = 0;
462     my $progress_callback = undef;
463     if ($#_ == 1) {
464         $progress_interval = shift;
465         $progress_callback = shift;
466         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
467         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
468     }
469
470     my $dbh = C4::Context->dbh;
471
472     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
473                              FROM import_records
474                              WHERE import_batch_id = ?");
475     $sth->execute($batch_id);
476     my $num_with_matches = 0;
477     my $rec_num = 0;
478     while (my $rowref = $sth->fetchrow_hashref) {
479         $rec_num++;
480         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
481             &$progress_callback($rec_num);
482         }
483         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
484         my @matches = ();
485         if (defined $matcher) {
486             @matches = $matcher->get_matches($marc_record, $max_matches);
487         }
488         if (scalar(@matches) > 0) {
489             $num_with_matches++;
490             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
491             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
492         } else {
493             SetImportRecordMatches($rowref->{'import_record_id'}, ());
494             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
495         }
496     }
497     $sth->finish();
498     return $num_with_matches;
499 }
500
501 =head2 BatchCommitRecords
502
503   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
504         BatchCommitRecords($batch_id, $framework,
505         $progress_interval, $progress_callback);
506
507 =cut
508
509 sub BatchCommitRecords {
510     my $batch_id = shift;
511     my $framework = shift;
512
513     my $schema = Koha::Database->schema;
514
515     # optional callback to monitor status 
516     # of job
517     my $progress_interval = 0;
518     my $progress_callback = undef;
519     if ($#_ == 1) {
520         $progress_interval = shift;
521         $progress_callback = shift;
522         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
523         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
524     }
525
526     my $record_type;
527     my $num_added = 0;
528     my $num_updated = 0;
529     my $num_items_added = 0;
530     my $num_items_replaced = 0;
531     my $num_items_errored = 0;
532     my $num_ignored = 0;
533     # commit (i.e., save, all records in the batch)
534     SetImportBatchStatus($batch_id, 'importing');
535     my $overlay_action = GetImportBatchOverlayAction($batch_id);
536     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
537     my $item_action = GetImportBatchItemAction($batch_id);
538     my $item_tag;
539     my $item_subfield;
540     my $dbh = C4::Context->dbh;
541     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
542                              FROM import_records
543                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
544                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
545                              WHERE import_batch_id = ?");
546     $sth->execute($batch_id);
547     my $marcflavour = C4::Context->preference('marcflavour');
548
549     my $userenv = C4::Context->userenv;
550     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
551
552     my $rec_num = 0;
553     my @biblio_ids;
554     $schema->txn_begin; # We commit in a transaction
555     while (my $rowref = $sth->fetchrow_hashref) {
556         $record_type = $rowref->{'record_type'};
557
558         $rec_num++;
559
560         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
561             # report progress and commit
562             $schema->txn_commit;
563             &$progress_callback( $rec_num );
564             $schema->txn_begin;
565         }
566         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
567             $num_ignored++;
568             next;
569         }
570
571         my $marc_type;
572         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
573             $marc_type = 'UNIMARCAUTH';
574         } elsif ($marcflavour eq 'UNIMARC') {
575             $marc_type = 'UNIMARC';
576         } else {
577             $marc_type = 'USMARC';
578         }
579         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
580
581         if ($record_type eq 'biblio') {
582             # remove any item tags - rely on _batchCommitItems
583             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
584             foreach my $item_field ($marc_record->field($item_tag)) {
585                 $marc_record->delete_field($item_field);
586             }
587             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
588                 my @control_num = $marc_record->field('001');
589                 $marc_record->delete_fields(@control_num);
590             }
591         }
592
593         my ($record_result, $item_result, $record_match) =
594             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
595                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
596
597         my $recordid;
598         my $query;
599         if ($record_result eq 'create_new') {
600             $num_added++;
601             if ($record_type eq 'biblio') {
602                 my $biblioitemnumber;
603                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
604                 push @biblio_ids, $recordid;
605                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
606                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
607                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
608                     $num_items_added += $bib_items_added;
609                     $num_items_replaced += $bib_items_replaced;
610                     $num_items_errored += $bib_items_errored;
611                 }
612             } else {
613                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
614                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
615             }
616             my $sth = $dbh->prepare_cached($query);
617             $sth->execute($recordid, $rowref->{'import_record_id'});
618             $sth->finish();
619             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
620         } elsif ($record_result eq 'replace') {
621             $num_updated++;
622             $recordid = $record_match;
623             my $oldxml;
624             if ($record_type eq 'biblio') {
625                 my $oldbiblio = Koha::Biblios->find( $recordid );
626                 $oldxml = GetXmlBiblio($recordid);
627
628                 # remove item fields so that they don't get
629                 # added again if record is reverted
630                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
631                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
632                 foreach my $item_field ($old_marc->field($item_tag)) {
633                     $old_marc->delete_field($item_field);
634                 }
635                 $oldxml = $old_marc->as_xml($marc_type);
636
637                 my $context = { source => 'batchimport' };
638                 if ($logged_in_patron) {
639                     $context->{categorycode} = $logged_in_patron->categorycode;
640                     $context->{userid} = $logged_in_patron->userid;
641                 }
642
643                 ModBiblio(
644                     $marc_record,
645                     $recordid,
646                     $oldbiblio->frameworkcode,
647                     {
648                         overlay_context   => $context,
649                         skip_record_index => 1
650                     }
651                 );
652                 push @biblio_ids, $recordid;
653                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
654
655                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
656                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
657                     $num_items_added += $bib_items_added;
658                     $num_items_replaced += $bib_items_replaced;
659                     $num_items_errored += $bib_items_errored;
660                 }
661             } else {
662                 $oldxml = GetAuthorityXML($recordid);
663
664                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
665                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
666             }
667             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
668             $sth->execute($oldxml, $rowref->{'import_record_id'});
669             $sth->finish();
670             my $sth2 = $dbh->prepare_cached($query);
671             $sth2->execute($recordid, $rowref->{'import_record_id'});
672             $sth2->finish();
673             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
674             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
675         } elsif ($record_result eq 'ignore') {
676             $recordid = $record_match;
677             $num_ignored++;
678             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
679                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
680                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
681                 $num_items_added += $bib_items_added;
682          $num_items_replaced += $bib_items_replaced;
683                 $num_items_errored += $bib_items_errored;
684                 # still need to record the matched biblionumber so that the
685                 # items can be reverted
686                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
687                 $sth2->execute($recordid, $rowref->{'import_record_id'});
688                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
689             }
690             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
691         }
692     }
693     $schema->txn_commit; # Commit final records that may not have hit callback threshold
694     $sth->finish();
695
696     if ( @biblio_ids ) {
697         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
698         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
699     }
700
701     SetImportBatchStatus($batch_id, 'imported');
702     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
703 }
704
705 =head2 _batchCommitItems
706
707   ($num_items_added, $num_items_errored) = 
708          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
709
710 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
711
712 =cut
713
714 sub _batchCommitItems {
715     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
716
717     my $dbh = C4::Context->dbh;
718
719     my $num_items_added = 0;
720     my $num_items_errored = 0;
721     my $num_items_replaced = 0;
722
723     my $sth = $dbh->prepare( "
724         SELECT import_items_id, import_items.marcxml, encoding
725         FROM import_items
726         JOIN import_records USING (import_record_id)
727         WHERE import_record_id = ?
728         ORDER BY import_items_id
729     " );
730     $sth->bind_param( 1, $import_record_id );
731     $sth->execute();
732
733     while ( my $row = $sth->fetchrow_hashref() ) {
734         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
735
736         # Delete date_due subfield as to not accidentally delete item checkout due dates
737         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
738         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
739
740         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
741
742         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
743         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
744
745         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
746         if ( $action eq "replace" && $duplicate_itemnumber ) {
747             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
748             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
749             $updsth->bind_param( 1, 'imported' );
750             $updsth->bind_param( 2, $item->{itemnumber} );
751             $updsth->bind_param( 3, undef );
752             $updsth->bind_param( 4, $row->{'import_items_id'} );
753             $updsth->execute();
754             $updsth->finish();
755             $num_items_replaced++;
756         } elsif ( $action eq "replace" && $duplicate_barcode ) {
757             my $itemnumber = $duplicate_barcode->itemnumber;
758             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
759             $updsth->bind_param( 1, 'imported' );
760             $updsth->bind_param( 2, $item->{itemnumber} );
761             $updsth->bind_param( 3, undef );
762             $updsth->bind_param( 4, $row->{'import_items_id'} );
763             $updsth->execute();
764             $updsth->finish();
765             $num_items_replaced++;
766         } elsif ($duplicate_barcode) {
767             $updsth->bind_param( 1, 'error' );
768             $updsth->bind_param( 2, undef );
769             $updsth->bind_param( 3, 'duplicate item barcode' );
770             $updsth->bind_param( 4, $row->{'import_items_id'} );
771             $updsth->execute();
772             $num_items_errored++;
773         } else {
774             # Remove the itemnumber if it exists, we want to create a new item
775             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
776             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
777
778             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
779             if( $itemnumber ) {
780                 $updsth->bind_param( 1, 'imported' );
781                 $updsth->bind_param( 2, $itemnumber );
782                 $updsth->bind_param( 3, undef );
783                 $updsth->bind_param( 4, $row->{'import_items_id'} );
784                 $updsth->execute();
785                 $updsth->finish();
786                 $num_items_added++;
787             }
788         }
789     }
790
791     return ( $num_items_added, $num_items_replaced, $num_items_errored );
792 }
793
794 =head2 BatchRevertRecords
795
796   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
797       $num_ignored) = BatchRevertRecords($batch_id);
798
799 =cut
800
801 sub BatchRevertRecords {
802     my $batch_id = shift;
803
804     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
805
806     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
807
808     my $record_type;
809     my $num_deleted = 0;
810     my $num_errors = 0;
811     my $num_reverted = 0;
812     my $num_ignored = 0;
813     my $num_items_deleted = 0;
814     # commit (i.e., save, all records in the batch)
815     SetImportBatchStatus($batch_id, 'reverting');
816     my $overlay_action = GetImportBatchOverlayAction($batch_id);
817     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
818     my $dbh = C4::Context->dbh;
819     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
820                              FROM import_records
821                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
822                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
823                              WHERE import_batch_id = ?");
824     $sth->execute($batch_id);
825     my $marc_type;
826     my $marcflavour = C4::Context->preference('marcflavour');
827     while (my $rowref = $sth->fetchrow_hashref) {
828         $record_type = $rowref->{'record_type'};
829         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
830             $num_ignored++;
831             next;
832         }
833         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
834             $marc_type = 'UNIMARCAUTH';
835         } elsif ($marcflavour eq 'UNIMARC') {
836             $marc_type = 'UNIMARC';
837         } else {
838             $marc_type = 'USMARC';
839         }
840
841         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
842
843         if ($record_result eq 'delete') {
844             my $error = undef;
845             if  ($record_type eq 'biblio') {
846                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
847                 $error = DelBiblio($rowref->{'matched_biblionumber'});
848             } else {
849                 DelAuthority({ authid => $rowref->{'matched_authid'} });
850             }
851             if (defined $error) {
852                 $num_errors++;
853             } else {
854                 $num_deleted++;
855                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
856             }
857         } elsif ($record_result eq 'restore') {
858             $num_reverted++;
859             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
860             if ($record_type eq 'biblio') {
861                 my $biblionumber = $rowref->{'matched_biblionumber'};
862                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
863
864                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
865                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
866
867                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
868                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
869             } else {
870                 my $authid = $rowref->{'matched_authid'};
871                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
872             }
873             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
874         } elsif ($record_result eq 'ignore') {
875             if ($record_type eq 'biblio') {
876                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
877             }
878             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
879         }
880         my $query;
881         if ($record_type eq 'biblio') {
882             # remove matched_biblionumber only if there is no 'imported' item left
883             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
884             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
885         } else {
886             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
887         }
888         my $sth2 = $dbh->prepare_cached($query);
889         $sth2->execute($rowref->{'import_record_id'});
890     }
891
892     $sth->finish();
893     SetImportBatchStatus($batch_id, 'reverted');
894     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
895 }
896
897 =head2 BatchRevertItems
898
899   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
900
901 =cut
902
903 sub BatchRevertItems {
904     my ($import_record_id, $biblionumber) = @_;
905
906     my $dbh = C4::Context->dbh;
907     my $num_items_deleted = 0;
908
909     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
910                                    FROM import_items
911                                    JOIN items USING (itemnumber)
912                                    WHERE import_record_id = ?");
913     $sth->bind_param(1, $import_record_id);
914     $sth->execute();
915     while (my $row = $sth->fetchrow_hashref()) {
916         my $item = Koha::Items->find($row->{itemnumber});
917         if ($item->safe_delete){
918             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
919             $updsth->bind_param(1, 'reverted');
920             $updsth->bind_param(2, $row->{'import_items_id'});
921             $updsth->execute();
922             $updsth->finish();
923             $num_items_deleted++;
924         }
925         else {
926             next;
927         }
928     }
929     $sth->finish();
930     return $num_items_deleted;
931 }
932
933 =head2 CleanBatch
934
935   CleanBatch($batch_id)
936
937 Deletes all staged records from the import batch
938 and sets the status of the batch to 'cleaned'.  Note
939 that deleting a stage record does *not* affect
940 any record that has been committed to the database.
941
942 =cut
943
944 sub CleanBatch {
945     my $batch_id = shift;
946     return unless defined $batch_id;
947
948     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
949     SetImportBatchStatus($batch_id, 'cleaned');
950 }
951
952 =head2 DeleteBatch
953
954   DeleteBatch($batch_id)
955
956 Deletes the record from the database. This can only be done
957 once the batch has been cleaned.
958
959 =cut
960
961 sub DeleteBatch {
962     my $batch_id = shift;
963     return unless defined $batch_id;
964
965     my $dbh = C4::Context->dbh;
966     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
967     $sth->execute( $batch_id );
968 }
969
970 =head2 GetAllImportBatches
971
972   my $results = GetAllImportBatches();
973
974 Returns a references to an array of hash references corresponding
975 to all import_batches rows (of batch_type 'batch'), sorted in 
976 ascending order by import_batch_id.
977
978 =cut
979
980 sub  GetAllImportBatches {
981     my $dbh = C4::Context->dbh;
982     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
983                                     WHERE batch_type IN ('batch', 'webservice')
984                                     ORDER BY import_batch_id ASC");
985
986     my $results = [];
987     $sth->execute();
988     while (my $row = $sth->fetchrow_hashref) {
989         push @$results, $row;
990     }
991     $sth->finish();
992     return $results;
993 }
994
995 =head2 GetStagedWebserviceBatches
996
997   my $batch_ids = GetStagedWebserviceBatches();
998
999 Returns a references to an array of batch id's
1000 of batch_type 'webservice' that are not imported
1001
1002 =cut
1003
1004 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1005 SELECT import_batch_id FROM import_batches
1006 WHERE batch_type = 'webservice'
1007 AND import_status = 'staged'
1008 EOQ
1009 sub  GetStagedWebserviceBatches {
1010     my $dbh = C4::Context->dbh;
1011     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1012 }
1013
1014 =head2 GetImportBatchRangeDesc
1015
1016   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1017
1018 Returns a reference to an array of hash references corresponding to
1019 import_batches rows (sorted in descending order by import_batch_id)
1020 start at the given offset.
1021
1022 =cut
1023
1024 sub GetImportBatchRangeDesc {
1025     my ($offset, $results_per_group) = @_;
1026
1027     my $dbh = C4::Context->dbh;
1028     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1029                                     LEFT JOIN import_batch_profiles p
1030                                     ON b.profile_id = p.id
1031                                     WHERE b.batch_type IN ('batch', 'webservice')
1032                                     ORDER BY b.import_batch_id DESC";
1033     my @params;
1034     if ($results_per_group){
1035         $query .= " LIMIT ?";
1036         push(@params, $results_per_group);
1037     }
1038     if ($offset){
1039         $query .= " OFFSET ?";
1040         push(@params, $offset);
1041     }
1042     my $sth = $dbh->prepare_cached($query);
1043     $sth->execute(@params);
1044     my $results = $sth->fetchall_arrayref({});
1045     $sth->finish();
1046     return $results;
1047 }
1048
1049 =head2 GetItemNumbersFromImportBatch
1050
1051   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1052
1053 =cut
1054
1055 sub GetItemNumbersFromImportBatch {
1056     my ($batch_id) = @_;
1057     my $dbh = C4::Context->dbh;
1058     my $sql = q|
1059 SELECT itemnumber FROM import_items
1060 INNER JOIN items USING (itemnumber)
1061 INNER JOIN import_records USING (import_record_id)
1062 WHERE import_batch_id = ?|;
1063     my  $sth = $dbh->prepare( $sql );
1064     $sth->execute($batch_id);
1065     my @items ;
1066     while ( my ($itm) = $sth->fetchrow_array ) {
1067         push @items, $itm;
1068     }
1069     return @items;
1070 }
1071
1072 =head2 GetNumberOfImportBatches
1073
1074   my $count = GetNumberOfImportBatches();
1075
1076 =cut
1077
1078 sub GetNumberOfNonZ3950ImportBatches {
1079     my $dbh = C4::Context->dbh;
1080     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1081     $sth->execute();
1082     my ($count) = $sth->fetchrow_array();
1083     $sth->finish();
1084     return $count;
1085 }
1086
1087 =head2 GetImportBiblios
1088
1089   my $results = GetImportBiblios($importid);
1090
1091 =cut
1092
1093 sub GetImportBiblios {
1094     my ($import_record_id) = @_;
1095
1096     my $dbh = C4::Context->dbh;
1097     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1098     return $dbh->selectall_arrayref(
1099         $query,
1100         { Slice => {} },
1101         $import_record_id
1102     );
1103
1104 }
1105
1106 =head2 GetImportRecordsRange
1107
1108   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1109
1110 Returns a reference to an array of hash references corresponding to
1111 import_biblios/import_auths/import_records rows for a given batch
1112 starting at the given offset.
1113
1114 =cut
1115
1116 sub GetImportRecordsRange {
1117     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1118
1119     my $dbh = C4::Context->dbh;
1120
1121     my $order_by = $parameters->{order_by} || 'import_record_id';
1122     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1123
1124     my $order_by_direction =
1125       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1126
1127     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1128
1129     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1130                                            record_sequence, status, overlay_status,
1131                                            matched_biblionumber, matched_authid, record_type
1132                                     FROM   import_records
1133                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1134                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1135                                     WHERE  import_batch_id = ?";
1136     my @params;
1137     push(@params, $batch_id);
1138     if ($status) {
1139         $query .= " AND status=?";
1140         push(@params,$status);
1141     }
1142
1143     $query.=" ORDER BY $order_by $order_by_direction";
1144
1145     if($results_per_group){
1146         $query .= " LIMIT ?";
1147         push(@params, $results_per_group);
1148     }
1149     if($offset){
1150         $query .= " OFFSET ?";
1151         push(@params, $offset);
1152     }
1153     my $sth = $dbh->prepare_cached($query);
1154     $sth->execute(@params);
1155     my $results = $sth->fetchall_arrayref({});
1156     $sth->finish();
1157     return $results;
1158
1159 }
1160
1161 =head2 GetBestRecordMatch
1162
1163   my $record_id = GetBestRecordMatch($import_record_id);
1164
1165 =cut
1166
1167 sub GetBestRecordMatch {
1168     my ($import_record_id) = @_;
1169
1170     my $dbh = C4::Context->dbh;
1171     my $sth = $dbh->prepare("SELECT candidate_match_id
1172                              FROM   import_record_matches
1173                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1174                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1175                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1176                              WHERE  import_record_matches.import_record_id = ? AND
1177                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1178                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1179                              AND chosen = 1
1180                              ORDER BY score DESC, candidate_match_id DESC");
1181     $sth->execute($import_record_id);
1182     my ($record_id) = $sth->fetchrow_array();
1183     $sth->finish();
1184     return $record_id;
1185 }
1186
1187 =head2 GetImportBatchStatus
1188
1189   my $status = GetImportBatchStatus($batch_id);
1190
1191 =cut
1192
1193 sub GetImportBatchStatus {
1194     my ($batch_id) = @_;
1195
1196     my $dbh = C4::Context->dbh;
1197     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1198     $sth->execute($batch_id);
1199     my ($status) = $sth->fetchrow_array();
1200     $sth->finish();
1201     return $status;
1202
1203 }
1204
1205 =head2 SetImportBatchStatus
1206
1207   SetImportBatchStatus($batch_id, $new_status);
1208
1209 =cut
1210
1211 sub SetImportBatchStatus {
1212     my ($batch_id, $new_status) = @_;
1213
1214     my $dbh = C4::Context->dbh;
1215     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1216     $sth->execute($new_status, $batch_id);
1217     $sth->finish();
1218
1219 }
1220
1221 =head2 SetMatchedBiblionumber
1222
1223   SetMatchedBiblionumber($import_record_id, $biblionumber);
1224
1225 =cut
1226
1227 sub SetMatchedBiblionumber {
1228     my ($import_record_id, $biblionumber) = @_;
1229
1230     my $dbh = C4::Context->dbh;
1231     $dbh->do(
1232         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1233         undef, $biblionumber, $import_record_id
1234     );
1235 }
1236
1237 =head2 GetImportBatchOverlayAction
1238
1239   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1240
1241 =cut
1242
1243 sub GetImportBatchOverlayAction {
1244     my ($batch_id) = @_;
1245
1246     my $dbh = C4::Context->dbh;
1247     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1248     $sth->execute($batch_id);
1249     my ($overlay_action) = $sth->fetchrow_array();
1250     $sth->finish();
1251     return $overlay_action;
1252
1253 }
1254
1255
1256 =head2 SetImportBatchOverlayAction
1257
1258   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1259
1260 =cut
1261
1262 sub SetImportBatchOverlayAction {
1263     my ($batch_id, $new_overlay_action) = @_;
1264
1265     my $dbh = C4::Context->dbh;
1266     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1267     $sth->execute($new_overlay_action, $batch_id);
1268     $sth->finish();
1269
1270 }
1271
1272 =head2 GetImportBatchNoMatchAction
1273
1274   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1275
1276 =cut
1277
1278 sub GetImportBatchNoMatchAction {
1279     my ($batch_id) = @_;
1280
1281     my $dbh = C4::Context->dbh;
1282     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1283     $sth->execute($batch_id);
1284     my ($nomatch_action) = $sth->fetchrow_array();
1285     $sth->finish();
1286     return $nomatch_action;
1287
1288 }
1289
1290
1291 =head2 SetImportBatchNoMatchAction
1292
1293   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1294
1295 =cut
1296
1297 sub SetImportBatchNoMatchAction {
1298     my ($batch_id, $new_nomatch_action) = @_;
1299
1300     my $dbh = C4::Context->dbh;
1301     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1302     $sth->execute($new_nomatch_action, $batch_id);
1303     $sth->finish();
1304
1305 }
1306
1307 =head2 GetImportBatchItemAction
1308
1309   my $item_action = GetImportBatchItemAction($batch_id);
1310
1311 =cut
1312
1313 sub GetImportBatchItemAction {
1314     my ($batch_id) = @_;
1315
1316     my $dbh = C4::Context->dbh;
1317     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1318     $sth->execute($batch_id);
1319     my ($item_action) = $sth->fetchrow_array();
1320     $sth->finish();
1321     return $item_action;
1322
1323 }
1324
1325
1326 =head2 SetImportBatchItemAction
1327
1328   SetImportBatchItemAction($batch_id, $new_item_action);
1329
1330 =cut
1331
1332 sub SetImportBatchItemAction {
1333     my ($batch_id, $new_item_action) = @_;
1334
1335     my $dbh = C4::Context->dbh;
1336     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1337     $sth->execute($new_item_action, $batch_id);
1338     $sth->finish();
1339
1340 }
1341
1342 =head2 GetImportBatchMatcher
1343
1344   my $matcher_id = GetImportBatchMatcher($batch_id);
1345
1346 =cut
1347
1348 sub GetImportBatchMatcher {
1349     my ($batch_id) = @_;
1350
1351     my $dbh = C4::Context->dbh;
1352     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1353     $sth->execute($batch_id);
1354     my ($matcher_id) = $sth->fetchrow_array();
1355     $sth->finish();
1356     return $matcher_id;
1357
1358 }
1359
1360
1361 =head2 SetImportBatchMatcher
1362
1363   SetImportBatchMatcher($batch_id, $new_matcher_id);
1364
1365 =cut
1366
1367 sub SetImportBatchMatcher {
1368     my ($batch_id, $new_matcher_id) = @_;
1369
1370     my $dbh = C4::Context->dbh;
1371     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1372     $sth->execute($new_matcher_id, $batch_id);
1373     $sth->finish();
1374
1375 }
1376
1377 =head2 GetImportRecordOverlayStatus
1378
1379   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1380
1381 =cut
1382
1383 sub GetImportRecordOverlayStatus {
1384     my ($import_record_id) = @_;
1385
1386     my $dbh = C4::Context->dbh;
1387     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1388     $sth->execute($import_record_id);
1389     my ($overlay_status) = $sth->fetchrow_array();
1390     $sth->finish();
1391     return $overlay_status;
1392
1393 }
1394
1395
1396 =head2 SetImportRecordOverlayStatus
1397
1398   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1399
1400 =cut
1401
1402 sub SetImportRecordOverlayStatus {
1403     my ($import_record_id, $new_overlay_status) = @_;
1404
1405     my $dbh = C4::Context->dbh;
1406     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1407     $sth->execute($new_overlay_status, $import_record_id);
1408     $sth->finish();
1409
1410 }
1411
1412 =head2 GetImportRecordStatus
1413
1414   my $status = GetImportRecordStatus($import_record_id);
1415
1416 =cut
1417
1418 sub GetImportRecordStatus {
1419     my ($import_record_id) = @_;
1420
1421     my $dbh = C4::Context->dbh;
1422     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1423     $sth->execute($import_record_id);
1424     my ($status) = $sth->fetchrow_array();
1425     $sth->finish();
1426     return $status;
1427
1428 }
1429
1430
1431 =head2 SetImportRecordStatus
1432
1433   SetImportRecordStatus($import_record_id, $new_status);
1434
1435 =cut
1436
1437 sub SetImportRecordStatus {
1438     my ($import_record_id, $new_status) = @_;
1439
1440     my $dbh = C4::Context->dbh;
1441     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1442     $sth->execute($new_status, $import_record_id);
1443     $sth->finish();
1444
1445 }
1446
1447 =head2 GetImportRecordMatches
1448
1449   my $results = GetImportRecordMatches($import_record_id, $best_only);
1450
1451 =cut
1452
1453 sub GetImportRecordMatches {
1454     my $import_record_id = shift;
1455     my $best_only = @_ ? shift : 0;
1456
1457     my $dbh = C4::Context->dbh;
1458     # FIXME currently biblio only
1459     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1460                                     candidate_match_id, score, record_type,
1461                                     chosen
1462                                     FROM import_records
1463                                     JOIN import_record_matches USING (import_record_id)
1464                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1465                                     WHERE import_record_id = ?
1466                                     ORDER BY score DESC, biblionumber DESC");
1467     $sth->bind_param(1, $import_record_id);
1468     my $results = [];
1469     $sth->execute();
1470     while (my $row = $sth->fetchrow_hashref) {
1471         if ($row->{'record_type'} eq 'auth') {
1472             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1473         }
1474         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1475         push @$results, $row;
1476         last if $best_only;
1477     }
1478     $sth->finish();
1479
1480     return $results;
1481     
1482 }
1483
1484 =head2 SetImportRecordMatches
1485
1486   SetImportRecordMatches($import_record_id, @matches);
1487
1488 =cut
1489
1490 sub SetImportRecordMatches {
1491     my $import_record_id = shift;
1492     my @matches = @_;
1493
1494     my $dbh = C4::Context->dbh;
1495     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1496     $delsth->execute($import_record_id);
1497     $delsth->finish();
1498
1499     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1500                                     VALUES (?, ?, ?, ?)");
1501     my $chosen = 1; #The first match is defaulted to be chosen
1502     foreach my $match (@matches) {
1503         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1504         $chosen = 0; #After the first we do not default to other matches
1505     }
1506 }
1507
1508 =head2 RecordsFromISO2709File
1509
1510     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1511
1512 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1513
1514 @PARAM1, String, absolute path to the ISO2709 file.
1515 @PARAM2, String, see stage_file.pl
1516 @PARAM3, String, should be utf8
1517
1518 Returns two array refs.
1519
1520 =cut
1521
1522 sub RecordsFromISO2709File {
1523     my ($input_file, $record_type, $encoding) = @_;
1524     my @errors;
1525
1526     my $marc_type = C4::Context->preference('marcflavour');
1527     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1528
1529     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1530     my @marc_records;
1531     $/ = "\035";
1532     while (<$fh>) {
1533         s/^\s+//;
1534         s/\s+$//;
1535         next unless $_; # skip if record has only whitespace, as might occur
1536                         # if file includes newlines between each MARC record
1537         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1538         push @marc_records, $marc_record;
1539         if ($charset_guessed ne $encoding) {
1540             push @errors,
1541                 "Unexpected charset $charset_guessed, expecting $encoding";
1542         }
1543     }
1544     close $fh;
1545     return ( \@errors, \@marc_records );
1546 }
1547
1548 =head2 RecordsFromMARCXMLFile
1549
1550     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1551
1552 Creates MARC::Record-objects out of the given MARCXML-file.
1553
1554 @PARAM1, String, absolute path to the ISO2709 file.
1555 @PARAM2, String, should be utf8
1556
1557 Returns two array refs.
1558
1559 =cut
1560
1561 sub RecordsFromMARCXMLFile {
1562     my ( $filename, $encoding ) = @_;
1563     my $batch = MARC::File::XML->in( $filename );
1564     my ( @marcRecords, @errors, $record );
1565     do {
1566         eval { $record = $batch->next( $encoding ); };
1567         if ($@) {
1568             push @errors, $@;
1569         }
1570         push @marcRecords, $record if $record;
1571     } while( $record );
1572     return (\@errors, \@marcRecords);
1573 }
1574
1575 =head2 RecordsFromMarcPlugin
1576
1577     Converts text of input_file into array of MARC records with to_marc plugin
1578
1579 =cut
1580
1581 sub RecordsFromMarcPlugin {
1582     my ($input_file, $plugin_class, $encoding) = @_;
1583     my ( $text, @return );
1584     return \@return if !$input_file || !$plugin_class;
1585
1586     # Read input file
1587     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1588     $/ = "\035";
1589     while (<$fh>) {
1590         s/^\s+//;
1591         s/\s+$//;
1592         next unless $_;
1593         $text .= $_;
1594     }
1595     close $fh;
1596
1597     # Convert to large MARC blob with plugin
1598     $text = Koha::Plugins::Handler->run({
1599         class  => $plugin_class,
1600         method => 'to_marc',
1601         params => { data => $text },
1602     }) if $text;
1603
1604     # Convert to array of MARC records
1605     if( $text ) {
1606         my $marc_type = C4::Context->preference('marcflavour');
1607         foreach my $blob ( split(/\x1D/, $text) ) {
1608             next if $blob =~ /^\s*$/;
1609             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1610             push @return, $marcrecord;
1611         }
1612     }
1613     return \@return;
1614 }
1615
1616 # internal functions
1617
1618 sub _create_import_record {
1619     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1620
1621     my $dbh = C4::Context->dbh;
1622     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1623                                                          record_type, encoding)
1624                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1625     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1626                   $record_type, $encoding);
1627     my $import_record_id = $dbh->{'mysql_insertid'};
1628     $sth->finish();
1629     return $import_record_id;
1630 }
1631
1632 sub _add_auth_fields {
1633     my ($import_record_id, $marc_record) = @_;
1634
1635     my $controlnumber;
1636     if ($marc_record->field('001')) {
1637         $controlnumber = $marc_record->field('001')->data();
1638     }
1639     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1640     my $dbh = C4::Context->dbh;
1641     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1642     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1643     $sth->finish();
1644 }
1645
1646 sub _add_biblio_fields {
1647     my ($import_record_id, $marc_record) = @_;
1648
1649     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1650     my $dbh = C4::Context->dbh;
1651     # FIXME no controlnumber, originalsource
1652     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1653     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1654     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1655     $sth->finish();
1656                 
1657 }
1658
1659 sub _update_biblio_fields {
1660     my ($import_record_id, $marc_record) = @_;
1661
1662     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1663     my $dbh = C4::Context->dbh;
1664     # FIXME no controlnumber, originalsource
1665     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1666     $isbn =~ s/\(.*$//;
1667     $isbn =~ tr/ -_//;
1668     $isbn = uc $isbn;
1669     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1670                              WHERE  import_record_id = ?");
1671     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1672     $sth->finish();
1673 }
1674
1675 sub _parse_biblio_fields {
1676     my ($marc_record) = @_;
1677
1678     my $dbh = C4::Context->dbh;
1679     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1680     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1681
1682 }
1683
1684 sub _update_batch_record_counts {
1685     my ($batch_id) = @_;
1686
1687     my $dbh = C4::Context->dbh;
1688     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1689                                         num_records = (
1690                                             SELECT COUNT(*)
1691                                             FROM import_records
1692                                             WHERE import_batch_id = import_batches.import_batch_id),
1693                                         num_items = (
1694                                             SELECT COUNT(*)
1695                                             FROM import_records
1696                                             JOIN import_items USING (import_record_id)
1697                                             WHERE import_batch_id = import_batches.import_batch_id
1698                                             AND record_type = 'biblio')
1699                                     WHERE import_batch_id = ?");
1700     $sth->bind_param(1, $batch_id);
1701     $sth->execute();
1702     $sth->finish();
1703 }
1704
1705 sub _get_commit_action {
1706     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1707     
1708     if ($record_type eq 'biblio') {
1709         my ($bib_result, $bib_match, $item_result);
1710
1711         $bib_match = GetBestRecordMatch($import_record_id);
1712         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1713
1714             $bib_result = $overlay_action;
1715
1716             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1717                 $item_result = 'create_new';
1718             } elsif($item_action eq 'replace'){
1719                 $item_result = 'replace';
1720             } else {
1721                 $item_result = 'ignore';
1722             }
1723
1724         } else {
1725             $bib_result = $nomatch_action;
1726             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1727         }
1728         return ($bib_result, $item_result, $bib_match);
1729     } else { # must be auths
1730         my ($auth_result, $auth_match);
1731
1732         $auth_match = GetBestRecordMatch($import_record_id);
1733         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1734             $auth_result = $overlay_action;
1735         } else {
1736             $auth_result = $nomatch_action;
1737         }
1738
1739         return ($auth_result, undef, $auth_match);
1740
1741     }
1742 }
1743
1744 sub _get_revert_action {
1745     my ($overlay_action, $overlay_status, $status) = @_;
1746
1747     my $bib_result;
1748
1749     if ($status eq 'ignored') {
1750         $bib_result = 'ignore';
1751     } else {
1752         if ($overlay_action eq 'create_new') {
1753             $bib_result = 'delete';
1754         } else {
1755             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1756         }
1757     }
1758     return $bib_result;
1759 }
1760
1761 1;
1762 __END__
1763
1764 =head1 AUTHOR
1765
1766 Koha Development Team <http://koha-community.org/>
1767
1768 Galen Charlton <galen.charlton@liblime.com>
1769
1770 =cut