Bug 27584: Refactor OAI-PMH paging to improve performance
[koha.git] / Koha / OAI / Server / ListBase.pm
1 package Koha::OAI::Server::ListBase;
2
3 # Copyright The National Library of Finland, University of Helsinki 2016-2017
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 =head1 NAME
21
22 Koha::OAI::Server::ListBase - OAI ListIdentifiers/ListRecords shared functionality
23
24 =head1 DESCRIPTION
25
26 Koha::OAI::Server::ListBase contains OAI-PMH functions shared by ListIdentifiers and ListRecords.
27
28 =cut
29
30 use Modern::Perl;
31 use C4::Biblio;
32 use HTTP::OAI;
33 use Koha::OAI::Server::ResumptionToken;
34 use Koha::OAI::Server::Record;
35 use Koha::OAI::Server::DeletedRecord;
36 use C4::OAI::Sets;
37 use MARC::File::XML;
38
39 sub GetRecords {
40     my ($class, $self, $repository, $metadata, %args) = @_;
41
42     my $token = Koha::OAI::Server::ResumptionToken->new( %args );
43     my $dbh = C4::Context->dbh;
44     my $set;
45     if ( defined $token->{'set'} ) {
46         $set = GetOAISetBySpec($token->{'set'});
47     }
48     my $deleted = defined $token->{deleted} ? $token->{deleted} : 0;
49     my $max = $repository->{koha_max_count};
50     my $count = 0;
51     my $format = $args{metadataPrefix} || $token->{metadata_prefix};
52     my $include_items = $repository->items_included( $format );
53     my $from = $token->{from_arg};
54     my $until = $token->{until_arg};
55     my $next_id = $token->{next_id};
56
57     # Since creating a union of normal and deleted record tables would be a heavy
58     # operation in a large database, build results in two stages:
59     # first deleted records ($deleted == 1), then normal records ($deleted == 0)
60     STAGELOOP:
61     for ( ; $deleted >= 0; $deleted-- ) {
62         my $table = $deleted ? 'deletedbiblio_metadata' : 'biblio_metadata';
63
64         my @part_bind_params = ($next_id);
65
66         my $where = "biblionumber >= ?";
67         if ( $from ) {
68             $where .= " AND timestamp >= ?";
69             push @part_bind_params, $from;
70         }
71         if ( $until ) {
72             $where .= " AND timestamp <= ?";
73             push @part_bind_params, $until;
74         }
75         if ( defined $set ) {
76             $where .= " AND biblionumber in (SELECT osb.biblionumber FROM oai_sets_biblios osb WHERE osb.set_id = ?)";
77             push @part_bind_params, $set->{'id'};
78         }
79
80         my @bind_params = @part_bind_params;
81
82         my $criteria = "WHERE $where ORDER BY biblionumber LIMIT " . ($max + 1);
83
84         my $sql = "
85             SELECT biblionumber
86             FROM $table
87             $criteria
88         ";
89
90         # If items are included, fetch a set of potential biblionumbers from items tables as well.
91         # Then merge them, sort them and take the required number of them from the resulting list.
92         # This may seem counter-intuitive as in worst case we fetch 3 times the biblionumbers needed,
93         # but avoiding joins or subqueries makes this so much faster that it does not matter.
94         if ( $include_items )  {
95             $sql = "($sql) UNION (SELECT DISTINCT(biblionumber) FROM deleteditems $criteria)";
96             push @bind_params, @part_bind_params;
97             if (!$deleted) {
98                 $sql .= " UNION (SELECT DISTINCT(biblionumber) FROM items $criteria)";
99                 push @bind_params, @part_bind_params;
100             }
101             $sql = "SELECT biblionumber FROM ($sql) bibnos ORDER BY biblionumber LIMIT " . ($max + 1);
102         }
103
104         my $sth = $dbh->prepare( $sql ) || die( 'Could not prepare statement: ' . $dbh->errstr );
105         $sth->execute( @bind_params ) || die( 'Could not execute statement: ' . $sth->errstr );
106
107         my $ts_sql;
108         if ( $include_items ) {
109             if ( $deleted ) {
110                 $ts_sql = "
111                     SELECT MAX(timestamp)
112                     FROM (
113                         SELECT timestamp FROM deletedbiblio_metadata WHERE biblionumber = ?
114                         UNION
115                         SELECT timestamp FROM deleteditems WHERE biblionumber = ?
116                     ) bis
117                 ";
118             } else {
119                 $ts_sql = "
120                     SELECT MAX(timestamp)
121                     FROM (
122                         SELECT timestamp FROM biblio_metadata WHERE biblionumber = ?
123                         UNION
124                         SELECT timestamp FROM deleteditems WHERE biblionumber = ?
125                         UNION
126                         SELECT timestamp FROM items WHERE biblionumber = ?
127                     ) bi
128                 ";
129             }
130         } else {
131             $ts_sql = "SELECT timestamp FROM $table WHERE biblionumber = ?";
132         }
133         my $ts_sth = $dbh->prepare( $ts_sql ) || die( 'Could not prepare statement: ' . $dbh->errstr );
134
135         foreach my $row (@{ $sth->fetchall_arrayref() }) {
136             my $biblionumber = $row->[0];
137             $count++;
138             if ( $count > $max ) {
139                 $self->resumptionToken(
140                     Koha::OAI::Server::ResumptionToken->new(
141                         metadataPrefix  => $token->{metadata_prefix},
142                         from            => $token->{from},
143                         until           => $token->{until},
144                         cursor          => $token->{cursor} + $max,
145                         set             => $token->{set},
146                         deleted         => $deleted,
147                         next_id         => $biblionumber
148                     )
149                 );
150                 last STAGELOOP;
151             }
152             my @params = ($biblionumber);
153             if ( $include_items ) {
154                 push @params, $deleted ? ( $biblionumber ) : ( $biblionumber, $biblionumber );
155             }
156             $ts_sth->execute( @params ) || die( 'Could not execute statement: ' . $ts_sth->errstr );
157
158             my ($timestamp) = $ts_sth->fetchrow;
159
160             my $oai_sets = GetOAISetsBiblio($biblionumber);
161             my @setSpecs;
162             foreach ( @$oai_sets ) {
163                 push @setSpecs, $_->{spec};
164             }
165             if ( $metadata ) {
166                 my $marcxml = !$deleted ? $repository->get_biblio_marcxml($biblionumber, $format) : undef;
167                 if ( $marcxml ) {
168                   $self->record( Koha::OAI::Server::Record->new(
169                       $repository, $marcxml, $timestamp, \@setSpecs,
170                       identifier      => $repository->{ koha_identifier } . ':' . $biblionumber,
171                       metadataPrefix  => $token->{metadata_prefix}
172                   ) );
173                 } else {
174                   $self->record( Koha::OAI::Server::DeletedRecord->new(
175                       $timestamp, \@setSpecs, identifier => $repository->{ koha_identifier } . ':' . $biblionumber
176                   ) );
177                 }
178             } else {
179                 $timestamp =~ s/ /T/;
180                 $timestamp .= 'Z';
181                 $self->identifier( HTTP::OAI::Header->new(
182                     identifier => $repository->{ koha_identifier} . ':' . $biblionumber,
183                     datestamp  => $timestamp,
184                     status     => $deleted ? 'deleted' : undef
185                 ) );
186             }
187         }
188         # Reset $next_id for the next stage
189         $next_id = 0;
190     }
191     return $count;
192 }
193
194 1;