1from __future__ import with_statement
2from itertools import chain
3import datetime
4import sys
5import warnings
6import time
7import threading
8import time as mod_time
9import hashlib
10from redis._compat import (b, basestring, bytes, imap, iteritems, iterkeys,
11                           itervalues, izip, long, nativestr, unicode,
12                           safe_unicode)
13from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
14                              SSLConnection, Token)
15from redis.lock import Lock, LuaLock
16from redis.exceptions import (
17    ConnectionError,
18    DataError,
19    ExecAbortError,
20    NoScriptError,
21    PubSubError,
22    RedisError,
23    ResponseError,
24    TimeoutError,
25    WatchError,
26)
27
# Empty value built with the compat helper ``b`` (presumably an empty byte
# string on every supported Python version -- confirm in redis._compat).
SYM_EMPTY = b('')
29
30
def list_or_args(keys, args):
    """
    Return a single new list combining ``keys`` and ``args``.

    ``keys`` may be a single value (including a string or bytes instance)
    or an iterable of values. ``args`` holds additional values to append;
    a falsy ``args`` adds nothing.

    The result is always a fresh list, so a list passed as ``keys`` is never
    modified and iterables without an ``extend`` method (tuples, sets,
    generators) are accepted as well.
    """
    try:
        iter(keys)
    except TypeError:
        # not iterable at all, e.g. a lone int
        keys = [keys]
    else:
        # a string or bytes instance can be iterated, but indicates
        # keys wasn't passed as a list
        if isinstance(keys, (basestring, bytes)):
            keys = [keys]
        else:
            # copy so the caller's container is never mutated below
            keys = list(keys)
    if args:
        keys.extend(args)
    return keys
44
45
def timestamp_to_datetime(response):
    """
    Convert a unix timestamp to a Python datetime object.

    Returns None when ``response`` is falsy or cannot be converted by
    ``int()`` because of a ValueError.
    """
    if response:
        try:
            seconds = int(response)
        except ValueError:
            return None
        return datetime.datetime.fromtimestamp(seconds)
    return None
55
56
def string_keys_to_dict(key_string, callback):
    """
    Map every whitespace separated name in ``key_string`` to ``callback``.
    """
    names = key_string.split()
    return dict((name, callback) for name in names)
59
60
def dict_merge(*dicts):
    """
    Return a new dict holding the items of every dict in ``dicts``.

    Later arguments win when the same key appears more than once.
    """
    combined = {}
    for mapping in dicts:
        combined.update(mapping)
    return combined
66
67
def parse_debug_object(response):
    """
    Parse the results of Redis's DEBUG OBJECT command into a Python dict.

    The reply is treated as whitespace separated ``name:value`` items.
    """
    # The 'type' of the object is the first item in the response, but isn't
    # prefixed with a name, so one is supplied here
    text = 'type:' + nativestr(response)
    parsed = dict(item.split(':') for item in text.split())

    # parse some expected int values from the string response
    # note: this cmd isn't spec'd so these may not appear in all redis versions
    for name in ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle'):
        if name in parsed:
            parsed[name] = int(parsed[name])

    return parsed
84
85
def parse_object(response, infotype):
    """
    Parse the results of an OBJECT command.

    Replies for the ``idletime`` and ``refcount`` infotypes are converted
    with ``int_or_none``; every other reply is returned unchanged.
    """
    if infotype not in ('idletime', 'refcount'):
        return response
    return int_or_none(response)
91
92
def parse_info(response):
    """
    Parse the result of Redis's INFO command into a Python dict.

    Empty lines and lines starting with '#' are skipped. ``key:value`` lines
    become dict entries; lines without a ':' are collected, in order, in a
    list stored under the ``'__raw__'`` key.
    """
    def get_value(value):
        # a value holding both ',' and '=' is a comma separated list of
        # name=value pairs and becomes a nested dict
        if ',' in value and '=' in value:
            nested = {}
            for pair in value.split(','):
                name, raw = pair.rsplit('=', 1)
                nested[name] = get_value(raw)
            return nested
        # anything else is a number when it looks like one, else the string
        try:
            return float(value) if '.' in value else int(value)
        except ValueError:
            return value

    info = {}
    for line in nativestr(response).splitlines():
        if not line or line.startswith('#'):
            continue
        if ':' in line:
            key, value = line.split(':', 1)
            info[key] = get_value(value)
        else:
            # if the line isn't splittable, append it to the "__raw__" key
            info.setdefault('__raw__', []).append(line)

    return info
124
125
# Fields of a sentinel state reply whose values are coerced with ``int``.
SENTINEL_STATE_TYPES = dict.fromkeys((
    'can-failover-its-master',
    'config-epoch',
    'down-after-milliseconds',
    'failover-timeout',
    'info-refresh',
    'last-hello-message',
    'last-ok-ping-reply',
    'last-ping-reply',
    'last-ping-sent',
    'master-link-down-time',
    'master-port',
    'num-other-sentinels',
    'num-slaves',
    'o-down-time',
    'pending-commands',
    'parallel-syncs',
    'port',
    'quorum',
    'role-reported-time',
    's-down-time',
    'slave-priority',
    'slave-repl-offset',
    'voted-leader-epoch',
), int)
151
152
def parse_sentinel_state(item):
    """
    Build a state dict from a flat sequence of key/value items.

    Values are coerced according to SENTINEL_STATE_TYPES and the comma
    separated ``flags`` entry (which must be present) is expanded into
    boolean ``is_*`` entries.
    """
    state = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
    active = set(state['flags'].split(','))
    flag_names = (
        ('is_master', 'master'),
        ('is_slave', 'slave'),
        ('is_sdown', 's_down'),
        ('is_odown', 'o_down'),
        ('is_sentinel', 'sentinel'),
        ('is_disconnected', 'disconnected'),
        ('is_master_down', 'master_down'),
    )
    state.update((entry, flag in active) for entry, flag in flag_names)
    return state
163
164
def parse_sentinel_master(response):
    "Parse a single state reply after passing each item through nativestr"
    items = imap(nativestr, response)
    return parse_sentinel_state(items)
167
168
def parse_sentinel_masters(response):
    """
    Parse a list of state replies into a dict keyed by each state's
    ``name`` entry.
    """
    states = (parse_sentinel_state(imap(nativestr, item))
              for item in response)
    return dict((state['name'], state) for state in states)
175
176
def parse_sentinel_slaves_and_sentinels(response):
    "Parse a list of state replies into a list of state dicts"
    states = []
    for item in response:
        states.append(parse_sentinel_state(imap(nativestr, item)))
    return states
179
180
def parse_sentinel_get_master(response):
    """
    Return the reply as a ``(host, port)`` tuple with the port converted to
    int, or None when the reply is falsy.
    """
    if not response:
        return None
    host = response[0]
    port = int(response[1])
    return (host, port)
183
184
def pairs_to_dict(response):
    "Create a dict given a flat list of alternating keys and values"
    # the same iterator is supplied twice, so each pair produced by izip
    # consumes two consecutive items of the response
    return dict(izip(*[iter(response)] * 2))
189
190
def pairs_to_dict_typed(response, type_info):
    """
    Create a dict from a flat sequence of alternating keys and values,
    coercing selected values.

    ``type_info`` maps a key to a callable; when a key of the response is
    found in it, the matching value is replaced by ``callable(value)``.
    A value that can't be coerced is kept as received.
    """
    it = iter(response)
    result = {}
    # zipping the iterator with itself yields consecutive (key, value) pairs
    for key, value in zip(it, it):
        if key in type_info:
            try:
                value = type_info[key](value)
            except Exception:
                # if for some reason the value can't be coerced, just use
                # the string value. Only Exception is caught so that
                # KeyboardInterrupt and SystemExit still propagate.
                pass
        result[key] = value
    return result
204
205
def zset_score_pairs(response, **options):
    """
    Return the response as a list of (value, score) pairs when the
    ``withscores`` option is set; otherwise return it untouched.

    Scores are converted with the ``score_cast_func`` option, which
    defaults to ``float``.
    """
    if response and options.get('withscores'):
        cast = options.get('score_cast_func', float)
        # one shared iterator: values and scores alternate in the reply
        items = iter(response)
        return list(izip(items, imap(cast, items)))
    return response
216
217
def sort_return_tuples(response, **options):
    """
    If ``groups`` is specified, return the response as a list of
    n-element tuples with n being the value found in options['groups'].

    The response is returned unchanged when it is empty or when ``groups``
    is missing or falsy, so the callback can also be used for commands
    issued without that option.
    """
    n = options.get('groups')
    if not response or not n:
        return response
    # response[i::n] is the i-th column; zipping the columns rebuilds rows
    return list(zip(*[response[i::n] for i in range(n)]))
227
228
def int_or_none(response):
    "Convert the response with int(), passing None through unchanged"
    return None if response is None else int(response)
233
234
def float_or_none(response):
    "Convert the response with float(), passing None through unchanged"
    return None if response is None else float(response)
239
240
def bool_ok(response):
    "Return True when the response, passed through nativestr, equals 'OK'"
    status = nativestr(response)
    return status == 'OK'
243
244
def parse_client_list(response, **options):
    """
    Parse a CLIENT LIST reply into a list of dicts, one per line.

    Each line is expected to consist of space separated ``name=value``
    pairs. ``options`` is accepted for the callback interface and ignored.
    """
    clients = []
    for line in nativestr(response).splitlines():
        # split on the first '=' only: a value may itself contain '='
        clients.append(dict(pair.split('=', 1) for pair in line.split(' ')))
    return clients
250
251
def parse_config_get(response, **options):
    """
    Turn a flat name/value reply into a dict.

    Every item except None is passed through nativestr first. An empty
    reply gives an empty dict.
    """
    decoded = []
    for item in response:
        decoded.append(None if item is None else nativestr(item))
    if not decoded:
        return {}
    return pairs_to_dict(decoded)
255
256
def parse_scan(response, **options):
    "Split a two item reply into (cursor converted with long, items)"
    cursor, items = response
    position = long(cursor)
    return position, items
260
261
def parse_hscan(response, **options):
    """
    Split a two item reply into (cursor converted with long, dict).

    The second item is a flat field/value list; a falsy list gives an
    empty dict.
    """
    cursor, pairs = response
    position = long(cursor)
    fields = pairs_to_dict(pairs) if pairs else {}
    return position, fields
265
266
def parse_zscan(response, **options):
    """
    Split a two item reply into (cursor converted with long, list of
    (value, score) pairs).

    Scores are converted with the ``score_cast_func`` option, which
    defaults to ``float``.
    """
    cast = options.get('score_cast_func', float)
    cursor, flat = response
    # one shared iterator: values and scores alternate in the reply
    items = iter(flat)
    position = long(cursor)
    return position, list(izip(items, imap(cast, items)))
272
273
def parse_slowlog_get(response, **options):
    """
    Convert each item of the reply into a dict with the keys ``id``,
    ``start_time``, ``duration`` and ``command``.

    The start time and duration are converted to int and the parts of the
    command are joined with a single space.
    """
    entries = []
    for item in response:
        entries.append({
            'id': item[0],
            'start_time': int(item[1]),
            'duration': int(item[2]),
            'command': b(' ').join(item[3]),
        })
    return entries
281
282
def parse_cluster_info(response, **options):
    """
    Parse a CLUSTER INFO reply into a dict of ``name: value`` strings.

    The reply is passed through nativestr first, like the other text
    parsers in this module, so that it can be split with str separators.
    Empty lines are skipped. ``options`` is accepted for the callback
    interface and ignored.
    """
    response = nativestr(response)
    # split on the first ':' only so a value containing ':' stays intact
    return dict(line.split(':', 1) for line in response.splitlines() if line)
285
286
287def _parse_node_line(line):
288    line_items = line.split(' ')
289    node_id, addr, flags, master_id, ping, pong, epoch, \
290        connected = line.split(' ')[:8]
291    slots = [sl.split('-') for sl in line_items[8:]]
292    node_dict = {
293        'node_id': node_id,
294        'flags': flags,
295        'master_id': master_id,
296        'last_ping_sent': ping,
297        'last_pong_rcvd': pong,
298        'epoch': epoch,
299        'slots': slots,
300        'connected': True if connected == 'connected' else False
301    }
302    return addr, node_dict
303
304
def parse_cluster_nodes(response, **options):
    """
    Parse node lines into a dict mapping each node's address to its details.

    A string reply is split into lines first; any other reply is iterated
    as a sequence of lines.
    """
    if isinstance(response, basestring):
        lines = response.splitlines()
    else:
        lines = response
    return dict(_parse_node_line(line) for line in lines)
