=head1 NAME

RDF::Trine::Store::Redis - RDF Store for Redis

=head1 VERSION

This document describes RDF::Trine::Store::Redis version 1.019

=head1 SYNOPSIS

 use RDF::Trine::Store::Redis;
 my $store = RDF::Trine::Store::Redis->new( server => 'localhost:6379' );

=head1 DESCRIPTION

RDF::Trine::Store::Redis provides an RDF::Trine::Store API to interact with a
Redis server.

=cut

package RDF::Trine::Store::Redis;

use strict;
use warnings;
no warnings 'redefine';
use base qw(RDF::Trine::Store);

use Redis;
use Cache::LRU;
use URI::Escape;
use Data::Dumper;
use Digest::MD5 qw(md5_base64);
use List::Util qw(first);
use List::MoreUtils qw(zip);
use Scalar::Util qw(refaddr reftype blessed);
use HTTP::Request::Common ();
use JSON;

use RDF::Trine::Error qw(:try);

######################################################################

# NOTE(review): $CACHING is not read anywhere in this module; presumably it
# was meant to toggle use of the LRU cache -- confirm before relying on it.
our $CACHING = 1;

# NOTE(review): @pos_names is filled in the BEGIN block below but is not
# otherwise used in this module.
my @pos_names;
our $VERSION;
BEGIN {
    $VERSION = "1.019";
    my $class = __PACKAGE__;
    # Record this backend and its version in %RDF::Trine::Store::STORE_CLASSES
    # at compile time (presumably how the base class discovers backends).
    $RDF::Trine::Store::STORE_CLASSES{ $class } = $VERSION;
    @pos_names = qw(subject predicate object context);
}

######################################################################

=head1 METHODS

Beyond the methods documented below, this class inherits methods from the
L<RDF::Trine::Store> class.

=over 4

=item C<< new ( %args ) >>

Returns a new storage object. The optional C<cache_size> argument sets the
size of the LRU cache (default 128); all other arguments are passed through
to C<< Redis->new >>.

=item C<new_with_config ( $hashref )>

Returns a new storage object configured with a hashref with certain
keys as arguments.

The C<storetype> key must be C<Redis> for this backend.
The following keys are recognized:

=over

=item server

The Redis server address, as C<host:port> (required).

=item cache_size

The number of entries held by the LRU cache (optional; defaults to 128).

=back

=cut

sub new {
    my $class = shift;
    my %args  = @_;
    # cache_size configures our own cache; everything else belongs to Redis.
    my $size  = delete $args{cache_size};
    $size = 128 unless (defined($size) and $size > 0);
    my $r     = Redis->new( %args );
    my $cache = Cache::LRU->new( size => $size );
    my $self  = bless({ conn => $r, cache => $cache, cache_size => $size }, $class);
    return $self;
}

=item C<< conn >>

Returns the Redis connection object.

=cut

sub conn {
    my $self = shift;
    return $self->{conn};
}

=item C<< cache >>

Returns the Cache::LRU object used to cache frequently used redis data.

=cut

sub cache {
    my $self = shift;
    return $self->{cache};
}

# _new_with_string ( $config )
# Constructor hook for string configuration: $config is taken to be the
# Redis server address (host:port). When it is undef no server argument is
# passed, leaving the choice of default to Redis->new.
# new() takes named arguments, so the string must be passed as `server`;
# passing it bare would make it a hash key with an undef value.
sub _new_with_string {
    my $class  = shift;
    my $config = shift;
    return $class->new( defined($config) ? (server => $config) : () );
}

=item C<< new_with_config ( \%config ) >>

Returns a new RDF::Trine::Store object based on the supplied configuration hashref.
=cut

sub new_with_config {
    my $proto  = shift;
    my $config = shift;
    # Work on a shallow copy: the base class needs storetype => 'Redis',
    # but the caller's hashref should not be modified as a side effect.
    my %config = ( %{ $config || {} }, storetype => 'Redis' );
    return $proto->SUPER::new_with_config( \%config );
}

# _new_with_config ( \%config )
# Builds a store from the `server` and `cache_size` config keys.
sub _new_with_config {
    my $class  = shift;
    my $config = shift;
    return $class->new( server => $config->{server}, cache_size => $config->{cache_size} );
}

