Bug 29623: Cache circulation rules
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub GetRecordFromImportBiblio {
195     my ( $import_record_id, $embed_items ) = @_;
196
197     my ($marc) = GetImportRecordMarc($import_record_id);
198     my $record = MARC::Record->new_from_usmarc($marc);
199
200     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
201
202     return $record;
203 }
204
205 sub EmbedItemsInImportBiblio {
206     my ( $record, $import_record_id ) = @_;
207     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
208     my $dbh = C4::Context->dbh;
209     my $import_items = $dbh->selectall_arrayref(q|
210         SELECT import_items.marcxml
211         FROM import_items
212         WHERE import_record_id = ?
213     |, { Slice => {} }, $import_record_id );
214     my @item_fields;
215     for my $import_item ( @$import_items ) {
216         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
217         push @item_fields, $item_marc->field($itemtag);
218     }
219     $record->append_fields(@item_fields);
220     return $record;
221 }
222
223 =head2 GetImportRecordMarcXML
224
225   my $marcxml = GetImportRecordMarcXML($import_record_id);
226
227 =cut
228
229 sub GetImportRecordMarcXML {
230     my ($import_record_id) = @_;
231
232     my $dbh = C4::Context->dbh;
233     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
234     $sth->execute($import_record_id);
235     my ($marcxml) = $sth->fetchrow();
236     $sth->finish();
237     return $marcxml;
238
239 }
240
241 =head2 AddImportBatch
242
243   my $batch_id = AddImportBatch($params_hash);
244
245 =cut
246
247 sub AddImportBatch {
248     my ($params) = @_;
249
250     my (@fields, @vals);
251     foreach (qw( matcher_id template_id branchcode
252                  overlay_action nomatch_action item_action
253                  import_status batch_type file_name comments record_type )) {
254         if (exists $params->{$_}) {
255             push @fields, $_;
256             push @vals, $params->{$_};
257         }
258     }
259     my $dbh = C4::Context->dbh;
260     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
261                                   VALUES (".join( ',', map '?', @fields).")",
262              undef,
263              @vals);
264     return $dbh->{'mysql_insertid'};
265 }
266
267 =head2 GetImportBatch 
268
269   my $row = GetImportBatch($batch_id);
270
271 Retrieve a hashref of an import_batches row.
272
273 =cut
274
275 sub GetImportBatch {
276     my ($batch_id) = @_;
277
278     my $dbh = C4::Context->dbh;
279     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
280     $sth->bind_param(1, $batch_id);
281     $sth->execute();
282     my $result = $sth->fetchrow_hashref;
283     $sth->finish();
284     return $result;
285
286 }
287
288 =head2 AddBiblioToBatch 
289
290   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
291                 $marc_record, $encoding, $update_counts);
292
293 =cut
294
295 sub AddBiblioToBatch {
296     my $batch_id = shift;
297     my $record_sequence = shift;
298     my $marc_record = shift;
299     my $encoding = shift;
300     my $update_counts = @_ ? shift : 1;
301
302     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
303     _add_biblio_fields($import_record_id, $marc_record);
304     _update_batch_record_counts($batch_id) if $update_counts;
305     return $import_record_id;
306 }
307
308 =head2 AddAuthToBatch
309
310   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
311                 $marc_record, $encoding, $update_counts, [$marc_type]);
312
313 =cut
314
315 sub AddAuthToBatch {
316     my $batch_id = shift;
317     my $record_sequence = shift;
318     my $marc_record = shift;
319     my $encoding = shift;
320     my $update_counts = @_ ? shift : 1;
321     my $marc_type = shift || C4::Context->preference('marcflavour');
322
323     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
324
325     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
326     _add_auth_fields($import_record_id, $marc_record);
327     _update_batch_record_counts($batch_id) if $update_counts;
328     return $import_record_id;
329 }
330
331 =head2 BatchStageMarcRecords
332
333 ( $batch_id, $num_records, $num_items, @invalid_records ) =
334   BatchStageMarcRecords(
335     $record_type,                $encoding,
336     $marc_records,               $file_name,
337     $marc_modification_template, $comments,
338     $branch_code,                $parse_items,
339     $leave_as_staging,           $progress_interval,
340     $progress_callback
341   );
342
343 =cut
344
345 sub BatchStageMarcRecords {
346     my $record_type = shift;
347     my $encoding = shift;
348     my $marc_records = shift;
349     my $file_name = shift;
350     my $marc_modification_template = shift;
351     my $comments = shift;
352     my $branch_code = shift;
353     my $parse_items = shift;
354     my $leave_as_staging = shift;
355
356     # optional callback to monitor status 
357     # of job
358     my $progress_interval = 0;
359     my $progress_callback = undef;
360     if ($#_ == 1) {
361         $progress_interval = shift;
362         $progress_callback = shift;
363         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
364         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
365     } 
366     
367     my $batch_id = AddImportBatch( {
368             overlay_action => 'create_new',
369             import_status => 'staging',
370             batch_type => 'batch',
371             file_name => $file_name,
372             comments => $comments,
373             record_type => $record_type,
374         } );
375     if ($parse_items) {
376         SetImportBatchItemAction($batch_id, 'always_add');
377     } else {
378         SetImportBatchItemAction($batch_id, 'ignore');
379     }
380
381
382     my $marc_type = C4::Context->preference('marcflavour');
383     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
384     my @invalid_records = ();
385     my $num_valid = 0;
386     my $num_items = 0;
387     # FIXME - for now, we're dealing only with bibs
388     my $rec_num = 0;
389     foreach my $marc_record (@$marc_records) {
390         $rec_num++;
391         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
392             &$progress_callback($rec_num);
393         }
394
395         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
396
397         my $import_record_id;
398         if (scalar($marc_record->fields()) == 0) {
399             push @invalid_records, $marc_record;
400         } else {
401
402             # Normalize the record so it doesn't have separated diacritics
403             SetUTF8Flag($marc_record);
404
405             $num_valid++;
406             if ($record_type eq 'biblio') {
407                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
408                 if ($parse_items) {
409                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
410                     $num_items += scalar(@import_items_ids);
411                 }
412             } elsif ($record_type eq 'auth') {
413                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
414             }
415         }
416     }
417     unless ($leave_as_staging) {
418         SetImportBatchStatus($batch_id, 'staged');
419     }
420     # FIXME branch_code, number of bibs, number of items
421     _update_batch_record_counts($batch_id);
422     return ($batch_id, $num_valid, $num_items, @invalid_records);
423 }
424
425 =head2 AddItemsToImportBiblio
426
427   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
428                 $import_record_id, $marc_record, $update_counts);
429
430 =cut
431
432 sub AddItemsToImportBiblio {
433     my $batch_id = shift;
434     my $import_record_id = shift;
435     my $marc_record = shift;
436     my $update_counts = @_ ? shift : 0;
437
438     my @import_items_ids = ();
439    
440     my $dbh = C4::Context->dbh; 
441     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
442     foreach my $item_field ($marc_record->field($item_tag)) {
443         my $item_marc = MARC::Record->new();
444         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
445         $item_marc->append_fields($item_field);
446         $marc_record->delete_field($item_field);
447         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
448                                         VALUES (?, ?, ?)");
449         $sth->bind_param(1, $import_record_id);
450         $sth->bind_param(2, 'staged');
451         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
452         $sth->execute();
453         push @import_items_ids, $dbh->{'mysql_insertid'};
454         $sth->finish();
455     }
456
457     if ($#import_items_ids > -1) {
458         _update_batch_record_counts($batch_id) if $update_counts;
459         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
460     }
461     return @import_items_ids;
462 }
463
464 =head2 BatchFindDuplicates
465
466   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
467              $max_matches, $progress_interval, $progress_callback);
468
469 Goes through the records loaded in the batch and attempts to 
470 find duplicates for each one.  Sets the matching status 
471 of each record to "no_match" or "auto_match" as appropriate.
472
473 The $max_matches parameter is optional; if it is not supplied,
474 it defaults to 10.
475
476 The $progress_interval and $progress_callback parameters are 
477 optional; if both are supplied, the sub referred to by
478 $progress_callback will be invoked every $progress_interval
479 records using the number of records processed as the 
480 singular argument.
481
482 =cut
483
484 sub BatchFindDuplicates {
485     my $batch_id = shift;
486     my $matcher = shift;
487     my $max_matches = @_ ? shift : 10;
488
489     # optional callback to monitor status 
490     # of job
491     my $progress_interval = 0;
492     my $progress_callback = undef;
493     if ($#_ == 1) {
494         $progress_interval = shift;
495         $progress_callback = shift;
496         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
497         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
498     }
499
500     my $dbh = C4::Context->dbh;
501
502     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
503                              FROM import_records
504                              WHERE import_batch_id = ?");
505     $sth->execute($batch_id);
506     my $num_with_matches = 0;
507     my $rec_num = 0;
508     while (my $rowref = $sth->fetchrow_hashref) {
509         $rec_num++;
510         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
511             &$progress_callback($rec_num);
512         }
513         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
514         my @matches = ();
515         if (defined $matcher) {
516             @matches = $matcher->get_matches($marc_record, $max_matches);
517         }
518         if (scalar(@matches) > 0) {
519             $num_with_matches++;
520             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
521             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
522         } else {
523             SetImportRecordMatches($rowref->{'import_record_id'}, ());
524             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
525         }
526     }
527     $sth->finish();
528     return $num_with_matches;
529 }
530
531 =head2 BatchCommitRecords
532
533   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
534         BatchCommitRecords($batch_id, $framework,
535         $progress_interval, $progress_callback);
536
537 =cut
538
539 sub BatchCommitRecords {
540     my $batch_id = shift;
541     my $framework = shift;
542
543     # optional callback to monitor status 
544     # of job
545     my $progress_interval = 0;
546     my $progress_callback = undef;
547     if ($#_ == 1) {
548         $progress_interval = shift;
549         $progress_callback = shift;
550         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
551         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
552     }
553
554     my $record_type;
555     my $num_added = 0;
556     my $num_updated = 0;
557     my $num_items_added = 0;
558     my $num_items_replaced = 0;
559     my $num_items_errored = 0;
560     my $num_ignored = 0;
561     # commit (i.e., save, all records in the batch)
562     SetImportBatchStatus($batch_id, 'importing');
563     my $overlay_action = GetImportBatchOverlayAction($batch_id);
564     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
565     my $item_action = GetImportBatchItemAction($batch_id);
566     my $item_tag;
567     my $item_subfield;
568     my $dbh = C4::Context->dbh;
569     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
570                              FROM import_records
571                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
572                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
573                              WHERE import_batch_id = ?");
574     $sth->execute($batch_id);
575     my $marcflavour = C4::Context->preference('marcflavour');
576
577     my $userenv = C4::Context->userenv;
578     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
579
580     my $rec_num = 0;
581     while (my $rowref = $sth->fetchrow_hashref) {
582         $record_type = $rowref->{'record_type'};
583         $rec_num++;
584         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
585             &$progress_callback($rec_num);
586         }
587         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
588             $num_ignored++;
589             next;
590         }
591
592         my $marc_type;
593         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
594             $marc_type = 'UNIMARCAUTH';
595         } elsif ($marcflavour eq 'UNIMARC') {
596             $marc_type = 'UNIMARC';
597         } else {
598             $marc_type = 'USMARC';
599         }
600         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
601
602         if ($record_type eq 'biblio') {
603             # remove any item tags - rely on BatchCommitItems
604             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
605             foreach my $item_field ($marc_record->field($item_tag)) {
606                 $marc_record->delete_field($item_field);
607             }
608         }
609
610         my ($record_result, $item_result, $record_match) =
611             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
612                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
613
614         my $recordid;
615         my $query;
616         if ($record_result eq 'create_new') {
617             $num_added++;
618             if ($record_type eq 'biblio') {
619                 my $biblioitemnumber;
620                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
621                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
622                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
623                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
624                     $num_items_added += $bib_items_added;
625                     $num_items_replaced += $bib_items_replaced;
626                     $num_items_errored += $bib_items_errored;
627                 }
628             } else {
629                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
630                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
631             }
632             my $sth = $dbh->prepare_cached($query);
633             $sth->execute($recordid, $rowref->{'import_record_id'});
634             $sth->finish();
635             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
636         } elsif ($record_result eq 'replace') {
637             $num_updated++;
638             $recordid = $record_match;
639             my $oldxml;
640             if ($record_type eq 'biblio') {
641                 my $oldbiblio = Koha::Biblios->find( $recordid );
642                 $oldxml = GetXmlBiblio($recordid);
643
644                 # remove item fields so that they don't get
645                 # added again if record is reverted
646                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
647                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
648                 foreach my $item_field ($old_marc->field($item_tag)) {
649                     $old_marc->delete_field($item_field);
650                 }
651                 $oldxml = $old_marc->as_xml($marc_type);
652
653                 my $context = { source => 'batchimport' };
654                 if ($logged_in_patron) {
655                     $context->{categorycode} = $logged_in_patron->categorycode;
656                     $context->{userid} = $logged_in_patron->userid;
657                 }
658
659                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
660                     overlay_context => $context
661                 });
662                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
663
664                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
665                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
666                     $num_items_added += $bib_items_added;
667                     $num_items_replaced += $bib_items_replaced;
668                     $num_items_errored += $bib_items_errored;
669                 }
670             } else {
671                 $oldxml = GetAuthorityXML($recordid);
672
673                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
674                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
675             }
676             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
677             $sth->execute($oldxml, $rowref->{'import_record_id'});
678             $sth->finish();
679             my $sth2 = $dbh->prepare_cached($query);
680             $sth2->execute($recordid, $rowref->{'import_record_id'});
681             $sth2->finish();
682             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
683             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
684         } elsif ($record_result eq 'ignore') {
685             $recordid = $record_match;
686             $num_ignored++;
687             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
688                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
689                 $num_items_added += $bib_items_added;
690          $num_items_replaced += $bib_items_replaced;
691                 $num_items_errored += $bib_items_errored;
692                 # still need to record the matched biblionumber so that the
693                 # items can be reverted
694                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
695                 $sth2->execute($recordid, $rowref->{'import_record_id'});
696                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
697             }
698             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
699         }
700     }
701     $sth->finish();
702     SetImportBatchStatus($batch_id, 'imported');
703     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
704 }
705
706 =head2 BatchCommitItems
707
708   ($num_items_added, $num_items_errored) = 
709          BatchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
710
711 =cut
712
713 sub BatchCommitItems {
714     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
715
716     my $dbh = C4::Context->dbh;
717
718     my $num_items_added = 0;
719     my $num_items_errored = 0;
720     my $num_items_replaced = 0;
721
722     my $sth = $dbh->prepare( "
723         SELECT import_items_id, import_items.marcxml, encoding
724         FROM import_items
725         JOIN import_records USING (import_record_id)
726         WHERE import_record_id = ?
727         ORDER BY import_items_id
728     " );
729     $sth->bind_param( 1, $import_record_id );
730     $sth->execute();
731
732     while ( my $row = $sth->fetchrow_hashref() ) {
733         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
734
735         # Delete date_due subfield as to not accidentally delete item checkout due dates
736         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
737         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
738
739         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
740
741         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
742         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
743
744         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
745         if ( $action eq "replace" && $duplicate_itemnumber ) {
746             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
747             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
748             $updsth->bind_param( 1, 'imported' );
749             $updsth->bind_param( 2, $item->{itemnumber} );
750             $updsth->bind_param( 3, undef );
751             $updsth->bind_param( 4, $row->{'import_items_id'} );
752             $updsth->execute();
753             $updsth->finish();
754             $num_items_replaced++;
755         } elsif ( $action eq "replace" && $duplicate_barcode ) {
756             my $itemnumber = $duplicate_barcode->itemnumber;
757             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
758             $updsth->bind_param( 1, 'imported' );
759             $updsth->bind_param( 2, $item->{itemnumber} );
760             $updsth->bind_param( 3, undef );
761             $updsth->bind_param( 4, $row->{'import_items_id'} );
762             $updsth->execute();
763             $updsth->finish();
764             $num_items_replaced++;
765         } elsif ($duplicate_barcode) {
766             $updsth->bind_param( 1, 'error' );
767             $updsth->bind_param( 2, undef );
768             $updsth->bind_param( 3, 'duplicate item barcode' );
769             $updsth->bind_param( 4, $row->{'import_items_id'} );
770             $updsth->execute();
771             $num_items_errored++;
772         } else {
773             # Remove the itemnumber if it exists, we want to create a new item
774             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
775             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
776
777             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, {biblioitemnumber => $biblioitemnumber} );
778             if( $itemnumber ) {
779                 $updsth->bind_param( 1, 'imported' );
780                 $updsth->bind_param( 2, $itemnumber );
781                 $updsth->bind_param( 3, undef );
782                 $updsth->bind_param( 4, $row->{'import_items_id'} );
783                 $updsth->execute();
784                 $updsth->finish();
785                 $num_items_added++;
786             }
787         }
788     }
789
790     return ( $num_items_added, $num_items_replaced, $num_items_errored );
791 }
792
793 =head2 BatchRevertRecords
794
795   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
796       $num_ignored) = BatchRevertRecords($batch_id);
797
798 =cut
799
800 sub BatchRevertRecords {
801     my $batch_id = shift;
802
803     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
804
805     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
806
807     my $record_type;
808     my $num_deleted = 0;
809     my $num_errors = 0;
810     my $num_reverted = 0;
811     my $num_ignored = 0;
812     my $num_items_deleted = 0;
813     # commit (i.e., save, all records in the batch)
814     SetImportBatchStatus($batch_id, 'reverting');
815     my $overlay_action = GetImportBatchOverlayAction($batch_id);
816     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
817     my $dbh = C4::Context->dbh;
818     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
819                              FROM import_records
820                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
821                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
822                              WHERE import_batch_id = ?");
823     $sth->execute($batch_id);
824     my $marc_type;
825     my $marcflavour = C4::Context->preference('marcflavour');
826     while (my $rowref = $sth->fetchrow_hashref) {
827         $record_type = $rowref->{'record_type'};
828         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
829             $num_ignored++;
830             next;
831         }
832         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
833             $marc_type = 'UNIMARCAUTH';
834         } elsif ($marcflavour eq 'UNIMARC') {
835             $marc_type = 'UNIMARC';
836         } else {
837             $marc_type = 'USMARC';
838         }
839
840         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
841
842         if ($record_result eq 'delete') {
843             my $error = undef;
844             if  ($record_type eq 'biblio') {
845                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
846                 $error = DelBiblio($rowref->{'matched_biblionumber'});
847             } else {
848                 DelAuthority({ authid => $rowref->{'matched_authid'} });
849             }
850             if (defined $error) {
851                 $num_errors++;
852             } else {
853                 $num_deleted++;
854                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
855             }
856         } elsif ($record_result eq 'restore') {
857             $num_reverted++;
858             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
859             if ($record_type eq 'biblio') {
860                 my $biblionumber = $rowref->{'matched_biblionumber'};
861                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
862
863                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
864                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
865
866                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
867                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
868             } else {
869                 my $authid = $rowref->{'matched_authid'};
870                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
871             }
872             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
873         } elsif ($record_result eq 'ignore') {
874             if ($record_type eq 'biblio') {
875                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
876             }
877             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
878         }
879         my $query;
880         if ($record_type eq 'biblio') {
881             # remove matched_biblionumber only if there is no 'imported' item left
882             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
883             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
884         } else {
885             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
886         }
887         my $sth2 = $dbh->prepare_cached($query);
888         $sth2->execute($rowref->{'import_record_id'});
889     }
890
891     $sth->finish();
892     SetImportBatchStatus($batch_id, 'reverted');
893     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
894 }
895
896 =head2 BatchRevertItems
897
898   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
899
900 =cut
901
902 sub BatchRevertItems {
903     my ($import_record_id, $biblionumber) = @_;
904
905     my $dbh = C4::Context->dbh;
906     my $num_items_deleted = 0;
907
908     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
909                                    FROM import_items
910                                    JOIN items USING (itemnumber)
911                                    WHERE import_record_id = ?");
912     $sth->bind_param(1, $import_record_id);
913     $sth->execute();
914     while (my $row = $sth->fetchrow_hashref()) {
915         my $item = Koha::Items->find($row->{itemnumber});
916         if ($item->safe_delete){
917             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
918             $updsth->bind_param(1, 'reverted');
919             $updsth->bind_param(2, $row->{'import_items_id'});
920             $updsth->execute();
921             $updsth->finish();
922             $num_items_deleted++;
923         }
924         else {
925             next;
926         }
927     }
928     $sth->finish();
929     return $num_items_deleted;
930 }
931
932 =head2 CleanBatch
933
934   CleanBatch($batch_id)
935
936 Deletes all staged records from the import batch
937 and sets the status of the batch to 'cleaned'.  Note
938 that deleting a stage record does *not* affect
939 any record that has been committed to the database.
940
941 =cut
942
943 sub CleanBatch {
944     my $batch_id = shift;
945     return unless defined $batch_id;
946
947     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
948     SetImportBatchStatus($batch_id, 'cleaned');
949 }
950
951 =head2 DeleteBatch
952
953   DeleteBatch($batch_id)
954
955 Deletes the record from the database. This can only be done
956 once the batch has been cleaned.
957
958 =cut
959
960 sub DeleteBatch {
961     my $batch_id = shift;
962     return unless defined $batch_id;
963
964     my $dbh = C4::Context->dbh;
965     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
966     $sth->execute( $batch_id );
967 }
968
969 =head2 GetAllImportBatches
970
971   my $results = GetAllImportBatches();
972
973 Returns a references to an array of hash references corresponding
974 to all import_batches rows (of batch_type 'batch'), sorted in 
975 ascending order by import_batch_id.
976
977 =cut
978
979 sub  GetAllImportBatches {
980     my $dbh = C4::Context->dbh;
981     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
982                                     WHERE batch_type IN ('batch', 'webservice')
983                                     ORDER BY import_batch_id ASC");
984
985     my $results = [];
986     $sth->execute();
987     while (my $row = $sth->fetchrow_hashref) {
988         push @$results, $row;
989     }
990     $sth->finish();
991     return $results;
992 }
993
994 =head2 GetStagedWebserviceBatches
995
996   my $batch_ids = GetStagedWebserviceBatches();
997
998 Returns a references to an array of batch id's
999 of batch_type 'webservice' that are not imported
1000
1001 =cut
1002
1003 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1004 SELECT import_batch_id FROM import_batches
1005 WHERE batch_type = 'webservice'
1006 AND import_status = 'staged'
1007 EOQ
1008 sub  GetStagedWebserviceBatches {
1009     my $dbh = C4::Context->dbh;
1010     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1011 }
1012
1013 =head2 GetImportBatchRangeDesc
1014
1015   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1016
1017 Returns a reference to an array of hash references corresponding to
1018 import_batches rows (sorted in descending order by import_batch_id)
1019 start at the given offset.
1020
1021 =cut
1022
1023 sub GetImportBatchRangeDesc {
1024     my ($offset, $results_per_group) = @_;
1025
1026     my $dbh = C4::Context->dbh;
1027     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1028                                     LEFT JOIN import_batch_profiles p
1029                                     ON b.profile_id = p.id
1030                                     WHERE b.batch_type IN ('batch', 'webservice')
1031                                     ORDER BY b.import_batch_id DESC";
1032     my @params;
1033     if ($results_per_group){
1034         $query .= " LIMIT ?";
1035         push(@params, $results_per_group);
1036     }
1037     if ($offset){
1038         $query .= " OFFSET ?";
1039         push(@params, $offset);
1040     }
1041     my $sth = $dbh->prepare_cached($query);
1042     $sth->execute(@params);
1043     my $results = $sth->fetchall_arrayref({});
1044     $sth->finish();
1045     return $results;
1046 }
1047
1048 =head2 GetItemNumbersFromImportBatch
1049
1050   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1051
1052 =cut
1053
1054 sub GetItemNumbersFromImportBatch {
1055     my ($batch_id) = @_;
1056     my $dbh = C4::Context->dbh;
1057     my $sql = q|
1058 SELECT itemnumber FROM import_items
1059 INNER JOIN items USING (itemnumber)
1060 INNER JOIN import_records USING (import_record_id)
1061 WHERE import_batch_id = ?|;
1062     my  $sth = $dbh->prepare( $sql );
1063     $sth->execute($batch_id);
1064     my @items ;
1065     while ( my ($itm) = $sth->fetchrow_array ) {
1066         push @items, $itm;
1067     }
1068     return @items;
1069 }
1070
1071 =head2 GetNumberOfImportBatches
1072
1073   my $count = GetNumberOfImportBatches();
1074
1075 =cut
1076
1077 sub GetNumberOfNonZ3950ImportBatches {
1078     my $dbh = C4::Context->dbh;
1079     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1080     $sth->execute();
1081     my ($count) = $sth->fetchrow_array();
1082     $sth->finish();
1083     return $count;
1084 }
1085
1086 =head2 GetImportBiblios
1087
1088   my $results = GetImportBiblios($importid);
1089
1090 =cut
1091
1092 sub GetImportBiblios {
1093     my ($import_record_id) = @_;
1094
1095     my $dbh = C4::Context->dbh;
1096     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1097     return $dbh->selectall_arrayref(
1098         $query,
1099         { Slice => {} },
1100         $import_record_id
1101     );
1102
1103 }
1104
1105 =head2 GetImportRecordsRange
1106
1107   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1108
1109 Returns a reference to an array of hash references corresponding to
1110 import_biblios/import_auths/import_records rows for a given batch
1111 starting at the given offset.
1112
1113 =cut
1114
1115 sub GetImportRecordsRange {
1116     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1117
1118     my $dbh = C4::Context->dbh;
1119
1120     my $order_by = $parameters->{order_by} || 'import_record_id';
1121     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1122
1123     my $order_by_direction =
1124       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1125
1126     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1127
1128     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1129                                            record_sequence, status, overlay_status,
1130                                            matched_biblionumber, matched_authid, record_type
1131                                     FROM   import_records
1132                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1133                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1134                                     WHERE  import_batch_id = ?";
1135     my @params;
1136     push(@params, $batch_id);
1137     if ($status) {
1138         $query .= " AND status=?";
1139         push(@params,$status);
1140     }
1141
1142     $query.=" ORDER BY $order_by $order_by_direction";
1143
1144     if($results_per_group){
1145         $query .= " LIMIT ?";
1146         push(@params, $results_per_group);
1147     }
1148     if($offset){
1149         $query .= " OFFSET ?";
1150         push(@params, $offset);
1151     }
1152     my $sth = $dbh->prepare_cached($query);
1153     $sth->execute(@params);
1154     my $results = $sth->fetchall_arrayref({});
1155     $sth->finish();
1156     return $results;
1157
1158 }
1159
1160 =head2 GetBestRecordMatch
1161
1162   my $record_id = GetBestRecordMatch($import_record_id);
1163
1164 =cut
1165
1166 sub GetBestRecordMatch {
1167     my ($import_record_id) = @_;
1168
1169     my $dbh = C4::Context->dbh;
1170     my $sth = $dbh->prepare("SELECT candidate_match_id
1171                              FROM   import_record_matches
1172                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1173                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1174                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1175                              WHERE  import_record_matches.import_record_id = ? AND
1176                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1177                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1178                              AND chosen = 1
1179                              ORDER BY score DESC, candidate_match_id DESC");
1180     $sth->execute($import_record_id);
1181     my ($record_id) = $sth->fetchrow_array();
1182     $sth->finish();
1183     return $record_id;
1184 }
1185
1186 =head2 GetImportBatchStatus
1187
1188   my $status = GetImportBatchStatus($batch_id);
1189
1190 =cut
1191
1192 sub GetImportBatchStatus {
1193     my ($batch_id) = @_;
1194
1195     my $dbh = C4::Context->dbh;
1196     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1197     $sth->execute($batch_id);
1198     my ($status) = $sth->fetchrow_array();
1199     $sth->finish();
1200     return $status;
1201
1202 }
1203
1204 =head2 SetImportBatchStatus
1205
1206   SetImportBatchStatus($batch_id, $new_status);
1207
1208 =cut
1209
1210 sub SetImportBatchStatus {
1211     my ($batch_id, $new_status) = @_;
1212
1213     my $dbh = C4::Context->dbh;
1214     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1215     $sth->execute($new_status, $batch_id);
1216     $sth->finish();
1217
1218 }
1219
1220 =head2 SetMatchedBiblionumber
1221
1222   SetMatchedBiblionumber($import_record_id, $biblionumber);
1223
1224 =cut
1225
1226 sub SetMatchedBiblionumber {
1227     my ($import_record_id, $biblionumber) = @_;
1228
1229     my $dbh = C4::Context->dbh;
1230     $dbh->do(
1231         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1232         undef, $biblionumber, $import_record_id
1233     );
1234 }
1235
1236 =head2 GetImportBatchOverlayAction
1237
1238   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1239
1240 =cut
1241
1242 sub GetImportBatchOverlayAction {
1243     my ($batch_id) = @_;
1244
1245     my $dbh = C4::Context->dbh;
1246     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1247     $sth->execute($batch_id);
1248     my ($overlay_action) = $sth->fetchrow_array();
1249     $sth->finish();
1250     return $overlay_action;
1251
1252 }
1253
1254
1255 =head2 SetImportBatchOverlayAction
1256
1257   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1258
1259 =cut
1260
1261 sub SetImportBatchOverlayAction {
1262     my ($batch_id, $new_overlay_action) = @_;
1263
1264     my $dbh = C4::Context->dbh;
1265     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1266     $sth->execute($new_overlay_action, $batch_id);
1267     $sth->finish();
1268
1269 }
1270
1271 =head2 GetImportBatchNoMatchAction
1272
1273   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1274
1275 =cut
1276
1277 sub GetImportBatchNoMatchAction {
1278     my ($batch_id) = @_;
1279
1280     my $dbh = C4::Context->dbh;
1281     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1282     $sth->execute($batch_id);
1283     my ($nomatch_action) = $sth->fetchrow_array();
1284     $sth->finish();
1285     return $nomatch_action;
1286
1287 }
1288
1289
1290 =head2 SetImportBatchNoMatchAction
1291
1292   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1293
1294 =cut
1295
1296 sub SetImportBatchNoMatchAction {
1297     my ($batch_id, $new_nomatch_action) = @_;
1298
1299     my $dbh = C4::Context->dbh;
1300     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1301     $sth->execute($new_nomatch_action, $batch_id);
1302     $sth->finish();
1303
1304 }
1305
1306 =head2 GetImportBatchItemAction
1307
1308   my $item_action = GetImportBatchItemAction($batch_id);
1309
1310 =cut
1311
1312 sub GetImportBatchItemAction {
1313     my ($batch_id) = @_;
1314
1315     my $dbh = C4::Context->dbh;
1316     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1317     $sth->execute($batch_id);
1318     my ($item_action) = $sth->fetchrow_array();
1319     $sth->finish();
1320     return $item_action;
1321
1322 }
1323
1324
1325 =head2 SetImportBatchItemAction
1326
1327   SetImportBatchItemAction($batch_id, $new_item_action);
1328
1329 =cut
1330
1331 sub SetImportBatchItemAction {
1332     my ($batch_id, $new_item_action) = @_;
1333
1334     my $dbh = C4::Context->dbh;
1335     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1336     $sth->execute($new_item_action, $batch_id);
1337     $sth->finish();
1338
1339 }
1340
1341 =head2 GetImportBatchMatcher
1342
1343   my $matcher_id = GetImportBatchMatcher($batch_id);
1344
1345 =cut
1346
1347 sub GetImportBatchMatcher {
1348     my ($batch_id) = @_;
1349
1350     my $dbh = C4::Context->dbh;
1351     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1352     $sth->execute($batch_id);
1353     my ($matcher_id) = $sth->fetchrow_array();
1354     $sth->finish();
1355     return $matcher_id;
1356
1357 }
1358
1359
1360 =head2 SetImportBatchMatcher
1361
1362   SetImportBatchMatcher($batch_id, $new_matcher_id);
1363
1364 =cut
1365
1366 sub SetImportBatchMatcher {
1367     my ($batch_id, $new_matcher_id) = @_;
1368
1369     my $dbh = C4::Context->dbh;
1370     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1371     $sth->execute($new_matcher_id, $batch_id);
1372     $sth->finish();
1373
1374 }
1375
1376 =head2 GetImportRecordOverlayStatus
1377
1378   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1379
1380 =cut
1381
1382 sub GetImportRecordOverlayStatus {
1383     my ($import_record_id) = @_;
1384
1385     my $dbh = C4::Context->dbh;
1386     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1387     $sth->execute($import_record_id);
1388     my ($overlay_status) = $sth->fetchrow_array();
1389     $sth->finish();
1390     return $overlay_status;
1391
1392 }
1393
1394
1395 =head2 SetImportRecordOverlayStatus
1396
1397   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1398
1399 =cut
1400
1401 sub SetImportRecordOverlayStatus {
1402     my ($import_record_id, $new_overlay_status) = @_;
1403
1404     my $dbh = C4::Context->dbh;
1405     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1406     $sth->execute($new_overlay_status, $import_record_id);
1407     $sth->finish();
1408
1409 }
1410
1411 =head2 GetImportRecordStatus
1412
1413   my $status = GetImportRecordStatus($import_record_id);
1414
1415 =cut
1416
1417 sub GetImportRecordStatus {
1418     my ($import_record_id) = @_;
1419
1420     my $dbh = C4::Context->dbh;
1421     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1422     $sth->execute($import_record_id);
1423     my ($status) = $sth->fetchrow_array();
1424     $sth->finish();
1425     return $status;
1426
1427 }
1428
1429
1430 =head2 SetImportRecordStatus
1431
1432   SetImportRecordStatus($import_record_id, $new_status);
1433
1434 =cut
1435
1436 sub SetImportRecordStatus {
1437     my ($import_record_id, $new_status) = @_;
1438
1439     my $dbh = C4::Context->dbh;
1440     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1441     $sth->execute($new_status, $import_record_id);
1442     $sth->finish();
1443
1444 }
1445
1446 =head2 GetImportRecordMatches
1447
1448   my $results = GetImportRecordMatches($import_record_id, $best_only);
1449
1450 =cut
1451
1452 sub GetImportRecordMatches {
1453     my $import_record_id = shift;
1454     my $best_only = @_ ? shift : 0;
1455
1456     my $dbh = C4::Context->dbh;
1457     # FIXME currently biblio only
1458     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1459                                     candidate_match_id, score, record_type,
1460                                     chosen
1461                                     FROM import_records
1462                                     JOIN import_record_matches USING (import_record_id)
1463                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1464                                     WHERE import_record_id = ?
1465                                     ORDER BY score DESC, biblionumber DESC");
1466     $sth->bind_param(1, $import_record_id);
1467     my $results = [];
1468     $sth->execute();
1469     while (my $row = $sth->fetchrow_hashref) {
1470         if ($row->{'record_type'} eq 'auth') {
1471             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1472         }
1473         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1474         push @$results, $row;
1475         last if $best_only;
1476     }
1477     $sth->finish();
1478
1479     return $results;
1480     
1481 }
1482
1483 =head2 SetImportRecordMatches
1484
1485   SetImportRecordMatches($import_record_id, @matches);
1486
1487 =cut
1488
1489 sub SetImportRecordMatches {
1490     my $import_record_id = shift;
1491     my @matches = @_;
1492
1493     my $dbh = C4::Context->dbh;
1494     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1495     $delsth->execute($import_record_id);
1496     $delsth->finish();
1497
1498     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1499                                     VALUES (?, ?, ?, ?)");
1500     my $chosen = 1; #The first match is defaulted to be chosen
1501     foreach my $match (@matches) {
1502         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1503         $chosen = 0; #After the first we do not default to other matches
1504     }
1505 }
1506
1507 =head2 RecordsFromISO2709File
1508
1509     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1510
1511 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1512
1513 @PARAM1, String, absolute path to the ISO2709 file.
1514 @PARAM2, String, see stage_file.pl
1515 @PARAM3, String, should be utf8
1516
1517 Returns two array refs.
1518
1519 =cut
1520
1521 sub RecordsFromISO2709File {
1522     my ($input_file, $record_type, $encoding) = @_;
1523     my @errors;
1524
1525     my $marc_type = C4::Context->preference('marcflavour');
1526     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1527
1528     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1529     my @marc_records;
1530     $/ = "\035";
1531     while (<$fh>) {
1532         s/^\s+//;
1533         s/\s+$//;
1534         next unless $_; # skip if record has only whitespace, as might occur
1535                         # if file includes newlines between each MARC record
1536         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1537         push @marc_records, $marc_record;
1538         if ($charset_guessed ne $encoding) {
1539             push @errors,
1540                 "Unexpected charset $charset_guessed, expecting $encoding";
1541         }
1542     }
1543     close $fh;
1544     return ( \@errors, \@marc_records );
1545 }
1546
1547 =head2 RecordsFromMARCXMLFile
1548
1549     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1550
1551 Creates MARC::Record-objects out of the given MARCXML-file.
1552
1553 @PARAM1, String, absolute path to the ISO2709 file.
1554 @PARAM2, String, should be utf8
1555
1556 Returns two array refs.
1557
1558 =cut
1559
1560 sub RecordsFromMARCXMLFile {
1561     my ( $filename, $encoding ) = @_;
1562     my $batch = MARC::File::XML->in( $filename );
1563     my ( @marcRecords, @errors, $record );
1564     do {
1565         eval { $record = $batch->next( $encoding ); };
1566         if ($@) {
1567             push @errors, $@;
1568         }
1569         push @marcRecords, $record if $record;
1570     } while( $record );
1571     return (\@errors, \@marcRecords);
1572 }
1573
1574 =head2 RecordsFromMarcPlugin
1575
1576     Converts text of input_file into array of MARC records with to_marc plugin
1577
1578 =cut
1579
1580 sub RecordsFromMarcPlugin {
1581     my ($input_file, $plugin_class, $encoding) = @_;
1582     my ( $text, @return );
1583     return \@return if !$input_file || !$plugin_class;
1584
1585     # Read input file
1586     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1587     $/ = "\035";
1588     while (<$fh>) {
1589         s/^\s+//;
1590         s/\s+$//;
1591         next unless $_;
1592         $text .= $_;
1593     }
1594     close $fh;
1595
1596     # Convert to large MARC blob with plugin
1597     $text = Koha::Plugins::Handler->run({
1598         class  => $plugin_class,
1599         method => 'to_marc',
1600         params => { data => $text },
1601     }) if $text;
1602
1603     # Convert to array of MARC records
1604     if( $text ) {
1605         my $marc_type = C4::Context->preference('marcflavour');
1606         foreach my $blob ( split(/\x1D/, $text) ) {
1607             next if $blob =~ /^\s*$/;
1608             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1609             push @return, $marcrecord;
1610         }
1611     }
1612     return \@return;
1613 }
1614
1615 # internal functions
1616
1617 sub _create_import_record {
1618     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1619
1620     my $dbh = C4::Context->dbh;
1621     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1622                                                          record_type, encoding)
1623                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1624     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1625                   $record_type, $encoding);
1626     my $import_record_id = $dbh->{'mysql_insertid'};
1627     $sth->finish();
1628     return $import_record_id;
1629 }
1630
1631 sub _update_import_record_marc {
1632     my ($import_record_id, $marc_record, $marc_type) = @_;
1633
1634     my $dbh = C4::Context->dbh;
1635     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1636                              WHERE  import_record_id = ?");
1637     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1638     $sth->finish();
1639 }
1640
1641 sub _add_auth_fields {
1642     my ($import_record_id, $marc_record) = @_;
1643
1644     my $controlnumber;
1645     if ($marc_record->field('001')) {
1646         $controlnumber = $marc_record->field('001')->data();
1647     }
1648     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1649     my $dbh = C4::Context->dbh;
1650     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1651     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1652     $sth->finish();
1653 }
1654
1655 sub _add_biblio_fields {
1656     my ($import_record_id, $marc_record) = @_;
1657
1658     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1659     my $dbh = C4::Context->dbh;
1660     # FIXME no controlnumber, originalsource
1661     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1662     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1663     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1664     $sth->finish();
1665                 
1666 }
1667
1668 sub _update_biblio_fields {
1669     my ($import_record_id, $marc_record) = @_;
1670
1671     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1672     my $dbh = C4::Context->dbh;
1673     # FIXME no controlnumber, originalsource
1674     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1675     $isbn =~ s/\(.*$//;
1676     $isbn =~ tr/ -_//;
1677     $isbn = uc $isbn;
1678     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1679                              WHERE  import_record_id = ?");
1680     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1681     $sth->finish();
1682 }
1683
1684 sub _parse_biblio_fields {
1685     my ($marc_record) = @_;
1686
1687     my $dbh = C4::Context->dbh;
1688     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1689     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1690
1691 }
1692
1693 sub _update_batch_record_counts {
1694     my ($batch_id) = @_;
1695
1696     my $dbh = C4::Context->dbh;
1697     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1698                                         num_records = (
1699                                             SELECT COUNT(*)
1700                                             FROM import_records
1701                                             WHERE import_batch_id = import_batches.import_batch_id),
1702                                         num_items = (
1703                                             SELECT COUNT(*)
1704                                             FROM import_records
1705                                             JOIN import_items USING (import_record_id)
1706                                             WHERE import_batch_id = import_batches.import_batch_id
1707                                             AND record_type = 'biblio')
1708                                     WHERE import_batch_id = ?");
1709     $sth->bind_param(1, $batch_id);
1710     $sth->execute();
1711     $sth->finish();
1712 }
1713
1714 sub _get_commit_action {
1715     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1716     
1717     if ($record_type eq 'biblio') {
1718         my ($bib_result, $bib_match, $item_result);
1719
1720         $bib_match = GetBestRecordMatch($import_record_id);
1721         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1722
1723             $bib_result = $overlay_action;
1724
1725             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1726                 $item_result = 'create_new';
1727             } elsif($item_action eq 'replace'){
1728                 $item_result = 'replace';
1729             } else {
1730                 $item_result = 'ignore';
1731             }
1732
1733         } else {
1734             $bib_result = $nomatch_action;
1735             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1736         }
1737         return ($bib_result, $item_result, $bib_match);
1738     } else { # must be auths
1739         my ($auth_result, $auth_match);
1740
1741         $auth_match = GetBestRecordMatch($import_record_id);
1742         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1743             $auth_result = $overlay_action;
1744         } else {
1745             $auth_result = $nomatch_action;
1746         }
1747
1748         return ($auth_result, undef, $auth_match);
1749
1750     }
1751 }
1752
1753 sub _get_revert_action {
1754     my ($overlay_action, $overlay_status, $status) = @_;
1755
1756     my $bib_result;
1757
1758     if ($status eq 'ignored') {
1759         $bib_result = 'ignore';
1760     } else {
1761         if ($overlay_action eq 'create_new') {
1762             $bib_result = 'delete';
1763         } else {
1764             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1765         }
1766     }
1767     return $bib_result;
1768 }
1769
1770 1;
1771 __END__
1772
1773 =head1 AUTHOR
1774
1775 Koha Development Team <http://koha-community.org/>
1776
1777 Galen Charlton <galen.charlton@liblime.com>
1778
1779 =cut