Bug 15869: Change framework on overlay
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub EmbedItemsInImportBiblio {
195     my ( $record, $import_record_id ) = @_;
196     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
197     my $dbh = C4::Context->dbh;
198     my $import_items = $dbh->selectall_arrayref(q|
199         SELECT import_items.marcxml
200         FROM import_items
201         WHERE import_record_id = ?
202     |, { Slice => {} }, $import_record_id );
203     my @item_fields;
204     for my $import_item ( @$import_items ) {
205         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
206         push @item_fields, $item_marc->field($itemtag);
207     }
208     $record->append_fields(@item_fields);
209     return $record;
210 }
211
212 =head2 AddImportBatch
213
214   my $batch_id = AddImportBatch($params_hash);
215
216 =cut
217
218 sub AddImportBatch {
219     my ($params) = @_;
220
221     my (@fields, @vals);
222     foreach (qw( matcher_id template_id branchcode
223                  overlay_action nomatch_action item_action
224                  import_status batch_type file_name comments record_type )) {
225         if (exists $params->{$_}) {
226             push @fields, $_;
227             push @vals, $params->{$_};
228         }
229     }
230     my $dbh = C4::Context->dbh;
231     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
232                                   VALUES (".join( ',', map '?', @fields).")",
233              undef,
234              @vals);
235     return $dbh->{'mysql_insertid'};
236 }
237
238 =head2 GetImportBatch 
239
240   my $row = GetImportBatch($batch_id);
241
242 Retrieve a hashref of an import_batches row.
243
244 =cut
245
246 sub GetImportBatch {
247     my ($batch_id) = @_;
248
249     my $dbh = C4::Context->dbh;
250     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
251     $sth->bind_param(1, $batch_id);
252     $sth->execute();
253     my $result = $sth->fetchrow_hashref;
254     $sth->finish();
255     return $result;
256
257 }
258
259 =head2 AddBiblioToBatch 
260
261   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
262                 $marc_record, $encoding, $update_counts);
263
264 =cut
265
266 sub AddBiblioToBatch {
267     my $batch_id = shift;
268     my $record_sequence = shift;
269     my $marc_record = shift;
270     my $encoding = shift;
271     my $update_counts = @_ ? shift : 1;
272
273     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
274     _add_biblio_fields($import_record_id, $marc_record);
275     _update_batch_record_counts($batch_id) if $update_counts;
276     return $import_record_id;
277 }
278
279 =head2 AddAuthToBatch
280
281   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
282                 $marc_record, $encoding, $update_counts, [$marc_type]);
283
284 =cut
285
286 sub AddAuthToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 BatchStageMarcRecords
303
304 ( $batch_id, $num_records, $num_items, @invalid_records ) =
305   BatchStageMarcRecords(
306     $record_type,                $encoding,
307     $marc_records,               $file_name,
308     $marc_modification_template, $comments,
309     $branch_code,                $parse_items,
310     $leave_as_staging,           $progress_interval,
311     $progress_callback
312   );
313
314 =cut
315
316 sub BatchStageMarcRecords {
317     my $record_type = shift;
318     my $encoding = shift;
319     my $marc_records = shift;
320     my $file_name = shift;
321     my $marc_modification_template = shift;
322     my $comments = shift;
323     my $branch_code = shift;
324     my $parse_items = shift;
325     my $leave_as_staging = shift;
326
327     # optional callback to monitor status 
328     # of job
329     my $progress_interval = 0;
330     my $progress_callback = undef;
331     if ($#_ == 1) {
332         $progress_interval = shift;
333         $progress_callback = shift;
334         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
335         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
336     } 
337     
338     my $batch_id = AddImportBatch( {
339             overlay_action => 'create_new',
340             import_status => 'staging',
341             batch_type => 'batch',
342             file_name => $file_name,
343             comments => $comments,
344             record_type => $record_type,
345         } );
346     if ($parse_items) {
347         SetImportBatchItemAction($batch_id, 'always_add');
348     } else {
349         SetImportBatchItemAction($batch_id, 'ignore');
350     }
351
352
353     my $marc_type = C4::Context->preference('marcflavour');
354     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
355     my @invalid_records = ();
356     my $num_valid = 0;
357     my $num_items = 0;
358     # FIXME - for now, we're dealing only with bibs
359     my $rec_num = 0;
360     foreach my $marc_record (@$marc_records) {
361         $rec_num++;
362         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
363             &$progress_callback($rec_num);
364         }
365
366         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
367
368         my $import_record_id;
369         if (scalar($marc_record->fields()) == 0) {
370             push @invalid_records, $marc_record;
371         } else {
372
373             # Normalize the record so it doesn't have separated diacritics
374             SetUTF8Flag($marc_record);
375
376             $num_valid++;
377             if ($record_type eq 'biblio') {
378                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, 0);
379                 if ($parse_items) {
380                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
381                     $num_items += scalar(@import_items_ids);
382                 }
383             } elsif ($record_type eq 'auth') {
384                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, 0, $marc_type);
385             }
386         }
387     }
388     unless ($leave_as_staging) {
389         SetImportBatchStatus($batch_id, 'staged');
390     }
391     # FIXME branch_code, number of bibs, number of items
392     _update_batch_record_counts($batch_id);
393     if ($progress_interval){
394         &$progress_callback($rec_num);
395     }
396
397     return ($batch_id, $num_valid, $num_items, @invalid_records);
398 }
399
400 =head2 AddItemsToImportBiblio
401
402   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
403                 $import_record_id, $marc_record, $update_counts);
404
405 =cut
406
407 sub AddItemsToImportBiblio {
408     my $batch_id = shift;
409     my $import_record_id = shift;
410     my $marc_record = shift;
411     my $update_counts = @_ ? shift : 0;
412
413     my @import_items_ids = ();
414    
415     my $dbh = C4::Context->dbh; 
416     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
417     foreach my $item_field ($marc_record->field($item_tag)) {
418         my $item_marc = MARC::Record->new();
419         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
420         $item_marc->append_fields($item_field);
421         $marc_record->delete_field($item_field);
422         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
423                                         VALUES (?, ?, ?)");
424         $sth->bind_param(1, $import_record_id);
425         $sth->bind_param(2, 'staged');
426         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
427         $sth->execute();
428         push @import_items_ids, $dbh->{'mysql_insertid'};
429         $sth->finish();
430     }
431
432     if ($#import_items_ids > -1) {
433         _update_batch_record_counts($batch_id) if $update_counts;
434     }
435     return @import_items_ids;
436 }
437
438 =head2 BatchFindDuplicates
439
440   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
441              $max_matches, $progress_interval, $progress_callback);
442
443 Goes through the records loaded in the batch and attempts to 
444 find duplicates for each one.  Sets the matching status 
445 of each record to "no_match" or "auto_match" as appropriate.
446
447 The $max_matches parameter is optional; if it is not supplied,
448 it defaults to 10.
449
450 The $progress_interval and $progress_callback parameters are 
451 optional; if both are supplied, the sub referred to by
452 $progress_callback will be invoked every $progress_interval
453 records using the number of records processed as the 
454 singular argument.
455
456 =cut
457
458 sub BatchFindDuplicates {
459     my $batch_id = shift;
460     my $matcher = shift;
461     my $max_matches = @_ ? shift : 10;
462
463     # optional callback to monitor status 
464     # of job
465     my $progress_interval = 0;
466     my $progress_callback = undef;
467     if ($#_ == 1) {
468         $progress_interval = shift;
469         $progress_callback = shift;
470         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
471         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
472     }
473
474     my $dbh = C4::Context->dbh;
475
476     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
477                              FROM import_records
478                              WHERE import_batch_id = ?");
479     $sth->execute($batch_id);
480     my $num_with_matches = 0;
481     my $rec_num = 0;
482     while (my $rowref = $sth->fetchrow_hashref) {
483         $rec_num++;
484         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
485             &$progress_callback($rec_num);
486         }
487         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
488         my @matches = ();
489         if (defined $matcher) {
490             @matches = $matcher->get_matches($marc_record, $max_matches);
491         }
492         if (scalar(@matches) > 0) {
493             $num_with_matches++;
494             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
495             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
496         } else {
497             SetImportRecordMatches($rowref->{'import_record_id'}, ());
498             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
499         }
500     }
501
502     if ($progress_interval){
503         &$progress_callback($rec_num);
504     }
505
506     $sth->finish();
507     return $num_with_matches;
508 }
509
510 =head2 BatchCommitRecords
511
512   Takes a hashref containing params for committing the batch - optional parameters 'progress_interval' and
513   'progress_callback' will define code called every X records.
514
515   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
516         BatchCommitRecords({
517             batch_id  => $batch_id,
518             framework => $framework,
519             overlay_framework => $overlay_framework,
520             progress_interval => $progress_interval,
521             progress_callback => $progress_callback,
522             skip_intermediate_commit => $skip_intermediate_commit
523         });
524
525     Parameter skip_intermediate_commit does what is says.
526 =cut
527
528 sub BatchCommitRecords {
529     my $params = shift;
530     my $batch_id          = $params->{batch_id};
531     my $framework         = $params->{framework};
532     my $overlay_framework = $params->{overlay_framework};
533     my $skip_intermediate_commit = $params->{skip_intermediate_commit};
534     my $progress_interval = $params->{progress_interval} // 0;
535     my $progress_callback = $params->{progress_callback};
536     $progress_interval = 0 unless $progress_interval && $progress_interval =~ /^\d+$/;
537     $progress_interval = 0 unless ref($progress_callback) eq 'CODE';
538
539     my $schema = Koha::Database->schema;
540     $schema->txn_begin;
541     # NOTE: Moved this transaction to the front of the routine. Note that inside the while loop below
542     # transactions may be committed and started too again. The final commit is close to the end.
543
544     my $record_type;
545     my $num_added = 0;
546     my $num_updated = 0;
547     my $num_items_added = 0;
548     my $num_items_replaced = 0;
549     my $num_items_errored = 0;
550     my $num_ignored = 0;
551     # commit (i.e., save, all records in the batch)
552     SetImportBatchStatus($batch_id, 'importing');
553     my $overlay_action = GetImportBatchOverlayAction($batch_id);
554     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
555     my $item_action = GetImportBatchItemAction($batch_id);
556     my $item_tag;
557     my $item_subfield;
558     my $dbh = C4::Context->dbh;
559     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
560                              FROM import_records
561                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
562                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
563                              WHERE import_batch_id = ?");
564     $sth->execute($batch_id);
565     my $marcflavour = C4::Context->preference('marcflavour');
566
567     my $userenv = C4::Context->userenv;
568     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
569
570     my $rec_num = 0;
571     my @biblio_ids;
572     while (my $rowref = $sth->fetchrow_hashref) {
573         $record_type = $rowref->{'record_type'};
574
575         $rec_num++;
576
577         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
578             # report progress and commit
579             $schema->txn_commit unless $skip_intermediate_commit;
580             &$progress_callback( $rec_num );
581             $schema->txn_begin unless $skip_intermediate_commit;
582         }
583         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
584             $num_ignored++;
585             next;
586         }
587
588         my $marc_type;
589         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
590             $marc_type = 'UNIMARCAUTH';
591         } elsif ($marcflavour eq 'UNIMARC') {
592             $marc_type = 'UNIMARC';
593         } else {
594             $marc_type = 'USMARC';
595         }
596         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
597
598         if ($record_type eq 'biblio') {
599             # remove any item tags - rely on _batchCommitItems
600             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
601             foreach my $item_field ($marc_record->field($item_tag)) {
602                 $marc_record->delete_field($item_field);
603             }
604             if(C4::Context->preference('autoControlNumber') eq 'biblionumber'){
605                 my @control_num = $marc_record->field('001');
606                 $marc_record->delete_fields(@control_num);
607             }
608         }
609
610         my ($record_result, $item_result, $record_match) =
611             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
612                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
613
614         my $recordid;
615         my $query;
616         if ($record_result eq 'create_new') {
617             $num_added++;
618             if ($record_type eq 'biblio') {
619                 my $biblioitemnumber;
620                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
621                 push @biblio_ids, $recordid;
622                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
623                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
624                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
625                     $num_items_added += $bib_items_added;
626                     $num_items_replaced += $bib_items_replaced;
627                     $num_items_errored += $bib_items_errored;
628                 }
629             } else {
630                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
631                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
632             }
633             my $sth = $dbh->prepare_cached($query);
634             $sth->execute($recordid, $rowref->{'import_record_id'});
635             $sth->finish();
636             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
637         } elsif ($record_result eq 'replace') {
638             $num_updated++;
639             $recordid = $record_match;
640             my $oldxml;
641             if ($record_type eq 'biblio') {
642                 my $oldbiblio = Koha::Biblios->find( $recordid );
643                 $oldxml = GetXmlBiblio($recordid);
644
645                 # remove item fields so that they don't get
646                 # added again if record is reverted
647                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
648                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
649                 foreach my $item_field ($old_marc->field($item_tag)) {
650                     $old_marc->delete_field($item_field);
651                 }
652                 $oldxml = $old_marc->as_xml($marc_type);
653
654                 my $context = { source => 'batchimport' };
655                 if ($logged_in_patron) {
656                     $context->{categorycode} = $logged_in_patron->categorycode;
657                     $context->{userid} = $logged_in_patron->userid;
658                 }
659
660                 ModBiblio(
661                     $marc_record,
662                     $recordid,
663                     $overlay_framework // $oldbiblio->frameworkcode,
664                     {
665                         overlay_context   => $context,
666                         skip_record_index => 1
667                     }
668                 );
669                 push @biblio_ids, $recordid;
670                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
671
672                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
673                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
674                     $num_items_added += $bib_items_added;
675                     $num_items_replaced += $bib_items_replaced;
676                     $num_items_errored += $bib_items_errored;
677                 }
678             } else {
679                 $oldxml = GetAuthorityXML($recordid);
680
681                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
682                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
683             }
684             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
685             $sth->execute($oldxml, $rowref->{'import_record_id'});
686             $sth->finish();
687             my $sth2 = $dbh->prepare_cached($query);
688             $sth2->execute($recordid, $rowref->{'import_record_id'});
689             $sth2->finish();
690             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
691             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
692         } elsif ($record_result eq 'ignore') {
693             $recordid = $record_match;
694             $num_ignored++;
695             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
696                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
697                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
698                 $num_items_added += $bib_items_added;
699          $num_items_replaced += $bib_items_replaced;
700                 $num_items_errored += $bib_items_errored;
701                 # still need to record the matched biblionumber so that the
702                 # items can be reverted
703                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
704                 $sth2->execute($recordid, $rowref->{'import_record_id'});
705                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
706             }
707             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
708         }
709     }
710
711     if ($progress_interval){
712         &$progress_callback($rec_num);
713     }
714
715     $sth->finish();
716
717     if ( @biblio_ids ) {
718         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
719         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
720     }
721
722     SetImportBatchStatus($batch_id, 'imported');
723
724     # Moved final commit to the end
725     $schema->txn_commit;
726
727     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
728 }
729
730 =head2 _batchCommitItems
731
732   ($num_items_added, $num_items_errored) = 
733          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
734
735 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
736
737 =cut
738
739 sub _batchCommitItems {
740     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
741
742     my $dbh = C4::Context->dbh;
743
744     my $num_items_added = 0;
745     my $num_items_errored = 0;
746     my $num_items_replaced = 0;
747
748     my $sth = $dbh->prepare( "
749         SELECT import_items_id, import_items.marcxml, encoding
750         FROM import_items
751         JOIN import_records USING (import_record_id)
752         WHERE import_record_id = ?
753         ORDER BY import_items_id
754     " );
755     $sth->bind_param( 1, $import_record_id );
756     $sth->execute();
757
758     while ( my $row = $sth->fetchrow_hashref() ) {
759         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
760
761         # Delete date_due subfield as to not accidentally delete item checkout due dates
762         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
763         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
764
765         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
766
767         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
768         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
769
770         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
771         if ( $action eq "replace" && $duplicate_itemnumber ) {
772             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
773             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
774             $updsth->bind_param( 1, 'imported' );
775             $updsth->bind_param( 2, $item->{itemnumber} );
776             $updsth->bind_param( 3, undef );
777             $updsth->bind_param( 4, $row->{'import_items_id'} );
778             $updsth->execute();
779             $updsth->finish();
780             $num_items_replaced++;
781         } elsif ( $action eq "replace" && $duplicate_barcode ) {
782             my $itemnumber = $duplicate_barcode->itemnumber;
783             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
784             $updsth->bind_param( 1, 'imported' );
785             $updsth->bind_param( 2, $item->{itemnumber} );
786             $updsth->bind_param( 3, undef );
787             $updsth->bind_param( 4, $row->{'import_items_id'} );
788             $updsth->execute();
789             $updsth->finish();
790             $num_items_replaced++;
791         } elsif ($duplicate_barcode) {
792             $updsth->bind_param( 1, 'error' );
793             $updsth->bind_param( 2, undef );
794             $updsth->bind_param( 3, 'duplicate item barcode' );
795             $updsth->bind_param( 4, $row->{'import_items_id'} );
796             $updsth->execute();
797             $num_items_errored++;
798         } else {
799             # Remove the itemnumber if it exists, we want to create a new item
800             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
801             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
802
803             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
804             if( $itemnumber ) {
805                 $updsth->bind_param( 1, 'imported' );
806                 $updsth->bind_param( 2, $itemnumber );
807                 $updsth->bind_param( 3, undef );
808                 $updsth->bind_param( 4, $row->{'import_items_id'} );
809                 $updsth->execute();
810                 $updsth->finish();
811                 $num_items_added++;
812             }
813         }
814     }
815
816     return ( $num_items_added, $num_items_replaced, $num_items_errored );
817 }
818
819 =head2 BatchRevertRecords
820
821   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
822       $num_ignored) = BatchRevertRecords($batch_id);
823
824 =cut
825
826 sub BatchRevertRecords {
827     my $batch_id = shift;
828
829     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
830
831     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
832
833     my $record_type;
834     my $num_deleted = 0;
835     my $num_errors = 0;
836     my $num_reverted = 0;
837     my $num_ignored = 0;
838     my $num_items_deleted = 0;
839     # commit (i.e., save, all records in the batch)
840     SetImportBatchStatus($batch_id, 'reverting');
841     my $overlay_action = GetImportBatchOverlayAction($batch_id);
842     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
843     my $dbh = C4::Context->dbh;
844     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
845                              FROM import_records
846                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
847                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
848                              WHERE import_batch_id = ?");
849     $sth->execute($batch_id);
850     my $marc_type;
851     my $marcflavour = C4::Context->preference('marcflavour');
852     while (my $rowref = $sth->fetchrow_hashref) {
853         $record_type = $rowref->{'record_type'};
854         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
855             $num_ignored++;
856             next;
857         }
858         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
859             $marc_type = 'UNIMARCAUTH';
860         } elsif ($marcflavour eq 'UNIMARC') {
861             $marc_type = 'UNIMARC';
862         } else {
863             $marc_type = 'USMARC';
864         }
865
866         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
867
868         if ($record_result eq 'delete') {
869             my $error = undef;
870             if  ($record_type eq 'biblio') {
871                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
872                 $error = DelBiblio($rowref->{'matched_biblionumber'});
873             } else {
874                 DelAuthority({ authid => $rowref->{'matched_authid'} });
875             }
876             if (defined $error) {
877                 $num_errors++;
878             } else {
879                 $num_deleted++;
880                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
881             }
882         } elsif ($record_result eq 'restore') {
883             $num_reverted++;
884             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
885             if ($record_type eq 'biblio') {
886                 my $biblionumber = $rowref->{'matched_biblionumber'};
887                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
888
889                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
890                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
891
892                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
893                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
894             } else {
895                 my $authid = $rowref->{'matched_authid'};
896                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
897             }
898             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
899         } elsif ($record_result eq 'ignore') {
900             if ($record_type eq 'biblio') {
901                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
902             }
903             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
904         }
905         my $query;
906         if ($record_type eq 'biblio') {
907             # remove matched_biblionumber only if there is no 'imported' item left
908             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
909             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
910         } else {
911             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
912         }
913         my $sth2 = $dbh->prepare_cached($query);
914         $sth2->execute($rowref->{'import_record_id'});
915     }
916
917     $sth->finish();
918     SetImportBatchStatus($batch_id, 'reverted');
919     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
920 }
921
922 =head2 BatchRevertItems
923
924   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
925
926 =cut
927
928 sub BatchRevertItems {
929     my ($import_record_id, $biblionumber) = @_;
930
931     my $dbh = C4::Context->dbh;
932     my $num_items_deleted = 0;
933
934     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
935                                    FROM import_items
936                                    JOIN items USING (itemnumber)
937                                    WHERE import_record_id = ?");
938     $sth->bind_param(1, $import_record_id);
939     $sth->execute();
940     while (my $row = $sth->fetchrow_hashref()) {
941         my $item = Koha::Items->find($row->{itemnumber});
942         if ($item->safe_delete){
943             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
944             $updsth->bind_param(1, 'reverted');
945             $updsth->bind_param(2, $row->{'import_items_id'});
946             $updsth->execute();
947             $updsth->finish();
948             $num_items_deleted++;
949         }
950         else {
951             next;
952         }
953     }
954     $sth->finish();
955     return $num_items_deleted;
956 }
957
958 =head2 CleanBatch
959
960   CleanBatch($batch_id)
961
962 Deletes all staged records from the import batch
963 and sets the status of the batch to 'cleaned'.  Note
964 that deleting a stage record does *not* affect
965 any record that has been committed to the database.
966
967 =cut
968
969 sub CleanBatch {
970     my $batch_id = shift;
971     return unless defined $batch_id;
972
973     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
974     SetImportBatchStatus($batch_id, 'cleaned');
975 }
976
977 =head2 DeleteBatch
978
979   DeleteBatch($batch_id)
980
981 Deletes the record from the database. This can only be done
982 once the batch has been cleaned.
983
984 =cut
985
986 sub DeleteBatch {
987     my $batch_id = shift;
988     return unless defined $batch_id;
989
990     my $dbh = C4::Context->dbh;
991     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
992     $sth->execute( $batch_id );
993 }
994
995 =head2 GetAllImportBatches
996
997   my $results = GetAllImportBatches();
998
999 Returns a references to an array of hash references corresponding
1000 to all import_batches rows (of batch_type 'batch'), sorted in 
1001 ascending order by import_batch_id.
1002
1003 =cut
1004
1005 sub  GetAllImportBatches {
1006     my $dbh = C4::Context->dbh;
1007     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1008                                     WHERE batch_type IN ('batch', 'webservice')
1009                                     ORDER BY import_batch_id ASC");
1010
1011     my $results = [];
1012     $sth->execute();
1013     while (my $row = $sth->fetchrow_hashref) {
1014         push @$results, $row;
1015     }
1016     $sth->finish();
1017     return $results;
1018 }
1019
1020 =head2 GetStagedWebserviceBatches
1021
1022   my $batch_ids = GetStagedWebserviceBatches();
1023
1024 Returns a references to an array of batch id's
1025 of batch_type 'webservice' that are not imported
1026
1027 =cut
1028
1029 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1030 SELECT import_batch_id FROM import_batches
1031 WHERE batch_type = 'webservice'
1032 AND import_status = 'staged'
1033 EOQ
1034 sub  GetStagedWebserviceBatches {
1035     my $dbh = C4::Context->dbh;
1036     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1037 }
1038
1039 =head2 GetImportBatchRangeDesc
1040
1041   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1042
1043 Returns a reference to an array of hash references corresponding to
1044 import_batches rows (sorted in descending order by import_batch_id)
1045 start at the given offset.
1046
1047 =cut
1048
1049 sub GetImportBatchRangeDesc {
1050     my ($offset, $results_per_group) = @_;
1051
1052     my $dbh = C4::Context->dbh;
1053     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1054                                     LEFT JOIN import_batch_profiles p
1055                                     ON b.profile_id = p.id
1056                                     WHERE b.batch_type IN ('batch', 'webservice')
1057                                     ORDER BY b.import_batch_id DESC";
1058     my @params;
1059     if ($results_per_group){
1060         $query .= " LIMIT ?";
1061         push(@params, $results_per_group);
1062     }
1063     if ($offset){
1064         $query .= " OFFSET ?";
1065         push(@params, $offset);
1066     }
1067     my $sth = $dbh->prepare_cached($query);
1068     $sth->execute(@params);
1069     my $results = $sth->fetchall_arrayref({});
1070     $sth->finish();
1071     return $results;
1072 }
1073
1074 =head2 GetItemNumbersFromImportBatch
1075
1076   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1077
1078 =cut
1079
1080 sub GetItemNumbersFromImportBatch {
1081     my ($batch_id) = @_;
1082     my $dbh = C4::Context->dbh;
1083     my $sql = q|
1084 SELECT itemnumber FROM import_items
1085 INNER JOIN items USING (itemnumber)
1086 INNER JOIN import_records USING (import_record_id)
1087 WHERE import_batch_id = ?|;
1088     my  $sth = $dbh->prepare( $sql );
1089     $sth->execute($batch_id);
1090     my @items ;
1091     while ( my ($itm) = $sth->fetchrow_array ) {
1092         push @items, $itm;
1093     }
1094     return @items;
1095 }
1096
1097 =head2 GetNumberOfImportBatches
1098
1099   my $count = GetNumberOfImportBatches();
1100
1101 =cut
1102
1103 sub GetNumberOfNonZ3950ImportBatches {
1104     my $dbh = C4::Context->dbh;
1105     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1106     $sth->execute();
1107     my ($count) = $sth->fetchrow_array();
1108     $sth->finish();
1109     return $count;
1110 }
1111
1112 =head2 GetImportBiblios
1113
1114   my $results = GetImportBiblios($importid);
1115
1116 =cut
1117
1118 sub GetImportBiblios {
1119     my ($import_record_id) = @_;
1120
1121     my $dbh = C4::Context->dbh;
1122     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1123     return $dbh->selectall_arrayref(
1124         $query,
1125         { Slice => {} },
1126         $import_record_id
1127     );
1128
1129 }
1130
1131 =head2 GetImportRecordsRange
1132
1133   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1134
1135 Returns a reference to an array of hash references corresponding to
1136 import_biblios/import_auths/import_records rows for a given batch
1137 starting at the given offset.
1138
1139 =cut
1140
1141 sub GetImportRecordsRange {
1142     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1143
1144     my $dbh = C4::Context->dbh;
1145
1146     my $order_by = $parameters->{order_by} || 'import_record_id';
1147     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1148
1149     my $order_by_direction =
1150       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1151
1152     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1153
1154     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1155                                            record_sequence, status, overlay_status,
1156                                            matched_biblionumber, matched_authid, record_type
1157                                     FROM   import_records
1158                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1159                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1160                                     WHERE  import_batch_id = ?";
1161     my @params;
1162     push(@params, $batch_id);
1163     if ($status) {
1164         $query .= " AND status=?";
1165         push(@params,$status);
1166     }
1167
1168     $query.=" ORDER BY $order_by $order_by_direction";
1169
1170     if($results_per_group){
1171         $query .= " LIMIT ?";
1172         push(@params, $results_per_group);
1173     }
1174     if($offset){
1175         $query .= " OFFSET ?";
1176         push(@params, $offset);
1177     }
1178     my $sth = $dbh->prepare_cached($query);
1179     $sth->execute(@params);
1180     my $results = $sth->fetchall_arrayref({});
1181     $sth->finish();
1182     return $results;
1183
1184 }
1185
1186 =head2 GetBestRecordMatch
1187
1188   my $record_id = GetBestRecordMatch($import_record_id);
1189
1190 =cut
1191
1192 sub GetBestRecordMatch {
1193     my ($import_record_id) = @_;
1194
1195     my $dbh = C4::Context->dbh;
1196     my $sth = $dbh->prepare("SELECT candidate_match_id
1197                              FROM   import_record_matches
1198                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1199                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1200                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1201                              WHERE  import_record_matches.import_record_id = ? AND
1202                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1203                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1204                              AND chosen = 1
1205                              ORDER BY score DESC, candidate_match_id DESC");
1206     $sth->execute($import_record_id);
1207     my ($record_id) = $sth->fetchrow_array();
1208     $sth->finish();
1209     return $record_id;
1210 }
1211
1212 =head2 GetImportBatchStatus
1213
1214   my $status = GetImportBatchStatus($batch_id);
1215
1216 =cut
1217
1218 sub GetImportBatchStatus {
1219     my ($batch_id) = @_;
1220
1221     my $dbh = C4::Context->dbh;
1222     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1223     $sth->execute($batch_id);
1224     my ($status) = $sth->fetchrow_array();
1225     $sth->finish();
1226     return $status;
1227
1228 }
1229
1230 =head2 SetImportBatchStatus
1231
1232   SetImportBatchStatus($batch_id, $new_status);
1233
1234 =cut
1235
1236 sub SetImportBatchStatus {
1237     my ($batch_id, $new_status) = @_;
1238
1239     my $dbh = C4::Context->dbh;
1240     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1241     $sth->execute($new_status, $batch_id);
1242     $sth->finish();
1243
1244 }
1245
1246 =head2 SetMatchedBiblionumber
1247
1248   SetMatchedBiblionumber($import_record_id, $biblionumber);
1249
1250 =cut
1251
1252 sub SetMatchedBiblionumber {
1253     my ($import_record_id, $biblionumber) = @_;
1254
1255     my $dbh = C4::Context->dbh;
1256     $dbh->do(
1257         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1258         undef, $biblionumber, $import_record_id
1259     );
1260 }
1261
1262 =head2 GetImportBatchOverlayAction
1263
1264   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1265
1266 =cut
1267
1268 sub GetImportBatchOverlayAction {
1269     my ($batch_id) = @_;
1270
1271     my $dbh = C4::Context->dbh;
1272     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1273     $sth->execute($batch_id);
1274     my ($overlay_action) = $sth->fetchrow_array();
1275     $sth->finish();
1276     return $overlay_action;
1277
1278 }
1279
1280
1281 =head2 SetImportBatchOverlayAction
1282
1283   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1284
1285 =cut
1286
1287 sub SetImportBatchOverlayAction {
1288     my ($batch_id, $new_overlay_action) = @_;
1289
1290     my $dbh = C4::Context->dbh;
1291     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1292     $sth->execute($new_overlay_action, $batch_id);
1293     $sth->finish();
1294
1295 }
1296
1297 =head2 GetImportBatchNoMatchAction
1298
1299   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1300
1301 =cut
1302
1303 sub GetImportBatchNoMatchAction {
1304     my ($batch_id) = @_;
1305
1306     my $dbh = C4::Context->dbh;
1307     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1308     $sth->execute($batch_id);
1309     my ($nomatch_action) = $sth->fetchrow_array();
1310     $sth->finish();
1311     return $nomatch_action;
1312
1313 }
1314
1315
1316 =head2 SetImportBatchNoMatchAction
1317
1318   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1319
1320 =cut
1321
1322 sub SetImportBatchNoMatchAction {
1323     my ($batch_id, $new_nomatch_action) = @_;
1324
1325     my $dbh = C4::Context->dbh;
1326     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1327     $sth->execute($new_nomatch_action, $batch_id);
1328     $sth->finish();
1329
1330 }
1331
1332 =head2 GetImportBatchItemAction
1333
1334   my $item_action = GetImportBatchItemAction($batch_id);
1335
1336 =cut
1337
1338 sub GetImportBatchItemAction {
1339     my ($batch_id) = @_;
1340
1341     my $dbh = C4::Context->dbh;
1342     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1343     $sth->execute($batch_id);
1344     my ($item_action) = $sth->fetchrow_array();
1345     $sth->finish();
1346     return $item_action;
1347
1348 }
1349
1350
1351 =head2 SetImportBatchItemAction
1352
1353   SetImportBatchItemAction($batch_id, $new_item_action);
1354
1355 =cut
1356
1357 sub SetImportBatchItemAction {
1358     my ($batch_id, $new_item_action) = @_;
1359
1360     my $dbh = C4::Context->dbh;
1361     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1362     $sth->execute($new_item_action, $batch_id);
1363     $sth->finish();
1364
1365 }
1366
1367 =head2 GetImportBatchMatcher
1368
1369   my $matcher_id = GetImportBatchMatcher($batch_id);
1370
1371 =cut
1372
1373 sub GetImportBatchMatcher {
1374     my ($batch_id) = @_;
1375
1376     my $dbh = C4::Context->dbh;
1377     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1378     $sth->execute($batch_id);
1379     my ($matcher_id) = $sth->fetchrow_array();
1380     $sth->finish();
1381     return $matcher_id;
1382
1383 }
1384
1385
1386 =head2 SetImportBatchMatcher
1387
1388   SetImportBatchMatcher($batch_id, $new_matcher_id);
1389
1390 =cut
1391
1392 sub SetImportBatchMatcher {
1393     my ($batch_id, $new_matcher_id) = @_;
1394
1395     my $dbh = C4::Context->dbh;
1396     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1397     $sth->execute($new_matcher_id, $batch_id);
1398     $sth->finish();
1399
1400 }
1401
1402 =head2 GetImportRecordOverlayStatus
1403
1404   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1405
1406 =cut
1407
1408 sub GetImportRecordOverlayStatus {
1409     my ($import_record_id) = @_;
1410
1411     my $dbh = C4::Context->dbh;
1412     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1413     $sth->execute($import_record_id);
1414     my ($overlay_status) = $sth->fetchrow_array();
1415     $sth->finish();
1416     return $overlay_status;
1417
1418 }
1419
1420
1421 =head2 SetImportRecordOverlayStatus
1422
1423   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1424
1425 =cut
1426
1427 sub SetImportRecordOverlayStatus {
1428     my ($import_record_id, $new_overlay_status) = @_;
1429
1430     my $dbh = C4::Context->dbh;
1431     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1432     $sth->execute($new_overlay_status, $import_record_id);
1433     $sth->finish();
1434
1435 }
1436
1437 =head2 GetImportRecordStatus
1438
1439   my $status = GetImportRecordStatus($import_record_id);
1440
1441 =cut
1442
1443 sub GetImportRecordStatus {
1444     my ($import_record_id) = @_;
1445
1446     my $dbh = C4::Context->dbh;
1447     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1448     $sth->execute($import_record_id);
1449     my ($status) = $sth->fetchrow_array();
1450     $sth->finish();
1451     return $status;
1452
1453 }
1454
1455
1456 =head2 SetImportRecordStatus
1457
1458   SetImportRecordStatus($import_record_id, $new_status);
1459
1460 =cut
1461
1462 sub SetImportRecordStatus {
1463     my ($import_record_id, $new_status) = @_;
1464
1465     my $dbh = C4::Context->dbh;
1466     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1467     $sth->execute($new_status, $import_record_id);
1468     $sth->finish();
1469
1470 }
1471
1472 =head2 GetImportRecordMatches
1473
1474   my $results = GetImportRecordMatches($import_record_id, $best_only);
1475
1476 =cut
1477
1478 sub GetImportRecordMatches {
1479     my $import_record_id = shift;
1480     my $best_only = @_ ? shift : 0;
1481
1482     my $dbh = C4::Context->dbh;
1483     # FIXME currently biblio only
1484     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1485                                     candidate_match_id, score, record_type,
1486                                     chosen
1487                                     FROM import_records
1488                                     JOIN import_record_matches USING (import_record_id)
1489                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1490                                     WHERE import_record_id = ?
1491                                     ORDER BY score DESC, biblionumber DESC");
1492     $sth->bind_param(1, $import_record_id);
1493     my $results = [];
1494     $sth->execute();
1495     while (my $row = $sth->fetchrow_hashref) {
1496         if ($row->{'record_type'} eq 'auth') {
1497             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1498         }
1499         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1500         push @$results, $row;
1501         last if $best_only;
1502     }
1503     $sth->finish();
1504
1505     return $results;
1506     
1507 }
1508
1509 =head2 SetImportRecordMatches
1510
1511   SetImportRecordMatches($import_record_id, @matches);
1512
1513 =cut
1514
1515 sub SetImportRecordMatches {
1516     my $import_record_id = shift;
1517     my @matches = @_;
1518
1519     my $dbh = C4::Context->dbh;
1520     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1521     $delsth->execute($import_record_id);
1522     $delsth->finish();
1523
1524     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1525                                     VALUES (?, ?, ?, ?)");
1526     my $chosen = 1; #The first match is defaulted to be chosen
1527     foreach my $match (@matches) {
1528         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1529         $chosen = 0; #After the first we do not default to other matches
1530     }
1531 }
1532
1533 =head2 RecordsFromISO2709File
1534
1535     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1536
1537 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1538
1539 @PARAM1, String, absolute path to the ISO2709 file.
1540 @PARAM2, String, see stage_file.pl
1541 @PARAM3, String, should be utf8
1542
1543 Returns two array refs.
1544
1545 =cut
1546
1547 sub RecordsFromISO2709File {
1548     my ($input_file, $record_type, $encoding) = @_;
1549     my @errors;
1550
1551     my $marc_type = C4::Context->preference('marcflavour');
1552     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1553
1554     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1555     my @marc_records;
1556     $/ = "\035";
1557     while (<$fh>) {
1558         s/^\s+//;
1559         s/\s+$//;
1560         next unless $_; # skip if record has only whitespace, as might occur
1561                         # if file includes newlines between each MARC record
1562         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1563         push @marc_records, $marc_record;
1564         if ($charset_guessed ne $encoding) {
1565             push @errors,
1566                 "Unexpected charset $charset_guessed, expecting $encoding";
1567         }
1568     }
1569     close $fh;
1570     return ( \@errors, \@marc_records );
1571 }
1572
1573 =head2 RecordsFromMARCXMLFile
1574
1575     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1576
1577 Creates MARC::Record-objects out of the given MARCXML-file.
1578
1579 @PARAM1, String, absolute path to the ISO2709 file.
1580 @PARAM2, String, should be utf8
1581
1582 Returns two array refs.
1583
1584 =cut
1585
1586 sub RecordsFromMARCXMLFile {
1587     my ( $filename, $encoding ) = @_;
1588     my $batch = MARC::File::XML->in( $filename );
1589     my ( @marcRecords, @errors, $record );
1590     do {
1591         eval { $record = $batch->next( $encoding ); };
1592         if ($@) {
1593             push @errors, $@;
1594         }
1595         push @marcRecords, $record if $record;
1596     } while( $record );
1597     return (\@errors, \@marcRecords);
1598 }
1599
1600 =head2 RecordsFromMarcPlugin
1601
1602     Converts text of input_file into array of MARC records with to_marc plugin
1603
1604 =cut
1605
1606 sub RecordsFromMarcPlugin {
1607     my ($input_file, $plugin_class, $encoding) = @_;
1608     my ( $text, @return );
1609     return \@return if !$input_file || !$plugin_class;
1610
1611     # Read input file
1612     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1613     $/ = "\035";
1614     while (<$fh>) {
1615         s/^\s+//;
1616         s/\s+$//;
1617         next unless $_;
1618         $text .= $_;
1619     }
1620     close $fh;
1621
1622     # Convert to large MARC blob with plugin
1623     $text = Koha::Plugins::Handler->run({
1624         class  => $plugin_class,
1625         method => 'to_marc',
1626         params => { data => $text },
1627     }) if $text;
1628
1629     # Convert to array of MARC records
1630     if( $text ) {
1631         my $marc_type = C4::Context->preference('marcflavour');
1632         foreach my $blob ( split(/\x1D/, $text) ) {
1633             next if $blob =~ /^\s*$/;
1634             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1635             push @return, $marcrecord;
1636         }
1637     }
1638     return \@return;
1639 }
1640
1641 # internal functions
1642
1643 sub _create_import_record {
1644     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1645
1646     my $dbh = C4::Context->dbh;
1647     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1648                                                          record_type, encoding)
1649                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1650     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1651                   $record_type, $encoding);
1652     my $import_record_id = $dbh->{'mysql_insertid'};
1653     $sth->finish();
1654     return $import_record_id;
1655 }
1656
1657 sub _add_auth_fields {
1658     my ($import_record_id, $marc_record) = @_;
1659
1660     my $controlnumber;
1661     if ($marc_record->field('001')) {
1662         $controlnumber = $marc_record->field('001')->data();
1663     }
1664     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1665     my $dbh = C4::Context->dbh;
1666     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1667     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1668     $sth->finish();
1669 }
1670
1671 sub _add_biblio_fields {
1672     my ($import_record_id, $marc_record) = @_;
1673
1674     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1675     my $dbh = C4::Context->dbh;
1676     # FIXME no controlnumber, originalsource
1677     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1678     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1679     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1680     $sth->finish();
1681                 
1682 }
1683
1684 sub _update_biblio_fields {
1685     my ($import_record_id, $marc_record) = @_;
1686
1687     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1688     my $dbh = C4::Context->dbh;
1689     # FIXME no controlnumber, originalsource
1690     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1691     $isbn =~ s/\(.*$//;
1692     $isbn =~ tr/ -_//;
1693     $isbn = uc $isbn;
1694     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1695                              WHERE  import_record_id = ?");
1696     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1697     $sth->finish();
1698 }
1699
1700 sub _parse_biblio_fields {
1701     my ($marc_record) = @_;
1702
1703     my $dbh = C4::Context->dbh;
1704     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1705     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1706
1707 }
1708
1709 sub _update_batch_record_counts {
1710     my ($batch_id) = @_;
1711
1712     my $dbh = C4::Context->dbh;
1713     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1714                                         num_records = (
1715                                             SELECT COUNT(*)
1716                                             FROM import_records
1717                                             WHERE import_batch_id = import_batches.import_batch_id),
1718                                         num_items = (
1719                                             SELECT COUNT(*)
1720                                             FROM import_records
1721                                             JOIN import_items USING (import_record_id)
1722                                             WHERE import_batch_id = import_batches.import_batch_id
1723                                             AND record_type = 'biblio')
1724                                     WHERE import_batch_id = ?");
1725     $sth->bind_param(1, $batch_id);
1726     $sth->execute();
1727     $sth->finish();
1728 }
1729
1730 sub _get_commit_action {
1731     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1732     
1733     if ($record_type eq 'biblio') {
1734         my ($bib_result, $bib_match, $item_result);
1735
1736         $bib_match = GetBestRecordMatch($import_record_id);
1737         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1738
1739             $bib_result = $overlay_action;
1740
1741             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1742                 $item_result = 'create_new';
1743             } elsif($item_action eq 'replace'){
1744                 $item_result = 'replace';
1745             } else {
1746                 $item_result = 'ignore';
1747             }
1748
1749         } else {
1750             $bib_result = $nomatch_action;
1751             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1752         }
1753         return ($bib_result, $item_result, $bib_match);
1754     } else { # must be auths
1755         my ($auth_result, $auth_match);
1756
1757         $auth_match = GetBestRecordMatch($import_record_id);
1758         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1759             $auth_result = $overlay_action;
1760         } else {
1761             $auth_result = $nomatch_action;
1762         }
1763
1764         return ($auth_result, undef, $auth_match);
1765
1766     }
1767 }
1768
1769 sub _get_revert_action {
1770     my ($overlay_action, $overlay_status, $status) = @_;
1771
1772     my $bib_result;
1773
1774     if ($status eq 'ignored') {
1775         $bib_result = 'ignore';
1776     } else {
1777         if ($overlay_action eq 'create_new') {
1778             $bib_result = 'delete';
1779         } else {
1780             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1781         }
1782     }
1783     return $bib_result;
1784 }
1785
1786 1;
1787 __END__
1788
1789 =head1 AUTHOR
1790
1791 Koha Development Team <http://koha-community.org/>
1792
1793 Galen Charlton <galen.charlton@liblime.com>
1794
1795 =cut