Bug 22785: (follow-up) Don't sort by chosen and fix selection of matches
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57       ModBiblioInBatch
58
59       BatchStageMarcRecords
60       BatchFindDuplicates
61       BatchCommitRecords
62       BatchRevertRecords
63       CleanBatch
64       DeleteBatch
65
66       GetAllImportBatches
67       GetStagedWebserviceBatches
68       GetImportBatchRangeDesc
69       GetNumberOfNonZ3950ImportBatches
70       GetImportBiblios
71       GetImportRecordsRange
72       GetItemNumbersFromImportBatch
73
74       GetImportBatchStatus
75       SetImportBatchStatus
76       GetImportBatchOverlayAction
77       SetImportBatchOverlayAction
78       GetImportBatchNoMatchAction
79       SetImportBatchNoMatchAction
80       GetImportBatchItemAction
81       SetImportBatchItemAction
82       GetImportBatchMatcher
83       SetImportBatchMatcher
84       GetImportRecordOverlayStatus
85       SetImportRecordOverlayStatus
86       GetImportRecordStatus
87       SetImportRecordStatus
88       SetMatchedBiblionumber
89       GetImportRecordMatches
90       SetImportRecordMatches
91
92       RecordsFromMARCXMLFile
93       RecordsFromISO2709File
94       RecordsFromMarcPlugin
95     );
96 }
97
98 =head1 NAME
99
100 C4::ImportBatch - manage batches of imported MARC records
101
102 =head1 SYNOPSIS
103
104 use C4::ImportBatch;
105
106 =head1 FUNCTIONS
107
108 =head2 GetZ3950BatchId
109
110   my $batchid = GetZ3950BatchId($z3950server);
111
112 Retrieves the ID of the import batch for the Z39.50
113 reservoir for the given target.  If necessary,
114 creates the import batch.
115
116 =cut
117
118 sub GetZ3950BatchId {
119     my ($z3950server) = @_;
120
121     my $dbh = C4::Context->dbh;
122     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
123                              WHERE  batch_type = 'z3950'
124                              AND    file_name = ?");
125     $sth->execute($z3950server);
126     my $rowref = $sth->fetchrow_arrayref();
127     $sth->finish();
128     if (defined $rowref) {
129         return $rowref->[0];
130     } else {
131         my $batch_id = AddImportBatch( {
132                 overlay_action => 'create_new',
133                 import_status => 'staged',
134                 batch_type => 'z3950',
135                 file_name => $z3950server,
136             } );
137         return $batch_id;
138     }
139     
140 }
141
142 =head2 GetWebserviceBatchId
143
144   my $batchid = GetWebserviceBatchId();
145
146 Retrieves the ID of the import batch for webservice.
147 If necessary, creates the import batch.
148
149 =cut
150
151 my $WEBSERVICE_BASE_QRY = <<EOQ;
152 SELECT import_batch_id FROM import_batches
153 WHERE  batch_type = 'webservice'
154 AND    import_status = 'staged'
155 EOQ
156 sub GetWebserviceBatchId {
157     my ($params) = @_;
158
159     my $dbh = C4::Context->dbh;
160     my $sql = $WEBSERVICE_BASE_QRY;
161     my @args;
162     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
163         if (my $val = $params->{$field}) {
164             $sql .= " AND $field = ?";
165             push @args, $val;
166         }
167     }
168     my $id = $dbh->selectrow_array($sql, undef, @args);
169     return $id if $id;
170
171     $params->{batch_type} = 'webservice';
172     $params->{import_status} = 'staged';
173     return AddImportBatch($params);
174 }
175
176 =head2 GetImportRecordMarc
177
178   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
179
180 =cut
181
182 sub GetImportRecordMarc {
183     my ($import_record_id) = @_;
184
185     my $dbh = C4::Context->dbh;
186     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
187         SELECT marc, encoding
188         FROM import_records
189         WHERE import_record_id = ?
190     |, undef, $import_record_id );
191
192     return $marc, $encoding;
193 }
194
195 sub GetRecordFromImportBiblio {
196     my ( $import_record_id, $embed_items ) = @_;
197
198     my ($marc) = GetImportRecordMarc($import_record_id);
199     my $record = MARC::Record->new_from_usmarc($marc);
200
201     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
202
203     return $record;
204 }
205
206 sub EmbedItemsInImportBiblio {
207     my ( $record, $import_record_id ) = @_;
208     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
209     my $dbh = C4::Context->dbh;
210     my $import_items = $dbh->selectall_arrayref(q|
211         SELECT import_items.marcxml
212         FROM import_items
213         WHERE import_record_id = ?
214     |, { Slice => {} }, $import_record_id );
215     my @item_fields;
216     for my $import_item ( @$import_items ) {
217         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
218         push @item_fields, $item_marc->field($itemtag);
219     }
220     $record->append_fields(@item_fields);
221     return $record;
222 }
223
224 =head2 GetImportRecordMarcXML
225
226   my $marcxml = GetImportRecordMarcXML($import_record_id);
227
228 =cut
229
230 sub GetImportRecordMarcXML {
231     my ($import_record_id) = @_;
232
233     my $dbh = C4::Context->dbh;
234     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
235     $sth->execute($import_record_id);
236     my ($marcxml) = $sth->fetchrow();
237     $sth->finish();
238     return $marcxml;
239
240 }
241
242 =head2 AddImportBatch
243
244   my $batch_id = AddImportBatch($params_hash);
245
246 =cut
247
248 sub AddImportBatch {
249     my ($params) = @_;
250
251     my (@fields, @vals);
252     foreach (qw( matcher_id template_id branchcode
253                  overlay_action nomatch_action item_action
254                  import_status batch_type file_name comments record_type )) {
255         if (exists $params->{$_}) {
256             push @fields, $_;
257             push @vals, $params->{$_};
258         }
259     }
260     my $dbh = C4::Context->dbh;
261     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
262                                   VALUES (".join( ',', map '?', @fields).")",
263              undef,
264              @vals);
265     return $dbh->{'mysql_insertid'};
266 }
267
268 =head2 GetImportBatch 
269
270   my $row = GetImportBatch($batch_id);
271
272 Retrieve a hashref of an import_batches row.
273
274 =cut
275
276 sub GetImportBatch {
277     my ($batch_id) = @_;
278
279     my $dbh = C4::Context->dbh;
280     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
281     $sth->bind_param(1, $batch_id);
282     $sth->execute();
283     my $result = $sth->fetchrow_hashref;
284     $sth->finish();
285     return $result;
286
287 }
288
289 =head2 AddBiblioToBatch 
290
291   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
292                 $marc_record, $encoding, $update_counts);
293
294 =cut
295
296 sub AddBiblioToBatch {
297     my $batch_id = shift;
298     my $record_sequence = shift;
299     my $marc_record = shift;
300     my $encoding = shift;
301     my $update_counts = @_ ? shift : 1;
302
303     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
304     _add_biblio_fields($import_record_id, $marc_record);
305     _update_batch_record_counts($batch_id) if $update_counts;
306     return $import_record_id;
307 }
308
309 =head2 ModBiblioInBatch
310
311   ModBiblioInBatch($import_record_id, $marc_record);
312
313 =cut
314
315 sub ModBiblioInBatch {
316     my ($import_record_id, $marc_record) = @_;
317
318     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
319     _update_biblio_fields($import_record_id, $marc_record);
320
321 }
322
323 =head2 AddAuthToBatch
324
325   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
326                 $marc_record, $encoding, $update_counts, [$marc_type]);
327
328 =cut
329
330 sub AddAuthToBatch {
331     my $batch_id = shift;
332     my $record_sequence = shift;
333     my $marc_record = shift;
334     my $encoding = shift;
335     my $update_counts = @_ ? shift : 1;
336     my $marc_type = shift || C4::Context->preference('marcflavour');
337
338     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
339
340     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
341     _add_auth_fields($import_record_id, $marc_record);
342     _update_batch_record_counts($batch_id) if $update_counts;
343     return $import_record_id;
344 }
345
346 =head2 ModAuthInBatch
347
348   ModAuthInBatch($import_record_id, $marc_record);
349
350 =cut
351
352 sub ModAuthInBatch {
353     my ($import_record_id, $marc_record) = @_;
354
355     my $marcflavour = C4::Context->preference('marcflavour');
356     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
357
358 }
359
360 =head2 BatchStageMarcRecords
361
362 ( $batch_id, $num_records, $num_items, @invalid_records ) =
363   BatchStageMarcRecords(
364     $record_type,                $encoding,
365     $marc_records,               $file_name,
366     $marc_modification_template, $comments,
367     $branch_code,                $parse_items,
368     $leave_as_staging,           $progress_interval,
369     $progress_callback
370   );
371
372 =cut
373
374 sub BatchStageMarcRecords {
375     my $record_type = shift;
376     my $encoding = shift;
377     my $marc_records = shift;
378     my $file_name = shift;
379     my $marc_modification_template = shift;
380     my $comments = shift;
381     my $branch_code = shift;
382     my $parse_items = shift;
383     my $leave_as_staging = shift;
384
385     # optional callback to monitor status 
386     # of job
387     my $progress_interval = 0;
388     my $progress_callback = undef;
389     if ($#_ == 1) {
390         $progress_interval = shift;
391         $progress_callback = shift;
392         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
393         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
394     } 
395     
396     my $batch_id = AddImportBatch( {
397             overlay_action => 'create_new',
398             import_status => 'staging',
399             batch_type => 'batch',
400             file_name => $file_name,
401             comments => $comments,
402             record_type => $record_type,
403         } );
404     if ($parse_items) {
405         SetImportBatchItemAction($batch_id, 'always_add');
406     } else {
407         SetImportBatchItemAction($batch_id, 'ignore');
408     }
409
410
411     my $marc_type = C4::Context->preference('marcflavour');
412     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
413     my @invalid_records = ();
414     my $num_valid = 0;
415     my $num_items = 0;
416     # FIXME - for now, we're dealing only with bibs
417     my $rec_num = 0;
418     foreach my $marc_record (@$marc_records) {
419         $rec_num++;
420         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
421             &$progress_callback($rec_num);
422         }
423
424         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
425
426         my $import_record_id;
427         if (scalar($marc_record->fields()) == 0) {
428             push @invalid_records, $marc_record;
429         } else {
430
431             # Normalize the record so it doesn't have separated diacritics
432             SetUTF8Flag($marc_record);
433
434             $num_valid++;
435             if ($record_type eq 'biblio') {
436                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
437                 if ($parse_items) {
438                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
439                     $num_items += scalar(@import_items_ids);
440                 }
441             } elsif ($record_type eq 'auth') {
442                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
443             }
444         }
445     }
446     unless ($leave_as_staging) {
447         SetImportBatchStatus($batch_id, 'staged');
448     }
449     # FIXME branch_code, number of bibs, number of items
450     _update_batch_record_counts($batch_id);
451     return ($batch_id, $num_valid, $num_items, @invalid_records);
452 }
453
454 =head2 AddItemsToImportBiblio
455
456   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
457                 $import_record_id, $marc_record, $update_counts);
458
459 =cut
460
461 sub AddItemsToImportBiblio {
462     my $batch_id = shift;
463     my $import_record_id = shift;
464     my $marc_record = shift;
465     my $update_counts = @_ ? shift : 0;
466
467     my @import_items_ids = ();
468    
469     my $dbh = C4::Context->dbh; 
470     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
471     foreach my $item_field ($marc_record->field($item_tag)) {
472         my $item_marc = MARC::Record->new();
473         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
474         $item_marc->append_fields($item_field);
475         $marc_record->delete_field($item_field);
476         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
477                                         VALUES (?, ?, ?)");
478         $sth->bind_param(1, $import_record_id);
479         $sth->bind_param(2, 'staged');
480         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
481         $sth->execute();
482         push @import_items_ids, $dbh->{'mysql_insertid'};
483         $sth->finish();
484     }
485
486     if ($#import_items_ids > -1) {
487         _update_batch_record_counts($batch_id) if $update_counts;
488         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
489     }
490     return @import_items_ids;
491 }
492
493 =head2 BatchFindDuplicates
494
495   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
496              $max_matches, $progress_interval, $progress_callback);
497
498 Goes through the records loaded in the batch and attempts to 
499 find duplicates for each one.  Sets the matching status 
500 of each record to "no_match" or "auto_match" as appropriate.
501
502 The $max_matches parameter is optional; if it is not supplied,
503 it defaults to 10.
504
505 The $progress_interval and $progress_callback parameters are 
506 optional; if both are supplied, the sub referred to by
507 $progress_callback will be invoked every $progress_interval
508 records using the number of records processed as the 
509 singular argument.
510
511 =cut
512
513 sub BatchFindDuplicates {
514     my $batch_id = shift;
515     my $matcher = shift;
516     my $max_matches = @_ ? shift : 10;
517
518     # optional callback to monitor status 
519     # of job
520     my $progress_interval = 0;
521     my $progress_callback = undef;
522     if ($#_ == 1) {
523         $progress_interval = shift;
524         $progress_callback = shift;
525         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
526         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
527     }
528
529     my $dbh = C4::Context->dbh;
530
531     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
532                              FROM import_records
533                              WHERE import_batch_id = ?");
534     $sth->execute($batch_id);
535     my $num_with_matches = 0;
536     my $rec_num = 0;
537     while (my $rowref = $sth->fetchrow_hashref) {
538         $rec_num++;
539         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
540             &$progress_callback($rec_num);
541         }
542         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
543         my @matches = ();
544         if (defined $matcher) {
545             @matches = $matcher->get_matches($marc_record, $max_matches);
546         }
547         if (scalar(@matches) > 0) {
548             $num_with_matches++;
549             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
550             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
551         } else {
552             SetImportRecordMatches($rowref->{'import_record_id'}, ());
553             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
554         }
555     }
556     $sth->finish();
557     return $num_with_matches;
558 }
559
560 =head2 BatchCommitRecords
561
562   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
563         BatchCommitRecords($batch_id, $framework,
564         $progress_interval, $progress_callback);
565
566 =cut
567
568 sub BatchCommitRecords {
569     my $batch_id = shift;
570     my $framework = shift;
571
572     # optional callback to monitor status 
573     # of job
574     my $progress_interval = 0;
575     my $progress_callback = undef;
576     if ($#_ == 1) {
577         $progress_interval = shift;
578         $progress_callback = shift;
579         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
580         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
581     }
582
583     my $record_type;
584     my $num_added = 0;
585     my $num_updated = 0;
586     my $num_items_added = 0;
587     my $num_items_replaced = 0;
588     my $num_items_errored = 0;
589     my $num_ignored = 0;
590     # commit (i.e., save, all records in the batch)
591     SetImportBatchStatus($batch_id, 'importing');
592     my $overlay_action = GetImportBatchOverlayAction($batch_id);
593     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
594     my $item_action = GetImportBatchItemAction($batch_id);
595     my $item_tag;
596     my $item_subfield;
597     my $dbh = C4::Context->dbh;
598     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
599                              FROM import_records
600                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
601                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
602                              WHERE import_batch_id = ?");
603     $sth->execute($batch_id);
604     my $marcflavour = C4::Context->preference('marcflavour');
605
606     my $userenv = C4::Context->userenv;
607     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
608
609     my $rec_num = 0;
610     while (my $rowref = $sth->fetchrow_hashref) {
611         $record_type = $rowref->{'record_type'};
612         $rec_num++;
613         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
614             &$progress_callback($rec_num);
615         }
616         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
617             $num_ignored++;
618             next;
619         }
620
621         my $marc_type;
622         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
623             $marc_type = 'UNIMARCAUTH';
624         } elsif ($marcflavour eq 'UNIMARC') {
625             $marc_type = 'UNIMARC';
626         } else {
627             $marc_type = 'USMARC';
628         }
629         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
630
631         if ($record_type eq 'biblio') {
632             # remove any item tags - rely on BatchCommitItems
633             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
634             foreach my $item_field ($marc_record->field($item_tag)) {
635                 $marc_record->delete_field($item_field);
636             }
637         }
638
639         my ($record_result, $item_result, $record_match) =
640             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
641                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
642
643         my $recordid;
644         my $query;
645         if ($record_result eq 'create_new') {
646             $num_added++;
647             if ($record_type eq 'biblio') {
648                 my $biblioitemnumber;
649                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
650                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
651                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
652                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
653                     $num_items_added += $bib_items_added;
654                     $num_items_replaced += $bib_items_replaced;
655                     $num_items_errored += $bib_items_errored;
656                 }
657             } else {
658                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
659                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
660             }
661             my $sth = $dbh->prepare_cached($query);
662             $sth->execute($recordid, $rowref->{'import_record_id'});
663             $sth->finish();
664             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
665         } elsif ($record_result eq 'replace') {
666             $num_updated++;
667             $recordid = $record_match;
668             my $oldxml;
669             if ($record_type eq 'biblio') {
670                 my $oldbiblio = Koha::Biblios->find( $recordid );
671                 $oldxml = GetXmlBiblio($recordid);
672
673                 # remove item fields so that they don't get
674                 # added again if record is reverted
675                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
676                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
677                 foreach my $item_field ($old_marc->field($item_tag)) {
678                     $old_marc->delete_field($item_field);
679                 }
680                 $oldxml = $old_marc->as_xml($marc_type);
681
682                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
683                     overlay_context => {
684                         source => 'batchimport',
685                         categorycode => $logged_in_patron->categorycode,
686                         userid => $logged_in_patron->userid
687                     },
688                 });
689                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
690
691                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
692                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
693                     $num_items_added += $bib_items_added;
694                     $num_items_replaced += $bib_items_replaced;
695                     $num_items_errored += $bib_items_errored;
696                 }
697             } else {
698                 $oldxml = GetAuthorityXML($recordid);
699
700                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
701                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
702             }
703             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
704             $sth->execute($oldxml, $rowref->{'import_record_id'});
705             $sth->finish();
706             my $sth2 = $dbh->prepare_cached($query);
707             $sth2->execute($recordid, $rowref->{'import_record_id'});
708             $sth2->finish();
709             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
710             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
711         } elsif ($record_result eq 'ignore') {
712             $recordid = $record_match;
713             $num_ignored++;
714             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
715                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
716                 $num_items_added += $bib_items_added;
717          $num_items_replaced += $bib_items_replaced;
718                 $num_items_errored += $bib_items_errored;
719                 # still need to record the matched biblionumber so that the
720                 # items can be reverted
721                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
722                 $sth2->execute($recordid, $rowref->{'import_record_id'});
723                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
724             }
725             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
726         }
727     }
728     $sth->finish();
729     SetImportBatchStatus($batch_id, 'imported');
730     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
731 }
732
733 =head2 BatchCommitItems
734
735   ($num_items_added, $num_items_errored) = 
736          BatchCommitItems($import_record_id, $biblionumber);
737
738 =cut
739
740 sub BatchCommitItems {
741     my ( $import_record_id, $biblionumber, $action ) = @_;
742
743     my $dbh = C4::Context->dbh;
744
745     my $num_items_added = 0;
746     my $num_items_errored = 0;
747     my $num_items_replaced = 0;
748
749     my $sth = $dbh->prepare( "
750         SELECT import_items_id, import_items.marcxml, encoding
751         FROM import_items
752         JOIN import_records USING (import_record_id)
753         WHERE import_record_id = ?
754         ORDER BY import_items_id
755     " );
756     $sth->bind_param( 1, $import_record_id );
757     $sth->execute();
758
759     while ( my $row = $sth->fetchrow_hashref() ) {
760         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
761
762         # Delete date_due subfield as to not accidentally delete item checkout due dates
763         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
764         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
765
766         my $item = TransformMarcToKoha( $item_marc );
767
768         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
769         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
770
771         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
772         if ( $action eq "replace" && $duplicate_itemnumber ) {
773             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
774             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
775             $updsth->bind_param( 1, 'imported' );
776             $updsth->bind_param( 2, $item->{itemnumber} );
777             $updsth->bind_param( 3, $row->{'import_items_id'} );
778             $updsth->execute();
779             $updsth->finish();
780             $num_items_replaced++;
781         } elsif ( $action eq "replace" && $duplicate_barcode ) {
782             my $itemnumber = $duplicate_barcode->itemnumber;
783             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
784             $updsth->bind_param( 1, 'imported' );
785             $updsth->bind_param( 2, $item->{itemnumber} );
786             $updsth->bind_param( 3, $row->{'import_items_id'} );
787             $updsth->execute();
788             $updsth->finish();
789             $num_items_replaced++;
790         } elsif ($duplicate_barcode) {
791             $updsth->bind_param( 1, 'error' );
792             $updsth->bind_param( 2, 'duplicate item barcode' );
793             $updsth->bind_param( 3, $row->{'import_items_id'} );
794             $updsth->execute();
795             $num_items_errored++;
796         } else {
797             # Remove the itemnumber if it exists, we want to create a new item
798             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
799             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
800
801             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
802             if( $itemnumber ) {
803                 $updsth->bind_param( 1, 'imported' );
804                 $updsth->bind_param( 2, $itemnumber );
805                 $updsth->bind_param( 3, $row->{'import_items_id'} );
806                 $updsth->execute();
807                 $updsth->finish();
808                 $num_items_added++;
809             }
810         }
811     }
812
813     return ( $num_items_added, $num_items_replaced, $num_items_errored );
814 }
815
816 =head2 BatchRevertRecords
817
818   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
819       $num_ignored) = BatchRevertRecords($batch_id);
820
821 =cut
822
823 sub BatchRevertRecords {
824     my $batch_id = shift;
825
826     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
827
828     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
829
830     my $record_type;
831     my $num_deleted = 0;
832     my $num_errors = 0;
833     my $num_reverted = 0;
834     my $num_ignored = 0;
835     my $num_items_deleted = 0;
836     # commit (i.e., save, all records in the batch)
837     SetImportBatchStatus($batch_id, 'reverting');
838     my $overlay_action = GetImportBatchOverlayAction($batch_id);
839     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
840     my $dbh = C4::Context->dbh;
841     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
842                              FROM import_records
843                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
844                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
845                              WHERE import_batch_id = ?");
846     $sth->execute($batch_id);
847     my $marc_type;
848     my $marcflavour = C4::Context->preference('marcflavour');
849     while (my $rowref = $sth->fetchrow_hashref) {
850         $record_type = $rowref->{'record_type'};
851         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
852             $num_ignored++;
853             next;
854         }
855         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
856             $marc_type = 'UNIMARCAUTH';
857         } elsif ($marcflavour eq 'UNIMARC') {
858             $marc_type = 'UNIMARC';
859         } else {
860             $marc_type = 'USMARC';
861         }
862
863         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
864
865         if ($record_result eq 'delete') {
866             my $error = undef;
867             if  ($record_type eq 'biblio') {
868                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
869                 $error = DelBiblio($rowref->{'matched_biblionumber'});
870             } else {
871                 DelAuthority({ authid => $rowref->{'matched_authid'} });
872             }
873             if (defined $error) {
874                 $num_errors++;
875             } else {
876                 $num_deleted++;
877                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
878             }
879         } elsif ($record_result eq 'restore') {
880             $num_reverted++;
881             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
882             if ($record_type eq 'biblio') {
883                 my $biblionumber = $rowref->{'matched_biblionumber'};
884                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
885
886                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
887                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
888
889                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
890                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
891             } else {
892                 my $authid = $rowref->{'matched_authid'};
893                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
894             }
895             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
896         } elsif ($record_result eq 'ignore') {
897             if ($record_type eq 'biblio') {
898                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
899             }
900             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
901         }
902         my $query;
903         if ($record_type eq 'biblio') {
904             # remove matched_biblionumber only if there is no 'imported' item left
905             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
906             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
907         } else {
908             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
909         }
910         my $sth2 = $dbh->prepare_cached($query);
911         $sth2->execute($rowref->{'import_record_id'});
912     }
913
914     $sth->finish();
915     SetImportBatchStatus($batch_id, 'reverted');
916     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
917 }
918
919 =head2 BatchRevertItems
920
921   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
922
923 =cut
924
925 sub BatchRevertItems {
926     my ($import_record_id, $biblionumber) = @_;
927
928     my $dbh = C4::Context->dbh;
929     my $num_items_deleted = 0;
930
931     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
932                                    FROM import_items
933                                    JOIN items USING (itemnumber)
934                                    WHERE import_record_id = ?");
935     $sth->bind_param(1, $import_record_id);
936     $sth->execute();
937     while (my $row = $sth->fetchrow_hashref()) {
938         my $item = Koha::Items->find($row->{itemnumber});
939         if ($item->safe_delete){
940             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
941             $updsth->bind_param(1, 'reverted');
942             $updsth->bind_param(2, $row->{'import_items_id'});
943             $updsth->execute();
944             $updsth->finish();
945             $num_items_deleted++;
946         }
947         else {
948             next;
949         }
950     }
951     $sth->finish();
952     return $num_items_deleted;
953 }
954
955 =head2 CleanBatch
956
957   CleanBatch($batch_id)
958
959 Deletes all staged records from the import batch
960 and sets the status of the batch to 'cleaned'.  Note
961 that deleting a stage record does *not* affect
962 any record that has been committed to the database.
963
964 =cut
965
966 sub CleanBatch {
967     my $batch_id = shift;
968     return unless defined $batch_id;
969
970     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
971     SetImportBatchStatus($batch_id, 'cleaned');
972 }
973
974 =head2 DeleteBatch
975
976   DeleteBatch($batch_id)
977
978 Deletes the record from the database. This can only be done
979 once the batch has been cleaned.
980
981 =cut
982
983 sub DeleteBatch {
984     my $batch_id = shift;
985     return unless defined $batch_id;
986
987     my $dbh = C4::Context->dbh;
988     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
989     $sth->execute( $batch_id );
990 }
991
992 =head2 GetAllImportBatches
993
994   my $results = GetAllImportBatches();
995
996 Returns a references to an array of hash references corresponding
997 to all import_batches rows (of batch_type 'batch'), sorted in 
998 ascending order by import_batch_id.
999
1000 =cut
1001
1002 sub  GetAllImportBatches {
1003     my $dbh = C4::Context->dbh;
1004     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1005                                     WHERE batch_type IN ('batch', 'webservice')
1006                                     ORDER BY import_batch_id ASC");
1007
1008     my $results = [];
1009     $sth->execute();
1010     while (my $row = $sth->fetchrow_hashref) {
1011         push @$results, $row;
1012     }
1013     $sth->finish();
1014     return $results;
1015 }
1016
1017 =head2 GetStagedWebserviceBatches
1018
1019   my $batch_ids = GetStagedWebserviceBatches();
1020
1021 Returns a references to an array of batch id's
1022 of batch_type 'webservice' that are not imported
1023
1024 =cut
1025
1026 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1027 SELECT import_batch_id FROM import_batches
1028 WHERE batch_type = 'webservice'
1029 AND import_status = 'staged'
1030 EOQ
1031 sub  GetStagedWebserviceBatches {
1032     my $dbh = C4::Context->dbh;
1033     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1034 }
1035
1036 =head2 GetImportBatchRangeDesc
1037
1038   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1039
1040 Returns a reference to an array of hash references corresponding to
1041 import_batches rows (sorted in descending order by import_batch_id)
1042 start at the given offset.
1043
1044 =cut
1045
1046 sub GetImportBatchRangeDesc {
1047     my ($offset, $results_per_group) = @_;
1048
1049     my $dbh = C4::Context->dbh;
1050     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1051                                     LEFT JOIN import_batch_profiles p
1052                                     ON b.profile_id = p.id
1053                                     WHERE b.batch_type IN ('batch', 'webservice')
1054                                     ORDER BY b.import_batch_id DESC";
1055     my @params;
1056     if ($results_per_group){
1057         $query .= " LIMIT ?";
1058         push(@params, $results_per_group);
1059     }
1060     if ($offset){
1061         $query .= " OFFSET ?";
1062         push(@params, $offset);
1063     }
1064     my $sth = $dbh->prepare_cached($query);
1065     $sth->execute(@params);
1066     my $results = $sth->fetchall_arrayref({});
1067     $sth->finish();
1068     return $results;
1069 }
1070
1071 =head2 GetItemNumbersFromImportBatch
1072
1073   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1074
1075 =cut
1076
1077 sub GetItemNumbersFromImportBatch {
1078     my ($batch_id) = @_;
1079     my $dbh = C4::Context->dbh;
1080     my $sql = q|
1081 SELECT itemnumber FROM import_items
1082 INNER JOIN items USING (itemnumber)
1083 INNER JOIN import_records USING (import_record_id)
1084 WHERE import_batch_id = ?|;
1085     my  $sth = $dbh->prepare( $sql );
1086     $sth->execute($batch_id);
1087     my @items ;
1088     while ( my ($itm) = $sth->fetchrow_array ) {
1089         push @items, $itm;
1090     }
1091     return @items;
1092 }
1093
1094 =head2 GetNumberOfImportBatches
1095
1096   my $count = GetNumberOfImportBatches();
1097
1098 =cut
1099
1100 sub GetNumberOfNonZ3950ImportBatches {
1101     my $dbh = C4::Context->dbh;
1102     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1103     $sth->execute();
1104     my ($count) = $sth->fetchrow_array();
1105     $sth->finish();
1106     return $count;
1107 }
1108
1109 =head2 GetImportBiblios
1110
1111   my $results = GetImportBiblios($importid);
1112
1113 =cut
1114
1115 sub GetImportBiblios {
1116     my ($import_record_id) = @_;
1117
1118     my $dbh = C4::Context->dbh;
1119     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1120     return $dbh->selectall_arrayref(
1121         $query,
1122         { Slice => {} },
1123         $import_record_id
1124     );
1125
1126 }
1127
1128 =head2 GetImportRecordsRange
1129
1130   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1131
1132 Returns a reference to an array of hash references corresponding to
1133 import_biblios/import_auths/import_records rows for a given batch
1134 starting at the given offset.
1135
1136 =cut
1137
1138 sub GetImportRecordsRange {
1139     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1140
1141     my $dbh = C4::Context->dbh;
1142
1143     my $order_by = $parameters->{order_by} || 'import_record_id';
1144     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1145
1146     my $order_by_direction =
1147       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1148
1149     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1150
1151     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1152                                            record_sequence, status, overlay_status,
1153                                            matched_biblionumber, matched_authid, record_type
1154                                     FROM   import_records
1155                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1156                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1157                                     WHERE  import_batch_id = ?";
1158     my @params;
1159     push(@params, $batch_id);
1160     if ($status) {
1161         $query .= " AND status=?";
1162         push(@params,$status);
1163     }
1164
1165     $query.=" ORDER BY $order_by $order_by_direction";
1166
1167     if($results_per_group){
1168         $query .= " LIMIT ?";
1169         push(@params, $results_per_group);
1170     }
1171     if($offset){
1172         $query .= " OFFSET ?";
1173         push(@params, $offset);
1174     }
1175     my $sth = $dbh->prepare_cached($query);
1176     $sth->execute(@params);
1177     my $results = $sth->fetchall_arrayref({});
1178     $sth->finish();
1179     return $results;
1180
1181 }
1182
1183 =head2 GetBestRecordMatch
1184
1185   my $record_id = GetBestRecordMatch($import_record_id);
1186
1187 =cut
1188
1189 sub GetBestRecordMatch {
1190     my ($import_record_id) = @_;
1191
1192     my $dbh = C4::Context->dbh;
1193     my $sth = $dbh->prepare("SELECT candidate_match_id
1194                              FROM   import_record_matches
1195                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1196                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1197                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1198                              WHERE  import_record_matches.import_record_id = ? AND
1199                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1200                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1201                              AND chosen = 1
1202                              ORDER BY score DESC, candidate_match_id DESC");
1203     $sth->execute($import_record_id);
1204     my ($record_id) = $sth->fetchrow_array();
1205     $sth->finish();
1206     return $record_id;
1207 }
1208
1209 =head2 GetImportBatchStatus
1210
1211   my $status = GetImportBatchStatus($batch_id);
1212
1213 =cut
1214
1215 sub GetImportBatchStatus {
1216     my ($batch_id) = @_;
1217
1218     my $dbh = C4::Context->dbh;
1219     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1220     $sth->execute($batch_id);
1221     my ($status) = $sth->fetchrow_array();
1222     $sth->finish();
1223     return $status;
1224
1225 }
1226
1227 =head2 SetImportBatchStatus
1228
1229   SetImportBatchStatus($batch_id, $new_status);
1230
1231 =cut
1232
1233 sub SetImportBatchStatus {
1234     my ($batch_id, $new_status) = @_;
1235
1236     my $dbh = C4::Context->dbh;
1237     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1238     $sth->execute($new_status, $batch_id);
1239     $sth->finish();
1240
1241 }
1242
1243 =head2 SetMatchedBiblionumber
1244
1245   SetMatchedBiblionumber($import_record_id, $biblionumber);
1246
1247 =cut
1248
1249 sub SetMatchedBiblionumber {
1250     my ($import_record_id, $biblionumber) = @_;
1251
1252     my $dbh = C4::Context->dbh;
1253     $dbh->do(
1254         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1255         undef, $biblionumber, $import_record_id
1256     );
1257 }
1258
1259 =head2 GetImportBatchOverlayAction
1260
1261   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1262
1263 =cut
1264
1265 sub GetImportBatchOverlayAction {
1266     my ($batch_id) = @_;
1267
1268     my $dbh = C4::Context->dbh;
1269     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1270     $sth->execute($batch_id);
1271     my ($overlay_action) = $sth->fetchrow_array();
1272     $sth->finish();
1273     return $overlay_action;
1274
1275 }
1276
1277
1278 =head2 SetImportBatchOverlayAction
1279
1280   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1281
1282 =cut
1283
1284 sub SetImportBatchOverlayAction {
1285     my ($batch_id, $new_overlay_action) = @_;
1286
1287     my $dbh = C4::Context->dbh;
1288     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1289     $sth->execute($new_overlay_action, $batch_id);
1290     $sth->finish();
1291
1292 }
1293
1294 =head2 GetImportBatchNoMatchAction
1295
1296   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1297
1298 =cut
1299
1300 sub GetImportBatchNoMatchAction {
1301     my ($batch_id) = @_;
1302
1303     my $dbh = C4::Context->dbh;
1304     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1305     $sth->execute($batch_id);
1306     my ($nomatch_action) = $sth->fetchrow_array();
1307     $sth->finish();
1308     return $nomatch_action;
1309
1310 }
1311
1312
1313 =head2 SetImportBatchNoMatchAction
1314
1315   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1316
1317 =cut
1318
1319 sub SetImportBatchNoMatchAction {
1320     my ($batch_id, $new_nomatch_action) = @_;
1321
1322     my $dbh = C4::Context->dbh;
1323     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1324     $sth->execute($new_nomatch_action, $batch_id);
1325     $sth->finish();
1326
1327 }
1328
1329 =head2 GetImportBatchItemAction
1330
1331   my $item_action = GetImportBatchItemAction($batch_id);
1332
1333 =cut
1334
1335 sub GetImportBatchItemAction {
1336     my ($batch_id) = @_;
1337
1338     my $dbh = C4::Context->dbh;
1339     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1340     $sth->execute($batch_id);
1341     my ($item_action) = $sth->fetchrow_array();
1342     $sth->finish();
1343     return $item_action;
1344
1345 }
1346
1347
1348 =head2 SetImportBatchItemAction
1349
1350   SetImportBatchItemAction($batch_id, $new_item_action);
1351
1352 =cut
1353
1354 sub SetImportBatchItemAction {
1355     my ($batch_id, $new_item_action) = @_;
1356
1357     my $dbh = C4::Context->dbh;
1358     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1359     $sth->execute($new_item_action, $batch_id);
1360     $sth->finish();
1361
1362 }
1363
1364 =head2 GetImportBatchMatcher
1365
1366   my $matcher_id = GetImportBatchMatcher($batch_id);
1367
1368 =cut
1369
1370 sub GetImportBatchMatcher {
1371     my ($batch_id) = @_;
1372
1373     my $dbh = C4::Context->dbh;
1374     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1375     $sth->execute($batch_id);
1376     my ($matcher_id) = $sth->fetchrow_array();
1377     $sth->finish();
1378     return $matcher_id;
1379
1380 }
1381
1382
1383 =head2 SetImportBatchMatcher
1384
1385   SetImportBatchMatcher($batch_id, $new_matcher_id);
1386
1387 =cut
1388
1389 sub SetImportBatchMatcher {
1390     my ($batch_id, $new_matcher_id) = @_;
1391
1392     my $dbh = C4::Context->dbh;
1393     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1394     $sth->execute($new_matcher_id, $batch_id);
1395     $sth->finish();
1396
1397 }
1398
1399 =head2 GetImportRecordOverlayStatus
1400
1401   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1402
1403 =cut
1404
1405 sub GetImportRecordOverlayStatus {
1406     my ($import_record_id) = @_;
1407
1408     my $dbh = C4::Context->dbh;
1409     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1410     $sth->execute($import_record_id);
1411     my ($overlay_status) = $sth->fetchrow_array();
1412     $sth->finish();
1413     return $overlay_status;
1414
1415 }
1416
1417
1418 =head2 SetImportRecordOverlayStatus
1419
1420   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1421
1422 =cut
1423
1424 sub SetImportRecordOverlayStatus {
1425     my ($import_record_id, $new_overlay_status) = @_;
1426
1427     my $dbh = C4::Context->dbh;
1428     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1429     $sth->execute($new_overlay_status, $import_record_id);
1430     $sth->finish();
1431
1432 }
1433
1434 =head2 GetImportRecordStatus
1435
1436   my $status = GetImportRecordStatus($import_record_id);
1437
1438 =cut
1439
1440 sub GetImportRecordStatus {
1441     my ($import_record_id) = @_;
1442
1443     my $dbh = C4::Context->dbh;
1444     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1445     $sth->execute($import_record_id);
1446     my ($status) = $sth->fetchrow_array();
1447     $sth->finish();
1448     return $status;
1449
1450 }
1451
1452
1453 =head2 SetImportRecordStatus
1454
1455   SetImportRecordStatus($import_record_id, $new_status);
1456
1457 =cut
1458
1459 sub SetImportRecordStatus {
1460     my ($import_record_id, $new_status) = @_;
1461
1462     my $dbh = C4::Context->dbh;
1463     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1464     $sth->execute($new_status, $import_record_id);
1465     $sth->finish();
1466
1467 }
1468
1469 =head2 GetImportRecordMatches
1470
1471   my $results = GetImportRecordMatches($import_record_id, $best_only);
1472
1473 =cut
1474
1475 sub GetImportRecordMatches {
1476     my $import_record_id = shift;
1477     my $best_only = @_ ? shift : 0;
1478
1479     my $dbh = C4::Context->dbh;
1480     # FIXME currently biblio only
1481     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1482                                     candidate_match_id, score, record_type,
1483                                     chosen
1484                                     FROM import_records
1485                                     JOIN import_record_matches USING (import_record_id)
1486                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1487                                     WHERE import_record_id = ?
1488                                     ORDER BY score DESC, biblionumber DESC");
1489     $sth->bind_param(1, $import_record_id);
1490     my $results = [];
1491     $sth->execute();
1492     while (my $row = $sth->fetchrow_hashref) {
1493         if ($row->{'record_type'} eq 'auth') {
1494             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1495         }
1496         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1497         push @$results, $row;
1498         last if $best_only;
1499     }
1500     $sth->finish();
1501
1502     return $results;
1503     
1504 }
1505
1506 =head2 SetImportRecordMatches
1507
1508   SetImportRecordMatches($import_record_id, @matches);
1509
1510 =cut
1511
1512 sub SetImportRecordMatches {
1513     my $import_record_id = shift;
1514     my @matches = @_;
1515
1516     my $dbh = C4::Context->dbh;
1517     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1518     $delsth->execute($import_record_id);
1519     $delsth->finish();
1520
1521     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1522                                     VALUES (?, ?, ?, ?)");
1523     my $chosen = 1; #The first match is defaulted to be chosen
1524     foreach my $match (@matches) {
1525         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1526         $chosen = 0; #After the first we do not default to other matches
1527     }
1528 }
1529
1530 =head2 RecordsFromISO2709File
1531
1532     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1533
1534 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1535
1536 @PARAM1, String, absolute path to the ISO2709 file.
1537 @PARAM2, String, see stage_file.pl
1538 @PARAM3, String, should be utf8
1539
1540 Returns two array refs.
1541
1542 =cut
1543
1544 sub RecordsFromISO2709File {
1545     my ($input_file, $record_type, $encoding) = @_;
1546     my @errors;
1547
1548     my $marc_type = C4::Context->preference('marcflavour');
1549     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1550
1551     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1552     my @marc_records;
1553     $/ = "\035";
1554     while (<$fh>) {
1555         s/^\s+//;
1556         s/\s+$//;
1557         next unless $_; # skip if record has only whitespace, as might occur
1558                         # if file includes newlines between each MARC record
1559         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1560         push @marc_records, $marc_record;
1561         if ($charset_guessed ne $encoding) {
1562             push @errors,
1563                 "Unexpected charset $charset_guessed, expecting $encoding";
1564         }
1565     }
1566     close $fh;
1567     return ( \@errors, \@marc_records );
1568 }
1569
1570 =head2 RecordsFromMARCXMLFile
1571
1572     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1573
1574 Creates MARC::Record-objects out of the given MARCXML-file.
1575
1576 @PARAM1, String, absolute path to the ISO2709 file.
1577 @PARAM2, String, should be utf8
1578
1579 Returns two array refs.
1580
1581 =cut
1582
1583 sub RecordsFromMARCXMLFile {
1584     my ( $filename, $encoding ) = @_;
1585     my $batch = MARC::File::XML->in( $filename );
1586     my ( @marcRecords, @errors, $record );
1587     do {
1588         eval { $record = $batch->next( $encoding ); };
1589         if ($@) {
1590             push @errors, $@;
1591         }
1592         push @marcRecords, $record if $record;
1593     } while( $record );
1594     return (\@errors, \@marcRecords);
1595 }
1596
1597 =head2 RecordsFromMarcPlugin
1598
1599     Converts text of input_file into array of MARC records with to_marc plugin
1600
1601 =cut
1602
1603 sub RecordsFromMarcPlugin {
1604     my ($input_file, $plugin_class, $encoding) = @_;
1605     my ( $text, @return );
1606     return \@return if !$input_file || !$plugin_class;
1607
1608     # Read input file
1609     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1610     $/ = "\035";
1611     while (<$fh>) {
1612         s/^\s+//;
1613         s/\s+$//;
1614         next unless $_;
1615         $text .= $_;
1616     }
1617     close $fh;
1618
1619     # Convert to large MARC blob with plugin
1620     $text = Koha::Plugins::Handler->run({
1621         class  => $plugin_class,
1622         method => 'to_marc',
1623         params => { data => $text },
1624     }) if $text;
1625
1626     # Convert to array of MARC records
1627     if( $text ) {
1628         my $marc_type = C4::Context->preference('marcflavour');
1629         foreach my $blob ( split(/\x1D/, $text) ) {
1630             next if $blob =~ /^\s*$/;
1631             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1632             push @return, $marcrecord;
1633         }
1634     }
1635     return \@return;
1636 }
1637
1638 # internal functions
1639
1640 sub _create_import_record {
1641     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1642
1643     my $dbh = C4::Context->dbh;
1644     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1645                                                          record_type, encoding)
1646                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1647     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1648                   $record_type, $encoding);
1649     my $import_record_id = $dbh->{'mysql_insertid'};
1650     $sth->finish();
1651     return $import_record_id;
1652 }
1653
1654 sub _update_import_record_marc {
1655     my ($import_record_id, $marc_record, $marc_type) = @_;
1656
1657     my $dbh = C4::Context->dbh;
1658     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1659                              WHERE  import_record_id = ?");
1660     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1661     $sth->finish();
1662 }
1663
1664 sub _add_auth_fields {
1665     my ($import_record_id, $marc_record) = @_;
1666
1667     my $controlnumber;
1668     if ($marc_record->field('001')) {
1669         $controlnumber = $marc_record->field('001')->data();
1670     }
1671     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1672     my $dbh = C4::Context->dbh;
1673     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1674     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1675     $sth->finish();
1676 }
1677
1678 sub _add_biblio_fields {
1679     my ($import_record_id, $marc_record) = @_;
1680
1681     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1682     my $dbh = C4::Context->dbh;
1683     # FIXME no controlnumber, originalsource
1684     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1685     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1686     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1687     $sth->finish();
1688                 
1689 }
1690
1691 sub _update_biblio_fields {
1692     my ($import_record_id, $marc_record) = @_;
1693
1694     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1695     my $dbh = C4::Context->dbh;
1696     # FIXME no controlnumber, originalsource
1697     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1698     $isbn =~ s/\(.*$//;
1699     $isbn =~ tr/ -_//;
1700     $isbn = uc $isbn;
1701     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1702                              WHERE  import_record_id = ?");
1703     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1704     $sth->finish();
1705 }
1706
1707 sub _parse_biblio_fields {
1708     my ($marc_record) = @_;
1709
1710     my $dbh = C4::Context->dbh;
1711     my $bibliofields = TransformMarcToKoha($marc_record, '');
1712     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1713
1714 }
1715
1716 sub _update_batch_record_counts {
1717     my ($batch_id) = @_;
1718
1719     my $dbh = C4::Context->dbh;
1720     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1721                                         num_records = (
1722                                             SELECT COUNT(*)
1723                                             FROM import_records
1724                                             WHERE import_batch_id = import_batches.import_batch_id),
1725                                         num_items = (
1726                                             SELECT COUNT(*)
1727                                             FROM import_records
1728                                             JOIN import_items USING (import_record_id)
1729                                             WHERE import_batch_id = import_batches.import_batch_id
1730                                             AND record_type = 'biblio')
1731                                     WHERE import_batch_id = ?");
1732     $sth->bind_param(1, $batch_id);
1733     $sth->execute();
1734     $sth->finish();
1735 }
1736
1737 sub _get_commit_action {
1738     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1739     
1740     if ($record_type eq 'biblio') {
1741         my ($bib_result, $bib_match, $item_result);
1742
1743         $bib_match = GetBestRecordMatch($import_record_id);
1744         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1745
1746             $bib_result = $overlay_action;
1747
1748             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1749                 $item_result = 'create_new';
1750             } elsif($item_action eq 'replace'){
1751                 $item_result = 'replace';
1752             } else {
1753                 $item_result = 'ignore';
1754             }
1755
1756         } else {
1757             $bib_result = $nomatch_action;
1758             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1759         }
1760         return ($bib_result, $item_result, $bib_match);
1761     } else { # must be auths
1762         my ($auth_result, $auth_match);
1763
1764         $auth_match = GetBestRecordMatch($import_record_id);
1765         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1766             $auth_result = $overlay_action;
1767         } else {
1768             $auth_result = $nomatch_action;
1769         }
1770
1771         return ($auth_result, undef, $auth_match);
1772
1773     }
1774 }
1775
1776 sub _get_revert_action {
1777     my ($overlay_action, $overlay_status, $status) = @_;
1778
1779     my $bib_result;
1780
1781     if ($status eq 'ignored') {
1782         $bib_result = 'ignore';
1783     } else {
1784         if ($overlay_action eq 'create_new') {
1785             $bib_result = 'delete';
1786         } else {
1787             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1788         }
1789     }
1790     return $bib_result;
1791 }
1792
1793 1;
1794 __END__
1795
1796 =head1 AUTHOR
1797
1798 Koha Development Team <http://koha-community.org/>
1799
1800 Galen Charlton <galen.charlton@liblime.com>
1801
1802 =cut