Bug 32804: (QA follow-up) Typo ahs and fix ImportBatch.t
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
513   'progress_callback' will define code called every X records.
514
515   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
516         BatchCommitRecords({
517             batch_id  => $batch_id,
518             framework => $framework,
519             overlay_framework => $overlay_framework,
520             progress_interval => $progress_interval,
521             progress_callback => $progress_callback,
522             skip_intermediate_commit => $skip_intermediate_commit
523         });
524
525     Parameter skip_intermediate_commit does what is says.
526 =cut
527
528 sub BatchCommitRecords {
529     my $params = shift;
530     my $batch_id          = $params->{batch_id};
531     my $framework         = $params->{framework};
532     my $overlay_framework = $params->{overlay_framework};
533     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
534     my $progress_interval = $params->{progress_interval} // 0;
535     my $progress_callback = $params->{progress_callback};
536     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
537     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
538
539     my $schema = Koha::Database->schema;
540     $schema->txn_begin;
541     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
542     # transactions may be committed and started too again. The final commit is close to the end.
543
544     my $record_type;
545     my $num_added = 0;
546     my $num_updated = 0;
547     my $num_items_added = 0;
548     my $num_items_replaced = 0;
549     my $num_items_errored = 0;
550     my $num_ignored = 0;
551     # commit (i.e., save, all records in the batch)
552     SetImportBatchStatus($batch_id, 'importing');
553     my $overlay_action = GetImportBatchOverlayAction($batch_id);
554     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
555     my $item_action = GetImportBatchItemAction($batch_id);
556     my $item_tag;
557     my $item_subfield;
558     my $dbh = C4::Context->dbh;
559     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
560                              FROM import_records
561                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
562                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
563                              WHERE import_batch_id = ?");
564     $sth->execute($batch_id);
565     my $marcflavour = C4::Context->preference('marcflavour');
566
567     my $userenv = C4::Context->userenv;
568     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
569
570     my $rec_num = 0;
571     my @biblio_ids;
572     while (my $rowref = $sth->fetchrow_hashref) {
573         $record_type = $rowref->{'record_type'};
574
575         $rec_num++;
576
577         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
578             # report progress and commit
579             $schema->txn_commit unless $skip_intermediate_commit;
580             &$progress_callback( $rec_num );
581             $schema->txn_begin unless $skip_intermediate_commit;
582         }
583         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
584             $num_ignored++;
585             next;
586         }
587
588         my $marc_type;
589         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
590             $marc_type = 'UNIMARCAUTH';
591         } elsif ($marcflavour eq 'UNIMARC') {
592             $marc_type = 'UNIMARC';
593         } else {
594             $marc_type = 'USMARC';
595         }
596         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
597
598         if ($record_type eq 'biblio') {
599             # remove any item tags - rely on _batchCommitItems
600             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
601             foreach my $item_field ($marc_record->field($item_tag)) {
602                 $marc_record->delete_field($item_field);
603             }
604             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
605                 my @control_num = $marc_record->field('001');
606                 $marc_record->delete_fields(@control_num);
607             }
608         }
609
610         my ($record_result, $item_result, $record_match) =
611             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
612                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
613
614         my $recordid;
615         my $query;
616         if ($record_result eq 'create_new') {
617             $num_added++;
618             if ($record_type eq 'biblio') {
619                 my $biblioitemnumber;
620                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
621                 push @biblio_ids, $recordid;
622                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
623                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
624                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
625                     $num_items_added += $bib_items_added;
626                     $num_items_replaced += $bib_items_replaced;
627                     $num_items_errored += $bib_items_errored;
628                 }
629             } else {
630                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
631                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
632             }
633             my $sth = $dbh->prepare_cached($query);
634             $sth->execute($recordid, $rowref->{'import_record_id'});
635             $sth->finish();
636             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
637         } elsif ($record_result eq 'replace') {
638             $num_updated++;
639             $recordid = $record_match;
640             my $oldxml;
641             if ($record_type eq 'biblio') {
642                 my $oldbiblio = Koha::Biblios->find( $recordid );
643                 $oldxml = GetXmlBiblio($recordid);
644
645                 # remove item fields so that they don't get
646                 # added again if record is reverted
647                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
648                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
649                 foreach my $item_field ($old_marc->field($item_tag)) {
650                     $old_marc->delete_field($item_field);
651                 }
652                 $oldxml = $old_marc->as_xml($marc_type);
653
654                 my $context = { source => 'batchimport' };
655                 if ($logged_in_patron) {
656                     $context->{categorycode} = $logged_in_patron->categorycode;
657                     $context->{userid} = $logged_in_patron->userid;
658                 }
659
660                 ModBiblio(
661                     $marc_record,
662                     $recordid,
663                     $overlay_framework // $oldbiblio->frameworkcode,
664                     {
665                         overlay_context   => $context,
666                         skip_record_index => 1
667                     }
668                 );
669                 push @biblio_ids, $recordid;
670                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
671
672                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
673                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
674                     $num_items_added += $bib_items_added;
675                     $num_items_replaced += $bib_items_replaced;
676                     $num_items_errored += $bib_items_errored;
677                 }
678             } else {
679                 $oldxml = GetAuthorityXML($recordid);
680
681                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
682                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
683             }
684             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
685             $sth->execute($oldxml, $rowref->{'import_record_id'});
686             $sth->finish();
687             my $sth2 = $dbh->prepare_cached($query);
688             $sth2->execute($recordid, $rowref->{'import_record_id'});
689             $sth2->finish();
690             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
691             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
692         } elsif ($record_result eq 'ignore') {
693             $recordid = $record_match;
694             $num_ignored++;
695             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
696                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
697                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
698                 $num_items_added += $bib_items_added;
699          $num_items_replaced += $bib_items_replaced;
700                 $num_items_errored += $bib_items_errored;
701                 # still need to record the matched biblionumber so that the
702                 # items can be reverted
703                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
704                 $sth2->execute($recordid, $rowref->{'import_record_id'});
705                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
706             }
707             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
708         }
709     }
710
711     if ($progress_interval){
712         &$progress_callback($rec_num);
713     }
714
715     $sth->finish();
716
717     if ( @biblio_ids ) {
718         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
719         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
720     }
721
722     SetImportBatchStatus($batch_id, 'imported');
723
724     # Moved final commit to the end
725     $schema->txn_commit;
726
727     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
728 }
729
730 =head2 _batchCommitItems
731
732   ($num_items_added, $num_items_errored) = 
733          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
734
735 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
736
737 =cut
738
739 sub _batchCommitItems {
740     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
741
742     my $dbh = C4::Context->dbh;
743
744     my $num_items_added = 0;
745     my $num_items_errored = 0;
746     my $num_items_replaced = 0;
747
748     my $sth = $dbh->prepare( "
749         SELECT import_items_id, import_items.marcxml, encoding
750         FROM import_items
751         JOIN import_records USING (import_record_id)
752         WHERE import_record_id = ?
753         ORDER BY import_items_id
754     " );
755     $sth->bind_param( 1, $import_record_id );
756     $sth->execute();
757
758     while ( my $row = $sth->fetchrow_hashref() ) {
759         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
760
761         # Delete date_due subfield as to not accidentally delete item checkout due dates
762         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
763         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
764
765         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
766
767         my $item_match;
768         my $duplicate_barcode = exists( $item->{'barcode'} );
769         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
770
771         # We assume that when replacing items we do not want to move them - the onus is on the importer to
772         # ensure the correct items/records are being updated
773         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
774         if (
775             $action eq "replace" &&
776             $duplicate_itemnumber &&
777             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
778         ) {
779             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
780             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
781             $updsth->bind_param( 1, 'imported' );
782             $updsth->bind_param( 2, $item->{itemnumber} );
783             $updsth->bind_param( 3, undef );
784             $updsth->bind_param( 4, $row->{'import_items_id'} );
785             $updsth->execute();
786             $updsth->finish();
787             $num_items_replaced++;
788         } elsif (
789             $action eq "replace" &&
790             $duplicate_barcode &&
791             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
792         ) {
793             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
794             $updsth->bind_param( 1, 'imported' );
795             $updsth->bind_param( 2, $item->{itemnumber} );
796             $updsth->bind_param( 3, undef );
797             $updsth->bind_param( 4, $row->{'import_items_id'} );
798             $updsth->execute();
799             $updsth->finish();
800             $num_items_replaced++;
801         } elsif (
802             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
803             $duplicate_barcode &&
804             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
805         ) {
806             $updsth->bind_param( 1, 'error' );
807             $updsth->bind_param( 2, undef );
808             $updsth->bind_param( 3, 'duplicate item barcode' );
809             $updsth->bind_param( 4, $row->{'import_items_id'} );
810             $updsth->execute();
811             $num_items_errored++;
812         } else {
813             # Remove the itemnumber if it exists, we want to create a new item
814             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
815             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
816
817             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
818             if( $itemnumber ) {
819                 $updsth->bind_param( 1, 'imported' );
820                 $updsth->bind_param( 2, $itemnumber );
821                 $updsth->bind_param( 3, undef );
822                 $updsth->bind_param( 4, $row->{'import_items_id'} );
823                 $updsth->execute();
824                 $updsth->finish();
825                 $num_items_added++;
826             }
827         }
828     }
829
830     return ( $num_items_added, $num_items_replaced, $num_items_errored );
831 }
832
833 =head2 BatchRevertRecords
834
835   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
836       $num_ignored) = BatchRevertRecords($batch_id);
837
838 =cut
839
840 sub BatchRevertRecords {
841     my $batch_id = shift;
842
843     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
844
845     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
846
847     my $record_type;
848     my $num_deleted = 0;
849     my $num_errors = 0;
850     my $num_reverted = 0;
851     my $num_ignored = 0;
852     my $num_items_deleted = 0;
853     # commit (i.e., save, all records in the batch)
854     SetImportBatchStatus($batch_id, 'reverting');
855     my $overlay_action = GetImportBatchOverlayAction($batch_id);
856     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
857     my $dbh = C4::Context->dbh;
858     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
859                              FROM import_records
860                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
861                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
862                              WHERE import_batch_id = ?");
863     $sth->execute($batch_id);
864     my $marc_type;
865     my $marcflavour = C4::Context->preference('marcflavour');
866     while (my $rowref = $sth->fetchrow_hashref) {
867         $record_type = $rowref->{'record_type'};
868         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
869             $num_ignored++;
870             next;
871         }
872         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
873             $marc_type = 'UNIMARCAUTH';
874         } elsif ($marcflavour eq 'UNIMARC') {
875             $marc_type = 'UNIMARC';
876         } else {
877             $marc_type = 'USMARC';
878         }
879
880         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
881
882         if ($record_result eq 'delete') {
883             my $error = undef;
884             if  ($record_type eq 'biblio') {
885                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
886                 $error = DelBiblio($rowref->{'matched_biblionumber'});
887             } else {
888                 DelAuthority({ authid => $rowref->{'matched_authid'} });
889             }
890             if (defined $error) {
891                 $num_errors++;
892             } else {
893                 $num_deleted++;
894                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
895             }
896         } elsif ($record_result eq 'restore') {
897             $num_reverted++;
898             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
899             if ($record_type eq 'biblio') {
900                 my $biblionumber = $rowref->{'matched_biblionumber'};
901                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
902
903                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
904                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
905
906                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
907                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
908             } else {
909                 my $authid = $rowref->{'matched_authid'};
910                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
911             }
912             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
913         } elsif ($record_result eq 'ignore') {
914             if ($record_type eq 'biblio') {
915                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
916             }
917             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
918         }
919         my $query;
920         if ($record_type eq 'biblio') {
921             # remove matched_biblionumber only if there is no 'imported' item left
922             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
923             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
924         } else {
925             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
926         }
927         my $sth2 = $dbh->prepare_cached($query);
928         $sth2->execute($rowref->{'import_record_id'});
929     }
930
931     $sth->finish();
932     SetImportBatchStatus($batch_id, 'reverted');
933     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
934 }
935
936 =head2 BatchRevertItems
937
938   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
939
940 =cut
941
942 sub BatchRevertItems {
943     my ($import_record_id, $biblionumber) = @_;
944
945     my $dbh = C4::Context->dbh;
946     my $num_items_deleted = 0;
947
948     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
949                                    FROM import_items
950                                    JOIN items USING (itemnumber)
951                                    WHERE import_record_id = ?");
952     $sth->bind_param(1, $import_record_id);
953     $sth->execute();
954     while (my $row = $sth->fetchrow_hashref()) {
955         my $item = Koha::Items->find($row->{itemnumber});
956         if ($item->safe_delete){
957             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
958             $updsth->bind_param(1, 'reverted');
959             $updsth->bind_param(2, $row->{'import_items_id'});
960             $updsth->execute();
961             $updsth->finish();
962             $num_items_deleted++;
963         }
964         else {
965             next;
966         }
967     }
968     $sth->finish();
969     return $num_items_deleted;
970 }
971
972 =head2 CleanBatch
973
974   CleanBatch($batch_id)
975
976 Deletes all staged records from the import batch
977 and sets the status of the batch to 'cleaned'.  Note
978 that deleting a stage record does *not* affect
979 any record that has been committed to the database.
980
981 =cut
982
983 sub CleanBatch {
984     my $batch_id = shift;
985     return unless defined $batch_id;
986
987     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
988     SetImportBatchStatus($batch_id, 'cleaned');
989 }
990
991 =head2 DeleteBatch
992
993   DeleteBatch($batch_id)
994
995 Deletes the record from the database. This can only be done
996 once the batch has been cleaned.
997
998 =cut
999
1000 sub DeleteBatch {
1001     my $batch_id = shift;
1002     return unless defined $batch_id;
1003
1004     my $dbh = C4::Context->dbh;
1005     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1006     $sth->execute( $batch_id );
1007 }
1008
1009 =head2 GetAllImportBatches
1010
1011   my $results = GetAllImportBatches();
1012
1013 Returns a references to an array of hash references corresponding
1014 to all import_batches rows (of batch_type 'batch'), sorted in 
1015 ascending order by import_batch_id.
1016
1017 =cut
1018
1019 sub  GetAllImportBatches {
1020     my $dbh = C4::Context->dbh;
1021     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1022                                     WHERE batch_type IN ('batch', 'webservice')
1023                                     ORDER BY import_batch_id ASC");
1024
1025     my $results = [];
1026     $sth->execute();
1027     while (my $row = $sth->fetchrow_hashref) {
1028         push @$results, $row;
1029     }
1030     $sth->finish();
1031     return $results;
1032 }
1033
1034 =head2 GetStagedWebserviceBatches
1035
1036   my $batch_ids = GetStagedWebserviceBatches();
1037
1038 Returns a references to an array of batch id's
1039 of batch_type 'webservice' that are not imported
1040
1041 =cut
1042
1043 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1044 SELECT import_batch_id FROM import_batches
1045 WHERE batch_type = 'webservice'
1046 AND import_status = 'staged'
1047 EOQ
1048 sub  GetStagedWebserviceBatches {
1049     my $dbh = C4::Context->dbh;
1050     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1051 }
1052
1053 =head2 GetImportBatchRangeDesc
1054
1055   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1056
1057 Returns a reference to an array of hash references corresponding to
1058 import_batches rows (sorted in descending order by import_batch_id)
1059 start at the given offset.
1060
1061 =cut
1062
1063 sub GetImportBatchRangeDesc {
1064     my ($offset, $results_per_group) = @_;
1065
1066     my $dbh = C4::Context->dbh;
1067     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1068                                     LEFT JOIN import_batch_profiles p
1069                                     ON b.profile_id = p.id
1070                                     WHERE b.batch_type IN ('batch', 'webservice')
1071                                     ORDER BY b.import_batch_id DESC";
1072     my @params;
1073     if ($results_per_group){
1074         $query .= " LIMIT ?";
1075         push(@params, $results_per_group);
1076     }
1077     if ($offset){
1078         $query .= " OFFSET ?";
1079         push(@params, $offset);
1080     }
1081     my $sth = $dbh->prepare_cached($query);
1082     $sth->execute(@params);
1083     my $results = $sth->fetchall_arrayref({});
1084     $sth->finish();
1085     return $results;
1086 }
1087
1088 =head2 GetItemNumbersFromImportBatch
1089
1090   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1091
1092 =cut
1093
1094 sub GetItemNumbersFromImportBatch {
1095     my ($batch_id) = @_;
1096     my $dbh = C4::Context->dbh;
1097     my $sql = q|
1098 SELECT itemnumber FROM import_items
1099 INNER JOIN items USING (itemnumber)
1100 INNER JOIN import_records USING (import_record_id)
1101 WHERE import_batch_id = ?|;
1102     my  $sth = $dbh->prepare( $sql );
1103     $sth->execute($batch_id);
1104     my @items ;
1105     while ( my ($itm) = $sth->fetchrow_array ) {
1106         push @items, $itm;
1107     }
1108     return @items;
1109 }
1110
1111 =head2 GetNumberOfImportBatches
1112
1113   my $count = GetNumberOfImportBatches();
1114
1115 =cut
1116
1117 sub GetNumberOfNonZ3950ImportBatches {
1118     my $dbh = C4::Context->dbh;
1119     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1120     $sth->execute();
1121     my ($count) = $sth->fetchrow_array();
1122     $sth->finish();
1123     return $count;
1124 }
1125
1126 =head2 GetImportBiblios
1127
1128   my $results = GetImportBiblios($importid);
1129
1130 =cut
1131
1132 sub GetImportBiblios {
1133     my ($import_record_id) = @_;
1134
1135     my $dbh = C4::Context->dbh;
1136     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1137     return $dbh->selectall_arrayref(
1138         $query,
1139         { Slice => {} },
1140         $import_record_id
1141     );
1142
1143 }
1144
1145 =head2 GetImportRecordsRange
1146
1147   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1148
1149 Returns a reference to an array of hash references corresponding to
1150 import_biblios/import_auths/import_records rows for a given batch
1151 starting at the given offset.
1152
1153 =cut
1154
1155 sub GetImportRecordsRange {
1156     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1157
1158     my $dbh = C4::Context->dbh;
1159
1160     my $order_by = $parameters->{order_by} || 'import_record_id';
1161     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1162
1163     my $order_by_direction =
1164       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1165
1166     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1167
1168     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1169                                            record_sequence, status, overlay_status,
1170                                            matched_biblionumber, matched_authid, record_type
1171                                     FROM   import_records
1172                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1173                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1174                                     WHERE  import_batch_id = ?";
1175     my @params;
1176     push(@params, $batch_id);
1177     if ($status) {
1178         $query .= " AND status=?";
1179         push(@params,$status);
1180     }
1181
1182     $query.=" ORDER BY $order_by $order_by_direction";
1183
1184     if($results_per_group){
1185         $query .= " LIMIT ?";
1186         push(@params, $results_per_group);
1187     }
1188     if($offset){
1189         $query .= " OFFSET ?";
1190         push(@params, $offset);
1191     }
1192     my $sth = $dbh->prepare_cached($query);
1193     $sth->execute(@params);
1194     my $results = $sth->fetchall_arrayref({});
1195     $sth->finish();
1196     return $results;
1197
1198 }
1199
1200 =head2 GetBestRecordMatch
1201
1202   my $record_id = GetBestRecordMatch($import_record_id);
1203
1204 =cut
1205
1206 sub GetBestRecordMatch {
1207     my ($import_record_id) = @_;
1208
1209     my $dbh = C4::Context->dbh;
1210     my $sth = $dbh->prepare("SELECT candidate_match_id
1211                              FROM   import_record_matches
1212                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1213                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1214                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1215                              WHERE  import_record_matches.import_record_id = ? AND
1216                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1217                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1218                              AND chosen = 1
1219                              ORDER BY score DESC, candidate_match_id DESC");
1220     $sth->execute($import_record_id);
1221     my ($record_id) = $sth->fetchrow_array();
1222     $sth->finish();
1223     return $record_id;
1224 }
1225
1226 =head2 GetImportBatchStatus
1227
1228   my $status = GetImportBatchStatus($batch_id);
1229
1230 =cut
1231
1232 sub GetImportBatchStatus {
1233     my ($batch_id) = @_;
1234
1235     my $dbh = C4::Context->dbh;
1236     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1237     $sth->execute($batch_id);
1238     my ($status) = $sth->fetchrow_array();
1239     $sth->finish();
1240     return $status;
1241
1242 }
1243
1244 =head2 SetImportBatchStatus
1245
1246   SetImportBatchStatus($batch_id, $new_status);
1247
1248 =cut
1249
1250 sub SetImportBatchStatus {
1251     my ($batch_id, $new_status) = @_;
1252
1253     my $dbh = C4::Context->dbh;
1254     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1255     $sth->execute($new_status, $batch_id);
1256     $sth->finish();
1257
1258 }
1259
1260 =head2 SetMatchedBiblionumber
1261
1262   SetMatchedBiblionumber($import_record_id, $biblionumber);
1263
1264 =cut
1265
1266 sub SetMatchedBiblionumber {
1267     my ($import_record_id, $biblionumber) = @_;
1268
1269     my $dbh = C4::Context->dbh;
1270     $dbh->do(
1271         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1272         undef, $biblionumber, $import_record_id
1273     );
1274 }
1275
1276 =head2 GetImportBatchOverlayAction
1277
1278   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1279
1280 =cut
1281
1282 sub GetImportBatchOverlayAction {
1283     my ($batch_id) = @_;
1284
1285     my $dbh = C4::Context->dbh;
1286     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1287     $sth->execute($batch_id);
1288     my ($overlay_action) = $sth->fetchrow_array();
1289     $sth->finish();
1290     return $overlay_action;
1291
1292 }
1293
1294
1295 =head2 SetImportBatchOverlayAction
1296
1297   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1298
1299 =cut
1300
1301 sub SetImportBatchOverlayAction {
1302     my ($batch_id, $new_overlay_action) = @_;
1303
1304     my $dbh = C4::Context->dbh;
1305     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1306     $sth->execute($new_overlay_action, $batch_id);
1307     $sth->finish();
1308
1309 }
1310
1311 =head2 GetImportBatchNoMatchAction
1312
1313   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1314
1315 =cut
1316
1317 sub GetImportBatchNoMatchAction {
1318     my ($batch_id) = @_;
1319
1320     my $dbh = C4::Context->dbh;
1321     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1322     $sth->execute($batch_id);
1323     my ($nomatch_action) = $sth->fetchrow_array();
1324     $sth->finish();
1325     return $nomatch_action;
1326
1327 }
1328
1329
1330 =head2 SetImportBatchNoMatchAction
1331
1332   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1333
1334 =cut
1335
1336 sub SetImportBatchNoMatchAction {
1337     my ($batch_id, $new_nomatch_action) = @_;
1338
1339     my $dbh = C4::Context->dbh;
1340     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1341     $sth->execute($new_nomatch_action, $batch_id);
1342     $sth->finish();
1343
1344 }
1345
1346 =head2 GetImportBatchItemAction
1347
1348   my $item_action = GetImportBatchItemAction($batch_id);
1349
1350 =cut
1351
1352 sub GetImportBatchItemAction {
1353     my ($batch_id) = @_;
1354
1355     my $dbh = C4::Context->dbh;
1356     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1357     $sth->execute($batch_id);
1358     my ($item_action) = $sth->fetchrow_array();
1359     $sth->finish();
1360     return $item_action;
1361
1362 }
1363
1364
1365 =head2 SetImportBatchItemAction
1366
1367   SetImportBatchItemAction($batch_id, $new_item_action);
1368
1369 =cut
1370
1371 sub SetImportBatchItemAction {
1372     my ($batch_id, $new_item_action) = @_;
1373
1374     my $dbh = C4::Context->dbh;
1375     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1376     $sth->execute($new_item_action, $batch_id);
1377     $sth->finish();
1378
1379 }
1380
1381 =head2 GetImportBatchMatcher
1382
1383   my $matcher_id = GetImportBatchMatcher($batch_id);
1384
1385 =cut
1386
1387 sub GetImportBatchMatcher {
1388     my ($batch_id) = @_;
1389
1390     my $dbh = C4::Context->dbh;
1391     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1392     $sth->execute($batch_id);
1393     my ($matcher_id) = $sth->fetchrow_array();
1394     $sth->finish();
1395     return $matcher_id;
1396
1397 }
1398
1399
1400 =head2 SetImportBatchMatcher
1401
1402   SetImportBatchMatcher($batch_id, $new_matcher_id);
1403
1404 =cut
1405
1406 sub SetImportBatchMatcher {
1407     my ($batch_id, $new_matcher_id) = @_;
1408
1409     my $dbh = C4::Context->dbh;
1410     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1411     $sth->execute($new_matcher_id, $batch_id);
1412     $sth->finish();
1413
1414 }
1415
1416 =head2 GetImportRecordOverlayStatus
1417
1418   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1419
1420 =cut
1421
1422 sub GetImportRecordOverlayStatus {
1423     my ($import_record_id) = @_;
1424
1425     my $dbh = C4::Context->dbh;
1426     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1427     $sth->execute($import_record_id);
1428     my ($overlay_status) = $sth->fetchrow_array();
1429     $sth->finish();
1430     return $overlay_status;
1431
1432 }
1433
1434
1435 =head2 SetImportRecordOverlayStatus
1436
1437   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1438
1439 =cut
1440
1441 sub SetImportRecordOverlayStatus {
1442     my ($import_record_id, $new_overlay_status) = @_;
1443
1444     my $dbh = C4::Context->dbh;
1445     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1446     $sth->execute($new_overlay_status, $import_record_id);
1447     $sth->finish();
1448
1449 }
1450
1451 =head2 GetImportRecordStatus
1452
1453   my $status = GetImportRecordStatus($import_record_id);
1454
1455 =cut
1456
1457 sub GetImportRecordStatus {
1458     my ($import_record_id) = @_;
1459
1460     my $dbh = C4::Context->dbh;
1461     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1462     $sth->execute($import_record_id);
1463     my ($status) = $sth->fetchrow_array();
1464     $sth->finish();
1465     return $status;
1466
1467 }
1468
1469
1470 =head2 SetImportRecordStatus
1471
1472   SetImportRecordStatus($import_record_id, $new_status);
1473
1474 =cut
1475
1476 sub SetImportRecordStatus {
1477     my ($import_record_id, $new_status) = @_;
1478
1479     my $dbh = C4::Context->dbh;
1480     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1481     $sth->execute($new_status, $import_record_id);
1482     $sth->finish();
1483
1484 }
1485
1486 =head2 GetImportRecordMatches
1487
1488   my $results = GetImportRecordMatches($import_record_id, $best_only);
1489
1490 =cut
1491
1492 sub GetImportRecordMatches {
1493     my $import_record_id = shift;
1494     my $best_only = @_ ? shift : 0;
1495
1496     my $dbh = C4::Context->dbh;
1497     # FIXME currently biblio only
1498     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1499                                     candidate_match_id, score, record_type,
1500                                     chosen
1501                                     FROM import_records
1502                                     JOIN import_record_matches USING (import_record_id)
1503                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1504                                     WHERE import_record_id = ?
1505                                     ORDER BY score DESC, biblionumber DESC");
1506     $sth->bind_param(1, $import_record_id);
1507     my $results = [];
1508     $sth->execute();
1509     while (my $row = $sth->fetchrow_hashref) {
1510         if ($row->{'record_type'} eq 'auth') {
1511             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1512         }
1513         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1514         push @$results, $row;
1515         last if $best_only;
1516     }
1517     $sth->finish();
1518
1519     return $results;
1520     
1521 }
1522
1523 =head2 SetImportRecordMatches
1524
1525   SetImportRecordMatches($import_record_id, @matches);
1526
1527 =cut
1528
1529 sub SetImportRecordMatches {
1530     my $import_record_id = shift;
1531     my @matches = @_;
1532
1533     my $dbh = C4::Context->dbh;
1534     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1535     $delsth->execute($import_record_id);
1536     $delsth->finish();
1537
1538     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1539                                     VALUES (?, ?, ?, ?)");
1540     my $chosen = 1; #The first match is defaulted to be chosen
1541     foreach my $match (@matches) {
1542         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1543         $chosen = 0; #After the first we do not default to other matches
1544     }
1545 }
1546
1547 =head2 RecordsFromISO2709File
1548
1549     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1550
1551 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1552
1553 @PARAM1, String, absolute path to the ISO2709 file.
1554 @PARAM2, String, see stage_file.pl
1555 @PARAM3, String, should be utf8
1556
1557 Returns two array refs.
1558
1559 =cut
1560
1561 sub RecordsFromISO2709File {
1562     my ($input_file, $record_type, $encoding) = @_;
1563     my @errors;
1564
1565     my $marc_type = C4::Context->preference('marcflavour');
1566     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1567
1568     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1569     my @marc_records;
1570     $/ = "\035";
1571     while (<$fh>) {
1572         s/^\s+//;
1573         s/\s+$//;
1574         next unless $_; # skip if record has only whitespace, as might occur
1575                         # if file includes newlines between each MARC record
1576         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1577         push @marc_records, $marc_record;
1578         if ($charset_guessed ne $encoding) {
1579             push @errors,
1580                 "Unexpected charset $charset_guessed, expecting $encoding";
1581         }
1582     }
1583     close $fh;
1584     return ( \@errors, \@marc_records );
1585 }
1586
1587 =head2 RecordsFromMARCXMLFile
1588
1589     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1590
1591 Creates MARC::Record-objects out of the given MARCXML-file.
1592
1593 @PARAM1, String, absolute path to the ISO2709 file.
1594 @PARAM2, String, should be utf8
1595
1596 Returns two array refs.
1597
1598 =cut
1599
1600 sub RecordsFromMARCXMLFile {
1601     my ( $filename, $encoding ) = @_;
1602     my $batch = MARC::File::XML->in( $filename );
1603     my ( @marcRecords, @errors, $record );
1604     do {
1605         eval { $record = $batch->next( $encoding ); };
1606         if ($@) {
1607             push @errors, $@;
1608         }
1609         push @marcRecords, $record if $record;
1610     } while( $record );
1611     return (\@errors, \@marcRecords);
1612 }
1613
1614 =head2 RecordsFromMarcPlugin
1615
1616     Converts text of input_file into array of MARC records with to_marc plugin
1617
1618 =cut
1619
1620 sub RecordsFromMarcPlugin {
1621     my ($input_file, $plugin_class, $encoding) = @_;
1622     my ( $text, @return );
1623     return \@return if !$input_file || !$plugin_class;
1624
1625     # Read input file
1626     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1627     $/ = "\035";
1628     while (<$fh>) {
1629         s/^\s+//;
1630         s/\s+$//;
1631         next unless $_;
1632         $text .= $_;
1633     }
1634     close $fh;
1635
1636     # Convert to large MARC blob with plugin
1637     $text = Koha::Plugins::Handler->run({
1638         class  => $plugin_class,
1639         method => 'to_marc',
1640         params => { data => $text },
1641     }) if $text;
1642
1643     # Convert to array of MARC records
1644     if( $text ) {
1645         my $marc_type = C4::Context->preference('marcflavour');
1646         foreach my $blob ( split(/\x1D/, $text) ) {
1647             next if $blob =~ /^\s*$/;
1648             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1649             push @return, $marcrecord;
1650         }
1651     }
1652     return \@return;
1653 }
1654
1655 # internal functions
1656
1657 sub _create_import_record {
1658     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1659
1660     my $dbh = C4::Context->dbh;
1661     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1662                                                          record_type, encoding)
1663                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1664     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1665                   $record_type, $encoding);
1666     my $import_record_id = $dbh->{'mysql_insertid'};
1667     $sth->finish();
1668     return $import_record_id;
1669 }
1670
1671 sub _add_auth_fields {
1672     my ($import_record_id, $marc_record) = @_;
1673
1674     my $controlnumber;
1675     if ($marc_record->field('001')) {
1676         $controlnumber = $marc_record->field('001')->data();
1677     }
1678     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1679     my $dbh = C4::Context->dbh;
1680     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1681     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1682     $sth->finish();
1683 }
1684
1685 sub _add_biblio_fields {
1686     my ($import_record_id, $marc_record) = @_;
1687
1688     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1689     my $dbh = C4::Context->dbh;
1690     # FIXME no controlnumber, originalsource
1691     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1692     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1693     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1694     $sth->finish();
1695                 
1696 }
1697
1698 sub _update_biblio_fields {
1699     my ($import_record_id, $marc_record) = @_;
1700
1701     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1702     my $dbh = C4::Context->dbh;
1703     # FIXME no controlnumber, originalsource
1704     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1705     $isbn =~ s/\(.*$//;
1706     $isbn =~ tr/ -_//;
1707     $isbn = uc $isbn;
1708     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1709                              WHERE  import_record_id = ?");
1710     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1711     $sth->finish();
1712 }
1713
1714 sub _parse_biblio_fields {
1715     my ($marc_record) = @_;
1716
1717     my $dbh = C4::Context->dbh;
1718     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1719     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1720
1721 }
1722
1723 sub _update_batch_record_counts {
1724     my ($batch_id) = @_;
1725
1726     my $dbh = C4::Context->dbh;
1727     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1728                                         num_records = (
1729                                             SELECT COUNT(*)
1730                                             FROM import_records
1731                                             WHERE import_batch_id = import_batches.import_batch_id),
1732                                         num_items = (
1733                                             SELECT COUNT(*)
1734                                             FROM import_records
1735                                             JOIN import_items USING (import_record_id)
1736                                             WHERE import_batch_id = import_batches.import_batch_id
1737                                             AND record_type = 'biblio')
1738                                     WHERE import_batch_id = ?");
1739     $sth->bind_param(1, $batch_id);
1740     $sth->execute();
1741     $sth->finish();
1742 }
1743
1744 sub _get_commit_action {
1745     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1746     
1747     if ($record_type eq 'biblio') {
1748         my ($bib_result, $bib_match, $item_result);
1749
1750         $bib_match = GetBestRecordMatch($import_record_id);
1751         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1752
1753             $bib_result = $overlay_action;
1754
1755             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1756                 $item_result = 'create_new';
1757             } elsif($item_action eq 'replace'){
1758                 $item_result = 'replace';
1759             } else {
1760                 $item_result = 'ignore';
1761             }
1762
1763         } else {
1764             $bib_result = $nomatch_action;
1765             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1766         }
1767         return ($bib_result, $item_result, $bib_match);
1768     } else { # must be auths
1769         my ($auth_result, $auth_match);
1770
1771         $auth_match = GetBestRecordMatch($import_record_id);
1772         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1773             $auth_result = $overlay_action;
1774         } else {
1775             $auth_result = $nomatch_action;
1776         }
1777
1778         return ($auth_result, undef, $auth_match);
1779
1780     }
1781 }
1782
1783 sub _get_revert_action {
1784     my ($overlay_action, $overlay_status, $status) = @_;
1785
1786     my $bib_result;
1787
1788     if ($status eq 'ignored') {
1789         $bib_result = 'ignore';
1790     } else {
1791         if ($overlay_action eq 'create_new') {
1792             $bib_result = 'delete';
1793         } else {
1794             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1795         }
1796     }
1797     return $bib_result;
1798 }
1799
1800 1;
1801 __END__
1802
1803 =head1 AUTHOR
1804
1805 Koha Development Team <http://koha-community.org/>
1806
1807 Galen Charlton <galen.charlton@liblime.com>
1808
1809 =cut