Bug 30716: Add collection to cn_browser results
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub GetRecordFromImportBiblio {
195     my ( $import_record_id, $embed_items ) = @_;
196
197     my ($marc) = GetImportRecordMarc($import_record_id);
198     my $record = MARC::Record->new_from_usmarc($marc);
199
200     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
201
202     return $record;
203 }
204
205 sub EmbedItemsInImportBiblio {
206     my ( $record, $import_record_id ) = @_;
207     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
208     my $dbh = C4::Context->dbh;
209     my $import_items = $dbh->selectall_arrayref(q|
210         SELECT import_items.marcxml
211         FROM import_items
212         WHERE import_record_id = ?
213     |, { Slice => {} }, $import_record_id );
214     my @item_fields;
215     for my $import_item ( @$import_items ) {
216         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
217         push @item_fields, $item_marc->field($itemtag);
218     }
219     $record->append_fields(@item_fields);
220     return $record;
221 }
222
223 =head2 GetImportRecordMarcXML
224
225   my $marcxml = GetImportRecordMarcXML($import_record_id);
226
227 =cut
228
229 sub GetImportRecordMarcXML {
230     my ($import_record_id) = @_;
231
232     my $dbh = C4::Context->dbh;
233     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
234     $sth->execute($import_record_id);
235     my ($marcxml) = $sth->fetchrow();
236     $sth->finish();
237     return $marcxml;
238
239 }
240
241 =head2 AddImportBatch
242
243   my $batch_id = AddImportBatch($params_hash);
244
245 =cut
246
247 sub AddImportBatch {
248     my ($params) = @_;
249
250     my (@fields, @vals);
251     foreach (qw( matcher_id template_id branchcode
252                  overlay_action nomatch_action item_action
253                  import_status batch_type file_name comments record_type )) {
254         if (exists $params->{$_}) {
255             push @fields, $_;
256             push @vals, $params->{$_};
257         }
258     }
259     my $dbh = C4::Context->dbh;
260     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
261                                   VALUES (".join( ',', map '?', @fields).")",
262              undef,
263              @vals);
264     return $dbh->{'mysql_insertid'};
265 }
266
267 =head2 GetImportBatch 
268
269   my $row = GetImportBatch($batch_id);
270
271 Retrieve a hashref of an import_batches row.
272
273 =cut
274
275 sub GetImportBatch {
276     my ($batch_id) = @_;
277
278     my $dbh = C4::Context->dbh;
279     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
280     $sth->bind_param(1, $batch_id);
281     $sth->execute();
282     my $result = $sth->fetchrow_hashref;
283     $sth->finish();
284     return $result;
285
286 }
287
288 =head2 AddBiblioToBatch 
289
290   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
291                 $marc_record, $encoding, $update_counts);
292
293 =cut
294
295 sub AddBiblioToBatch {
296     my $batch_id = shift;
297     my $record_sequence = shift;
298     my $marc_record = shift;
299     my $encoding = shift;
300     my $update_counts = @_ ? shift : 1;
301
302     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
303     _add_biblio_fields($import_record_id, $marc_record);
304     _update_batch_record_counts($batch_id) if $update_counts;
305     return $import_record_id;
306 }
307
308 =head2 AddAuthToBatch
309
310   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
311                 $marc_record, $encoding, $update_counts, [$marc_type]);
312
313 =cut
314
315 sub AddAuthToBatch {
316     my $batch_id = shift;
317     my $record_sequence = shift;
318     my $marc_record = shift;
319     my $encoding = shift;
320     my $update_counts = @_ ? shift : 1;
321     my $marc_type = shift || C4::Context->preference('marcflavour');
322
323     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
324
325     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
326     _add_auth_fields($import_record_id, $marc_record);
327     _update_batch_record_counts($batch_id) if $update_counts;
328     return $import_record_id;
329 }
330
331 =head2 BatchStageMarcRecords
332
333 ( $batch_id, $num_records, $num_items, @invalid_records ) =
334   BatchStageMarcRecords(
335     $record_type,                $encoding,
336     $marc_records,               $file_name,
337     $marc_modification_template, $comments,
338     $branch_code,                $parse_items,
339     $leave_as_staging,           $progress_interval,
340     $progress_callback
341   );
342
343 =cut
344
345 sub BatchStageMarcRecords {
346     my $record_type = shift;
347     my $encoding = shift;
348     my $marc_records = shift;
349     my $file_name = shift;
350     my $marc_modification_template = shift;
351     my $comments = shift;
352     my $branch_code = shift;
353     my $parse_items = shift;
354     my $leave_as_staging = shift;
355
356     # optional callback to monitor status 
357     # of job
358     my $progress_interval = 0;
359     my $progress_callback = undef;
360     if ($#_ == 1) {
361         $progress_interval = shift;
362         $progress_callback = shift;
363         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
364         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
365     } 
366     
367     my $batch_id = AddImportBatch( {
368             overlay_action => 'create_new',
369             import_status => 'staging',
370             batch_type => 'batch',
371             file_name => $file_name,
372             comments => $comments,
373             record_type => $record_type,
374         } );
375     if ($parse_items) {
376         SetImportBatchItemAction($batch_id, 'always_add');
377     } else {
378         SetImportBatchItemAction($batch_id, 'ignore');
379     }
380
381
382     my $marc_type = C4::Context->preference('marcflavour');
383     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
384     my @invalid_records = ();
385     my $num_valid = 0;
386     my $num_items = 0;
387     # FIXME - for now, we're dealing only with bibs
388     my $rec_num = 0;
389     foreach my $marc_record (@$marc_records) {
390         $rec_num++;
391         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
392             &$progress_callback($rec_num);
393         }
394
395         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
396
397         my $import_record_id;
398         if (scalar($marc_record->fields()) == 0) {
399             push @invalid_records, $marc_record;
400         } else {
401
402             # Normalize the record so it doesn't have separated diacritics
403             SetUTF8Flag($marc_record);
404
405             $num_valid++;
406             if ($record_type eq 'biblio') {
407                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
408                 if ($parse_items) {
409                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
410                     $num_items += scalar(@import_items_ids);
411                 }
412             } elsif ($record_type eq 'auth') {
413                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
414             }
415         }
416     }
417     unless ($leave_as_staging) {
418         SetImportBatchStatus($batch_id, 'staged');
419     }
420     # FIXME branch_code, number of bibs, number of items
421     _update_batch_record_counts($batch_id);
422     return ($batch_id, $num_valid, $num_items, @invalid_records);
423 }
424
425 =head2 AddItemsToImportBiblio
426
427   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
428                 $import_record_id, $marc_record, $update_counts);
429
430 =cut
431
432 sub AddItemsToImportBiblio {
433     my $batch_id = shift;
434     my $import_record_id = shift;
435     my $marc_record = shift;
436     my $update_counts = @_ ? shift : 0;
437
438     my @import_items_ids = ();
439    
440     my $dbh = C4::Context->dbh; 
441     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
442     foreach my $item_field ($marc_record->field($item_tag)) {
443         my $item_marc = MARC::Record->new();
444         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
445         $item_marc->append_fields($item_field);
446         $marc_record->delete_field($item_field);
447         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
448                                         VALUES (?, ?, ?)");
449         $sth->bind_param(1, $import_record_id);
450         $sth->bind_param(2, 'staged');
451         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
452         $sth->execute();
453         push @import_items_ids, $dbh->{'mysql_insertid'};
454         $sth->finish();
455     }
456
457     if ($#import_items_ids > -1) {
458         _update_batch_record_counts($batch_id) if $update_counts;
459         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
460     }
461     return @import_items_ids;
462 }
463
464 =head2 BatchFindDuplicates
465
466   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
467              $max_matches, $progress_interval, $progress_callback);
468
469 Goes through the records loaded in the batch and attempts to 
470 find duplicates for each one.  Sets the matching status 
471 of each record to "no_match" or "auto_match" as appropriate.
472
473 The $max_matches parameter is optional; if it is not supplied,
474 it defaults to 10.
475
476 The $progress_interval and $progress_callback parameters are 
477 optional; if both are supplied, the sub referred to by
478 $progress_callback will be invoked every $progress_interval
479 records using the number of records processed as the 
480 singular argument.
481
482 =cut
483
484 sub BatchFindDuplicates {
485     my $batch_id = shift;
486     my $matcher = shift;
487     my $max_matches = @_ ? shift : 10;
488
489     # optional callback to monitor status 
490     # of job
491     my $progress_interval = 0;
492     my $progress_callback = undef;
493     if ($#_ == 1) {
494         $progress_interval = shift;
495         $progress_callback = shift;
496         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
497         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
498     }
499
500     my $dbh = C4::Context->dbh;
501
502     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
503                              FROM import_records
504                              WHERE import_batch_id = ?");
505     $sth->execute($batch_id);
506     my $num_with_matches = 0;
507     my $rec_num = 0;
508     while (my $rowref = $sth->fetchrow_hashref) {
509         $rec_num++;
510         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
511             &$progress_callback($rec_num);
512         }
513         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
514         my @matches = ();
515         if (defined $matcher) {
516             @matches = $matcher->get_matches($marc_record, $max_matches);
517         }
518         if (scalar(@matches) > 0) {
519             $num_with_matches++;
520             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
521             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
522         } else {
523             SetImportRecordMatches($rowref->{'import_record_id'}, ());
524             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
525         }
526     }
527     $sth->finish();
528     return $num_with_matches;
529 }
530
531 =head2 BatchCommitRecords
532
533   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
534         BatchCommitRecords($batch_id, $framework,
535         $progress_interval, $progress_callback);
536
537 =cut
538
539 sub BatchCommitRecords {
540     my $batch_id = shift;
541     my $framework = shift;
542
543     my $schema = Koha::Database->schema;
544
545     # optional callback to monitor status 
546     # of job
547     my $progress_interval = 0;
548     my $progress_callback = undef;
549     if ($#_ == 1) {
550         $progress_interval = shift;
551         $progress_callback = shift;
552         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
553         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
554     }
555
556     my $record_type;
557     my $num_added = 0;
558     my $num_updated = 0;
559     my $num_items_added = 0;
560     my $num_items_replaced = 0;
561     my $num_items_errored = 0;
562     my $num_ignored = 0;
563     # commit (i.e., save, all records in the batch)
564     SetImportBatchStatus($batch_id, 'importing');
565     my $overlay_action = GetImportBatchOverlayAction($batch_id);
566     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
567     my $item_action = GetImportBatchItemAction($batch_id);
568     my $item_tag;
569     my $item_subfield;
570     my $dbh = C4::Context->dbh;
571     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
572                              FROM import_records
573                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
574                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
575                              WHERE import_batch_id = ?");
576     $sth->execute($batch_id);
577     my $marcflavour = C4::Context->preference('marcflavour');
578
579     my $userenv = C4::Context->userenv;
580     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
581
582     my $rec_num = 0;
583
584     $schema->txn_begin; # We commit in a transaction
585     while (my $rowref = $sth->fetchrow_hashref) {
586         $record_type = $rowref->{'record_type'};
587
588         $rec_num++;
589
590         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
591             # report progress and commit
592             $schema->txn_commit;
593             &$progress_callback( $rec_num );
594             $schema->txn_begin;
595         }
596         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
597             $num_ignored++;
598             next;
599         }
600
601         my $marc_type;
602         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
603             $marc_type = 'UNIMARCAUTH';
604         } elsif ($marcflavour eq 'UNIMARC') {
605             $marc_type = 'UNIMARC';
606         } else {
607             $marc_type = 'USMARC';
608         }
609         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
610
611         if ($record_type eq 'biblio') {
612             # remove any item tags - rely on BatchCommitItems
613             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
614             foreach my $item_field ($marc_record->field($item_tag)) {
615                 $marc_record->delete_field($item_field);
616             }
617         }
618
619         my ($record_result, $item_result, $record_match) =
620             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
621                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
622
623         my $recordid;
624         my $query;
625         if ($record_result eq 'create_new') {
626             $num_added++;
627             if ($record_type eq 'biblio') {
628                 my $biblioitemnumber;
629                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
630                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
631                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
632                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
633                     $num_items_added += $bib_items_added;
634                     $num_items_replaced += $bib_items_replaced;
635                     $num_items_errored += $bib_items_errored;
636                 }
637             } else {
638                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
639                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
640             }
641             my $sth = $dbh->prepare_cached($query);
642             $sth->execute($recordid, $rowref->{'import_record_id'});
643             $sth->finish();
644             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
645         } elsif ($record_result eq 'replace') {
646             $num_updated++;
647             $recordid = $record_match;
648             my $oldxml;
649             if ($record_type eq 'biblio') {
650                 my $oldbiblio = Koha::Biblios->find( $recordid );
651                 $oldxml = GetXmlBiblio($recordid);
652
653                 # remove item fields so that they don't get
654                 # added again if record is reverted
655                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
656                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
657                 foreach my $item_field ($old_marc->field($item_tag)) {
658                     $old_marc->delete_field($item_field);
659                 }
660                 $oldxml = $old_marc->as_xml($marc_type);
661
662                 my $context = { source => 'batchimport' };
663                 if ($logged_in_patron) {
664                     $context->{categorycode} = $logged_in_patron->categorycode;
665                     $context->{userid} = $logged_in_patron->userid;
666                 }
667
668                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
669                     overlay_context => $context
670                 });
671                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
672
673                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
674                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
675                     $num_items_added += $bib_items_added;
676                     $num_items_replaced += $bib_items_replaced;
677                     $num_items_errored += $bib_items_errored;
678                 }
679             } else {
680                 $oldxml = GetAuthorityXML($recordid);
681
682                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
683                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
684             }
685             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
686             $sth->execute($oldxml, $rowref->{'import_record_id'});
687             $sth->finish();
688             my $sth2 = $dbh->prepare_cached($query);
689             $sth2->execute($recordid, $rowref->{'import_record_id'});
690             $sth2->finish();
691             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
692             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
693         } elsif ($record_result eq 'ignore') {
694             $recordid = $record_match;
695             $num_ignored++;
696             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
697                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
698                 $num_items_added += $bib_items_added;
699          $num_items_replaced += $bib_items_replaced;
700                 $num_items_errored += $bib_items_errored;
701                 # still need to record the matched biblionumber so that the
702                 # items can be reverted
703                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
704                 $sth2->execute($recordid, $rowref->{'import_record_id'});
705                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
706             }
707             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
708         }
709     }
710     $schema->txn_commit; # Commit final records that may not have hit callback threshold
711     $sth->finish();
712     SetImportBatchStatus($batch_id, 'imported');
713     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
714 }
715
716 =head2 BatchCommitItems
717
718   ($num_items_added, $num_items_errored) = 
719          BatchCommitItems($import_record_id, $biblionumber);
720
721 =cut
722
723 sub BatchCommitItems {
724     my ( $import_record_id, $biblionumber, $action ) = @_;
725
726     my $dbh = C4::Context->dbh;
727
728     my $num_items_added = 0;
729     my $num_items_errored = 0;
730     my $num_items_replaced = 0;
731
732     my $sth = $dbh->prepare( "
733         SELECT import_items_id, import_items.marcxml, encoding
734         FROM import_items
735         JOIN import_records USING (import_record_id)
736         WHERE import_record_id = ?
737         ORDER BY import_items_id
738     " );
739     $sth->bind_param( 1, $import_record_id );
740     $sth->execute();
741
742     while ( my $row = $sth->fetchrow_hashref() ) {
743         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
744
745         # Delete date_due subfield as to not accidentally delete item checkout due dates
746         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
747         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
748
749         my $item = TransformMarcToKoha( $item_marc );
750
751         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
752         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
753
754         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
755         if ( $action eq "replace" && $duplicate_itemnumber ) {
756             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
757             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
758             $updsth->bind_param( 1, 'imported' );
759             $updsth->bind_param( 2, $item->{itemnumber} );
760             $updsth->bind_param( 3, undef );
761             $updsth->bind_param( 4, $row->{'import_items_id'} );
762             $updsth->execute();
763             $updsth->finish();
764             $num_items_replaced++;
765         } elsif ( $action eq "replace" && $duplicate_barcode ) {
766             my $itemnumber = $duplicate_barcode->itemnumber;
767             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
768             $updsth->bind_param( 1, 'imported' );
769             $updsth->bind_param( 2, $item->{itemnumber} );
770             $updsth->bind_param( 3, undef );
771             $updsth->bind_param( 4, $row->{'import_items_id'} );
772             $updsth->execute();
773             $updsth->finish();
774             $num_items_replaced++;
775         } elsif ($duplicate_barcode) {
776             $updsth->bind_param( 1, 'error' );
777             $updsth->bind_param( 2, undef );
778             $updsth->bind_param( 3, 'duplicate item barcode' );
779             $updsth->bind_param( 4, $row->{'import_items_id'} );
780             $updsth->execute();
781             $num_items_errored++;
782         } else {
783             # Remove the itemnumber if it exists, we want to create a new item
784             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
785             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
786
787             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
788             if( $itemnumber ) {
789                 $updsth->bind_param( 1, 'imported' );
790                 $updsth->bind_param( 2, $itemnumber );
791                 $updsth->bind_param( 3, undef );
792                 $updsth->bind_param( 4, $row->{'import_items_id'} );
793                 $updsth->execute();
794                 $updsth->finish();
795                 $num_items_added++;
796             }
797         }
798     }
799
800     return ( $num_items_added, $num_items_replaced, $num_items_errored );
801 }
802
803 =head2 BatchRevertRecords
804
805   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
806       $num_ignored) = BatchRevertRecords($batch_id);
807
808 =cut
809
810 sub BatchRevertRecords {
811     my $batch_id = shift;
812
813     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
814
815     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
816
817     my $record_type;
818     my $num_deleted = 0;
819     my $num_errors = 0;
820     my $num_reverted = 0;
821     my $num_ignored = 0;
822     my $num_items_deleted = 0;
823     # commit (i.e., save, all records in the batch)
824     SetImportBatchStatus($batch_id, 'reverting');
825     my $overlay_action = GetImportBatchOverlayAction($batch_id);
826     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
827     my $dbh = C4::Context->dbh;
828     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
829                              FROM import_records
830                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
831                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
832                              WHERE import_batch_id = ?");
833     $sth->execute($batch_id);
834     my $marc_type;
835     my $marcflavour = C4::Context->preference('marcflavour');
836     while (my $rowref = $sth->fetchrow_hashref) {
837         $record_type = $rowref->{'record_type'};
838         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
839             $num_ignored++;
840             next;
841         }
842         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
843             $marc_type = 'UNIMARCAUTH';
844         } elsif ($marcflavour eq 'UNIMARC') {
845             $marc_type = 'UNIMARC';
846         } else {
847             $marc_type = 'USMARC';
848         }
849
850         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
851
852         if ($record_result eq 'delete') {
853             my $error = undef;
854             if  ($record_type eq 'biblio') {
855                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
856                 $error = DelBiblio($rowref->{'matched_biblionumber'});
857             } else {
858                 DelAuthority({ authid => $rowref->{'matched_authid'} });
859             }
860             if (defined $error) {
861                 $num_errors++;
862             } else {
863                 $num_deleted++;
864                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
865             }
866         } elsif ($record_result eq 'restore') {
867             $num_reverted++;
868             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
869             if ($record_type eq 'biblio') {
870                 my $biblionumber = $rowref->{'matched_biblionumber'};
871                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
872
873                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
874                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
875
876                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
877                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
878             } else {
879                 my $authid = $rowref->{'matched_authid'};
880                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
881             }
882             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
883         } elsif ($record_result eq 'ignore') {
884             if ($record_type eq 'biblio') {
885                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
886             }
887             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
888         }
889         my $query;
890         if ($record_type eq 'biblio') {
891             # remove matched_biblionumber only if there is no 'imported' item left
892             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
893             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
894         } else {
895             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
896         }
897         my $sth2 = $dbh->prepare_cached($query);
898         $sth2->execute($rowref->{'import_record_id'});
899     }
900
901     $sth->finish();
902     SetImportBatchStatus($batch_id, 'reverted');
903     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
904 }
905
906 =head2 BatchRevertItems
907
908   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
909
910 =cut
911
912 sub BatchRevertItems {
913     my ($import_record_id, $biblionumber) = @_;
914
915     my $dbh = C4::Context->dbh;
916     my $num_items_deleted = 0;
917
918     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
919                                    FROM import_items
920                                    JOIN items USING (itemnumber)
921                                    WHERE import_record_id = ?");
922     $sth->bind_param(1, $import_record_id);
923     $sth->execute();
924     while (my $row = $sth->fetchrow_hashref()) {
925         my $item = Koha::Items->find($row->{itemnumber});
926         if ($item->safe_delete){
927             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
928             $updsth->bind_param(1, 'reverted');
929             $updsth->bind_param(2, $row->{'import_items_id'});
930             $updsth->execute();
931             $updsth->finish();
932             $num_items_deleted++;
933         }
934         else {
935             next;
936         }
937     }
938     $sth->finish();
939     return $num_items_deleted;
940 }
941
942 =head2 CleanBatch
943
944   CleanBatch($batch_id)
945
946 Deletes all staged records from the import batch
947 and sets the status of the batch to 'cleaned'.  Note
948 that deleting a stage record does *not* affect
949 any record that has been committed to the database.
950
951 =cut
952
953 sub CleanBatch {
954     my $batch_id = shift;
955     return unless defined $batch_id;
956
957     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
958     SetImportBatchStatus($batch_id, 'cleaned');
959 }
960
961 =head2 DeleteBatch
962
963   DeleteBatch($batch_id)
964
965 Deletes the record from the database. This can only be done
966 once the batch has been cleaned.
967
968 =cut
969
970 sub DeleteBatch {
971     my $batch_id = shift;
972     return unless defined $batch_id;
973
974     my $dbh = C4::Context->dbh;
975     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
976     $sth->execute( $batch_id );
977 }
978
979 =head2 GetAllImportBatches
980
981   my $results = GetAllImportBatches();
982
983 Returns a references to an array of hash references corresponding
984 to all import_batches rows (of batch_type 'batch'), sorted in 
985 ascending order by import_batch_id.
986
987 =cut
988
989 sub  GetAllImportBatches {
990     my $dbh = C4::Context->dbh;
991     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
992                                     WHERE batch_type IN ('batch', 'webservice')
993                                     ORDER BY import_batch_id ASC");
994
995     my $results = [];
996     $sth->execute();
997     while (my $row = $sth->fetchrow_hashref) {
998         push @$results, $row;
999     }
1000     $sth->finish();
1001     return $results;
1002 }
1003
1004 =head2 GetStagedWebserviceBatches
1005
1006   my $batch_ids = GetStagedWebserviceBatches();
1007
1008 Returns a references to an array of batch id's
1009 of batch_type 'webservice' that are not imported
1010
1011 =cut
1012
1013 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1014 SELECT import_batch_id FROM import_batches
1015 WHERE batch_type = 'webservice'
1016 AND import_status = 'staged'
1017 EOQ
1018 sub  GetStagedWebserviceBatches {
1019     my $dbh = C4::Context->dbh;
1020     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1021 }
1022
1023 =head2 GetImportBatchRangeDesc
1024
1025   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1026
1027 Returns a reference to an array of hash references corresponding to
1028 import_batches rows (sorted in descending order by import_batch_id)
1029 start at the given offset.
1030
1031 =cut
1032
1033 sub GetImportBatchRangeDesc {
1034     my ($offset, $results_per_group) = @_;
1035
1036     my $dbh = C4::Context->dbh;
1037     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1038                                     LEFT JOIN import_batch_profiles p
1039                                     ON b.profile_id = p.id
1040                                     WHERE b.batch_type IN ('batch', 'webservice')
1041                                     ORDER BY b.import_batch_id DESC";
1042     my @params;
1043     if ($results_per_group){
1044         $query .= " LIMIT ?";
1045         push(@params, $results_per_group);
1046     }
1047     if ($offset){
1048         $query .= " OFFSET ?";
1049         push(@params, $offset);
1050     }
1051     my $sth = $dbh->prepare_cached($query);
1052     $sth->execute(@params);
1053     my $results = $sth->fetchall_arrayref({});
1054     $sth->finish();
1055     return $results;
1056 }
1057
1058 =head2 GetItemNumbersFromImportBatch
1059
1060   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1061
1062 =cut
1063
1064 sub GetItemNumbersFromImportBatch {
1065     my ($batch_id) = @_;
1066     my $dbh = C4::Context->dbh;
1067     my $sql = q|
1068 SELECT itemnumber FROM import_items
1069 INNER JOIN items USING (itemnumber)
1070 INNER JOIN import_records USING (import_record_id)
1071 WHERE import_batch_id = ?|;
1072     my  $sth = $dbh->prepare( $sql );
1073     $sth->execute($batch_id);
1074     my @items ;
1075     while ( my ($itm) = $sth->fetchrow_array ) {
1076         push @items, $itm;
1077     }
1078     return @items;
1079 }
1080
1081 =head2 GetNumberOfImportBatches
1082
1083   my $count = GetNumberOfImportBatches();
1084
1085 =cut
1086
1087 sub GetNumberOfNonZ3950ImportBatches {
1088     my $dbh = C4::Context->dbh;
1089     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1090     $sth->execute();
1091     my ($count) = $sth->fetchrow_array();
1092     $sth->finish();
1093     return $count;
1094 }
1095
1096 =head2 GetImportBiblios
1097
1098   my $results = GetImportBiblios($importid);
1099
1100 =cut
1101
1102 sub GetImportBiblios {
1103     my ($import_record_id) = @_;
1104
1105     my $dbh = C4::Context->dbh;
1106     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1107     return $dbh->selectall_arrayref(
1108         $query,
1109         { Slice => {} },
1110         $import_record_id
1111     );
1112
1113 }
1114
1115 =head2 GetImportRecordsRange
1116
1117   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1118
1119 Returns a reference to an array of hash references corresponding to
1120 import_biblios/import_auths/import_records rows for a given batch
1121 starting at the given offset.
1122
1123 =cut
1124
1125 sub GetImportRecordsRange {
1126     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1127
1128     my $dbh = C4::Context->dbh;
1129
1130     my $order_by = $parameters->{order_by} || 'import_record_id';
1131     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1132
1133     my $order_by_direction =
1134       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1135
1136     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1137
1138     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1139                                            record_sequence, status, overlay_status,
1140                                            matched_biblionumber, matched_authid, record_type
1141                                     FROM   import_records
1142                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1143                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1144                                     WHERE  import_batch_id = ?";
1145     my @params;
1146     push(@params, $batch_id);
1147     if ($status) {
1148         $query .= " AND status=?";
1149         push(@params,$status);
1150     }
1151
1152     $query.=" ORDER BY $order_by $order_by_direction";
1153
1154     if($results_per_group){
1155         $query .= " LIMIT ?";
1156         push(@params, $results_per_group);
1157     }
1158     if($offset){
1159         $query .= " OFFSET ?";
1160         push(@params, $offset);
1161     }
1162     my $sth = $dbh->prepare_cached($query);
1163     $sth->execute(@params);
1164     my $results = $sth->fetchall_arrayref({});
1165     $sth->finish();
1166     return $results;
1167
1168 }
1169
1170 =head2 GetBestRecordMatch
1171
1172   my $record_id = GetBestRecordMatch($import_record_id);
1173
1174 =cut
1175
1176 sub GetBestRecordMatch {
1177     my ($import_record_id) = @_;
1178
1179     my $dbh = C4::Context->dbh;
1180     my $sth = $dbh->prepare("SELECT candidate_match_id
1181                              FROM   import_record_matches
1182                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1183                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1184                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1185                              WHERE  import_record_matches.import_record_id = ? AND
1186                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1187                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1188                              AND chosen = 1
1189                              ORDER BY score DESC, candidate_match_id DESC");
1190     $sth->execute($import_record_id);
1191     my ($record_id) = $sth->fetchrow_array();
1192     $sth->finish();
1193     return $record_id;
1194 }
1195
1196 =head2 GetImportBatchStatus
1197
1198   my $status = GetImportBatchStatus($batch_id);
1199
1200 =cut
1201
1202 sub GetImportBatchStatus {
1203     my ($batch_id) = @_;
1204
1205     my $dbh = C4::Context->dbh;
1206     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1207     $sth->execute($batch_id);
1208     my ($status) = $sth->fetchrow_array();
1209     $sth->finish();
1210     return $status;
1211
1212 }
1213
1214 =head2 SetImportBatchStatus
1215
1216   SetImportBatchStatus($batch_id, $new_status);
1217
1218 =cut
1219
1220 sub SetImportBatchStatus {
1221     my ($batch_id, $new_status) = @_;
1222
1223     my $dbh = C4::Context->dbh;
1224     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1225     $sth->execute($new_status, $batch_id);
1226     $sth->finish();
1227
1228 }
1229
1230 =head2 SetMatchedBiblionumber
1231
1232   SetMatchedBiblionumber($import_record_id, $biblionumber);
1233
1234 =cut
1235
1236 sub SetMatchedBiblionumber {
1237     my ($import_record_id, $biblionumber) = @_;
1238
1239     my $dbh = C4::Context->dbh;
1240     $dbh->do(
1241         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1242         undef, $biblionumber, $import_record_id
1243     );
1244 }
1245
1246 =head2 GetImportBatchOverlayAction
1247
1248   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1249
1250 =cut
1251
1252 sub GetImportBatchOverlayAction {
1253     my ($batch_id) = @_;
1254
1255     my $dbh = C4::Context->dbh;
1256     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1257     $sth->execute($batch_id);
1258     my ($overlay_action) = $sth->fetchrow_array();
1259     $sth->finish();
1260     return $overlay_action;
1261
1262 }
1263
1264
1265 =head2 SetImportBatchOverlayAction
1266
1267   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1268
1269 =cut
1270
1271 sub SetImportBatchOverlayAction {
1272     my ($batch_id, $new_overlay_action) = @_;
1273
1274     my $dbh = C4::Context->dbh;
1275     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1276     $sth->execute($new_overlay_action, $batch_id);
1277     $sth->finish();
1278
1279 }
1280
1281 =head2 GetImportBatchNoMatchAction
1282
1283   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1284
1285 =cut
1286
1287 sub GetImportBatchNoMatchAction {
1288     my ($batch_id) = @_;
1289
1290     my $dbh = C4::Context->dbh;
1291     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1292     $sth->execute($batch_id);
1293     my ($nomatch_action) = $sth->fetchrow_array();
1294     $sth->finish();
1295     return $nomatch_action;
1296
1297 }
1298
1299
1300 =head2 SetImportBatchNoMatchAction
1301
1302   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1303
1304 =cut
1305
1306 sub SetImportBatchNoMatchAction {
1307     my ($batch_id, $new_nomatch_action) = @_;
1308
1309     my $dbh = C4::Context->dbh;
1310     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1311     $sth->execute($new_nomatch_action, $batch_id);
1312     $sth->finish();
1313
1314 }
1315
1316 =head2 GetImportBatchItemAction
1317
1318   my $item_action = GetImportBatchItemAction($batch_id);
1319
1320 =cut
1321
1322 sub GetImportBatchItemAction {
1323     my ($batch_id) = @_;
1324
1325     my $dbh = C4::Context->dbh;
1326     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1327     $sth->execute($batch_id);
1328     my ($item_action) = $sth->fetchrow_array();
1329     $sth->finish();
1330     return $item_action;
1331
1332 }
1333
1334
1335 =head2 SetImportBatchItemAction
1336
1337   SetImportBatchItemAction($batch_id, $new_item_action);
1338
1339 =cut
1340
1341 sub SetImportBatchItemAction {
1342     my ($batch_id, $new_item_action) = @_;
1343
1344     my $dbh = C4::Context->dbh;
1345     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1346     $sth->execute($new_item_action, $batch_id);
1347     $sth->finish();
1348
1349 }
1350
1351 =head2 GetImportBatchMatcher
1352
1353   my $matcher_id = GetImportBatchMatcher($batch_id);
1354
1355 =cut
1356
1357 sub GetImportBatchMatcher {
1358     my ($batch_id) = @_;
1359
1360     my $dbh = C4::Context->dbh;
1361     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1362     $sth->execute($batch_id);
1363     my ($matcher_id) = $sth->fetchrow_array();
1364     $sth->finish();
1365     return $matcher_id;
1366
1367 }
1368
1369
1370 =head2 SetImportBatchMatcher
1371
1372   SetImportBatchMatcher($batch_id, $new_matcher_id);
1373
1374 =cut
1375
1376 sub SetImportBatchMatcher {
1377     my ($batch_id, $new_matcher_id) = @_;
1378
1379     my $dbh = C4::Context->dbh;
1380     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1381     $sth->execute($new_matcher_id, $batch_id);
1382     $sth->finish();
1383
1384 }
1385
1386 =head2 GetImportRecordOverlayStatus
1387
1388   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1389
1390 =cut
1391
1392 sub GetImportRecordOverlayStatus {
1393     my ($import_record_id) = @_;
1394
1395     my $dbh = C4::Context->dbh;
1396     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1397     $sth->execute($import_record_id);
1398     my ($overlay_status) = $sth->fetchrow_array();
1399     $sth->finish();
1400     return $overlay_status;
1401
1402 }
1403
1404
1405 =head2 SetImportRecordOverlayStatus
1406
1407   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1408
1409 =cut
1410
1411 sub SetImportRecordOverlayStatus {
1412     my ($import_record_id, $new_overlay_status) = @_;
1413
1414     my $dbh = C4::Context->dbh;
1415     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1416     $sth->execute($new_overlay_status, $import_record_id);
1417     $sth->finish();
1418
1419 }
1420
1421 =head2 GetImportRecordStatus
1422
1423   my $status = GetImportRecordStatus($import_record_id);
1424
1425 =cut
1426
1427 sub GetImportRecordStatus {
1428     my ($import_record_id) = @_;
1429
1430     my $dbh = C4::Context->dbh;
1431     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1432     $sth->execute($import_record_id);
1433     my ($status) = $sth->fetchrow_array();
1434     $sth->finish();
1435     return $status;
1436
1437 }
1438
1439
1440 =head2 SetImportRecordStatus
1441
1442   SetImportRecordStatus($import_record_id, $new_status);
1443
1444 =cut
1445
1446 sub SetImportRecordStatus {
1447     my ($import_record_id, $new_status) = @_;
1448
1449     my $dbh = C4::Context->dbh;
1450     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1451     $sth->execute($new_status, $import_record_id);
1452     $sth->finish();
1453
1454 }
1455
1456 =head2 GetImportRecordMatches
1457
1458   my $results = GetImportRecordMatches($import_record_id, $best_only);
1459
1460 =cut
1461
1462 sub GetImportRecordMatches {
1463     my $import_record_id = shift;
1464     my $best_only = @_ ? shift : 0;
1465
1466     my $dbh = C4::Context->dbh;
1467     # FIXME currently biblio only
1468     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1469                                     candidate_match_id, score, record_type,
1470                                     chosen
1471                                     FROM import_records
1472                                     JOIN import_record_matches USING (import_record_id)
1473                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1474                                     WHERE import_record_id = ?
1475                                     ORDER BY score DESC, biblionumber DESC");
1476     $sth->bind_param(1, $import_record_id);
1477     my $results = [];
1478     $sth->execute();
1479     while (my $row = $sth->fetchrow_hashref) {
1480         if ($row->{'record_type'} eq 'auth') {
1481             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1482         }
1483         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1484         push @$results, $row;
1485         last if $best_only;
1486     }
1487     $sth->finish();
1488
1489     return $results;
1490     
1491 }
1492
1493 =head2 SetImportRecordMatches
1494
1495   SetImportRecordMatches($import_record_id, @matches);
1496
1497 =cut
1498
1499 sub SetImportRecordMatches {
1500     my $import_record_id = shift;
1501     my @matches = @_;
1502
1503     my $dbh = C4::Context->dbh;
1504     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1505     $delsth->execute($import_record_id);
1506     $delsth->finish();
1507
1508     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1509                                     VALUES (?, ?, ?, ?)");
1510     my $chosen = 1; #The first match is defaulted to be chosen
1511     foreach my $match (@matches) {
1512         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1513         $chosen = 0; #After the first we do not default to other matches
1514     }
1515 }
1516
1517 =head2 RecordsFromISO2709File
1518
1519     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1520
1521 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1522
1523 @PARAM1, String, absolute path to the ISO2709 file.
1524 @PARAM2, String, see stage_file.pl
1525 @PARAM3, String, should be utf8
1526
1527 Returns two array refs.
1528
1529 =cut
1530
1531 sub RecordsFromISO2709File {
1532     my ($input_file, $record_type, $encoding) = @_;
1533     my @errors;
1534
1535     my $marc_type = C4::Context->preference('marcflavour');
1536     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1537
1538     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1539     my @marc_records;
1540     $/ = "\035";
1541     while (<$fh>) {
1542         s/^\s+//;
1543         s/\s+$//;
1544         next unless $_; # skip if record has only whitespace, as might occur
1545                         # if file includes newlines between each MARC record
1546         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1547         push @marc_records, $marc_record;
1548         if ($charset_guessed ne $encoding) {
1549             push @errors,
1550                 "Unexpected charset $charset_guessed, expecting $encoding";
1551         }
1552     }
1553     close $fh;
1554     return ( \@errors, \@marc_records );
1555 }
1556
1557 =head2 RecordsFromMARCXMLFile
1558
1559     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1560
1561 Creates MARC::Record-objects out of the given MARCXML-file.
1562
1563 @PARAM1, String, absolute path to the ISO2709 file.
1564 @PARAM2, String, should be utf8
1565
1566 Returns two array refs.
1567
1568 =cut
1569
1570 sub RecordsFromMARCXMLFile {
1571     my ( $filename, $encoding ) = @_;
1572     my $batch = MARC::File::XML->in( $filename );
1573     my ( @marcRecords, @errors, $record );
1574     do {
1575         eval { $record = $batch->next( $encoding ); };
1576         if ($@) {
1577             push @errors, $@;
1578         }
1579         push @marcRecords, $record if $record;
1580     } while( $record );
1581     return (\@errors, \@marcRecords);
1582 }
1583
1584 =head2 RecordsFromMarcPlugin
1585
1586     Converts text of input_file into array of MARC records with to_marc plugin
1587
1588 =cut
1589
1590 sub RecordsFromMarcPlugin {
1591     my ($input_file, $plugin_class, $encoding) = @_;
1592     my ( $text, @return );
1593     return \@return if !$input_file || !$plugin_class;
1594
1595     # Read input file
1596     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1597     $/ = "\035";
1598     while (<$fh>) {
1599         s/^\s+//;
1600         s/\s+$//;
1601         next unless $_;
1602         $text .= $_;
1603     }
1604     close $fh;
1605
1606     # Convert to large MARC blob with plugin
1607     $text = Koha::Plugins::Handler->run({
1608         class  => $plugin_class,
1609         method => 'to_marc',
1610         params => { data => $text },
1611     }) if $text;
1612
1613     # Convert to array of MARC records
1614     if( $text ) {
1615         my $marc_type = C4::Context->preference('marcflavour');
1616         foreach my $blob ( split(/\x1D/, $text) ) {
1617             next if $blob =~ /^\s*$/;
1618             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1619             push @return, $marcrecord;
1620         }
1621     }
1622     return \@return;
1623 }
1624
1625 # internal functions
1626
1627 sub _create_import_record {
1628     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1629
1630     my $dbh = C4::Context->dbh;
1631     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1632                                                          record_type, encoding)
1633                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1634     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1635                   $record_type, $encoding);
1636     my $import_record_id = $dbh->{'mysql_insertid'};
1637     $sth->finish();
1638     return $import_record_id;
1639 }
1640
1641 sub _update_import_record_marc {
1642     my ($import_record_id, $marc_record, $marc_type) = @_;
1643
1644     my $dbh = C4::Context->dbh;
1645     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1646                              WHERE  import_record_id = ?");
1647     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1648     $sth->finish();
1649 }
1650
1651 sub _add_auth_fields {
1652     my ($import_record_id, $marc_record) = @_;
1653
1654     my $controlnumber;
1655     if ($marc_record->field('001')) {
1656         $controlnumber = $marc_record->field('001')->data();
1657     }
1658     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1659     my $dbh = C4::Context->dbh;
1660     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1661     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1662     $sth->finish();
1663 }
1664
1665 sub _add_biblio_fields {
1666     my ($import_record_id, $marc_record) = @_;
1667
1668     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1669     my $dbh = C4::Context->dbh;
1670     # FIXME no controlnumber, originalsource
1671     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1672     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1673     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1674     $sth->finish();
1675                 
1676 }
1677
1678 sub _update_biblio_fields {
1679     my ($import_record_id, $marc_record) = @_;
1680
1681     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1682     my $dbh = C4::Context->dbh;
1683     # FIXME no controlnumber, originalsource
1684     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1685     $isbn =~ s/\(.*$//;
1686     $isbn =~ tr/ -_//;
1687     $isbn = uc $isbn;
1688     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1689                              WHERE  import_record_id = ?");
1690     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1691     $sth->finish();
1692 }
1693
1694 sub _parse_biblio_fields {
1695     my ($marc_record) = @_;
1696
1697     my $dbh = C4::Context->dbh;
1698     my $bibliofields = TransformMarcToKoha($marc_record, '');
1699     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1700
1701 }
1702
1703 sub _update_batch_record_counts {
1704     my ($batch_id) = @_;
1705
1706     my $dbh = C4::Context->dbh;
1707     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1708                                         num_records = (
1709                                             SELECT COUNT(*)
1710                                             FROM import_records
1711                                             WHERE import_batch_id = import_batches.import_batch_id),
1712                                         num_items = (
1713                                             SELECT COUNT(*)
1714                                             FROM import_records
1715                                             JOIN import_items USING (import_record_id)
1716                                             WHERE import_batch_id = import_batches.import_batch_id
1717                                             AND record_type = 'biblio')
1718                                     WHERE import_batch_id = ?");
1719     $sth->bind_param(1, $batch_id);
1720     $sth->execute();
1721     $sth->finish();
1722 }
1723
1724 sub _get_commit_action {
1725     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1726     
1727     if ($record_type eq 'biblio') {
1728         my ($bib_result, $bib_match, $item_result);
1729
1730         $bib_match = GetBestRecordMatch($import_record_id);
1731         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1732
1733             $bib_result = $overlay_action;
1734
1735             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1736                 $item_result = 'create_new';
1737             } elsif($item_action eq 'replace'){
1738                 $item_result = 'replace';
1739             } else {
1740                 $item_result = 'ignore';
1741             }
1742
1743         } else {
1744             $bib_result = $nomatch_action;
1745             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1746         }
1747         return ($bib_result, $item_result, $bib_match);
1748     } else { # must be auths
1749         my ($auth_result, $auth_match);
1750
1751         $auth_match = GetBestRecordMatch($import_record_id);
1752         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1753             $auth_result = $overlay_action;
1754         } else {
1755             $auth_result = $nomatch_action;
1756         }
1757
1758         return ($auth_result, undef, $auth_match);
1759
1760     }
1761 }
1762
1763 sub _get_revert_action {
1764     my ($overlay_action, $overlay_status, $status) = @_;
1765
1766     my $bib_result;
1767
1768     if ($status eq 'ignored') {
1769         $bib_result = 'ignore';
1770     } else {
1771         if ($overlay_action eq 'create_new') {
1772             $bib_result = 'delete';
1773         } else {
1774             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1775         }
1776     }
1777     return $bib_result;
1778 }
1779
1780 1;
1781 __END__
1782
1783 =head1 AUTHOR
1784
1785 Koha Development Team <http://koha-community.org/>
1786
1787 Galen Charlton <galen.charlton@liblime.com>
1788
1789 =cut