Bug 36034: (bug 34893 follow-up) fix capture of return values from checkpw
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue;
38 use Koha::Items;
39 use Koha::SearchEngine;
40 use Koha::SearchEngine::Indexer;
41 use Koha::Plugins::Handler;
42 use Koha::Logger;
43
44 our (@ISA, @EXPORT_OK);
45 BEGIN {
46     require Exporter;
47     @ISA       = qw(Exporter);
48     @EXPORT_OK = qw(
49       GetZ3950BatchId
50       GetWebserviceBatchId
51       GetImportRecordMarc
52       AddImportBatch
53       GetImportBatch
54       AddAuthToBatch
55       AddBiblioToBatch
56       AddItemsToImportBiblio
57       ModAuthorityInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub EmbedItemsInImportBiblio {
196     my ( $record, $import_record_id ) = @_;
197     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
198     my $dbh = C4::Context->dbh;
199     my $import_items = $dbh->selectall_arrayref(q|
200         SELECT import_items.marcxml
201         FROM import_items
202         WHERE import_record_id = ?
203     |, { Slice => {} }, $import_record_id );
204     my @item_fields;
205     for my $import_item ( @$import_items ) {
206         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
207         push @item_fields, $item_marc->field($itemtag);
208     }
209     $record->append_fields(@item_fields);
210     return $record;
211 }
212
213 =head2 AddImportBatch
214
215   my $batch_id = AddImportBatch($params_hash);
216
217 =cut
218
219 sub AddImportBatch {
220     my ($params) = @_;
221
222     my (@fields, @vals);
223     foreach (qw( matcher_id template_id branchcode
224                  overlay_action nomatch_action item_action
225                  import_status batch_type file_name comments record_type )) {
226         if (exists $params->{$_}) {
227             push @fields, $_;
228             push @vals, $params->{$_};
229         }
230     }
231     my $dbh = C4::Context->dbh;
232     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
233                                   VALUES (".join( ',', map '?', @fields).")",
234              undef,
235              @vals);
236     return $dbh->{'mysql_insertid'};
237 }
238
239 =head2 GetImportBatch 
240
241   my $row = GetImportBatch($batch_id);
242
243 Retrieve a hashref of an import_batches row.
244
245 =cut
246
247 sub GetImportBatch {
248     my ($batch_id) = @_;
249
250     my $dbh = C4::Context->dbh;
251     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
252     $sth->bind_param(1, $batch_id);
253     $sth->execute();
254     my $result = $sth->fetchrow_hashref;
255     $sth->finish();
256     return $result;
257
258 }
259
260 =head2 AddBiblioToBatch 
261
262   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
263                 $marc_record, $encoding, $update_counts);
264
265 =cut
266
267 sub AddBiblioToBatch {
268     my $batch_id = shift;
269     my $record_sequence = shift;
270     my $marc_record = shift;
271     my $encoding = shift;
272     my $update_counts = @_ ? shift : 1;
273
274     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
275     _add_biblio_fields($import_record_id, $marc_record);
276     _update_batch_record_counts($batch_id) if $update_counts;
277     return $import_record_id;
278 }
279
280 =head2 AddAuthToBatch
281
282   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
283                 $marc_record, $encoding, $update_counts, [$marc_type]);
284
285 =cut
286
287 sub AddAuthToBatch {
288     my $batch_id = shift;
289     my $record_sequence = shift;
290     my $marc_record = shift;
291     my $encoding = shift;
292     my $update_counts = @_ ? shift : 1;
293     my $marc_type = shift || C4::Context->preference('marcflavour');
294
295     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
296
297     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
298     _add_auth_fields($import_record_id, $marc_record);
299     _update_batch_record_counts($batch_id) if $update_counts;
300     return $import_record_id;
301 }
302
303 =head2 BatchStageMarcRecords
304
305 ( $batch_id, $num_records, $num_items, @invalid_records ) =
306   BatchStageMarcRecords(
307     $record_type,                $encoding,
308     $marc_records,               $file_name,
309     $marc_modification_template, $comments,
310     $branch_code,                $parse_items,
311     $leave_as_staging,           $progress_interval,
312     $progress_callback
313   );
314
315 =cut
316
317 sub BatchStageMarcRecords {
318     my $record_type = shift;
319     my $encoding = shift;
320     my $marc_records = shift;
321     my $file_name = shift;
322     my $marc_modification_template = shift;
323     my $comments = shift;
324     my $branch_code = shift;
325     my $parse_items = shift;
326     my $leave_as_staging = shift;
327
328     # optional callback to monitor status 
329     # of job
330     my $progress_interval = 0;
331     my $progress_callback = undef;
332     if ($#_ == 1) {
333         $progress_interval = shift;
334         $progress_callback = shift;
335         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
336         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
337     } 
338     
339     my $batch_id = AddImportBatch( {
340             overlay_action => 'create_new',
341             import_status => 'staging',
342             batch_type => 'batch',
343             file_name => $file_name,
344             comments => $comments,
345             record_type => $record_type,
346         } );
347     if ($parse_items) {
348         SetImportBatchItemAction($batch_id, 'always_add');
349     } else {
350         SetImportBatchItemAction($batch_id, 'ignore');
351     }
352
353
354     my $marc_type = C4::Context->preference('marcflavour');
355     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
356     my @invalid_records = ();
357     my $num_valid = 0;
358     my $num_items = 0;
359     # FIXME - for now, we're dealing only with bibs
360     my $rec_num = 0;
361     foreach my $marc_record (@$marc_records) {
362         $rec_num++;
363         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
364             &$progress_callback($rec_num);
365         }
366
367         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
368
369         my $import_record_id;
370         if (scalar($marc_record->fields()) == 0) {
371             push @invalid_records, $marc_record;
372         } else {
373
374             # Normalize the record so it doesn't have separated diacritics
375             SetUTF8Flag($marc_record);
376
377             $num_valid++;
378             if ($record_type eq 'biblio') {
379                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
380                 if ($parse_items) {
381                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
382                     $num_items += scalar(@import_items_ids);
383                 }
384             } elsif ($record_type eq 'auth') {
385                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
386             }
387         }
388     }
389     unless ($leave_as_staging) {
390         SetImportBatchStatus($batch_id, 'staged');
391     }
392     # FIXME branch_code, number of bibs, number of items
393     _update_batch_record_counts($batch_id);
394     if ($progress_interval){
395         &$progress_callback($rec_num);
396     }
397
398     return ($batch_id, $num_valid, $num_items, @invalid_records);
399 }
400
401 =head2 AddItemsToImportBiblio
402
403   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
404                 $import_record_id, $marc_record, $update_counts);
405
406 =cut
407
408 sub AddItemsToImportBiblio {
409     my $batch_id = shift;
410     my $import_record_id = shift;
411     my $marc_record = shift;
412     my $update_counts = @_ ? shift : 0;
413
414     my @import_items_ids = ();
415    
416     my $dbh = C4::Context->dbh; 
417     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
418     foreach my $item_field ($marc_record->field($item_tag)) {
419         my $item_marc = MARC::Record->new();
420         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
421         $item_marc->append_fields($item_field);
422         $marc_record->delete_field($item_field);
423         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
424                                         VALUES (?, ?, ?)");
425         $sth->bind_param(1, $import_record_id);
426         $sth->bind_param(2, 'staged');
427         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
428         $sth->execute();
429         push @import_items_ids, $dbh->{'mysql_insertid'};
430         $sth->finish();
431     }
432
433     if ($#import_items_ids > -1) {
434         _update_batch_record_counts($batch_id) if $update_counts;
435     }
436     return @import_items_ids;
437 }
438
439 =head2 BatchFindDuplicates
440
441   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
442              $max_matches, $progress_interval, $progress_callback);
443
444 Goes through the records loaded in the batch and attempts to 
445 find duplicates for each one.  Sets the matching status 
446 of each record to "no_match" or "auto_match" as appropriate.
447
448 The $max_matches parameter is optional; if it is not supplied,
449 it defaults to 10.
450
451 The $progress_interval and $progress_callback parameters are 
452 optional; if both are supplied, the sub referred to by
453 $progress_callback will be invoked every $progress_interval
454 records using the number of records processed as the 
455 singular argument.
456
457 =cut
458
459 sub BatchFindDuplicates {
460     my $batch_id = shift;
461     my $matcher = shift;
462     my $max_matches = @_ ? shift : 10;
463
464     # optional callback to monitor status 
465     # of job
466     my $progress_interval = 0;
467     my $progress_callback = undef;
468     if ($#_ == 1) {
469         $progress_interval = shift;
470         $progress_callback = shift;
471         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
472         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
473     }
474
475     my $dbh = C4::Context->dbh;
476
477     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
478                              FROM import_records
479                              WHERE import_batch_id = ?");
480     $sth->execute($batch_id);
481     my $num_with_matches = 0;
482     my $rec_num = 0;
483     while (my $rowref = $sth->fetchrow_hashref) {
484         $rec_num++;
485         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
486             &$progress_callback($rec_num);
487         }
488         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
489         my @matches = ();
490         if (defined $matcher) {
491             @matches = $matcher->get_matches($marc_record, $max_matches);
492         }
493         if (scalar(@matches) > 0) {
494             $num_with_matches++;
495             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
496             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
497         } else {
498             SetImportRecordMatches($rowref->{'import_record_id'}, ());
499             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
500         }
501     }
502
503     if ($progress_interval){
504         &$progress_callback($rec_num);
505     }
506
507     $sth->finish();
508     return $num_with_matches;
509 }
510
511 =head2 BatchCommitRecords
512
513   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
514   'progress_callback' will define code called every X records.
515
516   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
517         BatchCommitRecords({
518             batch_id  => $batch_id,
519             framework => $framework,
520             overlay_framework => $overlay_framework,
521             progress_interval => $progress_interval,
522             progress_callback => $progress_callback,
523         });
524 =cut
525
526 sub BatchCommitRecords {
527     my $params = shift;
528     my $batch_id          = $params->{batch_id};
529     my $framework         = $params->{framework};
530     my $overlay_framework = $params->{overlay_framework};
531     my $progress_interval = $params->{progress_interval} // 0;
532     my $progress_callback = $params->{progress_callback};
533     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
534     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
535
536     my $schema = Koha::Database->schema;
537
538     my $record_type;
539     my $num_added = 0;
540     my $num_updated = 0;
541     my $num_items_added = 0;
542     my $num_items_replaced = 0;
543     my $num_items_errored = 0;
544     my $num_ignored = 0;
545     # commit (i.e., save, all records in the batch)
546     my $overlay_action = GetImportBatchOverlayAction($batch_id);
547     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
548     my $item_action = GetImportBatchItemAction($batch_id);
549     my $item_tag;
550     my $item_subfield;
551     my $dbh = C4::Context->dbh;
552     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
553                              FROM import_records
554                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
555                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
556                              WHERE import_batch_id = ?");
557     $sth->execute($batch_id);
558     my $marcflavour = C4::Context->preference('marcflavour');
559
560     my $userenv = C4::Context->userenv;
561     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
562
563     my $rec_num = 0;
564     my @biblio_ids;
565     my @updated_ids;
566     while (my $rowref = $sth->fetchrow_hashref) {
567         $schema->txn_begin;
568         $record_type = $rowref->{'record_type'};
569
570         $rec_num++;
571
572         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
573             # report progress
574             &$progress_callback( $rec_num );
575         }
576         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
577             $num_ignored++;
578             next;
579         }
580
581         my $marc_type;
582         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
583             $marc_type = 'UNIMARCAUTH';
584         } elsif ($marcflavour eq 'UNIMARC') {
585             $marc_type = 'UNIMARC';
586         } else {
587             $marc_type = 'USMARC';
588         }
589         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
590
591         if ($record_type eq 'biblio') {
592             # remove any item tags - rely on _batchCommitItems
593             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
594             foreach my $item_field ($marc_record->field($item_tag)) {
595                 $marc_record->delete_field($item_field);
596             }
597             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
598                 my @control_num = $marc_record->field('001');
599                 $marc_record->delete_fields(@control_num);
600             }
601         }
602
603         my ($record_result, $item_result, $record_match) =
604             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
605                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
606
607         my $recordid;
608         my $query;
609         if ($record_result eq 'create_new') {
610             $num_added++;
611             if ($record_type eq 'biblio') {
612                 my $biblioitemnumber;
613                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
614                 push @biblio_ids, $recordid if $recordid;
615                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
616                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
617                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
618                     $num_items_added += $bib_items_added;
619                     $num_items_replaced += $bib_items_replaced;
620                     $num_items_errored += $bib_items_errored;
621                 }
622             } else {
623                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
624                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
625             }
626             my $sth = $dbh->prepare_cached($query);
627             $sth->execute($recordid, $rowref->{'import_record_id'});
628             $sth->finish();
629             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
630         } elsif ($record_result eq 'replace') {
631             $num_updated++;
632             $recordid = $record_match;
633             my $oldxml;
634             if ($record_type eq 'biblio') {
635                 my $oldbiblio = Koha::Biblios->find( $recordid );
636                 $oldxml = GetXmlBiblio($recordid);
637
638                 # remove item fields so that they don't get
639                 # added again if record is reverted
640                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
641                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
642                 foreach my $item_field ($old_marc->field($item_tag)) {
643                     $old_marc->delete_field($item_field);
644                 }
645                 $oldxml = $old_marc->as_xml($marc_type);
646
647                 my $context = { source => 'batchimport' };
648                 if ($logged_in_patron) {
649                     $context->{categorycode} = $logged_in_patron->categorycode;
650                     $context->{userid} = $logged_in_patron->userid;
651                 }
652
653                 ModBiblio(
654                     $marc_record,
655                     $recordid,
656                     $overlay_framework // $oldbiblio->frameworkcode,
657                     {
658                         overlay_context   => $context,
659                         skip_record_index => 1,
660                         skip_holds_queue  => 1,
661                     }
662                 );
663                 push @biblio_ids, $recordid;
664                 push @updated_ids, $recordid;
665                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
666
667                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
668                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
669                     $num_items_added += $bib_items_added;
670                     $num_items_replaced += $bib_items_replaced;
671                     $num_items_errored += $bib_items_errored;
672                 }
673             } else {
674                 $oldxml = GetAuthorityXML($recordid);
675
676                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
677                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
678             }
679             # Combine xml update, SetImportRecordOverlayStatus, and SetImportRecordStatus updates into a single update for efficiency, especially in a transaction
680             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ?, status = ?, overlay_status = ? WHERE import_record_id = ?");
681             $sth->execute( $oldxml, 'imported', 'match_applied', $rowref->{'import_record_id'} );
682             $sth->finish();
683             my $sth2 = $dbh->prepare_cached($query);
684             $sth2->execute($recordid, $rowref->{'import_record_id'});
685             $sth2->finish();
686         } elsif ($record_result eq 'ignore') {
687             $recordid = $record_match;
688             $num_ignored++;
689             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
690                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
691                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
692                 $num_items_added += $bib_items_added;
693          $num_items_replaced += $bib_items_replaced;
694                 $num_items_errored += $bib_items_errored;
695                 # still need to record the matched biblionumber so that the
696                 # items can be reverted
697                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
698                 $sth2->execute($recordid, $rowref->{'import_record_id'});
699                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
700             }
701             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
702         }
703         $schema->txn_commit;
704     }
705
706     if ($progress_interval){
707         &$progress_callback($rec_num);
708     }
709
710     $sth->finish();
711
712     SetImportBatchStatus($batch_id, 'imported');
713
714
715     if (@biblio_ids) {
716         my $indexer = Koha::SearchEngine::Indexer->new( { index => $Koha::SearchEngine::BIBLIOS_INDEX } );
717         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
718     }
719     Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue->new->enqueue( { biblio_ids => \@updated_ids } )
720         if ( @updated_ids && C4::Context->preference('RealTimeHoldsQueue') );
721
722     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
723 }
724
725 =head2 _batchCommitItems
726
727   ($num_items_added, $num_items_errored) = 
728          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
729
730 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
731
732 =cut
733
734 sub _batchCommitItems {
735     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
736
737     my $dbh = C4::Context->dbh;
738
739     my $num_items_added = 0;
740     my $num_items_errored = 0;
741     my $num_items_replaced = 0;
742
743     my $sth = $dbh->prepare( "
744         SELECT import_items_id, import_items.marcxml, encoding
745         FROM import_items
746         JOIN import_records USING (import_record_id)
747         WHERE import_record_id = ?
748         ORDER BY import_items_id
749     " );
750     $sth->bind_param( 1, $import_record_id );
751     $sth->execute();
752
753     while ( my $row = $sth->fetchrow_hashref() ) {
754         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
755
756         # Delete date_due subfield as to not accidentally delete item checkout due dates
757         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
758         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
759
760         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
761
762         my $item_match;
763         my $duplicate_barcode = exists( $item->{'barcode'} );
764         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
765
766         # We assume that when replacing items we do not want to move them - the onus is on the importer to
767         # ensure the correct items/records are being updated
768         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
769         if (
770             $action eq "replace" &&
771             $duplicate_itemnumber &&
772             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
773         ) {
774             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
775             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
776             $updsth->bind_param( 1, 'imported' );
777             $updsth->bind_param( 2, $item->{itemnumber} );
778             $updsth->bind_param( 3, undef );
779             $updsth->bind_param( 4, $row->{'import_items_id'} );
780             $updsth->execute();
781             $updsth->finish();
782             $num_items_replaced++;
783         } elsif (
784             $action eq "replace" &&
785             $duplicate_barcode &&
786             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
787         ) {
788             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
789             $updsth->bind_param( 1, 'imported' );
790             $updsth->bind_param( 2, $item->{itemnumber} );
791             $updsth->bind_param( 3, undef );
792             $updsth->bind_param( 4, $row->{'import_items_id'} );
793             $updsth->execute();
794             $updsth->finish();
795             $num_items_replaced++;
796         } elsif (
797             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
798             $duplicate_barcode &&
799             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
800         ) {
801             $updsth->bind_param( 1, 'error' );
802             $updsth->bind_param( 2, undef );
803             $updsth->bind_param( 3, 'duplicate item barcode' );
804             $updsth->bind_param( 4, $row->{'import_items_id'} );
805             $updsth->execute();
806             $num_items_errored++;
807         } else {
808             # Remove the itemnumber if it exists, we want to create a new item
809             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
810             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
811
812             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
813             if( $itemnumber ) {
814                 $updsth->bind_param( 1, 'imported' );
815                 $updsth->bind_param( 2, $itemnumber );
816                 $updsth->bind_param( 3, undef );
817                 $updsth->bind_param( 4, $row->{'import_items_id'} );
818                 $updsth->execute();
819                 $updsth->finish();
820                 $num_items_added++;
821             }
822         }
823     }
824
825     return ( $num_items_added, $num_items_replaced, $num_items_errored );
826 }
827
828 =head2 BatchRevertRecords
829
830   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
831       $num_ignored) = BatchRevertRecords($batch_id);
832
833 =cut
834
835 sub BatchRevertRecords {
836     my $batch_id = shift;
837
838     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
839
840     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
841
842     my $record_type;
843     my $num_deleted = 0;
844     my $num_errors = 0;
845     my $num_reverted = 0;
846     my $num_ignored = 0;
847     my $num_items_deleted = 0;
848     # commit (i.e., save, all records in the batch)
849     SetImportBatchStatus($batch_id, 'reverting');
850     my $overlay_action = GetImportBatchOverlayAction($batch_id);
851     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
852     my $dbh = C4::Context->dbh;
853     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
854                              FROM import_records
855                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
856                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
857                              WHERE import_batch_id = ?");
858     $sth->execute($batch_id);
859     my $marc_type;
860     my $marcflavour = C4::Context->preference('marcflavour');
861     while (my $rowref = $sth->fetchrow_hashref) {
862         $record_type = $rowref->{'record_type'};
863         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
864             $num_ignored++;
865             next;
866         }
867         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
868             $marc_type = 'UNIMARCAUTH';
869         } elsif ($marcflavour eq 'UNIMARC') {
870             $marc_type = 'UNIMARC';
871         } else {
872             $marc_type = 'USMARC';
873         }
874
875         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
876
877         if ($record_result eq 'delete') {
878             my $error = undef;
879             if  ($record_type eq 'biblio') {
880                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
881                 $error = DelBiblio($rowref->{'matched_biblionumber'});
882             } else {
883                 DelAuthority({ authid => $rowref->{'matched_authid'} });
884             }
885             if (defined $error) {
886                 $num_errors++;
887             } else {
888                 $num_deleted++;
889                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
890             }
891         } elsif ($record_result eq 'restore') {
892             $num_reverted++;
893             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
894             if ($record_type eq 'biblio') {
895                 my $biblionumber = $rowref->{'matched_biblionumber'};
896                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
897
898                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
899                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
900
901                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
902                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
903             } else {
904                 my $authid = $rowref->{'matched_authid'};
905                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
906             }
907             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
908         } elsif ($record_result eq 'ignore') {
909             if ($record_type eq 'biblio') {
910                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
911             }
912             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
913         }
914         my $query;
915         if ($record_type eq 'biblio') {
916             # remove matched_biblionumber only if there is no 'imported' item left
917             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
918             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
919         } else {
920             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
921         }
922         my $sth2 = $dbh->prepare_cached($query);
923         $sth2->execute($rowref->{'import_record_id'});
924     }
925
926     $sth->finish();
927     SetImportBatchStatus($batch_id, 'reverted');
928     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
929 }
930
931 =head2 BatchRevertItems
932
933   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
934
935 =cut
936
937 sub BatchRevertItems {
938     my ($import_record_id, $biblionumber) = @_;
939
940     my $dbh = C4::Context->dbh;
941     my $num_items_deleted = 0;
942
943     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
944                                    FROM import_items
945                                    JOIN items USING (itemnumber)
946                                    WHERE import_record_id = ?");
947     $sth->bind_param(1, $import_record_id);
948     $sth->execute();
949     while (my $row = $sth->fetchrow_hashref()) {
950         my $item = Koha::Items->find($row->{itemnumber});
951         if ($item->safe_delete){
952             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
953             $updsth->bind_param(1, 'reverted');
954             $updsth->bind_param(2, $row->{'import_items_id'});
955             $updsth->execute();
956             $updsth->finish();
957             $num_items_deleted++;
958         }
959         else {
960             next;
961         }
962     }
963     $sth->finish();
964     return $num_items_deleted;
965 }
966
967 =head2 CleanBatch
968
969   CleanBatch($batch_id)
970
971 Deletes all staged records from the import batch
972 and sets the status of the batch to 'cleaned'.  Note
973 that deleting a stage record does *not* affect
974 any record that has been committed to the database.
975
976 =cut
977
978 sub CleanBatch {
979     my $batch_id = shift;
980     return unless defined $batch_id;
981
982     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
983     SetImportBatchStatus($batch_id, 'cleaned');
984 }
985
986 =head2 DeleteBatch
987
988   DeleteBatch($batch_id)
989
990 Deletes the record from the database. This can only be done
991 once the batch has been cleaned.
992
993 =cut
994
995 sub DeleteBatch {
996     my $batch_id = shift;
997     return unless defined $batch_id;
998
999     my $dbh = C4::Context->dbh;
1000     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1001     $sth->execute( $batch_id );
1002 }
1003
1004 =head2 GetAllImportBatches
1005
1006   my $results = GetAllImportBatches();
1007
1008 Returns a references to an array of hash references corresponding
1009 to all import_batches rows (of batch_type 'batch'), sorted in 
1010 ascending order by import_batch_id.
1011
1012 =cut
1013
1014 sub  GetAllImportBatches {
1015     my $dbh = C4::Context->dbh;
1016     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1017                                     WHERE batch_type IN ('batch', 'webservice')
1018                                     ORDER BY import_batch_id ASC");
1019
1020     my $results = [];
1021     $sth->execute();
1022     while (my $row = $sth->fetchrow_hashref) {
1023         push @$results, $row;
1024     }
1025     $sth->finish();
1026     return $results;
1027 }
1028
1029 =head2 GetStagedWebserviceBatches
1030
1031   my $batch_ids = GetStagedWebserviceBatches();
1032
1033 Returns a references to an array of batch id's
1034 of batch_type 'webservice' that are not imported
1035
1036 =cut
1037
1038 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1039 SELECT import_batch_id FROM import_batches
1040 WHERE batch_type = 'webservice'
1041 AND import_status = 'staged'
1042 EOQ
1043 sub  GetStagedWebserviceBatches {
1044     my $dbh = C4::Context->dbh;
1045     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1046 }
1047
1048 =head2 GetImportBatchRangeDesc
1049
1050   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1051
1052 Returns a reference to an array of hash references corresponding to
1053 import_batches rows (sorted in descending order by import_batch_id)
1054 start at the given offset.
1055
1056 =cut
1057
1058 sub GetImportBatchRangeDesc {
1059     my ($offset, $results_per_group) = @_;
1060
1061     my $dbh = C4::Context->dbh;
1062     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1063                                     LEFT JOIN import_batch_profiles p
1064                                     ON b.profile_id = p.id
1065                                     WHERE b.batch_type IN ('batch', 'webservice')
1066                                     ORDER BY b.import_batch_id DESC";
1067     my @params;
1068     if ($results_per_group){
1069         $query .= " LIMIT ?";
1070         push(@params, $results_per_group);
1071     }
1072     if ($offset){
1073         $query .= " OFFSET ?";
1074         push(@params, $offset);
1075     }
1076     my $sth = $dbh->prepare_cached($query);
1077     $sth->execute(@params);
1078     my $results = $sth->fetchall_arrayref({});
1079     $sth->finish();
1080     return $results;
1081 }
1082
1083 =head2 GetItemNumbersFromImportBatch
1084
1085   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1086
1087 =cut
1088
1089 sub GetItemNumbersFromImportBatch {
1090     my ($batch_id) = @_;
1091     my $dbh = C4::Context->dbh;
1092     my $sql = q|
1093 SELECT itemnumber FROM import_items
1094 INNER JOIN items USING (itemnumber)
1095 INNER JOIN import_records USING (import_record_id)
1096 WHERE import_batch_id = ?|;
1097     my  $sth = $dbh->prepare( $sql );
1098     $sth->execute($batch_id);
1099     my @items ;
1100     while ( my ($itm) = $sth->fetchrow_array ) {
1101         push @items, $itm;
1102     }
1103     return @items;
1104 }
1105
1106 =head2 GetNumberOfImportBatches
1107
1108   my $count = GetNumberOfImportBatches();
1109
1110 =cut
1111
1112 sub GetNumberOfNonZ3950ImportBatches {
1113     my $dbh = C4::Context->dbh;
1114     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1115     $sth->execute();
1116     my ($count) = $sth->fetchrow_array();
1117     $sth->finish();
1118     return $count;
1119 }
1120
1121 =head2 GetImportBiblios
1122
1123   my $results = GetImportBiblios($importid);
1124
1125 =cut
1126
1127 sub GetImportBiblios {
1128     my ($import_record_id) = @_;
1129
1130     my $dbh = C4::Context->dbh;
1131     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1132     return $dbh->selectall_arrayref(
1133         $query,
1134         { Slice => {} },
1135         $import_record_id
1136     );
1137
1138 }
1139
1140 =head2 GetImportRecordsRange
1141
1142   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1143
1144 Returns a reference to an array of hash references corresponding to
1145 import_biblios/import_auths/import_records rows for a given batch
1146 starting at the given offset.
1147
1148 =cut
1149
1150 sub GetImportRecordsRange {
1151     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1152
1153     my $dbh = C4::Context->dbh;
1154
1155     my $order_by = $parameters->{order_by} || 'import_record_id';
1156     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1157
1158     my $order_by_direction =
1159       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1160
1161     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1162
1163     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1164                                            record_sequence, status, overlay_status,
1165                                            matched_biblionumber, matched_authid, record_type
1166                                     FROM   import_records
1167                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1168                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1169                                     WHERE  import_batch_id = ?";
1170     my @params;
1171     push(@params, $batch_id);
1172     if ($status) {
1173         $query .= " AND status=?";
1174         push(@params,$status);
1175     }
1176
1177     $query.=" ORDER BY $order_by $order_by_direction";
1178
1179     if($results_per_group){
1180         $query .= " LIMIT ?";
1181         push(@params, $results_per_group);
1182     }
1183     if($offset){
1184         $query .= " OFFSET ?";
1185         push(@params, $offset);
1186     }
1187     my $sth = $dbh->prepare_cached($query);
1188     $sth->execute(@params);
1189     my $results = $sth->fetchall_arrayref({});
1190     $sth->finish();
1191     return $results;
1192
1193 }
1194
1195 =head2 GetBestRecordMatch
1196
1197   my $record_id = GetBestRecordMatch($import_record_id);
1198
1199 =cut
1200
1201 sub GetBestRecordMatch {
1202     my ($import_record_id) = @_;
1203
1204     my $dbh = C4::Context->dbh;
1205     my $sth = $dbh->prepare("SELECT candidate_match_id
1206                              FROM   import_record_matches
1207                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1208                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1209                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1210                              WHERE  import_record_matches.import_record_id = ? AND
1211                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1212                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1213                              AND chosen = 1
1214                              ORDER BY score DESC, candidate_match_id DESC");
1215     $sth->execute($import_record_id);
1216     my ($record_id) = $sth->fetchrow_array();
1217     $sth->finish();
1218     return $record_id;
1219 }
1220
1221 =head2 GetImportBatchStatus
1222
1223   my $status = GetImportBatchStatus($batch_id);
1224
1225 =cut
1226
1227 sub GetImportBatchStatus {
1228     my ($batch_id) = @_;
1229
1230     my $dbh = C4::Context->dbh;
1231     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1232     $sth->execute($batch_id);
1233     my ($status) = $sth->fetchrow_array();
1234     $sth->finish();
1235     return $status;
1236
1237 }
1238
1239 =head2 SetImportBatchStatus
1240
1241   SetImportBatchStatus($batch_id, $new_status);
1242
1243 =cut
1244
1245 sub SetImportBatchStatus {
1246     my ($batch_id, $new_status) = @_;
1247
1248     my $dbh = C4::Context->dbh;
1249     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1250     $sth->execute($new_status, $batch_id);
1251     $sth->finish();
1252
1253 }
1254
1255 =head2 SetMatchedBiblionumber
1256
1257   SetMatchedBiblionumber($import_record_id, $biblionumber);
1258
1259 =cut
1260
1261 sub SetMatchedBiblionumber {
1262     my ($import_record_id, $biblionumber) = @_;
1263
1264     my $dbh = C4::Context->dbh;
1265     $dbh->do(
1266         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1267         undef, $biblionumber, $import_record_id
1268     );
1269 }
1270
1271 =head2 GetImportBatchOverlayAction
1272
1273   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1274
1275 =cut
1276
1277 sub GetImportBatchOverlayAction {
1278     my ($batch_id) = @_;
1279
1280     my $dbh = C4::Context->dbh;
1281     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1282     $sth->execute($batch_id);
1283     my ($overlay_action) = $sth->fetchrow_array();
1284     $sth->finish();
1285     return $overlay_action;
1286
1287 }
1288
1289
1290 =head2 SetImportBatchOverlayAction
1291
1292   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1293
1294 =cut
1295
1296 sub SetImportBatchOverlayAction {
1297     my ($batch_id, $new_overlay_action) = @_;
1298
1299     my $dbh = C4::Context->dbh;
1300     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1301     $sth->execute($new_overlay_action, $batch_id);
1302     $sth->finish();
1303
1304 }
1305
1306 =head2 GetImportBatchNoMatchAction
1307
1308   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1309
1310 =cut
1311
1312 sub GetImportBatchNoMatchAction {
1313     my ($batch_id) = @_;
1314
1315     my $dbh = C4::Context->dbh;
1316     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1317     $sth->execute($batch_id);
1318     my ($nomatch_action) = $sth->fetchrow_array();
1319     $sth->finish();
1320     return $nomatch_action;
1321
1322 }
1323
1324
1325 =head2 SetImportBatchNoMatchAction
1326
1327   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1328
1329 =cut
1330
1331 sub SetImportBatchNoMatchAction {
1332     my ($batch_id, $new_nomatch_action) = @_;
1333
1334     my $dbh = C4::Context->dbh;
1335     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1336     $sth->execute($new_nomatch_action, $batch_id);
1337     $sth->finish();
1338
1339 }
1340
1341 =head2 GetImportBatchItemAction
1342
1343   my $item_action = GetImportBatchItemAction($batch_id);
1344
1345 =cut
1346
1347 sub GetImportBatchItemAction {
1348     my ($batch_id) = @_;
1349
1350     my $dbh = C4::Context->dbh;
1351     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1352     $sth->execute($batch_id);
1353     my ($item_action) = $sth->fetchrow_array();
1354     $sth->finish();
1355     return $item_action;
1356
1357 }
1358
1359
1360 =head2 SetImportBatchItemAction
1361
1362   SetImportBatchItemAction($batch_id, $new_item_action);
1363
1364 =cut
1365
1366 sub SetImportBatchItemAction {
1367     my ($batch_id, $new_item_action) = @_;
1368
1369     my $dbh = C4::Context->dbh;
1370     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1371     $sth->execute($new_item_action, $batch_id);
1372     $sth->finish();
1373
1374 }
1375
1376 =head2 GetImportBatchMatcher
1377
1378   my $matcher_id = GetImportBatchMatcher($batch_id);
1379
1380 =cut
1381
1382 sub GetImportBatchMatcher {
1383     my ($batch_id) = @_;
1384
1385     my $dbh = C4::Context->dbh;
1386     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1387     $sth->execute($batch_id);
1388     my ($matcher_id) = $sth->fetchrow_array();
1389     $sth->finish();
1390     return $matcher_id;
1391
1392 }
1393
1394
1395 =head2 SetImportBatchMatcher
1396
1397   SetImportBatchMatcher($batch_id, $new_matcher_id);
1398
1399 =cut
1400
1401 sub SetImportBatchMatcher {
1402     my ($batch_id, $new_matcher_id) = @_;
1403
1404     my $dbh = C4::Context->dbh;
1405     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1406     $sth->execute($new_matcher_id, $batch_id);
1407     $sth->finish();
1408
1409 }
1410
1411 =head2 GetImportRecordOverlayStatus
1412
1413   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1414
1415 =cut
1416
1417 sub GetImportRecordOverlayStatus {
1418     my ($import_record_id) = @_;
1419
1420     my $dbh = C4::Context->dbh;
1421     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1422     $sth->execute($import_record_id);
1423     my ($overlay_status) = $sth->fetchrow_array();
1424     $sth->finish();
1425     return $overlay_status;
1426
1427 }
1428
1429
1430 =head2 SetImportRecordOverlayStatus
1431
1432   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1433
1434 =cut
1435
1436 sub SetImportRecordOverlayStatus {
1437     my ($import_record_id, $new_overlay_status) = @_;
1438
1439     my $dbh = C4::Context->dbh;
1440     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1441     $sth->execute($new_overlay_status, $import_record_id);
1442     $sth->finish();
1443
1444 }
1445
1446 =head2 GetImportRecordStatus
1447
1448   my $status = GetImportRecordStatus($import_record_id);
1449
1450 =cut
1451
1452 sub GetImportRecordStatus {
1453     my ($import_record_id) = @_;
1454
1455     my $dbh = C4::Context->dbh;
1456     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1457     $sth->execute($import_record_id);
1458     my ($status) = $sth->fetchrow_array();
1459     $sth->finish();
1460     return $status;
1461
1462 }
1463
1464
1465 =head2 SetImportRecordStatus
1466
1467   SetImportRecordStatus($import_record_id, $new_status);
1468
1469 =cut
1470
1471 sub SetImportRecordStatus {
1472     my ($import_record_id, $new_status) = @_;
1473
1474     my $dbh = C4::Context->dbh;
1475     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1476     $sth->execute($new_status, $import_record_id);
1477     $sth->finish();
1478
1479 }
1480
1481 =head2 GetImportRecordMatches
1482
1483   my $results = GetImportRecordMatches($import_record_id, $best_only);
1484
1485 =cut
1486
1487 sub GetImportRecordMatches {
1488     my $import_record_id = shift;
1489     my $best_only = @_ ? shift : 0;
1490
1491     my $dbh = C4::Context->dbh;
1492     # FIXME currently biblio only
1493     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1494                                     candidate_match_id, score, record_type,
1495                                     chosen
1496                                     FROM import_records
1497                                     JOIN import_record_matches USING (import_record_id)
1498                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1499                                     WHERE import_record_id = ?
1500                                     ORDER BY score DESC, biblionumber DESC");
1501     $sth->bind_param(1, $import_record_id);
1502     my $results = [];
1503     $sth->execute();
1504     while (my $row = $sth->fetchrow_hashref) {
1505         if ($row->{'record_type'} eq 'auth') {
1506             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1507         }
1508         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1509         push @$results, $row;
1510         last if $best_only;
1511     }
1512     $sth->finish();
1513
1514     return $results;
1515     
1516 }
1517
1518 =head2 SetImportRecordMatches
1519
1520   SetImportRecordMatches($import_record_id, @matches);
1521
1522 =cut
1523
1524 sub SetImportRecordMatches {
1525     my $import_record_id = shift;
1526     my @matches = @_;
1527
1528     my $dbh = C4::Context->dbh;
1529     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1530     $delsth->execute($import_record_id);
1531     $delsth->finish();
1532
1533     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1534                                     VALUES (?, ?, ?, ?)");
1535     my $chosen = 1; #The first match is defaulted to be chosen
1536     foreach my $match (@matches) {
1537         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1538         $chosen = 0; #After the first we do not default to other matches
1539     }
1540 }
1541
1542 =head2 RecordsFromISO2709File
1543
1544     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1545
1546 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1547
1548 @PARAM1, String, absolute path to the ISO2709 file.
1549 @PARAM2, String, see stage_file.pl
1550 @PARAM3, String, should be utf8
1551
1552 Returns two array refs.
1553
1554 =cut
1555
1556 sub RecordsFromISO2709File {
1557     my ($input_file, $record_type, $encoding) = @_;
1558     my @errors;
1559
1560     my $marc_type = C4::Context->preference('marcflavour');
1561     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1562
1563     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1564     my @marc_records;
1565     $/ = "\035";
1566     while (<$fh>) {
1567         s/^\s+//;
1568         s/\s+$//;
1569         next unless $_; # skip if record has only whitespace, as might occur
1570                         # if file includes newlines between each MARC record
1571         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1572         push @marc_records, $marc_record;
1573         if ($charset_guessed ne $encoding) {
1574             push @errors,
1575                 "Unexpected charset $charset_guessed, expecting $encoding";
1576         }
1577     }
1578     close $fh;
1579     return ( \@errors, \@marc_records );
1580 }
1581
1582 =head2 RecordsFromMARCXMLFile
1583
1584     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1585
1586 Creates MARC::Record-objects out of the given MARCXML-file.
1587
1588 @PARAM1, String, absolute path to the MARCXML file.
1589 @PARAM2, String, should be utf8
1590
1591 Returns two array refs.
1592
1593 =cut
1594
1595 sub RecordsFromMARCXMLFile {
1596     my ( $filename, $encoding ) = @_;
1597     my $batch = MARC::File::XML->in( $filename );
1598     my ( @marcRecords, @errors, $record );
1599     do {
1600         eval { $record = $batch->next( $encoding ); };
1601         if ($@) {
1602             push @errors, $@;
1603         }
1604         push @marcRecords, $record if $record;
1605     } while( $record );
1606     return (\@errors, \@marcRecords);
1607 }
1608
1609 =head2 RecordsFromMarcPlugin
1610
1611     Converts text of input_file into array of MARC records with to_marc plugin
1612
1613 =cut
1614
1615 sub RecordsFromMarcPlugin {
1616     my ($input_file, $plugin_class, $encoding) = @_;
1617     my ( $text, @return );
1618     return \@return if !$input_file || !$plugin_class;
1619
1620     # Read input file
1621     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1622     $/ = "\035";
1623     while (<$fh>) {
1624         s/^\s+//;
1625         s/\s+$//;
1626         next unless $_;
1627         $text .= $_;
1628     }
1629     close $fh;
1630
1631     # Convert to large MARC blob with plugin
1632     $text = Koha::Plugins::Handler->run({
1633         class  => $plugin_class,
1634         method => 'to_marc',
1635         params => { data => $text },
1636     }) if $text;
1637
1638     # Convert to array of MARC records
1639     if( $text ) {
1640         my $marc_type = C4::Context->preference('marcflavour');
1641         foreach my $blob ( split(/\x1D/, $text) ) {
1642             next if $blob =~ /^\s*$/;
1643             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1644             push @return, $marcrecord;
1645         }
1646     }
1647     return \@return;
1648 }
1649
1650 # internal functions
1651
1652 sub _create_import_record {
1653     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1654
1655     my $dbh = C4::Context->dbh;
1656     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1657                                                          record_type, encoding)
1658                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1659     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1660                   $record_type, $encoding);
1661     my $import_record_id = $dbh->{'mysql_insertid'};
1662     $sth->finish();
1663     return $import_record_id;
1664 }
1665
1666 sub _add_auth_fields {
1667     my ($import_record_id, $marc_record) = @_;
1668
1669     my $controlnumber;
1670     if ($marc_record->field('001')) {
1671         $controlnumber = $marc_record->field('001')->data();
1672     }
1673     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1674     my $dbh = C4::Context->dbh;
1675     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1676     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1677     $sth->finish();
1678 }
1679
1680 sub _add_biblio_fields {
1681     my ($import_record_id, $marc_record) = @_;
1682
1683     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1684     my $dbh = C4::Context->dbh;
1685     # FIXME no controlnumber, originalsource
1686     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1687     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1688     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1689     $sth->finish();
1690                 
1691 }
1692
1693 sub _update_biblio_fields {
1694     my ($import_record_id, $marc_record) = @_;
1695
1696     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1697     my $dbh = C4::Context->dbh;
1698     # FIXME no controlnumber, originalsource
1699     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1700     $isbn =~ s/\(.*$//;
1701     $isbn =~ tr/ -_//;
1702     $isbn = uc $isbn;
1703     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1704                              WHERE  import_record_id = ?");
1705     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1706     $sth->finish();
1707 }
1708
1709 sub _parse_biblio_fields {
1710     my ($marc_record) = @_;
1711
1712     my $dbh = C4::Context->dbh;
1713     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1714     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1715
1716 }
1717
1718 sub _update_batch_record_counts {
1719     my ($batch_id) = @_;
1720
1721     my $dbh = C4::Context->dbh;
1722     my ( $num_records ) = $dbh->selectrow_array(q|
1723                                             SELECT COUNT(*)
1724                                             FROM import_records
1725                                             WHERE import_batch_id = ?
1726     |, undef, $batch_id );
1727     my ( $num_items ) = $dbh->selectrow_array(q|
1728                                             SELECT COUNT(*)
1729                                             FROM import_records
1730                                             JOIN import_items USING (import_record_id)
1731                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1732     |, undef, $batch_id );
1733     $dbh->do(
1734         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1735         undef,
1736         $num_records,
1737         $num_items,
1738         $batch_id,
1739     );
1740 }
1741
1742 sub _get_commit_action {
1743     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1744     
1745     if ($record_type eq 'biblio') {
1746         my ($bib_result, $bib_match, $item_result);
1747
1748         $bib_match = GetBestRecordMatch($import_record_id);
1749         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1750
1751             $bib_result = $overlay_action;
1752
1753             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1754                 $item_result = 'create_new';
1755             } elsif($item_action eq 'replace'){
1756                 $item_result = 'replace';
1757             } else {
1758                 $item_result = 'ignore';
1759             }
1760
1761         } else {
1762             $bib_result = $nomatch_action;
1763             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1764         }
1765         return ($bib_result, $item_result, $bib_match);
1766     } else { # must be auths
1767         my ($auth_result, $auth_match);
1768
1769         $auth_match = GetBestRecordMatch($import_record_id);
1770         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1771             $auth_result = $overlay_action;
1772         } else {
1773             $auth_result = $nomatch_action;
1774         }
1775
1776         return ($auth_result, undef, $auth_match);
1777
1778     }
1779 }
1780
1781 sub _get_revert_action {
1782     my ($overlay_action, $overlay_status, $status) = @_;
1783
1784     my $bib_result;
1785
1786     if ($status eq 'ignored') {
1787         $bib_result = 'ignore';
1788     } else {
1789         if ($overlay_action eq 'create_new') {
1790             $bib_result = 'delete';
1791         } else {
1792             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1793         }
1794     }
1795     return $bib_result;
1796 }
1797
1798 1;
1799 __END__
1800
1801 =head1 AUTHOR
1802
1803 Koha Development Team <http://koha-community.org/>
1804
1805 Galen Charlton <galen.charlton@liblime.com>
1806
1807 =cut