Bug 33576: Index records after import transaction is committed
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority GetAuthorizedHeading );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
513   'progress_callback' will define code called every X records.
514
515   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
516         BatchCommitRecords({
517             batch_id  => $batch_id,
518             framework => $framework,
519             overlay_framework => $overlay_framework,
520             progress_interval => $progress_interval,
521             progress_callback => $progress_callback,
522             skip_intermediate_commit => $skip_intermediate_commit
523         });
524
525     Parameter skip_intermediate_commit does what is says.
526 =cut
527
528 sub BatchCommitRecords {
529     my $params = shift;
530     my $batch_id          = $params->{batch_id};
531     my $framework         = $params->{framework};
532     my $overlay_framework = $params->{overlay_framework};
533     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
534     my $progress_interval = $params->{progress_interval} // 0;
535     my $progress_callback = $params->{progress_callback};
536     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
537     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
538
539     my $schema = Koha::Database->schema;
540     $schema->txn_begin;
541     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
542     # transactions may be committed and started too again. The final commit is close to the end.
543
544     my $record_type;
545     my $num_added = 0;
546     my $num_updated = 0;
547     my $num_items_added = 0;
548     my $num_items_replaced = 0;
549     my $num_items_errored = 0;
550     my $num_ignored = 0;
551     # commit (i.e., save, all records in the batch)
552     SetImportBatchStatus($batch_id, 'importing');
553     my $overlay_action = GetImportBatchOverlayAction($batch_id);
554     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
555     my $item_action = GetImportBatchItemAction($batch_id);
556     my $item_tag;
557     my $item_subfield;
558     my $dbh = C4::Context->dbh;
559     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
560                              FROM import_records
561                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
562                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
563                              WHERE import_batch_id = ?");
564     $sth->execute($batch_id);
565     my $marcflavour = C4::Context->preference('marcflavour');
566
567     my $userenv = C4::Context->userenv;
568     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
569
570     my $rec_num = 0;
571     my @biblio_ids;
572     while (my $rowref = $sth->fetchrow_hashref) {
573         $record_type = $rowref->{'record_type'};
574
575         $rec_num++;
576
577         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
578             # report progress and commit
579             $schema->txn_commit unless $skip_intermediate_commit;
580             &$progress_callback( $rec_num );
581             $schema->txn_begin unless $skip_intermediate_commit;
582         }
583         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
584             $num_ignored++;
585             next;
586         }
587
588         my $marc_type;
589         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
590             $marc_type = 'UNIMARCAUTH';
591         } elsif ($marcflavour eq 'UNIMARC') {
592             $marc_type = 'UNIMARC';
593         } else {
594             $marc_type = 'USMARC';
595         }
596         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
597
598         if ($record_type eq 'biblio') {
599             # remove any item tags - rely on _batchCommitItems
600             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
601             foreach my $item_field ($marc_record->field($item_tag)) {
602                 $marc_record->delete_field($item_field);
603             }
604             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
605                 my @control_num = $marc_record->field('001');
606                 $marc_record->delete_fields(@control_num);
607             }
608         }
609
610         my ($record_result, $item_result, $record_match) =
611             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
612                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
613
614         my $recordid;
615         my $query;
616         if ($record_result eq 'create_new') {
617             $num_added++;
618             if ($record_type eq 'biblio') {
619                 my $biblioitemnumber;
620                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
621                 push @biblio_ids, $recordid;
622                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
623                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
624                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
625                     $num_items_added += $bib_items_added;
626                     $num_items_replaced += $bib_items_replaced;
627                     $num_items_errored += $bib_items_errored;
628                 }
629             } else {
630                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
631                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
632             }
633             my $sth = $dbh->prepare_cached($query);
634             $sth->execute($recordid, $rowref->{'import_record_id'});
635             $sth->finish();
636             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
637         } elsif ($record_result eq 'replace') {
638             $num_updated++;
639             $recordid = $record_match;
640             my $oldxml;
641             if ($record_type eq 'biblio') {
642                 my $oldbiblio = Koha::Biblios->find( $recordid );
643                 $oldxml = GetXmlBiblio($recordid);
644
645                 # remove item fields so that they don't get
646                 # added again if record is reverted
647                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
648                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
649                 foreach my $item_field ($old_marc->field($item_tag)) {
650                     $old_marc->delete_field($item_field);
651                 }
652                 $oldxml = $old_marc->as_xml($marc_type);
653
654                 my $context = { source => 'batchimport' };
655                 if ($logged_in_patron) {
656                     $context->{categorycode} = $logged_in_patron->categorycode;
657                     $context->{userid} = $logged_in_patron->userid;
658                 }
659
660                 ModBiblio(
661                     $marc_record,
662                     $recordid,
663                     $overlay_framework // $oldbiblio->frameworkcode,
664                     {
665                         overlay_context   => $context,
666                         skip_record_index => 1
667                     }
668                 );
669                 push @biblio_ids, $recordid;
670                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
671
672                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
673                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
674                     $num_items_added += $bib_items_added;
675                     $num_items_replaced += $bib_items_replaced;
676                     $num_items_errored += $bib_items_errored;
677                 }
678             } else {
679                 $oldxml = GetAuthorityXML($recordid);
680
681                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
682                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
683             }
684             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
685             $sth->execute($oldxml, $rowref->{'import_record_id'});
686             $sth->finish();
687             my $sth2 = $dbh->prepare_cached($query);
688             $sth2->execute($recordid, $rowref->{'import_record_id'});
689             $sth2->finish();
690             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
691             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
692         } elsif ($record_result eq 'ignore') {
693             $recordid = $record_match;
694             $num_ignored++;
695             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
696                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
697                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
698                 $num_items_added += $bib_items_added;
699          $num_items_replaced += $bib_items_replaced;
700                 $num_items_errored += $bib_items_errored;
701                 # still need to record the matched biblionumber so that the
702                 # items can be reverted
703                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
704                 $sth2->execute($recordid, $rowref->{'import_record_id'});
705                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
706             }
707             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
708         }
709     }
710
711     if ($progress_interval){
712         &$progress_callback($rec_num);
713     }
714
715     $sth->finish();
716
717     SetImportBatchStatus($batch_id, 'imported');
718
719     # Moved final commit to the end
720     $schema->txn_commit;
721
722     if ( @biblio_ids ) {
723         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
724         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
725     }
726
727
728     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
729 }
730
731 =head2 _batchCommitItems
732
733   ($num_items_added, $num_items_errored) = 
734          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
735
736 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
737
738 =cut
739
740 sub _batchCommitItems {
741     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
742
743     my $dbh = C4::Context->dbh;
744
745     my $num_items_added = 0;
746     my $num_items_errored = 0;
747     my $num_items_replaced = 0;
748
749     my $sth = $dbh->prepare( "
750         SELECT import_items_id, import_items.marcxml, encoding
751         FROM import_items
752         JOIN import_records USING (import_record_id)
753         WHERE import_record_id = ?
754         ORDER BY import_items_id
755     " );
756     $sth->bind_param( 1, $import_record_id );
757     $sth->execute();
758
759     while ( my $row = $sth->fetchrow_hashref() ) {
760         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
761
762         # Delete date_due subfield as to not accidentally delete item checkout due dates
763         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
764         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
765
766         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
767
768         my $item_match;
769         my $duplicate_barcode = exists( $item->{'barcode'} );
770         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
771
772         # We assume that when replacing items we do not want to move them - the onus is on the importer to
773         # ensure the correct items/records are being updated
774         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
775         if (
776             $action eq "replace" &&
777             $duplicate_itemnumber &&
778             ( $item_match = Koha::Items->find( $item->{itemnumber} ))
779         ) {
780             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
781             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
782             $updsth->bind_param( 1, 'imported' );
783             $updsth->bind_param( 2, $item->{itemnumber} );
784             $updsth->bind_param( 3, undef );
785             $updsth->bind_param( 4, $row->{'import_items_id'} );
786             $updsth->execute();
787             $updsth->finish();
788             $num_items_replaced++;
789         } elsif (
790             $action eq "replace" &&
791             $duplicate_barcode &&
792             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
793         ) {
794             ModItemFromMarc( $item_marc, $item_match->biblionumber, $item_match->itemnumber, { skip_record_index => 1 } );
795             $updsth->bind_param( 1, 'imported' );
796             $updsth->bind_param( 2, $item->{itemnumber} );
797             $updsth->bind_param( 3, undef );
798             $updsth->bind_param( 4, $row->{'import_items_id'} );
799             $updsth->execute();
800             $updsth->finish();
801             $num_items_replaced++;
802         } elsif (
803             # We aren't replacing, but the incoming file has a barcode, we need to check if it exists
804             $duplicate_barcode &&
805             ( $item_match = Koha::Items->find({ barcode => $item->{'barcode'} }) )
806         ) {
807             $updsth->bind_param( 1, 'error' );
808             $updsth->bind_param( 2, undef );
809             $updsth->bind_param( 3, 'duplicate item barcode' );
810             $updsth->bind_param( 4, $row->{'import_items_id'} );
811             $updsth->execute();
812             $num_items_errored++;
813         } else {
814             # Remove the itemnumber if it exists, we want to create a new item
815             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
816             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
817
818             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
819             if( $itemnumber ) {
820                 $updsth->bind_param( 1, 'imported' );
821                 $updsth->bind_param( 2, $itemnumber );
822                 $updsth->bind_param( 3, undef );
823                 $updsth->bind_param( 4, $row->{'import_items_id'} );
824                 $updsth->execute();
825                 $updsth->finish();
826                 $num_items_added++;
827             }
828         }
829     }
830
831     return ( $num_items_added, $num_items_replaced, $num_items_errored );
832 }
833
834 =head2 BatchRevertRecords
835
836   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
837       $num_ignored) = BatchRevertRecords($batch_id);
838
839 =cut
840
841 sub BatchRevertRecords {
842     my $batch_id = shift;
843
844     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
845
846     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
847
848     my $record_type;
849     my $num_deleted = 0;
850     my $num_errors = 0;
851     my $num_reverted = 0;
852     my $num_ignored = 0;
853     my $num_items_deleted = 0;
854     # commit (i.e., save, all records in the batch)
855     SetImportBatchStatus($batch_id, 'reverting');
856     my $overlay_action = GetImportBatchOverlayAction($batch_id);
857     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
858     my $dbh = C4::Context->dbh;
859     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
860                              FROM import_records
861                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
862                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
863                              WHERE import_batch_id = ?");
864     $sth->execute($batch_id);
865     my $marc_type;
866     my $marcflavour = C4::Context->preference('marcflavour');
867     while (my $rowref = $sth->fetchrow_hashref) {
868         $record_type = $rowref->{'record_type'};
869         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
870             $num_ignored++;
871             next;
872         }
873         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
874             $marc_type = 'UNIMARCAUTH';
875         } elsif ($marcflavour eq 'UNIMARC') {
876             $marc_type = 'UNIMARC';
877         } else {
878             $marc_type = 'USMARC';
879         }
880
881         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
882
883         if ($record_result eq 'delete') {
884             my $error = undef;
885             if  ($record_type eq 'biblio') {
886                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
887                 $error = DelBiblio($rowref->{'matched_biblionumber'});
888             } else {
889                 DelAuthority({ authid => $rowref->{'matched_authid'} });
890             }
891             if (defined $error) {
892                 $num_errors++;
893             } else {
894                 $num_deleted++;
895                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
896             }
897         } elsif ($record_result eq 'restore') {
898             $num_reverted++;
899             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
900             if ($record_type eq 'biblio') {
901                 my $biblionumber = $rowref->{'matched_biblionumber'};
902                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
903
904                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
905                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
906
907                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
908                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
909             } else {
910                 my $authid = $rowref->{'matched_authid'};
911                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
912             }
913             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
914         } elsif ($record_result eq 'ignore') {
915             if ($record_type eq 'biblio') {
916                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
917             }
918             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
919         }
920         my $query;
921         if ($record_type eq 'biblio') {
922             # remove matched_biblionumber only if there is no 'imported' item left
923             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
924             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
925         } else {
926             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
927         }
928         my $sth2 = $dbh->prepare_cached($query);
929         $sth2->execute($rowref->{'import_record_id'});
930     }
931
932     $sth->finish();
933     SetImportBatchStatus($batch_id, 'reverted');
934     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
935 }
936
937 =head2 BatchRevertItems
938
939   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
940
941 =cut
942
943 sub BatchRevertItems {
944     my ($import_record_id, $biblionumber) = @_;
945
946     my $dbh = C4::Context->dbh;
947     my $num_items_deleted = 0;
948
949     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
950                                    FROM import_items
951                                    JOIN items USING (itemnumber)
952                                    WHERE import_record_id = ?");
953     $sth->bind_param(1, $import_record_id);
954     $sth->execute();
955     while (my $row = $sth->fetchrow_hashref()) {
956         my $item = Koha::Items->find($row->{itemnumber});
957         if ($item->safe_delete){
958             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
959             $updsth->bind_param(1, 'reverted');
960             $updsth->bind_param(2, $row->{'import_items_id'});
961             $updsth->execute();
962             $updsth->finish();
963             $num_items_deleted++;
964         }
965         else {
966             next;
967         }
968     }
969     $sth->finish();
970     return $num_items_deleted;
971 }
972
973 =head2 CleanBatch
974
975   CleanBatch($batch_id)
976
977 Deletes all staged records from the import batch
978 and sets the status of the batch to 'cleaned'.  Note
979 that deleting a stage record does *not* affect
980 any record that has been committed to the database.
981
982 =cut
983
984 sub CleanBatch {
985     my $batch_id = shift;
986     return unless defined $batch_id;
987
988     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
989     SetImportBatchStatus($batch_id, 'cleaned');
990 }
991
992 =head2 DeleteBatch
993
994   DeleteBatch($batch_id)
995
996 Deletes the record from the database. This can only be done
997 once the batch has been cleaned.
998
999 =cut
1000
1001 sub DeleteBatch {
1002     my $batch_id = shift;
1003     return unless defined $batch_id;
1004
1005     my $dbh = C4::Context->dbh;
1006     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
1007     $sth->execute( $batch_id );
1008 }
1009
1010 =head2 GetAllImportBatches
1011
1012   my $results = GetAllImportBatches();
1013
1014 Returns a references to an array of hash references corresponding
1015 to all import_batches rows (of batch_type 'batch'), sorted in 
1016 ascending order by import_batch_id.
1017
1018 =cut
1019
1020 sub  GetAllImportBatches {
1021     my $dbh = C4::Context->dbh;
1022     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1023                                     WHERE batch_type IN ('batch', 'webservice')
1024                                     ORDER BY import_batch_id ASC");
1025
1026     my $results = [];
1027     $sth->execute();
1028     while (my $row = $sth->fetchrow_hashref) {
1029         push @$results, $row;
1030     }
1031     $sth->finish();
1032     return $results;
1033 }
1034
1035 =head2 GetStagedWebserviceBatches
1036
1037   my $batch_ids = GetStagedWebserviceBatches();
1038
1039 Returns a references to an array of batch id's
1040 of batch_type 'webservice' that are not imported
1041
1042 =cut
1043
1044 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1045 SELECT import_batch_id FROM import_batches
1046 WHERE batch_type = 'webservice'
1047 AND import_status = 'staged'
1048 EOQ
1049 sub  GetStagedWebserviceBatches {
1050     my $dbh = C4::Context->dbh;
1051     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1052 }
1053
1054 =head2 GetImportBatchRangeDesc
1055
1056   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1057
1058 Returns a reference to an array of hash references corresponding to
1059 import_batches rows (sorted in descending order by import_batch_id)
1060 start at the given offset.
1061
1062 =cut
1063
1064 sub GetImportBatchRangeDesc {
1065     my ($offset, $results_per_group) = @_;
1066
1067     my $dbh = C4::Context->dbh;
1068     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1069                                     LEFT JOIN import_batch_profiles p
1070                                     ON b.profile_id = p.id
1071                                     WHERE b.batch_type IN ('batch', 'webservice')
1072                                     ORDER BY b.import_batch_id DESC";
1073     my @params;
1074     if ($results_per_group){
1075         $query .= " LIMIT ?";
1076         push(@params, $results_per_group);
1077     }
1078     if ($offset){
1079         $query .= " OFFSET ?";
1080         push(@params, $offset);
1081     }
1082     my $sth = $dbh->prepare_cached($query);
1083     $sth->execute(@params);
1084     my $results = $sth->fetchall_arrayref({});
1085     $sth->finish();
1086     return $results;
1087 }
1088
1089 =head2 GetItemNumbersFromImportBatch
1090
1091   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1092
1093 =cut
1094
1095 sub GetItemNumbersFromImportBatch {
1096     my ($batch_id) = @_;
1097     my $dbh = C4::Context->dbh;
1098     my $sql = q|
1099 SELECT itemnumber FROM import_items
1100 INNER JOIN items USING (itemnumber)
1101 INNER JOIN import_records USING (import_record_id)
1102 WHERE import_batch_id = ?|;
1103     my  $sth = $dbh->prepare( $sql );
1104     $sth->execute($batch_id);
1105     my @items ;
1106     while ( my ($itm) = $sth->fetchrow_array ) {
1107         push @items, $itm;
1108     }
1109     return @items;
1110 }
1111
1112 =head2 GetNumberOfImportBatches
1113
1114   my $count = GetNumberOfImportBatches();
1115
1116 =cut
1117
1118 sub GetNumberOfNonZ3950ImportBatches {
1119     my $dbh = C4::Context->dbh;
1120     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1121     $sth->execute();
1122     my ($count) = $sth->fetchrow_array();
1123     $sth->finish();
1124     return $count;
1125 }
1126
1127 =head2 GetImportBiblios
1128
1129   my $results = GetImportBiblios($importid);
1130
1131 =cut
1132
1133 sub GetImportBiblios {
1134     my ($import_record_id) = @_;
1135
1136     my $dbh = C4::Context->dbh;
1137     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1138     return $dbh->selectall_arrayref(
1139         $query,
1140         { Slice => {} },
1141         $import_record_id
1142     );
1143
1144 }
1145
1146 =head2 GetImportRecordsRange
1147
1148   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1149
1150 Returns a reference to an array of hash references corresponding to
1151 import_biblios/import_auths/import_records rows for a given batch
1152 starting at the given offset.
1153
1154 =cut
1155
1156 sub GetImportRecordsRange {
1157     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1158
1159     my $dbh = C4::Context->dbh;
1160
1161     my $order_by = $parameters->{order_by} || 'import_record_id';
1162     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1163
1164     my $order_by_direction =
1165       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1166
1167     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1168
1169     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1170                                            record_sequence, status, overlay_status,
1171                                            matched_biblionumber, matched_authid, record_type
1172                                     FROM   import_records
1173                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1174                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1175                                     WHERE  import_batch_id = ?";
1176     my @params;
1177     push(@params, $batch_id);
1178     if ($status) {
1179         $query .= " AND status=?";
1180         push(@params,$status);
1181     }
1182
1183     $query.=" ORDER BY $order_by $order_by_direction";
1184
1185     if($results_per_group){
1186         $query .= " LIMIT ?";
1187         push(@params, $results_per_group);
1188     }
1189     if($offset){
1190         $query .= " OFFSET ?";
1191         push(@params, $offset);
1192     }
1193     my $sth = $dbh->prepare_cached($query);
1194     $sth->execute(@params);
1195     my $results = $sth->fetchall_arrayref({});
1196     $sth->finish();
1197     return $results;
1198
1199 }
1200
1201 =head2 GetBestRecordMatch
1202
1203   my $record_id = GetBestRecordMatch($import_record_id);
1204
1205 =cut
1206
1207 sub GetBestRecordMatch {
1208     my ($import_record_id) = @_;
1209
1210     my $dbh = C4::Context->dbh;
1211     my $sth = $dbh->prepare("SELECT candidate_match_id
1212                              FROM   import_record_matches
1213                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1214                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1215                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1216                              WHERE  import_record_matches.import_record_id = ? AND
1217                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1218                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1219                              AND chosen = 1
1220                              ORDER BY score DESC, candidate_match_id DESC");
1221     $sth->execute($import_record_id);
1222     my ($record_id) = $sth->fetchrow_array();
1223     $sth->finish();
1224     return $record_id;
1225 }
1226
1227 =head2 GetImportBatchStatus
1228
1229   my $status = GetImportBatchStatus($batch_id);
1230
1231 =cut
1232
1233 sub GetImportBatchStatus {
1234     my ($batch_id) = @_;
1235
1236     my $dbh = C4::Context->dbh;
1237     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1238     $sth->execute($batch_id);
1239     my ($status) = $sth->fetchrow_array();
1240     $sth->finish();
1241     return $status;
1242
1243 }
1244
1245 =head2 SetImportBatchStatus
1246
1247   SetImportBatchStatus($batch_id, $new_status);
1248
1249 =cut
1250
1251 sub SetImportBatchStatus {
1252     my ($batch_id, $new_status) = @_;
1253
1254     my $dbh = C4::Context->dbh;
1255     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1256     $sth->execute($new_status, $batch_id);
1257     $sth->finish();
1258
1259 }
1260
1261 =head2 SetMatchedBiblionumber
1262
1263   SetMatchedBiblionumber($import_record_id, $biblionumber);
1264
1265 =cut
1266
1267 sub SetMatchedBiblionumber {
1268     my ($import_record_id, $biblionumber) = @_;
1269
1270     my $dbh = C4::Context->dbh;
1271     $dbh->do(
1272         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1273         undef, $biblionumber, $import_record_id
1274     );
1275 }
1276
1277 =head2 GetImportBatchOverlayAction
1278
1279   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1280
1281 =cut
1282
1283 sub GetImportBatchOverlayAction {
1284     my ($batch_id) = @_;
1285
1286     my $dbh = C4::Context->dbh;
1287     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1288     $sth->execute($batch_id);
1289     my ($overlay_action) = $sth->fetchrow_array();
1290     $sth->finish();
1291     return $overlay_action;
1292
1293 }
1294
1295
1296 =head2 SetImportBatchOverlayAction
1297
1298   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1299
1300 =cut
1301
1302 sub SetImportBatchOverlayAction {
1303     my ($batch_id, $new_overlay_action) = @_;
1304
1305     my $dbh = C4::Context->dbh;
1306     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1307     $sth->execute($new_overlay_action, $batch_id);
1308     $sth->finish();
1309
1310 }
1311
1312 =head2 GetImportBatchNoMatchAction
1313
1314   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1315
1316 =cut
1317
1318 sub GetImportBatchNoMatchAction {
1319     my ($batch_id) = @_;
1320
1321     my $dbh = C4::Context->dbh;
1322     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1323     $sth->execute($batch_id);
1324     my ($nomatch_action) = $sth->fetchrow_array();
1325     $sth->finish();
1326     return $nomatch_action;
1327
1328 }
1329
1330
1331 =head2 SetImportBatchNoMatchAction
1332
1333   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1334
1335 =cut
1336
1337 sub SetImportBatchNoMatchAction {
1338     my ($batch_id, $new_nomatch_action) = @_;
1339
1340     my $dbh = C4::Context->dbh;
1341     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1342     $sth->execute($new_nomatch_action, $batch_id);
1343     $sth->finish();
1344
1345 }
1346
1347 =head2 GetImportBatchItemAction
1348
1349   my $item_action = GetImportBatchItemAction($batch_id);
1350
1351 =cut
1352
1353 sub GetImportBatchItemAction {
1354     my ($batch_id) = @_;
1355
1356     my $dbh = C4::Context->dbh;
1357     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1358     $sth->execute($batch_id);
1359     my ($item_action) = $sth->fetchrow_array();
1360     $sth->finish();
1361     return $item_action;
1362
1363 }
1364
1365
1366 =head2 SetImportBatchItemAction
1367
1368   SetImportBatchItemAction($batch_id, $new_item_action);
1369
1370 =cut
1371
1372 sub SetImportBatchItemAction {
1373     my ($batch_id, $new_item_action) = @_;
1374
1375     my $dbh = C4::Context->dbh;
1376     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1377     $sth->execute($new_item_action, $batch_id);
1378     $sth->finish();
1379
1380 }
1381
1382 =head2 GetImportBatchMatcher
1383
1384   my $matcher_id = GetImportBatchMatcher($batch_id);
1385
1386 =cut
1387
1388 sub GetImportBatchMatcher {
1389     my ($batch_id) = @_;
1390
1391     my $dbh = C4::Context->dbh;
1392     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1393     $sth->execute($batch_id);
1394     my ($matcher_id) = $sth->fetchrow_array();
1395     $sth->finish();
1396     return $matcher_id;
1397
1398 }
1399
1400
1401 =head2 SetImportBatchMatcher
1402
1403   SetImportBatchMatcher($batch_id, $new_matcher_id);
1404
1405 =cut
1406
1407 sub SetImportBatchMatcher {
1408     my ($batch_id, $new_matcher_id) = @_;
1409
1410     my $dbh = C4::Context->dbh;
1411     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1412     $sth->execute($new_matcher_id, $batch_id);
1413     $sth->finish();
1414
1415 }
1416
1417 =head2 GetImportRecordOverlayStatus
1418
1419   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1420
1421 =cut
1422
1423 sub GetImportRecordOverlayStatus {
1424     my ($import_record_id) = @_;
1425
1426     my $dbh = C4::Context->dbh;
1427     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1428     $sth->execute($import_record_id);
1429     my ($overlay_status) = $sth->fetchrow_array();
1430     $sth->finish();
1431     return $overlay_status;
1432
1433 }
1434
1435
1436 =head2 SetImportRecordOverlayStatus
1437
1438   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1439
1440 =cut
1441
1442 sub SetImportRecordOverlayStatus {
1443     my ($import_record_id, $new_overlay_status) = @_;
1444
1445     my $dbh = C4::Context->dbh;
1446     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1447     $sth->execute($new_overlay_status, $import_record_id);
1448     $sth->finish();
1449
1450 }
1451
1452 =head2 GetImportRecordStatus
1453
1454   my $status = GetImportRecordStatus($import_record_id);
1455
1456 =cut
1457
1458 sub GetImportRecordStatus {
1459     my ($import_record_id) = @_;
1460
1461     my $dbh = C4::Context->dbh;
1462     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1463     $sth->execute($import_record_id);
1464     my ($status) = $sth->fetchrow_array();
1465     $sth->finish();
1466     return $status;
1467
1468 }
1469
1470
1471 =head2 SetImportRecordStatus
1472
1473   SetImportRecordStatus($import_record_id, $new_status);
1474
1475 =cut
1476
1477 sub SetImportRecordStatus {
1478     my ($import_record_id, $new_status) = @_;
1479
1480     my $dbh = C4::Context->dbh;
1481     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1482     $sth->execute($new_status, $import_record_id);
1483     $sth->finish();
1484
1485 }
1486
1487 =head2 GetImportRecordMatches
1488
1489   my $results = GetImportRecordMatches($import_record_id, $best_only);
1490
1491 =cut
1492
1493 sub GetImportRecordMatches {
1494     my $import_record_id = shift;
1495     my $best_only = @_ ? shift : 0;
1496
1497     my $dbh = C4::Context->dbh;
1498     # FIXME currently biblio only
1499     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1500                                     candidate_match_id, score, record_type,
1501                                     chosen
1502                                     FROM import_records
1503                                     JOIN import_record_matches USING (import_record_id)
1504                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1505                                     WHERE import_record_id = ?
1506                                     ORDER BY score DESC, biblionumber DESC");
1507     $sth->bind_param(1, $import_record_id);
1508     my $results = [];
1509     $sth->execute();
1510     while (my $row = $sth->fetchrow_hashref) {
1511         if ($row->{'record_type'} eq 'auth') {
1512             $row->{'authorized_heading'} = GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1513         }
1514         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1515         push @$results, $row;
1516         last if $best_only;
1517     }
1518     $sth->finish();
1519
1520     return $results;
1521     
1522 }
1523
1524 =head2 SetImportRecordMatches
1525
1526   SetImportRecordMatches($import_record_id, @matches);
1527
1528 =cut
1529
1530 sub SetImportRecordMatches {
1531     my $import_record_id = shift;
1532     my @matches = @_;
1533
1534     my $dbh = C4::Context->dbh;
1535     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1536     $delsth->execute($import_record_id);
1537     $delsth->finish();
1538
1539     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1540                                     VALUES (?, ?, ?, ?)");
1541     my $chosen = 1; #The first match is defaulted to be chosen
1542     foreach my $match (@matches) {
1543         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1544         $chosen = 0; #After the first we do not default to other matches
1545     }
1546 }
1547
1548 =head2 RecordsFromISO2709File
1549
1550     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1551
1552 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1553
1554 @PARAM1, String, absolute path to the ISO2709 file.
1555 @PARAM2, String, see stage_file.pl
1556 @PARAM3, String, should be utf8
1557
1558 Returns two array refs.
1559
1560 =cut
1561
1562 sub RecordsFromISO2709File {
1563     my ($input_file, $record_type, $encoding) = @_;
1564     my @errors;
1565
1566     my $marc_type = C4::Context->preference('marcflavour');
1567     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1568
1569     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1570     my @marc_records;
1571     $/ = "\035";
1572     while (<$fh>) {
1573         s/^\s+//;
1574         s/\s+$//;
1575         next unless $_; # skip if record has only whitespace, as might occur
1576                         # if file includes newlines between each MARC record
1577         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1578         push @marc_records, $marc_record;
1579         if ($charset_guessed ne $encoding) {
1580             push @errors,
1581                 "Unexpected charset $charset_guessed, expecting $encoding";
1582         }
1583     }
1584     close $fh;
1585     return ( \@errors, \@marc_records );
1586 }
1587
1588 =head2 RecordsFromMARCXMLFile
1589
1590     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1591
1592 Creates MARC::Record-objects out of the given MARCXML-file.
1593
1594 @PARAM1, String, absolute path to the ISO2709 file.
1595 @PARAM2, String, should be utf8
1596
1597 Returns two array refs.
1598
1599 =cut
1600
1601 sub RecordsFromMARCXMLFile {
1602     my ( $filename, $encoding ) = @_;
1603     my $batch = MARC::File::XML->in( $filename );
1604     my ( @marcRecords, @errors, $record );
1605     do {
1606         eval { $record = $batch->next( $encoding ); };
1607         if ($@) {
1608             push @errors, $@;
1609         }
1610         push @marcRecords, $record if $record;
1611     } while( $record );
1612     return (\@errors, \@marcRecords);
1613 }
1614
1615 =head2 RecordsFromMarcPlugin
1616
1617     Converts text of input_file into array of MARC records with to_marc plugin
1618
1619 =cut
1620
1621 sub RecordsFromMarcPlugin {
1622     my ($input_file, $plugin_class, $encoding) = @_;
1623     my ( $text, @return );
1624     return \@return if !$input_file || !$plugin_class;
1625
1626     # Read input file
1627     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1628     $/ = "\035";
1629     while (<$fh>) {
1630         s/^\s+//;
1631         s/\s+$//;
1632         next unless $_;
1633         $text .= $_;
1634     }
1635     close $fh;
1636
1637     # Convert to large MARC blob with plugin
1638     $text = Koha::Plugins::Handler->run({
1639         class  => $plugin_class,
1640         method => 'to_marc',
1641         params => { data => $text },
1642     }) if $text;
1643
1644     # Convert to array of MARC records
1645     if( $text ) {
1646         my $marc_type = C4::Context->preference('marcflavour');
1647         foreach my $blob ( split(/\x1D/, $text) ) {
1648             next if $blob =~ /^\s*$/;
1649             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1650             push @return, $marcrecord;
1651         }
1652     }
1653     return \@return;
1654 }
1655
1656 # internal functions
1657
1658 sub _create_import_record {
1659     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1660
1661     my $dbh = C4::Context->dbh;
1662     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1663                                                          record_type, encoding)
1664                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1665     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1666                   $record_type, $encoding);
1667     my $import_record_id = $dbh->{'mysql_insertid'};
1668     $sth->finish();
1669     return $import_record_id;
1670 }
1671
1672 sub _add_auth_fields {
1673     my ($import_record_id, $marc_record) = @_;
1674
1675     my $controlnumber;
1676     if ($marc_record->field('001')) {
1677         $controlnumber = $marc_record->field('001')->data();
1678     }
1679     my $authorized_heading = GetAuthorizedHeading({ record => $marc_record });
1680     my $dbh = C4::Context->dbh;
1681     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1682     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1683     $sth->finish();
1684 }
1685
1686 sub _add_biblio_fields {
1687     my ($import_record_id, $marc_record) = @_;
1688
1689     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1690     my $dbh = C4::Context->dbh;
1691     # FIXME no controlnumber, originalsource
1692     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1693     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1694     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1695     $sth->finish();
1696                 
1697 }
1698
1699 sub _update_biblio_fields {
1700     my ($import_record_id, $marc_record) = @_;
1701
1702     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1703     my $dbh = C4::Context->dbh;
1704     # FIXME no controlnumber, originalsource
1705     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1706     $isbn =~ s/\(.*$//;
1707     $isbn =~ tr/ -_//;
1708     $isbn = uc $isbn;
1709     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1710                              WHERE  import_record_id = ?");
1711     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1712     $sth->finish();
1713 }
1714
1715 sub _parse_biblio_fields {
1716     my ($marc_record) = @_;
1717
1718     my $dbh = C4::Context->dbh;
1719     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1720     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1721
1722 }
1723
1724 sub _update_batch_record_counts {
1725     my ($batch_id) = @_;
1726
1727     my $dbh = C4::Context->dbh;
1728     my ( $num_records ) = $dbh->selectrow_array(q|
1729                                             SELECT COUNT(*)
1730                                             FROM import_records
1731                                             WHERE import_batch_id = ?
1732     |, undef, $batch_id );
1733     my ( $num_items ) = $dbh->selectrow_array(q|
1734                                             SELECT COUNT(*)
1735                                             FROM import_records
1736                                             JOIN import_items USING (import_record_id)
1737                                             WHERE import_batch_id = ? AND record_type = 'biblio'
1738     |, undef, $batch_id );
1739     $dbh->do(
1740         "UPDATE import_batches SET num_records=?, num_items=? WHERE import_batch_id=?",
1741         undef,
1742         $num_records,
1743         $num_items,
1744         $batch_id,
1745     );
1746 }
1747
1748 sub _get_commit_action {
1749     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1750     
1751     if ($record_type eq 'biblio') {
1752         my ($bib_result, $bib_match, $item_result);
1753
1754         $bib_match = GetBestRecordMatch($import_record_id);
1755         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1756
1757             $bib_result = $overlay_action;
1758
1759             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1760                 $item_result = 'create_new';
1761             } elsif($item_action eq 'replace'){
1762                 $item_result = 'replace';
1763             } else {
1764                 $item_result = 'ignore';
1765             }
1766
1767         } else {
1768             $bib_result = $nomatch_action;
1769             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1770         }
1771         return ($bib_result, $item_result, $bib_match);
1772     } else { # must be auths
1773         my ($auth_result, $auth_match);
1774
1775         $auth_match = GetBestRecordMatch($import_record_id);
1776         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1777             $auth_result = $overlay_action;
1778         } else {
1779             $auth_result = $nomatch_action;
1780         }
1781
1782         return ($auth_result, undef, $auth_match);
1783
1784     }
1785 }
1786
1787 sub _get_revert_action {
1788     my ($overlay_action, $overlay_status, $status) = @_;
1789
1790     my $bib_result;
1791
1792     if ($status eq 'ignored') {
1793         $bib_result = 'ignore';
1794     } else {
1795         if ($overlay_action eq 'create_new') {
1796             $bib_result = 'delete';
1797         } else {
1798             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1799         }
1800     }
1801     return $bib_result;
1802 }
1803
1804 1;
1805 __END__
1806
1807 =head1 AUTHOR
1808
1809 Koha Development Team <http://koha-community.org/>
1810
1811 Galen Charlton <galen.charlton@liblime.com>
1812
1813 =cut