#
# Copyright (c) 2013-2014 Mark Martinec
# All rights reserved.
#
# See LICENSE AND COPYRIGHT section in POD text below for usage
# and distribution rights.
#

package Redis::TinyRedis;

use strict;
use re 'taint';
use warnings;

use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use IO::Socket::UNIX;
use Time::HiRes ();

our($VERSION, $io_socket_module_name);
BEGIN {
  $VERSION = '1.001';
  # pick the most capable TCP socket module that is installed;
  # $io_socket_module_name stays undefined if none can be loaded
  if (eval { require IO::Socket::IP }) {
    $io_socket_module_name = 'IO::Socket::IP';
  } elsif (eval { require IO::Socket::INET6 }) {
    $io_socket_module_name = 'IO::Socket::INET6';
  } elsif (eval { require IO::Socket::INET }) {
    $io_socket_module_name = 'IO::Socket::INET';
  }
}

# Constructor. Recognized arguments: 'server' (or 'sock') and 'on_connect';
# all arguments are also kept in $self->{args} for use by callbacks.
# Returns an object, or an empty list/undef if connecting failed.
# Dies if the server specification is syntactically invalid.
#
sub new {
  my($class, %args) = @_;
  my $self = bless { args => {%args} }, $class;
  my $outbuf = '';  $self->{outbuf} = \$outbuf;
  $self->{batch_size} = 0;
  $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
  $self->{on_connect} = $args{on_connect};
  return if !$self->connect;
  return $self;
}

# Dropping the socket reference closes the connection.
#
sub DESTROY {
  my $self = $_[0];
  local($@, $!, $_);  # do not clobber caller's status during destruction
  undef $self->{sock};
}

# Closes a connection if one is established. Preserves $@ and $!
# so that callers can still report the reason for disconnecting.
#
sub disconnect {
  my $self = $_[0];
  local($@, $!);
  undef $self->{sock};
}

# (Re)establishes a connection to the configured server: an absolute
# path denotes a Unix socket, otherwise 'host:port' or '[addr]:port'.
# Returns a socket object, or a false value if connecting failed.
# Dies on an invalid server specification, or if a TCP connection is
# requested but no IO::Socket::* module for it could be loaded.
#
sub connect {
  my $self = $_[0];

  $self->disconnect;
  my $sock;
  my $server = $self->{server};
  if ($server =~ m{^/}) {
    $sock = IO::Socket::UNIX->new(
              Peer => $server, Type => SOCK_STREAM);
  } elsif ($server =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
    my $host = defined $1 ? $1 : $2;  my $port = $3;
    defined $io_socket_module_name
      or die "No IO::Socket::IP, IO::Socket::INET6 or IO::Socket::INET ".
             "module is available for connecting to $server";
    $sock = $io_socket_module_name->new(
              PeerAddr => $host, PeerPort => $port, Proto => 'tcp');
  } else {
    die "Invalid 'server:port' specification: $server";
  }
  if ($sock) {
    $self->{sock} = $sock;

    # a bit mask for select() as used by _write_buff
    $self->{sock_fd} = $sock->fileno;  $self->{fd_mask} = '';
    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;

    # an on_connect() callback must not use batched calls!
    $self->{on_connect}->($self)  if $self->{on_connect};
  }
  return $sock;
}