# _config_meta
# Describes the configuration keys this backend accepts.
sub _config_meta {
    return {
        required_keys => [qw(server)],
        fields        => {
            server     => { description => 'server:port', type => 'string' },
            cache_size => { description => 'cache size', type => 'int' },
        }
    }
}

# _id_node ( @ids )
# Maps node ids back to RDF::Trine::Node objects, in the same order.
# N-Triples strings are stored in hashes named "R:n.v:<int(id/1000)>" with
# field "id % 1000" (presumably to limit the number of top-level keys).
sub _id_node {
    my $self = shift;
    my @id   = @_;
    my $r    = $self->conn;
    my $p    = RDF::Trine::Parser::NTriples->new();

    my @nodes;
    foreach my $id (@id) {
        my $bucket = int($id / 1000);
        my $hid    = $id % 1000;
        my $nt     = $r->hget("R:n.v:$bucket", $hid);
        my $node   = $p->parse_node( $nt );
        push(@nodes, $node);
    }
    return @nodes;
}

# _get_node_id ( @nodes )
# Looks up existing node ids (never allocates). The key is the Base64 MD5 of
# the node's N-Triples serialization. Unknown nodes yield undef.
# Returns the list of ids in list context, the first id in scalar context.
sub _get_node_id {
    my $self = shift;
    my @node = @_;
    my $r    = $self->conn;
    my $s    = RDF::Trine::Serializer::NTriples->new();
    my @ids;
    foreach my $nt (map { $s->serialize_node( $_ ) } @node) {
        my $key = 'R:n.i:' . md5_base64($nt);
        push(@ids, $r->get($key));
    }
    return wantarray ? @ids : $ids[0];
}

# _get_or_set_node_id ( $node )
# Returns the id of $node, allocating a new one from the RT:node.next
# counter (and storing both directions of the mapping) if none exists.
sub _get_or_set_node_id {
    my $self = shift;
    my $node = shift;
    my $r    = $self->conn;
    my $s    = RDF::Trine::Serializer::NTriples->new();
    my $nt   = $s->serialize_node( $node );

    my $idkey = 'R:n.i:' . md5_base64($nt);
    my $id    = $r->get( $idkey );
    return $id if (defined($id));

    $id = $r->incr( 'RT:node.next' );

    # Write the id -> N-Triples entry before publishing N-Triples -> id, so
    # any id that can be looked up can also be resolved back to a node.
    my $bucket = int($id / 1000);
    my $hid    = $id % 1000;
    $r->hset( "R:n.v:$bucket", $hid, $nt );
    $r->set( $idkey, $id );

    return $id;
}

=item C<< add_statement ( $statement [, $context] ) >>

Adds the specified C<$statement> to the underlying model.

=cut

sub add_statement {
    my $self    = shift;
    my $st      = shift;
    my $context = shift;
    unless (blessed($st) and $st->isa('RDF::Trine::Statement')) {
        throw RDF::Trine::Error::MethodInvocationError -text => "Not a valid statement object passed to add_statement";
    }

    if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
        throw RDF::Trine::Error::MethodInvocationError -text => "add_statement cannot be called with both a quad and a context";
    }

    if ($self->_bulk_ops) {
        push(@{ $self->{ ops } }, ['_add_statements', $st, $context]);
    } else {
        my $r     = $self->conn;
        my @nodes = $st->nodes;
        $nodes[3] = $context if ($context);
        # Triples without a context are stored with the Nil node as graph.
        @nodes = map { defined($_) ? $_ : RDF::Trine::Node::Nil->new } @nodes[0..3];
        my @ids    = map { $self->_get_or_set_node_id($_) } @nodes;
        my $key    = join(':', @ids);
        my @fields = qw(s p o g);
        # The statement hash plus one index set per position.
        $r->hmset( "RT:spog:$key", zip @fields, @ids );
        $r->sadd( "RT:sset:$ids[0]", $key );
        $r->sadd( "RT:pset:$ids[1]", $key );
        $r->sadd( "RT:oset:$ids[2]", $key );
        $r->sadd( "RT:gset:$ids[3]", $key );
    }
    return;
}

=item C<< remove_statement ( $statement [, $context]) >>

Removes the specified C<$statement> from the underlying model.
=cut

