Bug 14957: (QA follow-up) Clarify 'context' param
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57       ModBiblioInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub GetRecordFromImportBiblio {
196     my ( $import_record_id, $embed_items ) = @_;
197
198     my ($marc) = GetImportRecordMarc($import_record_id);
199     my $record = MARC::Record->new_from_usmarc($marc);
200
201     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
202
203     return $record;
204 }
205
206 sub EmbedItemsInImportBiblio {
207     my ( $record, $import_record_id ) = @_;
208     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
209     my $dbh = C4::Context->dbh;
210     my $import_items = $dbh->selectall_arrayref(q|
211         SELECT import_items.marcxml
212         FROM import_items
213         WHERE import_record_id = ?
214     |, { Slice => {} }, $import_record_id );
215     my @item_fields;
216     for my $import_item ( @$import_items ) {
217         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
218         push @item_fields, $item_marc->field($itemtag);
219     }
220     $record->append_fields(@item_fields);
221     return $record;
222 }
223
224 =head2 GetImportRecordMarcXML
225
226   my $marcxml = GetImportRecordMarcXML($import_record_id);
227
228 =cut
229
230 sub GetImportRecordMarcXML {
231     my ($import_record_id) = @_;
232
233     my $dbh = C4::Context->dbh;
234     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
235     $sth->execute($import_record_id);
236     my ($marcxml) = $sth->fetchrow();
237     $sth->finish();
238     return $marcxml;
239
240 }
241
242 =head2 AddImportBatch
243
244   my $batch_id = AddImportBatch($params_hash);
245
246 =cut
247
248 sub AddImportBatch {
249     my ($params) = @_;
250
251     my (@fields, @vals);
252     foreach (qw( matcher_id template_id branchcode
253                  overlay_action nomatch_action item_action
254                  import_status batch_type file_name comments record_type )) {
255         if (exists $params->{$_}) {
256             push @fields, $_;
257             push @vals, $params->{$_};
258         }
259     }
260     my $dbh = C4::Context->dbh;
261     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
262                                   VALUES (".join( ',', map '?', @fields).")",
263              undef,
264              @vals);
265     return $dbh->{'mysql_insertid'};
266 }
267
268 =head2 GetImportBatch 
269
270   my $row = GetImportBatch($batch_id);
271
272 Retrieve a hashref of an import_batches row.
273
274 =cut
275
276 sub GetImportBatch {
277     my ($batch_id) = @_;
278
279     my $dbh = C4::Context->dbh;
280     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
281     $sth->bind_param(1, $batch_id);
282     $sth->execute();
283     my $result = $sth->fetchrow_hashref;
284     $sth->finish();
285     return $result;
286
287 }
288
289 =head2 AddBiblioToBatch 
290
291   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
292                 $marc_record, $encoding, $update_counts);
293
294 =cut
295
296 sub AddBiblioToBatch {
297     my $batch_id = shift;
298     my $record_sequence = shift;
299     my $marc_record = shift;
300     my $encoding = shift;
301     my $update_counts = @_ ? shift : 1;
302
303     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
304     _add_biblio_fields($import_record_id, $marc_record);
305     _update_batch_record_counts($batch_id) if $update_counts;
306     return $import_record_id;
307 }
308
309 =head2 ModBiblioInBatch
310
311   ModBiblioInBatch($import_record_id, $marc_record);
312
313 =cut
314
315 sub ModBiblioInBatch {
316     my ($import_record_id, $marc_record) = @_;
317
318     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
319     _update_biblio_fields($import_record_id, $marc_record);
320
321 }
322
323 =head2 AddAuthToBatch
324
325   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
326                 $marc_record, $encoding, $update_counts, [$marc_type]);
327
328 =cut
329
330 sub AddAuthToBatch {
331     my $batch_id = shift;
332     my $record_sequence = shift;
333     my $marc_record = shift;
334     my $encoding = shift;
335     my $update_counts = @_ ? shift : 1;
336     my $marc_type = shift || C4::Context->preference('marcflavour');
337
338     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
339
340     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
341     _add_auth_fields($import_record_id, $marc_record);
342     _update_batch_record_counts($batch_id) if $update_counts;
343     return $import_record_id;
344 }
345
346 =head2 ModAuthInBatch
347
348   ModAuthInBatch($import_record_id, $marc_record);
349
350 =cut
351
352 sub ModAuthInBatch {
353     my ($import_record_id, $marc_record) = @_;
354
355     my $marcflavour = C4::Context->preference('marcflavour');
356     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
357
358 }
359
360 =head2 BatchStageMarcRecords
361
362 ( $batch_id, $num_records, $num_items, @invalid_records ) =
363   BatchStageMarcRecords(
364     $record_type,                $encoding,
365     $marc_records,               $file_name,
366     $marc_modification_template, $comments,
367     $branch_code,                $parse_items,
368     $leave_as_staging,           $progress_interval,
369     $progress_callback
370   );
371
372 =cut
373
374 sub BatchStageMarcRecords {
375     my $record_type = shift;
376     my $encoding = shift;
377     my $marc_records = shift;
378     my $file_name = shift;
379     my $marc_modification_template = shift;
380     my $comments = shift;
381     my $branch_code = shift;
382     my $parse_items = shift;
383     my $leave_as_staging = shift;
384
385     # optional callback to monitor status 
386     # of job
387     my $progress_interval = 0;
388     my $progress_callback = undef;
389     if ($#_ == 1) {
390         $progress_interval = shift;
391         $progress_callback = shift;
392         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
393         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
394     } 
395     
396     my $batch_id = AddImportBatch( {
397             overlay_action => 'create_new',
398             import_status => 'staging',
399             batch_type => 'batch',
400             file_name => $file_name,
401             comments => $comments,
402             record_type => $record_type,
403         } );
404     if ($parse_items) {
405         SetImportBatchItemAction($batch_id, 'always_add');
406     } else {
407         SetImportBatchItemAction($batch_id, 'ignore');
408     }
409
410
411     my $marc_type = C4::Context->preference('marcflavour');
412     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
413     my @invalid_records = ();
414     my $num_valid = 0;
415     my $num_items = 0;
416     # FIXME - for now, we're dealing only with bibs
417     my $rec_num = 0;
418     foreach my $marc_record (@$marc_records) {
419         $rec_num++;
420         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
421             &$progress_callback($rec_num);
422         }
423
424         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
425
426         my $import_record_id;
427         if (scalar($marc_record->fields()) == 0) {
428             push @invalid_records, $marc_record;
429         } else {
430
431             # Normalize the record so it doesn't have separated diacritics
432             SetUTF8Flag($marc_record);
433
434             $num_valid++;
435             if ($record_type eq 'biblio') {
436                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
437                 if ($parse_items) {
438                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
439                     $num_items += scalar(@import_items_ids);
440                 }
441             } elsif ($record_type eq 'auth') {
442                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
443             }
444         }
445     }
446     unless ($leave_as_staging) {
447         SetImportBatchStatus($batch_id, 'staged');
448     }
449     # FIXME branch_code, number of bibs, number of items
450     _update_batch_record_counts($batch_id);
451     return ($batch_id, $num_valid, $num_items, @invalid_records);
452 }
453
454 =head2 AddItemsToImportBiblio
455
456   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
457                 $import_record_id, $marc_record, $update_counts);
458
459 =cut
460
461 sub AddItemsToImportBiblio {
462     my $batch_id = shift;
463     my $import_record_id = shift;
464     my $marc_record = shift;
465     my $update_counts = @_ ? shift : 0;
466
467     my @import_items_ids = ();
468    
469     my $dbh = C4::Context->dbh; 
470     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
471     foreach my $item_field ($marc_record->field($item_tag)) {
472         my $item_marc = MARC::Record->new();
473         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
474         $item_marc->append_fields($item_field);
475         $marc_record->delete_field($item_field);
476         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
477                                         VALUES (?, ?, ?)");
478         $sth->bind_param(1, $import_record_id);
479         $sth->bind_param(2, 'staged');
480         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
481         $sth->execute();
482         push @import_items_ids, $dbh->{'mysql_insertid'};
483         $sth->finish();
484     }
485
486     if ($#import_items_ids > -1) {
487         _update_batch_record_counts($batch_id) if $update_counts;
488         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
489     }
490     return @import_items_ids;
491 }
492
493 =head2 BatchFindDuplicates
494
495   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
496              $max_matches, $progress_interval, $progress_callback);
497
498 Goes through the records loaded in the batch and attempts to 
499 find duplicates for each one.  Sets the matching status 
500 of each record to "no_match" or "auto_match" as appropriate.
501
502 The $max_matches parameter is optional; if it is not supplied,
503 it defaults to 10.
504
505 The $progress_interval and $progress_callback parameters are 
506 optional; if both are supplied, the sub referred to by
507 $progress_callback will be invoked every $progress_interval
508 records using the number of records processed as the 
509 singular argument.
510
511 =cut
512
513 sub BatchFindDuplicates {
514     my $batch_id = shift;
515     my $matcher = shift;
516     my $max_matches = @_ ? shift : 10;
517
518     # optional callback to monitor status 
519     # of job
520     my $progress_interval = 0;
521     my $progress_callback = undef;
522     if ($#_ == 1) {
523         $progress_interval = shift;
524         $progress_callback = shift;
525         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
526         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
527     }
528
529     my $dbh = C4::Context->dbh;
530
531     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
532                              FROM import_records
533                              WHERE import_batch_id = ?");
534     $sth->execute($batch_id);
535     my $num_with_matches = 0;
536     my $rec_num = 0;
537     while (my $rowref = $sth->fetchrow_hashref) {
538         $rec_num++;
539         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
540             &$progress_callback($rec_num);
541         }
542         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
543         my @matches = ();
544         if (defined $matcher) {
545             @matches = $matcher->get_matches($marc_record, $max_matches);
546         }
547         if (scalar(@matches) > 0) {
548             $num_with_matches++;
549             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
550             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
551         } else {
552             SetImportRecordMatches($rowref->{'import_record_id'}, ());
553             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
554         }
555     }
556     $sth->finish();
557     return $num_with_matches;
558 }
559
560 =head2 BatchCommitRecords
561
562   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
563         BatchCommitRecords($batch_id, $framework,
564         $progress_interval, $progress_callback);
565
566 =cut
567
568 sub BatchCommitRecords {
569     my $batch_id = shift;
570     my $framework = shift;
571
572     # optional callback to monitor status 
573     # of job
574     my $progress_interval = 0;
575     my $progress_callback = undef;
576     if ($#_ == 1) {
577         $progress_interval = shift;
578         $progress_callback = shift;
579         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
580         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
581     }
582
583     my $record_type;
584     my $num_added = 0;
585     my $num_updated = 0;
586     my $num_items_added = 0;
587     my $num_items_replaced = 0;
588     my $num_items_errored = 0;
589     my $num_ignored = 0;
590     # commit (i.e., save, all records in the batch)
591     SetImportBatchStatus($batch_id, 'importing');
592     my $overlay_action = GetImportBatchOverlayAction($batch_id);
593     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
594     my $item_action = GetImportBatchItemAction($batch_id);
595     my $item_tag;
596     my $item_subfield;
597     my $dbh = C4::Context->dbh;
598     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
599                              FROM import_records
600                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
601                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
602                              WHERE import_batch_id = ?");
603     $sth->execute($batch_id);
604     my $marcflavour = C4::Context->preference('marcflavour');
605
606     my $userenv = C4::Context->userenv;
607     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
608
609     my $rec_num = 0;
610     while (my $rowref = $sth->fetchrow_hashref) {
611         $record_type = $rowref->{'record_type'};
612         $rec_num++;
613         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
614             &$progress_callback($rec_num);
615         }
616         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
617             $num_ignored++;
618             next;
619         }
620
621         my $marc_type;
622         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
623             $marc_type = 'UNIMARCAUTH';
624         } elsif ($marcflavour eq 'UNIMARC') {
625             $marc_type = 'UNIMARC';
626         } else {
627             $marc_type = 'USMARC';
628         }
629         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
630
631         if ($record_type eq 'biblio') {
632             # remove any item tags - rely on BatchCommitItems
633             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
634             foreach my $item_field ($marc_record->field($item_tag)) {
635                 $marc_record->delete_field($item_field);
636             }
637         }
638
639         my ($record_result, $item_result, $record_match) =
640             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
641                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
642
643         my $recordid;
644         my $query;
645         if ($record_result eq 'create_new') {
646             $num_added++;
647             if ($record_type eq 'biblio') {
648                 my $biblioitemnumber;
649                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
650                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
651                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
652                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
653                     $num_items_added += $bib_items_added;
654                     $num_items_replaced += $bib_items_replaced;
655                     $num_items_errored += $bib_items_errored;
656                 }
657             } else {
658                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
659                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
660             }
661             my $sth = $dbh->prepare_cached($query);
662             $sth->execute($recordid, $rowref->{'import_record_id'});
663             $sth->finish();
664             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
665         } elsif ($record_result eq 'replace') {
666             $num_updated++;
667             $recordid = $record_match;
668             my $oldxml;
669             if ($record_type eq 'biblio') {
670                 my $oldbiblio = Koha::Biblios->find( $recordid );
671                 $oldxml = GetXmlBiblio($recordid);
672
673                 # remove item fields so that they don't get
674                 # added again if record is reverted
675                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
676                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
677                 foreach my $item_field ($old_marc->field($item_tag)) {
678                     $old_marc->delete_field($item_field);
679                 }
680                 $oldxml = $old_marc->as_xml($marc_type);
681
682                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
683                     overlay_context => {
684                         source => 'batchimport',
685                         categorycode => $logged_in_patron->categorycode,
686                         userid => $logged_in_patron->userid
687                     },
688                 });
689                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
690
691                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
692                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
693                     $num_items_added += $bib_items_added;
694                     $num_items_replaced += $bib_items_replaced;
695                     $num_items_errored += $bib_items_errored;
696                 }
697             } else {
698                 $oldxml = GetAuthorityXML($recordid);
699
700                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
701                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
702             }
703             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
704             $sth->execute($oldxml, $rowref->{'import_record_id'});
705             $sth->finish();
706             my $sth2 = $dbh->prepare_cached($query);
707             $sth2->execute($recordid, $rowref->{'import_record_id'});
708             $sth2->finish();
709             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
710             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
711         } elsif ($record_result eq 'ignore') {
712             $recordid = $record_match;
713             $num_ignored++;
714             $recordid = $record_match;
715             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
716                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
717                 $num_items_added += $bib_items_added;
718          $num_items_replaced += $bib_items_replaced;
719                 $num_items_errored += $bib_items_errored;
720                 # still need to record the matched biblionumber so that the
721                 # items can be reverted
722                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
723                 $sth2->execute($recordid, $rowref->{'import_record_id'});
724                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
725             }
726             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
727         }
728     }
729     $sth->finish();
730     SetImportBatchStatus($batch_id, 'imported');
731     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
732 }
733
734 =head2 BatchCommitItems
735
736   ($num_items_added, $num_items_errored) = 
737          BatchCommitItems($import_record_id, $biblionumber);
738
739 =cut
740
741 sub BatchCommitItems {
742     my ( $import_record_id, $biblionumber, $action ) = @_;
743
744     my $dbh = C4::Context->dbh;
745
746     my $num_items_added = 0;
747     my $num_items_errored = 0;
748     my $num_items_replaced = 0;
749
750     my $sth = $dbh->prepare( "
751         SELECT import_items_id, import_items.marcxml, encoding
752         FROM import_items
753         JOIN import_records USING (import_record_id)
754         WHERE import_record_id = ?
755         ORDER BY import_items_id
756     " );
757     $sth->bind_param( 1, $import_record_id );
758     $sth->execute();
759
760     while ( my $row = $sth->fetchrow_hashref() ) {
761         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
762
763         # Delete date_due subfield as to not accidentally delete item checkout due dates
764         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
765         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
766
767         my $item = TransformMarcToKoha( $item_marc );
768
769         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
770         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
771
772         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
773         if ( $action eq "replace" && $duplicate_itemnumber ) {
774             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
775             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
776             $updsth->bind_param( 1, 'imported' );
777             $updsth->bind_param( 2, $item->{itemnumber} );
778             $updsth->bind_param( 3, $row->{'import_items_id'} );
779             $updsth->execute();
780             $updsth->finish();
781             $num_items_replaced++;
782         } elsif ( $action eq "replace" && $duplicate_barcode ) {
783             my $itemnumber = $duplicate_barcode->itemnumber;
784             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
785             $updsth->bind_param( 1, 'imported' );
786             $updsth->bind_param( 2, $item->{itemnumber} );
787             $updsth->bind_param( 3, $row->{'import_items_id'} );
788             $updsth->execute();
789             $updsth->finish();
790             $num_items_replaced++;
791         } elsif ($duplicate_barcode) {
792             $updsth->bind_param( 1, 'error' );
793             $updsth->bind_param( 2, 'duplicate item barcode' );
794             $updsth->bind_param( 3, $row->{'import_items_id'} );
795             $updsth->execute();
796             $num_items_errored++;
797         } else {
798             # Remove the itemnumber if it exists, we want to create a new item
799             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
800             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
801
802             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
803             if( $itemnumber ) {
804                 $updsth->bind_param( 1, 'imported' );
805                 $updsth->bind_param( 2, $itemnumber );
806                 $updsth->bind_param( 3, $row->{'import_items_id'} );
807                 $updsth->execute();
808                 $updsth->finish();
809                 $num_items_added++;
810             }
811         }
812     }
813
814     return ( $num_items_added, $num_items_replaced, $num_items_errored );
815 }
816
817 =head2 BatchRevertRecords
818
819   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
820       $num_ignored) = BatchRevertRecords($batch_id);
821
822 =cut
823
824 sub BatchRevertRecords {
825     my $batch_id = shift;
826
827     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
828
829     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
830
831     my $record_type;
832     my $num_deleted = 0;
833     my $num_errors = 0;
834     my $num_reverted = 0;
835     my $num_ignored = 0;
836     my $num_items_deleted = 0;
837     # commit (i.e., save, all records in the batch)
838     SetImportBatchStatus($batch_id, 'reverting');
839     my $overlay_action = GetImportBatchOverlayAction($batch_id);
840     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
841     my $dbh = C4::Context->dbh;
842     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
843                              FROM import_records
844                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
845                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
846                              WHERE import_batch_id = ?");
847     $sth->execute($batch_id);
848     my $marc_type;
849     my $marcflavour = C4::Context->preference('marcflavour');
850     while (my $rowref = $sth->fetchrow_hashref) {
851         $record_type = $rowref->{'record_type'};
852         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
853             $num_ignored++;
854             next;
855         }
856         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
857             $marc_type = 'UNIMARCAUTH';
858         } elsif ($marcflavour eq 'UNIMARC') {
859             $marc_type = 'UNIMARC';
860         } else {
861             $marc_type = 'USMARC';
862         }
863
864         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
865
866         if ($record_result eq 'delete') {
867             my $error = undef;
868             if  ($record_type eq 'biblio') {
869                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
870                 $error = DelBiblio($rowref->{'matched_biblionumber'});
871             } else {
872                 DelAuthority({ authid => $rowref->{'matched_authid'} });
873             }
874             if (defined $error) {
875                 $num_errors++;
876             } else {
877                 $num_deleted++;
878                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
879             }
880         } elsif ($record_result eq 'restore') {
881             $num_reverted++;
882             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
883             if ($record_type eq 'biblio') {
884                 my $biblionumber = $rowref->{'matched_biblionumber'};
885                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
886
887                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
888                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
889
890                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
891                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
892             } else {
893                 my $authid = $rowref->{'matched_authid'};
894                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
895             }
896             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
897         } elsif ($record_result eq 'ignore') {
898             if ($record_type eq 'biblio') {
899                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
900             }
901             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
902         }
903         my $query;
904         if ($record_type eq 'biblio') {
905             # remove matched_biblionumber only if there is no 'imported' item left
906             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
907             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
908         } else {
909             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
910         }
911         my $sth2 = $dbh->prepare_cached($query);
912         $sth2->execute($rowref->{'import_record_id'});
913     }
914
915     $sth->finish();
916     SetImportBatchStatus($batch_id, 'reverted');
917     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
918 }
919
920 =head2 BatchRevertItems
921
922   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
923
924 =cut
925
926 sub BatchRevertItems {
927     my ($import_record_id, $biblionumber) = @_;
928
929     my $dbh = C4::Context->dbh;
930     my $num_items_deleted = 0;
931
932     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
933                                    FROM import_items
934                                    JOIN items USING (itemnumber)
935                                    WHERE import_record_id = ?");
936     $sth->bind_param(1, $import_record_id);
937     $sth->execute();
938     while (my $row = $sth->fetchrow_hashref()) {
939         my $item = Koha::Items->find($row->{itemnumber});
940         my $error = $item->safe_delete;
941         if ($error eq '1'){
942             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
943             $updsth->bind_param(1, 'reverted');
944             $updsth->bind_param(2, $row->{'import_items_id'});
945             $updsth->execute();
946             $updsth->finish();
947             $num_items_deleted++;
948         }
949         else {
950             next;
951         }
952     }
953     $sth->finish();
954     return $num_items_deleted;
955 }
956
957 =head2 CleanBatch
958
959   CleanBatch($batch_id)
960
961 Deletes all staged records from the import batch
962 and sets the status of the batch to 'cleaned'.  Note
963 that deleting a stage record does *not* affect
964 any record that has been committed to the database.
965
966 =cut
967
968 sub CleanBatch {
969     my $batch_id = shift;
970     return unless defined $batch_id;
971
972     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
973     SetImportBatchStatus($batch_id, 'cleaned');
974 }
975
976 =head2 DeleteBatch
977
978   DeleteBatch($batch_id)
979
980 Deletes the record from the database. This can only be done
981 once the batch has been cleaned.
982
983 =cut
984
985 sub DeleteBatch {
986     my $batch_id = shift;
987     return unless defined $batch_id;
988
989     my $dbh = C4::Context->dbh;
990     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
991     $sth->execute( $batch_id );
992 }
993
994 =head2 GetAllImportBatches
995
996   my $results = GetAllImportBatches();
997
998 Returns a references to an array of hash references corresponding
999 to all import_batches rows (of batch_type 'batch'), sorted in 
1000 ascending order by import_batch_id.
1001
1002 =cut
1003
1004 sub  GetAllImportBatches {
1005     my $dbh = C4::Context->dbh;
1006     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1007                                     WHERE batch_type IN ('batch', 'webservice')
1008                                     ORDER BY import_batch_id ASC");
1009
1010     my $results = [];
1011     $sth->execute();
1012     while (my $row = $sth->fetchrow_hashref) {
1013         push @$results, $row;
1014     }
1015     $sth->finish();
1016     return $results;
1017 }
1018
1019 =head2 GetStagedWebserviceBatches
1020
1021   my $batch_ids = GetStagedWebserviceBatches();
1022
1023 Returns a references to an array of batch id's
1024 of batch_type 'webservice' that are not imported
1025
1026 =cut
1027
1028 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1029 SELECT import_batch_id FROM import_batches
1030 WHERE batch_type = 'webservice'
1031 AND import_status = 'staged'
1032 EOQ
1033 sub  GetStagedWebserviceBatches {
1034     my $dbh = C4::Context->dbh;
1035     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1036 }
1037
1038 =head2 GetImportBatchRangeDesc
1039
1040   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1041
1042 Returns a reference to an array of hash references corresponding to
1043 import_batches rows (sorted in descending order by import_batch_id)
1044 start at the given offset.
1045
1046 =cut
1047
1048 sub GetImportBatchRangeDesc {
1049     my ($offset, $results_per_group) = @_;
1050
1051     my $dbh = C4::Context->dbh;
1052     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1053                                     LEFT JOIN import_batch_profiles p
1054                                     ON b.profile_id = p.id
1055                                     WHERE b.batch_type IN ('batch', 'webservice')
1056                                     ORDER BY b.import_batch_id DESC";
1057     my @params;
1058     if ($results_per_group){
1059         $query .= " LIMIT ?";
1060         push(@params, $results_per_group);
1061     }
1062     if ($offset){
1063         $query .= " OFFSET ?";
1064         push(@params, $offset);
1065     }
1066     my $sth = $dbh->prepare_cached($query);
1067     $sth->execute(@params);
1068     my $results = $sth->fetchall_arrayref({});
1069     $sth->finish();
1070     return $results;
1071 }
1072
1073 =head2 GetItemNumbersFromImportBatch
1074
1075   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1076
1077 =cut
1078
1079 sub GetItemNumbersFromImportBatch {
1080     my ($batch_id) = @_;
1081     my $dbh = C4::Context->dbh;
1082     my $sql = q|
1083 SELECT itemnumber FROM import_items
1084 INNER JOIN items USING (itemnumber)
1085 INNER JOIN import_records USING (import_record_id)
1086 WHERE import_batch_id = ?|;
1087     my  $sth = $dbh->prepare( $sql );
1088     $sth->execute($batch_id);
1089     my @items ;
1090     while ( my ($itm) = $sth->fetchrow_array ) {
1091         push @items, $itm;
1092     }
1093     return @items;
1094 }
1095
1096 =head2 GetNumberOfImportBatches
1097
1098   my $count = GetNumberOfImportBatches();
1099
1100 =cut
1101
1102 sub GetNumberOfNonZ3950ImportBatches {
1103     my $dbh = C4::Context->dbh;
1104     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1105     $sth->execute();
1106     my ($count) = $sth->fetchrow_array();
1107     $sth->finish();
1108     return $count;
1109 }
1110
1111 =head2 GetImportBiblios
1112
1113   my $results = GetImportBiblios($importid);
1114
1115 =cut
1116
1117 sub GetImportBiblios {
1118     my ($import_record_id) = @_;
1119
1120     my $dbh = C4::Context->dbh;
1121     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1122     return $dbh->selectall_arrayref(
1123         $query,
1124         { Slice => {} },
1125         $import_record_id
1126     );
1127
1128 }
1129
1130 =head2 GetImportRecordsRange
1131
1132   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1133
1134 Returns a reference to an array of hash references corresponding to
1135 import_biblios/import_auths/import_records rows for a given batch
1136 starting at the given offset.
1137
1138 =cut
1139
1140 sub GetImportRecordsRange {
1141     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1142
1143     my $dbh = C4::Context->dbh;
1144
1145     my $order_by = $parameters->{order_by} || 'import_record_id';
1146     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1147
1148     my $order_by_direction =
1149       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1150
1151     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1152
1153     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1154                                            record_sequence, status, overlay_status,
1155                                            matched_biblionumber, matched_authid, record_type
1156                                     FROM   import_records
1157                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1158                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1159                                     WHERE  import_batch_id = ?";
1160     my @params;
1161     push(@params, $batch_id);
1162     if ($status) {
1163         $query .= " AND status=?";
1164         push(@params,$status);
1165     }
1166
1167     $query.=" ORDER BY $order_by $order_by_direction";
1168
1169     if($results_per_group){
1170         $query .= " LIMIT ?";
1171         push(@params, $results_per_group);
1172     }
1173     if($offset){
1174         $query .= " OFFSET ?";
1175         push(@params, $offset);
1176     }
1177     my $sth = $dbh->prepare_cached($query);
1178     $sth->execute(@params);
1179     my $results = $sth->fetchall_arrayref({});
1180     $sth->finish();
1181     return $results;
1182
1183 }
1184
1185 =head2 GetBestRecordMatch
1186
1187   my $record_id = GetBestRecordMatch($import_record_id);
1188
1189 =cut
1190
1191 sub GetBestRecordMatch {
1192     my ($import_record_id) = @_;
1193
1194     my $dbh = C4::Context->dbh;
1195     my $sth = $dbh->prepare("SELECT candidate_match_id
1196                              FROM   import_record_matches
1197                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1198                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1199                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1200                              WHERE  import_record_matches.import_record_id = ? AND
1201                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1202                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1203                              ORDER BY score DESC, candidate_match_id DESC");
1204     $sth->execute($import_record_id);
1205     my ($record_id) = $sth->fetchrow_array();
1206     $sth->finish();
1207     return $record_id;
1208 }
1209
1210 =head2 GetImportBatchStatus
1211
1212   my $status = GetImportBatchStatus($batch_id);
1213
1214 =cut
1215
1216 sub GetImportBatchStatus {
1217     my ($batch_id) = @_;
1218
1219     my $dbh = C4::Context->dbh;
1220     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1221     $sth->execute($batch_id);
1222     my ($status) = $sth->fetchrow_array();
1223     $sth->finish();
1224     return $status;
1225
1226 }
1227
1228 =head2 SetImportBatchStatus
1229
1230   SetImportBatchStatus($batch_id, $new_status);
1231
1232 =cut
1233
1234 sub SetImportBatchStatus {
1235     my ($batch_id, $new_status) = @_;
1236
1237     my $dbh = C4::Context->dbh;
1238     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1239     $sth->execute($new_status, $batch_id);
1240     $sth->finish();
1241
1242 }
1243
1244 =head2 SetMatchedBiblionumber
1245
1246   SetMatchedBiblionumber($import_record_id, $biblionumber);
1247
1248 =cut
1249
1250 sub SetMatchedBiblionumber {
1251     my ($import_record_id, $biblionumber) = @_;
1252
1253     my $dbh = C4::Context->dbh;
1254     $dbh->do(
1255         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1256         undef, $biblionumber, $import_record_id
1257     );
1258 }
1259
1260 =head2 GetImportBatchOverlayAction
1261
1262   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1263
1264 =cut
1265
1266 sub GetImportBatchOverlayAction {
1267     my ($batch_id) = @_;
1268
1269     my $dbh = C4::Context->dbh;
1270     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1271     $sth->execute($batch_id);
1272     my ($overlay_action) = $sth->fetchrow_array();
1273     $sth->finish();
1274     return $overlay_action;
1275
1276 }
1277
1278
1279 =head2 SetImportBatchOverlayAction
1280
1281   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1282
1283 =cut
1284
1285 sub SetImportBatchOverlayAction {
1286     my ($batch_id, $new_overlay_action) = @_;
1287
1288     my $dbh = C4::Context->dbh;
1289     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1290     $sth->execute($new_overlay_action, $batch_id);
1291     $sth->finish();
1292
1293 }
1294
1295 =head2 GetImportBatchNoMatchAction
1296
1297   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1298
1299 =cut
1300
1301 sub GetImportBatchNoMatchAction {
1302     my ($batch_id) = @_;
1303
1304     my $dbh = C4::Context->dbh;
1305     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1306     $sth->execute($batch_id);
1307     my ($nomatch_action) = $sth->fetchrow_array();
1308     $sth->finish();
1309     return $nomatch_action;
1310
1311 }
1312
1313
1314 =head2 SetImportBatchNoMatchAction
1315
1316   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1317
1318 =cut
1319
1320 sub SetImportBatchNoMatchAction {
1321     my ($batch_id, $new_nomatch_action) = @_;
1322
1323     my $dbh = C4::Context->dbh;
1324     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1325     $sth->execute($new_nomatch_action, $batch_id);
1326     $sth->finish();
1327
1328 }
1329
1330 =head2 GetImportBatchItemAction
1331
1332   my $item_action = GetImportBatchItemAction($batch_id);
1333
1334 =cut
1335
1336 sub GetImportBatchItemAction {
1337     my ($batch_id) = @_;
1338
1339     my $dbh = C4::Context->dbh;
1340     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1341     $sth->execute($batch_id);
1342     my ($item_action) = $sth->fetchrow_array();
1343     $sth->finish();
1344     return $item_action;
1345
1346 }
1347
1348
1349 =head2 SetImportBatchItemAction
1350
1351   SetImportBatchItemAction($batch_id, $new_item_action);
1352
1353 =cut
1354
1355 sub SetImportBatchItemAction {
1356     my ($batch_id, $new_item_action) = @_;
1357
1358     my $dbh = C4::Context->dbh;
1359     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1360     $sth->execute($new_item_action, $batch_id);
1361     $sth->finish();
1362
1363 }
1364
1365 =head2 GetImportBatchMatcher
1366
1367   my $matcher_id = GetImportBatchMatcher($batch_id);
1368
1369 =cut
1370
1371 sub GetImportBatchMatcher {
1372     my ($batch_id) = @_;
1373
1374     my $dbh = C4::Context->dbh;
1375     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1376     $sth->execute($batch_id);
1377     my ($matcher_id) = $sth->fetchrow_array();
1378     $sth->finish();
1379     return $matcher_id;
1380
1381 }
1382
1383
1384 =head2 SetImportBatchMatcher
1385
1386   SetImportBatchMatcher($batch_id, $new_matcher_id);
1387
1388 =cut
1389
1390 sub SetImportBatchMatcher {
1391     my ($batch_id, $new_matcher_id) = @_;
1392
1393     my $dbh = C4::Context->dbh;
1394     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1395     $sth->execute($new_matcher_id, $batch_id);
1396     $sth->finish();
1397
1398 }
1399
1400 =head2 GetImportRecordOverlayStatus
1401
1402   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1403
1404 =cut
1405
1406 sub GetImportRecordOverlayStatus {
1407     my ($import_record_id) = @_;
1408
1409     my $dbh = C4::Context->dbh;
1410     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1411     $sth->execute($import_record_id);
1412     my ($overlay_status) = $sth->fetchrow_array();
1413     $sth->finish();
1414     return $overlay_status;
1415
1416 }
1417
1418
1419 =head2 SetImportRecordOverlayStatus
1420
1421   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1422
1423 =cut
1424
1425 sub SetImportRecordOverlayStatus {
1426     my ($import_record_id, $new_overlay_status) = @_;
1427
1428     my $dbh = C4::Context->dbh;
1429     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1430     $sth->execute($new_overlay_status, $import_record_id);
1431     $sth->finish();
1432
1433 }
1434
1435 =head2 GetImportRecordStatus
1436
1437   my $status = GetImportRecordStatus($import_record_id);
1438
1439 =cut
1440
1441 sub GetImportRecordStatus {
1442     my ($import_record_id) = @_;
1443
1444     my $dbh = C4::Context->dbh;
1445     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1446     $sth->execute($import_record_id);
1447     my ($status) = $sth->fetchrow_array();
1448     $sth->finish();
1449     return $status;
1450
1451 }
1452
1453
1454 =head2 SetImportRecordStatus
1455
1456   SetImportRecordStatus($import_record_id, $new_status);
1457
1458 =cut
1459
1460 sub SetImportRecordStatus {
1461     my ($import_record_id, $new_status) = @_;
1462
1463     my $dbh = C4::Context->dbh;
1464     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1465     $sth->execute($new_status, $import_record_id);
1466     $sth->finish();
1467
1468 }
1469
1470 =head2 GetImportRecordMatches
1471
1472   my $results = GetImportRecordMatches($import_record_id, $best_only);
1473
1474 =cut
1475
1476 sub GetImportRecordMatches {
1477     my $import_record_id = shift;
1478     my $best_only = @_ ? shift : 0;
1479
1480     my $dbh = C4::Context->dbh;
1481     # FIXME currently biblio only
1482     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1483                                     candidate_match_id, score, record_type
1484                                     FROM import_records
1485                                     JOIN import_record_matches USING (import_record_id)
1486                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1487                                     WHERE import_record_id = ?
1488                                     ORDER BY score DESC, biblionumber DESC");
1489     $sth->bind_param(1, $import_record_id);
1490     my $results = [];
1491     $sth->execute();
1492     while (my $row = $sth->fetchrow_hashref) {
1493         if ($row->{'record_type'} eq 'auth') {
1494             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1495         }
1496         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1497         push @$results, $row;
1498         last if $best_only;
1499     }
1500     $sth->finish();
1501
1502     return $results;
1503     
1504 }
1505
1506 =head2 SetImportRecordMatches
1507
1508   SetImportRecordMatches($import_record_id, @matches);
1509
1510 =cut
1511
1512 sub SetImportRecordMatches {
1513     my $import_record_id = shift;
1514     my @matches = @_;
1515
1516     my $dbh = C4::Context->dbh;
1517     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1518     $delsth->execute($import_record_id);
1519     $delsth->finish();
1520
1521     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1522                                     VALUES (?, ?, ?)");
1523     foreach my $match (@matches) {
1524         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1525     }
1526 }
1527
1528 =head2 RecordsFromISO2709File
1529
1530     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1531
1532 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1533
1534 @PARAM1, String, absolute path to the ISO2709 file.
1535 @PARAM2, String, see stage_file.pl
1536 @PARAM3, String, should be utf8
1537
1538 Returns two array refs.
1539
1540 =cut
1541
1542 sub RecordsFromISO2709File {
1543     my ($input_file, $record_type, $encoding) = @_;
1544     my @errors;
1545
1546     my $marc_type = C4::Context->preference('marcflavour');
1547     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1548
1549     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1550     my @marc_records;
1551     $/ = "\035";
1552     while (<$fh>) {
1553         s/^\s+//;
1554         s/\s+$//;
1555         next unless $_; # skip if record has only whitespace, as might occur
1556                         # if file includes newlines between each MARC record
1557         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1558         push @marc_records, $marc_record;
1559         if ($charset_guessed ne $encoding) {
1560             push @errors,
1561                 "Unexpected charset $charset_guessed, expecting $encoding";
1562         }
1563     }
1564     close $fh;
1565     return ( \@errors, \@marc_records );
1566 }
1567
1568 =head2 RecordsFromMARCXMLFile
1569
1570     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1571
1572 Creates MARC::Record-objects out of the given MARCXML-file.
1573
1574 @PARAM1, String, absolute path to the ISO2709 file.
1575 @PARAM2, String, should be utf8
1576
1577 Returns two array refs.
1578
1579 =cut
1580
1581 sub RecordsFromMARCXMLFile {
1582     my ( $filename, $encoding ) = @_;
1583     my $batch = MARC::File::XML->in( $filename );
1584     my ( @marcRecords, @errors, $record );
1585     do {
1586         eval { $record = $batch->next( $encoding ); };
1587         if ($@) {
1588             push @errors, $@;
1589         }
1590         push @marcRecords, $record if $record;
1591     } while( $record );
1592     return (\@errors, \@marcRecords);
1593 }
1594
1595 =head2 RecordsFromMarcPlugin
1596
1597     Converts text of input_file into array of MARC records with to_marc plugin
1598
1599 =cut
1600
1601 sub RecordsFromMarcPlugin {
1602     my ($input_file, $plugin_class, $encoding) = @_;
1603     my ( $text, @return );
1604     return \@return if !$input_file || !$plugin_class;
1605
1606     # Read input file
1607     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1608     $/ = "\035";
1609     while (<$fh>) {
1610         s/^\s+//;
1611         s/\s+$//;
1612         next unless $_;
1613         $text .= $_;
1614     }
1615     close $fh;
1616
1617     # Convert to large MARC blob with plugin
1618     $text = Koha::Plugins::Handler->run({
1619         class  => $plugin_class,
1620         method => 'to_marc',
1621         params => { data => $text },
1622     }) if $text;
1623
1624     # Convert to array of MARC records
1625     if( $text ) {
1626         my $marc_type = C4::Context->preference('marcflavour');
1627         foreach my $blob ( split(/\x1D/, $text) ) {
1628             next if $blob =~ /^\s*$/;
1629             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1630             push @return, $marcrecord;
1631         }
1632     }
1633     return \@return;
1634 }
1635
1636 # internal functions
1637
1638 sub _create_import_record {
1639     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1640
1641     my $dbh = C4::Context->dbh;
1642     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1643                                                          record_type, encoding)
1644                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1645     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1646                   $record_type, $encoding);
1647     my $import_record_id = $dbh->{'mysql_insertid'};
1648     $sth->finish();
1649     return $import_record_id;
1650 }
1651
1652 sub _update_import_record_marc {
1653     my ($import_record_id, $marc_record, $marc_type) = @_;
1654
1655     my $dbh = C4::Context->dbh;
1656     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1657                              WHERE  import_record_id = ?");
1658     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1659     $sth->finish();
1660 }
1661
1662 sub _add_auth_fields {
1663     my ($import_record_id, $marc_record) = @_;
1664
1665     my $controlnumber;
1666     if ($marc_record->field('001')) {
1667         $controlnumber = $marc_record->field('001')->data();
1668     }
1669     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1670     my $dbh = C4::Context->dbh;
1671     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1672     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1673     $sth->finish();
1674 }
1675
1676 sub _add_biblio_fields {
1677     my ($import_record_id, $marc_record) = @_;
1678
1679     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1680     my $dbh = C4::Context->dbh;
1681     # FIXME no controlnumber, originalsource
1682     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1683     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1684     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1685     $sth->finish();
1686                 
1687 }
1688
1689 sub _update_biblio_fields {
1690     my ($import_record_id, $marc_record) = @_;
1691
1692     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1693     my $dbh = C4::Context->dbh;
1694     # FIXME no controlnumber, originalsource
1695     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1696     $isbn =~ s/\(.*$//;
1697     $isbn =~ tr/ -_//;
1698     $isbn = uc $isbn;
1699     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1700                              WHERE  import_record_id = ?");
1701     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1702     $sth->finish();
1703 }
1704
1705 sub _parse_biblio_fields {
1706     my ($marc_record) = @_;
1707
1708     my $dbh = C4::Context->dbh;
1709     my $bibliofields = TransformMarcToKoha($marc_record, '');
1710     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1711
1712 }
1713
1714 sub _update_batch_record_counts {
1715     my ($batch_id) = @_;
1716
1717     my $dbh = C4::Context->dbh;
1718     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1719                                         num_records = (
1720                                             SELECT COUNT(*)
1721                                             FROM import_records
1722                                             WHERE import_batch_id = import_batches.import_batch_id),
1723                                         num_items = (
1724                                             SELECT COUNT(*)
1725                                             FROM import_records
1726                                             JOIN import_items USING (import_record_id)
1727                                             WHERE import_batch_id = import_batches.import_batch_id
1728                                             AND record_type = 'biblio')
1729                                     WHERE import_batch_id = ?");
1730     $sth->bind_param(1, $batch_id);
1731     $sth->execute();
1732     $sth->finish();
1733 }
1734
1735 sub _get_commit_action {
1736     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1737     
1738     if ($record_type eq 'biblio') {
1739         my ($bib_result, $bib_match, $item_result);
1740
1741         if ($overlay_status ne 'no_match') {
1742             $bib_match = GetBestRecordMatch($import_record_id);
1743             if ($overlay_action eq 'replace') {
1744                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1745             } elsif ($overlay_action eq 'create_new') {
1746                 $bib_result  = 'create_new';
1747             } elsif ($overlay_action eq 'ignore') {
1748                 $bib_result  = 'ignore';
1749             }
1750          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1751                 $item_result = 'create_new';
1752        }
1753       elsif($item_action eq 'replace'){
1754           $item_result = 'replace';
1755           }
1756       else {
1757              $item_result = 'ignore';
1758            }
1759         } else {
1760             $bib_result = $nomatch_action;
1761             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1762         }
1763         return ($bib_result, $item_result, $bib_match);
1764     } else { # must be auths
1765         my ($auth_result, $auth_match);
1766
1767         if ($overlay_status ne 'no_match') {
1768             $auth_match = GetBestRecordMatch($import_record_id);
1769             if ($overlay_action eq 'replace') {
1770                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1771             } elsif ($overlay_action eq 'create_new') {
1772                 $auth_result  = 'create_new';
1773             } elsif ($overlay_action eq 'ignore') {
1774                 $auth_result  = 'ignore';
1775             }
1776         } else {
1777             $auth_result = $nomatch_action;
1778         }
1779
1780         return ($auth_result, undef, $auth_match);
1781
1782     }
1783 }
1784
1785 sub _get_revert_action {
1786     my ($overlay_action, $overlay_status, $status) = @_;
1787
1788     my $bib_result;
1789
1790     if ($status eq 'ignored') {
1791         $bib_result = 'ignore';
1792     } else {
1793         if ($overlay_action eq 'create_new') {
1794             $bib_result = 'delete';
1795         } else {
1796             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1797         }
1798     }
1799     return $bib_result;
1800 }
1801
1802 1;
1803 __END__
1804
1805 =head1 AUTHOR
1806
1807 Koha Development Team <http://koha-community.org/>
1808
1809 Galen Charlton <galen.charlton@liblime.com>
1810
1811 =cut