1#
2# Copyright (c) 2013-2014 Mark Martinec
3# All rights reserved.
4#
5# See LICENSE AND COPYRIGHT section in POD text below for usage
6# and distribution rights.
7#
8
9package Redis::TinyRedis;
10
11use strict;
12use re 'taint';
13use warnings;
14
15use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
16use IO::Socket::UNIX;
17use Time::HiRes ();
18
19use vars qw($VERSION $io_socket_module_name);
BEGIN {
  $VERSION = '1.001';
  # Choose the class used for TCP connections at compile time: the first
  # module in this preference order that loads successfully is used.
  # If none of them loads, $io_socket_module_name is left undefined.
  for my $candidate (qw(IO::Socket::IP IO::Socket::INET6 IO::Socket::INET)) {
    (my $file = "$candidate.pm") =~ s{::}{/}g;
    next if !eval { require $file };
    $io_socket_module_name = $candidate;
    last;
  }
}
30
# Constructor: builds the object and connects to the server right away.
#
# Recognized arguments: 'server' (or 'sock' as a fallback) naming the
# socket to connect to, defaulting to '127.0.0.1:6379', and 'on_connect',
# an optional callback. A copy of all arguments is kept under {args}.
#
# Returns the new object, or nothing (undef in scalar context) when the
# connect() method returns false.
#
sub new {
  my($class, %args) = @_;

  my $outbuf = '';  # accumulates batched commands, see b_call / b_results
  my $self = bless {
    args       => {%args},
    outbuf     => \$outbuf,
    batch_size => 0,
    server     => $args{server} || $args{sock} || '127.0.0.1:6379',
    on_connect => $args{on_connect},
  }, $class;

  $self->connect  or return;
  $self;
}
41
sub DESTROY {
  my($self) = @_;
  # Releasing the socket must not disturb the error state or topic
  # variable of whatever code happened to drop the last reference.
  local $@;
  local $!;
  local $_;
  undef $self->{sock};
}
47
# Drops this object's reference to the socket, leaving $@ and $! as the
# caller had them, so a caller may still inspect $! after disconnecting.
sub disconnect {
  my($self) = @_;
  local $@;
  local $!;
  undef $self->{sock};
}
53
# (Re)connects to the server named by $self->{server}.
#
# A specification starting with '/' is taken as the path of a Unix socket;
# otherwise it must look like 'host:port' or '[host]:port' and is opened
# as a TCP connection through the class in $io_socket_module_name.
# Any other form is fatal.
#
# On success the socket, its file descriptor and a select() bit mask for
# it are stored in the object, the on_connect callback (if any) is invoked
# with the object as its argument, and the socket is returned.
# On failure the false value from the socket constructor is returned.
#
sub connect {
  my($self) = @_;

  $self->disconnect;  # let go of any previous socket first

  my $server = $self->{server};
  my $sock;
  if ($server =~ m{^/}) {
    $sock = IO::Socket::UNIX->new(Peer => $server, Type => SOCK_STREAM);
  } elsif (my($bracketed, $bare, $port) =
             $server =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
    # the host is either inside brackets (needed when it contains colons)
    # or a plain colon-free string
    my $host = defined $bracketed ? $bracketed : $bare;
    $sock = $io_socket_module_name->new(
              PeerAddr => $host, PeerPort => $port, Proto => 'tcp');
  } else {
    die "Invalid 'server:port' specification: $server";
  }
  return $sock  if !$sock;

  my $fd = $sock->fileno;
  my $mask = '';
  vec($mask, $fd, 1) = 1;  # bit vector as expected by a 4-argument select
  @$self{qw(sock sock_fd fd_mask)} = ($sock, $fd, $mask);

  # an on_connect() callback must not use batched calls!
  my $callback = $self->{on_connect};
  $callback->($self)  if $callback;

  $sock;
}
81
# Receive, parse and return $cnt consecutive Redis replies.
#
# Returns a reference to an array holding one element per reply: a status
# string, an integer, a bulk string, undef for a null bulk or a null
# multi-bulk reply, or a nested array reference for a multi-bulk reply.
#
# Dies with the server's message (terminated by a newline) if any of the
# replies, at any nesting depth, is an error reply. Also dies if a
# connection cannot be established, if reading fails, or if a reply of an
# unknown type is received.
#
sub _response {
  my($self, $cnt) = @_;

  my $sock = $self->{sock};
  if (!$sock) {
    $self->connect  or die "Connect failed: $!";
    $sock = $self->{sock};
  }

  # Reply header lines and bulk payloads end in CRLF; both readline and
  # chomp depend on $/, so set it here instead of trusting every caller.
  local $/ = "\015\012";

  # An error reply must not abort parsing on the spot: the remaining
  # replies of a batch (or remaining elements of a multi-bulk reply) would
  # stay unread and be mistaken for replies to the next command. So all
  # $cnt replies are consumed first, and only then is the error raised.
  my $first_error;
  my $list = _read_replies($self, $sock, $cnt, \$first_error);
  die "$first_error\n"  if defined $first_error;
  $list;
}