sub remove_statement {
    my $self    = shift;
    my $st      = shift;
    my $context = shift;

    unless (blessed($st) and $st->isa('RDF::Trine::Statement')) {
        throw RDF::Trine::Error::MethodInvocationError -text => "Not a valid statement object passed to remove_statement";
    }

    if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
        throw RDF::Trine::Error::MethodInvocationError -text => "remove_statement cannot be called with both a quad and a context";
    }

    if ($self->_bulk_ops) {
        push(@{ $self->{ ops } }, ['_remove_statements', $st, $context]);
    } else {
        my $r     = $self->conn;
        my @nodes = $st->nodes;
        $nodes[3] = $context if ($context);
        # A missing graph is stored as the Nil node, matching add_statement.
        @nodes = map { defined($_) ? $_ : RDF::Trine::Node::Nil->new } @nodes[0..3];
        my @ids = $self->_get_node_id(@nodes);
        # A node that was never stored cannot be part of a stored statement.
        return if (grep { not defined($_) } @ids);
        my $key = join(':', @ids);
        $r->del( "RT:spog:$key" );
        $r->srem( "RT:sset:$ids[0]", $key );
        $r->srem( "RT:pset:$ids[1]", $key );
        $r->srem( "RT:oset:$ids[2]", $key );
        $r->srem( "RT:gset:$ids[3]", $key );
    }
    return;
}

=item C<< remove_statements ( $subject, $predicate, $object [, $context]) >>

Removes all statements matching the given subject, predicate, object and
(optional) context. Any of the arguments may be undef or a variable to match
any value.

=cut

sub remove_statements {
    my $self    = shift;
    my @nodes   = @_[0..3];
    my $context = $nodes[3];

    if ($self->_bulk_ops) {
        my $st = RDF::Trine::Statement->new( @nodes[0..2] );
        push(@{ $self->{ ops } }, ['_remove_statement_patterns', $st, $context]);
        return;
    }

    # Build the KEYS glob. Bound nodes are only looked up, never allocated:
    # a node without an id cannot appear in any statement, so nothing matches.
    my @strs;
    foreach my $n (@nodes) {
        if (not(blessed($n)) or $n->is_variable) {
            push(@strs, '*');
        } else {
            my $id = $self->_get_node_id($n);
            return unless (defined($id));
            push(@strs, $id);
        }
    }

    my $r = $self->conn;
    foreach my $k ($r->keys('RT:spog:' . join(':', @strs))) {
        my ($ids) = $k =~ m/\ART:spog:(\d+:\d+:\d+:\d+)\z/;
        next unless (defined($ids));
        my ($sid, $pid, $oid, $gid) = split(':', $ids);
        # Remove only this statement's entry from each index set; the sets
        # also hold every other statement sharing the same node.
        $r->srem( "RT:sset:$sid", $ids );
        $r->srem( "RT:pset:$pid", $ids );
        $r->srem( "RT:oset:$oid", $ids );
        $r->srem( "RT:gset:$gid", $ids );
        $r->del( $k );
    }
    return;
}

=item C<< get_statements ($subject, $predicate, $object [, $context] ) >>

Returns a stream object of all statements matching the specified subject,
predicate and objects. Any of the arguments may be undef to match any value.
When a context argument is supplied the stream yields
RDF::Trine::Statement::Quad objects.

=cut

