Bug 30779: Remove _update_import_record_marc and update tests
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     return ($batch_id, $num_valid, $num_items, @invalid_records);
394 }
395
396 =head2 AddItemsToImportBiblio
397
398   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
399                 $import_record_id, $marc_record, $update_counts);
400
401 =cut
402
403 sub AddItemsToImportBiblio {
404     my $batch_id = shift;
405     my $import_record_id = shift;
406     my $marc_record = shift;
407     my $update_counts = @_ ? shift : 0;
408
409     my @import_items_ids = ();
410    
411     my $dbh = C4::Context->dbh; 
412     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
413     foreach my $item_field ($marc_record->field($item_tag)) {
414         my $item_marc = MARC::Record->new();
415         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
416         $item_marc->append_fields($item_field);
417         $marc_record->delete_field($item_field);
418         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
419                                         VALUES (?, ?, ?)");
420         $sth->bind_param(1, $import_record_id);
421         $sth->bind_param(2, 'staged');
422         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
423         $sth->execute();
424         push @import_items_ids, $dbh->{'mysql_insertid'};
425         $sth->finish();
426     }
427
428     if ($#import_items_ids > -1) {
429         _update_batch_record_counts($batch_id) if $update_counts;
430     }
431     return @import_items_ids;
432 }
433
434 =head2 BatchFindDuplicates
435
436   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
437              $max_matches, $progress_interval, $progress_callback);
438
439 Goes through the records loaded in the batch and attempts to 
440 find duplicates for each one.  Sets the matching status 
441 of each record to "no_match" or "auto_match" as appropriate.
442
443 The $max_matches parameter is optional; if it is not supplied,
444 it defaults to 10.
445
446 The $progress_interval and $progress_callback parameters are 
447 optional; if both are supplied, the sub referred to by
448 $progress_callback will be invoked every $progress_interval
449 records using the number of records processed as the 
450 singular argument.
451
452 =cut
453
454 sub BatchFindDuplicates {
455     my $batch_id = shift;
456     my $matcher = shift;
457     my $max_matches = @_ ? shift : 10;
458
459     # optional callback to monitor status 
460     # of job
461     my $progress_interval = 0;
462     my $progress_callback = undef;
463     if ($#_ == 1) {
464         $progress_interval = shift;
465         $progress_callback = shift;
466         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
467         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
468     }
469
470     my $dbh = C4::Context->dbh;
471
472     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
473                              FROM import_records
474                              WHERE import_batch_id = ?");
475     $sth->execute($batch_id);
476     my $num_with_matches = 0;
477     my $rec_num = 0;
478     while (my $rowref = $sth->fetchrow_hashref) {
479         $rec_num++;
480         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
481             &$progress_callback($rec_num);
482         }
483         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
484         my @matches = ();
485         if (defined $matcher) {
486             @matches = $matcher->get_matches($marc_record, $max_matches);
487         }
488         if (scalar(@matches) > 0) {
489             $num_with_matches++;
490             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
491             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
492         } else {
493             SetImportRecordMatches($rowref->{'import_record_id'}, ());
494             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
495         }
496     }
497     $sth->finish();
498     return $num_with_matches;
499 }
500
501 =head2 BatchCommitRecords
502
503   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
504         BatchCommitRecords($batch_id, $framework,
505         $progress_interval, $progress_callback);
506
507 =cut
508
509 sub BatchCommitRecords {
510     my $batch_id = shift;
511     my $framework = shift;
512
513     my $schema = Koha::Database->schema;
514
515     # optional callback to monitor status 
516     # of job
517     my $progress_interval = 0;
518     my $progress_callback = undef;
519     if ($#_ == 1) {
520         $progress_interval = shift;
521         $progress_callback = shift;
522         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
523         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
524     }
525
526     my $record_type;
527     my $num_added = 0;
528     my $num_updated = 0;
529     my $num_items_added = 0;
530     my $num_items_replaced = 0;
531     my $num_items_errored = 0;
532     my $num_ignored = 0;
533     # commit (i.e., save, all records in the batch)
534     SetImportBatchStatus($batch_id, 'importing');
535     my $overlay_action = GetImportBatchOverlayAction($batch_id);
536     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
537     my $item_action = GetImportBatchItemAction($batch_id);
538     my $item_tag;
539     my $item_subfield;
540     my $dbh = C4::Context->dbh;
541     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
542                              FROM import_records
543                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
544                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
545                              WHERE import_batch_id = ?");
546     $sth->execute($batch_id);
547     my $marcflavour = C4::Context->preference('marcflavour');
548
549     my $userenv = C4::Context->userenv;
550     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
551
552     my $rec_num = 0;
553     my @biblio_ids;
554     $schema->txn_begin; # We commit in a transaction
555     while (my $rowref = $sth->fetchrow_hashref) {
556         $record_type = $rowref->{'record_type'};
557
558         $rec_num++;
559
560         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
561             # report progress and commit
562             $schema->txn_commit;
563             &$progress_callback( $rec_num );
564             $schema->txn_begin;
565         }
566         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
567             $num_ignored++;
568             next;
569         }
570
571         my $marc_type;
572         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
573             $marc_type = 'UNIMARCAUTH';
574         } elsif ($marcflavour eq 'UNIMARC') {
575             $marc_type = 'UNIMARC';
576         } else {
577             $marc_type = 'USMARC';
578         }
579         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
580
581         if ($record_type eq 'biblio') {
582             # remove any item tags - rely on _batchCommitItems
583             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
584             foreach my $item_field ($marc_record->field($item_tag)) {
585                 $marc_record->delete_field($item_field);
586             }
587         }
588
589         my ($record_result, $item_result, $record_match) =
590             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
591                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
592
593         my $recordid;
594         my $query;
595         if ($record_result eq 'create_new') {
596             $num_added++;
597             if ($record_type eq 'biblio') {
598                 my $biblioitemnumber;
599                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
600                 push @biblio_ids, $recordid;
601                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
602                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
603                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
604                     $num_items_added += $bib_items_added;
605                     $num_items_replaced += $bib_items_replaced;
606                     $num_items_errored += $bib_items_errored;
607                 }
608             } else {
609                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
610                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
611             }
612             my $sth = $dbh->prepare_cached($query);
613             $sth->execute($recordid, $rowref->{'import_record_id'});
614             $sth->finish();
615             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
616         } elsif ($record_result eq 'replace') {
617             $num_updated++;
618             $recordid = $record_match;
619             my $oldxml;
620             if ($record_type eq 'biblio') {
621                 my $oldbiblio = Koha::Biblios->find( $recordid );
622                 $oldxml = GetXmlBiblio($recordid);
623
624                 # remove item fields so that they don't get
625                 # added again if record is reverted
626                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
627                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
628                 foreach my $item_field ($old_marc->field($item_tag)) {
629                     $old_marc->delete_field($item_field);
630                 }
631                 $oldxml = $old_marc->as_xml($marc_type);
632
633                 my $context = { source => 'batchimport' };
634                 if ($logged_in_patron) {
635                     $context->{categorycode} = $logged_in_patron->categorycode;
636                     $context->{userid} = $logged_in_patron->userid;
637                 }
638
639                 ModBiblio(
640                     $marc_record,
641                     $recordid,
642                     $oldbiblio->frameworkcode,
643                     {
644                         overlay_context   => $context,
645                         skip_record_index => 1
646                     }
647                 );
648                 push @biblio_ids, $recordid;
649                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
650
651                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
652                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
653                     $num_items_added += $bib_items_added;
654                     $num_items_replaced += $bib_items_replaced;
655                     $num_items_errored += $bib_items_errored;
656                 }
657             } else {
658                 $oldxml = GetAuthorityXML($recordid);
659
660                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
661                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
662             }
663             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
664             $sth->execute($oldxml, $rowref->{'import_record_id'});
665             $sth->finish();
666             my $sth2 = $dbh->prepare_cached($query);
667             $sth2->execute($recordid, $rowref->{'import_record_id'});
668             $sth2->finish();
669             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
670             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
671         } elsif ($record_result eq 'ignore') {
672             $recordid = $record_match;
673             $num_ignored++;
674             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
675                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
676                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
677                 $num_items_added += $bib_items_added;
678          $num_items_replaced += $bib_items_replaced;
679                 $num_items_errored += $bib_items_errored;
680                 # still need to record the matched biblionumber so that the
681                 # items can be reverted
682                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
683                 $sth2->execute($recordid, $rowref->{'import_record_id'});
684                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
685             }
686             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
687         }
688     }
689     $schema->txn_commit; # Commit final records that may not have hit callback threshold
690     $sth->finish();
691
692     if ( @biblio_ids ) {
693         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
694         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
695     }
696
697     SetImportBatchStatus($batch_id, 'imported');
698     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
699 }
700
701 =head2 _batchCommitItems
702
703   ($num_items_added, $num_items_errored) = 
704          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
705
706 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
707
708 =cut
709
710 sub _batchCommitItems {
711     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
712
713     my $dbh = C4::Context->dbh;
714
715     my $num_items_added = 0;
716     my $num_items_errored = 0;
717     my $num_items_replaced = 0;
718
719     my $sth = $dbh->prepare( "
720         SELECT import_items_id, import_items.marcxml, encoding
721         FROM import_items
722         JOIN import_records USING (import_record_id)
723         WHERE import_record_id = ?
724         ORDER BY import_items_id
725     " );
726     $sth->bind_param( 1, $import_record_id );
727     $sth->execute();
728
729     while ( my $row = $sth->fetchrow_hashref() ) {
730         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
731
732         # Delete date_due subfield as to not accidentally delete item checkout due dates
733         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
734         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
735
736         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
737
738         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
739         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
740
741         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
742         if ( $action eq "replace" && $duplicate_itemnumber ) {
743             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
744             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
745             $updsth->bind_param( 1, 'imported' );
746             $updsth->bind_param( 2, $item->{itemnumber} );
747             $updsth->bind_param( 3, undef );
748             $updsth->bind_param( 4, $row->{'import_items_id'} );
749             $updsth->execute();
750             $updsth->finish();
751             $num_items_replaced++;
752         } elsif ( $action eq "replace" && $duplicate_barcode ) {
753             my $itemnumber = $duplicate_barcode->itemnumber;
754             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
755             $updsth->bind_param( 1, 'imported' );
756             $updsth->bind_param( 2, $item->{itemnumber} );
757             $updsth->bind_param( 3, undef );
758             $updsth->bind_param( 4, $row->{'import_items_id'} );
759             $updsth->execute();
760             $updsth->finish();
761             $num_items_replaced++;
762         } elsif ($duplicate_barcode) {
763             $updsth->bind_param( 1, 'error' );
764             $updsth->bind_param( 2, undef );
765             $updsth->bind_param( 3, 'duplicate item barcode' );
766             $updsth->bind_param( 4, $row->{'import_items_id'} );
767             $updsth->execute();
768             $num_items_errored++;
769         } else {
770             # Remove the itemnumber if it exists, we want to create a new item
771             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
772             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
773
774             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
775             if( $itemnumber ) {
776                 $updsth->bind_param( 1, 'imported' );
777                 $updsth->bind_param( 2, $itemnumber );
778                 $updsth->bind_param( 3, undef );
779                 $updsth->bind_param( 4, $row->{'import_items_id'} );
780                 $updsth->execute();
781                 $updsth->finish();
782                 $num_items_added++;
783             }
784         }
785     }
786
787     return ( $num_items_added, $num_items_replaced, $num_items_errored );
788 }
789
790 =head2 BatchRevertRecords
791
792   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
793       $num_ignored) = BatchRevertRecords($batch_id);
794
795 =cut
796
797 sub BatchRevertRecords {
798     my $batch_id = shift;
799
800     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
801
802     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
803
804     my $record_type;
805     my $num_deleted = 0;
806     my $num_errors = 0;
807     my $num_reverted = 0;
808     my $num_ignored = 0;
809     my $num_items_deleted = 0;
810     # commit (i.e., save, all records in the batch)
811     SetImportBatchStatus($batch_id, 'reverting');
812     my $overlay_action = GetImportBatchOverlayAction($batch_id);
813     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
814     my $dbh = C4::Context->dbh;
815     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
816                              FROM import_records
817                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
818                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
819                              WHERE import_batch_id = ?");
820     $sth->execute($batch_id);
821     my $marc_type;
822     my $marcflavour = C4::Context->preference('marcflavour');
823     while (my $rowref = $sth->fetchrow_hashref) {
824         $record_type = $rowref->{'record_type'};
825         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
826             $num_ignored++;
827             next;
828         }
829         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
830             $marc_type = 'UNIMARCAUTH';
831         } elsif ($marcflavour eq 'UNIMARC') {
832             $marc_type = 'UNIMARC';
833         } else {
834             $marc_type = 'USMARC';
835         }
836
837         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
838
839         if ($record_result eq 'delete') {
840             my $error = undef;
841             if  ($record_type eq 'biblio') {
842                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
843                 $error = DelBiblio($rowref->{'matched_biblionumber'});
844             } else {
845                 DelAuthority({ authid => $rowref->{'matched_authid'} });
846             }
847             if (defined $error) {
848                 $num_errors++;
849             } else {
850                 $num_deleted++;
851                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
852             }
853         } elsif ($record_result eq 'restore') {
854             $num_reverted++;
855             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
856             if ($record_type eq 'biblio') {
857                 my $biblionumber = $rowref->{'matched_biblionumber'};
858                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
859
860                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
861                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
862
863                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
864                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
865             } else {
866                 my $authid = $rowref->{'matched_authid'};
867                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
868             }
869             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
870         } elsif ($record_result eq 'ignore') {
871             if ($record_type eq 'biblio') {
872                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
873             }
874             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
875         }
876         my $query;
877         if ($record_type eq 'biblio') {
878             # remove matched_biblionumber only if there is no 'imported' item left
879             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
880             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
881         } else {
882             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
883         }
884         my $sth2 = $dbh->prepare_cached($query);
885         $sth2->execute($rowref->{'import_record_id'});
886     }
887
888     $sth->finish();
889     SetImportBatchStatus($batch_id, 'reverted');
890     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
891 }
892
893 =head2 BatchRevertItems
894
895   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
896
897 =cut
898
899 sub BatchRevertItems {
900     my ($import_record_id, $biblionumber) = @_;
901
902     my $dbh = C4::Context->dbh;
903     my $num_items_deleted = 0;
904
905     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
906                                    FROM import_items
907                                    JOIN items USING (itemnumber)
908                                    WHERE import_record_id = ?");
909     $sth->bind_param(1, $import_record_id);
910     $sth->execute();
911     while (my $row = $sth->fetchrow_hashref()) {
912         my $item = Koha::Items->find($row->{itemnumber});
913         if ($item->safe_delete){
914             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
915             $updsth->bind_param(1, 'reverted');
916             $updsth->bind_param(2, $row->{'import_items_id'});
917             $updsth->execute();
918             $updsth->finish();
919             $num_items_deleted++;
920         }
921         else {
922             next;
923         }
924     }
925     $sth->finish();
926     return $num_items_deleted;
927 }
928
929 =head2 CleanBatch
930
931   CleanBatch($batch_id)
932
933 Deletes all staged records from the import batch
934 and sets the status of the batch to 'cleaned'.  Note
935 that deleting a stage record does *not* affect
936 any record that has been committed to the database.
937
938 =cut
939
940 sub CleanBatch {
941     my $batch_id = shift;
942     return unless defined $batch_id;
943
944     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
945     SetImportBatchStatus($batch_id, 'cleaned');
946 }
947
948 =head2 DeleteBatch
949
950   DeleteBatch($batch_id)
951
952 Deletes the record from the database. This can only be done
953 once the batch has been cleaned.
954
955 =cut
956
957 sub DeleteBatch {
958     my $batch_id = shift;
959     return unless defined $batch_id;
960
961     my $dbh = C4::Context->dbh;
962     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
963     $sth->execute( $batch_id );
964 }
965
966 =head2 GetAllImportBatches
967
968   my $results = GetAllImportBatches();
969
970 Returns a references to an array of hash references corresponding
971 to all import_batches rows (of batch_type 'batch'), sorted in 
972 ascending order by import_batch_id.
973
974 =cut
975
976 sub  GetAllImportBatches {
977     my $dbh = C4::Context->dbh;
978     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
979                                     WHERE batch_type IN ('batch', 'webservice')
980                                     ORDER BY import_batch_id ASC");
981
982     my $results = [];
983     $sth->execute();
984     while (my $row = $sth->fetchrow_hashref) {
985         push @$results, $row;
986     }
987     $sth->finish();
988     return $results;
989 }
990
991 =head2 GetStagedWebserviceBatches
992
993   my $batch_ids = GetStagedWebserviceBatches();
994
995 Returns a references to an array of batch id's
996 of batch_type 'webservice' that are not imported
997
998 =cut
999
1000 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1001 SELECT import_batch_id FROM import_batches
1002 WHERE batch_type = 'webservice'
1003 AND import_status = 'staged'
1004 EOQ
1005 sub  GetStagedWebserviceBatches {
1006     my $dbh = C4::Context->dbh;
1007     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1008 }
1009
1010 =head2 GetImportBatchRangeDesc
1011
1012   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1013
1014 Returns a reference to an array of hash references corresponding to
1015 import_batches rows (sorted in descending order by import_batch_id)
1016 start at the given offset.
1017
1018 =cut
1019
1020 sub GetImportBatchRangeDesc {
1021     my ($offset, $results_per_group) = @_;
1022
1023     my $dbh = C4::Context->dbh;
1024     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1025                                     LEFT JOIN import_batch_profiles p
1026                                     ON b.profile_id = p.id
1027                                     WHERE b.batch_type IN ('batch', 'webservice')
1028                                     ORDER BY b.import_batch_id DESC";
1029     my @params;
1030     if ($results_per_group){
1031         $query .= " LIMIT ?";
1032         push(@params, $results_per_group);
1033     }
1034     if ($offset){
1035         $query .= " OFFSET ?";
1036         push(@params, $offset);
1037     }
1038     my $sth = $dbh->prepare_cached($query);
1039     $sth->execute(@params);
1040     my $results = $sth->fetchall_arrayref({});
1041     $sth->finish();
1042     return $results;
1043 }
1044
1045 =head2 GetItemNumbersFromImportBatch
1046
1047   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1048
1049 =cut
1050
1051 sub GetItemNumbersFromImportBatch {
1052     my ($batch_id) = @_;
1053     my $dbh = C4::Context->dbh;
1054     my $sql = q|
1055 SELECT itemnumber FROM import_items
1056 INNER JOIN items USING (itemnumber)
1057 INNER JOIN import_records USING (import_record_id)
1058 WHERE import_batch_id = ?|;
1059     my  $sth = $dbh->prepare( $sql );
1060     $sth->execute($batch_id);
1061     my @items ;
1062     while ( my ($itm) = $sth->fetchrow_array ) {
1063         push @items, $itm;
1064     }
1065     return @items;
1066 }
1067
1068 =head2 GetNumberOfImportBatches
1069
1070   my $count = GetNumberOfImportBatches();
1071
1072 =cut
1073
1074 sub GetNumberOfNonZ3950ImportBatches {
1075     my $dbh = C4::Context->dbh;
1076     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1077     $sth->execute();
1078     my ($count) = $sth->fetchrow_array();
1079     $sth->finish();
1080     return $count;
1081 }
1082
1083 =head2 GetImportBiblios
1084
1085   my $results = GetImportBiblios($importid);
1086
1087 =cut
1088
1089 sub GetImportBiblios {
1090     my ($import_record_id) = @_;
1091
1092     my $dbh = C4::Context->dbh;
1093     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1094     return $dbh->selectall_arrayref(
1095         $query,
1096         { Slice => {} },
1097         $import_record_id
1098     );
1099
1100 }
1101
1102 =head2 GetImportRecordsRange
1103
1104   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1105
1106 Returns a reference to an array of hash references corresponding to
1107 import_biblios/import_auths/import_records rows for a given batch
1108 starting at the given offset.
1109
1110 =cut
1111
1112 sub GetImportRecordsRange {
1113     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1114
1115     my $dbh = C4::Context->dbh;
1116
1117     my $order_by = $parameters->{order_by} || 'import_record_id';
1118     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1119
1120     my $order_by_direction =
1121       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1122
1123     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1124
1125     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1126                                            record_sequence, status, overlay_status,
1127                                            matched_biblionumber, matched_authid, record_type
1128                                     FROM   import_records
1129                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1130                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1131                                     WHERE  import_batch_id = ?";
1132     my @params;
1133     push(@params, $batch_id);
1134     if ($status) {
1135         $query .= " AND status=?";
1136         push(@params,$status);
1137     }
1138
1139     $query.=" ORDER BY $order_by $order_by_direction";
1140
1141     if($results_per_group){
1142         $query .= " LIMIT ?";
1143         push(@params, $results_per_group);
1144     }
1145     if($offset){
1146         $query .= " OFFSET ?";
1147         push(@params, $offset);
1148     }
1149     my $sth = $dbh->prepare_cached($query);
1150     $sth->execute(@params);
1151     my $results = $sth->fetchall_arrayref({});
1152     $sth->finish();
1153     return $results;
1154
1155 }
1156
1157 =head2 GetBestRecordMatch
1158
1159   my $record_id = GetBestRecordMatch($import_record_id);
1160
1161 =cut
1162
1163 sub GetBestRecordMatch {
1164     my ($import_record_id) = @_;
1165
1166     my $dbh = C4::Context->dbh;
1167     my $sth = $dbh->prepare("SELECT candidate_match_id
1168                              FROM   import_record_matches
1169                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1170                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1171                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1172                              WHERE  import_record_matches.import_record_id = ? AND
1173                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1174                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1175                              AND chosen = 1
1176                              ORDER BY score DESC, candidate_match_id DESC");
1177     $sth->execute($import_record_id);
1178     my ($record_id) = $sth->fetchrow_array();
1179     $sth->finish();
1180     return $record_id;
1181 }
1182
1183 =head2 GetImportBatchStatus
1184
1185   my $status = GetImportBatchStatus($batch_id);
1186
1187 =cut
1188
1189 sub GetImportBatchStatus {
1190     my ($batch_id) = @_;
1191
1192     my $dbh = C4::Context->dbh;
1193     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1194     $sth->execute($batch_id);
1195     my ($status) = $sth->fetchrow_array();
1196     $sth->finish();
1197     return $status;
1198
1199 }
1200
1201 =head2 SetImportBatchStatus
1202
1203   SetImportBatchStatus($batch_id, $new_status);
1204
1205 =cut
1206
1207 sub SetImportBatchStatus {
1208     my ($batch_id, $new_status) = @_;
1209
1210     my $dbh = C4::Context->dbh;
1211     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1212     $sth->execute($new_status, $batch_id);
1213     $sth->finish();
1214
1215 }
1216
1217 =head2 SetMatchedBiblionumber
1218
1219   SetMatchedBiblionumber($import_record_id, $biblionumber);
1220
1221 =cut
1222
1223 sub SetMatchedBiblionumber {
1224     my ($import_record_id, $biblionumber) = @_;
1225
1226     my $dbh = C4::Context->dbh;
1227     $dbh->do(
1228         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1229         undef, $biblionumber, $import_record_id
1230     );
1231 }
1232
1233 =head2 GetImportBatchOverlayAction
1234
1235   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1236
1237 =cut
1238
1239 sub GetImportBatchOverlayAction {
1240     my ($batch_id) = @_;
1241
1242     my $dbh = C4::Context->dbh;
1243     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1244     $sth->execute($batch_id);
1245     my ($overlay_action) = $sth->fetchrow_array();
1246     $sth->finish();
1247     return $overlay_action;
1248
1249 }
1250
1251
1252 =head2 SetImportBatchOverlayAction
1253
1254   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1255
1256 =cut
1257
1258 sub SetImportBatchOverlayAction {
1259     my ($batch_id, $new_overlay_action) = @_;
1260
1261     my $dbh = C4::Context->dbh;
1262     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1263     $sth->execute($new_overlay_action, $batch_id);
1264     $sth->finish();
1265
1266 }
1267
1268 =head2 GetImportBatchNoMatchAction
1269
1270   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1271
1272 =cut
1273
1274 sub GetImportBatchNoMatchAction {
1275     my ($batch_id) = @_;
1276
1277     my $dbh = C4::Context->dbh;
1278     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1279     $sth->execute($batch_id);
1280     my ($nomatch_action) = $sth->fetchrow_array();
1281     $sth->finish();
1282     return $nomatch_action;
1283
1284 }
1285
1286
1287 =head2 SetImportBatchNoMatchAction
1288
1289   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1290
1291 =cut
1292
1293 sub SetImportBatchNoMatchAction {
1294     my ($batch_id, $new_nomatch_action) = @_;
1295
1296     my $dbh = C4::Context->dbh;
1297     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1298     $sth->execute($new_nomatch_action, $batch_id);
1299     $sth->finish();
1300
1301 }
1302
1303 =head2 GetImportBatchItemAction
1304
1305   my $item_action = GetImportBatchItemAction($batch_id);
1306
1307 =cut
1308
1309 sub GetImportBatchItemAction {
1310     my ($batch_id) = @_;
1311
1312     my $dbh = C4::Context->dbh;
1313     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1314     $sth->execute($batch_id);
1315     my ($item_action) = $sth->fetchrow_array();
1316     $sth->finish();
1317     return $item_action;
1318
1319 }
1320
1321
1322 =head2 SetImportBatchItemAction
1323
1324   SetImportBatchItemAction($batch_id, $new_item_action);
1325
1326 =cut
1327
1328 sub SetImportBatchItemAction {
1329     my ($batch_id, $new_item_action) = @_;
1330
1331     my $dbh = C4::Context->dbh;
1332     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1333     $sth->execute($new_item_action, $batch_id);
1334     $sth->finish();
1335
1336 }
1337
1338 =head2 GetImportBatchMatcher
1339
1340   my $matcher_id = GetImportBatchMatcher($batch_id);
1341
1342 =cut
1343
1344 sub GetImportBatchMatcher {
1345     my ($batch_id) = @_;
1346
1347     my $dbh = C4::Context->dbh;
1348     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1349     $sth->execute($batch_id);
1350     my ($matcher_id) = $sth->fetchrow_array();
1351     $sth->finish();
1352     return $matcher_id;
1353
1354 }
1355
1356
1357 =head2 SetImportBatchMatcher
1358
1359   SetImportBatchMatcher($batch_id, $new_matcher_id);
1360
1361 =cut
1362
1363 sub SetImportBatchMatcher {
1364     my ($batch_id, $new_matcher_id) = @_;
1365
1366     my $dbh = C4::Context->dbh;
1367     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1368     $sth->execute($new_matcher_id, $batch_id);
1369     $sth->finish();
1370
1371 }
1372
1373 =head2 GetImportRecordOverlayStatus
1374
1375   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1376
1377 =cut
1378
1379 sub GetImportRecordOverlayStatus {
1380     my ($import_record_id) = @_;
1381
1382     my $dbh = C4::Context->dbh;
1383     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1384     $sth->execute($import_record_id);
1385     my ($overlay_status) = $sth->fetchrow_array();
1386     $sth->finish();
1387     return $overlay_status;
1388
1389 }
1390
1391
1392 =head2 SetImportRecordOverlayStatus
1393
1394   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1395
1396 =cut
1397
1398 sub SetImportRecordOverlayStatus {
1399     my ($import_record_id, $new_overlay_status) = @_;
1400
1401     my $dbh = C4::Context->dbh;
1402     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1403     $sth->execute($new_overlay_status, $import_record_id);
1404     $sth->finish();
1405
1406 }
1407
1408 =head2 GetImportRecordStatus
1409
1410   my $status = GetImportRecordStatus($import_record_id);
1411
1412 =cut
1413
1414 sub GetImportRecordStatus {
1415     my ($import_record_id) = @_;
1416
1417     my $dbh = C4::Context->dbh;
1418     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1419     $sth->execute($import_record_id);
1420     my ($status) = $sth->fetchrow_array();
1421     $sth->finish();
1422     return $status;
1423
1424 }
1425
1426
1427 =head2 SetImportRecordStatus
1428
1429   SetImportRecordStatus($import_record_id, $new_status);
1430
1431 =cut
1432
1433 sub SetImportRecordStatus {
1434     my ($import_record_id, $new_status) = @_;
1435
1436     my $dbh = C4::Context->dbh;
1437     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1438     $sth->execute($new_status, $import_record_id);
1439     $sth->finish();
1440
1441 }
1442
1443 =head2 GetImportRecordMatches
1444
1445   my $results = GetImportRecordMatches($import_record_id, $best_only);
1446
1447 =cut
1448
1449 sub GetImportRecordMatches {
1450     my $import_record_id = shift;
1451     my $best_only = @_ ? shift : 0;
1452
1453     my $dbh = C4::Context->dbh;
1454     # FIXME currently biblio only
1455     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1456                                     candidate_match_id, score, record_type,
1457                                     chosen
1458                                     FROM import_records
1459                                     JOIN import_record_matches USING (import_record_id)
1460                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1461                                     WHERE import_record_id = ?
1462                                     ORDER BY score DESC, biblionumber DESC");
1463     $sth->bind_param(1, $import_record_id);
1464     my $results = [];
1465     $sth->execute();
1466     while (my $row = $sth->fetchrow_hashref) {
1467         if ($row->{'record_type'} eq 'auth') {
1468             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1469         }
1470         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1471         push @$results, $row;
1472         last if $best_only;
1473     }
1474     $sth->finish();
1475
1476     return $results;
1477     
1478 }
1479
1480 =head2 SetImportRecordMatches
1481
1482   SetImportRecordMatches($import_record_id, @matches);
1483
1484 =cut
1485
1486 sub SetImportRecordMatches {
1487     my $import_record_id = shift;
1488     my @matches = @_;
1489
1490     my $dbh = C4::Context->dbh;
1491     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1492     $delsth->execute($import_record_id);
1493     $delsth->finish();
1494
1495     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1496                                     VALUES (?, ?, ?, ?)");
1497     my $chosen = 1; #The first match is defaulted to be chosen
1498     foreach my $match (@matches) {
1499         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1500         $chosen = 0; #After the first we do not default to other matches
1501     }
1502 }
1503
1504 =head2 RecordsFromISO2709File
1505
1506     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1507
1508 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1509
1510 @PARAM1, String, absolute path to the ISO2709 file.
1511 @PARAM2, String, see stage_file.pl
1512 @PARAM3, String, should be utf8
1513
1514 Returns two array refs.
1515
1516 =cut
1517
1518 sub RecordsFromISO2709File {
1519     my ($input_file, $record_type, $encoding) = @_;
1520     my @errors;
1521
1522     my $marc_type = C4::Context->preference('marcflavour');
1523     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1524
1525     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1526     my @marc_records;
1527     $/ = "\035";
1528     while (<$fh>) {
1529         s/^\s+//;
1530         s/\s+$//;
1531         next unless $_; # skip if record has only whitespace, as might occur
1532                         # if file includes newlines between each MARC record
1533         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1534         push @marc_records, $marc_record;
1535         if ($charset_guessed ne $encoding) {
1536             push @errors,
1537                 "Unexpected charset $charset_guessed, expecting $encoding";
1538         }
1539     }
1540     close $fh;
1541     return ( \@errors, \@marc_records );
1542 }
1543
1544 =head2 RecordsFromMARCXMLFile
1545
1546     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1547
1548 Creates MARC::Record-objects out of the given MARCXML-file.
1549
1550 @PARAM1, String, absolute path to the ISO2709 file.
1551 @PARAM2, String, should be utf8
1552
1553 Returns two array refs.
1554
1555 =cut
1556
1557 sub RecordsFromMARCXMLFile {
1558     my ( $filename, $encoding ) = @_;
1559     my $batch = MARC::File::XML->in( $filename );
1560     my ( @marcRecords, @errors, $record );
1561     do {
1562         eval { $record = $batch->next( $encoding ); };
1563         if ($@) {
1564             push @errors, $@;
1565         }
1566         push @marcRecords, $record if $record;
1567     } while( $record );
1568     return (\@errors, \@marcRecords);
1569 }
1570
1571 =head2 RecordsFromMarcPlugin
1572
1573     Converts text of input_file into array of MARC records with to_marc plugin
1574
1575 =cut
1576
1577 sub RecordsFromMarcPlugin {
1578     my ($input_file, $plugin_class, $encoding) = @_;
1579     my ( $text, @return );
1580     return \@return if !$input_file || !$plugin_class;
1581
1582     # Read input file
1583     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1584     $/ = "\035";
1585     while (<$fh>) {
1586         s/^\s+//;
1587         s/\s+$//;
1588         next unless $_;
1589         $text .= $_;
1590     }
1591     close $fh;
1592
1593     # Convert to large MARC blob with plugin
1594     $text = Koha::Plugins::Handler->run({
1595         class  => $plugin_class,
1596         method => 'to_marc',
1597         params => { data => $text },
1598     }) if $text;
1599
1600     # Convert to array of MARC records
1601     if( $text ) {
1602         my $marc_type = C4::Context->preference('marcflavour');
1603         foreach my $blob ( split(/\x1D/, $text) ) {
1604             next if $blob =~ /^\s*$/;
1605             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1606             push @return, $marcrecord;
1607         }
1608     }
1609     return \@return;
1610 }
1611
1612 # internal functions
1613
1614 sub _create_import_record {
1615     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1616
1617     my $dbh = C4::Context->dbh;
1618     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1619                                                          record_type, encoding)
1620                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1621     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1622                   $record_type, $encoding);
1623     my $import_record_id = $dbh->{'mysql_insertid'};
1624     $sth->finish();
1625     return $import_record_id;
1626 }
1627
1628 sub _add_auth_fields {
1629     my ($import_record_id, $marc_record) = @_;
1630
1631     my $controlnumber;
1632     if ($marc_record->field('001')) {
1633         $controlnumber = $marc_record->field('001')->data();
1634     }
1635     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1636     my $dbh = C4::Context->dbh;
1637     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1638     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1639     $sth->finish();
1640 }
1641
1642 sub _add_biblio_fields {
1643     my ($import_record_id, $marc_record) = @_;
1644
1645     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1646     my $dbh = C4::Context->dbh;
1647     # FIXME no controlnumber, originalsource
1648     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1649     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1650     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1651     $sth->finish();
1652                 
1653 }
1654
1655 sub _update_biblio_fields {
1656     my ($import_record_id, $marc_record) = @_;
1657
1658     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1659     my $dbh = C4::Context->dbh;
1660     # FIXME no controlnumber, originalsource
1661     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1662     $isbn =~ s/\(.*$//;
1663     $isbn =~ tr/ -_//;
1664     $isbn = uc $isbn;
1665     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1666                              WHERE  import_record_id = ?");
1667     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1668     $sth->finish();
1669 }
1670
1671 sub _parse_biblio_fields {
1672     my ($marc_record) = @_;
1673
1674     my $dbh = C4::Context->dbh;
1675     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1676     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1677
1678 }
1679
1680 sub _update_batch_record_counts {
1681     my ($batch_id) = @_;
1682
1683     my $dbh = C4::Context->dbh;
1684     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1685                                         num_records = (
1686                                             SELECT COUNT(*)
1687                                             FROM import_records
1688                                             WHERE import_batch_id = import_batches.import_batch_id),
1689                                         num_items = (
1690                                             SELECT COUNT(*)
1691                                             FROM import_records
1692                                             JOIN import_items USING (import_record_id)
1693                                             WHERE import_batch_id = import_batches.import_batch_id
1694                                             AND record_type = 'biblio')
1695                                     WHERE import_batch_id = ?");
1696     $sth->bind_param(1, $batch_id);
1697     $sth->execute();
1698     $sth->finish();
1699 }
1700
1701 sub _get_commit_action {
1702     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1703     
1704     if ($record_type eq 'biblio') {
1705         my ($bib_result, $bib_match, $item_result);
1706
1707         $bib_match = GetBestRecordMatch($import_record_id);
1708         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1709
1710             $bib_result = $overlay_action;
1711
1712             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1713                 $item_result = 'create_new';
1714             } elsif($item_action eq 'replace'){
1715                 $item_result = 'replace';
1716             } else {
1717                 $item_result = 'ignore';
1718             }
1719
1720         } else {
1721             $bib_result = $nomatch_action;
1722             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1723         }
1724         return ($bib_result, $item_result, $bib_match);
1725     } else { # must be auths
1726         my ($auth_result, $auth_match);
1727
1728         $auth_match = GetBestRecordMatch($import_record_id);
1729         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1730             $auth_result = $overlay_action;
1731         } else {
1732             $auth_result = $nomatch_action;
1733         }
1734
1735         return ($auth_result, undef, $auth_match);
1736
1737     }
1738 }
1739
1740 sub _get_revert_action {
1741     my ($overlay_action, $overlay_status, $status) = @_;
1742
1743     my $bib_result;
1744
1745     if ($status eq 'ignored') {
1746         $bib_result = 'ignore';
1747     } else {
1748         if ($overlay_action eq 'create_new') {
1749             $bib_result = 'delete';
1750         } else {
1751             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1752         }
1753     }
1754     return $bib_result;
1755 }
1756
1757 1;
1758 __END__
1759
1760 =head1 AUTHOR
1761
1762 Koha Development Team <http://koha-community.org/>
1763
1764 Galen Charlton <galen.charlton@liblime.com>
1765
1766 =cut