Bug 17600: Standardize our EXPORT_OK
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc;
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57       ModBiblioInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub GetRecordFromImportBiblio {
196     my ( $import_record_id, $embed_items ) = @_;
197
198     my ($marc) = GetImportRecordMarc($import_record_id);
199     my $record = MARC::Record->new_from_usmarc($marc);
200
201     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
202
203     return $record;
204 }
205
206 sub EmbedItemsInImportBiblio {
207     my ( $record, $import_record_id ) = @_;
208     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
209     my $dbh = C4::Context->dbh;
210     my $import_items = $dbh->selectall_arrayref(q|
211         SELECT import_items.marcxml
212         FROM import_items
213         WHERE import_record_id = ?
214     |, { Slice => {} }, $import_record_id );
215     my @item_fields;
216     for my $import_item ( @$import_items ) {
217         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
218         push @item_fields, $item_marc->field($itemtag);
219     }
220     $record->append_fields(@item_fields);
221     return $record;
222 }
223
224 =head2 GetImportRecordMarcXML
225
226   my $marcxml = GetImportRecordMarcXML($import_record_id);
227
228 =cut
229
230 sub GetImportRecordMarcXML {
231     my ($import_record_id) = @_;
232
233     my $dbh = C4::Context->dbh;
234     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
235     $sth->execute($import_record_id);
236     my ($marcxml) = $sth->fetchrow();
237     $sth->finish();
238     return $marcxml;
239
240 }
241
242 =head2 AddImportBatch
243
244   my $batch_id = AddImportBatch($params_hash);
245
246 =cut
247
248 sub AddImportBatch {
249     my ($params) = @_;
250
251     my (@fields, @vals);
252     foreach (qw( matcher_id template_id branchcode
253                  overlay_action nomatch_action item_action
254                  import_status batch_type file_name comments record_type )) {
255         if (exists $params->{$_}) {
256             push @fields, $_;
257             push @vals, $params->{$_};
258         }
259     }
260     my $dbh = C4::Context->dbh;
261     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
262                                   VALUES (".join( ',', map '?', @fields).")",
263              undef,
264              @vals);
265     return $dbh->{'mysql_insertid'};
266 }
267
268 =head2 GetImportBatch 
269
270   my $row = GetImportBatch($batch_id);
271
272 Retrieve a hashref of an import_batches row.
273
274 =cut
275
276 sub GetImportBatch {
277     my ($batch_id) = @_;
278
279     my $dbh = C4::Context->dbh;
280     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
281     $sth->bind_param(1, $batch_id);
282     $sth->execute();
283     my $result = $sth->fetchrow_hashref;
284     $sth->finish();
285     return $result;
286
287 }
288
289 =head2 AddBiblioToBatch 
290
291   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
292                 $marc_record, $encoding, $update_counts);
293
294 =cut
295
296 sub AddBiblioToBatch {
297     my $batch_id = shift;
298     my $record_sequence = shift;
299     my $marc_record = shift;
300     my $encoding = shift;
301     my $update_counts = @_ ? shift : 1;
302
303     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
304     _add_biblio_fields($import_record_id, $marc_record);
305     _update_batch_record_counts($batch_id) if $update_counts;
306     return $import_record_id;
307 }
308
309 =head2 ModBiblioInBatch
310
311   ModBiblioInBatch($import_record_id, $marc_record);
312
313 =cut
314
315 sub ModBiblioInBatch {
316     my ($import_record_id, $marc_record) = @_;
317
318     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
319     _update_biblio_fields($import_record_id, $marc_record);
320
321 }
322
323 =head2 AddAuthToBatch
324
325   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
326                 $marc_record, $encoding, $update_counts, [$marc_type]);
327
328 =cut
329
330 sub AddAuthToBatch {
331     my $batch_id = shift;
332     my $record_sequence = shift;
333     my $marc_record = shift;
334     my $encoding = shift;
335     my $update_counts = @_ ? shift : 1;
336     my $marc_type = shift || C4::Context->preference('marcflavour');
337
338     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
339
340     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
341     _add_auth_fields($import_record_id, $marc_record);
342     _update_batch_record_counts($batch_id) if $update_counts;
343     return $import_record_id;
344 }
345
346 =head2 ModAuthInBatch
347
348   ModAuthInBatch($import_record_id, $marc_record);
349
350 =cut
351
352 sub ModAuthInBatch {
353     my ($import_record_id, $marc_record) = @_;
354
355     my $marcflavour = C4::Context->preference('marcflavour');
356     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
357
358 }
359
360 =head2 BatchStageMarcRecords
361
362 ( $batch_id, $num_records, $num_items, @invalid_records ) =
363   BatchStageMarcRecords(
364     $record_type,                $encoding,
365     $marc_records,               $file_name,
366     $marc_modification_template, $comments,
367     $branch_code,                $parse_items,
368     $leave_as_staging,           $progress_interval,
369     $progress_callback
370   );
371
372 =cut
373
374 sub BatchStageMarcRecords {
375     my $record_type = shift;
376     my $encoding = shift;
377     my $marc_records = shift;
378     my $file_name = shift;
379     my $marc_modification_template = shift;
380     my $comments = shift;
381     my $branch_code = shift;
382     my $parse_items = shift;
383     my $leave_as_staging = shift;
384
385     # optional callback to monitor status 
386     # of job
387     my $progress_interval = 0;
388     my $progress_callback = undef;
389     if ($#_ == 1) {
390         $progress_interval = shift;
391         $progress_callback = shift;
392         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
393         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
394     } 
395     
396     my $batch_id = AddImportBatch( {
397             overlay_action => 'create_new',
398             import_status => 'staging',
399             batch_type => 'batch',
400             file_name => $file_name,
401             comments => $comments,
402             record_type => $record_type,
403         } );
404     if ($parse_items) {
405         SetImportBatchItemAction($batch_id, 'always_add');
406     } else {
407         SetImportBatchItemAction($batch_id, 'ignore');
408     }
409
410
411     my $marc_type = C4::Context->preference('marcflavour');
412     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
413     my @invalid_records = ();
414     my $num_valid = 0;
415     my $num_items = 0;
416     # FIXME - for now, we're dealing only with bibs
417     my $rec_num = 0;
418     foreach my $marc_record (@$marc_records) {
419         $rec_num++;
420         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
421             &$progress_callback($rec_num);
422         }
423
424         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
425
426         my $import_record_id;
427         if (scalar($marc_record->fields()) == 0) {
428             push @invalid_records, $marc_record;
429         } else {
430
431             # Normalize the record so it doesn't have separated diacritics
432             SetUTF8Flag($marc_record);
433
434             $num_valid++;
435             if ($record_type eq 'biblio') {
436                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
437                 if ($parse_items) {
438                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
439                     $num_items += scalar(@import_items_ids);
440                 }
441             } elsif ($record_type eq 'auth') {
442                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
443             }
444         }
445     }
446     unless ($leave_as_staging) {
447         SetImportBatchStatus($batch_id, 'staged');
448     }
449     # FIXME branch_code, number of bibs, number of items
450     _update_batch_record_counts($batch_id);
451     return ($batch_id, $num_valid, $num_items, @invalid_records);
452 }
453
454 =head2 AddItemsToImportBiblio
455
456   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
457                 $import_record_id, $marc_record, $update_counts);
458
459 =cut
460
461 sub AddItemsToImportBiblio {
462     my $batch_id = shift;
463     my $import_record_id = shift;
464     my $marc_record = shift;
465     my $update_counts = @_ ? shift : 0;
466
467     my @import_items_ids = ();
468    
469     my $dbh = C4::Context->dbh; 
470     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
471     foreach my $item_field ($marc_record->field($item_tag)) {
472         my $item_marc = MARC::Record->new();
473         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
474         $item_marc->append_fields($item_field);
475         $marc_record->delete_field($item_field);
476         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
477                                         VALUES (?, ?, ?)");
478         $sth->bind_param(1, $import_record_id);
479         $sth->bind_param(2, 'staged');
480         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
481         $sth->execute();
482         push @import_items_ids, $dbh->{'mysql_insertid'};
483         $sth->finish();
484     }
485
486     if ($#import_items_ids > -1) {
487         _update_batch_record_counts($batch_id) if $update_counts;
488         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
489     }
490     return @import_items_ids;
491 }
492
493 =head2 BatchFindDuplicates
494
495   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
496              $max_matches, $progress_interval, $progress_callback);
497
498 Goes through the records loaded in the batch and attempts to 
499 find duplicates for each one.  Sets the matching status 
500 of each record to "no_match" or "auto_match" as appropriate.
501
502 The $max_matches parameter is optional; if it is not supplied,
503 it defaults to 10.
504
505 The $progress_interval and $progress_callback parameters are 
506 optional; if both are supplied, the sub referred to by
507 $progress_callback will be invoked every $progress_interval
508 records using the number of records processed as the 
509 singular argument.
510
511 =cut
512
513 sub BatchFindDuplicates {
514     my $batch_id = shift;
515     my $matcher = shift;
516     my $max_matches = @_ ? shift : 10;
517
518     # optional callback to monitor status 
519     # of job
520     my $progress_interval = 0;
521     my $progress_callback = undef;
522     if ($#_ == 1) {
523         $progress_interval = shift;
524         $progress_callback = shift;
525         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
526         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
527     }
528
529     my $dbh = C4::Context->dbh;
530
531     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
532                              FROM import_records
533                              WHERE import_batch_id = ?");
534     $sth->execute($batch_id);
535     my $num_with_matches = 0;
536     my $rec_num = 0;
537     while (my $rowref = $sth->fetchrow_hashref) {
538         $rec_num++;
539         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
540             &$progress_callback($rec_num);
541         }
542         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
543         my @matches = ();
544         if (defined $matcher) {
545             @matches = $matcher->get_matches($marc_record, $max_matches);
546         }
547         if (scalar(@matches) > 0) {
548             $num_with_matches++;
549             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
550             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
551         } else {
552             SetImportRecordMatches($rowref->{'import_record_id'}, ());
553             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
554         }
555     }
556     $sth->finish();
557     return $num_with_matches;
558 }
559
560 =head2 BatchCommitRecords
561
562   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
563         BatchCommitRecords($batch_id, $framework,
564         $progress_interval, $progress_callback);
565
566 =cut
567
568 sub BatchCommitRecords {
569     my $batch_id = shift;
570     my $framework = shift;
571
572     # optional callback to monitor status 
573     # of job
574     my $progress_interval = 0;
575     my $progress_callback = undef;
576     if ($#_ == 1) {
577         $progress_interval = shift;
578         $progress_callback = shift;
579         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
580         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
581     }
582
583     my $record_type;
584     my $num_added = 0;
585     my $num_updated = 0;
586     my $num_items_added = 0;
587     my $num_items_replaced = 0;
588     my $num_items_errored = 0;
589     my $num_ignored = 0;
590     # commit (i.e., save, all records in the batch)
591     SetImportBatchStatus($batch_id, 'importing');
592     my $overlay_action = GetImportBatchOverlayAction($batch_id);
593     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
594     my $item_action = GetImportBatchItemAction($batch_id);
595     my $item_tag;
596     my $item_subfield;
597     my $dbh = C4::Context->dbh;
598     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
599                              FROM import_records
600                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
601                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
602                              WHERE import_batch_id = ?");
603     $sth->execute($batch_id);
604     my $marcflavour = C4::Context->preference('marcflavour');
605     my $rec_num = 0;
606     while (my $rowref = $sth->fetchrow_hashref) {
607         $record_type = $rowref->{'record_type'};
608         $rec_num++;
609         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
610             &$progress_callback($rec_num);
611         }
612         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
613             $num_ignored++;
614             next;
615         }
616
617         my $marc_type;
618         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
619             $marc_type = 'UNIMARCAUTH';
620         } elsif ($marcflavour eq 'UNIMARC') {
621             $marc_type = 'UNIMARC';
622         } else {
623             $marc_type = 'USMARC';
624         }
625         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
626
627         if ($record_type eq 'biblio') {
628             # remove any item tags - rely on BatchCommitItems
629             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
630             foreach my $item_field ($marc_record->field($item_tag)) {
631                 $marc_record->delete_field($item_field);
632             }
633         }
634
635         my ($record_result, $item_result, $record_match) =
636             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
637                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
638
639         my $recordid;
640         my $query;
641         if ($record_result eq 'create_new') {
642             $num_added++;
643             if ($record_type eq 'biblio') {
644                 my $biblioitemnumber;
645                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
646                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
647                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
648                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
649                     $num_items_added += $bib_items_added;
650                     $num_items_replaced += $bib_items_replaced;
651                     $num_items_errored += $bib_items_errored;
652                 }
653             } else {
654                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
655                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
656             }
657             my $sth = $dbh->prepare_cached($query);
658             $sth->execute($recordid, $rowref->{'import_record_id'});
659             $sth->finish();
660             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
661         } elsif ($record_result eq 'replace') {
662             $num_updated++;
663             $recordid = $record_match;
664             my $oldxml;
665             if ($record_type eq 'biblio') {
666                 my $oldbiblio = Koha::Biblios->find( $recordid );
667                 $oldxml = GetXmlBiblio($recordid);
668
669                 # remove item fields so that they don't get
670                 # added again if record is reverted
671                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
672                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
673                 foreach my $item_field ($old_marc->field($item_tag)) {
674                     $old_marc->delete_field($item_field);
675                 }
676                 $oldxml = $old_marc->as_xml($marc_type);
677
678                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
679                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
680
681                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
682                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
683                     $num_items_added += $bib_items_added;
684                     $num_items_replaced += $bib_items_replaced;
685                     $num_items_errored += $bib_items_errored;
686                 }
687             } else {
688                 $oldxml = GetAuthorityXML($recordid);
689
690                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
691                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
692             }
693             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
694             $sth->execute($oldxml, $rowref->{'import_record_id'});
695             $sth->finish();
696             my $sth2 = $dbh->prepare_cached($query);
697             $sth2->execute($recordid, $rowref->{'import_record_id'});
698             $sth2->finish();
699             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
700             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
701         } elsif ($record_result eq 'ignore') {
702             $recordid = $record_match;
703             $num_ignored++;
704             $recordid = $record_match;
705             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
706                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
707                 $num_items_added += $bib_items_added;
708          $num_items_replaced += $bib_items_replaced;
709                 $num_items_errored += $bib_items_errored;
710                 # still need to record the matched biblionumber so that the
711                 # items can be reverted
712                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
713                 $sth2->execute($recordid, $rowref->{'import_record_id'});
714                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
715             }
716             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
717         }
718     }
719     $sth->finish();
720     SetImportBatchStatus($batch_id, 'imported');
721     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
722 }
723
724 =head2 BatchCommitItems
725
726   ($num_items_added, $num_items_errored) = 
727          BatchCommitItems($import_record_id, $biblionumber);
728
729 =cut
730
731 sub BatchCommitItems {
732     my ( $import_record_id, $biblionumber, $action ) = @_;
733
734     my $dbh = C4::Context->dbh;
735
736     my $num_items_added = 0;
737     my $num_items_errored = 0;
738     my $num_items_replaced = 0;
739
740     my $sth = $dbh->prepare( "
741         SELECT import_items_id, import_items.marcxml, encoding
742         FROM import_items
743         JOIN import_records USING (import_record_id)
744         WHERE import_record_id = ?
745         ORDER BY import_items_id
746     " );
747     $sth->bind_param( 1, $import_record_id );
748     $sth->execute();
749
750     while ( my $row = $sth->fetchrow_hashref() ) {
751         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
752
753         # Delete date_due subfield as to not accidentally delete item checkout due dates
754         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
755         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
756
757         my $item = TransformMarcToKoha( $item_marc );
758
759         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
760         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
761
762         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
763         if ( $action eq "replace" && $duplicate_itemnumber ) {
764             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
765             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
766             $updsth->bind_param( 1, 'imported' );
767             $updsth->bind_param( 2, $item->{itemnumber} );
768             $updsth->bind_param( 3, $row->{'import_items_id'} );
769             $updsth->execute();
770             $updsth->finish();
771             $num_items_replaced++;
772         } elsif ( $action eq "replace" && $duplicate_barcode ) {
773             my $itemnumber = $duplicate_barcode->itemnumber;
774             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
775             $updsth->bind_param( 1, 'imported' );
776             $updsth->bind_param( 2, $item->{itemnumber} );
777             $updsth->bind_param( 3, $row->{'import_items_id'} );
778             $updsth->execute();
779             $updsth->finish();
780             $num_items_replaced++;
781         } elsif ($duplicate_barcode) {
782             $updsth->bind_param( 1, 'error' );
783             $updsth->bind_param( 2, 'duplicate item barcode' );
784             $updsth->bind_param( 3, $row->{'import_items_id'} );
785             $updsth->execute();
786             $num_items_errored++;
787         } else {
788             # Remove the itemnumber if it exists, we want to create a new item
789             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
790             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
791
792             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
793             if( $itemnumber ) {
794                 $updsth->bind_param( 1, 'imported' );
795                 $updsth->bind_param( 2, $itemnumber );
796                 $updsth->bind_param( 3, $row->{'import_items_id'} );
797                 $updsth->execute();
798                 $updsth->finish();
799                 $num_items_added++;
800             }
801         }
802     }
803
804     return ( $num_items_added, $num_items_replaced, $num_items_errored );
805 }
806
807 =head2 BatchRevertRecords
808
809   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
810       $num_ignored) = BatchRevertRecords($batch_id);
811
812 =cut
813
814 sub BatchRevertRecords {
815     my $batch_id = shift;
816
817     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
818
819     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
820
821     my $record_type;
822     my $num_deleted = 0;
823     my $num_errors = 0;
824     my $num_reverted = 0;
825     my $num_ignored = 0;
826     my $num_items_deleted = 0;
827     # commit (i.e., save, all records in the batch)
828     SetImportBatchStatus($batch_id, 'reverting');
829     my $overlay_action = GetImportBatchOverlayAction($batch_id);
830     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
831     my $dbh = C4::Context->dbh;
832     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
833                              FROM import_records
834                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
835                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
836                              WHERE import_batch_id = ?");
837     $sth->execute($batch_id);
838     my $marc_type;
839     my $marcflavour = C4::Context->preference('marcflavour');
840     while (my $rowref = $sth->fetchrow_hashref) {
841         $record_type = $rowref->{'record_type'};
842         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
843             $num_ignored++;
844             next;
845         }
846         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
847             $marc_type = 'UNIMARCAUTH';
848         } elsif ($marcflavour eq 'UNIMARC') {
849             $marc_type = 'UNIMARC';
850         } else {
851             $marc_type = 'USMARC';
852         }
853
854         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
855
856         if ($record_result eq 'delete') {
857             my $error = undef;
858             if  ($record_type eq 'biblio') {
859                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
860                 $error = DelBiblio($rowref->{'matched_biblionumber'});
861             } else {
862                 DelAuthority({ authid => $rowref->{'matched_authid'} });
863             }
864             if (defined $error) {
865                 $num_errors++;
866             } else {
867                 $num_deleted++;
868                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
869             }
870         } elsif ($record_result eq 'restore') {
871             $num_reverted++;
872             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
873             if ($record_type eq 'biblio') {
874                 my $biblionumber = $rowref->{'matched_biblionumber'};
875                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
876
877                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
878                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
879
880                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
881                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
882             } else {
883                 my $authid = $rowref->{'matched_authid'};
884                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
885             }
886             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
887         } elsif ($record_result eq 'ignore') {
888             if ($record_type eq 'biblio') {
889                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
890             }
891             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
892         }
893         my $query;
894         if ($record_type eq 'biblio') {
895             # remove matched_biblionumber only if there is no 'imported' item left
896             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
897             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
898         } else {
899             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
900         }
901         my $sth2 = $dbh->prepare_cached($query);
902         $sth2->execute($rowref->{'import_record_id'});
903     }
904
905     $sth->finish();
906     SetImportBatchStatus($batch_id, 'reverted');
907     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
908 }
909
910 =head2 BatchRevertItems
911
912   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
913
914 =cut
915
916 sub BatchRevertItems {
917     my ($import_record_id, $biblionumber) = @_;
918
919     my $dbh = C4::Context->dbh;
920     my $num_items_deleted = 0;
921
922     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
923                                    FROM import_items
924                                    JOIN items USING (itemnumber)
925                                    WHERE import_record_id = ?");
926     $sth->bind_param(1, $import_record_id);
927     $sth->execute();
928     while (my $row = $sth->fetchrow_hashref()) {
929         my $item = Koha::Items->find($row->{itemnumber});
930         my $error = $item->safe_delete;
931         if ($error eq '1'){
932             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
933             $updsth->bind_param(1, 'reverted');
934             $updsth->bind_param(2, $row->{'import_items_id'});
935             $updsth->execute();
936             $updsth->finish();
937             $num_items_deleted++;
938         }
939         else {
940             next;
941         }
942     }
943     $sth->finish();
944     return $num_items_deleted;
945 }
946
947 =head2 CleanBatch
948
949   CleanBatch($batch_id)
950
951 Deletes all staged records from the import batch
952 and sets the status of the batch to 'cleaned'.  Note
953 that deleting a stage record does *not* affect
954 any record that has been committed to the database.
955
956 =cut
957
958 sub CleanBatch {
959     my $batch_id = shift;
960     return unless defined $batch_id;
961
962     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
963     SetImportBatchStatus($batch_id, 'cleaned');
964 }
965
966 =head2 DeleteBatch
967
968   DeleteBatch($batch_id)
969
970 Deletes the record from the database. This can only be done
971 once the batch has been cleaned.
972
973 =cut
974
975 sub DeleteBatch {
976     my $batch_id = shift;
977     return unless defined $batch_id;
978
979     my $dbh = C4::Context->dbh;
980     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
981     $sth->execute( $batch_id );
982 }
983
984 =head2 GetAllImportBatches
985
986   my $results = GetAllImportBatches();
987
988 Returns a references to an array of hash references corresponding
989 to all import_batches rows (of batch_type 'batch'), sorted in 
990 ascending order by import_batch_id.
991
992 =cut
993
994 sub  GetAllImportBatches {
995     my $dbh = C4::Context->dbh;
996     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
997                                     WHERE batch_type IN ('batch', 'webservice')
998                                     ORDER BY import_batch_id ASC");
999
1000     my $results = [];
1001     $sth->execute();
1002     while (my $row = $sth->fetchrow_hashref) {
1003         push @$results, $row;
1004     }
1005     $sth->finish();
1006     return $results;
1007 }
1008
1009 =head2 GetStagedWebserviceBatches
1010
1011   my $batch_ids = GetStagedWebserviceBatches();
1012
1013 Returns a references to an array of batch id's
1014 of batch_type 'webservice' that are not imported
1015
1016 =cut
1017
1018 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1019 SELECT import_batch_id FROM import_batches
1020 WHERE batch_type = 'webservice'
1021 AND import_status = 'staged'
1022 EOQ
1023 sub  GetStagedWebserviceBatches {
1024     my $dbh = C4::Context->dbh;
1025     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1026 }
1027
1028 =head2 GetImportBatchRangeDesc
1029
1030   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1031
1032 Returns a reference to an array of hash references corresponding to
1033 import_batches rows (sorted in descending order by import_batch_id)
1034 start at the given offset.
1035
1036 =cut
1037
1038 sub GetImportBatchRangeDesc {
1039     my ($offset, $results_per_group) = @_;
1040
1041     my $dbh = C4::Context->dbh;
1042     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1043                                     LEFT JOIN import_batch_profiles p
1044                                     ON b.profile_id = p.id
1045                                     WHERE b.batch_type IN ('batch', 'webservice')
1046                                     ORDER BY b.import_batch_id DESC";
1047     my @params;
1048     if ($results_per_group){
1049         $query .= " LIMIT ?";
1050         push(@params, $results_per_group);
1051     }
1052     if ($offset){
1053         $query .= " OFFSET ?";
1054         push(@params, $offset);
1055     }
1056     my $sth = $dbh->prepare_cached($query);
1057     $sth->execute(@params);
1058     my $results = $sth->fetchall_arrayref({});
1059     $sth->finish();
1060     return $results;
1061 }
1062
1063 =head2 GetItemNumbersFromImportBatch
1064
1065   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1066
1067 =cut
1068
1069 sub GetItemNumbersFromImportBatch {
1070     my ($batch_id) = @_;
1071     my $dbh = C4::Context->dbh;
1072     my $sql = q|
1073 SELECT itemnumber FROM import_items
1074 INNER JOIN items USING (itemnumber)
1075 INNER JOIN import_records USING (import_record_id)
1076 WHERE import_batch_id = ?|;
1077     my  $sth = $dbh->prepare( $sql );
1078     $sth->execute($batch_id);
1079     my @items ;
1080     while ( my ($itm) = $sth->fetchrow_array ) {
1081         push @items, $itm;
1082     }
1083     return @items;
1084 }
1085
1086 =head2 GetNumberOfImportBatches
1087
1088   my $count = GetNumberOfImportBatches();
1089
1090 =cut
1091
1092 sub GetNumberOfNonZ3950ImportBatches {
1093     my $dbh = C4::Context->dbh;
1094     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1095     $sth->execute();
1096     my ($count) = $sth->fetchrow_array();
1097     $sth->finish();
1098     return $count;
1099 }
1100
1101 =head2 GetImportBiblios
1102
1103   my $results = GetImportBiblios($importid);
1104
1105 =cut
1106
1107 sub GetImportBiblios {
1108     my ($import_record_id) = @_;
1109
1110     my $dbh = C4::Context->dbh;
1111     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1112     return $dbh->selectall_arrayref(
1113         $query,
1114         { Slice => {} },
1115         $import_record_id
1116     );
1117
1118 }
1119
1120 =head2 GetImportRecordsRange
1121
1122   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1123
1124 Returns a reference to an array of hash references corresponding to
1125 import_biblios/import_auths/import_records rows for a given batch
1126 starting at the given offset.
1127
1128 =cut
1129
1130 sub GetImportRecordsRange {
1131     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1132
1133     my $dbh = C4::Context->dbh;
1134
1135     my $order_by = $parameters->{order_by} || 'import_record_id';
1136     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1137
1138     my $order_by_direction =
1139       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1140
1141     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1142
1143     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1144                                            record_sequence, status, overlay_status,
1145                                            matched_biblionumber, matched_authid, record_type
1146                                     FROM   import_records
1147                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1148                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1149                                     WHERE  import_batch_id = ?";
1150     my @params;
1151     push(@params, $batch_id);
1152     if ($status) {
1153         $query .= " AND status=?";
1154         push(@params,$status);
1155     }
1156
1157     $query.=" ORDER BY $order_by $order_by_direction";
1158
1159     if($results_per_group){
1160         $query .= " LIMIT ?";
1161         push(@params, $results_per_group);
1162     }
1163     if($offset){
1164         $query .= " OFFSET ?";
1165         push(@params, $offset);
1166     }
1167     my $sth = $dbh->prepare_cached($query);
1168     $sth->execute(@params);
1169     my $results = $sth->fetchall_arrayref({});
1170     $sth->finish();
1171     return $results;
1172
1173 }
1174
1175 =head2 GetBestRecordMatch
1176
1177   my $record_id = GetBestRecordMatch($import_record_id);
1178
1179 =cut
1180
1181 sub GetBestRecordMatch {
1182     my ($import_record_id) = @_;
1183
1184     my $dbh = C4::Context->dbh;
1185     my $sth = $dbh->prepare("SELECT candidate_match_id
1186                              FROM   import_record_matches
1187                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1188                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1189                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1190                              WHERE  import_record_matches.import_record_id = ? AND
1191                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1192                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1193                              ORDER BY score DESC, candidate_match_id DESC");
1194     $sth->execute($import_record_id);
1195     my ($record_id) = $sth->fetchrow_array();
1196     $sth->finish();
1197     return $record_id;
1198 }
1199
1200 =head2 GetImportBatchStatus
1201
1202   my $status = GetImportBatchStatus($batch_id);
1203
1204 =cut
1205
1206 sub GetImportBatchStatus {
1207     my ($batch_id) = @_;
1208
1209     my $dbh = C4::Context->dbh;
1210     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1211     $sth->execute($batch_id);
1212     my ($status) = $sth->fetchrow_array();
1213     $sth->finish();
1214     return $status;
1215
1216 }
1217
1218 =head2 SetImportBatchStatus
1219
1220   SetImportBatchStatus($batch_id, $new_status);
1221
1222 =cut
1223
1224 sub SetImportBatchStatus {
1225     my ($batch_id, $new_status) = @_;
1226
1227     my $dbh = C4::Context->dbh;
1228     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1229     $sth->execute($new_status, $batch_id);
1230     $sth->finish();
1231
1232 }
1233
1234 =head2 SetMatchedBiblionumber
1235
1236   SetMatchedBiblionumber($import_record_id, $biblionumber);
1237
1238 =cut
1239
1240 sub SetMatchedBiblionumber {
1241     my ($import_record_id, $biblionumber) = @_;
1242
1243     my $dbh = C4::Context->dbh;
1244     $dbh->do(
1245         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1246         undef, $biblionumber, $import_record_id
1247     );
1248 }
1249
1250 =head2 GetImportBatchOverlayAction
1251
1252   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1253
1254 =cut
1255
1256 sub GetImportBatchOverlayAction {
1257     my ($batch_id) = @_;
1258
1259     my $dbh = C4::Context->dbh;
1260     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1261     $sth->execute($batch_id);
1262     my ($overlay_action) = $sth->fetchrow_array();
1263     $sth->finish();
1264     return $overlay_action;
1265
1266 }
1267
1268
1269 =head2 SetImportBatchOverlayAction
1270
1271   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1272
1273 =cut
1274
1275 sub SetImportBatchOverlayAction {
1276     my ($batch_id, $new_overlay_action) = @_;
1277
1278     my $dbh = C4::Context->dbh;
1279     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1280     $sth->execute($new_overlay_action, $batch_id);
1281     $sth->finish();
1282
1283 }
1284
1285 =head2 GetImportBatchNoMatchAction
1286
1287   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1288
1289 =cut
1290
1291 sub GetImportBatchNoMatchAction {
1292     my ($batch_id) = @_;
1293
1294     my $dbh = C4::Context->dbh;
1295     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1296     $sth->execute($batch_id);
1297     my ($nomatch_action) = $sth->fetchrow_array();
1298     $sth->finish();
1299     return $nomatch_action;
1300
1301 }
1302
1303
1304 =head2 SetImportBatchNoMatchAction
1305
1306   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1307
1308 =cut
1309
1310 sub SetImportBatchNoMatchAction {
1311     my ($batch_id, $new_nomatch_action) = @_;
1312
1313     my $dbh = C4::Context->dbh;
1314     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1315     $sth->execute($new_nomatch_action, $batch_id);
1316     $sth->finish();
1317
1318 }
1319
1320 =head2 GetImportBatchItemAction
1321
1322   my $item_action = GetImportBatchItemAction($batch_id);
1323
1324 =cut
1325
1326 sub GetImportBatchItemAction {
1327     my ($batch_id) = @_;
1328
1329     my $dbh = C4::Context->dbh;
1330     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1331     $sth->execute($batch_id);
1332     my ($item_action) = $sth->fetchrow_array();
1333     $sth->finish();
1334     return $item_action;
1335
1336 }
1337
1338
1339 =head2 SetImportBatchItemAction
1340
1341   SetImportBatchItemAction($batch_id, $new_item_action);
1342
1343 =cut
1344
1345 sub SetImportBatchItemAction {
1346     my ($batch_id, $new_item_action) = @_;
1347
1348     my $dbh = C4::Context->dbh;
1349     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1350     $sth->execute($new_item_action, $batch_id);
1351     $sth->finish();
1352
1353 }
1354
1355 =head2 GetImportBatchMatcher
1356
1357   my $matcher_id = GetImportBatchMatcher($batch_id);
1358
1359 =cut
1360
1361 sub GetImportBatchMatcher {
1362     my ($batch_id) = @_;
1363
1364     my $dbh = C4::Context->dbh;
1365     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1366     $sth->execute($batch_id);
1367     my ($matcher_id) = $sth->fetchrow_array();
1368     $sth->finish();
1369     return $matcher_id;
1370
1371 }
1372
1373
1374 =head2 SetImportBatchMatcher
1375
1376   SetImportBatchMatcher($batch_id, $new_matcher_id);
1377
1378 =cut
1379
1380 sub SetImportBatchMatcher {
1381     my ($batch_id, $new_matcher_id) = @_;
1382
1383     my $dbh = C4::Context->dbh;
1384     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1385     $sth->execute($new_matcher_id, $batch_id);
1386     $sth->finish();
1387
1388 }
1389
1390 =head2 GetImportRecordOverlayStatus
1391
1392   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1393
1394 =cut
1395
1396 sub GetImportRecordOverlayStatus {
1397     my ($import_record_id) = @_;
1398
1399     my $dbh = C4::Context->dbh;
1400     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1401     $sth->execute($import_record_id);
1402     my ($overlay_status) = $sth->fetchrow_array();
1403     $sth->finish();
1404     return $overlay_status;
1405
1406 }
1407
1408
1409 =head2 SetImportRecordOverlayStatus
1410
1411   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1412
1413 =cut
1414
1415 sub SetImportRecordOverlayStatus {
1416     my ($import_record_id, $new_overlay_status) = @_;
1417
1418     my $dbh = C4::Context->dbh;
1419     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1420     $sth->execute($new_overlay_status, $import_record_id);
1421     $sth->finish();
1422
1423 }
1424
1425 =head2 GetImportRecordStatus
1426
1427   my $status = GetImportRecordStatus($import_record_id);
1428
1429 =cut
1430
1431 sub GetImportRecordStatus {
1432     my ($import_record_id) = @_;
1433
1434     my $dbh = C4::Context->dbh;
1435     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1436     $sth->execute($import_record_id);
1437     my ($status) = $sth->fetchrow_array();
1438     $sth->finish();
1439     return $status;
1440
1441 }
1442
1443
1444 =head2 SetImportRecordStatus
1445
1446   SetImportRecordStatus($import_record_id, $new_status);
1447
1448 =cut
1449
1450 sub SetImportRecordStatus {
1451     my ($import_record_id, $new_status) = @_;
1452
1453     my $dbh = C4::Context->dbh;
1454     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1455     $sth->execute($new_status, $import_record_id);
1456     $sth->finish();
1457
1458 }
1459
1460 =head2 GetImportRecordMatches
1461
1462   my $results = GetImportRecordMatches($import_record_id, $best_only);
1463
1464 =cut
1465
1466 sub GetImportRecordMatches {
1467     my $import_record_id = shift;
1468     my $best_only = @_ ? shift : 0;
1469
1470     my $dbh = C4::Context->dbh;
1471     # FIXME currently biblio only
1472     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1473                                     candidate_match_id, score, record_type
1474                                     FROM import_records
1475                                     JOIN import_record_matches USING (import_record_id)
1476                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1477                                     WHERE import_record_id = ?
1478                                     ORDER BY score DESC, biblionumber DESC");
1479     $sth->bind_param(1, $import_record_id);
1480     my $results = [];
1481     $sth->execute();
1482     while (my $row = $sth->fetchrow_hashref) {
1483         if ($row->{'record_type'} eq 'auth') {
1484             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1485         }
1486         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1487         push @$results, $row;
1488         last if $best_only;
1489     }
1490     $sth->finish();
1491
1492     return $results;
1493     
1494 }
1495
1496 =head2 SetImportRecordMatches
1497
1498   SetImportRecordMatches($import_record_id, @matches);
1499
1500 =cut
1501
1502 sub SetImportRecordMatches {
1503     my $import_record_id = shift;
1504     my @matches = @_;
1505
1506     my $dbh = C4::Context->dbh;
1507     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1508     $delsth->execute($import_record_id);
1509     $delsth->finish();
1510
1511     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1512                                     VALUES (?, ?, ?)");
1513     foreach my $match (@matches) {
1514         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1515     }
1516 }
1517
1518 =head2 RecordsFromISO2709File
1519
1520     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1521
1522 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1523
1524 @PARAM1, String, absolute path to the ISO2709 file.
1525 @PARAM2, String, see stage_file.pl
1526 @PARAM3, String, should be utf8
1527
1528 Returns two array refs.
1529
1530 =cut
1531
1532 sub RecordsFromISO2709File {
1533     my ($input_file, $record_type, $encoding) = @_;
1534     my @errors;
1535
1536     my $marc_type = C4::Context->preference('marcflavour');
1537     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1538
1539     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1540     my @marc_records;
1541     $/ = "\035";
1542     while (<$fh>) {
1543         s/^\s+//;
1544         s/\s+$//;
1545         next unless $_; # skip if record has only whitespace, as might occur
1546                         # if file includes newlines between each MARC record
1547         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1548         push @marc_records, $marc_record;
1549         if ($charset_guessed ne $encoding) {
1550             push @errors,
1551                 "Unexpected charset $charset_guessed, expecting $encoding";
1552         }
1553     }
1554     close $fh;
1555     return ( \@errors, \@marc_records );
1556 }
1557
1558 =head2 RecordsFromMARCXMLFile
1559
1560     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1561
1562 Creates MARC::Record-objects out of the given MARCXML-file.
1563
1564 @PARAM1, String, absolute path to the ISO2709 file.
1565 @PARAM2, String, should be utf8
1566
1567 Returns two array refs.
1568
1569 =cut
1570
1571 sub RecordsFromMARCXMLFile {
1572     my ( $filename, $encoding ) = @_;
1573     my $batch = MARC::File::XML->in( $filename );
1574     my ( @marcRecords, @errors, $record );
1575     do {
1576         eval { $record = $batch->next( $encoding ); };
1577         if ($@) {
1578             push @errors, $@;
1579         }
1580         push @marcRecords, $record if $record;
1581     } while( $record );
1582     return (\@errors, \@marcRecords);
1583 }
1584
1585 =head2 RecordsFromMarcPlugin
1586
1587     Converts text of input_file into array of MARC records with to_marc plugin
1588
1589 =cut
1590
1591 sub RecordsFromMarcPlugin {
1592     my ($input_file, $plugin_class, $encoding) = @_;
1593     my ( $text, @return );
1594     return \@return if !$input_file || !$plugin_class;
1595
1596     # Read input file
1597     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1598     $/ = "\035";
1599     while (<$fh>) {
1600         s/^\s+//;
1601         s/\s+$//;
1602         next unless $_;
1603         $text .= $_;
1604     }
1605     close $fh;
1606
1607     # Convert to large MARC blob with plugin
1608     $text = Koha::Plugins::Handler->run({
1609         class  => $plugin_class,
1610         method => 'to_marc',
1611         params => { data => $text },
1612     }) if $text;
1613
1614     # Convert to array of MARC records
1615     if( $text ) {
1616         my $marc_type = C4::Context->preference('marcflavour');
1617         foreach my $blob ( split(/\x1D/, $text) ) {
1618             next if $blob =~ /^\s*$/;
1619             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1620             push @return, $marcrecord;
1621         }
1622     }
1623     return \@return;
1624 }
1625
1626 # internal functions
1627
1628 sub _create_import_record {
1629     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1630
1631     my $dbh = C4::Context->dbh;
1632     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1633                                                          record_type, encoding)
1634                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1635     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1636                   $record_type, $encoding);
1637     my $import_record_id = $dbh->{'mysql_insertid'};
1638     $sth->finish();
1639     return $import_record_id;
1640 }
1641
1642 sub _update_import_record_marc {
1643     my ($import_record_id, $marc_record, $marc_type) = @_;
1644
1645     my $dbh = C4::Context->dbh;
1646     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1647                              WHERE  import_record_id = ?");
1648     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1649     $sth->finish();
1650 }
1651
1652 sub _add_auth_fields {
1653     my ($import_record_id, $marc_record) = @_;
1654
1655     my $controlnumber;
1656     if ($marc_record->field('001')) {
1657         $controlnumber = $marc_record->field('001')->data();
1658     }
1659     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1660     my $dbh = C4::Context->dbh;
1661     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1662     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1663     $sth->finish();
1664 }
1665
1666 sub _add_biblio_fields {
1667     my ($import_record_id, $marc_record) = @_;
1668
1669     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1670     my $dbh = C4::Context->dbh;
1671     # FIXME no controlnumber, originalsource
1672     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1673     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1674     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1675     $sth->finish();
1676                 
1677 }
1678
1679 sub _update_biblio_fields {
1680     my ($import_record_id, $marc_record) = @_;
1681
1682     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1683     my $dbh = C4::Context->dbh;
1684     # FIXME no controlnumber, originalsource
1685     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1686     $isbn =~ s/\(.*$//;
1687     $isbn =~ tr/ -_//;
1688     $isbn = uc $isbn;
1689     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1690                              WHERE  import_record_id = ?");
1691     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1692     $sth->finish();
1693 }
1694
1695 sub _parse_biblio_fields {
1696     my ($marc_record) = @_;
1697
1698     my $dbh = C4::Context->dbh;
1699     my $bibliofields = TransformMarcToKoha($marc_record, '');
1700     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1701
1702 }
1703
1704 sub _update_batch_record_counts {
1705     my ($batch_id) = @_;
1706
1707     my $dbh = C4::Context->dbh;
1708     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1709                                         num_records = (
1710                                             SELECT COUNT(*)
1711                                             FROM import_records
1712                                             WHERE import_batch_id = import_batches.import_batch_id),
1713                                         num_items = (
1714                                             SELECT COUNT(*)
1715                                             FROM import_records
1716                                             JOIN import_items USING (import_record_id)
1717                                             WHERE import_batch_id = import_batches.import_batch_id
1718                                             AND record_type = 'biblio')
1719                                     WHERE import_batch_id = ?");
1720     $sth->bind_param(1, $batch_id);
1721     $sth->execute();
1722     $sth->finish();
1723 }
1724
1725 sub _get_commit_action {
1726     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1727     
1728     if ($record_type eq 'biblio') {
1729         my ($bib_result, $bib_match, $item_result);
1730
1731         if ($overlay_status ne 'no_match') {
1732             $bib_match = GetBestRecordMatch($import_record_id);
1733             if ($overlay_action eq 'replace') {
1734                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1735             } elsif ($overlay_action eq 'create_new') {
1736                 $bib_result  = 'create_new';
1737             } elsif ($overlay_action eq 'ignore') {
1738                 $bib_result  = 'ignore';
1739             }
1740          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1741                 $item_result = 'create_new';
1742        }
1743       elsif($item_action eq 'replace'){
1744           $item_result = 'replace';
1745           }
1746       else {
1747              $item_result = 'ignore';
1748            }
1749         } else {
1750             $bib_result = $nomatch_action;
1751             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1752         }
1753         return ($bib_result, $item_result, $bib_match);
1754     } else { # must be auths
1755         my ($auth_result, $auth_match);
1756
1757         if ($overlay_status ne 'no_match') {
1758             $auth_match = GetBestRecordMatch($import_record_id);
1759             if ($overlay_action eq 'replace') {
1760                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1761             } elsif ($overlay_action eq 'create_new') {
1762                 $auth_result  = 'create_new';
1763             } elsif ($overlay_action eq 'ignore') {
1764                 $auth_result  = 'ignore';
1765             }
1766         } else {
1767             $auth_result = $nomatch_action;
1768         }
1769
1770         return ($auth_result, undef, $auth_match);
1771
1772     }
1773 }
1774
1775 sub _get_revert_action {
1776     my ($overlay_action, $overlay_status, $status) = @_;
1777
1778     my $bib_result;
1779
1780     if ($status eq 'ignored') {
1781         $bib_result = 'ignore';
1782     } else {
1783         if ($overlay_action eq 'create_new') {
1784             $bib_result = 'delete';
1785         } else {
1786             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1787         }
1788     }
1789     return $bib_result;
1790 }
1791
1792 1;
1793 __END__
1794
1795 =head1 AUTHOR
1796
1797 Koha Development Team <http://koha-community.org/>
1798
1799 Galen Charlton <galen.charlton@liblime.com>
1800
1801 =cut