sub get_statements {
    my $self  = shift;
    my @nodes = @_;

    # Quad semantics when a context is passed, or when the argument count is
    # anything other than exactly three (the node list is then padded to 4).
    my $use_quad = 0;
    if (scalar(@_) >= 4) {
        $use_quad = 1;
    } elsif (scalar(@nodes) != 3) {
        $#nodes   = 3;
        $use_quad = 1;
    }

    # Unbound positions become variables named after their position.
    my @var_map = qw(s p o g);
    foreach my $i (0 .. $#nodes) {
        if (not(blessed($nodes[$i])) or $nodes[$i]->is_variable) {
            $nodes[$i] = RDF::Trine::Node::Variable->new( $var_map[ $i ] );
        }
    }

    my $sub;
    if ($use_quad) {
        my $r = $self->conn;
        my @skeys;
        my @indexes = qw(s p o g);
        foreach my $i (0 .. $#indexes) {
            my $index = $indexes[$i];
            my $n     = $nodes[$i];
            unless ($n->is_variable) {
                my $id = $self->_get_node_id($n);
                unless (defined($id)) {
                    return RDF::Trine::Iterator::Graph->new( [] );
                }
                push(@skeys, "RT:${index}set:$id");
            }
        }
        if (@skeys) {
            # Intersect the per-position index sets of the bound nodes.
            my @keys = $r->sinter(@skeys);
            $sub = sub {
                return unless (scalar(@keys));
                my $key        = shift(@keys);
                my @data       = split(':', $key);
                my @quad_nodes = $self->_id_node( @data[0..3] );
                my $st         = RDF::Trine::Statement::Quad->new( @quad_nodes );
                return $st;
            };
        } else {
            my @strs = map { ($_->is_variable) ? '*' : $self->_get_node_id($_) } @nodes;
            my $key  = 'RT:spog:' . join(':', @strs);
            my @keys = $r->keys($key);
            $sub = sub {
                return unless (scalar(@keys));
                my $key = shift(@keys);
                (undef, undef, my @data) = split(':', $key);
                my @quad_nodes = $self->_id_node( @data );
                my $st         = RDF::Trine::Statement::Quad->new( @quad_nodes );
                return $st;
            };
        }
    } else {
        my $r = $self->conn;
        my @skeys;
        my @indexes = qw(s p o);
        foreach my $i (0 .. $#indexes) {
            my $index = $indexes[$i];
            my $n     = $nodes[$i];
            unless ($n->is_variable) {
                my $id = $self->_get_node_id($n);
                unless (defined($id)) {
                    return RDF::Trine::Iterator::Graph->new( [] );
                }
                push(@skeys, "RT:${index}set:$id");
            }
        }
        if (@skeys) {
            # Drop the graph id so a triple stored in several graphs is
            # returned only once.
            my %keys;
            foreach my $key ($r->sinter(@skeys)) {
                (my $triple = $key) =~ s/:[^:]+$//;
                $keys{ $triple }++;
            }
            my @keys = keys %keys;
            $sub = sub {
                return unless (scalar(@keys));
                my $key          = shift(@keys);
                my @data         = split(':', $key);
                my @triple_nodes = $self->_id_node( @data[0..2] );
                my $st           = RDF::Trine::Statement->new( @triple_nodes );
                return $st;
            };
        } else {
            my @strs = map { ($_->is_variable) ? '*' : $self->_get_node_id($_) } @nodes[0..2];
            my $key  = 'RT:spog:' . join(':', @strs, '*');
            my %triples;
            foreach my $k ($r->keys($key)) {
                (my $triple = $k) =~ s/:[^:]+$//;
                $triples{ $triple }++;
            }
            my @keys = keys %triples;
            $sub = sub {
                return unless (scalar(@keys));
                my $key          = shift(@keys);
                my ($ids)        = $key =~ m/^RT:spog:(.*)$/;
                my @data         = split(':', $ids);
                my @triple_nodes = $self->_id_node( @data );
                my $st           = RDF::Trine::Statement->new( @triple_nodes );
                return $st;
            };
        }
    }
    return RDF::Trine::Iterator::Graph->new( $sub );
}

=item C<< count_statements ( $subject, $predicate, $object, $context ) >>

Returns a count of all the statements matching the specified subject,
predicate, object, and context. Any of the arguments may be undef to match any
value.

=cut

sub count_statements {
    my $self     = shift;
    my $use_quad = (scalar(@_) >= 4) ? 1 : 0;
    my @nodes    = @_;
    my @strs;
    foreach my $n (@nodes[0..3]) {
        if (not(blessed($n)) or $n->is_variable) {
            push(@strs, '*');
        } else {
            my $id = $self->_get_node_id($n);
            # An unknown node cannot occur in any statement.
            return 0 unless (defined($id));
            push(@strs, $id);
        }
    }

    my $r    = $self->conn;
    my @keys = $r->keys('RT:spog:' . join(':', @strs));
    return scalar(@keys) if ($use_quad);

    # Triple semantics: the same triple in several graphs counts once.
    my %triples;
    foreach my $key (@keys) {
        (my $triple = $key) =~ s/:[^:]+$//;
        $triples{ $triple }++;
    }
    return scalar(keys %triples);
}

=item C<< get_contexts >>

Returns an RDF::Trine::Iterator over the RDF::Trine::Node objects comprising
the set of contexts of the stored quads.
=cut

