1=head1 NAME
2
3RDF::Trine::Store::Redis - RDF Store for Redis
4
5=head1 VERSION
6
7This document describes RDF::Trine::Store::Redis version 1.019
8
9=head1 SYNOPSIS
10
 use RDF::Trine::Store::Redis;
 my $store = RDF::Trine::Store::Redis->new( server => 'localhost:6379' );
12
13=head1 DESCRIPTION
14
RDF::Trine::Store::Redis provides an RDF::Trine::Store API backed by a Redis
server, so that RDF statements can be stored in and queried from Redis.
17
18=cut
19
20package RDF::Trine::Store::Redis;
21
22use strict;
23use warnings;
24no warnings 'redefine';
25use base qw(RDF::Trine::Store);
26
27use Redis;
28use Cache::LRU;
29use URI::Escape;
30use Data::Dumper;
31use Digest::MD5 qw(md5_base64);
32use List::Util qw(first);
33use List::MoreUtils qw(zip);
34use Scalar::Util qw(refaddr reftype blessed);
35use HTTP::Request::Common ();
36use JSON;
37
38use RDF::Trine::Error qw(:try);
39
40######################################################################
41
# Package-global switch.
# NOTE(review): nothing in this module reads $CACHING; confirm whether external code does.
our $CACHING	= 1;

# Long names of the four statement positions, filled in at compile time.
# NOTE(review): not read anywhere else in this module.
my @pos_names;
our $VERSION;
BEGIN {
	my $package	= __PACKAGE__;
	$VERSION	= '1.019';
	@pos_names	= ('subject', 'predicate', 'object', 'context');
	# Record this backend's class name and version in RDF::Trine::Store's table of store classes.
	$RDF::Trine::Store::STORE_CLASSES{ $package }	= $VERSION;
}
52
53######################################################################
54
55=head1 METHODS
56
57Beyond the methods documented below, this class inherits methods from the
58L<RDF::Trine::Store> class.
59
60=over 4
61
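=item C<< new ( %args ) >>

Returns a new storage object. The optional C<cache_size> argument sets the
size of the store's Cache::LRU object (default 128; non-positive values also
fall back to 128). All other arguments are passed unchanged to
C<< Redis->new >>, e.g. C<< server => 'localhost:6379' >>.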
65
66=item C<new_with_config ( $hashref )>
67
68Returns a new storage object configured with a hashref with certain
69keys as arguments.
70
71The C<storetype> key must be C<Redis> for this backend.
72
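The following key must also be used:

=over

=item server

The Redis server address, as C<host:port>.

=back

The following key is optional:

=over

=item cache_size

The size of the store's LRU cache (defaults to 128).

=back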
82
83=cut
84
sub new {
	my ($class, %args)	= @_;

	# cache_size is our own option; every other argument belongs to Redis->new.
	my $size	= delete $args{cache_size};
	$size		= 128 if (!defined($size) || !($size > 0));

	my $self	= {
		conn		=> Redis->new( %args ),
		cache		=> Cache::LRU->new( size => $size ),
		cache_size	=> $size,
	};
	return bless($self, $class);
}
95
96=item C<< conn >>
97
98Returns the Redis connection object.
99
100=cut
101
sub conn {
	my ($self)	= @_;
	# The Redis client object created in new().
	my $redis	= $self->{conn};
	return $redis;
}
106
107=item C<< cache >>
108
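Returns the Cache::LRU object created for this store (sized by C<cache_size>).
Note that no method in this module currently reads from or writes to it;
C<nuke> replaces it with a fresh, empty cache.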
110
111=cut
112
sub cache {
	my ($self)	= @_;
	# The Cache::LRU object created in new() (or re-created by nuke()).
	my $lru		= $self->{cache};
	return $lru;
}
117
# Construct a store from a configuration string.
# NOTE(review): presumably this is the text following "Redis;" in an
# RDF::Trine::Store->new_with_string() spec; confirm against the base class.
# The string is used as the Redis server address (e.g. "localhost:6379").
# An undef or empty string falls back to new()'s defaults.
sub _new_with_string {
	my $class	= shift;
	my $config	= shift;
	# new() takes named arguments. Passing the bare string would build an
	# odd-sized argument hash and the address would never reach Redis->new.
	if (defined($config) and length($config)) {
		return $class->new( server => $config );
	}
	return $class->new();
}
123
124=item C<< new_with_config ( \%config ) >>
125
126Returns a new RDF::Trine::Store object based on the supplied configuration hashref.
127
128=cut
129
# Build a store from a configuration hashref by delegating to
# RDF::Trine::Store->new_with_config with storetype forced to 'Redis'.
# The caller's hashref is left untouched; a shallow copy is passed on.
sub new_with_config {
	my $proto	= shift;
	my $config	= shift;
	my %config	= %{ $config || {} };
	$config{storetype}	= 'Redis';
	return $proto->SUPER::new_with_config( \%config );
}
136
# Translate a configuration hashref into new()'s named arguments.
# Only the 'server' and 'cache_size' keys are forwarded; missing ones pass as undef.
sub _new_with_config {
	my ($class, $config)	= @_;
	my %args	= map { ($_ => $config->{$_}) } qw(server cache_size);
	return $class->new( %args );
}
142
# Describe the configuration keys this backend understands:
# 'server' is required, 'cache_size' is optional.
sub _config_meta {
	my %fields	= (
		server		=> { description => 'server:port', type => 'string' },
		cache_size	=> { description => 'cache size', type => 'int' },
	);
	my $meta	= {
		required_keys	=> ['server'],
		fields			=> \%fields,
	};
	return $meta;
}
152
# Map numeric node ids back to node objects.
# A node's N-Triples text is kept in hash "R:n.v:<int(id/1000)>" under field id % 1000;
# each value is parsed with the N-Triples parser. Returns one node per id, in order.
sub _id_node {
	my ($self, @ids)	= @_;
	my $redis	= $self->conn;
	my $parser	= RDF::Trine::Parser::NTriples->new();
	return map {
		my $bucket	= int($_ / 1000);
		my $field	= $_ % 1000;
		my $ntriples	= $redis->hget("R:n.v:$bucket", $field);
		scalar( $parser->parse_node( $ntriples ) );
	} @ids;
}
170
# Look up the ids of already-stored nodes without allocating new ones.
# Each node is serialized to N-Triples and found under "R:n.i:<md5_base64>".
# Unknown nodes yield undef. Returns all ids in list context, the first in scalar context.
sub _get_node_id {
	my ($self, @node)	= @_;
	my $redis		= $self->conn;
	my $serializer	= RDF::Trine::Serializer::NTriples->new();
	my @ntriples	= map { $serializer->serialize_node( $_ ) } @node;
	my @ids			= map { scalar( $redis->get( 'R:n.i:' . md5_base64($_) ) ) } @ntriples;
	return @ids if (wantarray);
	return $ids[0];
}
186
# Return the id of $node, allocating one if the node has not been seen before.
# Id mapping:  "R:n.i:<md5_base64(ntriples)>" -> id
# Value store: hash "R:n.v:<int(id/1000)>", field id % 1000 -> ntriples
# New ids come from INCR on 'RT:node.next'.
sub _get_or_set_node_id {
	my $self	= shift;
	my $node	= shift;
	my $r		= $self->conn;
	my $s		= RDF::Trine::Serializer::NTriples->new();
	my $nt		= $s->serialize_node( $node );

	my $idkey	= 'R:n.i:' . md5_base64($nt);
	my $id		= $r->get( $idkey );
	return $id if (defined($id));

	# Write the value for the fresh id *before* publishing the id mapping.
	# Any reader that can see the mapping can then also resolve it.
	my $new_id	= $r->incr( 'RT:node.next' );
	my $bucket	= int($new_id / 1000);
	my $hid		= $new_id % 1000;
	$r->hset( "R:n.v:$bucket", $hid, $nt );

	# SETNX succeeds only for the first writer. If another client mapped this
	# node since our GET, adopt its id so every statement uses a single id.
	# The value written above under $new_id is then unused.
	if ($r->setnx( $idkey, $new_id )) {
		return $new_id;
	}
	my $existing	= $r->get( $idkey );
	return $existing;
}
211
212=item C<< add_statement ( $statement [, $context] ) >>
213
214Adds the specified C<$statement> to the underlying model.
215
216=cut
217
sub add_statement {
	my ($self, $st, $context)	= @_;

	if (not(blessed($st)) or not($st->isa('RDF::Trine::Statement'))) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "Not a valid statement object passed to add_statement" );
	}
	if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "add_statement cannot be called with both a quad and a context" );
	}

	if ($self->_bulk_ops) {
		push(@{ $self->{ ops } }, ['_add_statements', $st, $context]);
		return;
	}

	my $redis	= $self->conn;
	my @terms	= $st->nodes;
	if ($context) {
		$terms[3]	= $context;
	}

	# Ids for subject, predicate, object and graph; a missing graph is stored as the Nil node.
	my @ids		= map {
		$self->_get_or_set_node_id( defined($_) ? $_ : RDF::Trine::Node::Nil->new )
	} @terms[0..3];

	# Statement hash "RT:spog:<s>:<p>:<o>:<g>" plus one index-set entry per position.
	my $spog	= join(':', @ids);
	my @fields	= qw(s p o g);
	$redis->hmset( "RT:spog:$spog", zip @fields, @ids );
	foreach my $i (0 .. 3) {
		my $index	= $fields[$i];
		$redis->sadd( "RT:${index}set:$ids[$i]", $spog );
	}
	return;
}
248
249=item C<< remove_statement ( $statement [, $context]) >>
250
251Removes the specified C<$statement> from the underlying model.
252
253=cut
254
sub remove_statement {
	my ($self, $st, $context)	= @_;

	if (not(blessed($st)) or not($st->isa('RDF::Trine::Statement'))) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "Not a valid statement object passed to remove_statement" );
	}
	if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "remove_statement cannot be called with both a quad and a context" );
	}

	if ($self->_bulk_ops) {
		push(@{ $self->{ ops } }, ['_remove_statements', $st, $context]);
		return;
	}

	my $redis	= $self->conn;
	my @terms	= $st->nodes;
	$terms[3]	= $context if ($context);
	my @quad	= map { defined($_) ? $_ : RDF::Trine::Node::Nil->new } @terms[0..3];

	# A node without an id cannot appear in any stored statement: nothing to remove.
	my @ids		= $self->_get_node_id(@quad);
	return if (grep { not defined($_) } @ids);

	my $spog	= join(':', @ids);
	$redis->del( "RT:spog:$spog" );
	my @fields	= qw(s p o g);
	foreach my $i (0 .. 3) {
		my $index	= $fields[$i];
		$redis->srem( "RT:${index}set:$ids[$i]", $spog );
	}
	return;
}
288
289=item C<< remove_statements ( $subject, $predicate, $object [, $context]) >>
290
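Removes all statements matching the given subject, predicate, object and
(optional) context. Any argument may be undef, or a variable node, to match any
value; if no context is given, matching statements are removed from every graph.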
292
293=cut
294
# Remove every statement matching the (subject, predicate, object, context)
# pattern. Undef or variable nodes act as wildcards. Returns nothing.
sub remove_statements {
	my $self	= shift;
	my @nodes	= @_[0..3];
	my $st		= RDF::Trine::Statement->new( @nodes[0..2] );
	my $context	= $nodes[3];

	if ($self->_bulk_ops) {
		push(@{ $self->{ ops } }, ['_remove_statement_patterns', $st, $context]);
		return;
	}

	# Build the KEYS pattern. Only look up existing ids: removal must never
	# allocate ids, and an unknown node cannot occur in any stored statement.
	my @pattern;
	foreach my $n (@nodes) {
		if (not(blessed($n)) or $n->is_variable) {
			push(@pattern, '*');
			next;
		}
		my $id	= $self->_get_node_id($n);
		return unless (defined($id));
		push(@pattern, $id);
	}

	my $r	= $self->conn;
	foreach my $k ($r->keys( 'RT:spog:' . join(':', @pattern) )) {
		my ($spog)	= $k =~ m/\ART:spog:(\d+:\d+:\d+:\d+)\z/;
		next unless (defined($spog));
		my ($sid, $pid, $oid, $gid)	= split(/:/, $spog);
		# Remove only this statement's entry from each index set. The
		# remaining members belong to statements that are still stored.
		$r->srem( "RT:sset:$sid", $spog );
		$r->srem( "RT:pset:$pid", $spog );
		$r->srem( "RT:oset:$oid", $spog );
		$r->srem( "RT:gset:$gid", $spog );
		$r->del( $k );
	}
	return;
}
318
319=item C<< get_statements ($subject, $predicate, $object [, $context] ) >>
320
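Returns a stream object of all statements matching the specified subject,
predicate, object and (optional) context. Any of the arguments may be undef to
match any value. With exactly three arguments the stream contains triples (each
distinct triple once, regardless of graph); otherwise it contains quads.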
323
324=cut
325
# Return an RDF::Trine::Iterator::Graph over statements matching the pattern.
# With exactly three arguments the iterator yields distinct triples (graph
# ignored); with four or more, or fewer than three, it yields quads. Bound
# positions are matched through the "RT:<pos>set:<id>" index sets. An
# all-unbound pattern scans the "RT:spog:*" statement keys instead.
sub get_statements {
	my $self	= shift;
	my @nodes	= @_;

	my $use_quad	= 0;
	if (scalar(@_) >= 4) {
		$use_quad	= 1;
	} elsif (scalar(@nodes) != 3) {
		$#nodes		= 3;
		$use_quad	= 1;
	}

	# Replace unbound positions with named variables so every slot answers ->is_variable.
	my @var_names	= qw(s p o g);
	foreach my $i (0 .. $#nodes) {
		if (not(blessed($nodes[$i])) or $nodes[$i]->is_variable) {
			$nodes[$i]	= RDF::Trine::Node::Variable->new( $var_names[ $i ] );
		}
	}

	my $r			= $self->conn;
	my @positions	= $use_quad ? (0 .. 3) : (0 .. 2);

	# One index-set key per bound node; a node without an id cannot match anything.
	my @skeys;
	foreach my $i (@positions) {
		my $n	= $nodes[$i];
		next if ($n->is_variable);
		my $id	= $self->_get_node_id($n);
		return RDF::Trine::Iterator::Graph->new( [] ) unless (defined($id));
		my $index	= $var_names[$i];
		push(@skeys, "RT:${index}set:$id");
	}

	# Matching statements as "s:p:o:g" id strings.
	my @spogs;
	if (@skeys) {
		@spogs	= $r->sinter(@skeys);
	} else {
		foreach my $key ($r->keys('RT:spog:*:*:*:*')) {
			(my $spog = $key) =~ s/^RT:spog://;
			push(@spogs, $spog);
		}
	}

	my $sub;
	if ($use_quad) {
		$sub	= sub {
			return unless (scalar(@spogs));
			my @ids	= split(':', shift(@spogs));
			return RDF::Trine::Statement::Quad->new( $self->_id_node( @ids[0..3] ) );
		};
	} else {
		# Several quads can share one triple; report each triple once.
		my %seen;
		my @triples;
		foreach my $spog (@spogs) {
			(my $spo = $spog) =~ s/:[^:]+$//;
			push(@triples, $spo) unless ($seen{ $spo }++);
		}
		$sub	= sub {
			return unless (scalar(@triples));
			my @ids	= split(':', shift(@triples));
			return RDF::Trine::Statement->new( $self->_id_node( @ids[0..2] ) );
		};
	}
	return RDF::Trine::Iterator::Graph->new( $sub );
}
441
442=item C<< count_statements ( $subject, $predicate, $object, $context ) >>
443
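Returns a count of all the statements matching the specified subject,
predicate, object, and context. Any of the arguments may be undef to match any
value. When fewer than four arguments are given, distinct triples are counted
(a triple stored in several graphs counts once).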
447
448=cut
449
sub count_statements {
	my ($self, @nodes)	= @_;
	my $use_quad	= (scalar(@nodes) >= 4) ? 1 : 0;

	# Turn each position into its stored id, or the '*' wildcard when unbound.
	my @pattern;
	foreach my $node (@nodes[0..3]) {
		if (blessed($node) and not($node->is_variable)) {
			my $id	= $self->_get_node_id($node);
			# A node that was never stored cannot match anything.
			return 0 unless (defined($id));
			push(@pattern, $id);
		} else {
			push(@pattern, '*');
		}
	}

	my @keys	= $self->conn->keys( 'RT:spog:' . join(':', @pattern) );
	return scalar(@keys) if ($use_quad);

	# Several quads can share one triple; drop the graph id and count each triple once.
	my %triples;
	foreach my $key (@keys) {
		(my $triple = $key) =~ s/:[^:]+$//;
		$triples{ $triple }	= 1;
	}
	return scalar(keys %triples);
}
489
490=item C<< get_contexts >>
491
492Returns an RDF::Trine::Iterator over the RDF::Trine::Node objects comprising
493the set of contexts of the stored quads.
494
495=cut
496
sub get_contexts {
	my $self	= shift;

	# The graph id is the last ':'-separated component of each "RT:spog:s:p:o:g" key.
	my %graph_ids;
	foreach my $key ($self->conn->keys('RT:spog:*')) {
		(my $gid = $key) =~ s/^.*://;
		$graph_ids{ $gid }	= 1;
	}

	# Statements added without a context use the Nil node as graph; it is not reported.
	my @contexts	= grep { not($_->isa('RDF::Trine::Node::Nil')) } $self->_id_node( keys %graph_ids );
	return RDF::Trine::Iterator->new( \@contexts );
}
509
510
511=item C<< supports ( [ $feature ] ) >>
512
513If C<< $feature >> is specified, returns true if the feature is supported by the
514store, false otherwise. If C<< $feature >> is not specified, returns a list of
515supported features.
516
517=cut
518
sub supports {
	my $self	= shift;
	# No optional features are advertised yet; SPARQL query support is disabled.
	my %features	= map { ($_ => 1) } (
# 		'http://www.w3.org/ns/sparql-service-description#SPARQL10Query',
# 		'http://www.w3.org/ns/sparql-service-description#SPARQL11Query',
	);
	return keys %features unless (@_);
	my $feature	= shift;
	return $features{ $feature };
}
532
533# =item C<< get_sparql ( $sparql ) >>
534#
535# Returns an iterator object of all bindings matching the specified SPARQL query.
536#
537# =cut
538#
539# sub get_sparql {
540# 	my $self	= shift;
541# 	my $sparql	= shift;
542# 	throw RDF::Trine::Error::UnimplementedError -text => "get_sparql not implemented for Redis stores yet";
543# }
544
sub _bulk_ops {
	# Operations are always applied immediately; nothing is ever queued.
	my $queued	= 0;
	return $queued;
}
548
sub _begin_bulk_ops {
	# Bulk mode is not supported by this store; starting a batch is a no-op.
	my $started	= 0;
	return $started;
}
552
# Finish a bulk-operation batch. _bulk_ops always returns 0, so normally
# nothing has been queued. Any queued operations are discarded, bulk mode is
# cleared, and an UnimplementedError is thrown because this store cannot
# replay them. Returns 0 when nothing was queued.
sub _end_bulk_ops {
	my $self	= shift;
	# Take (and clear) any queued operations.
	my @ops		= splice(@{ $self->{ ops } || [] });
	# Leave bulk mode before reporting failure, so the store stays usable afterwards.
	$self->{BulkOps}	= 0;
	if (@ops) {
		RDF::Trine::Error::UnimplementedError->throw( -text => "bulk operations not implemented for Redis stores yet" );
	}
	return 0;
}
564
565=item C<< nuke >>
566
567Permanently removes the store and its data.
568
569=cut
570
sub nuke {
	my $self	= shift;
	my $redis	= $self->conn;

	# The node-id counter is a single fixed key.
	$redis->del('RT:node.next');

	# Everything else lives under these prefixes: node id/value maps, statement
	# hashes, and the four position index sets. Each is deleted key by key.
	my @patterns	= ('R:n.i:*', 'R:n.v:*', 'RT:spog:*', map { "RT:${_}set:*" } qw(s p o g));
	foreach my $pattern (@patterns) {
		foreach my $key ($redis->keys($pattern)) {
			$redis->del($key);
		}
	}

	# Start over with an empty cache of the configured size.
	$self->{cache}	= Cache::LRU->new( size => $self->{cache_size} );
}
591
592
# Debug helper: emit every statement key ("RT:spog:s:p:o:g") as a warning.
sub _dump {
	my $self	= shift;
	my @statement_keys	= $self->conn->keys('RT:spog:*');
	warn "--------------------------------------\n";
	warn '*** DUMP Redis statements:';
	foreach my $key (@statement_keys) {
		warn "$key\n";
	}
}
601
6021;
603
604__END__
605
606=back
607
608=head1 REDIS DATA LAYOUT
609
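=over 4

=item C<RT:node.next>

Counter used to allocate node ids (via C<INCR>).

=item C<< R:n.i:<md5> >>

Maps the base64 MD5 digest of a node's N-Triples serialization to its id.

=item C<< R:n.v:<bucket> >>

Hash mapping C<id % 1000> to the node's N-Triples serialization, where
C<bucket> is C<int(id / 1000)>.

=item C<< RT:spog:<s>:<p>:<o>:<g> >>

One hash per stored statement, with fields C<s>, C<p>, C<o> and C<g> holding
the node ids. Statements added without a context use the id of the Nil node
for C<g>.

=item C<< RT:sset:<id> >>, C<< RT:pset:<id> >>, C<< RT:oset:<id> >>, C<< RT:gset:<id> >>

Sets of C<s:p:o:g> id strings for the statements that have node C<id> in the
subject, predicate, object or graph position respectively.

=back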
611
612=head1 BUGS
613
Please report any bugs or feature requests through the GitHub web interface
615at L<https://github.com/kasei/perlrdf/issues>.
616
617=head1 AUTHOR
618
619Gregory Todd Williams  C<< <gwilliams@cpan.org> >>
620
621=head1 COPYRIGHT
622
623Copyright (c) 2006-2012 Gregory Todd Williams. This
624program is free software; you can redistribute it and/or modify it under
625the same terms as Perl itself.
626
627=cut
628