# Reads and parses $cnt consecutive redis replies from a socket, returning
# them as an array ref. Multi-bulk replies are parsed recursively.
# An error reply does not abort parsing: the first error message seen is
# stored into $$err_ref (and undef takes its place in the result), so that
# all expected replies are consumed and the stream stays synchronized.
# Dies (after disconnecting) on a read failure or on an unparseable reply.
# Expects $/ to be set to CRLF by a caller.
#
sub _read_replies {
  my($self, $cnt, $err_ref) = @_;

  my $sock = $self->{sock};
  my @list;

  for (1 .. $cnt) {

    my $result = <$sock>;
    if (!defined $result) {
      $self->disconnect;
      die "Error reading from Redis server: $!";
    }
    chomp $result;
    my $resp_type = substr($result, 0, 1, '');

    if ($resp_type eq '$') {  # bulk reply
      if ($result < 0) {
        push(@list, undef);  # null bulk reply
      } else {
        # payload is followed by a CRLF, hence two extra bytes
        my $data = '';  my $ofs = 0;  my $len = $result + 2;
        while ($len > 0) {
          my $nbytes = read($sock, $data, $len, $ofs);
          if (!$nbytes) {
            $self->disconnect;
            defined $nbytes  or die "Error reading from Redis server: $!";
            die "Redis server closed connection";
          }
          $ofs += $nbytes;  $len -= $nbytes;
        }
        chomp $data;
        push(@list, $data);
      }

    } elsif ($resp_type eq ':') {  # integer reply
      push(@list, 0+$result);

    } elsif ($resp_type eq '+') {  # status reply
      push(@list, $result);

    } elsif ($resp_type eq '*') {  # multi-bulk reply
      push(@list, $result < 0 ? undef
                    : $self->_read_replies(0+$result, $err_ref) );

    } elsif ($resp_type eq '-') {  # error reply
      # remember the first error, but keep reading the remaining replies
      $$err_ref = "$result\n"  if !defined $$err_ref;
      push(@list, undef);

    } else {
      # position within a reply stream is now unknown, start afresh
      $self->disconnect;
      die "Unknown Redis reply: $resp_type ($result)";
    }
  }
  return \@list;
}

# Receive, parse and return $cnt consecutive redis replies as a list
# (an array ref). Connects first if there is no connection.
# Dies with an error message from a Redis server (terminated by a newline)
# if any of the replies was an error reply; this happens only after all
# $cnt replies have been read, so a failed command in a batch does not
# leave stale replies behind to be mistaken for replies to later commands.
#
sub _response {
  my($self, $cnt) = @_;

  if (!$self->{sock}) {
    $self->connect  or die "Connect failed: $!";
  }

  local($/) = "\015\012";  # redis protocol lines are terminated by a CRLF
  my $first_error;
  my $list = $self->_read_replies($cnt, \$first_error);
  die $first_error  if defined $first_error;
  return $list;
}

# Writes an entire buffer (given by reference) to a socket, connecting
# first if necessary and reconnecting if a connection is found dropped.
# Returns true; dies if (re)connecting fails or on other write errors.
#
# NOTE(review): when a reconnect happens after part of a buffer was
# already written, writing resumes at the same offset on a new connection;
# presumably rare, but a server then sees a truncated request - confirm.
#
sub _write_buff {
  my($self, $bufref) = @_;

  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
  my $ofs = 0;
  while ($ofs < length($$bufref)) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff);  my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    if (!defined $nfound || $nfound < 0) {
      next  if $! == EINTR;  # interrupted by a signal, just try again
      die "Select failed: $!";
    }
    if (vec($rout, $self->{sock_fd}, 1)) {
      my $nread = sysread($self->{sock}, $inbuff, 1024);
      # zero bytes indicates eof; a failed read is also taken as a dropped
      # connection, unless it was merely interrupted or would block
      if (!$nread && (defined $nread || ($! != EINTR && $! != EAGAIN))) {
        # eof, try reconnecting
        $self->connect  or die "Connect failed: $!";
      }
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    my $nwrite = syswrite($self->{sock}, $$bufref,
                          length($$bufref)-$ofs, $ofs);
    if (defined $nwrite) {
      $ofs += $nwrite;
    } elsif ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      $self->disconnect;  # preserves $!
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  return 1;
}

# Encodes a command with its arguments (given as a list) into a unified
# request protocol: an argument count, then each argument as a bulk string.
# NOTE(review): length() counts characters, so arguments are presumably
# expected to be byte strings - confirm with callers.
#
sub _encode_command {
  my $buff = '*' . scalar(@_) . "\015\012";
  $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
  return $buff;
}

