Bug 31281: Use correct reply-to email when sending overdue mails
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     return ($batch_id, $num_valid, $num_items, @invalid_records);
394 }
395
396 =head2 AddItemsToImportBiblio
397
398   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
399                 $import_record_id, $marc_record, $update_counts);
400
401 =cut
402
403 sub AddItemsToImportBiblio {
404     my $batch_id = shift;
405     my $import_record_id = shift;
406     my $marc_record = shift;
407     my $update_counts = @_ ? shift : 0;
408
409     my @import_items_ids = ();
410    
411     my $dbh = C4::Context->dbh; 
412     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
413     foreach my $item_field ($marc_record->field($item_tag)) {
414         my $item_marc = MARC::Record->new();
415         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
416         $item_marc->append_fields($item_field);
417         $marc_record->delete_field($item_field);
418         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
419                                         VALUES (?, ?, ?)");
420         $sth->bind_param(1, $import_record_id);
421         $sth->bind_param(2, 'staged');
422         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
423         $sth->execute();
424         push @import_items_ids, $dbh->{'mysql_insertid'};
425         $sth->finish();
426     }
427
428     if ($#import_items_ids > -1) {
429         _update_batch_record_counts($batch_id) if $update_counts;
430         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
431     }
432     return @import_items_ids;
433 }
434
435 =head2 BatchFindDuplicates
436
437   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
438              $max_matches, $progress_interval, $progress_callback);
439
440 Goes through the records loaded in the batch and attempts to 
441 find duplicates for each one.  Sets the matching status 
442 of each record to "no_match" or "auto_match" as appropriate.
443
444 The $max_matches parameter is optional; if it is not supplied,
445 it defaults to 10.
446
447 The $progress_interval and $progress_callback parameters are 
448 optional; if both are supplied, the sub referred to by
449 $progress_callback will be invoked every $progress_interval
450 records using the number of records processed as the 
451 singular argument.
452
453 =cut
454
455 sub BatchFindDuplicates {
456     my $batch_id = shift;
457     my $matcher = shift;
458     my $max_matches = @_ ? shift : 10;
459
460     # optional callback to monitor status 
461     # of job
462     my $progress_interval = 0;
463     my $progress_callback = undef;
464     if ($#_ == 1) {
465         $progress_interval = shift;
466         $progress_callback = shift;
467         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
468         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
469     }
470
471     my $dbh = C4::Context->dbh;
472
473     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
474                              FROM import_records
475                              WHERE import_batch_id = ?");
476     $sth->execute($batch_id);
477     my $num_with_matches = 0;
478     my $rec_num = 0;
479     while (my $rowref = $sth->fetchrow_hashref) {
480         $rec_num++;
481         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
482             &$progress_callback($rec_num);
483         }
484         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
485         my @matches = ();
486         if (defined $matcher) {
487             @matches = $matcher->get_matches($marc_record, $max_matches);
488         }
489         if (scalar(@matches) > 0) {
490             $num_with_matches++;
491             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
492             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
493         } else {
494             SetImportRecordMatches($rowref->{'import_record_id'}, ());
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
496         }
497     }
498     $sth->finish();
499     return $num_with_matches;
500 }
501
502 =head2 BatchCommitRecords
503
504   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
505         BatchCommitRecords($batch_id, $framework,
506         $progress_interval, $progress_callback);
507
508 =cut
509
510 sub BatchCommitRecords {
511     my $batch_id = shift;
512     my $framework = shift;
513
514     my $schema = Koha::Database->schema;
515
516     # optional callback to monitor status 
517     # of job
518     my $progress_interval = 0;
519     my $progress_callback = undef;
520     if ($#_ == 1) {
521         $progress_interval = shift;
522         $progress_callback = shift;
523         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
524         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
525     }
526
527     my $record_type;
528     my $num_added = 0;
529     my $num_updated = 0;
530     my $num_items_added = 0;
531     my $num_items_replaced = 0;
532     my $num_items_errored = 0;
533     my $num_ignored = 0;
534     # commit (i.e., save, all records in the batch)
535     SetImportBatchStatus($batch_id, 'importing');
536     my $overlay_action = GetImportBatchOverlayAction($batch_id);
537     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
538     my $item_action = GetImportBatchItemAction($batch_id);
539     my $item_tag;
540     my $item_subfield;
541     my $dbh = C4::Context->dbh;
542     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
543                              FROM import_records
544                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
545                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
546                              WHERE import_batch_id = ?");
547     $sth->execute($batch_id);
548     my $marcflavour = C4::Context->preference('marcflavour');
549
550     my $userenv = C4::Context->userenv;
551     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
552
553     my $rec_num = 0;
554     my @biblio_ids;
555     $schema->txn_begin; # We commit in a transaction
556     while (my $rowref = $sth->fetchrow_hashref) {
557         $record_type = $rowref->{'record_type'};
558
559         $rec_num++;
560
561         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
562             # report progress and commit
563             $schema->txn_commit;
564             &$progress_callback( $rec_num );
565             $schema->txn_begin;
566         }
567         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
568             $num_ignored++;
569             next;
570         }
571
572         my $marc_type;
573         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
574             $marc_type = 'UNIMARCAUTH';
575         } elsif ($marcflavour eq 'UNIMARC') {
576             $marc_type = 'UNIMARC';
577         } else {
578             $marc_type = 'USMARC';
579         }
580         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
581
582         if ($record_type eq 'biblio') {
583             # remove any item tags - rely on _batchCommitItems
584             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
585             foreach my $item_field ($marc_record->field($item_tag)) {
586                 $marc_record->delete_field($item_field);
587             }
588         }
589
590         my ($record_result, $item_result, $record_match) =
591             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
592                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
593
594         my $recordid;
595         my $query;
596         if ($record_result eq 'create_new') {
597             $num_added++;
598             if ($record_type eq 'biblio') {
599                 my $biblioitemnumber;
600                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
601                 push @biblio_ids, $recordid;
602                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
603                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
604                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
605                     $num_items_added += $bib_items_added;
606                     $num_items_replaced += $bib_items_replaced;
607                     $num_items_errored += $bib_items_errored;
608                 }
609             } else {
610                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
611                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
612             }
613             my $sth = $dbh->prepare_cached($query);
614             $sth->execute($recordid, $rowref->{'import_record_id'});
615             $sth->finish();
616             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
617         } elsif ($record_result eq 'replace') {
618             $num_updated++;
619             $recordid = $record_match;
620             my $oldxml;
621             if ($record_type eq 'biblio') {
622                 my $oldbiblio = Koha::Biblios->find( $recordid );
623                 $oldxml = GetXmlBiblio($recordid);
624
625                 # remove item fields so that they don't get
626                 # added again if record is reverted
627                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
628                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
629                 foreach my $item_field ($old_marc->field($item_tag)) {
630                     $old_marc->delete_field($item_field);
631                 }
632                 $oldxml = $old_marc->as_xml($marc_type);
633
634                 my $context = { source => 'batchimport' };
635                 if ($logged_in_patron) {
636                     $context->{categorycode} = $logged_in_patron->categorycode;
637                     $context->{userid} = $logged_in_patron->userid;
638                 }
639
640                 ModBiblio(
641                     $marc_record,
642                     $recordid,
643                     $oldbiblio->frameworkcode,
644                     {
645                         overlay_context   => $context,
646                         skip_record_index => 1
647                     }
648                 );
649                 push @biblio_ids, $recordid;
650                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
651
652                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
653                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
654                     $num_items_added += $bib_items_added;
655                     $num_items_replaced += $bib_items_replaced;
656                     $num_items_errored += $bib_items_errored;
657                 }
658             } else {
659                 $oldxml = GetAuthorityXML($recordid);
660
661                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
662                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
663             }
664             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
665             $sth->execute($oldxml, $rowref->{'import_record_id'});
666             $sth->finish();
667             my $sth2 = $dbh->prepare_cached($query);
668             $sth2->execute($recordid, $rowref->{'import_record_id'});
669             $sth2->finish();
670             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
671             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
672         } elsif ($record_result eq 'ignore') {
673             $recordid = $record_match;
674             $num_ignored++;
675             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
676                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
677                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
678                 $num_items_added += $bib_items_added;
679          $num_items_replaced += $bib_items_replaced;
680                 $num_items_errored += $bib_items_errored;
681                 # still need to record the matched biblionumber so that the
682                 # items can be reverted
683                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
684                 $sth2->execute($recordid, $rowref->{'import_record_id'});
685                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
686             }
687             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
688         }
689     }
690     $schema->txn_commit; # Commit final records that may not have hit callback threshold
691     $sth->finish();
692
693     if ( @biblio_ids ) {
694         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
695         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
696     }
697
698     SetImportBatchStatus($batch_id, 'imported');
699     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
700 }
701
702 =head2 _batchCommitItems
703
704   ($num_items_added, $num_items_errored) = 
705          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
706
707 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
708
709 =cut
710
711 sub _batchCommitItems {
712     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
713
714     my $dbh = C4::Context->dbh;
715
716     my $num_items_added = 0;
717     my $num_items_errored = 0;
718     my $num_items_replaced = 0;
719
720     my $sth = $dbh->prepare( "
721         SELECT import_items_id, import_items.marcxml, encoding
722         FROM import_items
723         JOIN import_records USING (import_record_id)
724         WHERE import_record_id = ?
725         ORDER BY import_items_id
726     " );
727     $sth->bind_param( 1, $import_record_id );
728     $sth->execute();
729
730     while ( my $row = $sth->fetchrow_hashref() ) {
731         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
732
733         # Delete date_due subfield as to not accidentally delete item checkout due dates
734         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
735         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
736
737         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
738
739         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
740         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
741
742         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
743         if ( $action eq "replace" && $duplicate_itemnumber ) {
744             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
745             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
746             $updsth->bind_param( 1, 'imported' );
747             $updsth->bind_param( 2, $item->{itemnumber} );
748             $updsth->bind_param( 3, undef );
749             $updsth->bind_param( 4, $row->{'import_items_id'} );
750             $updsth->execute();
751             $updsth->finish();
752             $num_items_replaced++;
753         } elsif ( $action eq "replace" && $duplicate_barcode ) {
754             my $itemnumber = $duplicate_barcode->itemnumber;
755             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
756             $updsth->bind_param( 1, 'imported' );
757             $updsth->bind_param( 2, $item->{itemnumber} );
758             $updsth->bind_param( 3, undef );
759             $updsth->bind_param( 4, $row->{'import_items_id'} );
760             $updsth->execute();
761             $updsth->finish();
762             $num_items_replaced++;
763         } elsif ($duplicate_barcode) {
764             $updsth->bind_param( 1, 'error' );
765             $updsth->bind_param( 2, undef );
766             $updsth->bind_param( 3, 'duplicate item barcode' );
767             $updsth->bind_param( 4, $row->{'import_items_id'} );
768             $updsth->execute();
769             $num_items_errored++;
770         } else {
771             # Remove the itemnumber if it exists, we want to create a new item
772             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
773             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
774
775             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
776             if( $itemnumber ) {
777                 $updsth->bind_param( 1, 'imported' );
778                 $updsth->bind_param( 2, $itemnumber );
779                 $updsth->bind_param( 3, undef );
780                 $updsth->bind_param( 4, $row->{'import_items_id'} );
781                 $updsth->execute();
782                 $updsth->finish();
783                 $num_items_added++;
784             }
785         }
786     }
787
788     return ( $num_items_added, $num_items_replaced, $num_items_errored );
789 }
790
791 =head2 BatchRevertRecords
792
793   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
794       $num_ignored) = BatchRevertRecords($batch_id);
795
796 =cut
797
798 sub BatchRevertRecords {
799     my $batch_id = shift;
800
801     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
802
803     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
804
805     my $record_type;
806     my $num_deleted = 0;
807     my $num_errors = 0;
808     my $num_reverted = 0;
809     my $num_ignored = 0;
810     my $num_items_deleted = 0;
811     # commit (i.e., save, all records in the batch)
812     SetImportBatchStatus($batch_id, 'reverting');
813     my $overlay_action = GetImportBatchOverlayAction($batch_id);
814     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
815     my $dbh = C4::Context->dbh;
816     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
817                              FROM import_records
818                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
819                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
820                              WHERE import_batch_id = ?");
821     $sth->execute($batch_id);
822     my $marc_type;
823     my $marcflavour = C4::Context->preference('marcflavour');
824     while (my $rowref = $sth->fetchrow_hashref) {
825         $record_type = $rowref->{'record_type'};
826         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
827             $num_ignored++;
828             next;
829         }
830         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
831             $marc_type = 'UNIMARCAUTH';
832         } elsif ($marcflavour eq 'UNIMARC') {
833             $marc_type = 'UNIMARC';
834         } else {
835             $marc_type = 'USMARC';
836         }
837
838         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
839
840         if ($record_result eq 'delete') {
841             my $error = undef;
842             if  ($record_type eq 'biblio') {
843                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
844                 $error = DelBiblio($rowref->{'matched_biblionumber'});
845             } else {
846                 DelAuthority({ authid => $rowref->{'matched_authid'} });
847             }
848             if (defined $error) {
849                 $num_errors++;
850             } else {
851                 $num_deleted++;
852                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
853             }
854         } elsif ($record_result eq 'restore') {
855             $num_reverted++;
856             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
857             if ($record_type eq 'biblio') {
858                 my $biblionumber = $rowref->{'matched_biblionumber'};
859                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
860
861                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
862                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
863
864                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
865                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
866             } else {
867                 my $authid = $rowref->{'matched_authid'};
868                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
869             }
870             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
871         } elsif ($record_result eq 'ignore') {
872             if ($record_type eq 'biblio') {
873                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
874             }
875             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
876         }
877         my $query;
878         if ($record_type eq 'biblio') {
879             # remove matched_biblionumber only if there is no 'imported' item left
880             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
881             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
882         } else {
883             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
884         }
885         my $sth2 = $dbh->prepare_cached($query);
886         $sth2->execute($rowref->{'import_record_id'});
887     }
888
889     $sth->finish();
890     SetImportBatchStatus($batch_id, 'reverted');
891     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
892 }
893
894 =head2 BatchRevertItems
895
896   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
897
898 =cut
899
900 sub BatchRevertItems {
901     my ($import_record_id, $biblionumber) = @_;
902
903     my $dbh = C4::Context->dbh;
904     my $num_items_deleted = 0;
905
906     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
907                                    FROM import_items
908                                    JOIN items USING (itemnumber)
909                                    WHERE import_record_id = ?");
910     $sth->bind_param(1, $import_record_id);
911     $sth->execute();
912     while (my $row = $sth->fetchrow_hashref()) {
913         my $item = Koha::Items->find($row->{itemnumber});
914         if ($item->safe_delete){
915             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
916             $updsth->bind_param(1, 'reverted');
917             $updsth->bind_param(2, $row->{'import_items_id'});
918             $updsth->execute();
919             $updsth->finish();
920             $num_items_deleted++;
921         }
922         else {
923             next;
924         }
925     }
926     $sth->finish();
927     return $num_items_deleted;
928 }
929
930 =head2 CleanBatch
931
932   CleanBatch($batch_id)
933
934 Deletes all staged records from the import batch
935 and sets the status of the batch to 'cleaned'.  Note
936 that deleting a stage record does *not* affect
937 any record that has been committed to the database.
938
939 =cut
940
941 sub CleanBatch {
942     my $batch_id = shift;
943     return unless defined $batch_id;
944
945     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
946     SetImportBatchStatus($batch_id, 'cleaned');
947 }
948
949 =head2 DeleteBatch
950
951   DeleteBatch($batch_id)
952
953 Deletes the record from the database. This can only be done
954 once the batch has been cleaned.
955
956 =cut
957
958 sub DeleteBatch {
959     my $batch_id = shift;
960     return unless defined $batch_id;
961
962     my $dbh = C4::Context->dbh;
963     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
964     $sth->execute( $batch_id );
965 }
966
967 =head2 GetAllImportBatches
968
969   my $results = GetAllImportBatches();
970
971 Returns a references to an array of hash references corresponding
972 to all import_batches rows (of batch_type 'batch'), sorted in 
973 ascending order by import_batch_id.
974
975 =cut
976
977 sub  GetAllImportBatches {
978     my $dbh = C4::Context->dbh;
979     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
980                                     WHERE batch_type IN ('batch', 'webservice')
981                                     ORDER BY import_batch_id ASC");
982
983     my $results = [];
984     $sth->execute();
985     while (my $row = $sth->fetchrow_hashref) {
986         push @$results, $row;
987     }
988     $sth->finish();
989     return $results;
990 }
991
992 =head2 GetStagedWebserviceBatches
993
994   my $batch_ids = GetStagedWebserviceBatches();
995
996 Returns a references to an array of batch id's
997 of batch_type 'webservice' that are not imported
998
999 =cut
1000
1001 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1002 SELECT import_batch_id FROM import_batches
1003 WHERE batch_type = 'webservice'
1004 AND import_status = 'staged'
1005 EOQ
1006 sub  GetStagedWebserviceBatches {
1007     my $dbh = C4::Context->dbh;
1008     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1009 }
1010
1011 =head2 GetImportBatchRangeDesc
1012
1013   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1014
1015 Returns a reference to an array of hash references corresponding to
1016 import_batches rows (sorted in descending order by import_batch_id)
1017 start at the given offset.
1018
1019 =cut
1020
1021 sub GetImportBatchRangeDesc {
1022     my ($offset, $results_per_group) = @_;
1023
1024     my $dbh = C4::Context->dbh;
1025     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1026                                     LEFT JOIN import_batch_profiles p
1027                                     ON b.profile_id = p.id
1028                                     WHERE b.batch_type IN ('batch', 'webservice')
1029                                     ORDER BY b.import_batch_id DESC";
1030     my @params;
1031     if ($results_per_group){
1032         $query .= " LIMIT ?";
1033         push(@params, $results_per_group);
1034     }
1035     if ($offset){
1036         $query .= " OFFSET ?";
1037         push(@params, $offset);
1038     }
1039     my $sth = $dbh->prepare_cached($query);
1040     $sth->execute(@params);
1041     my $results = $sth->fetchall_arrayref({});
1042     $sth->finish();
1043     return $results;
1044 }
1045
1046 =head2 GetItemNumbersFromImportBatch
1047
1048   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1049
1050 =cut
1051
1052 sub GetItemNumbersFromImportBatch {
1053     my ($batch_id) = @_;
1054     my $dbh = C4::Context->dbh;
1055     my $sql = q|
1056 SELECT itemnumber FROM import_items
1057 INNER JOIN items USING (itemnumber)
1058 INNER JOIN import_records USING (import_record_id)
1059 WHERE import_batch_id = ?|;
1060     my  $sth = $dbh->prepare( $sql );
1061     $sth->execute($batch_id);
1062     my @items ;
1063     while ( my ($itm) = $sth->fetchrow_array ) {
1064         push @items, $itm;
1065     }
1066     return @items;
1067 }
1068
1069 =head2 GetNumberOfImportBatches
1070
1071   my $count = GetNumberOfImportBatches();
1072
1073 =cut
1074
1075 sub GetNumberOfNonZ3950ImportBatches {
1076     my $dbh = C4::Context->dbh;
1077     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1078     $sth->execute();
1079     my ($count) = $sth->fetchrow_array();
1080     $sth->finish();
1081     return $count;
1082 }
1083
1084 =head2 GetImportBiblios
1085
1086   my $results = GetImportBiblios($importid);
1087
1088 =cut
1089
1090 sub GetImportBiblios {
1091     my ($import_record_id) = @_;
1092
1093     my $dbh = C4::Context->dbh;
1094     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1095     return $dbh->selectall_arrayref(
1096         $query,
1097         { Slice => {} },
1098         $import_record_id
1099     );
1100
1101 }
1102
1103 =head2 GetImportRecordsRange
1104
1105   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1106
1107 Returns a reference to an array of hash references corresponding to
1108 import_biblios/import_auths/import_records rows for a given batch
1109 starting at the given offset.
1110
1111 =cut
1112
1113 sub GetImportRecordsRange {
1114     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1115
1116     my $dbh = C4::Context->dbh;
1117
1118     my $order_by = $parameters->{order_by} || 'import_record_id';
1119     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1120
1121     my $order_by_direction =
1122       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1123
1124     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1125
1126     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1127                                            record_sequence, status, overlay_status,
1128                                            matched_biblionumber, matched_authid, record_type
1129                                     FROM   import_records
1130                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1131                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1132                                     WHERE  import_batch_id = ?";
1133     my @params;
1134     push(@params, $batch_id);
1135     if ($status) {
1136         $query .= " AND status=?";
1137         push(@params,$status);
1138     }
1139
1140     $query.=" ORDER BY $order_by $order_by_direction";
1141
1142     if($results_per_group){
1143         $query .= " LIMIT ?";
1144         push(@params, $results_per_group);
1145     }
1146     if($offset){
1147         $query .= " OFFSET ?";
1148         push(@params, $offset);
1149     }
1150     my $sth = $dbh->prepare_cached($query);
1151     $sth->execute(@params);
1152     my $results = $sth->fetchall_arrayref({});
1153     $sth->finish();
1154     return $results;
1155
1156 }
1157
1158 =head2 GetBestRecordMatch
1159
1160   my $record_id = GetBestRecordMatch($import_record_id);
1161
1162 =cut
1163
1164 sub GetBestRecordMatch {
1165     my ($import_record_id) = @_;
1166
1167     my $dbh = C4::Context->dbh;
1168     my $sth = $dbh->prepare("SELECT candidate_match_id
1169                              FROM   import_record_matches
1170                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1171                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1172                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1173                              WHERE  import_record_matches.import_record_id = ? AND
1174                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1175                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1176                              AND chosen = 1
1177                              ORDER BY score DESC, candidate_match_id DESC");
1178     $sth->execute($import_record_id);
1179     my ($record_id) = $sth->fetchrow_array();
1180     $sth->finish();
1181     return $record_id;
1182 }
1183
1184 =head2 GetImportBatchStatus
1185
1186   my $status = GetImportBatchStatus($batch_id);
1187
1188 =cut
1189
1190 sub GetImportBatchStatus {
1191     my ($batch_id) = @_;
1192
1193     my $dbh = C4::Context->dbh;
1194     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1195     $sth->execute($batch_id);
1196     my ($status) = $sth->fetchrow_array();
1197     $sth->finish();
1198     return $status;
1199
1200 }
1201
1202 =head2 SetImportBatchStatus
1203
1204   SetImportBatchStatus($batch_id, $new_status);
1205
1206 =cut
1207
1208 sub SetImportBatchStatus {
1209     my ($batch_id, $new_status) = @_;
1210
1211     my $dbh = C4::Context->dbh;
1212     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1213     $sth->execute($new_status, $batch_id);
1214     $sth->finish();
1215
1216 }
1217
1218 =head2 SetMatchedBiblionumber
1219
1220   SetMatchedBiblionumber($import_record_id, $biblionumber);
1221
1222 =cut
1223
1224 sub SetMatchedBiblionumber {
1225     my ($import_record_id, $biblionumber) = @_;
1226
1227     my $dbh = C4::Context->dbh;
1228     $dbh->do(
1229         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1230         undef, $biblionumber, $import_record_id
1231     );
1232 }
1233
1234 =head2 GetImportBatchOverlayAction
1235
1236   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1237
1238 =cut
1239
1240 sub GetImportBatchOverlayAction {
1241     my ($batch_id) = @_;
1242
1243     my $dbh = C4::Context->dbh;
1244     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1245     $sth->execute($batch_id);
1246     my ($overlay_action) = $sth->fetchrow_array();
1247     $sth->finish();
1248     return $overlay_action;
1249
1250 }
1251
1252
1253 =head2 SetImportBatchOverlayAction
1254
1255   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1256
1257 =cut
1258
1259 sub SetImportBatchOverlayAction {
1260     my ($batch_id, $new_overlay_action) = @_;
1261
1262     my $dbh = C4::Context->dbh;
1263     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1264     $sth->execute($new_overlay_action, $batch_id);
1265     $sth->finish();
1266
1267 }
1268
1269 =head2 GetImportBatchNoMatchAction
1270
1271   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1272
1273 =cut
1274
1275 sub GetImportBatchNoMatchAction {
1276     my ($batch_id) = @_;
1277
1278     my $dbh = C4::Context->dbh;
1279     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1280     $sth->execute($batch_id);
1281     my ($nomatch_action) = $sth->fetchrow_array();
1282     $sth->finish();
1283     return $nomatch_action;
1284
1285 }
1286
1287
1288 =head2 SetImportBatchNoMatchAction
1289
1290   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1291
1292 =cut
1293
1294 sub SetImportBatchNoMatchAction {
1295     my ($batch_id, $new_nomatch_action) = @_;
1296
1297     my $dbh = C4::Context->dbh;
1298     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1299     $sth->execute($new_nomatch_action, $batch_id);
1300     $sth->finish();
1301
1302 }
1303
1304 =head2 GetImportBatchItemAction
1305
1306   my $item_action = GetImportBatchItemAction($batch_id);
1307
1308 =cut
1309
1310 sub GetImportBatchItemAction {
1311     my ($batch_id) = @_;
1312
1313     my $dbh = C4::Context->dbh;
1314     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1315     $sth->execute($batch_id);
1316     my ($item_action) = $sth->fetchrow_array();
1317     $sth->finish();
1318     return $item_action;
1319
1320 }
1321
1322
1323 =head2 SetImportBatchItemAction
1324
1325   SetImportBatchItemAction($batch_id, $new_item_action);
1326
1327 =cut
1328
1329 sub SetImportBatchItemAction {
1330     my ($batch_id, $new_item_action) = @_;
1331
1332     my $dbh = C4::Context->dbh;
1333     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1334     $sth->execute($new_item_action, $batch_id);
1335     $sth->finish();
1336
1337 }
1338
1339 =head2 GetImportBatchMatcher
1340
1341   my $matcher_id = GetImportBatchMatcher($batch_id);
1342
1343 =cut
1344
1345 sub GetImportBatchMatcher {
1346     my ($batch_id) = @_;
1347
1348     my $dbh = C4::Context->dbh;
1349     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1350     $sth->execute($batch_id);
1351     my ($matcher_id) = $sth->fetchrow_array();
1352     $sth->finish();
1353     return $matcher_id;
1354
1355 }
1356
1357
1358 =head2 SetImportBatchMatcher
1359
1360   SetImportBatchMatcher($batch_id, $new_matcher_id);
1361
1362 =cut
1363
1364 sub SetImportBatchMatcher {
1365     my ($batch_id, $new_matcher_id) = @_;
1366
1367     my $dbh = C4::Context->dbh;
1368     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1369     $sth->execute($new_matcher_id, $batch_id);
1370     $sth->finish();
1371
1372 }
1373
1374 =head2 GetImportRecordOverlayStatus
1375
1376   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1377
1378 =cut
1379
1380 sub GetImportRecordOverlayStatus {
1381     my ($import_record_id) = @_;
1382
1383     my $dbh = C4::Context->dbh;
1384     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1385     $sth->execute($import_record_id);
1386     my ($overlay_status) = $sth->fetchrow_array();
1387     $sth->finish();
1388     return $overlay_status;
1389
1390 }
1391
1392
1393 =head2 SetImportRecordOverlayStatus
1394
1395   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1396
1397 =cut
1398
1399 sub SetImportRecordOverlayStatus {
1400     my ($import_record_id, $new_overlay_status) = @_;
1401
1402     my $dbh = C4::Context->dbh;
1403     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1404     $sth->execute($new_overlay_status, $import_record_id);
1405     $sth->finish();
1406
1407 }
1408
1409 =head2 GetImportRecordStatus
1410
1411   my $status = GetImportRecordStatus($import_record_id);
1412
1413 =cut
1414
1415 sub GetImportRecordStatus {
1416     my ($import_record_id) = @_;
1417
1418     my $dbh = C4::Context->dbh;
1419     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1420     $sth->execute($import_record_id);
1421     my ($status) = $sth->fetchrow_array();
1422     $sth->finish();
1423     return $status;
1424
1425 }
1426
1427
1428 =head2 SetImportRecordStatus
1429
1430   SetImportRecordStatus($import_record_id, $new_status);
1431
1432 =cut
1433
1434 sub SetImportRecordStatus {
1435     my ($import_record_id, $new_status) = @_;
1436
1437     my $dbh = C4::Context->dbh;
1438     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1439     $sth->execute($new_status, $import_record_id);
1440     $sth->finish();
1441
1442 }
1443
1444 =head2 GetImportRecordMatches
1445
1446   my $results = GetImportRecordMatches($import_record_id, $best_only);
1447
1448 =cut
1449
1450 sub GetImportRecordMatches {
1451     my $import_record_id = shift;
1452     my $best_only = @_ ? shift : 0;
1453
1454     my $dbh = C4::Context->dbh;
1455     # FIXME currently biblio only
1456     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1457                                     candidate_match_id, score, record_type,
1458                                     chosen
1459                                     FROM import_records
1460                                     JOIN import_record_matches USING (import_record_id)
1461                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1462                                     WHERE import_record_id = ?
1463                                     ORDER BY score DESC, biblionumber DESC");
1464     $sth->bind_param(1, $import_record_id);
1465     my $results = [];
1466     $sth->execute();
1467     while (my $row = $sth->fetchrow_hashref) {
1468         if ($row->{'record_type'} eq 'auth') {
1469             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1470         }
1471         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1472         push @$results, $row;
1473         last if $best_only;
1474     }
1475     $sth->finish();
1476
1477     return $results;
1478     
1479 }
1480
1481 =head2 SetImportRecordMatches
1482
1483   SetImportRecordMatches($import_record_id, @matches);
1484
1485 =cut
1486
1487 sub SetImportRecordMatches {
1488     my $import_record_id = shift;
1489     my @matches = @_;
1490
1491     my $dbh = C4::Context->dbh;
1492     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1493     $delsth->execute($import_record_id);
1494     $delsth->finish();
1495
1496     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1497                                     VALUES (?, ?, ?, ?)");
1498     my $chosen = 1; #The first match is defaulted to be chosen
1499     foreach my $match (@matches) {
1500         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1501         $chosen = 0; #After the first we do not default to other matches
1502     }
1503 }
1504
1505 =head2 RecordsFromISO2709File
1506
1507     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1508
1509 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1510
1511 @PARAM1, String, absolute path to the ISO2709 file.
1512 @PARAM2, String, see stage_file.pl
1513 @PARAM3, String, should be utf8
1514
1515 Returns two array refs.
1516
1517 =cut
1518
1519 sub RecordsFromISO2709File {
1520     my ($input_file, $record_type, $encoding) = @_;
1521     my @errors;
1522
1523     my $marc_type = C4::Context->preference('marcflavour');
1524     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1525
1526     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1527     my @marc_records;
1528     $/ = "\035";
1529     while (<$fh>) {
1530         s/^\s+//;
1531         s/\s+$//;
1532         next unless $_; # skip if record has only whitespace, as might occur
1533                         # if file includes newlines between each MARC record
1534         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1535         push @marc_records, $marc_record;
1536         if ($charset_guessed ne $encoding) {
1537             push @errors,
1538                 "Unexpected charset $charset_guessed, expecting $encoding";
1539         }
1540     }
1541     close $fh;
1542     return ( \@errors, \@marc_records );
1543 }
1544
1545 =head2 RecordsFromMARCXMLFile
1546
1547     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1548
1549 Creates MARC::Record-objects out of the given MARCXML-file.
1550
1551 @PARAM1, String, absolute path to the ISO2709 file.
1552 @PARAM2, String, should be utf8
1553
1554 Returns two array refs.
1555
1556 =cut
1557
1558 sub RecordsFromMARCXMLFile {
1559     my ( $filename, $encoding ) = @_;
1560     my $batch = MARC::File::XML->in( $filename );
1561     my ( @marcRecords, @errors, $record );
1562     do {
1563         eval { $record = $batch->next( $encoding ); };
1564         if ($@) {
1565             push @errors, $@;
1566         }
1567         push @marcRecords, $record if $record;
1568     } while( $record );
1569     return (\@errors, \@marcRecords);
1570 }
1571
1572 =head2 RecordsFromMarcPlugin
1573
1574     Converts text of input_file into array of MARC records with to_marc plugin
1575
1576 =cut
1577
1578 sub RecordsFromMarcPlugin {
1579     my ($input_file, $plugin_class, $encoding) = @_;
1580     my ( $text, @return );
1581     return \@return if !$input_file || !$plugin_class;
1582
1583     # Read input file
1584     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1585     $/ = "\035";
1586     while (<$fh>) {
1587         s/^\s+//;
1588         s/\s+$//;
1589         next unless $_;
1590         $text .= $_;
1591     }
1592     close $fh;
1593
1594     # Convert to large MARC blob with plugin
1595     $text = Koha::Plugins::Handler->run({
1596         class  => $plugin_class,
1597         method => 'to_marc',
1598         params => { data => $text },
1599     }) if $text;
1600
1601     # Convert to array of MARC records
1602     if( $text ) {
1603         my $marc_type = C4::Context->preference('marcflavour');
1604         foreach my $blob ( split(/\x1D/, $text) ) {
1605             next if $blob =~ /^\s*$/;
1606             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1607             push @return, $marcrecord;
1608         }
1609     }
1610     return \@return;
1611 }
1612
1613 # internal functions
1614
1615 sub _create_import_record {
1616     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1617
1618     my $dbh = C4::Context->dbh;
1619     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1620                                                          record_type, encoding)
1621                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1622     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1623                   $record_type, $encoding);
1624     my $import_record_id = $dbh->{'mysql_insertid'};
1625     $sth->finish();
1626     return $import_record_id;
1627 }
1628
1629 sub _update_import_record_marc {
1630     my ($import_record_id, $marc_record, $marc_type) = @_;
1631
1632     my $dbh = C4::Context->dbh;
1633     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1634                              WHERE  import_record_id = ?");
1635     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1636     $sth->finish();
1637 }
1638
1639 sub _add_auth_fields {
1640     my ($import_record_id, $marc_record) = @_;
1641
1642     my $controlnumber;
1643     if ($marc_record->field('001')) {
1644         $controlnumber = $marc_record->field('001')->data();
1645     }
1646     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1647     my $dbh = C4::Context->dbh;
1648     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1649     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1650     $sth->finish();
1651 }
1652
1653 sub _add_biblio_fields {
1654     my ($import_record_id, $marc_record) = @_;
1655
1656     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1657     my $dbh = C4::Context->dbh;
1658     # FIXME no controlnumber, originalsource
1659     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1660     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1661     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1662     $sth->finish();
1663                 
1664 }
1665
1666 sub _update_biblio_fields {
1667     my ($import_record_id, $marc_record) = @_;
1668
1669     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1670     my $dbh = C4::Context->dbh;
1671     # FIXME no controlnumber, originalsource
1672     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1673     $isbn =~ s/\(.*$//;
1674     $isbn =~ tr/ -_//;
1675     $isbn = uc $isbn;
1676     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1677                              WHERE  import_record_id = ?");
1678     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1679     $sth->finish();
1680 }
1681
1682 sub _parse_biblio_fields {
1683     my ($marc_record) = @_;
1684
1685     my $dbh = C4::Context->dbh;
1686     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1687     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1688
1689 }
1690
1691 sub _update_batch_record_counts {
1692     my ($batch_id) = @_;
1693
1694     my $dbh = C4::Context->dbh;
1695     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1696                                         num_records = (
1697                                             SELECT COUNT(*)
1698                                             FROM import_records
1699                                             WHERE import_batch_id = import_batches.import_batch_id),
1700                                         num_items = (
1701                                             SELECT COUNT(*)
1702                                             FROM import_records
1703                                             JOIN import_items USING (import_record_id)
1704                                             WHERE import_batch_id = import_batches.import_batch_id
1705                                             AND record_type = 'biblio')
1706                                     WHERE import_batch_id = ?");
1707     $sth->bind_param(1, $batch_id);
1708     $sth->execute();
1709     $sth->finish();
1710 }
1711
1712 sub _get_commit_action {
1713     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1714     
1715     if ($record_type eq 'biblio') {
1716         my ($bib_result, $bib_match, $item_result);
1717
1718         $bib_match = GetBestRecordMatch($import_record_id);
1719         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1720
1721             $bib_result = $overlay_action;
1722
1723             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1724                 $item_result = 'create_new';
1725             } elsif($item_action eq 'replace'){
1726                 $item_result = 'replace';
1727             } else {
1728                 $item_result = 'ignore';
1729             }
1730
1731         } else {
1732             $bib_result = $nomatch_action;
1733             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1734         }
1735         return ($bib_result, $item_result, $bib_match);
1736     } else { # must be auths
1737         my ($auth_result, $auth_match);
1738
1739         $auth_match = GetBestRecordMatch($import_record_id);
1740         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1741             $auth_result = $overlay_action;
1742         } else {
1743             $auth_result = $nomatch_action;
1744         }
1745
1746         return ($auth_result, undef, $auth_match);
1747
1748     }
1749 }
1750
1751 sub _get_revert_action {
1752     my ($overlay_action, $overlay_status, $status) = @_;
1753
1754     my $bib_result;
1755
1756     if ($status eq 'ignored') {
1757         $bib_result = 'ignore';
1758     } else {
1759         if ($overlay_action eq 'create_new') {
1760             $bib_result = 'delete';
1761         } else {
1762             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1763         }
1764     }
1765     return $bib_result;
1766 }
1767
1768 1;
1769 __END__
1770
1771 =head1 AUTHOR
1772
1773 Koha Development Team <http://koha-community.org/>
1774
1775 Galen Charlton <galen.charlton@liblime.com>
1776
1777 =cut