310
311
def parse_georadius_generic(response, **options):
    """
    Parse the reply of GEORADIUS / GEORADIUSBYMEMBER.

    Recognised options (all optional, a missing one counts as not set):

    ``store`` / ``store_dist``: when either is set the reply is returned
    unchanged.

    ``withdist``, ``withhash``, ``withcoord``: when none is set the result
    is a list of place names passed through nativestr. Otherwise every
    result is a list starting with the place name, followed by the
    requested values in the order distance (float), hash (int),
    coordinates (pair of floats).
    """
    if options.get('store') or options.get('store_dist'):
        # `store` and `store_dist` can't be combined
        # with other command arguments.
        return response

    if isinstance(response, list):
        response_list = response
    else:
        response_list = [response]

    requested = [o for o in ('withdist', 'withhash', 'withcoord')
                 if options.get(o)]
    if not requested:
        # just a bunch of places
        return [nativestr(r) for r in response_list]

    cast = {
        'withdist': float,
        'withcoord': lambda ll: (float(ll[0]), float(ll[1])),
        'withhash': int
    }

    # zip all output results with each casting function to get
    # the properly native Python value.
    f = [nativestr] + [cast[o] for o in requested]
    return [
        [func(value) for func, value in zip(f, r)] for r in response_list
    ]
341
342
def parse_pubsub_numsub(response, **options):
    "Pair the even-indexed items of the reply with the odd-indexed ones"
    first_items = response[0::2]
    second_items = response[1::2]
    return list(zip(first_items, second_items))
345
346
347class StrictRedis(object):
348    """
349    Implementation of the Redis protocol.
350
351    This abstract class provides a Python interface to all Redis commands
352    and an implementation of the Redis protocol.
353
354    Connection and Pipeline derive from this, implementing how
355    the commands are sent and received to the Redis server
356    """
    # Default mapping of command name to the callable that post-processes
    # the raw reply of that command. parse_response() looks the command name
    # up and calls the callable as ``callback(response, **options)``;
    # commands without an entry get their reply back unchanged. Each
    # instance works on its own copy of this dict (made in __init__).
    RESPONSE_CALLBACKS = dict_merge(
        string_keys_to_dict(
            'AUTH EXISTS EXPIRE EXPIREAT HEXISTS HMSET MOVE MSETNX PERSIST '
            'PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX',
            bool
        ),
        string_keys_to_dict(
            'BITCOUNT BITPOS DECRBY DEL GETBIT HDEL HLEN HSTRLEN INCRBY '
            'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE '
            'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD '
            'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE '
            'GEOADD',
            int
        ),
        string_keys_to_dict(
            'INCRBYFLOAT HINCRBYFLOAT GEODIST',
            float
        ),
        string_keys_to_dict(
            # these return OK, or int if redis-server is >=1.3.4
            'LPUSH RPUSH',
            lambda r: isinstance(r, (long, int)) and r or nativestr(r) == 'OK'
        ),
        string_keys_to_dict('SORT', sort_return_tuples),
        string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
        string_keys_to_dict(
            'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE RENAME '
            'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH',
            bool_ok
        ),
        string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
        string_keys_to_dict(
            'SDIFF SINTER SMEMBERS SUNION',
            lambda r: r and set(r) or set()
        ),
        string_keys_to_dict(
            'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
            zset_score_pairs
        ),
        string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
        string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True),
        # commands whose name contains a space or that need a dedicated
        # parser are listed one by one
        {
            'CLIENT GETNAME': lambda r: r and nativestr(r),
            'CLIENT KILL': bool_ok,
            'CLIENT LIST': parse_client_list,
            'CLIENT SETNAME': bool_ok,
            'CONFIG GET': parse_config_get,
            'CONFIG RESETSTAT': bool_ok,
            'CONFIG SET': bool_ok,
            'DEBUG OBJECT': parse_debug_object,
            'HGETALL': lambda r: r and pairs_to_dict(r) or {},
            'HSCAN': parse_hscan,
            'INFO': parse_info,
            'LASTSAVE': timestamp_to_datetime,
            'OBJECT': parse_object,
            'PING': lambda r: nativestr(r) == 'PONG',
            'RANDOMKEY': lambda r: r and r or None,
            'SCAN': parse_scan,
            'SCRIPT EXISTS': lambda r: list(imap(bool, r)),
            'SCRIPT FLUSH': bool_ok,
            'SCRIPT KILL': bool_ok,
            'SCRIPT LOAD': nativestr,
            'SENTINEL GET-MASTER-ADDR-BY-NAME': parse_sentinel_get_master,
            'SENTINEL MASTER': parse_sentinel_master,
            'SENTINEL MASTERS': parse_sentinel_masters,
            'SENTINEL MONITOR': bool_ok,
            'SENTINEL REMOVE': bool_ok,
            'SENTINEL SENTINELS': parse_sentinel_slaves_and_sentinels,
            'SENTINEL SET': bool_ok,
            'SENTINEL SLAVES': parse_sentinel_slaves_and_sentinels,
            'SET': lambda r: r and nativestr(r) == 'OK',
            'SLOWLOG GET': parse_slowlog_get,
            'SLOWLOG LEN': int,
            'SLOWLOG RESET': bool_ok,
            'SSCAN': parse_scan,
            'TIME': lambda x: (int(x[0]), int(x[1])),
            'ZSCAN': parse_zscan,
            'CLUSTER ADDSLOTS': bool_ok,
            'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x),
            'CLUSTER COUNTKEYSINSLOT': lambda x: int(x),
            'CLUSTER DELSLOTS': bool_ok,
            'CLUSTER FAILOVER': bool_ok,
            'CLUSTER FORGET': bool_ok,
            'CLUSTER INFO': parse_cluster_info,
            'CLUSTER KEYSLOT': lambda x: int(x),
            'CLUSTER MEET': bool_ok,
            'CLUSTER NODES': parse_cluster_nodes,
            'CLUSTER REPLICATE': bool_ok,
            'CLUSTER RESET': bool_ok,
            'CLUSTER SAVECONFIG': bool_ok,
            'CLUSTER SET-CONFIG-EPOCH': bool_ok,
            'CLUSTER SETSLOT': bool_ok,
            'CLUSTER SLAVES': parse_cluster_nodes,
            'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]),
                                         float(ll[1]))
                                         if ll is not None else None, r)),
            'GEOHASH': lambda r: list(map(nativestr, r)),
            'GEORADIUS': parse_georadius_generic,
            'GEORADIUSBYMEMBER': parse_georadius_generic,
            'PUBSUB NUMSUB': parse_pubsub_numsub,
        }
    )
459
    @classmethod
    def from_url(cls, url, db=None, **kwargs):
        """
        Return a Redis client object configured from the given URL, which must
        use either `the ``redis://`` scheme
        <http://www.iana.org/assignments/uri-schemes/prov/redis>`_ for RESP
        connections or the ``unix://`` scheme for Unix domain sockets.

        For example::

            redis://[:password]@localhost:6379/0
            unix://[:password]@/path/to/socket.sock?db=0

        There are several ways to specify a database number. The parse function
        will return the first specified option:
            1. A ``db`` querystring option, e.g. redis://localhost?db=0
            2. If using the redis:// scheme, the path argument of the url, e.g.
               redis://localhost/0
            3. The ``db`` argument to this function.

        If none of these options are specified, db=0 is used.

        Any additional querystring arguments and keyword arguments will be
        passed along to the ConnectionPool class's initializer. In the case
        of conflicting arguments, querystring arguments always win.
        """
        # All URL handling is delegated to ConnectionPool.from_url; the
        # client is then constructed from nothing but the resulting pool.
        connection_pool = ConnectionPool.from_url(url, db=db, **kwargs)
        return cls(connection_pool=connection_pool)
488
489    def __init__(self, host='localhost', port=6379,
490                 db=0, password=None, socket_timeout=None,
491                 socket_connect_timeout=None,
492                 socket_keepalive=None, socket_keepalive_options=None,
493                 connection_pool=None, unix_socket_path=None,
494                 encoding='utf-8', encoding_errors='strict',
495                 charset=None, errors=None,
496                 decode_responses=False, retry_on_timeout=False,
497                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
498                 ssl_cert_reqs=None, ssl_ca_certs=None,
499                 max_connections=None):
500        if not connection_pool:
501            if charset is not None:
502                warnings.warn(DeprecationWarning(
503                    '"charset" is deprecated. Use "encoding" instead'))
504                encoding = charset
505            if errors is not None:
506                warnings.warn(DeprecationWarning(
507                    '"errors" is deprecated. Use "encoding_errors" instead'))
508                encoding_errors = errors
509
510            kwargs = {
511                'db': db,
512                'password': password,
513                'socket_timeout': socket_timeout,
514                'encoding': encoding,
515                'encoding_errors': encoding_errors,
516                'decode_responses': decode_responses,
517                'retry_on_timeout': retry_on_timeout,
518                'max_connections': max_connections
519            }
520            # based on input, setup appropriate connection args
521            if unix_socket_path is not None:
522                kwargs.update({
523                    'path': unix_socket_path,
524                    'connection_class': UnixDomainSocketConnection
525                })
526            else:
527                # TCP specific options
528                kwargs.update({
529                    'host': host,
530                    'port': port,
531                    'socket_connect_timeout': socket_connect_timeout,
532                    'socket_keepalive': socket_keepalive,
533                    'socket_keepalive_options': socket_keepalive_options,
534                })
535
536                if ssl:
537                    kwargs.update({
538                        'connection_class': SSLConnection,
539                        'ssl_keyfile': ssl_keyfile,
540                        'ssl_certfile': ssl_certfile,
541                        'ssl_cert_reqs': ssl_cert_reqs,
542                        'ssl_ca_certs': ssl_ca_certs,
543                    })
544            connection_pool = ConnectionPool(**kwargs)
545        self.connection_pool = connection_pool
546        self._use_lua_lock = None
547
548        self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
549
550    def __repr__(self):
551        return "%s<%s>" % (type(self).__name__, repr(self.connection_pool))
552
    def set_response_callback(self, command, callback):
        """
        Set a custom Response Callback.

        Stores ``callback`` under ``command`` in this instance's
        ``response_callbacks`` dict, replacing any existing entry.
        """
        self.response_callbacks[command] = callback
556
    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.

        The pipeline is created with this client's connection pool and
        response callbacks; ``shard_hint`` is handed over unchanged.
        """
        return StrictPipeline(
            self.connection_pool,
            self.response_callbacks,
            transaction,
            shard_hint)
570
571    def transaction(self, func, *watches, **kwargs):
572        """
573        Convenience method for executing the callable `func` as a transaction
574        while watching all keys specified in `watches`. The 'func' callable
575        should expect a single argument which is a Pipeline object.
576        """
577        shard_hint = kwargs.pop('shard_hint', None)
578        value_from_callable = kwargs.pop('value_from_callable', False)
579        watch_delay = kwargs.pop('watch_delay', None)
580        with self.pipeline(True, shard_hint) as pipe:
581            while 1:
582                try:
583                    if watches:
584                        pipe.watch(*watches)
585                    func_value = func(pipe)
586                    exec_value = pipe.execute()
587                    return func_value if value_from_callable else exec_value
588                except WatchError:
589                    if watch_delay is not None and watch_delay > 0:
590                        time.sleep(watch_delay)
591                    continue
592
593    def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,
594             lock_class=None, thread_local=True):
595        """
596        Return a new Lock object using key ``name`` that mimics
597        the behavior of threading.Lock.
598
599        If specified, ``timeout`` indicates a maximum life for the lock.
600        By default, it will remain locked until release() is called.
601
602        ``sleep`` indicates the amount of time to sleep per loop iteration
603        when the lock is in blocking mode and another client is currently
604        holding the lock.
605
606        ``blocking_timeout`` indicates the maximum amount of time in seconds to
607        spend trying to acquire the lock. A value of ``None`` indicates
608        continue trying forever. ``blocking_timeout`` can be specified as a
609        float or integer, both representing the number of seconds to wait.
610
611        ``lock_class`` forces the specified lock implementation.
612
613        ``thread_local`` indicates whether the lock token is placed in
614        thread-local storage. By default, the token is placed in thread local
615        storage so that a thread only sees its token, not a token set by
616        another thread. Consider the following timeline:
617
618            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
619                     thread-1 sets the token to "abc"
620            time: 1, thread-2 blocks trying to acquire `my-lock` using the
621                     Lock instance.
622            time: 5, thread-1 has not yet completed. redis expires the lock
623                     key.
624            time: 5, thread-2 acquired `my-lock` now that it's available.
625                     thread-2 sets the token to "xyz"
626            time: 6, thread-1 finishes its work and calls release(). if the
627                     token is *not* stored in thread local storage, then
628                     thread-1 would see the token value as "xyz" and would be
629                     able to successfully release the thread-2's lock.
630
631        In some use cases it's necessary to disable thread local storage. For
632        example, if you have code where one thread acquires a lock and passes
633        that lock instance to a worker thread to release later. If thread
634        local storage isn't disabled in this case, the worker thread won't see
635        the token set by the thread that acquired the lock. Our assumption
636        is that these cases aren't common and as such default to using
637        thread local storage.        """
638        if lock_class is None:
639            if self._use_lua_lock is None:
640                # the first time .lock() is called, determine if we can use
641                # Lua by attempting to register the necessary scripts
642                try:
643                    LuaLock.register_scripts(self)
644                    self._use_lua_lock = True
645                except ResponseError:
646                    self._use_lua_lock = False
647            lock_class = self._use_lua_lock and LuaLock or Lock
648        return lock_class(self, name, timeout=timeout, sleep=sleep,
649                          blocking_timeout=blocking_timeout,
650                          thread_local=thread_local)
651
    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.

        The object is created with this client's connection pool; all
        keyword arguments are forwarded to ``PubSub``.
        """
        return PubSub(self.connection_pool, **kwargs)
659
660    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """
        Execute a command and return a parsed response.

        ``args[0]`` is the command name; it is used to obtain a connection
        from the pool and to select the response callback. ``options`` are
        passed to ``pool.get_connection`` and to ``parse_response``.

        If sending or reading fails with ConnectionError or TimeoutError the
        connection is disconnected and the command is attempted once more,
        except that a TimeoutError is re-raised right away unless the
        connection has ``retry_on_timeout`` set. An error raised by the
        second attempt propagates to the caller.
        """
        pool = self.connection_pool
        command_name = args[0]
        connection = pool.get_connection(command_name, **options)
        try:
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # single retry on the just-disconnected connection
            # (presumably send_command reconnects as needed -- confirm)
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            # the connection goes back to the pool on every exit path
            pool.release(connection)
677
678    def parse_response(self, connection, command_name, **options):
679        "Parses a response from the Redis server"
680        response = connection.read_response()
681        if command_name in self.response_callbacks:
682            return self.response_callbacks[command_name](response, **options)
683        return response
684
685    # SERVER INFORMATION
    def bgrewriteaof(self):
        """
        Tell the Redis server to rewrite the AOF file from data in memory.

        Sends ``BGREWRITEAOF``. The default response callback for this
        command always yields True.
        """
        return self.execute_command('BGREWRITEAOF')
689
    def bgsave(self):
        """
        Tell the Redis server to save its data to disk.  Unlike save(),
        this method is asynchronous and returns immediately.

        Sends ``BGSAVE``. The default response callback for this command
        always yields True.
        """
        return self.execute_command('BGSAVE')
696
    def client_kill(self, address):
        """
        Disconnects the client at ``address`` (ip:port).

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('CLIENT KILL', address)
700
    def client_list(self):
        """
        Returns a list of currently connected clients.

        With the default response callback (``parse_client_list``) each
        client is a dict built from one line of the reply.
        """
        return self.execute_command('CLIENT LIST')
704
    def client_getname(self):
        """
        Returns the current connection name.

        With the default response callback a falsy reply is returned as is
        and any other reply is converted with ``nativestr``.
        """
        return self.execute_command('CLIENT GETNAME')
708
    def client_setname(self, name):
        """
        Sets the current connection name to ``name``.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('CLIENT SETNAME', name)
712
    def config_get(self, pattern="*"):
        """
        Return a dictionary of configuration based on the ``pattern``.

        ``pattern`` defaults to ``"*"``. The dict is built by the default
        response callback, ``parse_config_get``.
        """
        return self.execute_command('CONFIG GET', pattern)
716
    def config_set(self, name, value):
        """
        Set config item ``name`` with ``value``.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('CONFIG SET', name, value)
720
    def config_resetstat(self):
        """
        Reset runtime statistics.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('CONFIG RESETSTAT')
724
    def config_rewrite(self):
        """
        Rewrite config file with the minimal change to reflect running config.

        RESPONSE_CALLBACKS has no entry for ``CONFIG REWRITE``, so by default
        the server's reply is returned unchanged.
        """
        return self.execute_command('CONFIG REWRITE')
728
    def dbsize(self):
        """
        Returns the number of keys in the current database.

        RESPONSE_CALLBACKS has no entry for ``DBSIZE``, so by default the
        server's reply is returned unchanged.
        """
        return self.execute_command('DBSIZE')
732
    def debug_object(self, key):
        """
        Returns version specific meta information about a given key.

        With the default response callback (``parse_debug_object``) the
        result is a dict.
        """
        return self.execute_command('DEBUG OBJECT', key)
736
    def echo(self, value):
        """
        Echo the string back from the server.

        RESPONSE_CALLBACKS has no entry for ``ECHO``, so by default the
        server's reply is returned unchanged.
        """
        return self.execute_command('ECHO', value)
740
    def flushall(self):
        """
        Delete all keys in all databases on the current host.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('FLUSHALL')
744
    def flushdb(self):
        """
        Delete all keys in the current database.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('FLUSHDB')
748
749    def info(self, section=None):
750        """
751        Returns a dictionary containing information about the Redis server
752
753        The ``section`` option can be used to select a specific section
754        of information
755
756        The section option is not supported by older versions of Redis Server,
757        and will generate ResponseError
758        """
759        if section is None:
760            return self.execute_command('INFO')
761        else:
762            return self.execute_command('INFO', section)
763
    def lastsave(self):
        """
        Return a Python datetime object representing the last time the
        Redis database was saved to disk.

        The conversion is done by the default response callback,
        ``timestamp_to_datetime``, which yields None for a falsy or
        non-numeric reply.
        """
        return self.execute_command('LASTSAVE')
770
    def object(self, infotype, key):
        """
        Return the encoding, idletime, or refcount about the key.

        ``infotype`` is sent as the subcommand and is also passed as an
        option to the response callback. The default callback
        (``parse_object``) converts ``idletime`` and ``refcount`` replies
        with ``int_or_none`` and leaves other replies unchanged.
        """
        return self.execute_command('OBJECT', infotype, key, infotype=infotype)
774
    def ping(self):
        """
        Ping the Redis server.

        With the default response callback the result is True when the
        server replied 'PONG'.
        """
        return self.execute_command('PING')
778
    def save(self):
        """
        Tell the Redis server to save its data to disk,
        blocking until the save is complete.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('SAVE')
785
    def sentinel(self, *args):
        """
        Redis Sentinel's SENTINEL command (deprecated).

        This method sends nothing to the server: it ignores ``args``, emits
        a DeprecationWarning pointing at the individual ``sentinel_*``
        methods and returns None.
        """
        warnings.warn(
            DeprecationWarning('Use the individual sentinel_* methods'))
790
    def sentinel_get_master_addr_by_name(self, service_name):
        """
        Returns a (host, port) pair for the given ``service_name``.

        With the default response callback the port is converted to int and
        a falsy reply gives None.
        """
        return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME',
                                    service_name)
795
    def sentinel_master(self, service_name):
        """
        Returns a dictionary containing the specified master's state.

        The dict is built by the default response callback,
        ``parse_sentinel_master``.
        """
        return self.execute_command('SENTINEL MASTER', service_name)
799
    def sentinel_masters(self):
        """
        Returns the state of every master.

        With the default response callback (``parse_sentinel_masters``) the
        result is a dictionary that maps each master's ``name`` to the
        dictionary holding its state.
        """
        return self.execute_command('SENTINEL MASTERS')
803
    def sentinel_monitor(self, name, ip, port, quorum):
        """
        Add a new master to Sentinel to be monitored.

        ``name``, ``ip``, ``port`` and ``quorum`` are sent in that order.
        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum)
807
    def sentinel_remove(self, name):
        """
        Remove a master from Sentinel's monitoring.

        With the default response callback the result is True when the
        server replied 'OK'.
        """
        return self.execute_command('SENTINEL REMOVE', name)
811
    def sentinel_sentinels(self, service_name):
        """
        Returns a list of sentinels for ``service_name``.

        With the default response callback each sentinel is a state dict
        produced by ``parse_sentinel_state``.
        """
        return self.execute_command('SENTINEL SENTINELS', service_name)
815
    def sentinel_set(self, name, option, value):
        """
        Set Sentinel monitoring parameters for a given master.

        ``name``, ``option`` and ``value`` are sent in that order. With the
        default response callback the result is True when the server
        replied 'OK'.
        """
        return self.execute_command('SENTINEL SET', name, option, value)
819
820    def sentinel_slaves(self, service_name):
821        "Returns a list of slaves for ``service_name``"
822        return self.execute_command('SENTINEL SLAVES', service_name)
823
824    def shutdown(self):
825        "Shutdown the server"
826        try:
827            self.execute_command('SHUTDOWN')
828        except ConnectionError:
829            # a ConnectionError here is expected
830            return
831        raise RedisError("SHUTDOWN seems to have failed.")
832
833    def slaveof(self, host=None, port=None):
834        """
835        Set the server to be a replicated slave of the instance identified
836        by the ``host`` and ``port``. If called without arguments, the
837        instance is promoted to a master instead.
838        """
839        if host is None and port is None:
840            return self.execute_command('SLAVEOF', Token.get_token('NO'),
841                                        Token.get_token('ONE'))
842        return self.execute_command('SLAVEOF', host, port)
843
844    def slowlog_get(self, num=None):
845        """
846        Get the entries from the slowlog. If ``num`` is specified, get the
847        most recent ``num`` items.
848        """
849        args = ['SLOWLOG GET']
850        if num is not None:
851            args.append(num)
852        return self.execute_command(*args)
853
854    def slowlog_len(self):
855        "Get the number of items in the slowlog"
856        return self.execute_command('SLOWLOG LEN')
857
858    def slowlog_reset(self):
859        "Remove all items in the slowlog"
860        return self.execute_command('SLOWLOG RESET')
861
862    def time(self):
863        """
864        Returns the server time as a 2-item tuple of ints:
865        (seconds since epoch, microseconds into this second).
866        """
867        return self.execute_command('TIME')
868
869    def wait(self, num_replicas, timeout):
870        """
871        Redis synchronous replication
872        That returns the number of replicas that processed the query when
873        we finally have at least ``num_replicas``, or when the ``timeout`` was
874        reached.
875        """
876        return self.execute_command('WAIT', num_replicas, timeout)
877
878    # BASIC KEY COMMANDS
879    def append(self, key, value):
880        """
881        Appends the string ``value`` to the value at ``key``. If ``key``
882        doesn't already exist, create it with a value of ``value``.
883        Returns the new length of the value at ``key``.
884        """
885        return self.execute_command('APPEND', key, value)
886
887    def bitcount(self, key, start=None, end=None):
888        """
889        Returns the count of set bits in the value of ``key``.  Optional
890        ``start`` and ``end`` paramaters indicate which bytes to consider
891        """
892        params = [key]
893        if start is not None and end is not None:
894            params.append(start)
895            params.append(end)
896        elif (start is not None and end is None) or \
897                (end is not None and start is None):
898            raise RedisError("Both start and end must be specified")
899        return self.execute_command('BITCOUNT', *params)
900
901    def bitop(self, operation, dest, *keys):
902        """
903        Perform a bitwise operation using ``operation`` between ``keys`` and
904        store the result in ``dest``.
905        """
906        return self.execute_command('BITOP', operation, dest, *keys)
907
908    def bitpos(self, key, bit, start=None, end=None):
909        """
910        Return the position of the first bit set to 1 or 0 in a string.
911        ``start`` and ``end`` difines search range. The range is interpreted
912        as a range of bytes and not a range of bits, so start=0 and end=2
913        means to look at the first three bytes.
914        """
915        if bit not in (0, 1):
916            raise RedisError('bit must be 0 or 1')
917        params = [key, bit]
918
919        start is not None and params.append(start)
920
921        if start is not None and end is not None:
922            params.append(end)
923        elif start is None and end is not None:
924            raise RedisError("start argument is not set, "
925                             "when end is specified")
926        return self.execute_command('BITPOS', *params)
927
928    def decr(self, name, amount=1):
929        """
930        Decrements the value of ``key`` by ``amount``.  If no key exists,
931        the value will be initialized as 0 - ``amount``
932        """
933        return self.execute_command('DECRBY', name, amount)
934
935    def delete(self, *names):
936        "Delete one or more keys specified by ``names``"
937        return self.execute_command('DEL', *names)
938
939    def __delitem__(self, name):
940        self.delete(name)
941
942    def dump(self, name):
943        """
944        Return a serialized version of the value stored at the specified key.
945        If key does not exist a nil bulk reply is returned.
946        """
947        return self.execute_command('DUMP', name)
948
949    def exists(self, name):
950        "Returns a boolean indicating whether key ``name`` exists"
951        return self.execute_command('EXISTS', name)
952    __contains__ = exists
953
954    def expire(self, name, time):
955        """
956        Set an expire flag on key ``name`` for ``time`` seconds. ``time``
957        can be represented by an integer or a Python timedelta object.
958        """
959        if isinstance(time, datetime.timedelta):
960            time = time.seconds + time.days * 24 * 3600
961        return self.execute_command('EXPIRE', name, time)
962
963    def expireat(self, name, when):
964        """
965        Set an expire flag on key ``name``. ``when`` can be represented
966        as an integer indicating unix time or a Python datetime object.
967        """
968        if isinstance(when, datetime.datetime):
969            when = int(mod_time.mktime(when.timetuple()))
970        return self.execute_command('EXPIREAT', name, when)
971
972    def get(self, name):
973        """
974        Return the value at key ``name``, or None if the key doesn't exist
975        """
976        return self.execute_command('GET', name)
977
978    def __getitem__(self, name):
979        """
980        Return the value at key ``name``, raises a KeyError if the key
981        doesn't exist.
982        """
983        value = self.get(name)
984        if value is not None:
985            return value
986        raise KeyError(name)
987
988    def getbit(self, name, offset):
989        "Returns a boolean indicating the value of ``offset`` in ``name``"
990        return self.execute_command('GETBIT', name, offset)
991
992    def getrange(self, key, start, end):
993        """
994        Returns the substring of the string value stored at ``key``,
995        determined by the offsets ``start`` and ``end`` (both are inclusive)
996        """
997        return self.execute_command('GETRANGE', key, start, end)
998
999    def getset(self, name, value):
1000        """
1001        Sets the value at key ``name`` to ``value``
1002        and returns the old value at key ``name`` atomically.
1003        """
1004        return self.execute_command('GETSET', name, value)
1005
1006    def incr(self, name, amount=1):
1007        """
1008        Increments the value of ``key`` by ``amount``.  If no key exists,
1009        the value will be initialized as ``amount``
1010        """
1011        return self.execute_command('INCRBY', name, amount)
1012
1013    def incrby(self, name, amount=1):
1014        """
1015        Increments the value of ``key`` by ``amount``.  If no key exists,
1016        the value will be initialized as ``amount``
1017        """
1018
1019        # An alias for ``incr()``, because it is already implemented
1020        # as INCRBY redis command.
1021        return self.incr(name, amount)
1022
1023    def incrbyfloat(self, name, amount=1.0):
1024        """
1025        Increments the value at key ``name`` by floating ``amount``.
1026        If no key exists, the value will be initialized as ``amount``
1027        """
1028        return self.execute_command('INCRBYFLOAT', name, amount)
1029
1030    def keys(self, pattern='*'):
1031        "Returns a list of keys matching ``pattern``"
1032        return self.execute_command('KEYS', pattern)
1033
1034    def mget(self, keys, *args):
1035        """
1036        Returns a list of values ordered identically to ``keys``
1037        """
1038        args = list_or_args(keys, args)
1039        return self.execute_command('MGET', *args)
1040
1041    def mset(self, *args, **kwargs):
1042        """
1043        Sets key/values based on a mapping. Mapping can be supplied as a single
1044        dictionary argument or as kwargs.
1045        """
1046        if args:
1047            if len(args) != 1 or not isinstance(args[0], dict):
1048                raise RedisError('MSET requires **kwargs or a single dict arg')
1049            kwargs.update(args[0])
1050        items = []
1051        for pair in iteritems(kwargs):
1052            items.extend(pair)
1053        return self.execute_command('MSET', *items)
1054
1055    def msetnx(self, *args, **kwargs):
1056        """
1057        Sets key/values based on a mapping if none of the keys are already set.
1058        Mapping can be supplied as a single dictionary argument or as kwargs.
1059        Returns a boolean indicating if the operation was successful.
1060        """
1061        if args:
1062            if len(args) != 1 or not isinstance(args[0], dict):
1063                raise RedisError('MSETNX requires **kwargs or a single '
1064                                 'dict arg')
1065            kwargs.update(args[0])
1066        items = []
1067        for pair in iteritems(kwargs):
1068            items.extend(pair)
1069        return self.execute_command('MSETNX', *items)
1070
1071    def move(self, name, db):
1072        "Moves the key ``name`` to a different Redis database ``db``"
1073        return self.execute_command('MOVE', name, db)
1074
1075    def persist(self, name):
1076        "Removes an expiration on ``name``"
1077        return self.execute_command('PERSIST', name)
1078
1079    def pexpire(self, name, time):
1080        """
1081        Set an expire flag on key ``name`` for ``time`` milliseconds.
1082        ``time`` can be represented by an integer or a Python timedelta
1083        object.
1084        """
1085        if isinstance(time, datetime.timedelta):
1086            ms = int(time.microseconds / 1000)
1087            time = (time.seconds + time.days * 24 * 3600) * 1000 + ms
1088        return self.execute_command('PEXPIRE', name, time)
1089
1090    def pexpireat(self, name, when):
1091        """
1092        Set an expire flag on key ``name``. ``when`` can be represented
1093        as an integer representing unix time in milliseconds (unix time * 1000)
1094        or a Python datetime object.
1095        """
1096        if isinstance(when, datetime.datetime):
1097            ms = int(when.microsecond / 1000)
1098            when = int(mod_time.mktime(when.timetuple())) * 1000 + ms
1099        return self.execute_command('PEXPIREAT', name, when)
1100
1101    def psetex(self, name, time_ms, value):
1102        """
1103        Set the value of key ``name`` to ``value`` that expires in ``time_ms``
1104        milliseconds. ``time_ms`` can be represented by an integer or a Python
1105        timedelta object
1106        """
1107        if isinstance(time_ms, datetime.timedelta):
1108            ms = int(time_ms.microseconds / 1000)
1109            time_ms = (time_ms.seconds + time_ms.days * 24 * 3600) * 1000 + ms
1110        return self.execute_command('PSETEX', name, time_ms, value)
1111
1112    def pttl(self, name):
1113        "Returns the number of milliseconds until the key ``name`` will expire"
1114        return self.execute_command('PTTL', name)
1115
1116    def randomkey(self):
1117        "Returns the name of a random key"
1118        return self.execute_command('RANDOMKEY')
1119
1120    def rename(self, src, dst):
1121        """
1122        Rename key ``src`` to ``dst``
1123        """
1124        return self.execute_command('RENAME', src, dst)
1125
1126    def renamenx(self, src, dst):
1127        "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
1128        return self.execute_command('RENAMENX', src, dst)
1129
1130    def restore(self, name, ttl, value, replace=False):
1131        """
1132        Create a key using the provided serialized value, previously obtained
1133        using DUMP.
1134        """
1135        params = [name, ttl, value]
1136        if replace:
1137            params.append('REPLACE')
1138        return self.execute_command('RESTORE', *params)
1139
1140    def set(self, name, value, ex=None, px=None, nx=False, xx=False):
1141        """
1142        Set the value at key ``name`` to ``value``
1143
1144        ``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.
1145
1146        ``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.
1147
1148        ``nx`` if set to True, set the value at key ``name`` to ``value`` only
1149            if it does not exist.
1150
1151        ``xx`` if set to True, set the value at key ``name`` to ``value`` only
1152            if it already exists.
1153        """
1154        pieces = [name, value]
1155        if ex is not None:
1156            pieces.append('EX')
1157            if isinstance(ex, datetime.timedelta):
1158                ex = ex.seconds + ex.days * 24 * 3600
1159            pieces.append(ex)
1160        if px is not None:
1161            pieces.append('PX')
1162            if isinstance(px, datetime.timedelta):
1163                ms = int(px.microseconds / 1000)
1164                px = (px.seconds + px.days * 24 * 3600) * 1000 + ms
1165            pieces.append(px)
1166
1167        if nx:
1168            pieces.append('NX')
1169        if xx:
1170            pieces.append('XX')
1171        return self.execute_command('SET', *pieces)
1172
1173    def __setitem__(self, name, value):
1174        self.set(name, value)
1175
1176    def setbit(self, name, offset, value):
1177        """
1178        Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
1179        indicating the previous value of ``offset``.
1180        """
1181        value = value and 1 or 0
1182        return self.execute_command('SETBIT', name, offset, value)
1183
1184    def setex(self, name, time, value):
1185        """
1186        Set the value of key ``name`` to ``value`` that expires in ``time``
1187        seconds. ``time`` can be represented by an integer or a Python
1188        timedelta object.
1189        """
1190        if isinstance(time, datetime.timedelta):
1191            time = time.seconds + time.days * 24 * 3600
1192        return self.execute_command('SETEX', name, time, value)
1193
1194    def setnx(self, name, value):
1195        "Set the value of key ``name`` to ``value`` if key doesn't exist"
1196        return self.execute_command('SETNX', name, value)
1197
1198    def setrange(self, name, offset, value):
1199        """
1200        Overwrite bytes in the value of ``name`` starting at ``offset`` with
1201        ``value``. If ``offset`` plus the length of ``value`` exceeds the
1202        length of the original value, the new value will be larger than before.
1203        If ``offset`` exceeds the length of the original value, null bytes
1204        will be used to pad between the end of the previous value and the start
1205        of what's being injected.
1206
1207        Returns the length of the new string.
1208        """
1209        return self.execute_command('SETRANGE', name, offset, value)
1210
1211    def strlen(self, name):
1212        "Return the number of bytes stored in the value of ``name``"
1213        return self.execute_command('STRLEN', name)
1214
1215    def substr(self, name, start, end=-1):
1216        """
1217        Return a substring of the string at key ``name``. ``start`` and ``end``
1218        are 0-based integers specifying the portion of the string to return.
1219        """
1220        return self.execute_command('SUBSTR', name, start, end)
1221
1222    def touch(self, *args):
1223        """
1224        Alters the last access time of a key(s) ``*args``. A key is ignored
1225        if it does not exist.
1226        """
1227        return self.execute_command('TOUCH', *args)
1228
1229    def ttl(self, name):
1230        "Returns the number of seconds until the key ``name`` will expire"
1231        return self.execute_command('TTL', name)
1232
1233    def type(self, name):
1234        "Returns the type of key ``name``"
1235        return self.execute_command('TYPE', name)
1236
1237    def watch(self, *names):
1238        """
1239        Watches the values at keys ``names``, or None if the key doesn't exist
1240        """
1241        warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
1242
1243    def unwatch(self):
1244        """
1245        Unwatches the value at key ``name``, or None of the key doesn't exist
1246        """
1247        warnings.warn(
1248            DeprecationWarning('Call UNWATCH from a Pipeline object'))
1249
1250    # LIST COMMANDS
1251    def blpop(self, keys, timeout=0):
1252        """
1253        LPOP a value off of the first non-empty list
1254        named in the ``keys`` list.
1255
1256        If none of the lists in ``keys`` has a value to LPOP, then block
1257        for ``timeout`` seconds, or until a value gets pushed on to one
1258        of the lists.
1259
1260        If timeout is 0, then block indefinitely.
1261        """
1262        if timeout is None:
1263            timeout = 0
1264        if isinstance(keys, basestring):
1265            keys = [keys]
1266        else:
1267            keys = list(keys)
1268        keys.append(timeout)
1269        return self.execute_command('BLPOP', *keys)
1270
1271    def brpop(self, keys, timeout=0):
1272        """
1273        RPOP a value off of the first non-empty list
1274        named in the ``keys`` list.
1275
1276        If none of the lists in ``keys`` has a value to RPOP, then block
1277        for ``timeout`` seconds, or until a value gets pushed on to one
1278        of the lists.
1279
1280        If timeout is 0, then block indefinitely.
1281        """
1282        if timeout is None:
1283            timeout = 0
1284        if isinstance(keys, basestring):
1285            keys = [keys]
1286        else:
1287            keys = list(keys)
1288        keys.append(timeout)
1289        return self.execute_command('BRPOP', *keys)
1290
1291    def brpoplpush(self, src, dst, timeout=0):
1292        """
1293        Pop a value off the tail of ``src``, push it on the head of ``dst``
1294        and then return it.
1295
1296        This command blocks until a value is in ``src`` or until ``timeout``
1297        seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
1298        forever.
1299        """
1300        if timeout is None:
1301            timeout = 0
1302        return self.execute_command('BRPOPLPUSH', src, dst, timeout)
1303
1304    def lindex(self, name, index):
1305        """
1306        Return the item from list ``name`` at position ``index``
1307
1308        Negative indexes are supported and will return an item at the
1309        end of the list
1310        """
1311        return self.execute_command('LINDEX', name, index)
1312
1313    def linsert(self, name, where, refvalue, value):
1314        """
1315        Insert ``value`` in list ``name`` either immediately before or after
1316        [``where``] ``refvalue``
1317
1318        Returns the new length of the list on success or -1 if ``refvalue``
1319        is not in the list.
1320        """
1321        return self.execute_command('LINSERT', name, where, refvalue, value)
1322
1323    def llen(self, name):
1324        "Return the length of the list ``name``"
1325        return self.execute_command('LLEN', name)
1326
1327    def lpop(self, name):
1328        "Remove and return the first item of the list ``name``"
1329        return self.execute_command('LPOP', name)
1330
1331    def lpush(self, name, *values):
1332        "Push ``values`` onto the head of the list ``name``"
1333        return self.execute_command('LPUSH', name, *values)
1334
1335    def lpushx(self, name, value):
1336        "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
1337        return self.execute_command('LPUSHX', name, value)
1338
1339    def lrange(self, name, start, end):
1340        """
1341        Return a slice of the list ``name`` between
1342        position ``start`` and ``end``
1343
1344        ``start`` and ``end`` can be negative numbers just like
1345        Python slicing notation
1346        """
1347        return self.execute_command('LRANGE', name, start, end)
1348
1349    def lrem(self, name, count, value):
1350        """
1351        Remove the first ``count`` occurrences of elements equal to ``value``
1352        from the list stored at ``name``.
1353
1354        The count argument influences the operation in the following ways:
1355            count > 0: Remove elements equal to value moving from head to tail.
1356            count < 0: Remove elements equal to value moving from tail to head.
1357            count = 0: Remove all elements equal to value.
1358        """
1359        return self.execute_command('LREM', name, count, value)
1360
1361    def lset(self, name, index, value):
1362        "Set ``position`` of list ``name`` to ``value``"
1363        return self.execute_command('LSET', name, index, value)
1364
1365    def ltrim(self, name, start, end):
1366        """
1367        Trim the list ``name``, removing all values not within the slice
1368        between ``start`` and ``end``
1369
1370        ``start`` and ``end`` can be negative numbers just like
1371        Python slicing notation
1372        """
1373        return self.execute_command('LTRIM', name, start, end)
1374
1375    def rpop(self, name):
1376        "Remove and return the last item of the list ``name``"
1377        return self.execute_command('RPOP', name)
1378
1379    def rpoplpush(self, src, dst):
1380        """
1381        RPOP a value off of the ``src`` list and atomically LPUSH it
1382        on to the ``dst`` list.  Returns the value.
1383        """
1384        return self.execute_command('RPOPLPUSH', src, dst)
1385
1386    def rpush(self, name, *values):
1387        "Push ``values`` onto the tail of the list ``name``"
1388        return self.execute_command('RPUSH', name, *values)
1389
1390    def rpushx(self, name, value):
1391        "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
1392        return self.execute_command('RPUSHX', name, value)
1393
1394    def sort(self, name, start=None, num=None, by=None, get=None,
1395             desc=False, alpha=False, store=None, groups=False):
1396        """
1397        Sort and return the list, set or sorted set at ``name``.
1398
1399        ``start`` and ``num`` allow for paging through the sorted data
1400
1401        ``by`` allows using an external key to weight and sort the items.
1402            Use an "*" to indicate where in the key the item value is located
1403
1404        ``get`` allows for returning items from external keys rather than the
1405            sorted data itself.  Use an "*" to indicate where int he key
1406            the item value is located
1407
1408        ``desc`` allows for reversing the sort
1409
1410        ``alpha`` allows for sorting lexicographically rather than numerically
1411
1412        ``store`` allows for storing the result of the sort into
1413            the key ``store``
1414
1415        ``groups`` if set to True and if ``get`` contains at least two
1416            elements, sort will return a list of tuples, each containing the
1417            values fetched from the arguments to ``get``.
1418
1419        """
1420        if (start is not None and num is None) or \
1421                (num is not None and start is None):
1422            raise RedisError("``start`` and ``num`` must both be specified")
1423
1424        pieces = [name]
1425        if by is not None:
1426            pieces.append(Token.get_token('BY'))
1427            pieces.append(by)
1428        if start is not None and num is not None:
1429            pieces.append(Token.get_token('LIMIT'))
1430            pieces.append(start)
1431            pieces.append(num)
1432        if get is not None:
1433            # If get is a string assume we want to get a single value.
1434            # Otherwise assume it's an interable and we want to get multiple
1435            # values. We can't just iterate blindly because strings are
1436            # iterable.
1437            if isinstance(get, basestring):
1438                pieces.append(Token.get_token('GET'))
1439                pieces.append(get)
1440            else:
1441                for g in get:
1442                    pieces.append(Token.get_token('GET'))
1443                    pieces.append(g)
1444        if desc:
1445            pieces.append(Token.get_token('DESC'))
1446        if alpha:
1447            pieces.append(Token.get_token('ALPHA'))
1448        if store is not None:
1449            pieces.append(Token.get_token('STORE'))
1450            pieces.append(store)
1451
1452        if groups:
1453            if not get or isinstance(get, basestring) or len(get) < 2:
1454                raise DataError('when using "groups" the "get" argument '
1455                                'must be specified and contain at least '
1456                                'two keys')
1457
1458        options = {'groups': len(get) if groups else None}
1459        return self.execute_command('SORT', *pieces, **options)
1460
1461    # SCAN COMMANDS
1462    def scan(self, cursor=0, match=None, count=None):
1463        """
1464        Incrementally return lists of key names. Also return a cursor
1465        indicating the scan position.
1466
1467        ``match`` allows for filtering the keys by pattern
1468
1469        ``count`` allows for hint the minimum number of returns
1470        """
1471        pieces = [cursor]
1472        if match is not None:
1473            pieces.extend([Token.get_token('MATCH'), match])
1474        if count is not None:
1475            pieces.extend([Token.get_token('COUNT'), count])
1476        return self.execute_command('SCAN', *pieces)
1477
1478    def scan_iter(self, match=None, count=None):
1479        """
1480        Make an iterator using the SCAN command so that the client doesn't
1481        need to remember the cursor position.
1482
1483        ``match`` allows for filtering the keys by pattern
1484
1485        ``count`` allows for hint the minimum number of returns
1486        """
1487        cursor = '0'
1488        while cursor != 0:
1489            cursor, data = self.scan(cursor=cursor, match=match, count=count)
1490            for item in data:
1491                yield item
1492
1493    def sscan(self, name, cursor=0, match=None, count=None):
1494        """
1495        Incrementally return lists of elements in a set. Also return a cursor
1496        indicating the scan position.
1497
1498        ``match`` allows for filtering the keys by pattern
1499
1500        ``count`` allows for hint the minimum number of returns
1501        """
1502        pieces = [name, cursor]
1503        if match is not None:
1504            pieces.extend([Token.get_token('MATCH'), match])
1505        if count is not None:
1506            pieces.extend([Token.get_token('COUNT'), count])
1507        return self.execute_command('SSCAN', *pieces)
1508
1509    def sscan_iter(self, name, match=None, count=None):
1510        """
1511        Make an iterator using the SSCAN command so that the client doesn't
1512        need to remember the cursor position.
1513
1514        ``match`` allows for filtering the keys by pattern
1515
1516        ``count`` allows for hint the minimum number of returns
1517        """
1518        cursor = '0'
1519        while cursor != 0:
1520            cursor, data = self.sscan(name, cursor=cursor,
1521                                      match=match, count=count)
1522            for item in data:
1523                yield item
1524
1525    def hscan(self, name, cursor=0, match=None, count=None):
1526        """
1527        Incrementally return key/value slices in a hash. Also return a cursor
1528        indicating the scan position.
1529
1530        ``match`` allows for filtering the keys by pattern
1531
1532        ``count`` allows for hint the minimum number of returns
1533        """
1534        pieces = [name, cursor]
1535        if match is not None:
1536            pieces.extend([Token.get_token('MATCH'), match])
1537        if count is not None:
1538            pieces.extend([Token.get_token('COUNT'), count])
1539        return self.execute_command('HSCAN', *pieces)
1540
1541    def hscan_iter(self, name, match=None, count=None):
1542        """
1543        Make an iterator using the HSCAN command so that the client doesn't
1544        need to remember the cursor position.
1545
1546        ``match`` allows for filtering the keys by pattern
1547
1548        ``count`` allows for hint the minimum number of returns
1549        """
1550        cursor = '0'
1551        while cursor != 0:
1552            cursor, data = self.hscan(name, cursor=cursor,
1553                                      match=match, count=count)
1554            for item in data.items():
1555                yield item
1556
1557    def zscan(self, name, cursor=0, match=None, count=None,
1558              score_cast_func=float):
1559        """
1560        Incrementally return lists of elements in a sorted set. Also return a
1561        cursor indicating the scan position.
1562
1563        ``match`` allows for filtering the keys by pattern
1564
1565        ``count`` allows for hint the minimum number of returns
1566
1567        ``score_cast_func`` a callable used to cast the score return value
1568        """
1569        pieces = [name, cursor]
1570        if match is not None:
1571            pieces.extend([Token.get_token('MATCH'), match])
1572        if count is not None:
1573            pieces.extend([Token.get_token('COUNT'), count])
1574        options = {'score_cast_func': score_cast_func}
1575        return self.execute_command('ZSCAN', *pieces, **options)
1576
1577    def zscan_iter(self, name, match=None, count=None,
1578                   score_cast_func=float):
1579        """
1580        Make an iterator using the ZSCAN command so that the client doesn't
1581        need to remember the cursor position.
1582
1583        ``match`` allows for filtering the keys by pattern
1584
1585        ``count`` allows for hint the minimum number of returns
1586
1587        ``score_cast_func`` a callable used to cast the score return value
1588        """
1589        cursor = '0'
1590        while cursor != 0:
1591            cursor, data = self.zscan(name, cursor=cursor, match=match,
1592                                      count=count,
1593                                      score_cast_func=score_cast_func)
1594            for item in data:
1595                yield item
1596
1597    # SET COMMANDS
1598    def sadd(self, name, *values):
1599        "Add ``value(s)`` to set ``name``"
1600        return self.execute_command('SADD', name, *values)
1601
1602    def scard(self, name):
1603        "Return the number of elements in set ``name``"
1604        return self.execute_command('SCARD', name)
1605
1606    def sdiff(self, keys, *args):
1607        "Return the difference of sets specified by ``keys``"
1608        args = list_or_args(keys, args)
1609        return self.execute_command('SDIFF', *args)
1610
1611    def sdiffstore(self, dest, keys, *args):
1612        """
1613        Store the difference of sets specified by ``keys`` into a new
1614        set named ``dest``.  Returns the number of keys in the new set.
1615        """
1616        args = list_or_args(keys, args)
1617        return self.execute_command('SDIFFSTORE', dest, *args)
1618
1619    def sinter(self, keys, *args):
1620        "Return the intersection of sets specified by ``keys``"
1621        args = list_or_args(keys, args)
1622        return self.execute_command('SINTER', *args)
1623
1624    def sinterstore(self, dest, keys, *args):
1625        """
1626        Store the intersection of sets specified by ``keys`` into a new
1627        set named ``dest``.  Returns the number of keys in the new set.
1628        """
1629        args = list_or_args(keys, args)
1630        return self.execute_command('SINTERSTORE', dest, *args)
1631
1632    def sismember(self, name, value):
1633        "Return a boolean indicating if ``value`` is a member of set ``name``"
1634        return self.execute_command('SISMEMBER', name, value)
1635
1636    def smembers(self, name):
1637        "Return all members of the set ``name``"
1638        return self.execute_command('SMEMBERS', name)
1639
1640    def smove(self, src, dst, value):
1641        "Move ``value`` from set ``src`` to set ``dst`` atomically"
1642        return self.execute_command('SMOVE', src, dst, value)
1643
1644    def spop(self, name):
1645        "Remove and return a random member of set ``name``"
1646        return self.execute_command('SPOP', name)
1647
1648    def srandmember(self, name, number=None):
1649        """
1650        If ``number`` is None, returns a random member of set ``name``.
1651
1652        If ``number`` is supplied, returns a list of ``number`` random
1653        memebers of set ``name``. Note this is only available when running
1654        Redis 2.6+.
1655        """
1656        args = (number is not None) and [number] or []
1657        return self.execute_command('SRANDMEMBER', name, *args)
1658
1659    def srem(self, name, *values):
1660        "Remove ``values`` from set ``name``"
1661        return self.execute_command('SREM', name, *values)
1662
1663    def sunion(self, keys, *args):
1664        "Return the union of sets specified by ``keys``"
1665        args = list_or_args(keys, args)
1666        return self.execute_command('SUNION', *args)
1667
1668    def sunionstore(self, dest, keys, *args):
1669        """
1670        Store the union of sets specified by ``keys`` into a new
1671        set named ``dest``.  Returns the number of keys in the new set.
1672        """
1673        args = list_or_args(keys, args)
1674        return self.execute_command('SUNIONSTORE', dest, *args)
1675
1676    # SORTED SET COMMANDS
1677    def zadd(self, name, *args, **kwargs):
1678        """
1679        Set any number of score, element-name pairs to the key ``name``. Pairs
1680        can be specified in two ways:
1681
1682        As *args, in the form of: score1, name1, score2, name2, ...
1683        or as **kwargs, in the form of: name1=score1, name2=score2, ...
1684
1685        The following example would add four values to the 'my-key' key:
1686        redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
1687        """
1688        pieces = []
1689        if args:
1690            if len(args) % 2 != 0:
1691                raise RedisError("ZADD requires an equal number of "
1692                                 "values and scores")
1693            pieces.extend(args)
1694        for pair in iteritems(kwargs):
1695            pieces.append(pair[1])
1696            pieces.append(pair[0])
1697        return self.execute_command('ZADD', name, *pieces)
1698
1699    def zcard(self, name):
1700        "Return the number of elements in the sorted set ``name``"
1701        return self.execute_command('ZCARD', name)
1702
1703    def zcount(self, name, min, max):
1704        """
1705        Returns the number of elements in the sorted set at key ``name`` with
1706        a score between ``min`` and ``max``.
1707        """
1708        return self.execute_command('ZCOUNT', name, min, max)
1709
1710    def zincrby(self, name, value, amount=1):
1711        "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
1712        return self.execute_command('ZINCRBY', name, amount, value)
1713
1714    def zinterstore(self, dest, keys, aggregate=None):
1715        """
1716        Intersect multiple sorted sets specified by ``keys`` into
1717        a new sorted set, ``dest``. Scores in the destination will be
1718        aggregated based on the ``aggregate``, or SUM if none is provided.
1719        """
1720        return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
1721
1722    def zlexcount(self, name, min, max):
1723        """
1724        Return the number of items in the sorted set ``name`` between the
1725        lexicographical range ``min`` and ``max``.
1726        """
1727        return self.execute_command('ZLEXCOUNT', name, min, max)
1728
1729    def zrange(self, name, start, end, desc=False, withscores=False,
1730               score_cast_func=float):
1731        """
1732        Return a range of values from sorted set ``name`` between
1733        ``start`` and ``end`` sorted in ascending order.
1734
1735        ``start`` and ``end`` can be negative, indicating the end of the range.
1736
1737        ``desc`` a boolean indicating whether to sort the results descendingly
1738
1739        ``withscores`` indicates to return the scores along with the values.
1740        The return type is a list of (value, score) pairs
1741
1742        ``score_cast_func`` a callable used to cast the score return value
1743        """
1744        if desc:
1745            return self.zrevrange(name, start, end, withscores,
1746                                  score_cast_func)
1747        pieces = ['ZRANGE', name, start, end]
1748        if withscores:
1749            pieces.append(Token.get_token('WITHSCORES'))
1750        options = {
1751            'withscores': withscores,
1752            'score_cast_func': score_cast_func
1753        }
1754        return self.execute_command(*pieces, **options)
1755
1756    def zrangebylex(self, name, min, max, start=None, num=None):
1757        """
1758        Return the lexicographical range of values from sorted set ``name``
1759        between ``min`` and ``max``.
1760
1761        If ``start`` and ``num`` are specified, then return a slice of the
1762        range.
1763        """
1764        if (start is not None and num is None) or \
1765                (num is not None and start is None):
1766            raise RedisError("``start`` and ``num`` must both be specified")
1767        pieces = ['ZRANGEBYLEX', name, min, max]
1768        if start is not None and num is not None:
1769            pieces.extend([Token.get_token('LIMIT'), start, num])
1770        return self.execute_command(*pieces)
1771
1772    def zrevrangebylex(self, name, max, min, start=None, num=None):
1773        """
1774        Return the reversed lexicographical range of values from sorted set
1775        ``name`` between ``max`` and ``min``.
1776
1777        If ``start`` and ``num`` are specified, then return a slice of the
1778        range.
1779        """
1780        if (start is not None and num is None) or \
1781                (num is not None and start is None):
1782            raise RedisError("``start`` and ``num`` must both be specified")
1783        pieces = ['ZREVRANGEBYLEX', name, max, min]
1784        if start is not None and num is not None:
1785            pieces.extend([Token.get_token('LIMIT'), start, num])
1786        return self.execute_command(*pieces)
1787
1788    def zrangebyscore(self, name, min, max, start=None, num=None,
1789                      withscores=False, score_cast_func=float):
1790        """
1791        Return a range of values from the sorted set ``name`` with scores
1792        between ``min`` and ``max``.
1793
1794        If ``start`` and ``num`` are specified, then return a slice
1795        of the range.
1796
1797        ``withscores`` indicates to return the scores along with the values.
1798        The return type is a list of (value, score) pairs
1799
1800        `score_cast_func`` a callable used to cast the score return value
1801        """
1802        if (start is not None and num is None) or \
1803                (num is not None and start is None):
1804            raise RedisError("``start`` and ``num`` must both be specified")
1805        pieces = ['ZRANGEBYSCORE', name, min, max]
1806        if start is not None and num is not None:
1807            pieces.extend([Token.get_token('LIMIT'), start, num])
1808        if withscores:
1809            pieces.append(Token.get_token('WITHSCORES'))
1810        options = {
1811            'withscores': withscores,
1812            'score_cast_func': score_cast_func
1813        }
1814        return self.execute_command(*pieces, **options)
1815
1816    def zrank(self, name, value):
1817        """
1818        Returns a 0-based value indicating the rank of ``value`` in sorted set
1819        ``name``
1820        """
1821        return self.execute_command('ZRANK', name, value)
1822
1823    def zrem(self, name, *values):
1824        "Remove member ``values`` from sorted set ``name``"
1825        return self.execute_command('ZREM', name, *values)
1826
1827    def zremrangebylex(self, name, min, max):
1828        """
1829        Remove all elements in the sorted set ``name`` between the
1830        lexicographical range specified by ``min`` and ``max``.
1831
1832        Returns the number of elements removed.
1833        """
1834        return self.execute_command('ZREMRANGEBYLEX', name, min, max)
1835
1836    def zremrangebyrank(self, name, min, max):
1837        """
1838        Remove all elements in the sorted set ``name`` with ranks between
1839        ``min`` and ``max``. Values are 0-based, ordered from smallest score
1840        to largest. Values can be negative indicating the highest scores.
1841        Returns the number of elements removed
1842        """
1843        return self.execute_command('ZREMRANGEBYRANK', name, min, max)
1844
1845    def zremrangebyscore(self, name, min, max):
1846        """
1847        Remove all elements in the sorted set ``name`` with scores
1848        between ``min`` and ``max``. Returns the number of elements removed.
1849        """
1850        return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
1851
1852    def zrevrange(self, name, start, end, withscores=False,
1853                  score_cast_func=float):
1854        """
1855        Return a range of values from sorted set ``name`` between
1856        ``start`` and ``end`` sorted in descending order.
1857
1858        ``start`` and ``end`` can be negative, indicating the end of the range.
1859
1860        ``withscores`` indicates to return the scores along with the values
1861        The return type is a list of (value, score) pairs
1862
1863        ``score_cast_func`` a callable used to cast the score return value
1864        """
1865        pieces = ['ZREVRANGE', name, start, end]
1866        if withscores:
1867            pieces.append(Token.get_token('WITHSCORES'))
1868        options = {
1869            'withscores': withscores,
1870            'score_cast_func': score_cast_func
1871        }
1872        return self.execute_command(*pieces, **options)
1873
1874    def zrevrangebyscore(self, name, max, min, start=None, num=None,
1875                         withscores=False, score_cast_func=float):
1876        """
1877        Return a range of values from the sorted set ``name`` with scores
1878        between ``min`` and ``max`` in descending order.
1879
1880        If ``start`` and ``num`` are specified, then return a slice
1881        of the range.
1882
1883        ``withscores`` indicates to return the scores along with the values.
1884        The return type is a list of (value, score) pairs
1885
1886        ``score_cast_func`` a callable used to cast the score return value
1887        """
1888        if (start is not None and num is None) or \
1889                (num is not None and start is None):
1890            raise RedisError("``start`` and ``num`` must both be specified")
1891        pieces = ['ZREVRANGEBYSCORE', name, max, min]
1892        if start is not None and num is not None:
1893            pieces.extend([Token.get_token('LIMIT'), start, num])
1894        if withscores:
1895            pieces.append(Token.get_token('WITHSCORES'))
1896        options = {
1897            'withscores': withscores,
1898            'score_cast_func': score_cast_func
1899        }
1900        return self.execute_command(*pieces, **options)
1901
1902    def zrevrank(self, name, value):
1903        """
1904        Returns a 0-based value indicating the descending rank of
1905        ``value`` in sorted set ``name``
1906        """
1907        return self.execute_command('ZREVRANK', name, value)
1908
1909    def zscore(self, name, value):
1910        "Return the score of element ``value`` in sorted set ``name``"
1911        return self.execute_command('ZSCORE', name, value)
1912
1913    def zunionstore(self, dest, keys, aggregate=None):
1914        """
1915        Union multiple sorted sets specified by ``keys`` into
1916        a new sorted set, ``dest``. Scores in the destination will be
1917        aggregated based on the ``aggregate``, or SUM if none is provided.
1918        """
1919        return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
1920
1921    def _zaggregate(self, command, dest, keys, aggregate=None):
1922        pieces = [command, dest, len(keys)]
1923        if isinstance(keys, dict):
1924            keys, weights = iterkeys(keys), itervalues(keys)
1925        else:
1926            weights = None
1927        pieces.extend(keys)
1928        if weights:
1929            pieces.append(Token.get_token('WEIGHTS'))
1930            pieces.extend(weights)
1931        if aggregate:
1932            pieces.append(Token.get_token('AGGREGATE'))
1933            pieces.append(aggregate)
1934        return self.execute_command(*pieces)
1935
1936    # HYPERLOGLOG COMMANDS
1937    def pfadd(self, name, *values):
1938        "Adds the specified elements to the specified HyperLogLog."
1939        return self.execute_command('PFADD', name, *values)
1940
1941    def pfcount(self, *sources):
1942        """
1943        Return the approximated cardinality of
1944        the set observed by the HyperLogLog at key(s).
1945        """
1946        return self.execute_command('PFCOUNT', *sources)
1947
1948    def pfmerge(self, dest, *sources):
1949        "Merge N different HyperLogLogs into a single one."
1950        return self.execute_command('PFMERGE', dest, *sources)
1951
1952    # HASH COMMANDS
1953    def hdel(self, name, *keys):
1954        "Delete ``keys`` from hash ``name``"
1955        return self.execute_command('HDEL', name, *keys)
1956
1957    def hexists(self, name, key):
1958        "Returns a boolean indicating if ``key`` exists within hash ``name``"
1959        return self.execute_command('HEXISTS', name, key)
1960
1961    def hget(self, name, key):
1962        "Return the value of ``key`` within the hash ``name``"
1963        return self.execute_command('HGET', name, key)
1964
1965    def hgetall(self, name):
1966        "Return a Python dict of the hash's name/value pairs"
1967        return self.execute_command('HGETALL', name)
1968
1969    def hincrby(self, name, key, amount=1):
1970        "Increment the value of ``key`` in hash ``name`` by ``amount``"
1971        return self.execute_command('HINCRBY', name, key, amount)
1972
1973    def hincrbyfloat(self, name, key, amount=1.0):
1974        """
1975        Increment the value of ``key`` in hash ``name`` by floating ``amount``
1976        """
1977        return self.execute_command('HINCRBYFLOAT', name, key, amount)
1978
1979    def hkeys(self, name):
1980        "Return the list of keys within hash ``name``"
1981        return self.execute_command('HKEYS', name)
1982
1983    def hlen(self, name):
1984        "Return the number of elements in hash ``name``"
1985        return self.execute_command('HLEN', name)
1986
1987    def hset(self, name, key, value):
1988        """
1989        Set ``key`` to ``value`` within hash ``name``
1990        Returns 1 if HSET created a new field, otherwise 0
1991        """
1992        return self.execute_command('HSET', name, key, value)
1993
1994    def hsetnx(self, name, key, value):
1995        """
1996        Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
1997        exist.  Returns 1 if HSETNX created a field, otherwise 0.
1998        """
1999        return self.execute_command('HSETNX', name, key, value)
2000
2001    def hmset(self, name, mapping):
2002        """
2003        Set key to value within hash ``name`` for each corresponding
2004        key and value from the ``mapping`` dict.
2005        """
2006        if not mapping:
2007            raise DataError("'hmset' with 'mapping' of length 0")
2008        items = []
2009        for pair in iteritems(mapping):
2010            items.extend(pair)
2011        return self.execute_command('HMSET', name, *items)
2012
2013    def hmget(self, name, keys, *args):
2014        "Returns a list of values ordered identically to ``keys``"
2015        args = list_or_args(keys, args)
2016        return self.execute_command('HMGET', name, *args)
2017
2018    def hvals(self, name):
2019        "Return the list of values within hash ``name``"
2020        return self.execute_command('HVALS', name)
2021
2022    def hstrlen(self, name, key):
2023        """
2024        Return the number of bytes stored in the value of ``key``
2025        within hash ``name``
2026        """
2027        return self.execute_command('HSTRLEN', name, key)
2028
2029    def publish(self, channel, message):
2030        """
2031        Publish ``message`` on ``channel``.
2032        Returns the number of subscribers the message was delivered to.
2033        """
2034        return self.execute_command('PUBLISH', channel, message)
2035
2036    def pubsub_channels(self, pattern='*'):
2037        """
2038        Return a list of channels that have at least one subscriber
2039        """
2040        return self.execute_command('PUBSUB CHANNELS', pattern)
2041
2042    def pubsub_numpat(self):
2043        """
2044        Returns the number of subscriptions to patterns
2045        """
2046        return self.execute_command('PUBSUB NUMPAT')
2047
2048    def pubsub_numsub(self, *args):
2049        """
2050        Return a list of (channel, number of subscribers) tuples
2051        for each channel given in ``*args``
2052        """
2053        return self.execute_command('PUBSUB NUMSUB', *args)
2054
2055    def cluster(self, cluster_arg, *args):
2056        return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args)
2057
2058    def eval(self, script, numkeys, *keys_and_args):
2059        """
2060        Execute the Lua ``script``, specifying the ``numkeys`` the script
2061        will touch and the key names and argument values in ``keys_and_args``.
2062        Returns the result of the script.
2063
2064        In practice, use the object returned by ``register_script``. This
2065        function exists purely for Redis API completion.
2066        """
2067        return self.execute_command('EVAL', script, numkeys, *keys_and_args)
2068
2069    def evalsha(self, sha, numkeys, *keys_and_args):
2070        """
2071        Use the ``sha`` to execute a Lua script already registered via EVAL
2072        or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the
2073        key names and argument values in ``keys_and_args``. Returns the result
2074        of the script.
2075
2076        In practice, use the object returned by ``register_script``. This
2077        function exists purely for Redis API completion.
2078        """
2079        return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
2080
2081    def script_exists(self, *args):
2082        """
2083        Check if a script exists in the script cache by specifying the SHAs of
2084        each script as ``args``. Returns a list of boolean values indicating if
2085        if each already script exists in the cache.
2086        """
2087        return self.execute_command('SCRIPT EXISTS', *args)
2088
2089    def script_flush(self):
2090        "Flush all scripts from the script cache"
2091        return self.execute_command('SCRIPT FLUSH')
2092
2093    def script_kill(self):
2094        "Kill the currently executing Lua script"
2095        return self.execute_command('SCRIPT KILL')
2096
2097    def script_load(self, script):
2098        "Load a Lua ``script`` into the script cache. Returns the SHA."
2099        return self.execute_command('SCRIPT LOAD', script)
2100
2101    def register_script(self, script):
2102        """
2103        Register a Lua ``script`` specifying the ``keys`` it will touch.
2104        Returns a Script object that is callable and hides the complexity of
2105        deal with scripts, keys, and shas. This is the preferred way to work
2106        with Lua scripts.
2107        """
2108        return Script(self, script)
2109
2110    # GEO COMMANDS
2111    def geoadd(self, name, *values):
2112        """
2113        Add the specified geospatial items to the specified key identified
2114        by the ``name`` argument. The Geospatial items are given as ordered
2115        members of the ``values`` argument, each item or place is formed by
2116        the triad longitude, latitude and name.
2117        """
2118        if len(values) % 3 != 0:
2119            raise RedisError("GEOADD requires places with lon, lat and name"
2120                             " values")
2121        return self.execute_command('GEOADD', name, *values)
2122
2123    def geodist(self, name, place1, place2, unit=None):
2124        """
2125        Return the distance between ``place1`` and ``place2`` members of the
2126        ``name`` key.
2127        The units must be one of the following : m, km mi, ft. By default
2128        meters are used.
2129        """
2130        pieces = [name, place1, place2]
2131        if unit and unit not in ('m', 'km', 'mi', 'ft'):
2132            raise RedisError("GEODIST invalid unit")
2133        elif unit:
2134            pieces.append(unit)
2135        return self.execute_command('GEODIST', *pieces)
2136
2137    def geohash(self, name, *values):
2138        """
2139        Return the geo hash string for each item of ``values`` members of
2140        the specified key identified by the ``name``argument.
2141        """
2142        return self.execute_command('GEOHASH', name, *values)
2143
2144    def geopos(self, name, *values):
2145        """
2146        Return the positions of each item of ``values`` as members of
2147        the specified key identified by the ``name``argument. Each position
2148        is represented by the pairs lon and lat.
2149        """
2150        return self.execute_command('GEOPOS', name, *values)
2151
2152    def georadius(self, name, longitude, latitude, radius, unit=None,
2153                  withdist=False, withcoord=False, withhash=False, count=None,
2154                  sort=None, store=None, store_dist=None):
2155        """
2156        Return the members of the specified key identified by the
2157        ``name`` argument which are within the borders of the area specified
2158        with the ``latitude`` and ``longitude`` location and the maximum
2159        distance from the center specified by the ``radius`` value.
2160
2161        The units must be one of the following : m, km mi, ft. By default
2162
2163        ``withdist`` indicates to return the distances of each place.
2164
2165        ``withcoord`` indicates to return the latitude and longitude of
2166        each place.
2167
2168        ``withhash`` indicates to return the geohash string of each place.
2169
2170        ``count`` indicates to return the number of elements up to N.
2171
2172        ``sort`` indicates to return the places in a sorted way, ASC for
2173        nearest to fairest and DESC for fairest to nearest.
2174
2175        ``store`` indicates to save the places names in a sorted set named
2176        with a specific key, each element of the destination sorted set is
2177        populated with the score got from the original geo sorted set.
2178
2179        ``store_dist`` indicates to save the places names in a sorted set
2180        named with a specific key, instead of ``store`` the sorted set
2181        destination score is set with the distance.
2182        """
2183        return self._georadiusgeneric('GEORADIUS',
2184                                      name, longitude, latitude, radius,
2185                                      unit=unit, withdist=withdist,
2186                                      withcoord=withcoord, withhash=withhash,
2187                                      count=count, sort=sort, store=store,
2188                                      store_dist=store_dist)
2189
2190    def georadiusbymember(self, name, member, radius, unit=None,
2191                          withdist=False, withcoord=False, withhash=False,
2192                          count=None, sort=None, store=None, store_dist=None):
2193        """
2194        This command is exactly like ``georadius`` with the sole difference
2195        that instead of taking, as the center of the area to query, a longitude
2196        and latitude value, it takes the name of a member already existing
2197        inside the geospatial index represented by the sorted set.
2198        """
2199        return self._georadiusgeneric('GEORADIUSBYMEMBER',
2200                                      name, member, radius, unit=unit,
2201                                      withdist=withdist, withcoord=withcoord,
2202                                      withhash=withhash, count=count,
2203                                      sort=sort, store=store,
2204                                      store_dist=store_dist)
2205
2206    def _georadiusgeneric(self, command, *args, **kwargs):
2207        pieces = list(args)
2208        if kwargs['unit'] and kwargs['unit'] not in ('m', 'km', 'mi', 'ft'):
2209            raise RedisError("GEORADIUS invalid unit")
2210        elif kwargs['unit']:
2211            pieces.append(kwargs['unit'])
2212        else:
2213            pieces.append('m',)
2214
2215        for token in ('withdist', 'withcoord', 'withhash'):
2216            if kwargs[token]:
2217                pieces.append(Token(token.upper()))
2218
2219        if kwargs['count']:
2220            pieces.extend([Token('COUNT'), kwargs['count']])
2221
2222        if kwargs['sort'] and kwargs['sort'] not in ('ASC', 'DESC'):
2223            raise RedisError("GEORADIUS invalid sort")
2224        elif kwargs['sort']:
2225            pieces.append(Token(kwargs['sort']))
2226
2227        if kwargs['store'] and kwargs['store_dist']:
2228            raise RedisError("GEORADIUS store and store_dist cant be set"
2229                             " together")
2230
2231        if kwargs['store']:
2232            pieces.extend([Token('STORE'), kwargs['store']])
2233
2234        if kwargs['store_dist']:
2235            pieces.extend([Token('STOREDIST'), kwargs['store_dist']])
2236
2237        return self.execute_command(command, *pieces, **kwargs)
2238
2239
class Redis(StrictRedis):
    """
    Provides backwards compatibility with older versions of redis-py that
    changed arguments to some commands to be more Pythonic, sane, or by
    accident.
    """

    # Overridden callbacks
    # TTL / PTTL: a negative reply is mapped to None, any non-negative
    # reply is returned unchanged. A conditional expression is used rather
    # than ``r >= 0 and r or None`` because the latter also turns a
    # legitimate reply of 0 into None (0 is falsy).
    RESPONSE_CALLBACKS = dict_merge(
        StrictRedis.RESPONSE_CALLBACKS,
        {
            'TTL': lambda r: r if r >= 0 else None,
            'PTTL': lambda r: r if r >= 0 else None,
        }
    )

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.

        The pipeline is built from this client's connection pool and
        response callbacks; ``shard_hint`` is passed through unchanged.
        """
        return Pipeline(
            self.connection_pool,
            self.response_callbacks,
            transaction,
            shard_hint)

    def setex(self, name, value, time):
        """
        Set the value of key ``name`` to ``value`` that expires in ``time``
        seconds. ``time`` can be represented by an integer or a Python
        timedelta object.

        NOTE: the argument order here is (name, value, time), while the
        command is sent to the server as SETEX name time value.
        """
        if isinstance(time, datetime.timedelta):
            # whole seconds only; the microseconds part is not used
            time = time.seconds + time.days * 24 * 3600
        return self.execute_command('SETEX', name, time, value)

    def lrem(self, name, value, num=0):
        """
        Remove the first ``num`` occurrences of elements equal to ``value``
        from the list stored at ``name``.

        The ``num`` argument influences the operation in the following ways:
            num > 0: Remove elements equal to value moving from head to tail.
            num < 0: Remove elements equal to value moving from tail to head.
            num = 0: Remove all elements equal to value.

        NOTE: the argument order here is (name, value, num), while the
        command is sent to the server as LREM name num value.
        """
        return self.execute_command('LREM', name, num, value)

    def zadd(self, name, *args, **kwargs):
        """
        NOTE: The order of arguments differs from that of the official ZADD
        command. For backwards compatibility, this method accepts arguments
        in the form of name1, score1, name2, score2, while the official Redis
        documents expects score1, name1, score2, name2.

        If you're looking to use the standard syntax, consider using the
        StrictRedis class. See the API Reference section of the docs for more
        information.

        Set any number of element-name, score pairs to the key ``name``. Pairs
        can be specified in two ways:

        As *args, in the form of: name1, score1, name2, score2, ...
        or as **kwargs, in the form of: name1=score1, name2=score2, ...

        The following example would add four values to the 'my-key' key:
        redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4)

        Raises RedisError when an odd number of positional values is given.
        """
        pieces = []
        if args:
            if len(args) % 2 != 0:
                raise RedisError("ZADD requires an equal number of "
                                 "values and scores")
            # Swap each (name, score) pair into the (score, name) order the
            # command expects while keeping the pairs in the caller's
            # order. Reversing the whole argument list would also reverse
            # the pair order, so a member passed twice would end up with
            # its first score instead of its last.
            for member, score in zip(args[::2], args[1::2]):
                pieces.append(score)
                pieces.append(member)
        # keyword pairs arrive as name=score and are flipped the same way
        for pair in iteritems(kwargs):
            pieces.append(pair[1])
            pieces.append(pair[0])
        return self.execute_command('ZADD', name, *pieces)
2322
2323
class PubSub(object):
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.

    Subscriptions are tracked in ``self.channels`` and ``self.patterns``.
    Both are dicts mapping a channel/pattern name to its handler callable,
    or to None when no handler was supplied for that name.
    """
    PUBLISH_MESSAGE_TYPES = ('message', 'pmessage')
    UNSUBSCRIBE_MESSAGE_TYPES = ('unsubscribe', 'punsubscribe')

    def __init__(self, connection_pool, shard_hint=None,
                 ignore_subscribe_messages=False):
        """
        ``connection_pool`` supplies the connection (taken lazily on the
        first command) and the encoder. ``shard_hint`` is passed through to
        ``connection_pool.get_connection``. If ``ignore_subscribe_messages``
        is true, handle_message() returns None for every message that is not
        a 'message' or 'pmessage'.
        """
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = self.connection_pool.get_encoder()
        self.reset()

    def __del__(self):
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            # best effort only: never let cleanup errors escape a finalizer
            pass

    def reset(self):
        """
        Disconnect and release the connection (if any) back to the pool and
        forget every channel and pattern subscription.
        """
        if self.connection:
            self.connection.disconnect()
            self.connection.clear_connect_callbacks()
            self.connection_pool.release(self.connection)
            self.connection = None
        self.channels = {}
        self.patterns = {}

    def close(self):
        "Alias for reset()"
        self.reset()

    def on_connect(self, connection):
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        if self.channels:
            channels = {}
            for k, v in iteritems(self.channels):
                channels[self.encoder.decode(k, force=True)] = v
            self.subscribe(**channels)
        if self.patterns:
            patterns = {}
            for k, v in iteritems(self.patterns):
                patterns[self.encoder.decode(k, force=True)] = v
            self.psubscribe(**patterns)

    @property
    def subscribed(self):
        "Indicates if there are subscriptions to any channels or patterns"
        return bool(self.channels or self.patterns)

    def execute_command(self, *args, **kwargs):
        """
        Execute a publish/subscribe command.

        The command is only sent; its reply is not read here, so this method
        always returns None. ``kwargs`` are accepted but unused.
        """

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection(
                'pubsub',
                self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _execute(self, connection, command, *args):
        """
        Call ``command(*args)`` and return its result, reconnecting and
        retrying once on ConnectionError or TimeoutError.

        A TimeoutError is re-raised without a retry unless
        ``connection.retry_on_timeout`` is set.
        """
        try:
            return command(*args)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # Connect manually here. If the Redis server is down, this will
            # fail and raise a ConnectionError as desired.
            connection.connect()
            # the ``on_connect`` callback should have been called by the
            # connection to resubscribe us to any channels and patterns we
            # were previously listening to
            return command(*args)

    def parse_response(self, block=True, timeout=0):
        """
        Parse the response from a publish/subscribe command.

        With ``block=False`` None is returned when
        ``connection.can_read(timeout=timeout)`` reports nothing to read.
        Raises RuntimeError if no connection has been acquired yet.
        """
        connection = self.connection
        if connection is None:
            raise RuntimeError(
                'pubsub connection not set: '
                'did you forget to call subscribe() or psubscribe()?')
        if not block and not connection.can_read(timeout=timeout):
            return None
        return self._execute(connection, connection.read_response)

    def _normalize_keys(self, data):
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return dict([(decode(encode(k)), v) for k, v in iteritems(data)])

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        # positional patterns get no handler (None)
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command('PSUBSCRIBE', *iterkeys(new_patterns))
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        self.patterns.update(self._normalize_keys(new_patterns))
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        return self.execute_command('PUNSUBSCRIBE', *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        # positional channels get no handler (None)
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        self.channels.update(self._normalize_keys(new_channels))
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        """
        if args:
            args = list_or_args(args[0], args[1:])
        return self.execute_command('UNSUBSCRIBE', *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        # generator: runs until every channel and pattern is unsubscribed
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(self, ignore_subscribe_messages=False, timeout=0):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number.

        ``ignore_subscribe_messages`` is forwarded to handle_message().
        """
        response = self.parse_response(block=False, timeout=timeout)
        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.

        Returns a dict with the keys 'type', 'pattern', 'channel' and
        'data', or None when a handler consumed the message or when a
        subscribe/unsubscribe message is being ignored.
        """
        message_type = nativestr(response[0])
        if message_type == 'pmessage':
            message = {
                'type': message_type,
                'pattern': response[1],
                'channel': response[2],
                'data': response[3]
            }
        else:
            message = {
                'type': message_type,
                'pattern': None,
                'channel': response[1],
                'data': response[2]
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            subscribed_dict = None
            if message_type == 'punsubscribe':
                subscribed_dict = self.patterns
            else:
                subscribed_dict = self.channels
            try:
                del subscribed_dict[message['channel']]
            except KeyError:
                pass

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            handler = None
            if message_type == 'pmessage':
                handler = self.patterns.get(message['pattern'], None)
            else:
                handler = self.channels.get(message['channel'], None)
            if handler:
                handler(message)
                return None
        else:
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(self, sleep_time=0, daemon=False):
        """
        Start and return a PubSubWorkerThread that processes messages for
        this object, using ``sleep_time`` as the get_message() timeout.

        Raises PubSubError, naming the offending subscription, if any
        subscribed channel or pattern has no handler registered.
        """
        for channel, handler in iteritems(self.channels):
            if handler is None:
                raise PubSubError("Channel: '%s' has no handler registered"
                                  % (channel,))
        for pattern, handler in iteritems(self.patterns):
            if handler is None:
                raise PubSubError("Pattern: '%s' has no handler registered"
                                  % (pattern,))

        thread = PubSubWorkerThread(self, sleep_time, daemon=daemon)
        thread.start()
        return thread
2581
2582
class PubSubWorkerThread(threading.Thread):
    """
    Thread that keeps calling ``pubsub.get_message`` for as long as the
    pubsub object reports active subscriptions, then closes it.
    """

    def __init__(self, pubsub, sleep_time, daemon=False):
        super(PubSubWorkerThread, self).__init__()
        self._running = False
        self.sleep_time = sleep_time
        self.pubsub = pubsub
        self.daemon = daemon

    def run(self):
        "Poll for messages until nothing is subscribed, then close the pubsub"
        # a second run() while the loop is active is a no-op
        if not self._running:
            self._running = True
            subscription = self.pubsub
            poll_timeout = self.sleep_time
            while subscription.subscribed:
                # subscribe/unsubscribe confirmations are discarded; the
                # return value of get_message() is not used
                subscription.get_message(timeout=poll_timeout,
                                         ignore_subscribe_messages=True)
            subscription.close()
            self._running = False

    def stop(self):
        "Ask the loop in run() to finish by dropping every subscription"
        # the resulting unsubscribe responses leave the pubsub with no
        # subscriptions, which ends the loop in run() and makes it call
        # pubsub.close() to clean up the connection
        subscription = self.pubsub
        subscription.unsubscribe()
        subscription.punsubscribe()
2609
2610
class BasePipeline(object):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission.  This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    # commands whose successful reply means nothing is WATCHed any more
    UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH'))

    def __init__(self, connection_pool, response_callbacks, transaction,
                 shard_hint):
        """
        ``connection_pool`` supplies the connection (taken lazily).
        ``response_callbacks`` maps command names to reply post-processors.
        If ``transaction`` is true, execute() wraps the queued commands in
        MULTI/EXEC. ``shard_hint`` is passed through to
        ``connection_pool.get_connection``.
        """
        self.connection_pool = connection_pool
        self.connection = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint

        self.watching = False
        self.reset()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # always clean up; returning None lets any exception propagate
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            # best effort only: never let cleanup errors escape a finalizer
            pass

    def __len__(self):
        "Number of commands currently queued"
        return len(self.command_stack)

    def reset(self):
        """
        Clear the queued commands and scripts, UNWATCH if we were watching,
        and return the connection (if any) to the pool.
        """
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command('UNWATCH')
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False
        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.

        Raises RedisError if MULTI was already called or if commands have
        already been queued.
        """
        if self.explicit_transaction:
            raise RedisError('Cannot issue nested calls to MULTI')
        if self.command_stack:
            raise RedisError('Commands without an initial WATCH have already '
                             'been issued')
        self.explicit_transaction = True

    def execute_command(self, *args, **kwargs):
        """
        Run the command immediately when it is a WATCH, or when we are
        watching and multi() has not been called yet; otherwise queue it.
        """
        if (self.watching or args[0] == 'WATCH') and \
                not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on a
        ConnectionError if we're already WATCHing a variable. Used when
        issuing WATCH or subsequent commands retrieving their values but before
        MULTI is called.

        Raises WatchError if the connection fails while keys are being
        WATCHed: the watch cannot survive a reconnect, so the caller has to
        restart the transaction. A TimeoutError is re-raised as-is unless the
        connection has ``retry_on_timeout`` set.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection(command_name,
                                                       self.shard_hint)
            self.connection = conn
        try:
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            conn.disconnect()
            if not conn.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            if self.watching:
                # The watch died with the connection. Falling through here
                # would hand the caller None, which looks like a nil reply.
                # Raise the same error execute() raises for this condition.
                # Clearing the flag first makes reset() just release the
                # connection instead of sending a pointless UNWATCH.
                self.watching = False
                self.reset()
                raise WatchError("A ConnectionError occured on while watching "
                                 "one or more keys")
            # we weren't watching, so we can safely retry the command
            try:
                conn.send_command(*args)
                return self.parse_response(conn, command_name, **options)
            except ConnectionError:
                # the retry failed so cleanup.
                conn.disconnect()
                self.reset()
                raise

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self

    def _execute_transaction(self, connection, commands, raise_on_error):
        """
        Send ``commands`` wrapped in MULTI/EXEC as one packed request and
        return the list of replies, each passed through its response
        callback.

        Raises WatchError when EXEC replies with None, and ResponseError when
        the number of replies does not match the number of commands.
        """
        cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})])
        all_cmds = connection.pack_commands([args for args, _ in cmds])
        connection.send_packed_command(all_cmds)
        # (position, exception) pairs for replies that failed to parse
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, '_')
        except ResponseError:
            errors.append((0, sys.exc_info()[1]))

        # and all the other commands
        for i, command in enumerate(commands):
            try:
                self.parse_response(connection, '_')
            except ResponseError:
                ex = sys.exc_info()[1]
                self.annotate_exception(ex, i + 1, command[0])
                errors.append((i, ex))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, '_')
        except ExecAbortError:
            if self.explicit_transaction:
                self.immediate_execute_command('DISCARD')
            if errors:
                raise errors[0][1]
            raise sys.exc_info()[1]

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError("Wrong number of response items from "
                                "pipeline execution")

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in izip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)
        return data

    def _execute_pipeline(self, connection, commands, raise_on_error):
        """
        Send ``commands`` as one packed request without MULTI/EXEC and return
        the list of parsed replies. A ResponseError for a command is stored
        in its slot of the list.
        """
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        response = []
        for args, options in commands:
            try:
                response.append(
                    self.parse_response(connection, args[0], **options))
            except ResponseError:
                response.append(sys.exc_info()[1])

        if raise_on_error:
            self.raise_first_error(commands, response)
        return response

    def raise_first_error(self, commands, response):
        "Annotate and raise the first ResponseError in ``response``, if any"
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        """
        Rewrite the first argument of ``exception`` in place so it names the
        1-based position ``number`` and the text of the failing ``command``.
        """
        cmd = safe_unicode(' ').join(imap(safe_unicode, command))
        msg = unicode('Command # %d (%s) of pipeline caused error: %s') % (
            number, cmd, safe_unicode(exception.args[0]))
        exception.args = (msg,) + exception.args[1:]

    def parse_response(self, connection, command_name, **options):
        """
        Parse one reply via StrictRedis.parse_response and update
        ``self.watching`` according to the command that was answered.
        """
        result = StrictRedis.parse_response(
            self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == 'WATCH':
            self.watching = True
        return result

    def load_scripts(self):
        "Load every script registered on this pipeline that the server lacks"
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate('SCRIPT EXISTS', *shas)
        if not all(exists):
            for s, exist in izip(scripts, exists):
                if not exist:
                    s.sha = immediate('SCRIPT LOAD', s.script)

    def execute(self, raise_on_error=True):
        """
        Execute all the commands in the current pipeline and return the list
        of their replies. An empty pipeline returns an empty list.

        If ``raise_on_error`` is true the first ResponseError among the
        replies is raised. Raises WatchError if the connection fails while
        keys are being WATCHed. The pipeline is always reset afterwards.
        """
        stack = self.command_stack
        if not stack:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection('MULTI',
                                                       self.shard_hint)
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return execute(conn, stack, raise_on_error)
        except (ConnectionError, TimeoutError) as e:
            conn.disconnect()
            if not conn.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # if we were watching a variable, the watch is no longer valid
            # since this connection has died. raise a WatchError, which
            # indicates the user should retry his transaction. If this is more
            # than a temporary failure, the WATCH that the user next issues
            # will fail, propagating the real ConnectionError
            if self.watching:
                raise WatchError("A ConnectionError occured on while watching "
                                 "one or more keys")
            # otherwise, it's safe to retry since the transaction isn't
            # predicated on any state
            return execute(conn, stack, raise_on_error)
        finally:
            self.reset()

    def watch(self, *names):
        "Watches the values at keys ``names``"
        if self.explicit_transaction:
            raise RedisError('Cannot issue a WATCH after a MULTI')
        return self.execute_command('WATCH', *names)

    def unwatch(self):
        "Unwatches all previously specified keys"
        # never returns a falsy value: a falsy UNWATCH result becomes True
        if self.watching:
            return self.execute_command('UNWATCH') or True
        return True
2907
2908
class StrictPipeline(BasePipeline, StrictRedis):
    """
    Pipeline for the StrictRedis class.

    Purely a combination of its two base classes; nothing is overridden.
    """
2912
2913
class Pipeline(BasePipeline, Redis):
    """
    Pipeline for the Redis class.

    Purely a combination of its two base classes; nothing is overridden.
    """
2917
2918
class Script(object):
    """
    A callable wrapper around a Lua script, as returned by
    ``register_script``. Holds the script source and its SHA1 hex digest.
    """

    def __init__(self, registered_client, script):
        self.script = script
        self.registered_client = registered_client

        # The digest is computed up front from the script's bytes. Text has
        # to be encoded with the client's encoder first so that the bytes
        # hashed here are the ones produced by that encoder.
        payload = script
        if isinstance(payload, basestring):
            pool = registered_client.connection_pool
            payload = pool.get_encoder().encode(payload)
        self.sha = hashlib.sha1(payload).hexdigest()

    def __call__(self, keys=[], args=[], client=None):
        """
        Run the script through EVALSHA with ``keys`` followed by ``args``,
        on ``client`` or, when none is given, on the registering client.
        """
        target = self.registered_client if client is None else client
        script_args = tuple(keys) + tuple(args)
        if isinstance(target, BasePipeline):
            # register with the pipeline so it can make sure the script is
            # loaded before the queued commands are executed
            target.scripts.add(self)
        try:
            return target.evalsha(self.sha, len(keys), *script_args)
        except NoScriptError:
            # The server doesn't know this script, perhaps because this
            # client points at a different server than the registering one.
            # Load it, adopt whatever sha comes back, and run it again.
            self.sha = target.script_load(self.script)
            return target.evalsha(self.sha, len(keys), *script_args)
2951