# Send a redis command with arguments, returning a redis reply.
# Dies if a server returns an error reply.
#
sub call {
  my $self = shift;

  my $buff = _encode_command(@_);
  $self->_write_buff(\$buff);
  my $arr_ref = $self->_response(1);
  return $arr_ref && $arr_ref->[0];
}

# Append a redis command with arguments to a batch.
# Returns the number of commands currently in a batch.
#
sub b_call {
  my $self = shift;

  my $bufref = $self->{outbuf};
  $$bufref .= _encode_command(@_);
  return ++ $self->{batch_size};
}

# Send a batch of commands, returning an arrayref of redis replies,
# each array element corresponding to one command in a batch.
# Returns nothing if a batch is empty. A batch is reset once it has
# been sent. Dies if a server returns an error reply to any command.
#
sub b_results {
  my $self = $_[0];
  my $batch_size = $self->{batch_size};
  return if !$batch_size;
  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);
  $$bufref = '';  $self->{batch_size} = 0;
  return $self->_response($batch_size);
}

1;

__END__
=head1 NAME

Redis::TinyRedis - client side of the Redis protocol

=head1 SYNOPSIS

EXAMPLE:

  use Redis::TinyRedis;

  sub on_connect {
    my($r) = @_;
  # $r->call('AUTH', 'xyz');
    $r->call('SELECT', 3);
    $r->call('CLIENT', 'SETNAME', "test[$$]");
    1;
  }

  # my $server = '/tmp/redis.sock';
  my $server = '[::1]:6379';

  my $r = Redis::TinyRedis->new(server => $server,
                                on_connect => \&on_connect);
  $r or die "Error connecting to a Redis server: $!";

  $r->call('SET', 'key123', 'val123');  # will die on error
  $r->call('SET', 'key456', 'val456');  # will die on error

  my $v = $r->call('GET', 'key123');
  if (defined $v) { printf("got %s\n", $v) }
  else { printf("key not in a database\n") }

  my @keys = ('key123', 'key456', 'keynone');
  my $values = $r->call('MGET', @keys);
  printf("got %s => %s\n", $_, shift @$values // 'UNDEF') for @keys;

  # batching (pipelining) multiple commands saves on round-trips
  $r->b_call('DEL',     'keyxxx');
  $r->b_call('HINCRBY', 'keyxxx', 'cnt1', 5);
  $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 1);
  $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 2);
  $r->b_call('EXPIRE',  'keyxxx', 120);
  $r->b_results;  # collect response ignoring results, dies on error

  my $counts = $r->call('HMGET', 'keyxxx', 'cnt1', 'cnt2', 'cnt3');
  printf("count %s\n", $_ // 'UNDEF') for @$counts;

  # Lua server side scripting
  my $lua_results = $r->call('EVAL',
    'return redis.call("HGETALL", KEYS[1])', 1, 'keyxxx');
  printf("%s\n", join(', ', @$lua_results));

  # traversing all keys
  for (my $cursor = 0; ; ) {
    my $pair = $r->call('SCAN', $cursor, 'COUNT', 20);
    ($cursor, my $elements) = @$pair;
    printf("key: %s\n", $_) for @$elements;
    last if !$cursor;
  }

  # another batch of commands
  $r->b_call('DEL', $_) for @keys;
  my $results = $r->b_results;  # collect response
  printf("delete status for %s: %d\n", $_, shift @$results) for @keys;

  # monitor activity on a database through Redis keyspace notifications
  $r->call('CONFIG', 'SET', 'notify-keyspace-events', 'KEA');
  $r->call('PSUBSCRIBE', '__key*__:*');
  for (1..20) {
    my $msg = $r->call;  # collect one message at a time
    printf("%s\n", join(", ",@$msg));
  }
  $r->call('UNSUBSCRIBE');
  $r->call('CONFIG', 'SET', 'notify-keyspace-events', '');

  undef $r;  # DESTROY cleanly closes a connection to a redis server

=head1 DESCRIPTION

This is a Perl module Redis::TinyRedis implementing a client side of
the Redis protocol, i.e. a unified request protocol as introduced
in Redis 1.2. Design priorities were speed, simplicity, error checking.

=head1 METHODS

=head2 new

Initializes a Redis::TinyRedis object and establishes a connection
to a Redis server. Returns a Redis::TinyRedis object if the connection
was successfully established (by calling a connect() method implicitly),
or false otherwise, leaving a failure reason in $! .

=over 4

=item B<server>

Specifies a socket where a Redis server is listening. If a string
starts with a '/' an absolute path to a Unix socket is assumed,
otherwise it is interpreted as an INET or INET6 socket specification
in a syntax as recognized by a C<PeerAddr> option of the underlying
socket module (IO::Socket::IP, or IO::Socket::INET6, or IO::Socket::INET),
e.g. '127.0.0.1:6379' or '[::1]:6379' or 'localhost:6379'.
Port number must be explicitly specified.

A default is '127.0.0.1:6379'.


=item B<on_connect>

Specifies an optional callback subroutine, to be called by a connect()
method after each successful establishment of a connection to a redis server.
Useful as a provider of a Redis client authentication or for database
index selection.

The on_connect() callback is given a Redis::TinyRedis object as its
argument. This object also carries all arguments that were given in
a call to new() when it was created, including any additional options
unrecognized and ignored by Redis::TinyRedis->new().

An on_connect() callback subroutine must not use batched calls
(b_call / b_results), but may use the call() method.

=back

=head2 connect

Establishes a connection to a Redis server. Returns a socket object,
or undef if the connection failed, leaving error status in $! .
The connect() method is called implicitly by new(), or by call() or
b_results() if a connection was dropped due to some previous failure.
It may be called explicitly by an application, possibly combined with
a disconnect() method, to give more control to the application.

=head2 disconnect

Closes a connection to a Redis server if it is established,
does nothing if a connection is not established. The connection
will be re-established by subsequent calls to connect() or call()
or b_results().

Closing a connection is implied by a DESTROY method, so dropping
references to a Redis::TinyRedis object also cleanly disconnects
an established session with a Redis server.

=head2 call

Sends a redis command with arguments, returning a redis reply.
The first argument is expected to be a name of a Redis command,
followed by its arguments according to Redis documentation.

The command will die if a Redis server returns an error reply.
It may also die if it needs to implicitly re-establish a connection
and the connect() call fails.


The returned value is an integer if a Redis server returns an integer
reply, is a status string if a status reply is returned, is undef if a
null bulk reply is returned, is a string in case of a bulk reply, and
is a reference to an array of results in case of a multi-bulk reply.

=head2 b_call

Appends a redis command with arguments to a batch.
The first argument is expected to be a name of a Redis command,
followed by its arguments according to Redis documentation.

=head2 b_results

Sends a batch of commands, then resets the batch. Returns a reference
to an array of redis replies, each array element corresponding to one
command in a batch.

The command will die if a Redis server returns an error reply.
It may also die if it needs to implicitly re-establish a connection
and the connect() call fails.

Returns a reference to an array of results, each one corresponding
to one command of a batch. A data type of each array element is the
same as described in a call() method.

=head1 AUTHOR

Mark Martinec, C<< <Mark.Martinec@ijs.si> >>

=head1 BUGS

Please send bug reports to the author.

=head1 LICENSE AND COPYRIGHT

Copyright (c) 2013-2014 Mark Martinec
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright notice,
   this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.


THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

The views and conclusions contained in the software and documentation are
those of the authors and should not be interpreted as representing official
policies, either expressed or implied, of the Jozef Stefan Institute.

(the above license is the 2-clause BSD license, also known as
 a "Simplified BSD License", and pertains to this program only)

=cut