# Read $cnt replies from $sock, recursing into multi-bulk replies, and
# return them as an array reference. Relies on $/ being CRLF. The text of
# the first error reply encountered is stored through $err_ref (an undef
# placeholder takes its place in the result) instead of dying right away.
#
sub _read_replies {
  my($self, $sock, $cnt, $err_ref) = @_;

  my @list;

  for (1 .. $cnt) {

    my $result = <$sock>;
    if (!defined $result) {
      $self->disconnect;
      # a server error received just before the connection went away
      # explains the failure better than a bare read error does
      die "$$err_ref\n"  if defined $$err_ref;
      die "Error reading from Redis server: $!";
    }
    chomp $result;
    my $resp_type = substr($result, 0, 1, '');  # $result keeps the rest

    if ($resp_type eq '$') {  # bulk reply
      if ($result < 0) {
        push(@list, undef);  # null bulk reply
      } else {
        # payload of the announced size plus its trailing CRLF; read() may
        # return less than requested, so loop until all of it has arrived
        my $data = ''; my $ofs = 0; my $len = $result + 2;
        while ($len > 0) {
          my $nbytes = read($sock, $data, $len, $ofs);
          if (!$nbytes) {
            $self->disconnect;
            die "$$err_ref\n"  if defined $$err_ref;
            defined $nbytes  or die "Error reading from Redis server: $!";
            die "Redis server closed connection";
          }
          $ofs += $nbytes; $len -= $nbytes;
        }
        chomp $data;
        push(@list, $data);
      }

    } elsif ($resp_type eq ':') {  # integer reply
      push(@list, 0+$result);

    } elsif ($resp_type eq '+') {  # status reply
      push(@list, $result);

    } elsif ($resp_type eq '*') {  # multi-bulk reply
      push(@list, $result < 0 ? undef
                    : _read_replies($self, $sock, 0+$result, $err_ref) );

    } elsif ($resp_type eq '-') {  # error reply
      $$err_ref = $result  if !defined $$err_ref;
      push(@list, undef);  # keeps positions aligned; never reaches a caller

    } else {
      die "Unknown Redis reply: $resp_type ($result)";
    }
  }
  \@list;
}
141
# Write the entire buffer referenced by $bufref to the server, connecting
# first if there is no socket. Returns 1; dies if select() fails, if a
# needed (re)connect fails, or on a write error that is not a dropped
# connection.
#
# When the connection is found to be gone, it is re-established and the
# buffer is sent again from its beginning: a new connection must never be
# given just the tail of a request. The side effect is that commands which
# did reach the old connection may be sent a second time.
#
# Before each write, any pending input (up to 1024 bytes) is read and
# discarded, as that is how an end-of-file is detected.
#
sub _write_buff {
  my($self, $bufref) = @_;

  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
  my $ofs = 0;
  while ($ofs < length($$bufref)) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    if (!defined $nfound || $nfound < 0) {
      next if $! == EINTR;  # interrupted by a signal, just ask again
      die "Select failed: $!";
    }
    if (vec($rout, $self->{sock_fd}, 1) &&
        !sysread($self->{sock}, $inbuff, 1024)) {
      # eof, try reconnecting
      $self->connect  or die "Connect failed: $!";
      $ofs = 0;  # fresh connection: resend the whole buffer
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    my $nwrite =
      syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
    if (defined $nwrite) {
      $ofs += $nwrite;  # possibly a partial write, loop for the rest
      next;
    }
    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      # $! is still examined below, so this relies on disconnect()
      # leaving $! untouched
      $self->disconnect;
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
        $ofs = 0;  # fresh connection: resend the whole buffer
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  1;
}
176
# Send a redis command with arguments, returning a redis reply.
#
# The arguments are the command name followed by its arguments. Returns
# the single reply received for the command, as parsed by _response().
# Dies on whatever _write_buff() or _response() die on.
#
sub call {
  my $self = shift;

  my $buff = '*' . scalar(@_) . "\015\012";
  for my $arg (@_) {
    my $bytes = $arg;  # work on a copy, @_ aliases the caller's variables
    # The length prefix counts bytes, so the string written must consist
    # of bytes. Strings which can be downgraded are sent byte-for-byte as
    # before; strings with code points above 0xFF are sent as UTF-8.
    if (utf8::is_utf8($bytes) && !utf8::downgrade($bytes, 1)) {
      utf8::encode($bytes);
    }
    $buff .= '$' . length($bytes) . "\015\012" . $bytes . "\015\012";
  }

  $self->_write_buff(\$buff);
  local($/) = "\015\012";
  my $arr_ref = $self->_response(1);
  $arr_ref && $arr_ref->[0];
}
190
# Append a redis command with arguments to a batch.
#
# The arguments are the command name followed by its arguments. Nothing
# is sent to the server here; the command is only added to the output
# buffer. Returns the number of commands now in the batch.
#
sub b_call {
  my $self = shift;

  my $bufref = $self->{outbuf};
  $$bufref .= '*' . scalar(@_) . "\015\012";
  for my $arg (@_) {
    my $bytes = $arg;  # work on a copy, @_ aliases the caller's variables
    # The length prefix counts bytes, so the string written must consist
    # of bytes. Strings which can be downgraded are stored byte-for-byte
    # as before; strings with code points above 0xFF are stored as UTF-8.
    if (utf8::is_utf8($bytes) && !utf8::downgrade($bytes, 1)) {
      utf8::encode($bytes);
    }
    $$bufref .= '$' . length($bytes) . "\015\012" . $bytes . "\015\012";
  }
  ++ $self->{batch_size};
}
201
# Send the commands collected by b_call() and return a reference to an
# array of their replies, one element per command, in the same order.
# With an empty batch nothing is sent and nothing is returned (undef in
# scalar context).
#
sub b_results {
  my($self) = @_;

  my $pending = $self->{batch_size}  or return;

  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);

  # the batch has been written out, start the next one from scratch
  $self->{batch_size} = 0;
  $$bufref = '';

  local $/ = "\015\012";
  return $self->_response($pending);
}
215
2161;
217
218__END__
219=head1 NAME
220
221Redis::TinyRedis - client side of the Redis protocol
222
223=head1 SYNOPSIS
224
225EXAMPLE:
226
227  use Redis::TinyRedis;
228
229  sub on_connect {
230    my($r) = @_;
231  # $r->call('AUTH', 'xyz');
232    $r->call('SELECT', 3);
233    $r->call('CLIENT', 'SETNAME', "test[$$]");
234    1;
235  }
236
237# my $server = '/tmp/redis.sock';
238  my $server = '[::1]:6379';
239
240  my $r = Redis::TinyRedis->new(server => $server,
241                                on_connect => \&on_connect);
242  $r or die "Error connecting to a Redis server: $!";
243
244  $r->call('SET', 'key123', 'val123');  # will die on error
245  $r->call('SET', 'key456', 'val456');  # will die on error
246
247  my $v = $r->call('GET', 'key123');
248  if (defined $v) { printf("got %s\n", $v) }
249  else { printf("key not in a database\n") }
250
251  my @keys = ('key123', 'key456', 'keynone');
252  my $values = $r->call('MGET', @keys);
253  printf("got %s => %s\n", $_, shift @$values // 'UNDEF') for @keys;
254
255  # batching (pipelining) multiple commands saves on round-trips
256  $r->b_call('DEL',     'keyxxx');
257  $r->b_call('HINCRBY', 'keyxxx', 'cnt1', 5);
258  $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 1);
259  $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 2);
260  $r->b_call('EXPIRE',  'keyxxx', 120);
261  $r->b_results;  # collect response ignoring results, dies on error
262
263  my $counts = $r->call('HMGET', 'keyxxx', 'cnt1', 'cnt2', 'cnt3');
264  printf("count %s\n", $_ // 'UNDEF') for @$counts;
265
266  # Lua server side scripting
267  my $lua_results = $r->call('EVAL',
268    'return redis.call("HGETALL", KEYS[1])', 1, 'keyxxx');
269  printf("%s\n", join(', ', @$lua_results));
270
271  # traversing all keys
272  for (my $cursor = 0; ; ) {
273    my $pair = $r->call('SCAN', $cursor, 'COUNT', 20);
274    ($cursor, my $elements) = @$pair;
275    printf("key: %s\n", $_) for @$elements;
276    last if !$cursor;
277  }
278
279  # another batch of commands
280  $r->b_call('DEL', $_) for @keys;
281  my $results = $r->b_results;  # collect response
282  printf("delete status for %s: %d\n", $_, shift @$results) for @keys;
283
284  # monitor activity on a database through Redis keyspace notifications
285  $r->call('CONFIG', 'SET', 'notify-keyspace-events', 'KEA');
286  $r->call('PSUBSCRIBE', '__key*__:*');
287  for (1..20) {
288    my $msg = $r->call;  # collect one message at a time
289    printf("%s\n", join(", ",@$msg));
290  }
291  $r->call('UNSUBSCRIBE');
292  $r->call('CONFIG', 'SET', 'notify-keyspace-events', '');
293
294  undef $r;  # DESTROY cleanly closes a connection to a redis server
295
296=head1 DESCRIPTION
297
298This is a Perl module Redis::TinyRedis implementing a client side of
299the Redis protocol, i.e. a unified request protocol as introduced
300in Redis 1.2. Design priorities were speed, simplicity, error checking.
301
302=head1 METHODS
303
304=head2 new
305
Initializes a Redis::TinyRedis object and establishes a connection
to a Redis server. Returns a Redis::TinyRedis object if the connection
was successfully established (by calling a connect() method implicitly),
or false otherwise, leaving a failure reason in $! .
310
311=over 4
312
313=item B<server>
314
315Specifies a socket where a Redis server is listening. If a string
316starts with a '/' an absolute path to a Unix socket is assumed,
317otherwise it is interpreted as an INET or INET6 socket specification
318in a syntax as recognized by a C<PeerAddr> option of the underlying
319socket module (IO::Socket::IP, or IO::Socket::INET6, or IO::Socket::INET),
e.g. '127.0.0.1:6379' or '[::1]:6379' or 'localhost:6379'.
321Port number must be explicitly specified.
322
323A default is '127.0.0.1:6379'.
324
325=item B<on_connect>
326
327Specifies an optional callback subroutine, to be called by a connect()
328method after each successful establishment of a connection to a redis server.
329Useful as a provider of a Redis client authentication or for database
330index selection.
331
332The on_connect() callback is given a Redis::TinyRedis object as its
333argument. This object also carries all arguments that were given in
334a call to new() when it was created, including any additional options
335unrecognized and ignored by Redis::TinyRedis->new().
336
337An on_connect() callback subroutine must not use batched calls
338(b_call / b_results), but may use the call() method.
339
340=back
341
342=head2 connect
343
344Establishes a connection to a Redis server. Returns a socket object,
345or undef if the connection failed, leaving error status in $! .
346The connect() method is called implicitly by new(), or by call() or
347b_results() if a connection was dropped due to some previous failure.
348It may be called explicitly by an application, possibly combined with
349a disconnect() method, to give more control to the application.
350
351=head2 disconnect
352
353Closes a connection to a Redis server if it is established,
354does nothing if a connection is not established. The connection
355will be re-established by subsequent calls to connect() or call()
356or b_results().
357
358Closing a connection is implied by a DESTROY method, so dropping
359references to a Redis::TinyRedis object also cleanly disconnects
360an established session with a Redis server.
361
362=head2 call
363
364Sends a redis command with arguments, returning a redis reply.
365The first argument is expected to be a name of a Redis command,
366followed by its arguments according to Redis documentation.
367
368The command will die if a Redis server returns an error reply.
369It may also die if it needs to implicitly re-establish a connection
370and the connect() call fails.
371
372The returned value is an integer if a Redis server returns an integer
373reply, is a status string if a status reply is returned, is undef if a
374null bulk reply is returned, is a string in case of a bulk reply, and
375is a reference to an array of results in case of a multi-bulk reply.
376
377=head2 b_call
378
379Appends a redis command with arguments to a batch.
380The first argument is expected to be a name of a Redis command,
381followed by its arguments according to Redis documentation.
382
383=head2 b_results
384
385Sends a batch of commands, then resets the batch. Returns a reference
386to an array of redis replies, each array element corresponding to one
387command in a batch.
388
389The command will die if a Redis server returns an error reply.
390It may also die if it needs to implicitly re-establish a connection
391and the connect() call fails.
392
Returns a reference to an array of results, each one corresponding
to one command of a batch. The data type of each array element is the
same as described for the call() method. If the batch is empty, nothing
is sent and an undefined value is returned.
396
397=head1 AUTHOR
398
399Mark Martinec, C<< <Mark.Martinec@ijs.si> >>
400
401=head1 BUGS
402
403Please send bug reports to the author.
404
405=head1 LICENSE AND COPYRIGHT
406
407Copyright (c) 2013-2014 Mark Martinec
408All rights reserved.
409
410Redistribution and use in source and binary forms, with or without
411modification, are permitted provided that the following conditions
412are met:
4131. Redistributions of source code must retain the above copyright notice,
414   this list of conditions and the following disclaimer.
4152. Redistributions in binary form must reproduce the above copyright notice,
416   this list of conditions and the following disclaimer in the documentation
417   and/or other materials provided with the distribution.
418
419THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
420AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
421IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
422ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
423BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
424CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
425SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
426INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
427CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
428ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
429POSSIBILITY OF SUCH DAMAGE.
430
431The views and conclusions contained in the software and documentation are
432those of the authors and should not be interpreted as representing official
433policies, either expressed or implied, of the Jozef Stefan Institute.
434
435(the above license is the 2-clause BSD license, also known as
436 a "Simplified BSD License", and pertains to this program only)
437
438=cut
439