sub get_contexts {
    my $self = shift;
    my $r    = $self->conn;
    my %graphs;
    foreach my $key ($r->keys('RT:spog:*')) {
        # The graph id is the last colon-separated component of the key.
        (my $gid = $key) =~ s/^.*://;
        $graphs{ $gid }++;
    }
    # The Nil "graph" marks triples added without a context; it is not a
    # real context.
    my @nodes = grep { not($_->isa('RDF::Trine::Node::Nil')) } $self->_id_node(keys %graphs);
    return RDF::Trine::Iterator->new( \@nodes );
}


=item C<< supports ( [ $feature ] ) >>

If C<< $feature >> is specified, returns true if the feature is supported by the
store, false otherwise. If C<< $feature >> is not specified, returns a list of
supported features.

=cut

sub supports {
    my $self = shift;
    # No optional features are advertised by this backend yet.
    my %features = map { $_ => 1 } (
#       'http://www.w3.org/ns/sparql-service-description#SPARQL10Query',
#       'http://www.w3.org/ns/sparql-service-description#SPARQL11Query',
    );
    if (@_) {
        my $f = shift;
        return $features{ $f };
    } else {
        return keys %features;
    }
}

# =item C<< get_sparql ( $sparql ) >>
#
# Returns an iterator object of all bindings matching the specified SPARQL query.
#
# =cut
#
# sub get_sparql {
#   my $self    = shift;
#   my $sparql  = shift;
#   throw RDF::Trine::Error::UnimplementedError -text => "get_sparql not implemented for Redis stores yet";
# }

# Bulk operations are not supported: writes always go straight to Redis.
sub _bulk_ops {
    return 0;
}

sub _begin_bulk_ops {
    return 0;
}

# _end_bulk_ops
# Clears the BulkOps flag. If any operations were queued in $self->{ops}
# they are discarded and an UnimplementedError is thrown, so callers learn
# that the writes were not applied.
sub _end_bulk_ops {
    my $self = shift;
    my $ops  = $self->{ ops } || [];
    if (scalar(@$ops)) {
        @$ops = ();
        throw RDF::Trine::Error::UnimplementedError -text => "bulk operations not implemented for Redis stores yet";
    }
    $self->{BulkOps} = 0;
}

=item C<< nuke >>

Permanently removes the store and its data.
=cut

# Deletes every key this store writes, then starts a fresh in-process cache
# so no cached data refers to deleted keys.
sub nuke {
    my $self = shift;
    my $r    = $self->conn;
    $r->del('RT:node.next');
    for my $pattern (qw(R:n.i:* R:n.v:* RT:spog:* RT:sset:* RT:pset:* RT:oset:* RT:gset:*)) {
        $r->del($_) for $r->keys($pattern);
    }

    $self->{cache} = Cache::LRU->new( size => $self->{cache_size} );
}


# Debugging aid: warns with every stored statement key.
sub _dump {
    my $self = shift;
    my @keys = $self->conn->keys('RT:spog:*');
    warn "--------------------------------------\n";
    warn '*** DUMP Redis statements:';
    warn "$_\n" for @keys;
}

1;

__END__

=back

=head1 REDIS DATA LAYOUT

Nodes are identified by integer ids:

=over 4

=item C<RT:node.next>

Counter incremented to allocate new node ids.

=item C<< R:n.i:<md5> >>

Maps the Base64 MD5 digest of a node's N-Triples serialization to its id.

=item C<< R:n.v:<bucket> >>

Hash mapping C<id % 1000> to the N-Triples serialization of the node, where
C<bucket> is C<int(id / 1000)>.

=back

Statements are keyed by the ids of their four positions. Triples added
without a context use the id of an RDF::Trine::Node::Nil node as the graph.

=over 4

=item C<< RT:spog:<s>:<p>:<o>:<g> >>

Hash with fields C<s>, C<p>, C<o> and C<g> holding the node ids.

=item C<< RT:sset:<id> >>, C<< RT:pset:<id> >>, C<< RT:oset:<id> >>, C<< RT:gset:<id> >>

Sets of C<< <s>:<p>:<o>:<g> >> id strings: the statements that use node
C<id> in the subject, predicate, object or graph position respectively.

=back

=head1 BUGS

Please report any bugs or feature requests through the GitHub web interface
at L<https://github.com/kasei/perlrdf/issues>.

=head1 AUTHOR

Gregory Todd Williams C<< <gwilliams@cpan.org> >>

=head1 COPYRIGHT

Copyright (c) 2006-2012 Gregory Todd Williams. This
program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut