1# coding: utf-8
2# Copyright 2009 Alexandre Fiori
3# https://github.com/fiorix/txredisapi
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17#
18# Credits:
19#   The Protocol class is an improvement of txRedis' protocol,
20#   by Dorian Raymer and Ludovico Magnocavallo.
21#
22#   Sharding and Consistent Hashing implementation by Gleicon Moraes.
23#
24
25import six
26
27import bisect
28import collections
29import functools
30import operator
31import re
32import warnings
33import zlib
34import string
35import hashlib
36import random
37
38from twisted.internet import defer
39from twisted.internet import protocol
40from twisted.internet import reactor
41from twisted.internet.tcp import Connector
42from twisted.protocols import basic
43from twisted.protocols import policies
44from twisted.python import log
45from twisted.python.failure import Failure
46
# hiredis is an optional accelerated reply parser; when it is missing we
# fall back to the pure-Python parsing in this module (hiredis is None).
try:
    import hiredis
except ImportError:
    hiredis = None
51
52
class RedisError(Exception):
    """Base class for every error raised by this Redis client."""
55
56
class ConnectionError(RedisError):
    """Raised when the connection to the server is absent or lost."""
59
60
class ResponseError(RedisError):
    """Wraps an error reply ("-...") returned by the server."""
63
64
class ScriptDoesNotExist(ResponseError):
    """Error reply indicating the referenced script does not exist."""
67
68
class NoScriptRunning(ResponseError):
    """Error reply indicating that no script is currently running."""
71
72
class InvalidResponse(RedisError):
    """Raised when a server reply cannot be parsed."""
75
76
class InvalidData(RedisError):
    """Raised when a value cannot be encoded for transmission."""
79
80
class WatchError(RedisError):
    """Raised when a WATCHed key changed before the transaction ran."""
83
84
class TimeoutError(ConnectionError):
    """Raised when a reply does not arrive within the configured timeout."""
87
88
def list_or_args(command, keys, args):
    """
    Normalize ``keys`` into a list, supporting the deprecated calling
    convention where extra keys were passed as ``*args``.

    A DeprecationWarning is emitted whenever the old API is used (extra
    positional args, or a non-iterable single key).
    """
    legacy = bool(args)
    try:
        iter(keys)
    except TypeError:
        # A single non-iterable key: old API.
        legacy = True
        keys = [keys]
    else:
        if isinstance(keys, six.string_types) or \
                isinstance(keys, six.binary_type):
            keys = [keys]
            if not legacy:
                return keys
            legacy = True

    if legacy:
        warnings.warn(DeprecationWarning(
            "Passing *args to redis.%s is deprecated. "
            "Pass an iterable to ``keys`` instead" % command))
        keys.extend(args)
    return keys
109
# Possible first characters in a string containing an integer or a float.
# Used by tryConvertData() as a cheap pre-screen before attempting int()
# or float() conversion of a bulk reply.
_NUM_FIRST_CHARS = frozenset(string.digits + "+-.")
112
113
class MultiBulkStorage(object):
    """
    Accumulator for (possibly nested) multi-bulk replies.

    Each node tracks how many elements are still expected (``pending``)
    and links back to its enclosing reply via ``parent``.
    """

    def __init__(self, parent=None):
        self.items = None
        self.pending = None
        self.parent = parent

    def set_pending(self, pending):
        """
        Arm this node to expect ``pending`` elements; if it is already
        armed, create and return a nested child node instead.
        """
        if self.pending is not None:
            child = MultiBulkStorage(self)
            child.set_pending(pending)
            return child
        if pending < 0:
            # Null multi-bulk reply (*-1).
            self.items, self.pending = None, 0
        else:
            self.items, self.pending = [], pending
        return self

    def append(self, item):
        """Store one received element and decrement the expected count."""
        self.items.append(item)
        self.pending -= 1
137
138
class LineReceiver(protocol.Protocol, basic._PauseableMixin):
    """
    Line-oriented protocol with a switchable raw mode, in the spirit of
    twisted's basic.LineReceiver, plus support for re-injecting leftover
    raw data through the reactor when switching back to line mode.
    """
    callLater = reactor.callLater  # overridable scheduling hook
    line_mode = 1                  # 1: split on delimiter; 0: raw mode
    __buffer = six.b('')           # buffered, not-yet-dispatched input
    delimiter = six.b('\r\n')
    MAX_LENGTH = 16384             # longest line accepted before bailing out

    def clearLineBuffer(self):
        """Drop and return whatever input is currently buffered."""
        b = self.__buffer
        self.__buffer = six.b('')
        return b

    def dataReceived(self, data, unpause=False):
        """
        Buffer ``data`` and dispatch complete lines to lineReceived()
        while in line mode; when line mode ends (or is off), hand the
        remaining buffer to rawDataReceived().

        ``unpause=True`` marks a re-injection scheduled by setLineMode():
        the leftover data is queued ahead of anything that arrived in the
        meantime, and production is resumed.
        """
        if unpause is True:
            if self.__buffer:
                # Re-injected leftover predates the currently buffered data.
                self.__buffer = data + self.__buffer
            else:
                self.__buffer += data

            self.resumeProducing()
        else:
            self.__buffer = self.__buffer + data

        while self.line_mode and not self.paused:
            try:
                line, self.__buffer = self.__buffer.split(self.delimiter, 1)
            except ValueError:
                # No complete line in the buffer yet.
                if len(self.__buffer) > self.MAX_LENGTH:
                    line, self.__buffer = self.__buffer, six.b('')
                    return self.lineLengthExceeded(line)
                break
            else:
                linelength = len(line)
                if linelength > self.MAX_LENGTH:
                    exceeded = line + self.__buffer
                    self.__buffer = six.b('')
                    return self.lineLengthExceeded(exceeded)
                # Deliver lines as text when the buffer type supports it.
                if hasattr(line, 'decode'):
                    why = self.lineReceived(line.decode())
                else:
                    why = self.lineReceived(line)
                if why or self.transport and self.transport.disconnecting:
                    return why
        else:
            # Reached only when the while-condition went false (raw mode
            # or paused) -- not when the loop exited via ``break``.
            if not self.paused:
                data = self.__buffer
                self.__buffer = six.b('')
                if data:
                    return self.rawDataReceived(data)

    def setLineMode(self, extra=six.b('')):
        """
        Switch back to line mode; any ``extra`` raw leftover is replayed
        through the reactor so the current call stack unwinds first.
        """
        self.line_mode = 1
        if extra:
            self.pauseProducing()
            self.callLater(0, self.dataReceived, extra, True)

    def setRawMode(self):
        """Switch to raw mode: all input goes to rawDataReceived()."""
        self.line_mode = 0

    def rawDataReceived(self, data):
        """Override to handle raw-mode data."""
        raise NotImplementedError

    def lineReceived(self, line):
        """Override to handle one delimiter-stripped line."""
        raise NotImplementedError

    def sendLine(self, line):
        """Write ``line`` plus the delimiter (text is encoded first)."""
        if isinstance(line, six.text_type):
            line = line.encode()
        return self.transport.write(line + self.delimiter)

    def lineLengthExceeded(self, line):
        """Default policy for over-long lines: drop the connection."""
        return self.transport.loseConnection()
211
212
class ReplyQueue(defer.DeferredQueue):
    """
    Subclass defer.DeferredQueue to maintain consistency of
    producers / consumers in light of defer.cancel
    """
    def _cancelGet(self, d):
        """
        Replace a cancelled get() Deferred with a fresh placeholder so
        the one-reply-per-command pairing is preserved.
        """
        # rather than remove(d), the default twisted behavior
        # we need to maintain an entry in the waiting list
        # because the reply code assumes that every call
        # to transport.write() generates a corresponding
        # reply value in the queue.
        # so we will just replace the cancelled deferred
        # with a noop
        i = self.waiting.index(d)
        self.waiting[i] = defer.Deferred()
228
229
230def _blocking_command(release_on_callback):
231    """
232    Decorator used for marking protocol methods as `blocking` (methods that
233    block connection from being used for sending another requests)
234
235    release_on_callback means whether connection should be automatically
236    released when deferred returned by method is fired
237    """
238    def decorator(method):
239        method._blocking = True
240        method._release_on_callback = release_on_callback
241        return method
242    return decorator
243
244
245class BaseRedisProtocol(LineReceiver):
246    """
247    Redis client protocol.
248    """
249
    def __init__(self, charset="utf-8", errors="strict", replyTimeout=None,
                 password=None, dbid=None, convertNumbers=True):
        """
        :param charset: charset used to decode/encode values; None leaves
            bulk replies as bytes.
        :param errors: error policy passed to encode() when encoding text.
        :param replyTimeout: seconds to wait for a reply before failing the
            command and aborting the connection (None: wait forever).
        :param password: if set, AUTH is issued in connectionMade().
        :param dbid: if set, SELECT is issued in connectionMade().
        :param convertNumbers: convert numeric-looking bulk replies to
            int/float.
        """
        self.charset = charset
        self.errors = errors

        # State for the bulk ($) reply currently being read in raw mode.
        self.bulk_length = 0
        self.bulk_buffer = bytearray()

        # Per-command post-processing callbacks and the (possibly nested)
        # multi-bulk reply currently being assembled.
        self.post_proc = []
        self.multi_bulk = MultiBulkStorage()

        # FIFO of Deferreds: one per command awaiting its reply.
        self.replyQueue = ReplyQueue()

        # MULTI/EXEC and WATCH bookkeeping.
        self.transactions = 0
        self.pendingTransaction = False
        self.inTransaction = False
        self.inMulti = False
        self.unwatch_cc = lambda: ()
        self.commit_cc = lambda: ()

        # Script hashes known to the server; cleared on connection loss.
        self.script_hashes = set()

        # Pipelining: commands buffered for a single write, and the
        # Deferreds of their pending replies.
        self.pipelining = False
        self.pipelined_commands = []
        self.pipelined_replies = []

        self.replyTimeout = replyTimeout
        self.password = password
        self.dbid = dbid
        self.convertNumbers = convertNumbers

        # Deferreds fired by connectionMade() / connectionLost().
        self._waiting_for_connect = []
        self._waiting_for_disconnect = []
283
284
285    def whenConnected(self):
286        d = defer.Deferred()
287        self._waiting_for_connect.append(d)
288        return d
289
290
291    def whenDisconnected(self):
292        d = defer.Deferred()
293        self._waiting_for_disconnect.append(d)
294        return d
295
296
    @defer.inlineCallbacks
    def connectionMade(self):
        """
        Run the optional AUTH and SELECT handshakes, then mark the
        connection usable and fire any whenConnected() Deferreds.

        On handshake failure, reconnection attempts are stopped, the
        transport is closed, and the error is reported through
        factory.connectionError().
        """
        if self.password is not None:
            try:
                response = yield self.auth(self.password)
                if isinstance(response, ResponseError):
                    raise response
            except Exception as e:
                # Bad credentials: retrying the same handshake cannot help.
                self.factory.continueTrying = False
                self.transport.loseConnection()

                msg = "Redis error: could not auth: %s" % (str(e))
                self.factory.connectionError(msg)
                if self.factory.isLazy:
                    log.msg(msg)
                defer.returnValue(None)

        if self.dbid is not None:
            try:
                response = yield self.select(self.dbid)
                if isinstance(response, ResponseError):
                    raise response
            except Exception as e:
                self.factory.continueTrying = False
                self.transport.loseConnection()

                msg = "Redis error: could not set dbid=%s: %s" % \
                      (self.dbid, str(e))
                self.factory.connectionError(msg)
                if self.factory.isLazy:
                    log.msg(msg)
                defer.returnValue(None)

        self.connected = 1
        # Swap out the waiter list before firing, so callbacks that call
        # whenConnected() again queue for the next connection.
        self._waiting_for_connect, dfrs = [], self._waiting_for_connect
        for d in dfrs:
            d.callback(self)
334
    def connectionLost(self, why):
        """
        Reset connection state, fire whenDisconnected() Deferreds, and
        fail every still-pending command with ConnectionError.
        """
        self.connected = 0
        self.script_hashes.clear()

        self._waiting_for_disconnect, dfrs = [], self._waiting_for_disconnect
        for d in dfrs:
            d.callback(self)

        LineReceiver.connectionLost(self, why)
        # Each waiting Deferred corresponds to a command that will never
        # get a reply; unblock all of them with an error.
        while self.replyQueue.waiting:
            self.replyReceived(ConnectionError("Lost connection"))
346
    def lineReceived(self, line):
        """
        Parse one line of the Redis wire protocol.

        Reply types:
          "-" error message
          "+" single line status reply
          ":" integer number (protocol level only?)
          "$" bulk data
          "*" multi-bulk data
        """
        if line:
            token, data = line[0], line[1:]
        else:
            return

        if token == "$":  # bulk data
            try:
                self.bulk_length = int(data)
            except ValueError:
                self.replyReceived(InvalidResponse("Cannot convert data "
                                                   "'%s' to integer" % data))
            else:
                if self.bulk_length == -1:
                    # Null bulk reply ($-1): deliver None immediately.
                    self.bulk_length = 0
                    self.bulkDataReceived(None)
                else:
                    # Read the payload plus its trailing CRLF in raw mode.
                    self.bulk_length += 2  # 2 == \r\n
                    self.setRawMode()

        elif token == "*":  # multi-bulk data
            try:
                n = int(data)
            except (TypeError, ValueError):
                self.multi_bulk = MultiBulkStorage()
                self.replyReceived(InvalidResponse("Cannot convert "
                                                   "multi-response header "
                                                   "'%s' to integer" % data))
            else:
                self.multi_bulk = self.multi_bulk.set_pending(n)
                if n in (0, -1):
                    # Empty (*0) or null (*-1) multi-bulk: complete now.
                    self.multiBulkDataReceived()

        elif token == "+":  # single line status
            if data == "QUEUED":
                # A command was queued inside MULTI.
                self.transactions += 1
                self.replyReceived(data)
            else:
                if self.multi_bulk.pending:
                    self.handleMultiBulkElement(data)
                else:
                    self.replyReceived(data)

        elif token == "-":  # error
            # Strip the generic "ERR " prefix; keep specific error codes.
            reply = ResponseError(data[4:] if data[:4] == "ERR" else data)
            if self.multi_bulk.pending:
                self.handleMultiBulkElement(reply)
            else:
                self.replyReceived(reply)

        elif token == ":":  # integer
            try:
                reply = int(data)
            except ValueError:
                reply = InvalidResponse(
                    "Cannot convert data '%s' to integer" % data)

            if self.multi_bulk.pending:
                self.handleMultiBulkElement(reply)
            else:
                self.replyReceived(reply)
416
    def rawDataReceived(self, data):
        """
        Process and dispatch to bulkDataReceived.

        Accumulates up to ``bulk_length`` bytes (which includes the
        trailing CRLF); anything beyond that belongs to the next reply
        and is re-injected through setLineMode().
        """
        if self.bulk_length:
            data, rest = data[:self.bulk_length], data[self.bulk_length:]
            self.bulk_length -= len(data)
        else:
            rest = ""

        self.bulk_buffer.extend(data)
        if self.bulk_length == 0:
            # Drop the trailing \r\n that was counted into bulk_length.
            bulk_buffer = self.bulk_buffer[:-2]
            self.bulk_buffer = bytearray()
            self.bulkDataReceived(bytes(bulk_buffer))
            self.setLineMode(extra=rest)
433
434    def bulkDataReceived(self, data):
435        """
436        Receipt of a bulk data element.
437        """
438        el = None
439        if data is not None:
440            el = self.tryConvertData(data)
441
442        if self.multi_bulk.pending or self.multi_bulk.items:
443            self.handleMultiBulkElement(el)
444        else:
445            self.replyReceived(el)
446
    def tryConvertData(self, data):
        """
        Convert a raw reply value for delivery: optionally parse
        numeric-looking strings into int/float (``convertNumbers``), then
        decode bytes with the configured charset; values that cannot be
        converted are returned unchanged.
        """
        # The hiredis reader implicitly returns integers
        if isinstance(data, six.integer_types):
            return data
        if isinstance(data, list):
            # Whole multi-bulk list (hiredis path): convert each element.
            return [self.tryConvertData(x) for x in data]
        el = None
        if self.convertNumbers:
            if data:
                num_data = data
                try:
                    if isinstance(data, six.binary_type):
                        num_data = data.decode()
                except UnicodeError:
                    pass
                else:
                    if num_data[0] in _NUM_FIRST_CHARS:  # Most likely a number
                        try:
                            el = int(num_data) if num_data.find('.') == -1 \
                                else float(num_data)
                        except ValueError:
                            pass

        if el is None:
            el = data
            if self.charset is not None:
                try:
                    el = data.decode(self.charset)
                except UnicodeDecodeError:
                    pass
                except AttributeError:
                    # Already text (no .decode method); keep as-is.
                    el = data
        return el
480
481    def handleMultiBulkElement(self, element):
482        self.multi_bulk.append(element)
483
484        if not self.multi_bulk.pending:
485            self.multiBulkDataReceived()
486
    def multiBulkDataReceived(self):
        """
        Receipt of list or set of bulk data elements.

        Folds completed nested levels back into their parents; once the
        outermost level is complete, runs transaction post-processing and
        delivers the assembled list.
        """
        while self.multi_bulk.parent and not self.multi_bulk.pending:
            p = self.multi_bulk.parent
            p.append(self.multi_bulk.items)
            self.multi_bulk = p

        if not self.multi_bulk.pending:
            reply = self.multi_bulk.items
            self.multi_bulk = MultiBulkStorage()

            reply = self.handleTransactionData(reply)

            self.replyReceived(reply)
503
504    def handleTransactionData(self, reply):
505        if self.inTransaction and isinstance(reply, list):
506            # watch or multi has been called
507            if self.transactions > 0:
508                # multi: this must be an exec [commit] reply
509                self.transactions -= len(reply)
510            if self.transactions == 0:
511                self.commit_cc()
512            if not self.inTransaction:  # multi: this must be an exec reply
513                tmp = []
514                for f, v in zip(self.post_proc[1:], reply):
515                    if callable(f):
516                        tmp.append(f(v))
517                    else:
518                        tmp.append(v)
519                    reply = tmp
520            self.post_proc = []
521        return reply
522
    def replyReceived(self, reply):
        """
        Complete reply received and ready to be pushed to the requesting
        function.

        The queue pairs replies FIFO with the Deferreds created by
        execute_command().
        """
        self.replyQueue.put(reply)
529
530    @staticmethod
531    def handle_reply(r):
532        if isinstance(r, Exception):
533            raise r
534        return r
535
536    def _encode_value(self, arg):
537        if isinstance(arg, six.binary_type):
538            return arg
539        elif isinstance(arg, six.text_type):
540            if self.charset is None:
541                try:
542                    return arg.encode()
543                except UnicodeError:
544                    pass
545                raise InvalidData("Encoding charset was not specified")
546            try:
547                return arg.encode(self.charset, self.errors)
548            except UnicodeEncodeError as e:
549                raise InvalidData(
550                    "Error encoding unicode value '%s': %s" %
551                    (repr(arg), e))
552        elif isinstance(arg, float):
553            return format(arg, "f").encode()
554        elif isinstance(arg, bytearray):
555            return bytes(arg)
556        else:
557            return str(arg).format().encode()
558
559    def _build_command(self, *args, **kwargs):
560        # Build the redis command.
561        cmds = bytearray()
562        cmd_count = 0
563        for s in args:
564            cmd = self._encode_value(s)
565            cmds.extend(six.b("$"))
566            for token in self._encode_value(len(cmd)), cmd:
567                cmds.extend(token)
568                cmds.extend(six.b("\r\n"))
569            cmd_count += 1
570
571        command = bytes(six.b("").join(
572            [six.b("*"), self._encode_value(cmd_count), six.b("\r\n")]) + cmds)
573        if not isinstance(command, six.binary_type):
574            command = command.encode()
575        return command
576
    def execute_command(self, *args, **kwargs):
        """
        Send a command (or buffer it when pipelining) and return a
        Deferred that fires with the reply.

        Recognized kwargs:
          post_proc     -- callable applied to the reply; inside MULTI it
                           is queued and applied to the EXEC reply instead.
          apply_timeout -- set False to exempt this command from
                           replyTimeout.

        :raises ConnectionError: when not connected.
        """
        if self.connected == 0:
            raise ConnectionError("Not connected")
        else:
            command = self._build_command(*args, **kwargs)
            # When pipelining, buffer this command into our list of
            # pipelined commands. Otherwise, write the command immediately.
            if self.pipelining:
                self.pipelined_commands.append(command)
            else:
                self.transport.write(command)

            # Return deferred that will contain the result of this command.
            # Note: when using pipelining, this deferred will NOT return
            # until after execute_pipeline is called.

            result = defer.Deferred()

            def fire_result(value):
                # Guard against double-firing when a timeout already
                # errbacked the result.
                if result.called:
                    return
                result.callback(value)

            response = self.replyQueue.get().addCallback(self.handle_reply)
            response.addBoth(fire_result)

            apply_timeout = kwargs.get('apply_timeout', True)
            if self.replyTimeout and apply_timeout:
                delayed_call = None

                def fire_timeout():
                    # Fail this command, drain any other waiters, and drop
                    # the connection: replies can no longer be paired up.
                    error_text = 'Not received Redis response in {0} seconds'.format(self.replyTimeout)
                    result.errback(TimeoutError(error_text))
                    while self.replyQueue.waiting:
                        self.replyQueue.put(TimeoutError(error_text))
                    self.transport.abortConnection()

                def cancel_timeout(value):
                    if delayed_call.active():
                        delayed_call.cancel()
                    return value

                delayed_call = self.callLater(self.replyTimeout, fire_timeout)
                result.addBoth(cancel_timeout)

            # When pipelining, we need to keep track of the deferred replies
            # so that we can wait for them in a DeferredList when
            # execute_pipeline is called.
            if self.pipelining:
                self.pipelined_replies.append(result)

            if self.inMulti:
                self.post_proc.append(kwargs.get("post_proc"))
            else:
                if "post_proc" in kwargs:
                    f = kwargs["post_proc"]
                    if callable(f):
                        result.addCallback(f)
            return result
636
637    ##
638    # REDIS COMMANDS
639    ##
640
641    # Connection handling
    def quit(self):
        """
        Close the connection

        Stops the factory's reconnection attempts before issuing QUIT.
        Returns a Deferred firing with the server's reply.
        """
        self.factory.continueTrying = False
        return self.execute_command("QUIT")
648
    def auth(self, password):
        """
        Simple password authentication if enabled

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("AUTH", password)
654
    def ping(self):
        """
        Ping the server

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("PING")
660
661    # Commands operating on all value types
    def exists(self, key):
        """
        Test if a key exists

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("EXISTS", key)
667
668    def delete(self, keys, *args):
669        """
670        Delete one or more keys
671        """
672        keys = list_or_args("delete", keys, args)
673        return self.execute_command("DEL", *keys)
674
    def type(self, key):
        """
        Return the type of the value stored at key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("TYPE", key)
680
    def keys(self, pattern="*"):
        """
        Return all the keys matching a given pattern

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("KEYS", pattern)
686
687    @staticmethod
688    def _build_scan_args(cursor, pattern, count):
689        """
690        Construct arguments list for SCAN, SSCAN, HSCAN, ZSCAN commands
691        """
692        args = [cursor]
693        if pattern is not None:
694            args.extend(("MATCH", pattern))
695        if count is not None:
696            args.extend(("COUNT", count))
697
698        return args
699
    def scan(self, cursor=0, pattern=None, count=None):
        """
        Incrementally iterate the keys in database

        Returns a Deferred firing with the server's reply.
        """
        args = self._build_scan_args(cursor, pattern, count)
        return self.execute_command("SCAN", *args)
706
    def randomkey(self):
        """
        Return a random key from the key space

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("RANDOMKEY")
712
    def rename(self, oldkey, newkey):
        """
        Rename the old key in the new one,
        destroying the newname key if it already exists

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("RENAME", oldkey, newkey)
719
    def renamenx(self, oldkey, newkey):
        """
        Rename the oldname key to newname,
        if the newname key does not already exist

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("RENAMENX", oldkey, newkey)
726
    def dbsize(self):
        """
        Return the number of keys in the current db

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("DBSIZE")
732
    def expire(self, key, time):
        """
        Set a time to live in seconds on a key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("EXPIRE", key, time)
738
    def persist(self, key):
        """
        Remove the expire from a key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("PERSIST", key)
744
    def ttl(self, key):
        """
        Get the time to live in seconds of a key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("TTL", key)
750
    def select(self, index):
        """
        Select the DB with the specified index

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("SELECT", index)
756
    def move(self, key, dbindex):
        """
        Move the key from the currently selected DB to the dbindex DB

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("MOVE", key, dbindex)
762
763    def flush(self, all_dbs=False):
764        warnings.warn(DeprecationWarning(
765            "redis.flush() has been deprecated, "
766            "use redis.flushdb() or redis.flushall() instead"))
767        return all_dbs and self.flushall() or self.flushdb()
768
    def flushdb(self):
        """
        Remove all the keys from the currently selected DB

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("FLUSHDB")
774
    def flushall(self):
        """
        Remove all the keys from all the databases

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("FLUSHALL")
780
    def time(self):
        """
        Returns the current server time as a two items lists: a Unix timestamp
        and the amount of microseconds already elapsed in the current second

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("TIME")
787
788    # Commands operating on string values
789    def set(self, key, value, expire=None, pexpire=None,
790            only_if_not_exists=False, only_if_exists=False):
791        """
792        Set a key to a string value
793        """
794        args = []
795        if expire is not None:
796            args.extend(("EX", expire))
797        if pexpire is not None:
798            args.extend(("PX", pexpire))
799        if only_if_not_exists and only_if_exists:
800            raise RedisError("only_if_not_exists and only_if_exists "
801                             "cannot be true simultaneously")
802        if only_if_not_exists:
803            args.append("NX")
804        if only_if_exists:
805            args.append("XX")
806        return self.execute_command("SET", key, value, *args)
807
    def get(self, key):
        """
        Return the string value of the key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("GET", key)
813
    def getbit(self, key, offset):
        """
        Return the bit value at offset in the string value stored at key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("GETBIT", key, offset)
819
    def getset(self, key, value):
        """
        Set a key to a string returning the old value of the key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("GETSET", key, value)
825
826    def mget(self, keys, *args):
827        """
828        Multi-get, return the strings values of the keys
829        """
830        keys = list_or_args("mget", keys, args)
831        return self.execute_command("MGET", *keys)
832
833    def setbit(self, key, offset, value):
834        """
835        Sets or clears the bit at offset in the string value stored at key
836        """
837        if isinstance(value, bool):
838            value = int(value)
839        return self.execute_command("SETBIT", key, offset, value)
840
    def setnx(self, key, value):
        """
        Set a key to a string value if the key does not exist

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("SETNX", key, value)
846
    def setex(self, key, time, value):
        """
        Set+Expire combo command

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("SETEX", key, time, value)
852
853    def mset(self, mapping):
854        """
855        Set the respective keys to the respective values.
856        """
857        items = []
858        for pair in six.iteritems(mapping):
859            items.extend(pair)
860        return self.execute_command("MSET", *items)
861
862    def msetnx(self, mapping):
863        """
864        Set multiple keys to multiple values in a single atomic
865        operation if none of the keys already exist
866        """
867        items = []
868        for pair in six.iteritems(mapping):
869            items.extend(pair)
870        return self.execute_command("MSETNX", *items)
871
    def bitop(self, operation, destkey, *srckeys):
        """
        Perform a bitwise operation between multiple keys
        and store the result in the destination key.

        ``operation`` may be a string ("AND"/"OR"/"XOR"/"NOT", any case)
        or the corresponding function from the ``operator`` module.
        Argument errors are returned as failed Deferreds, not raised.
        """
        srclen = len(srckeys)
        if srclen == 0:
            return defer.fail(RedisError("no ``srckeys`` specified"))
        if isinstance(operation, six.string_types):
            operation = operation.upper()
        elif operation is operator.and_ or operation is operator.__and__:
            operation = 'AND'
        elif operation is operator.or_ or operation is operator.__or__:
            operation = 'OR'
        elif operation is operator.__xor__ or operation is operator.xor:
            operation = 'XOR'
        elif operation is operator.__not__ or operation is operator.not_:
            operation = 'NOT'
        if operation not in ('AND', 'OR', 'XOR', 'NOT'):
            return defer.fail(InvalidData(
                "Invalid operation: %s" % operation))
        if operation == 'NOT' and srclen > 1:
            # Redis restricts NOT to a single source key.
            return defer.fail(RedisError(
                "bitop NOT takes only one ``srckey``"))
        return self.execute_command('BITOP', operation, destkey, *srckeys)
897
898    def bitcount(self, key, start=None, end=None):
899        if (end is None and start is not None) or \
900                (start is None and end is not None):
901            raise RedisError("``start`` and ``end`` must both be specified")
902        if start is not None:
903            t = (start, end)
904        else:
905            t = ()
906        return self.execute_command("BITCOUNT", key, *t)
907
    def incr(self, key, amount=1):
        """
        Increment the integer value of key

        Always issues INCRBY with the given amount (default 1).
        """
        return self.execute_command("INCRBY", key, amount)
913
    def incrby(self, key, amount):
        """
        Increment the integer value of key by integer

        Alias for incr() with an explicit amount.
        """
        return self.incr(key, amount)
919
    def decr(self, key, amount=1):
        """
        Decrement the integer value of key

        Always issues DECRBY with the given amount (default 1).
        """
        return self.execute_command("DECRBY", key, amount)
925
    def decrby(self, key, amount):
        """
        Decrement the integer value of key by integer

        Alias for decr() with an explicit amount.
        """
        return self.decr(key, amount)
931
    def append(self, key, value):
        """
        Append the specified string to the string stored at key

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("APPEND", key, value)
937
    def substr(self, key, start, end=-1):
        """
        Return a substring of a larger string

        Returns a Deferred firing with the server's reply.
        """
        return self.execute_command("SUBSTR", key, start, end)
943
944    # Commands operating on lists
945    def push(self, key, value, tail=False):
946        warnings.warn(DeprecationWarning(
947            "redis.push() has been deprecated, "
948            "use redis.lpush() or redis.rpush() instead"))
949
950        return tail and self.rpush(key, value) or self.lpush(key, value)
951
952    def rpush(self, key, value):
953        """
954        Append an element to the tail of the List value at key
955        """
956        if isinstance(value, tuple) or isinstance(value, list):
957            return self.execute_command("RPUSH", key, *value)
958        else:
959            return self.execute_command("RPUSH", key, value)
960
961    def lpush(self, key, value):
962        """
963        Append an element to the head of the List value at key
964        """
965        if isinstance(value, tuple) or isinstance(value, list):
966            return self.execute_command("LPUSH", key, *value)
967        else:
968            return self.execute_command("LPUSH", key, value)
969
970    def llen(self, key):
971        """
972        Return the length of the List value at key
973        """
974        return self.execute_command("LLEN", key)
975
976    def lrange(self, key, start, end):
977        """
978        Return a range of elements from the List at key
979        """
980        return self.execute_command("LRANGE", key, start, end)
981
982    def ltrim(self, key, start, end):
983        """
984        Trim the list at key to the specified range of elements
985        """
986        return self.execute_command("LTRIM", key, start, end)
987
988    def lindex(self, key, index):
989        """
990        Return the element at index position from the List at key
991        """
992        return self.execute_command("LINDEX", key, index)
993
994    def lset(self, key, index, value):
995        """
996        Set a new value as the element at index position of the List at key
997        """
998        return self.execute_command("LSET", key, index, value)
999
1000    def lrem(self, key, count, value):
1001        """
1002        Remove the first-N, last-N, or all the elements matching value
1003        from the List at key
1004        """
1005        return self.execute_command("LREM", key, count, value)
1006
1007    def pop(self, key, tail=False):
1008        warnings.warn(DeprecationWarning(
1009            "redis.pop() has been deprecated, "
1010            "user redis.lpop() or redis.rpop() instead"))
1011
1012        return tail and self.rpop(key) or self.lpop(key)
1013
1014    def lpop(self, key):
1015        """
1016        Return and remove (atomically) the first element of the List at key
1017        """
1018        return self.execute_command("LPOP", key)
1019
1020    def rpop(self, key):
1021        """
1022        Return and remove (atomically) the last element of the List at key
1023        """
1024        return self.execute_command("RPOP", key)
1025
1026    @_blocking_command(release_on_callback=True)
1027    def blpop(self, keys, timeout=0):
1028        """
1029        Blocking LPOP
1030        """
1031        if isinstance(keys, six.string_types):
1032            keys = [keys]
1033        else:
1034            keys = list(keys)
1035
1036        keys.append(timeout)
1037        return self.execute_command("BLPOP", *keys, apply_timeout=False)
1038
1039    @_blocking_command(release_on_callback=True)
1040    def brpop(self, keys, timeout=0):
1041        """
1042        Blocking RPOP
1043        """
1044        if isinstance(keys, six.string_types):
1045            keys = [keys]
1046        else:
1047            keys = list(keys)
1048
1049        keys.append(timeout)
1050        return self.execute_command("BRPOP", *keys, apply_timeout=False)
1051
1052    @_blocking_command(release_on_callback=True)
1053    def brpoplpush(self, source, destination, timeout=0):
1054        """
1055        Pop a value from a list, push it to another list and return
1056        it; or block until one is available.
1057        """
1058        return self.execute_command("BRPOPLPUSH", source, destination, timeout, apply_timeout=False)
1059
1060    def rpoplpush(self, srckey, dstkey):
1061        """
1062        Return and remove (atomically) the last element of the source
1063        List  stored at srckey and push the same element to the
1064        destination List stored at dstkey
1065        """
1066        return self.execute_command("RPOPLPUSH", srckey, dstkey)
1067
1068    def _make_set(self, result):
1069        if isinstance(result, list):
1070            return set(result)
1071        return result
1072
1073    # Commands operating on sets
1074    def sadd(self, key, members, *args):
1075        """
1076        Add the specified member to the Set value at key
1077        """
1078        members = list_or_args("sadd", members, args)
1079        return self.execute_command("SADD", key, *members)
1080
1081    def srem(self, key, members, *args):
1082        """
1083        Remove the specified member from the Set value at key
1084        """
1085        members = list_or_args("srem", members, args)
1086        return self.execute_command("SREM", key, *members)
1087
1088    def spop(self, key):
1089        """
1090        Remove and return (pop) a random element from the Set value at key
1091        """
1092        return self.execute_command("SPOP", key)
1093
1094    def smove(self, srckey, dstkey, member):
1095        """
1096        Move the specified member from one Set to another atomically
1097        """
1098        return self.execute_command(
1099            "SMOVE", srckey, dstkey, member).addCallback(bool)
1100
1101    def scard(self, key):
1102        """
1103        Return the number of elements (the cardinality) of the Set at key
1104        """
1105        return self.execute_command("SCARD", key)
1106
1107    def sismember(self, key, value):
1108        """
1109        Test if the specified value is a member of the Set at key
1110        """
1111        return self.execute_command("SISMEMBER", key, value).addCallback(bool)
1112
1113    def sinter(self, keys, *args):
1114        """
1115        Return the intersection between the Sets stored at key1, ..., keyN
1116        """
1117        keys = list_or_args("sinter", keys, args)
1118        return self.execute_command("SINTER", *keys).addCallback(
1119            self._make_set)
1120
1121    def sinterstore(self, dstkey, keys, *args):
1122        """
1123        Compute the intersection between the Sets stored
1124        at key1, key2, ..., keyN, and store the resulting Set at dstkey
1125        """
1126        keys = list_or_args("sinterstore", keys, args)
1127        return self.execute_command("SINTERSTORE", dstkey, *keys)
1128
1129    def sunion(self, keys, *args):
1130        """
1131        Return the union between the Sets stored at key1, key2, ..., keyN
1132        """
1133        keys = list_or_args("sunion", keys, args)
1134        return self.execute_command("SUNION", *keys).addCallback(
1135            self._make_set)
1136
1137    def sunionstore(self, dstkey, keys, *args):
1138        """
1139        Compute the union between the Sets stored
1140        at key1, key2, ..., keyN, and store the resulting Set at dstkey
1141        """
1142        keys = list_or_args("sunionstore", keys, args)
1143        return self.execute_command("SUNIONSTORE", dstkey, *keys)
1144
1145    def sdiff(self, keys, *args):
1146        """
1147        Return the difference between the Set stored at key1 and
1148        all the Sets key2, ..., keyN
1149        """
1150        keys = list_or_args("sdiff", keys, args)
1151        return self.execute_command("SDIFF", *keys).addCallback(
1152            self._make_set)
1153
1154    def sdiffstore(self, dstkey, keys, *args):
1155        """
1156        Compute the difference between the Set key1 and all the
1157        Sets key2, ..., keyN, and store the resulting Set at dstkey
1158        """
1159        keys = list_or_args("sdiffstore", keys, args)
1160        return self.execute_command("SDIFFSTORE", dstkey, *keys)
1161
1162    def smembers(self, key):
1163        """
1164        Return all the members of the Set value at key
1165        """
1166        return self.execute_command("SMEMBERS", key).addCallback(
1167            self._make_set)
1168
1169    def srandmember(self, key):
1170        """
1171        Return a random member of the Set value at key
1172        """
1173        return self.execute_command("SRANDMEMBER", key)
1174
1175    def sscan(self, key, cursor=0, pattern=None, count=None):
1176        args = self._build_scan_args(cursor, pattern, count)
1177        return self.execute_command("SSCAN", key, *args)
1178
1179    # Commands operating on sorted zsets (sorted sets)
1180    def zadd(self, key, score, member, *args):
1181        """
1182        Add the specified member to the Sorted Set value at key
1183        or update the score if it already exist
1184        """
1185        if args:
1186            # Args should be pairs (have even number of elements)
1187            if len(args) % 2:
1188                return defer.fail(InvalidData(
1189                    "Invalid number of arguments to ZADD"))
1190            else:
1191                l = [score, member]
1192                l.extend(args)
1193                args = l
1194        else:
1195            args = [score, member]
1196        return self.execute_command("ZADD", key, *args)
1197
1198    def zrem(self, key, *args):
1199        """
1200        Remove the specified member from the Sorted Set value at key
1201        """
1202        return self.execute_command("ZREM", key, *args)
1203
1204    def zincr(self, key, member):
1205        return self.zincrby(key, 1, member)
1206
1207    def zdecr(self, key, member):
1208        return self.zincrby(key, -1, member)
1209
1210    @staticmethod
1211    def _handle_zincrby(data):
1212        if isinstance(data, (six.binary_type, six.text_type)):
1213            return float(data)
1214        return data
1215
1216    def zincrby(self, key, increment, member):
1217        """
1218        If the member already exists increment its score by increment,
1219        otherwise add the member setting increment as score
1220        """
1221        return self.execute_command("ZINCRBY",
1222                                    key, increment,
1223                                    member).addCallback(self._handle_zincrby)
1224
1225    def zrank(self, key, member):
1226        """
1227        Return the rank (or index) or member in the sorted set at key,
1228        with scores being ordered from low to high
1229        """
1230        return self.execute_command("ZRANK", key, member)
1231
1232    def zrevrank(self, key, member):
1233        """
1234        Return the rank (or index) or member in the sorted set at key,
1235        with scores being ordered from high to low
1236        """
1237        return self.execute_command("ZREVRANK", key, member)
1238
1239    def _handle_withscores(self, r):
1240        if isinstance(r, list):
1241            # Return a list tuples of form (value, score)
1242            return list((x[0], float(x[1])) for x in zip(r[::2], r[1::2]))
1243        return r
1244
1245    def _zrange(self, key, start, end, withscores, reverse):
1246        if reverse:
1247            cmd = "ZREVRANGE"
1248        else:
1249            cmd = "ZRANGE"
1250        if withscores:
1251            pieces = (cmd, key, start, end, "WITHSCORES")
1252        else:
1253            pieces = (cmd, key, start, end)
1254        r = self.execute_command(*pieces)
1255        if withscores:
1256            r.addCallback(self._handle_withscores)
1257        return r
1258
1259    def zrange(self, key, start=0, end=-1, withscores=False):
1260        """
1261        Return a range of elements from the sorted set at key
1262        """
1263        return self._zrange(key, start, end, withscores, False)
1264
1265    def zrevrange(self, key, start=0, end=-1, withscores=False):
1266        """
1267        Return a range of elements from the sorted set at key,
1268        exactly like ZRANGE, but the sorted set is ordered in
1269        traversed in reverse order, from the greatest to the smallest score
1270        """
1271        return self._zrange(key, start, end, withscores, True)
1272
1273    def _zrangebyscore(self, key, min, max, withscores, offset, count, rev):
1274        if rev:
1275            cmd = "ZREVRANGEBYSCORE"
1276        else:
1277            cmd = "ZRANGEBYSCORE"
1278        if (offset is None) != (count is None):  # XNOR
1279            return defer.fail(InvalidData(
1280                "Invalid count and offset arguments to %s" % cmd))
1281        if withscores:
1282            pieces = [cmd, key, min, max, "WITHSCORES"]
1283        else:
1284            pieces = [cmd, key, min, max]
1285        if offset is not None and count is not None:
1286            pieces.extend(("LIMIT", offset, count))
1287        r = self.execute_command(*pieces)
1288        if withscores:
1289            r.addCallback(self._handle_withscores)
1290        return r
1291
1292    def zrangebyscore(self, key, min='-inf', max='+inf',
1293                      withscores=False, offset=None, count=None):
1294        """
1295        Return all the elements with score >= min and score <= max
1296        (a range query) from the sorted set
1297        """
1298        return self._zrangebyscore(key, min, max, withscores, offset,
1299                                   count, False)
1300
1301    def zrevrangebyscore(self, key, max='+inf', min='-inf',
1302                         withscores=False, offset=None, count=None):
1303        """
1304        ZRANGEBYSCORE in reverse order
1305        """
1306        # ZREVRANGEBYSCORE takes max before min
1307        return self._zrangebyscore(key, max, min, withscores, offset,
1308                                   count, True)
1309
1310    def zcount(self, key, min='-inf', max='+inf'):
1311        """
1312        Return the number of elements with score >= min and score <= max
1313        in the sorted set
1314        """
1315        if min == '-inf' and max == '+inf':
1316            return self.zcard(key)
1317        return self.execute_command("ZCOUNT", key, min, max)
1318
1319    def zcard(self, key):
1320        """
1321        Return the cardinality (number of elements) of the sorted set at key
1322        """
1323        return self.execute_command("ZCARD", key)
1324
1325    @staticmethod
1326    def _handle_zscore(data):
1327        if isinstance(data, (six.binary_type, six.text_type)):
1328            return int(data)
1329        return data
1330
1331    def zscore(self, key, element):
1332        """
1333        Return the score associated with the specified element of the sorted
1334        set at key
1335        """
1336        return self.execute_command("ZSCORE", key,
1337                                    element).addCallback(self._handle_zscore)
1338
1339    def zremrangebyrank(self, key, min=0, max=-1):
1340        """
1341        Remove all the elements with rank >= min and rank <= max from
1342        the sorted set
1343        """
1344        return self.execute_command("ZREMRANGEBYRANK", key, min, max)
1345
1346    def zremrangebyscore(self, key, min='-inf', max='+inf'):
1347        """
1348        Remove all the elements with score >= min and score <= max from
1349        the sorted set
1350        """
1351        return self.execute_command("ZREMRANGEBYSCORE", key, min, max)
1352
1353    def zunionstore(self, dstkey, keys, aggregate=None):
1354        """
1355        Perform a union over a number of sorted sets with optional
1356        weight and aggregate
1357        """
1358        return self._zaggregate("ZUNIONSTORE", dstkey, keys, aggregate)
1359
1360    def zinterstore(self, dstkey, keys, aggregate=None):
1361        """
1362        Perform an intersection over a number of sorted sets with optional
1363        weight and aggregate
1364        """
1365        return self._zaggregate("ZINTERSTORE", dstkey, keys, aggregate)
1366
1367    def _zaggregate(self, command, dstkey, keys, aggregate):
1368        pieces = [command, dstkey, len(keys)]
1369        if isinstance(keys, dict):
1370            keys, weights = list(zip(*keys.items()))
1371        else:
1372            weights = None
1373
1374        pieces.extend(keys)
1375        if weights:
1376            pieces.append("WEIGHTS")
1377            pieces.extend(weights)
1378
1379        if aggregate:
1380            if aggregate is min:
1381                aggregate = 'MIN'
1382            elif aggregate is max:
1383                aggregate = 'MAX'
1384            elif aggregate is sum:
1385                aggregate = 'SUM'
1386            else:
1387                err_flag = True
1388                if isinstance(aggregate, six.string_types):
1389                    aggregate_u = aggregate.upper()
1390                    if aggregate_u in ('MIN', 'MAX', 'SUM'):
1391                        aggregate = aggregate_u
1392                        err_flag = False
1393                if err_flag:
1394                    return defer.fail(InvalidData(
1395                        "Invalid aggregate function: %s" % aggregate))
1396            pieces.extend(("AGGREGATE", aggregate))
1397        return self.execute_command(*pieces)
1398
1399    def zscan(self, key, cursor=0, pattern=None, count=None):
1400        args = self._build_scan_args(cursor, pattern, count)
1401        return self.execute_command("ZSCAN", key, *args)
1402
1403    # Commands operating on hashes
1404    def hset(self, key, field, value):
1405        """
1406        Set the hash field to the specified value. Creates the hash if needed
1407        """
1408        return self.execute_command("HSET", key, field, value)
1409
1410    def hsetnx(self, key, field, value):
1411        """
1412        Set the hash field to the specified value if the field does not exist.
1413        Creates the hash if needed
1414        """
1415        return self.execute_command("HSETNX", key, field, value)
1416
1417    def hget(self, key, field):
1418        """
1419        Retrieve the value of the specified hash field.
1420        """
1421        return self.execute_command("HGET", key, field)
1422
1423    def hmget(self, key, fields):
1424        """
1425        Get the hash values associated to the specified fields.
1426        """
1427        return self.execute_command("HMGET", key, *fields)
1428
1429    def hmset(self, key, mapping):
1430        """
1431        Set the hash fields to their respective values.
1432        """
1433        items = []
1434        for pair in six.iteritems(mapping):
1435            items.extend(pair)
1436        return self.execute_command("HMSET", key, *items)
1437
1438    def hincr(self, key, field):
1439        return self.hincrby(key, field, 1)
1440
1441    def hdecr(self, key, field):
1442        return self.hincrby(key, field, -1)
1443
1444    def hincrby(self, key, field, integer):
1445        """
1446        Increment the integer value of the hash at key on field with integer.
1447        """
1448        return self.execute_command("HINCRBY", key, field, integer)
1449
1450    def hexists(self, key, field):
1451        """
1452        Test for existence of a specified field in a hash
1453        """
1454        return self.execute_command("HEXISTS", key, field).addCallback(bool)
1455
1456    def hdel(self, key, fields):
1457        """
1458        Remove the specified field or fields from a hash
1459        """
1460        if isinstance(fields, six.string_types):
1461            fields = [fields]
1462        else:
1463            fields = list(fields)
1464        return self.execute_command("HDEL", key, *fields)
1465
1466    def hlen(self, key):
1467        """
1468        Return the number of items in a hash.
1469        """
1470        return self.execute_command("HLEN", key)
1471
1472    def hkeys(self, key):
1473        """
1474        Return all the fields in a hash.
1475        """
1476        return self.execute_command("HKEYS", key)
1477
1478    def hvals(self, key):
1479        """
1480        Return all the values in a hash.
1481        """
1482        return self.execute_command("HVALS", key)
1483
1484    def hgetall(self, key):
1485        """
1486        Return all the fields and associated values in a hash.
1487        """
1488        f = lambda d: dict(list(zip(d[::2], d[1::2])))
1489        return self.execute_command("HGETALL", key, post_proc=f)
1490
1491    def hscan(self, key, cursor=0, pattern=None, count=None):
1492        args = self._build_scan_args(cursor, pattern, count)
1493        return self.execute_command("HSCAN", key, *args)
1494
1495    # Sorting
1496    def sort(self, key, start=None, end=None, by=None, get=None,
1497             desc=None, alpha=False, store=None):
1498        if (start is not None and end is None) or \
1499           (end is not None and start is None):
1500            raise RedisError("``start`` and ``end`` must both be specified")
1501
1502        pieces = [key]
1503        if by is not None:
1504            pieces.append("BY")
1505            pieces.append(by)
1506        if start is not None and end is not None:
1507            pieces.append("LIMIT")
1508            pieces.append(start)
1509            pieces.append(end)
1510        if get is not None:
1511            pieces.append("GET")
1512            pieces.append(get)
1513        if desc:
1514            pieces.append("DESC")
1515        if alpha:
1516            pieces.append("ALPHA")
1517        if store is not None:
1518            pieces.append("STORE")
1519            pieces.append(store)
1520
1521        return self.execute_command("SORT", *pieces)
1522
1523    def _clear_txstate(self):
1524        if self.inTransaction:
1525            self.pendingTransaction = False
1526            self.inTransaction = False
1527            self.inMulti = False
1528            self.factory.connectionQueue.put(self)
1529
    @_blocking_command(release_on_callback=False)
    def watch(self, keys):
        """
        WATCH one or more keys for a check-and-set transaction.

        Marks this connection as having a pending transaction so it is
        held out of the pool until UNWATCH/EXEC/DISCARD completes. Fires
        with this connection once the server acknowledges the WATCH.
        """
        if not self.pendingTransaction:
            # Bootstrap watch-only transaction state: UNWATCH clears it,
            # while commit does not (that is reserved for MULTI mode —
            # compare multi(), which swaps these two callbacks).
            self.pendingTransaction = True
            self.inTransaction = False
            self.inMulti = False
            self.unwatch_cc = self._clear_txstate
            self.commit_cc = lambda: ()
        if isinstance(keys, six.string_types):
            keys = [keys]
        d = self.execute_command("WATCH", *keys).addCallback(self._tx_started)
        return d
1542
1543    def unwatch(self):
1544        self.unwatch_cc()
1545        return self.execute_command("UNWATCH")
1546
1547    # Transactions
1548    # multi() will return a deferred with a "connection" object
1549    # That object must be used for further interactions within
1550    # the transaction. At the end, either exec() or discard()
1551    # must be executed.
    @_blocking_command(release_on_callback=False)
    def multi(self, keys=None):
        """
        Start a MULTI transaction, optionally WATCHing ``keys`` first.

        Fires with this connection; all subsequent transaction commands
        must use it, finished by either commit() or discard().
        """
        self.pendingTransaction = True
        self.inTransaction = False
        self.inMulti = True
        # In MULTI mode committing clears the tx state; UNWATCH does not.
        self.unwatch_cc = lambda: ()
        self.commit_cc = self._clear_txstate
        if keys is not None:
            # WATCH must be issued before MULTI for CAS semantics.
            d = self.watch(keys)
            d.addCallback(lambda _: self.execute_command("MULTI"))
        else:
            d = self.execute_command("MULTI")
        d.addCallback(self._tx_started)
        return d
1566
1567    def _tx_started(self, response):
1568        if response != 'OK':
1569            raise RedisError('Invalid response: %s' % response)
1570        self.inTransaction = True
1571        return self
1572
1573    def _commit_check(self, response):
1574        if response is None:
1575            self.transactions = 0
1576            self._clear_txstate()
1577            raise WatchError("Transaction failed")
1578        else:
1579            return response
1580
1581    def commit(self):
1582        if self.inMulti is False:
1583            raise RedisError("Not in transaction")
1584        return self.execute_command("EXEC").addCallback(self._commit_check)
1585
1586    def discard(self):
1587        if self.inMulti is False:
1588            raise RedisError("Not in transaction")
1589        self.post_proc = []
1590        self.transactions = 0
1591        self._clear_txstate()
1592        return self.execute_command("DISCARD")
1593
1594    # Returns a proxy that works just like .multi() except that commands
1595    # are simply buffered to be written all at once in a pipeline.
1596    # http://redis.io/topics/pipelining
1597    @_blocking_command(release_on_callback=False)
1598    def pipeline(self):
1599
1600        # Return a deferred that returns self (rather than simply self) to allow
1601        # ConnectionHandler to wrap this method with async connection retrieval.
1602        self.pipelining = True
1603        self.pipelined_commands = []
1604        self.pipelined_replies = []
1605        return defer.succeed(self)
1606
    @defer.inlineCallbacks
    def execute_pipeline(self):
        """
        Flush every buffered pipeline command and gather the replies.

        Raises RedisError when pipeline() was not called first. Fires
        with a list holding one reply per pipelined command, in order.
        """
        if not self.pipelining:
            err = "Not currently pipelining commands, " \
                  "please use pipeline() first"
            raise RedisError(err)

        # Flush all the commands at once to redis. Wait for all replies
        # to come back using a deferred list.
        self.transport.write(six.b("").join(self.pipelined_commands))

        d = defer.DeferredList(
            deferredList=self.pipelined_replies,
            fireOnOneErrback=True,
            consumeErrors=True,
            )

        # Always reset pipeline state (success or failure) and return
        # the connection to the factory's pool.
        d.addBoth(self._clear_pipeline_state)

        results = yield d

        # DeferredList yields (success, value) pairs; strip the flags.
        defer.returnValue([value for success, value in results])
1629
1630    def _clear_pipeline_state(self, response):
1631        if self.pipelining:
1632            self.pipelining = False
1633            self.pipelined_commands = []
1634            self.pipelined_replies = []
1635            self.factory.connectionQueue.put(self)
1636
1637        return response
1638
1639    # Publish/Subscribe
1640    # see the SubscriberProtocol for subscribing to channels
1641    def publish(self, channel, message):
1642        """
1643        Publish message to a channel
1644        """
1645        return self.execute_command("PUBLISH", channel, message)
1646
1647    # Persistence control commands
1648    def save(self):
1649        """
1650        Synchronously save the DB on disk
1651        """
1652        return self.execute_command("SAVE")
1653
1654    def bgsave(self):
1655        """
1656        Asynchronously save the DB on disk
1657        """
1658        return self.execute_command("BGSAVE")
1659
1660    def lastsave(self):
1661        """
1662        Return the UNIX time stamp of the last successfully saving of the
1663        dataset on disk
1664        """
1665        return self.execute_command("LASTSAVE")
1666
1667    def shutdown(self):
1668        """
1669        Synchronously save the DB on disk, then shutdown the server
1670        """
1671        self.factory.continueTrying = False
1672        return self.execute_command("SHUTDOWN")
1673
1674    def bgrewriteaof(self):
1675        """
1676        Rewrite the append only file in background when it gets too big
1677        """
1678        return self.execute_command("BGREWRITEAOF")
1679
1680    def _process_info(self, r):
1681        if isinstance(r, six.binary_type):
1682            r = r.decode()
1683        keypairs = [x for x in r.split('\r\n') if
1684                    ':' in x and not x.startswith('#')]
1685        d = {}
1686        for kv in keypairs:
1687            k, v = kv.split(':')
1688            d[k] = v
1689        return d
1690
1691    # Remote server control commands
1692    def info(self, type=None):
1693        """
1694        Provide information and statistics about the server
1695        """
1696        if type is None:
1697            return self.execute_command("INFO")
1698        else:
1699            r = self.execute_command("INFO", type)
1700            return r.addCallback(self._process_info)
1701
1702    # slaveof is missing
1703
1704    # Redis 2.6 scripting commands
1705    def _eval(self, script, script_hash, keys, args):
1706        n = len(keys)
1707        keys_and_args = tuple(keys) + tuple(args)
1708        r = self.execute_command("EVAL", script, n, *keys_and_args)
1709        if script_hash in self.script_hashes:
1710            return r
1711        return r.addCallback(self._eval_success, script_hash)
1712
1713    def _eval_success(self, r, script_hash):
1714        self.script_hashes.add(script_hash)
1715        return r
1716
1717    def _evalsha_failed(self, err, script, script_hash, keys, args):
1718        if err.check(ScriptDoesNotExist):
1719            return self._eval(script, script_hash, keys, args)
1720        return err
1721
1722    def eval(self, script, keys=[], args=[]):
1723        if isinstance(script, six.text_type):
1724            script = script.encode()
1725        h = hashlib.sha1(script).hexdigest()
1726        if h in self.script_hashes:
1727            return self.evalsha(h, keys, args).addErrback(
1728                self._evalsha_failed, script, h, keys, args)
1729        return self._eval(script, h, keys, args)
1730
1731    def _evalsha_errback(self, err, script_hash):
1732        if err.check(ResponseError):
1733            if err.value.args[0].startswith(u'NOSCRIPT'):
1734                if script_hash in self.script_hashes:
1735                    self.script_hashes.remove(script_hash)
1736                raise ScriptDoesNotExist("No script matching hash: %s found" %
1737                                         script_hash)
1738        return err
1739
1740    def evalsha(self, sha1_hash, keys=[], args=[]):
1741        n = len(keys)
1742        keys_and_args = tuple(keys) + tuple(args)
1743        r = self.execute_command("EVALSHA",
1744                                 sha1_hash, n,
1745                                 *keys_and_args)
1746        r.addErrback(self._evalsha_errback, sha1_hash)
1747        if sha1_hash not in self.script_hashes:
1748            r.addCallback(self._eval_success, sha1_hash)
1749        return r
1750
1751    def _script_exists_success(self, r):
1752        l = [bool(x) for x in r]
1753        if len(l) == 1:
1754            return l[0]
1755        else:
1756            return l
1757
1758    def script_exists(self, *hashes):
1759        return self.execute_command("SCRIPT", "EXISTS",
1760                                    post_proc=self._script_exists_success,
1761                                    *hashes)
1762
1763    def _script_flush_success(self, r):
1764        self.script_hashes.clear()
1765        return r
1766
1767    def script_flush(self):
1768        return self.execute_command("SCRIPT", "FLUSH").addCallback(
1769            self._script_flush_success)
1770
1771    def _handle_script_kill(self, r):
1772        if isinstance(r, Failure):
1773            if r.check(ResponseError):
1774                if r.value.args[0].startswith(u'NOTBUSY'):
1775                    raise NoScriptRunning("No script running")
1776        return r
1777
1778    def script_kill(self):
1779        return self.execute_command("SCRIPT",
1780                                    "KILL").addBoth(self._handle_script_kill)
1781
1782    def script_load(self, script):
1783        return self.execute_command("SCRIPT",  "LOAD", script)
1784
1785    # Redis 2.8.9 HyperLogLog commands
1786    def pfadd(self, key, elements, *args):
1787        elements = list_or_args("pfadd", elements, args)
1788        return self.execute_command("PFADD", key, *elements)
1789
1790    def pfcount(self, keys, *args):
1791        keys = list_or_args("pfcount", keys, args)
1792        return self.execute_command("PFCOUNT", *keys)
1793
1794    def pfmerge(self, destKey, sourceKeys, *args):
1795        sourceKeys = list_or_args("pfmerge", sourceKeys, args)
1796        return self.execute_command("PFMERGE", destKey, *sourceKeys)
1797
    # (attribute_name, sentinel_flag) pairs: _parse_sentinel_state sets
    # as_dict[attribute_name] = True when sentinel_flag appears in a node's
    # comma-separated "flags" field.
    _SENTINEL_NODE_FLAGS = (("is_master", "master"), ("is_slave", "slave"),
                            ("is_sdown", "s_down"), ("is_odown", "o_down"),
                            ("is_sentinel", "sentinel"),
                            ("is_disconnected", "disconnected"),
                            ("is_master_down", "master_down"))
1803
1804    def _parse_sentinel_state(self, state_array):
1805        as_dict = dict(
1806            (self.tryConvertData(key), self.tryConvertData(value))
1807            for key, value in zip(state_array[::2], state_array[1::2])
1808        )
1809        flags = set(as_dict['flags'].split(','))
1810        for bool_name, flag_name in self._SENTINEL_NODE_FLAGS:
1811            as_dict[bool_name] = flag_name in flags
1812        return as_dict
1813
1814    def sentinel_masters(self):
1815        def convert(raw):
1816            result = {}
1817            for array in raw:
1818                as_dict = self._parse_sentinel_state(array)
1819                result[as_dict['name']] = as_dict
1820            return result
1821        return self.execute_command("SENTINEL", "MASTERS").addCallback(convert)
1822
1823    def sentinel_slaves(self, service_name):
1824        def convert(raw):
1825            return [
1826                self._parse_sentinel_state(array)
1827                for array in raw
1828            ]
1829        return self.execute_command("SENTINEL", "SLAVES", service_name)\
1830            .addCallback(convert)
1831
1832    def sentinel_get_master_addr_by_name(self, service_name):
1833        return self.execute_command("SENTINEL", "GET-MASTER-ADDR-BY-NAME", service_name)
1834
    def role(self):
        """Issue the ROLE command (replication role of this server)."""
        return self.execute_command("ROLE")
1837
1838
class HiredisProtocol(BaseRedisProtocol):
    """Protocol variant that parses server replies with the hiredis reader.

    Because hiredis returns raw bytes, multi-bulk results from commit/scan/
    sscan are post-processed through _convert_bin_values.
    """

    def __init__(self, *args, **kwargs):
        BaseRedisProtocol.__init__(self, *args, **kwargs)
        self._reader = hiredis.Reader(protocolError=InvalidData,
                                      replyError=ResponseError)

    def dataReceived(self, data, unpause=False):
        if data:
            self._reader.feed(data)
        # gets() returns False while the current reply is incomplete.
        while True:
            reply = self._reader.gets()
            if reply is False:
                break
            if isinstance(reply, (six.text_type, six.binary_type, list)):
                reply = self.tryConvertData(reply)
            if reply == "QUEUED":
                self.transactions += 1
            else:
                reply = self.handleTransactionData(reply)
            self.replyReceived(reply)

    def _convert_bin_values(self, result):
        """Recursively decode bytes inside nested lists/dicts."""
        if isinstance(result, list):
            return list(map(self._convert_bin_values, result))
        if isinstance(result, dict):
            return dict(
                (self._convert_bin_values(k), self._convert_bin_values(v))
                for k, v in six.iteritems(result))
        if isinstance(result, six.binary_type):
            return self.tryConvertData(result)
        return result

    def commit(self):
        d = BaseRedisProtocol.commit(self)
        return d.addCallback(self._convert_bin_values)

    def scan(self, cursor=0, pattern=None, count=None):
        d = BaseRedisProtocol.scan(self, cursor, pattern, count)
        return d.addCallback(self._convert_bin_values)

    def sscan(self, key, cursor=0, pattern=None, count=None):
        d = BaseRedisProtocol.sscan(self, key, cursor, pattern, count)
        return d.addCallback(self._convert_bin_values)
1881
# Prefer the hiredis-backed parser when the C extension is importable;
# otherwise fall back to the pure-Python protocol implementation.
if hiredis is not None:
    RedisProtocol = HiredisProtocol
else:
    RedisProtocol = BaseRedisProtocol
1886
1887
class MonitorProtocol(RedisProtocol):
    """
    monitor has the same behavior as subscribe: hold the connection until
    something happens.

    take care with the performance impact: http://redis.io/commands/monitor
    """

    def messageReceived(self, message):
        # Subclass hook: called once for every reply streamed in MONITOR
        # mode.
        pass

    def replyReceived(self, reply):
        # While monitoring, every inbound reply is a monitored command line.
        self.messageReceived(reply)

    def monitor(self):
        # MONITOR streams indefinitely, so the reply timeout is disabled.
        return self.execute_command("MONITOR", apply_timeout=False)

    def stop(self):
        # The only way to leave MONITOR mode is to drop the connection.
        self.transport.loseConnection()
1907
1908
class SubscriberProtocol(RedisProtocol):
    """Protocol dedicated to the pub/sub subset of the Redis protocol."""

    _sub_unsub_reponses = set([u"subscribe", u"unsubscribe", u"psubscribe", u"punsubscribe"])

    def messageReceived(self, pattern, channel, message):
        # Subclass hook: called for every published message received.
        pass

    def replyReceived(self, reply):
        if not isinstance(reply, list):
            self.replyQueue.put(reply)
            return
        size = len(reply)
        if size >= 3 and reply[-3] == u"message":
            self.messageReceived(None, *reply[-2:])
        elif size >= 4 and reply[-4] == u"pmessage":
            self.messageReceived(*reply[-3:])
        elif (size >= 3 and reply[-3] in self._sub_unsub_reponses
                and len(self.replyQueue.waiting) == 0):
            # Unsolicited (un)subscribe confirmation with nobody waiting.
            pass
        else:
            self.replyQueue.put(reply[-3:])

    def subscribe(self, channels):
        if isinstance(channels, six.string_types):
            channels = [channels]
        return self.execute_command("SUBSCRIBE", *channels, apply_timeout=False)

    def unsubscribe(self, channels):
        if isinstance(channels, six.string_types):
            channels = [channels]
        return self.execute_command("UNSUBSCRIBE", *channels)

    def psubscribe(self, patterns):
        if isinstance(patterns, six.string_types):
            patterns = [patterns]
        return self.execute_command("PSUBSCRIBE", *patterns, apply_timeout=False)

    def punsubscribe(self, patterns):
        if isinstance(patterns, six.string_types):
            patterns = [patterns]
        return self.execute_command("PUNSUBSCRIBE", *patterns)
1948
1949
class ConnectionHandler(object):
    """Dispatches Redis commands over a pool of protocol connections.

    Unknown attribute access resolves against the protocol class; the
    resulting method is wrapped so that a pooled connection is checked out,
    used, and (for blocking commands) returned to the pool.
    """

    def __init__(self, factory):
        self._factory = factory
        self._connected = factory.deferred

    def disconnect(self):
        """Close every pooled connection; fires when the pool is empty."""
        self._factory.continueTrying = 0
        self._factory.disconnectCalled = True
        for conn in self._factory.pool:
            try:
                conn.transport.loseConnection()
            except Exception:
                # Best-effort teardown: ignore transport errors, but no
                # longer swallow KeyboardInterrupt/SystemExit (was a bare
                # except).
                pass

        return self._factory.waitForEmptyPool()

    def __getattr__(self, method):
        def wrapper(*args, **kwargs):
            protocol_method = getattr(self._factory.protocol, method)
            # Blocking commands hold the connection exclusively until done;
            # others may share a peeked connection.
            blocking = getattr(protocol_method, '_blocking', False)
            release_on_callback = getattr(protocol_method,
                                          '_release_on_callback', True)

            d = self._factory.getConnection(peek=not blocking)

            def callback(connection):
                try:
                    d = protocol_method(connection, *args, **kwargs)
                except:
                    # Re-raised below, so catching everything here is safe:
                    # return the connection first so it is not leaked.
                    if blocking:
                        self._factory.connectionQueue.put(connection)
                    raise

                def put_back(reply):
                    self._factory.connectionQueue.put(connection)
                    return reply

                if blocking and release_on_callback:
                    d.addBoth(put_back)

                return d
            d.addCallback(callback)
            return d
        return wrapper

    def __repr__(self):
        try:
            cli = self._factory.pool[0].transport.getPeer()
        except Exception:  # narrowed from bare except: empty pool, no peer
            return "<Redis Connection: Not connected>"
        else:
            return "<Redis Connection: %s:%s - %d connection(s)>" % \
                   (cli.host, cli.port, self._factory.size)
2002
2003
class UnixConnectionHandler(ConnectionHandler):
    """ConnectionHandler whose repr shows the unix socket address."""

    def __repr__(self):
        try:
            cli = self._factory.pool[0].transport.getPeer()
        except Exception:  # narrowed from bare except: empty pool, no peer
            return "<Redis Connection: Not connected>"
        else:
            return "<Redis Unix Connection: %s - %d connection(s)>" % \
                   (cli.name, self._factory.size)
2013
2014
# Commands that operate on a single key and can therefore be routed to one
# shard by hashing their first argument (see ShardedConnectionHandler._wrap).
ShardedMethods = frozenset([
    "decr",
    "delete",
    "exists",
    "expire",
    "get",
    "get_type",
    "getset",
    "hdel",
    "hexists",
    "hget",
    "hgetall",
    "hincrby",
    "hkeys",
    "hlen",
    "hmget",
    "hmset",
    "hset",
    "hvals",
    "incr",
    "lindex",
    "llen",
    "lrange",
    "lrem",
    "lset",
    "ltrim",
    "pop",
    "publish",
    "push",
    "rename",
    "sadd",
    "set",
    "setex",
    "setnx",
    "sismember",
    "smembers",
    "srem",
    "ttl",
    "zadd",
    "zcard",
    "zcount",
    "zdecr",
    "zincr",
    "zincrby",
    "zrange",
    "zrangebyscore",
    "zrevrangebyscore",
    "zrevrank",
    "zrank",
    "zrem",
    "zremrangebyscore",
    "zremrangebyrank",
    "zrevrange",
    "zscore"
])
2070
# Extracts the "hash tag" between braces in a key (e.g. "foo{bar}baz" -> "bar")
# so that keys sharing a tag are routed to the same shard.
_findhash = re.compile(r'.+\{(.*)\}.*')
2072
2073
class HashRing(object):
    """Consistent hash ring mapping keys onto redis connection nodes.

    Each node is inserted ``replicas`` times at CRC32 positions derived from
    its factory uuid, which smooths the key distribution across nodes.
    """

    def __init__(self, nodes=None, replicas=160):
        # nodes=None instead of a mutable default list (shared across calls).
        self.nodes = []
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []

        for n in (nodes or []):
            self.add_node(n)

    def _node_keys(self, node):
        """Yield the CRC32 ring positions for every replica of node."""
        uuid = node._factory.uuid
        if not isinstance(uuid, bytes):
            uuid = uuid.encode()
        for x in range(self.replicas):
            yield zlib.crc32(b":".join([uuid, str(x).encode()]))

    def add_node(self, node):
        self.nodes.append(node)
        for crckey in self._node_keys(node):
            self.ring[crckey] = node
            self.sorted_keys.append(crckey)
        self.sorted_keys.sort()

    def remove_node(self, node):
        self.nodes.remove(node)
        # Fixed: the original hashed the node object itself (never matching
        # the uuid-based keys written by add_node) and called the
        # nonexistent dict.remove(), so removal always raised.
        for crckey in self._node_keys(node):
            self.ring.pop(crckey, None)
            try:
                self.sorted_keys.remove(crckey)
            except ValueError:
                pass

    def get_node(self, key):
        return self.get_node_pos(key)[0]

    def get_node_pos(self, key):
        """Return [node, ring_index] for key, or [None, None] when empty."""
        if not self.ring:
            return [None, None]
        if not isinstance(key, bytes):
            # zlib.crc32 requires bytes on Python 3.
            key = key.encode()
        crc = zlib.crc32(key)
        idx = bisect.bisect(self.sorted_keys, crc)
        # prevents out of range index
        idx = min(idx, (self.replicas * len(self.nodes)) - 1)
        return [self.ring[self.sorted_keys[idx]], idx]

    def iter_nodes(self, key):
        """Yield (ring_key, node) pairs starting at key's position."""
        if not self.ring:
            yield None, None
            return
        node, pos = self.get_node_pos(key)
        for k in self.sorted_keys[pos:]:
            yield k, self.ring[k]

    def __call__(self, key):
        return self.get_node(key)
2129
2130
class ShardedConnectionHandler(object):
    """Routes single-key commands across a HashRing of connections."""

    def __init__(self, connections):
        if isinstance(connections, defer.DeferredList):
            self._ring = None
            connections.addCallback(self._makeRing)
        else:
            self._ring = HashRing(connections)

    def _makeRing(self, connections):
        # DeferredList results are (success, value) pairs; keep the values.
        connections = list(map(operator.itemgetter(1), connections))
        self._ring = HashRing(connections)
        return self

    @defer.inlineCallbacks
    def disconnect(self):
        if not self._ring:
            raise ConnectionError("Not connected")

        for conn in self._ring.nodes:
            yield conn.disconnect()
        defer.returnValue(True)

    def _wrap(self, method, *args, **kwargs):
        """Route method to the shard owning its first (key) argument."""
        # Explicit check instead of assert (asserts vanish under -O) and a
        # bare except.
        if not args or not isinstance(args[0], six.string_types):
            raise ValueError(
                "Method '%s' requires a key as the first argument" % method)
        key = args[0]

        # Honor {hash tags}: route on the tag so related keys co-locate.
        m = _findhash.match(key)
        if m is not None and len(m.groups()) >= 1:
            node = self._ring(m.groups()[0])
        else:
            node = self._ring(key)

        return getattr(node, method)(*args, **kwargs)

    def pipeline(self):
        raise NotImplementedError("Pipelining is not supported across shards")

    def __getattr__(self, method):
        if method in ShardedMethods:
            return functools.partial(self._wrap, method)
        raise NotImplementedError("Method '%s' cannot be sharded" % method)

    @defer.inlineCallbacks
    def mget(self, keys, *args):
        """
        high-level mget, required because of the sharding support
        """
        keys = list_or_args("mget", keys, args)
        group = collections.defaultdict(list)
        for k in keys:
            group[self._ring(k)].append(k)

        # Fixed: six.iteritems(group.items) passed the bound method itself
        # and raised at runtime; iterate the dict's items directly.
        deferreds = [node.mget(node_keys)
                     for node, node_keys in group.items()]

        result = []
        response = yield defer.DeferredList(deferreds)
        for (success, values) in response:
            if success:
                result += values

        defer.returnValue(result)

    def __repr__(self):
        nodes = []
        for conn in self._ring.nodes:
            try:
                cli = conn._factory.pool[0].transport.getPeer()
            except Exception:  # narrowed from bare except
                pass
            else:
                # Fixed: six.b("...") % (...) built bytes, which broke the
                # str ", ".join below on Python 3.
                nodes.append("%s:%s/%d" %
                             (cli.host, cli.port, conn._factory.size))
        return "<Redis Sharded Connection: %s>" % ", ".join(nodes)
2214
2215
class ShardedUnixConnectionHandler(ShardedConnectionHandler):
    """Sharded handler variant whose repr shows unix socket addresses."""

    def __repr__(self):
        nodes = []
        for conn in self._ring.nodes:
            try:
                cli = conn._factory.pool[0].transport.getPeer()
            except Exception:  # narrowed from bare except
                pass
            else:
                # Fixed: six.b("...") % (...) built bytes, which broke the
                # str ", ".join below on Python 3.
                nodes.append("%s/%d" % (cli.name, conn._factory.size))
        return "<Redis Sharded Connection: %s>" % ", ".join(nodes)
2228
2229
class PeekableQueue(defer.DeferredQueue):
    """
    A DeferredQueue that supports peeking, accessing random item without
    removing them from the queue.
    """

    def __init__(self, *args, **kwargs):
        defer.DeferredQueue.__init__(self, *args, **kwargs)
        # Deferreds waiting for some item to peek at.
        self.peekers = []

    def peek(self):
        """Fire with a random queued item, leaving it in the queue."""
        if not self.pending:
            d = defer.Deferred()
            self.peekers.append(d)
            return d
        return defer.succeed(random.choice(self.pending))

    def remove(self, item):
        """Drop a specific item from the queue."""
        self.pending.remove(item)

    def put(self, obj):
        # Satisfy every outstanding peeker first, then enqueue normally.
        waiting, self.peekers = self.peekers, []
        for d in waiting:
            d.callback(obj)
        defer.DeferredQueue.put(self, obj)
2257
2258
class RedisFactory(protocol.ReconnectingClientFactory):
    """Reconnecting factory maintaining a fixed-size pool of Redis
    protocol connections.

    Protocols register themselves via ``addConnection`` once their setup
    completes; ``deferred`` fires with the command handler when the pool
    reaches ``poolsize``.
    """
    maxDelay = 10
    protocol = RedisProtocol

    noisy = False

    def __init__(self, uuid, dbid, poolsize, isLazy=False,
                 handler=ConnectionHandler, charset="utf-8", password=None,
                 replyTimeout=None, convertNumbers=True):
        if not isinstance(poolsize, int):
            raise ValueError("Redis poolsize must be an integer, not %s" %
                             repr(poolsize))

        if not isinstance(dbid, (int, type(None))):
            raise ValueError("Redis dbid must be an integer, not %s" %
                             repr(dbid))

        self.uuid = uuid
        self.dbid = dbid
        self.poolsize = poolsize
        self.isLazy = isLazy
        self.charset = charset
        self.password = password
        self.replyTimeout = replyTimeout
        self.convertNumbers = convertNumbers

        self.idx = 0
        self.size = 0  # number of currently live connections in the pool
        self.pool = []
        # Fires with self.handler once the pool is fully connected.
        self.deferred = defer.Deferred()
        self.handler = handler(self)
        # Ready connections; commands check one out (or peek) per call.
        self.connectionQueue = PeekableQueue()
        self._waitingForEmptyPool = set()
        self.disconnectCalled = False

    def buildProtocol(self, addr):
        # The protocol joins the pool only after its connection setup has
        # finished (whenConnected fires), not on raw TCP connect.
        p = self.protocol(self.charset, replyTimeout=self.replyTimeout,
                          password=self.password, dbid=self.dbid,
                          convertNumbers=self.convertNumbers)
        p.factory = self
        p.whenConnected().addCallback(self.addConnection)
        return p

    def addConnection(self, conn):
        """Register a fully initialized connection with the pool."""
        if self.disconnectCalled:
            # disconnect() raced with an in-flight connection; drop it.
            conn.transport.loseConnection()
            return

        conn.whenDisconnected().addCallback(self.delConnection)
        self.connectionQueue.put(conn)
        self.pool.append(conn)
        self.size = len(self.pool)
        if self.deferred:
            if self.size == self.poolsize:
                self.deferred.callback(self.handler)
                self.deferred = None

    def delConnection(self, conn):
        """Remove a lost connection; notify waiters when the pool drains."""
        try:
            self.pool.remove(conn)
        except Exception as e:
            log.msg("Could not remove connection from pool: %s" % str(e))

        self.size = len(self.pool)
        if not self.size and self._waitingForEmptyPool:
            deferreds = self._waitingForEmptyPool
            self._waitingForEmptyPool = set()
            for d in deferreds:
                d.callback(None)

    def _cancelWaitForEmptyPool(self, deferred):
        # Canceller for waitForEmptyPool deferreds.
        self._waitingForEmptyPool.discard(deferred)
        deferred.errback(defer.CancelledError())

    def waitForEmptyPool(self):
        """
        Returns a Deferred which fires when the pool size has reached 0.
        """
        if not self.size:
            return defer.succeed(None)
        d = defer.Deferred(self._cancelWaitForEmptyPool)
        self._waitingForEmptyPool.add(d)
        return d

    def connectionError(self, why):
        # Fail the pool-ready deferred (at most once) on initial connect
        # errors.
        if self.deferred:
            self.deferred.errback(ValueError(why))
            self.deferred = None

    @defer.inlineCallbacks
    def getConnection(self, peek=False):
        """Check out (or, with peek=True, borrow) a live pooled connection.

        Dead connections found in the queue are discarded and the wait
        resumes until a live one is available.
        """
        if not self.continueTrying and not self.size:
            raise ConnectionError("Not connected")

        while True:
            if peek:
                conn = yield self.connectionQueue.peek()
            else:
                conn = yield self.connectionQueue.get()
            if conn.connected == 0:
                log.msg('Discarding dead connection.')
                if peek:
                    self.connectionQueue.remove(conn)
            else:
                defer.returnValue(conn)
2364
2365
class SubscriberFactory(RedisFactory):
    """Factory for a single pub/sub (SubscriberProtocol) connection."""

    protocol = SubscriberProtocol

    def __init__(self, isLazy=False, handler=ConnectionHandler):
        # Pub/sub uses exactly one connection; uuid/dbid are not needed.
        RedisFactory.__init__(self, None, None, 1,
                              isLazy=isLazy, handler=handler)
2372
2373
class MonitorFactory(RedisFactory):
    """Factory for a single MONITOR (MonitorProtocol) connection."""

    protocol = MonitorProtocol

    def __init__(self, isLazy=False, handler=ConnectionHandler):
        # MONITOR uses exactly one connection; uuid/dbid are not needed.
        RedisFactory.__init__(self, None, None, 1,
                              isLazy=isLazy, handler=handler)
2380
2381
def makeConnection(host, port, dbid, poolsize, reconnect, isLazy,
                   charset, password, connectTimeout, replyTimeout,
                   convertNumbers):
    """Create a RedisFactory and open poolsize TCP connections to host:port.

    Returns the handler immediately when isLazy, otherwise the factory's
    Deferred, which fires once the whole pool is connected.
    """
    factory = RedisFactory("%s:%d" % (host, port), dbid, poolsize, isLazy,
                           ConnectionHandler, charset, password,
                           replyTimeout, convertNumbers)
    factory.continueTrying = reconnect
    for _ in range(poolsize):
        reactor.connectTCP(host, port, factory, connectTimeout)

    return factory.handler if isLazy else factory.deferred
2396
2397
def makeShardedConnection(hosts, dbid, poolsize, reconnect, isLazy,
                          charset, password, connectTimeout, replyTimeout,
                          convertNumbers):
    """Open a connection (pool) to every "host:port" in hosts and wrap
    them in a ShardedConnectionHandler.

    Raises ValueError when hosts is not a list/tuple of "host:port" strings.
    """
    err = "Please use a list or tuple of host:port for sharded connections"
    if not isinstance(hosts, (list, tuple)):
        raise ValueError(err)

    connections = []
    for item in hosts:
        try:
            host, port = item.split(":")
            port = int(port)
        except (AttributeError, ValueError):
            # Narrowed from a bare except: not a string, missing ":", or a
            # non-numeric port.
            raise ValueError(err)

        c = makeConnection(host, port, dbid, poolsize, reconnect, isLazy,
                           charset, password, connectTimeout, replyTimeout,
                           convertNumbers)
        connections.append(c)

    if isLazy:
        return ShardedConnectionHandler(connections)
    else:
        deferred = defer.DeferredList(connections)
        ShardedConnectionHandler(deferred)
        return deferred
2424
2425
def Connection(host="localhost", port=6379, dbid=None, reconnect=True,
               charset="utf-8", password=None,
               connectTimeout=None, replyTimeout=None, convertNumbers=True):
    """Single TCP connection; returns a Deferred firing with the handler."""
    return makeConnection(host, port, dbid, poolsize=1, reconnect=reconnect,
                          isLazy=False, charset=charset, password=password,
                          connectTimeout=connectTimeout,
                          replyTimeout=replyTimeout,
                          convertNumbers=convertNumbers)
2432
2433
def lazyConnection(host="localhost", port=6379, dbid=None, reconnect=True,
                   charset="utf-8", password=None,
                   connectTimeout=None, replyTimeout=None, convertNumbers=True):
    """Single TCP connection; returns the handler without waiting."""
    return makeConnection(host, port, dbid, poolsize=1, reconnect=reconnect,
                          isLazy=True, charset=charset, password=password,
                          connectTimeout=connectTimeout,
                          replyTimeout=replyTimeout,
                          convertNumbers=convertNumbers)
2440
2441
def ConnectionPool(host="localhost", port=6379, dbid=None,
                   poolsize=10, reconnect=True, charset="utf-8", password=None,
                   connectTimeout=None, replyTimeout=None,
                   convertNumbers=True):
    """TCP connection pool; returns a Deferred firing with the handler."""
    return makeConnection(host, port, dbid, poolsize=poolsize,
                          reconnect=reconnect, isLazy=False, charset=charset,
                          password=password, connectTimeout=connectTimeout,
                          replyTimeout=replyTimeout,
                          convertNumbers=convertNumbers)
2449
2450
def lazyConnectionPool(host="localhost", port=6379, dbid=None,
                       poolsize=10, reconnect=True, charset="utf-8",
                       password=None, connectTimeout=None, replyTimeout=None,
                       convertNumbers=True):
    """TCP connection pool; returns the handler without waiting."""
    return makeConnection(host, port, dbid, poolsize=poolsize,
                          reconnect=reconnect, isLazy=True, charset=charset,
                          password=password, connectTimeout=connectTimeout,
                          replyTimeout=replyTimeout,
                          convertNumbers=convertNumbers)
2458
2459
def ShardedConnection(hosts, dbid=None, reconnect=True, charset="utf-8",
                      password=None, connectTimeout=None, replyTimeout=None,
                      convertNumbers=True):
    """Sharded connections (one per host); Deferred-based setup."""
    return makeShardedConnection(hosts, dbid, poolsize=1,
                                 reconnect=reconnect, isLazy=False,
                                 charset=charset, password=password,
                                 connectTimeout=connectTimeout,
                                 replyTimeout=replyTimeout,
                                 convertNumbers=convertNumbers)
2466
2467
def lazyShardedConnection(hosts, dbid=None, reconnect=True, charset="utf-8",
                          password=None,
                          connectTimeout=None, replyTimeout=None,
                          convertNumbers=True):
    """Sharded connections (one per host); returns the handler directly."""
    return makeShardedConnection(hosts, dbid, poolsize=1,
                                 reconnect=reconnect, isLazy=True,
                                 charset=charset, password=password,
                                 connectTimeout=connectTimeout,
                                 replyTimeout=replyTimeout,
                                 convertNumbers=convertNumbers)
2475
2476
def ShardedConnectionPool(hosts, dbid=None, poolsize=10, reconnect=True,
                          charset="utf-8", password=None,
                          connectTimeout=None, replyTimeout=None,
                          convertNumbers=True):
    """Sharded connection pools; Deferred-based setup."""
    return makeShardedConnection(hosts, dbid, poolsize=poolsize,
                                 reconnect=reconnect, isLazy=False,
                                 charset=charset, password=password,
                                 connectTimeout=connectTimeout,
                                 replyTimeout=replyTimeout,
                                 convertNumbers=convertNumbers)
2484
2485
def lazyShardedConnectionPool(hosts, dbid=None, poolsize=10, reconnect=True,
                              charset="utf-8", password=None,
                              connectTimeout=None, replyTimeout=None,
                              convertNumbers=True):
    """Sharded connection pools; returns the handler directly."""
    return makeShardedConnection(hosts, dbid, poolsize=poolsize,
                                 reconnect=reconnect, isLazy=True,
                                 charset=charset, password=password,
                                 connectTimeout=connectTimeout,
                                 replyTimeout=replyTimeout,
                                 convertNumbers=convertNumbers)
2493
2494
def makeUnixConnection(path, dbid, poolsize, reconnect, isLazy,
                       charset, password, connectTimeout, replyTimeout,
                       convertNumbers):
    """Create a RedisFactory and open poolsize unix-socket connections.

    Returns the handler immediately when isLazy, otherwise the factory's
    Deferred, which fires once the whole pool is connected.
    """
    factory = RedisFactory(path, dbid, poolsize, isLazy, UnixConnectionHandler,
                           charset, password, replyTimeout, convertNumbers)
    factory.continueTrying = reconnect
    for _ in range(poolsize):
        reactor.connectUNIX(path, factory, connectTimeout)

    return factory.handler if isLazy else factory.deferred
2508
2509
def makeShardedUnixConnection(paths, dbid, poolsize, reconnect, isLazy,
                              charset, password, connectTimeout, replyTimeout,
                              convertNumbers):
    """Open a unix-socket connection (pool) per path and wrap them in a
    ShardedUnixConnectionHandler.

    Raises ValueError when paths is not a list or tuple.
    """
    err = "Please use a list or tuple of paths for sharded unix connections"
    if not isinstance(paths, (list, tuple)):
        raise ValueError(err)

    connections = [
        makeUnixConnection(path, dbid, poolsize, reconnect, isLazy,
                           charset, password, connectTimeout, replyTimeout,
                           convertNumbers)
        for path in paths
    ]

    if isLazy:
        return ShardedUnixConnectionHandler(connections)
    deferred = defer.DeferredList(connections)
    ShardedUnixConnectionHandler(deferred)
    return deferred
2530
2531
def UnixConnection(path="/tmp/redis.sock", dbid=None, reconnect=True,
                   charset="utf-8", password=None,
                   connectTimeout=None, replyTimeout=None, convertNumbers=True):
    """Single unix-socket connection; Deferred-based setup."""
    return makeUnixConnection(path, dbid, poolsize=1, reconnect=reconnect,
                              isLazy=False, charset=charset,
                              password=password,
                              connectTimeout=connectTimeout,
                              replyTimeout=replyTimeout,
                              convertNumbers=convertNumbers)
2538
2539
def lazyUnixConnection(path="/tmp/redis.sock", dbid=None, reconnect=True,
                       charset="utf-8", password=None,
                       connectTimeout=None, replyTimeout=None,
                       convertNumbers=True):
    """Single unix-socket connection; returns the handler directly."""
    return makeUnixConnection(path, dbid, poolsize=1, reconnect=reconnect,
                              isLazy=True, charset=charset,
                              password=password,
                              connectTimeout=connectTimeout,
                              replyTimeout=replyTimeout,
                              convertNumbers=convertNumbers)
2547
2548
def UnixConnectionPool(path="/tmp/redis.sock", dbid=None, poolsize=10,
                       reconnect=True, charset="utf-8", password=None,
                       connectTimeout=None, replyTimeout=None,
                       convertNumbers=True):
    """Unix-socket connection pool; Deferred-based setup."""
    return makeUnixConnection(path, dbid, poolsize=poolsize,
                              reconnect=reconnect, isLazy=False,
                              charset=charset, password=password,
                              connectTimeout=connectTimeout,
                              replyTimeout=replyTimeout,
                              convertNumbers=convertNumbers)
2556
2557
def lazyUnixConnectionPool(path="/tmp/redis.sock", dbid=None, poolsize=10,
                           reconnect=True, charset="utf-8", password=None,
                           connectTimeout=None, replyTimeout=None,
                           convertNumbers=True):
    """Unix-socket connection pool; returns the handler directly."""
    return makeUnixConnection(path, dbid, poolsize=poolsize,
                              reconnect=reconnect, isLazy=True,
                              charset=charset, password=password,
                              connectTimeout=connectTimeout,
                              replyTimeout=replyTimeout,
                              convertNumbers=convertNumbers)
2565
2566
def ShardedUnixConnection(paths, dbid=None, reconnect=True, charset="utf-8",
                          password=None, connectTimeout=None, replyTimeout=None,
                          convertNumbers=True):
    """Sharded unix-socket connections; Deferred-based setup."""
    return makeShardedUnixConnection(paths, dbid, poolsize=1,
                                     reconnect=reconnect, isLazy=False,
                                     charset=charset, password=password,
                                     connectTimeout=connectTimeout,
                                     replyTimeout=replyTimeout,
                                     convertNumbers=convertNumbers)
2573
2574
def lazyShardedUnixConnection(paths, dbid=None, reconnect=True,
                              charset="utf-8", password=None,
                              connectTimeout=None, replyTimeout=None,
                              convertNumbers=True):
    """Sharded unix-socket connections; returns the handler directly."""
    return makeShardedUnixConnection(paths, dbid, poolsize=1,
                                     reconnect=reconnect, isLazy=True,
                                     charset=charset, password=password,
                                     connectTimeout=connectTimeout,
                                     replyTimeout=replyTimeout,
                                     convertNumbers=convertNumbers)
2582
2583
def ShardedUnixConnectionPool(paths, dbid=None, poolsize=10, reconnect=True,
                              charset="utf-8", password=None,
                              connectTimeout=None, replyTimeout=None,
                              convertNumbers=True):
    """Sharded unix-socket connection pools; Deferred-based setup."""
    return makeShardedUnixConnection(paths, dbid, poolsize=poolsize,
                                     reconnect=reconnect, isLazy=False,
                                     charset=charset, password=password,
                                     connectTimeout=connectTimeout,
                                     replyTimeout=replyTimeout,
                                     convertNumbers=convertNumbers)
2591
2592
def lazyShardedUnixConnectionPool(paths, dbid=None, poolsize=10,
                                  reconnect=True, charset="utf-8",
                                  password=None, connectTimeout=None,
                                  replyTimeout=None, convertNumbers=True):
    """Sharded unix-socket connection pools; returns the handler directly."""
    return makeShardedUnixConnection(paths, dbid, poolsize=poolsize,
                                     reconnect=reconnect, isLazy=True,
                                     charset=charset, password=password,
                                     connectTimeout=connectTimeout,
                                     replyTimeout=replyTimeout,
                                     convertNumbers=convertNumbers)
2600
2601
class MasterNotFoundError(ConnectionError):
    """Raised when sentinel cannot provide an address for the master."""
    pass
2604
2605
class SentinelRedisProtocol(RedisProtocol):
    """RedisProtocol that verifies the peer's replication role on connect.

    When the factory expects a master, a connection whose ROLE reply is not
    "master" is dropped so the factory can rediscover the real master.
    """

    def connectionMade(self):
        self.factory.resetDelay()

        def check_role(role):
            if self.factory.is_master and role[0] != "master":
                # Connected to a stale or demoted node; drop and let the
                # factory retry.
                self.transport.loseConnection()
                return defer.succeed(None)
            self.factory.resetDelay()
            return RedisProtocol.connectionMade(self)

        if self.password is not None:
            self.auth(self.password)

        return self.role().addCallback(check_role)
2623
2624
2625class SentinelConnectionFactory(RedisFactory):
2626
    # NOTE(review): low initial retry delay — presumably so clients
    # rediscover the new master quickly after a sentinel failover; confirm.
    initialDelay = 0.1
    protocol = SentinelRedisProtocol
2629
2630    def __init__(self, sentinel_manager, service_name, is_master, *args, **kwargs):
2631        RedisFactory.__init__(self, *args, **kwargs)
2632
2633        self.sentinel_manager = sentinel_manager
2634        self.service_name = service_name
2635        self.is_master = is_master
2636
2637        self._current_master_addr = None
2638        self._slave_no = 0
2639
2640    def clientConnectionFailed(self, connector, reason):
2641        self.try_to_connect(connector)
2642
2643    def clientConnectionLost(self, connector, unused_reason):
2644        self.try_to_connect(connector, nodelay=True)
2645
2646    def try_to_connect(self, connector, force_master=False, nodelay=False):
2647        if not self.continueTrying:
2648            return
2649
2650        def on_discovery_err(failure):
2651            failure.trap(MasterNotFoundError)
2652            log.msg("txredisapi: Can't get address from Sentinel: {0}".format(failure.value))
2653            reactor.callLater(self.delay, self.try_to_connect, connector)
2654            self.resetDelay()
2655
2656        def on_master_addr(addr):
2657            if self._current_master_addr is not None and \
2658               self._current_master_addr != addr:
2659                self.resetDelay()
2660                # master has changed, dropping all alive connections
2661                for conn in self.pool:
2662                    conn.transport.loseConnection()
2663
2664            self._current_master_addr = addr
2665            connector.host, connector.port = addr
2666            if nodelay:
2667                connector.connect()
2668            else:
2669                self.retry(connector)
2670
2671        def on_slave_addrs(addrs):
2672            if addrs:
2673                connector.host, connector.port = addrs[self._slave_no % len(addrs)]
2674                self._slave_no += 1
2675                if nodelay:
2676                    connector.connect()
2677                else:
2678                    self.retry(connector)
2679            else:
2680                log.msg("txredisapi: No slaves discovered, falling back to master")
2681                self.try_to_connect(connector, force_master=True, nodelay=True)
2682
2683        if self.is_master or force_master:
2684            self.sentinel_manager.discover_master(self.service_name) \
2685                .addCallbacks(on_master_addr, on_discovery_err)
2686        else:
2687            self.sentinel_manager.discover_slaves(self.service_name) \
2688                .addCallback(on_slave_addrs)
2689
2690
class Sentinel(object):
    """Client for Redis Sentinel: discovers masters/slaves of a service.

    Queries all configured sentinels in parallel and resolves with the
    first acceptable answer; discovery gives up after
    ``discovery_timeout`` seconds.
    """

    # Seconds to wait for any sentinel to answer before giving up.
    discovery_timeout = 10

    def __init__(self, sentinel_addresses, min_other_sentinels=0, **connection_kwargs):
        """
        :param sentinel_addresses: iterable of (host, port) pairs.
        :param min_other_sentinels: minimum number of peer sentinels that
            must also see the master for it to be considered valid.
        :param connection_kwargs: forwarded to lazyConnection for each
            sentinel.
        """
        self.sentinels = [
            lazyConnection(host, port, **connection_kwargs)
            for host, port in sentinel_addresses
        ]

        self.min_other_sentinels = min_other_sentinels

    def disconnect(self):
        """Disconnect from all sentinels; fires when every one is closed."""
        return defer.gatherResults([sentinel.disconnect() for sentinel in self.sentinels],
                                   consumeErrors = True)

    def check_master_state(self, state):
        """Return True if ``state`` describes a healthy, quorate master."""
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False

        if int(state["num-other-sentinels"]) < self.min_other_sentinels:
            return False

        return True

    def discover_master(self, service_name):
        """Return a deferred firing with (host, port) of the master.

        Errbacks with MasterNotFoundError if no sentinel reports a
        healthy master within ``discovery_timeout`` seconds.
        """
        result = defer.Deferred()

        def on_response(response):
            # First healthy answer wins; later responses are ignored.
            if result.called:
                return

            state = response.get(service_name)
            if state and self.check_master_state(state):
                result.callback((state["ip"], int(state["port"])))
                # NOTE: timeout_call is bound after the query loop below;
                # safe because lazyConnection replies are never synchronous.
                timeout_call.cancel()

        def on_timeout():
            if not result.called:
                result.errback(MasterNotFoundError(
                    "No master found for {0}".format(service_name)))

        # Ignoring errors
        for sentinel in self.sentinels:
            sentinel.sentinel_masters().addCallbacks(on_response, lambda _: None)

        timeout_call = reactor.callLater(self.discovery_timeout, on_timeout)

        return result

    @staticmethod
    def filter_slaves(slaves):
        """Remove slaves that are in ODOWN or SDOWN state"""
        return [
            (slave["ip"], int(slave["port"]))
            for slave in slaves
            if not slave["is_odown"] and not slave["is_sdown"]
        ]

    def discover_slaves(self, service_name):
        """Return a deferred firing with a list of healthy (host, port)
        slave addresses; fires with [] on timeout (never errbacks).
        """
        result = defer.Deferred()

        def on_response(response):
            # First non-empty healthy answer wins.
            if result.called:
                return

            slaves = self.filter_slaves(response)
            if slaves:
                result.callback(slaves)
                timeout_call.cancel()

        def on_timeout():
            if not result.called:
                result.callback([])

        for sentinel in self.sentinels:
            sentinel.sentinel_slaves(service_name).addCallbacks(on_response, lambda _: None)

        timeout_call = reactor.callLater(self.discovery_timeout, on_timeout)

        return result

    @staticmethod
    def _connect_factory_and_return_handler(factory, poolsize):
        # Start ``poolsize`` connectors; each one asks Sentinel for its
        # destination before dialing.
        for _ in range(poolsize):
            # host and port will be rewritten by try_to_connect
            connector = Connector("0.0.0.0", None, factory, factory.maxDelay, None, reactor)
            factory.try_to_connect(connector, nodelay=True)
        return factory.handler

    def master_for(self, service_name, factory_class=SentinelConnectionFactory,
                   dbid=None, poolsize=1, **connection_kwargs):
        """Return a connection handler bound to the service's master."""
        factory = factory_class(sentinel_manager=self, service_name=service_name,
                                is_master=True, uuid=None, dbid=dbid,
                                poolsize=poolsize, **connection_kwargs)
        return self._connect_factory_and_return_handler(factory, poolsize)

    def slave_for(self, service_name, factory_class=SentinelConnectionFactory,
                  dbid=None, poolsize=1, **connection_kwargs):
        """Return a connection handler round-robining over the service's
        slaves (falls back to the master if none are discovered)."""
        factory = factory_class(sentinel_manager=self, service_name=service_name,
                                is_master=False, uuid=None, dbid=dbid,
                                poolsize=poolsize, **connection_kwargs)
        return self._connect_factory_and_return_handler(factory, poolsize)
2794
2795
2796__all__ = [
2797    Connection, lazyConnection,
2798    ConnectionPool, lazyConnectionPool,
2799    ShardedConnection, lazyShardedConnection,
2800    ShardedConnectionPool, lazyShardedConnectionPool,
2801    UnixConnection, lazyUnixConnection,
2802    UnixConnectionPool, lazyUnixConnectionPool,
2803    ShardedUnixConnection, lazyShardedUnixConnection,
2804    ShardedUnixConnectionPool, lazyShardedUnixConnectionPool,
2805    Sentinel, MasterNotFoundError
2806]
2807
2808__author__ = "Alexandre Fiori"
2809__version__ = version = "1.4.7"
2810