1# coding: utf-8 2# Copyright 2009 Alexandre Fiori 3# https://github.com/fiorix/txredisapi 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17# 18# Credits: 19# The Protocol class is an improvement of txRedis' protocol, 20# by Dorian Raymer and Ludovico Magnocavallo. 21# 22# Sharding and Consistent Hashing implementation by Gleicon Moraes. 23# 24 25import six 26 27import bisect 28import collections 29import functools 30import operator 31import re 32import warnings 33import zlib 34import string 35import hashlib 36import random 37 38from twisted.internet import defer 39from twisted.internet import protocol 40from twisted.internet import reactor 41from twisted.internet.tcp import Connector 42from twisted.protocols import basic 43from twisted.protocols import policies 44from twisted.python import log 45from twisted.python.failure import Failure 46 47try: 48 import hiredis 49except ImportError: 50 hiredis = None 51 52 53class RedisError(Exception): 54 pass 55 56 57class ConnectionError(RedisError): 58 pass 59 60 61class ResponseError(RedisError): 62 pass 63 64 65class ScriptDoesNotExist(ResponseError): 66 pass 67 68 69class NoScriptRunning(ResponseError): 70 pass 71 72 73class InvalidResponse(RedisError): 74 pass 75 76 77class InvalidData(RedisError): 78 pass 79 80 81class WatchError(RedisError): 82 pass 83 84 85class TimeoutError(ConnectionError): 86 pass 87 88 89def list_or_args(command, keys, args): 90 oldapi = bool(args) 91 try: 92 iter(keys) 93 if isinstance(keys, six.string_types) or \ 94 isinstance(keys, six.binary_type): 95 keys = [keys] 96 if not oldapi: 97 return keys 98 oldapi = True 99 except TypeError: 100 oldapi = True 101 keys = [keys] 102 103 if oldapi: 104 warnings.warn(DeprecationWarning( 105 "Passing *args to redis.%s is deprecated. " 106 "Pass an iterable to ``keys`` instead" % command)) 107 keys.extend(args) 108 return keys 109 110# Possible first characters in a string containing an integer or a float. 111_NUM_FIRST_CHARS = frozenset(string.digits + "+-.") 112 113 114class MultiBulkStorage(object): 115 def __init__(self, parent=None): 116 self.items = None 117 self.pending = None 118 self.parent = parent 119 120 def set_pending(self, pending): 121 if self.pending is None: 122 if pending < 0: 123 self.items = None 124 self.pending = 0 125 else: 126 self.items = [] 127 self.pending = pending 128 return self 129 else: 130 m = MultiBulkStorage(self) 131 m.set_pending(pending) 132 return m 133 134 def append(self, item): 135 self.pending -= 1 136 self.items.append(item) 137 138 139class LineReceiver(protocol.Protocol, basic._PauseableMixin): 140 callLater = reactor.callLater 141 line_mode = 1 142 __buffer = six.b('') 143 delimiter = six.b('\r\n') 144 MAX_LENGTH = 16384 145 146 def clearLineBuffer(self): 147 b = self.__buffer 148 self.__buffer = six.b('') 149 return b 150 151 def dataReceived(self, data, unpause=False): 152 if unpause is True: 153 if self.__buffer: 154 self.__buffer = data + self.__buffer 155 else: 156 self.__buffer += data 157 158 self.resumeProducing() 159 else: 160 self.__buffer = self.__buffer + data 161 162 while self.line_mode and not self.paused: 163 try: 164 line, self.__buffer = self.__buffer.split(self.delimiter, 1) 165 except ValueError: 166 if len(self.__buffer) > self.MAX_LENGTH: 167 line, self.__buffer = self.__buffer, six.b('') 168 return self.lineLengthExceeded(line) 169 break 170 else: 171 linelength = len(line) 172 if linelength > self.MAX_LENGTH: 173 exceeded = line + self.__buffer 174 self.__buffer = six.b('') 175 return self.lineLengthExceeded(exceeded) 176 if hasattr(line, 'decode'): 177 why = self.lineReceived(line.decode()) 178 else: 179 why = self.lineReceived(line) 180 if why or self.transport and self.transport.disconnecting: 181 return why 182 else: 183 if not self.paused: 184 data = self.__buffer 185 self.__buffer = six.b('') 186 if data: 187 return self.rawDataReceived(data) 188 189 def setLineMode(self, extra=six.b('')): 190 self.line_mode = 1 191 if extra: 192 self.pauseProducing() 193 self.callLater(0, self.dataReceived, extra, True) 194 195 def setRawMode(self): 196 self.line_mode = 0 197 198 def rawDataReceived(self, data): 199 raise NotImplementedError 200 201 def lineReceived(self, line): 202 raise NotImplementedError 203 204 def sendLine(self, line): 205 if isinstance(line, six.text_type): 206 line = line.encode() 207 return self.transport.write(line + self.delimiter) 208 209 def lineLengthExceeded(self, line): 210 return self.transport.loseConnection() 211 212 213class ReplyQueue(defer.DeferredQueue): 214 """ 215 Subclass defer.DeferredQueue to maintain consistency of 216 producers / consumers in light of defer.cancel 217 """ 218 def _cancelGet(self, d): 219 # rather than remove(d), the default twisted behavior 220 # we need to maintain an entry in the waiting list 221 # because the reply code assumes that every call 222 # to transport.write() generates a corresponding 223 # reply value in the queue. 224 # so we will just replace the cancelled deferred 225 # with a noop 226 i = self.waiting.index(d) 227 self.waiting[i] = defer.Deferred() 228 229 230def _blocking_command(release_on_callback): 231 """ 232 Decorator used for marking protocol methods as `blocking` (methods that 233 block connection from being used for sending another requests) 234 235 release_on_callback means whether connection should be automatically 236 released when deferred returned by method is fired 237 """ 238 def decorator(method): 239 method._blocking = True 240 method._release_on_callback = release_on_callback 241 return method 242 return decorator 243 244 245class BaseRedisProtocol(LineReceiver): 246 """ 247 Redis client protocol. 248 """ 249 250 def __init__(self, charset="utf-8", errors="strict", replyTimeout=None, 251 password=None, dbid=None, convertNumbers=True): 252 self.charset = charset 253 self.errors = errors 254 255 self.bulk_length = 0 256 self.bulk_buffer = bytearray() 257 258 self.post_proc = [] 259 self.multi_bulk = MultiBulkStorage() 260 261 self.replyQueue = ReplyQueue() 262 263 self.transactions = 0 264 self.pendingTransaction = False 265 self.inTransaction = False 266 self.inMulti = False 267 self.unwatch_cc = lambda: () 268 self.commit_cc = lambda: () 269 270 self.script_hashes = set() 271 272 self.pipelining = False 273 self.pipelined_commands = [] 274 self.pipelined_replies = [] 275 276 self.replyTimeout = replyTimeout 277 self.password = password 278 self.dbid = dbid 279 self.convertNumbers = convertNumbers 280 281 self._waiting_for_connect = [] 282 self._waiting_for_disconnect = [] 283 284 285 def whenConnected(self): 286 d = defer.Deferred() 287 self._waiting_for_connect.append(d) 288 return d 289 290 291 def whenDisconnected(self): 292 d = defer.Deferred() 293 self._waiting_for_disconnect.append(d) 294 return d 295 296 297 @defer.inlineCallbacks 298 def connectionMade(self): 299 if self.password is not None: 300 try: 301 response = yield self.auth(self.password) 302 if isinstance(response, ResponseError): 303 raise response 304 except Exception as e: 305 self.factory.continueTrying = False 306 self.transport.loseConnection() 307 308 msg = "Redis error: could not auth: %s" % (str(e)) 309 self.factory.connectionError(msg) 310 if self.factory.isLazy: 311 log.msg(msg) 312 defer.returnValue(None) 313 314 if self.dbid is not None: 315 try: 316 response = yield self.select(self.dbid) 317 if isinstance(response, ResponseError): 318 raise response 319 except Exception as e: 320 self.factory.continueTrying = False 321 self.transport.loseConnection() 322 323 msg = "Redis error: could not set dbid=%s: %s" % \ 324 (self.dbid, str(e)) 325 self.factory.connectionError(msg) 326 if self.factory.isLazy: 327 log.msg(msg) 328 defer.returnValue(None) 329 330 self.connected = 1 331 self._waiting_for_connect, dfrs = [], self._waiting_for_connect 332 for d in dfrs: 333 d.callback(self) 334 335 def connectionLost(self, why): 336 self.connected = 0 337 self.script_hashes.clear() 338 339 self._waiting_for_disconnect, dfrs = [], self._waiting_for_disconnect 340 for d in dfrs: 341 d.callback(self) 342 343 LineReceiver.connectionLost(self, why) 344 while self.replyQueue.waiting: 345 self.replyReceived(ConnectionError("Lost connection")) 346 347 def lineReceived(self, line): 348 """ 349 Reply types: 350 "-" error message 351 "+" single line status reply 352 ":" integer number (protocol level only?) 353 "$" bulk data 354 "*" multi-bulk data 355 """ 356 if line: 357 token, data = line[0], line[1:] 358 else: 359 return 360 361 if token == "$": # bulk data 362 try: 363 self.bulk_length = int(data) 364 except ValueError: 365 self.replyReceived(InvalidResponse("Cannot convert data " 366 "'%s' to integer" % data)) 367 else: 368 if self.bulk_length == -1: 369 self.bulk_length = 0 370 self.bulkDataReceived(None) 371 else: 372 self.bulk_length += 2 # 2 == \r\n 373 self.setRawMode() 374 375 elif token == "*": # multi-bulk data 376 try: 377 n = int(data) 378 except (TypeError, ValueError): 379 self.multi_bulk = MultiBulkStorage() 380 self.replyReceived(InvalidResponse("Cannot convert " 381 "multi-response header " 382 "'%s' to integer" % data)) 383 else: 384 self.multi_bulk = self.multi_bulk.set_pending(n) 385 if n in (0, -1): 386 self.multiBulkDataReceived() 387 388 elif token == "+": # single line status 389 if data == "QUEUED": 390 self.transactions += 1 391 self.replyReceived(data) 392 else: 393 if self.multi_bulk.pending: 394 self.handleMultiBulkElement(data) 395 else: 396 self.replyReceived(data) 397 398 elif token == "-": # error 399 reply = ResponseError(data[4:] if data[:4] == "ERR" else data) 400 if self.multi_bulk.pending: 401 self.handleMultiBulkElement(reply) 402 else: 403 self.replyReceived(reply) 404 405 elif token == ":": # integer 406 try: 407 reply = int(data) 408 except ValueError: 409 reply = InvalidResponse( 410 "Cannot convert data '%s' to integer" % data) 411 412 if self.multi_bulk.pending: 413 self.handleMultiBulkElement(reply) 414 else: 415 self.replyReceived(reply) 416 417 def rawDataReceived(self, data): 418 """ 419 Process and dispatch to bulkDataReceived. 420 """ 421 if self.bulk_length: 422 data, rest = data[:self.bulk_length], data[self.bulk_length:] 423 self.bulk_length -= len(data) 424 else: 425 rest = "" 426 427 self.bulk_buffer.extend(data) 428 if self.bulk_length == 0: 429 bulk_buffer = self.bulk_buffer[:-2] 430 self.bulk_buffer = bytearray() 431 self.bulkDataReceived(bytes(bulk_buffer)) 432 self.setLineMode(extra=rest) 433 434 def bulkDataReceived(self, data): 435 """ 436 Receipt of a bulk data element. 437 """ 438 el = None 439 if data is not None: 440 el = self.tryConvertData(data) 441 442 if self.multi_bulk.pending or self.multi_bulk.items: 443 self.handleMultiBulkElement(el) 444 else: 445 self.replyReceived(el) 446 447 def tryConvertData(self, data): 448 # The hiredis reader implicitly returns integers 449 if isinstance(data, six.integer_types): 450 return data 451 if isinstance(data, list): 452 return [self.tryConvertData(x) for x in data] 453 el = None 454 if self.convertNumbers: 455 if data: 456 num_data = data 457 try: 458 if isinstance(data, six.binary_type): 459 num_data = data.decode() 460 except UnicodeError: 461 pass 462 else: 463 if num_data[0] in _NUM_FIRST_CHARS: # Most likely a number 464 try: 465 el = int(num_data) if num_data.find('.') == -1 \ 466 else float(num_data) 467 except ValueError: 468 pass 469 470 if el is None: 471 el = data 472 if self.charset is not None: 473 try: 474 el = data.decode(self.charset) 475 except UnicodeDecodeError: 476 pass 477 except AttributeError: 478 el = data 479 return el 480 481 def handleMultiBulkElement(self, element): 482 self.multi_bulk.append(element) 483 484 if not self.multi_bulk.pending: 485 self.multiBulkDataReceived() 486 487 def multiBulkDataReceived(self): 488 """ 489 Receipt of list or set of bulk data elements. 490 """ 491 while self.multi_bulk.parent and not self.multi_bulk.pending: 492 p = self.multi_bulk.parent 493 p.append(self.multi_bulk.items) 494 self.multi_bulk = p 495 496 if not self.multi_bulk.pending: 497 reply = self.multi_bulk.items 498 self.multi_bulk = MultiBulkStorage() 499 500 reply = self.handleTransactionData(reply) 501 502 self.replyReceived(reply) 503 504 def handleTransactionData(self, reply): 505 if self.inTransaction and isinstance(reply, list): 506 # watch or multi has been called 507 if self.transactions > 0: 508 # multi: this must be an exec [commit] reply 509 self.transactions -= len(reply) 510 if self.transactions == 0: 511 self.commit_cc() 512 if not self.inTransaction: # multi: this must be an exec reply 513 tmp = [] 514 for f, v in zip(self.post_proc[1:], reply): 515 if callable(f): 516 tmp.append(f(v)) 517 else: 518 tmp.append(v) 519 reply = tmp 520 self.post_proc = [] 521 return reply 522 523 def replyReceived(self, reply): 524 """ 525 Complete reply received and ready to be pushed to the requesting 526 function. 527 """ 528 self.replyQueue.put(reply) 529 530 @staticmethod 531 def handle_reply(r): 532 if isinstance(r, Exception): 533 raise r 534 return r 535 536 def _encode_value(self, arg): 537 if isinstance(arg, six.binary_type): 538 return arg 539 elif isinstance(arg, six.text_type): 540 if self.charset is None: 541 try: 542 return arg.encode() 543 except UnicodeError: 544 pass 545 raise InvalidData("Encoding charset was not specified") 546 try: 547 return arg.encode(self.charset, self.errors) 548 except UnicodeEncodeError as e: 549 raise InvalidData( 550 "Error encoding unicode value '%s': %s" % 551 (repr(arg), e)) 552 elif isinstance(arg, float): 553 return format(arg, "f").encode() 554 elif isinstance(arg, bytearray): 555 return bytes(arg) 556 else: 557 return str(arg).format().encode() 558 559 def _build_command(self, *args, **kwargs): 560 # Build the redis command. 561 cmds = bytearray() 562 cmd_count = 0 563 for s in args: 564 cmd = self._encode_value(s) 565 cmds.extend(six.b("$")) 566 for token in self._encode_value(len(cmd)), cmd: 567 cmds.extend(token) 568 cmds.extend(six.b("\r\n")) 569 cmd_count += 1 570 571 command = bytes(six.b("").join( 572 [six.b("*"), self._encode_value(cmd_count), six.b("\r\n")]) + cmds) 573 if not isinstance(command, six.binary_type): 574 command = command.encode() 575 return command 576 577 def execute_command(self, *args, **kwargs): 578 if self.connected == 0: 579 raise ConnectionError("Not connected") 580 else: 581 command = self._build_command(*args, **kwargs) 582 # When pipelining, buffer this command into our list of 583 # pipelined commands. Otherwise, write the command immediately. 584 if self.pipelining: 585 self.pipelined_commands.append(command) 586 else: 587 self.transport.write(command) 588 589 # Return deferred that will contain the result of this command. 590 # Note: when using pipelining, this deferred will NOT return 591 # until after execute_pipeline is called. 592 593 result = defer.Deferred() 594 595 def fire_result(value): 596 if result.called: 597 return 598 result.callback(value) 599 600 response = self.replyQueue.get().addCallback(self.handle_reply) 601 response.addBoth(fire_result) 602 603 apply_timeout = kwargs.get('apply_timeout', True) 604 if self.replyTimeout and apply_timeout: 605 delayed_call = None 606 607 def fire_timeout(): 608 error_text = 'Not received Redis response in {0} seconds'.format(self.replyTimeout) 609 result.errback(TimeoutError(error_text)) 610 while self.replyQueue.waiting: 611 self.replyQueue.put(TimeoutError(error_text)) 612 self.transport.abortConnection() 613 614 def cancel_timeout(value): 615 if delayed_call.active(): 616 delayed_call.cancel() 617 return value 618 619 delayed_call = self.callLater(self.replyTimeout, fire_timeout) 620 result.addBoth(cancel_timeout) 621 622 # When pipelining, we need to keep track of the deferred replies 623 # so that we can wait for them in a DeferredList when 624 # execute_pipeline is called. 625 if self.pipelining: 626 self.pipelined_replies.append(result) 627 628 if self.inMulti: 629 self.post_proc.append(kwargs.get("post_proc")) 630 else: 631 if "post_proc" in kwargs: 632 f = kwargs["post_proc"] 633 if callable(f): 634 result.addCallback(f) 635 return result 636 637 ## 638 # REDIS COMMANDS 639 ## 640 641 # Connection handling 642 def quit(self): 643 """ 644 Close the connection 645 """ 646 self.factory.continueTrying = False 647 return self.execute_command("QUIT") 648 649 def auth(self, password): 650 """ 651 Simple password authentication if enabled 652 """ 653 return self.execute_command("AUTH", password) 654 655 def ping(self): 656 """ 657 Ping the server 658 """ 659 return self.execute_command("PING") 660 661 # Commands operating on all value types 662 def exists(self, key): 663 """ 664 Test if a key exists 665 """ 666 return self.execute_command("EXISTS", key) 667 668 def delete(self, keys, *args): 669 """ 670 Delete one or more keys 671 """ 672 keys = list_or_args("delete", keys, args) 673 return self.execute_command("DEL", *keys) 674 675 def type(self, key): 676 """ 677 Return the type of the value stored at key 678 """ 679 return self.execute_command("TYPE", key) 680 681 def keys(self, pattern="*"): 682 """ 683 Return all the keys matching a given pattern 684 """ 685 return self.execute_command("KEYS", pattern) 686 687 @staticmethod 688 def _build_scan_args(cursor, pattern, count): 689 """ 690 Construct arguments list for SCAN, SSCAN, HSCAN, ZSCAN commands 691 """ 692 args = [cursor] 693 if pattern is not None: 694 args.extend(("MATCH", pattern)) 695 if count is not None: 696 args.extend(("COUNT", count)) 697 698 return args 699 700 def scan(self, cursor=0, pattern=None, count=None): 701 """ 702 Incrementally iterate the keys in database 703 """ 704 args = self._build_scan_args(cursor, pattern, count) 705 return self.execute_command("SCAN", *args) 706 707 def randomkey(self): 708 """ 709 Return a random key from the key space 710 """ 711 return self.execute_command("RANDOMKEY") 712 713 def rename(self, oldkey, newkey): 714 """ 715 Rename the old key in the new one, 716 destroying the newname key if it already exists 717 """ 718 return self.execute_command("RENAME", oldkey, newkey) 719 720 def renamenx(self, oldkey, newkey): 721 """ 722 Rename the oldname key to newname, 723 if the newname key does not already exist 724 """ 725 return self.execute_command("RENAMENX", oldkey, newkey) 726 727 def dbsize(self): 728 """ 729 Return the number of keys in the current db 730 """ 731 return self.execute_command("DBSIZE") 732 733 def expire(self, key, time): 734 """ 735 Set a time to live in seconds on a key 736 """ 737 return self.execute_command("EXPIRE", key, time) 738 739 def persist(self, key): 740 """ 741 Remove the expire from a key 742 """ 743 return self.execute_command("PERSIST", key) 744 745 def ttl(self, key): 746 """ 747 Get the time to live in seconds of a key 748 """ 749 return self.execute_command("TTL", key) 750 751 def select(self, index): 752 """ 753 Select the DB with the specified index 754 """ 755 return self.execute_command("SELECT", index) 756 757 def move(self, key, dbindex): 758 """ 759 Move the key from the currently selected DB to the dbindex DB 760 """ 761 return self.execute_command("MOVE", key, dbindex) 762 763 def flush(self, all_dbs=False): 764 warnings.warn(DeprecationWarning( 765 "redis.flush() has been deprecated, " 766 "use redis.flushdb() or redis.flushall() instead")) 767 return all_dbs and self.flushall() or self.flushdb() 768 769 def flushdb(self): 770 """ 771 Remove all the keys from the currently selected DB 772 """ 773 return self.execute_command("FLUSHDB") 774 775 def flushall(self): 776 """ 777 Remove all the keys from all the databases 778 """ 779 return self.execute_command("FLUSHALL") 780 781 def time(self): 782 """ 783 Returns the current server time as a two items lists: a Unix timestamp 784 and the amount of microseconds already elapsed in the current second 785 """ 786 return self.execute_command("TIME") 787 788 # Commands operating on string values 789 def set(self, key, value, expire=None, pexpire=None, 790 only_if_not_exists=False, only_if_exists=False): 791 """ 792 Set a key to a string value 793 """ 794 args = [] 795 if expire is not None: 796 args.extend(("EX", expire)) 797 if pexpire is not None: 798 args.extend(("PX", pexpire)) 799 if only_if_not_exists and only_if_exists: 800 raise RedisError("only_if_not_exists and only_if_exists " 801 "cannot be true simultaneously") 802 if only_if_not_exists: 803 args.append("NX") 804 if only_if_exists: 805 args.append("XX") 806 return self.execute_command("SET", key, value, *args) 807 808 def get(self, key): 809 """ 810 Return the string value of the key 811 """ 812 return self.execute_command("GET", key) 813 814 def getbit(self, key, offset): 815 """ 816 Return the bit value at offset in the string value stored at key 817 """ 818 return self.execute_command("GETBIT", key, offset) 819 820 def getset(self, key, value): 821 """ 822 Set a key to a string returning the old value of the key 823 """ 824 return self.execute_command("GETSET", key, value) 825 826 def mget(self, keys, *args): 827 """ 828 Multi-get, return the strings values of the keys 829 """ 830 keys = list_or_args("mget", keys, args) 831 return self.execute_command("MGET", *keys) 832 833 def setbit(self, key, offset, value): 834 """ 835 Sets or clears the bit at offset in the string value stored at key 836 """ 837 if isinstance(value, bool): 838 value = int(value) 839 return self.execute_command("SETBIT", key, offset, value) 840 841 def setnx(self, key, value): 842 """ 843 Set a key to a string value if the key does not exist 844 """ 845 return self.execute_command("SETNX", key, value) 846 847 def setex(self, key, time, value): 848 """ 849 Set+Expire combo command 850 """ 851 return self.execute_command("SETEX", key, time, value) 852 853 def mset(self, mapping): 854 """ 855 Set the respective keys to the respective values. 856 """ 857 items = [] 858 for pair in six.iteritems(mapping): 859 items.extend(pair) 860 return self.execute_command("MSET", *items) 861 862 def msetnx(self, mapping): 863 """ 864 Set multiple keys to multiple values in a single atomic 865 operation if none of the keys already exist 866 """ 867 items = [] 868 for pair in six.iteritems(mapping): 869 items.extend(pair) 870 return self.execute_command("MSETNX", *items) 871 872 def bitop(self, operation, destkey, *srckeys): 873 """ 874 Perform a bitwise operation between multiple keys 875 and store the result in the destination key. 876 """ 877 srclen = len(srckeys) 878 if srclen == 0: 879 return defer.fail(RedisError("no ``srckeys`` specified")) 880 if isinstance(operation, six.string_types): 881 operation = operation.upper() 882 elif operation is operator.and_ or operation is operator.__and__: 883 operation = 'AND' 884 elif operation is operator.or_ or operation is operator.__or__: 885 operation = 'OR' 886 elif operation is operator.__xor__ or operation is operator.xor: 887 operation = 'XOR' 888 elif operation is operator.__not__ or operation is operator.not_: 889 operation = 'NOT' 890 if operation not in ('AND', 'OR', 'XOR', 'NOT'): 891 return defer.fail(InvalidData( 892 "Invalid operation: %s" % operation)) 893 if operation == 'NOT' and srclen > 1: 894 return defer.fail(RedisError( 895 "bitop NOT takes only one ``srckey``")) 896 return self.execute_command('BITOP', operation, destkey, *srckeys) 897 898 def bitcount(self, key, start=None, end=None): 899 if (end is None and start is not None) or \ 900 (start is None and end is not None): 901 raise RedisError("``start`` and ``end`` must both be specified") 902 if start is not None: 903 t = (start, end) 904 else: 905 t = () 906 return self.execute_command("BITCOUNT", key, *t) 907 908 def incr(self, key, amount=1): 909 """ 910 Increment the integer value of key 911 """ 912 return self.execute_command("INCRBY", key, amount) 913 914 def incrby(self, key, amount): 915 """ 916 Increment the integer value of key by integer 917 """ 918 return self.incr(key, amount) 919 920 def decr(self, key, amount=1): 921 """ 922 Decrement the integer value of key 923 """ 924 return self.execute_command("DECRBY", key, amount) 925 926 def decrby(self, key, amount): 927 """ 928 Decrement the integer value of key by integer 929 """ 930 return self.decr(key, amount) 931 932 def append(self, key, value): 933 """ 934 Append the specified string to the string stored at key 935 """ 936 return self.execute_command("APPEND", key, value) 937 938 def substr(self, key, start, end=-1): 939 """ 940 Return a substring of a larger string 941 """ 942 return self.execute_command("SUBSTR", key, start, end) 943 944 # Commands operating on lists 945 def push(self, key, value, tail=False): 946 warnings.warn(DeprecationWarning( 947 "redis.push() has been deprecated, " 948 "use redis.lpush() or redis.rpush() instead")) 949 950 return tail and self.rpush(key, value) or self.lpush(key, value) 951 952 def rpush(self, key, value): 953 """ 954 Append an element to the tail of the List value at key 955 """ 956 if isinstance(value, tuple) or isinstance(value, list): 957 return self.execute_command("RPUSH", key, *value) 958 else: 959 return self.execute_command("RPUSH", key, value) 960 961 def lpush(self, key, value): 962 """ 963 Append an element to the head of the List value at key 964 """ 965 if isinstance(value, tuple) or isinstance(value, list): 966 return self.execute_command("LPUSH", key, *value) 967 else: 968 return self.execute_command("LPUSH", key, value) 969 970 def llen(self, key): 971 """ 972 Return the length of the List value at key 973 """ 974 return self.execute_command("LLEN", key) 975 976 def lrange(self, key, start, end): 977 """ 978 Return a range of elements from the List at key 979 """ 980 return self.execute_command("LRANGE", key, start, end) 981 982 def ltrim(self, key, start, end): 983 """ 984 Trim the list at key to the specified range of elements 985 """ 986 return self.execute_command("LTRIM", key, start, end) 987 988 def lindex(self, key, index): 989 """ 990 Return the element at index position from the List at key 991 """ 992 return self.execute_command("LINDEX", key, index) 993 994 def lset(self, key, index, value): 995 """ 996 Set a new value as the element at index position of the List at key 997 """ 998 return self.execute_command("LSET", key, index, value) 999 1000 def lrem(self, key, count, value): 1001 """ 1002 Remove the first-N, last-N, or all the elements matching value 1003 from the List at key 1004 """ 1005 return self.execute_command("LREM", key, count, value) 1006 1007 def pop(self, key, tail=False): 1008 warnings.warn(DeprecationWarning( 1009 "redis.pop() has been deprecated, " 1010 "user redis.lpop() or redis.rpop() instead")) 1011 1012 return tail and self.rpop(key) or self.lpop(key) 1013 1014 def lpop(self, key): 1015 """ 1016 Return and remove (atomically) the first element of the List at key 1017 """ 1018 return self.execute_command("LPOP", key) 1019 1020 def rpop(self, key): 1021 """ 1022 Return and remove (atomically) the last element of the List at key 1023 """ 1024 return self.execute_command("RPOP", key) 1025 1026 @_blocking_command(release_on_callback=True) 1027 def blpop(self, keys, timeout=0): 1028 """ 1029 Blocking LPOP 1030 """ 1031 if isinstance(keys, six.string_types): 1032 keys = [keys] 1033 else: 1034 keys = list(keys) 1035 1036 keys.append(timeout) 1037 return self.execute_command("BLPOP", *keys, apply_timeout=False) 1038 1039 @_blocking_command(release_on_callback=True) 1040 def brpop(self, keys, timeout=0): 1041 """ 1042 Blocking RPOP 1043 """ 1044 if isinstance(keys, six.string_types): 1045 keys = [keys] 1046 else: 1047 keys = list(keys) 1048 1049 keys.append(timeout) 1050 return self.execute_command("BRPOP", *keys, apply_timeout=False) 1051 1052 @_blocking_command(release_on_callback=True) 1053 def brpoplpush(self, source, destination, timeout=0): 1054 """ 1055 Pop a value from a list, push it to another list and return 1056 it; or block until one is available. 1057 """ 1058 return self.execute_command("BRPOPLPUSH", source, destination, timeout, apply_timeout=False) 1059 1060 def rpoplpush(self, srckey, dstkey): 1061 """ 1062 Return and remove (atomically) the last element of the source 1063 List stored at srckey and push the same element to the 1064 destination List stored at dstkey 1065 """ 1066 return self.execute_command("RPOPLPUSH", srckey, dstkey) 1067 1068 def _make_set(self, result): 1069 if isinstance(result, list): 1070 return set(result) 1071 return result 1072 1073 # Commands operating on sets 1074 def sadd(self, key, members, *args): 1075 """ 1076 Add the specified member to the Set value at key 1077 """ 1078 members = list_or_args("sadd", members, args) 1079 return self.execute_command("SADD", key, *members) 1080 1081 def srem(self, key, members, *args): 1082 """ 1083 Remove the specified member from the Set value at key 1084 """ 1085 members = list_or_args("srem", members, args) 1086 return self.execute_command("SREM", key, *members) 1087 1088 def spop(self, key): 1089 """ 1090 Remove and return (pop) a random element from the Set value at key 1091 """ 1092 return self.execute_command("SPOP", key) 1093 1094 def smove(self, srckey, dstkey, member): 1095 """ 1096 Move the specified member from one Set to another atomically 1097 """ 1098 return self.execute_command( 1099 "SMOVE", srckey, dstkey, member).addCallback(bool) 1100 1101 def scard(self, key): 1102 """ 1103 Return the number of elements (the cardinality) of the Set at key 1104 """ 1105 return self.execute_command("SCARD", key) 1106 1107 def sismember(self, key, value): 1108 """ 1109 Test if the specified value is a member of the Set at key 1110 """ 1111 return self.execute_command("SISMEMBER", key, value).addCallback(bool) 1112 1113 def sinter(self, keys, *args): 1114 """ 1115 Return the intersection between the Sets stored at key1, ..., keyN 1116 """ 1117 keys = list_or_args("sinter", keys, args) 1118 return self.execute_command("SINTER", *keys).addCallback( 1119 self._make_set) 1120 1121 def sinterstore(self, dstkey, keys, *args): 1122 """ 1123 Compute the intersection between the Sets stored 1124 at key1, key2, ..., keyN, and store the resulting Set at dstkey 1125 """ 1126 keys = list_or_args("sinterstore", keys, args) 1127 return self.execute_command("SINTERSTORE", dstkey, *keys) 1128 1129 def sunion(self, keys, *args): 1130 """ 1131 Return the union between the Sets stored at key1, key2, ..., keyN 1132 """ 1133 keys = list_or_args("sunion", keys, args) 1134 return self.execute_command("SUNION", *keys).addCallback( 1135 self._make_set) 1136 1137 def sunionstore(self, dstkey, keys, *args): 1138 """ 1139 Compute the union between the Sets stored 1140 at key1, key2, ..., keyN, and store the resulting Set at dstkey 1141 """ 1142 keys = list_or_args("sunionstore", keys, args) 1143 return self.execute_command("SUNIONSTORE", dstkey, *keys) 1144 1145 def sdiff(self, keys, *args): 1146 """ 1147 Return the difference between the Set stored at key1 and 1148 all the Sets key2, ..., keyN 1149 """ 1150 keys = list_or_args("sdiff", keys, args) 1151 return self.execute_command("SDIFF", *keys).addCallback( 1152 self._make_set) 1153 1154 def sdiffstore(self, dstkey, keys, *args): 1155 """ 1156 Compute the difference between the Set key1 and all the 1157 Sets key2, ..., keyN, and store the resulting Set at dstkey 1158 """ 1159 keys = list_or_args("sdiffstore", keys, args) 1160 return self.execute_command("SDIFFSTORE", dstkey, *keys) 1161 1162 def smembers(self, key): 1163 """ 1164 Return all the members of the Set value at key 1165 """ 1166 return self.execute_command("SMEMBERS", key).addCallback( 1167 self._make_set) 1168 1169 def srandmember(self, key): 1170 """ 1171 Return a random member of the Set value at key 1172 """ 1173 return self.execute_command("SRANDMEMBER", key) 1174 1175 def sscan(self, key, cursor=0, pattern=None, count=None): 1176 args = self._build_scan_args(cursor, pattern, count) 1177 return self.execute_command("SSCAN", key, *args) 1178 1179 # Commands operating on sorted zsets (sorted sets) 1180 def zadd(self, key, score, member, *args): 1181 """ 1182 Add the specified member to the Sorted Set value at key 1183 or update the score if it already exist 1184 """ 1185 if args: 1186 # Args should be pairs (have even number of elements) 1187 if len(args) % 2: 1188 return defer.fail(InvalidData( 1189 "Invalid number of arguments to ZADD")) 1190 else: 1191 l = [score, member] 1192 l.extend(args) 1193 args = l 1194 else: 1195 args = [score, member] 1196 return self.execute_command("ZADD", key, *args) 1197 1198 def zrem(self, key, *args): 1199 """ 1200 Remove the specified member from the Sorted Set value at key 1201 """ 1202 return self.execute_command("ZREM", key, *args) 1203 1204 def zincr(self, key, member): 1205 return self.zincrby(key, 1, member) 1206 1207 def zdecr(self, key, member): 1208 return self.zincrby(key, -1, member) 1209 1210 @staticmethod 1211 def _handle_zincrby(data): 1212 if isinstance(data, (six.binary_type, six.text_type)): 1213 return float(data) 1214 return data 1215 1216 def zincrby(self, key, increment, member): 1217 """ 1218 If the member already exists increment its score by increment, 1219 otherwise add the member setting increment as score 1220 """ 1221 return self.execute_command("ZINCRBY", 1222 key, increment, 1223 member).addCallback(self._handle_zincrby) 1224 1225 def zrank(self, key, member): 1226 """ 1227 Return the rank (or index) or member in the sorted set at key, 1228 with scores being ordered from low to high 1229 """ 1230 return self.execute_command("ZRANK", key, member) 1231 1232 def zrevrank(self, key, member): 1233 """ 1234 Return the rank (or index) or member in the sorted set at key, 1235 with scores being ordered from high to low 1236 """ 1237 return self.execute_command("ZREVRANK", key, member) 1238 1239 def _handle_withscores(self, r): 1240 if isinstance(r, list): 1241 # Return a list tuples of form (value, score) 1242 return list((x[0], float(x[1])) for x in zip(r[::2], r[1::2])) 1243 return r 1244 1245 def _zrange(self, key, start, end, withscores, reverse): 1246 if reverse: 1247 cmd = "ZREVRANGE" 1248 else: 1249 cmd = "ZRANGE" 1250 if withscores: 1251 pieces = (cmd, key, start, end, "WITHSCORES") 1252 else: 1253 pieces = (cmd, key, start, end) 1254 r = self.execute_command(*pieces) 1255 if withscores: 1256 r.addCallback(self._handle_withscores) 1257 return r 1258 1259 def zrange(self, key, start=0, end=-1, withscores=False): 1260 """ 1261 Return a range of elements from the sorted set at key 1262 """ 1263 return self._zrange(key, start, end, withscores, False) 1264 1265 def zrevrange(self, key, start=0, end=-1, withscores=False): 1266 """ 1267 Return a range of elements from the sorted set at key, 1268 exactly like ZRANGE, but the sorted set is ordered in 1269 traversed in reverse order, from the greatest to the smallest score 1270 """ 1271 return self._zrange(key, start, end, withscores, True) 1272 1273 def _zrangebyscore(self, key, min, max, withscores, offset, count, rev): 1274 if rev: 1275 cmd = "ZREVRANGEBYSCORE" 1276 else: 1277 cmd = "ZRANGEBYSCORE" 1278 if (offset is None) != (count is None): # XNOR 1279 return defer.fail(InvalidData( 1280 "Invalid count and offset arguments to %s" % cmd)) 1281 if withscores: 1282 pieces = [cmd, key, min, max, "WITHSCORES"] 1283 else: 1284 pieces = [cmd, key, min, max] 1285 if offset is not None and count is not None: 1286 pieces.extend(("LIMIT", offset, count)) 1287 r = self.execute_command(*pieces) 1288 if withscores: 1289 r.addCallback(self._handle_withscores) 1290 return r 1291 1292 def zrangebyscore(self, key, min='-inf', max='+inf', 1293 withscores=False, offset=None, count=None): 1294 """ 1295 Return all the elements with score >= min and score <= max 1296 (a range query) from the sorted set 1297 """ 1298 return self._zrangebyscore(key, min, max, withscores, offset, 1299 count, False) 1300 1301 def zrevrangebyscore(self, key, max='+inf', min='-inf', 1302 withscores=False, offset=None, count=None): 1303 """ 1304 ZRANGEBYSCORE in reverse order 1305 """ 1306 # ZREVRANGEBYSCORE takes max before min 1307 return self._zrangebyscore(key, max, min, withscores, offset, 1308 count, True) 1309 1310 def zcount(self, key, min='-inf', max='+inf'): 1311 """ 1312 Return the number of elements with score >= min and score <= max 1313 in the sorted set 1314 """ 1315 if min == '-inf' and max == '+inf': 1316 return self.zcard(key) 1317 return self.execute_command("ZCOUNT", key, min, max) 1318 1319 def zcard(self, key): 1320 """ 1321 Return the cardinality (number of elements) of the sorted set at key 1322 """ 1323 return self.execute_command("ZCARD", key) 1324 1325 @staticmethod 1326 def _handle_zscore(data): 1327 if isinstance(data, (six.binary_type, six.text_type)): 1328 return int(data) 1329 return data 1330 1331 def zscore(self, key, element): 1332 """ 1333 Return the score associated with the specified element of the sorted 1334 set at key 1335 """ 1336 return self.execute_command("ZSCORE", key, 1337 element).addCallback(self._handle_zscore) 1338 1339 def zremrangebyrank(self, key, min=0, max=-1): 1340 """ 1341 Remove all the elements with rank >= min and rank <= max from 1342 the sorted set 1343 """ 1344 return self.execute_command("ZREMRANGEBYRANK", key, min, max) 1345 1346 def zremrangebyscore(self, key, min='-inf', max='+inf'): 1347 """ 1348 Remove all the elements with score >= min and score <= max from 1349 the sorted set 1350 """ 1351 return self.execute_command("ZREMRANGEBYSCORE", key, min, max) 1352 1353 def zunionstore(self, dstkey, keys, aggregate=None): 1354 """ 1355 Perform a union over a number of sorted sets with optional 1356 weight and aggregate 1357 """ 1358 return self._zaggregate("ZUNIONSTORE", dstkey, keys, aggregate) 1359 1360 def zinterstore(self, dstkey, keys, aggregate=None): 1361 """ 1362 Perform an intersection over a number of sorted sets with optional 1363 weight and aggregate 1364 """ 1365 return self._zaggregate("ZINTERSTORE", dstkey, keys, aggregate) 1366 1367 def _zaggregate(self, command, dstkey, keys, aggregate): 1368 pieces = [command, dstkey, len(keys)] 1369 if isinstance(keys, dict): 1370 keys, weights = list(zip(*keys.items())) 1371 else: 1372 weights = None 1373 1374 pieces.extend(keys) 1375 if weights: 1376 pieces.append("WEIGHTS") 1377 pieces.extend(weights) 1378 1379 if aggregate: 1380 if aggregate is min: 1381 aggregate = 'MIN' 1382 elif aggregate is max: 1383 aggregate = 'MAX' 1384 elif aggregate is sum: 1385 aggregate = 'SUM' 1386 else: 1387 err_flag = True 1388 if isinstance(aggregate, six.string_types): 1389 aggregate_u = aggregate.upper() 1390 if aggregate_u in ('MIN', 'MAX', 'SUM'): 1391 aggregate = aggregate_u 1392 err_flag = False 1393 if err_flag: 1394 return defer.fail(InvalidData( 1395 "Invalid aggregate function: %s" % aggregate)) 1396 pieces.extend(("AGGREGATE", aggregate)) 1397 return self.execute_command(*pieces) 1398 1399 def zscan(self, key, cursor=0, pattern=None, count=None): 1400 args = self._build_scan_args(cursor, pattern, count) 1401 return self.execute_command("ZSCAN", key, *args) 1402 1403 # Commands operating on hashes 1404 def hset(self, key, field, value): 1405 """ 1406 Set the hash field to the specified value. Creates the hash if needed 1407 """ 1408 return self.execute_command("HSET", key, field, value) 1409 1410 def hsetnx(self, key, field, value): 1411 """ 1412 Set the hash field to the specified value if the field does not exist. 1413 Creates the hash if needed 1414 """ 1415 return self.execute_command("HSETNX", key, field, value) 1416 1417 def hget(self, key, field): 1418 """ 1419 Retrieve the value of the specified hash field. 1420 """ 1421 return self.execute_command("HGET", key, field) 1422 1423 def hmget(self, key, fields): 1424 """ 1425 Get the hash values associated to the specified fields. 1426 """ 1427 return self.execute_command("HMGET", key, *fields) 1428 1429 def hmset(self, key, mapping): 1430 """ 1431 Set the hash fields to their respective values. 1432 """ 1433 items = [] 1434 for pair in six.iteritems(mapping): 1435 items.extend(pair) 1436 return self.execute_command("HMSET", key, *items) 1437 1438 def hincr(self, key, field): 1439 return self.hincrby(key, field, 1) 1440 1441 def hdecr(self, key, field): 1442 return self.hincrby(key, field, -1) 1443 1444 def hincrby(self, key, field, integer): 1445 """ 1446 Increment the integer value of the hash at key on field with integer. 1447 """ 1448 return self.execute_command("HINCRBY", key, field, integer) 1449 1450 def hexists(self, key, field): 1451 """ 1452 Test for existence of a specified field in a hash 1453 """ 1454 return self.execute_command("HEXISTS", key, field).addCallback(bool) 1455 1456 def hdel(self, key, fields): 1457 """ 1458 Remove the specified field or fields from a hash 1459 """ 1460 if isinstance(fields, six.string_types): 1461 fields = [fields] 1462 else: 1463 fields = list(fields) 1464 return self.execute_command("HDEL", key, *fields) 1465 1466 def hlen(self, key): 1467 """ 1468 Return the number of items in a hash. 1469 """ 1470 return self.execute_command("HLEN", key) 1471 1472 def hkeys(self, key): 1473 """ 1474 Return all the fields in a hash. 1475 """ 1476 return self.execute_command("HKEYS", key) 1477 1478 def hvals(self, key): 1479 """ 1480 Return all the values in a hash. 1481 """ 1482 return self.execute_command("HVALS", key) 1483 1484 def hgetall(self, key): 1485 """ 1486 Return all the fields and associated values in a hash. 1487 """ 1488 f = lambda d: dict(list(zip(d[::2], d[1::2]))) 1489 return self.execute_command("HGETALL", key, post_proc=f) 1490 1491 def hscan(self, key, cursor=0, pattern=None, count=None): 1492 args = self._build_scan_args(cursor, pattern, count) 1493 return self.execute_command("HSCAN", key, *args) 1494 1495 # Sorting 1496 def sort(self, key, start=None, end=None, by=None, get=None, 1497 desc=None, alpha=False, store=None): 1498 if (start is not None and end is None) or \ 1499 (end is not None and start is None): 1500 raise RedisError("``start`` and ``end`` must both be specified") 1501 1502 pieces = [key] 1503 if by is not None: 1504 pieces.append("BY") 1505 pieces.append(by) 1506 if start is not None and end is not None: 1507 pieces.append("LIMIT") 1508 pieces.append(start) 1509 pieces.append(end) 1510 if get is not None: 1511 pieces.append("GET") 1512 pieces.append(get) 1513 if desc: 1514 pieces.append("DESC") 1515 if alpha: 1516 pieces.append("ALPHA") 1517 if store is not None: 1518 pieces.append("STORE") 1519 pieces.append(store) 1520 1521 return self.execute_command("SORT", *pieces) 1522 1523 def _clear_txstate(self): 1524 if self.inTransaction: 1525 self.pendingTransaction = False 1526 self.inTransaction = False 1527 self.inMulti = False 1528 self.factory.connectionQueue.put(self) 1529 1530 @_blocking_command(release_on_callback=False) 1531 def watch(self, keys): 1532 if not self.pendingTransaction: 1533 self.pendingTransaction = True 1534 self.inTransaction = False 1535 self.inMulti = False 1536 self.unwatch_cc = self._clear_txstate 1537 self.commit_cc = lambda: () 1538 if isinstance(keys, six.string_types): 1539 keys = [keys] 1540 d = self.execute_command("WATCH", *keys).addCallback(self._tx_started) 1541 return d 1542 1543 def unwatch(self): 1544 self.unwatch_cc() 1545 return self.execute_command("UNWATCH") 1546 1547 # Transactions 1548 # multi() will return a deferred with a "connection" object 1549 # That object must be used for further interactions within 1550 # the transaction. At the end, either exec() or discard() 1551 # must be executed. 1552 @_blocking_command(release_on_callback=False) 1553 def multi(self, keys=None): 1554 self.pendingTransaction = True 1555 self.inTransaction = False 1556 self.inMulti = True 1557 self.unwatch_cc = lambda: () 1558 self.commit_cc = self._clear_txstate 1559 if keys is not None: 1560 d = self.watch(keys) 1561 d.addCallback(lambda _: self.execute_command("MULTI")) 1562 else: 1563 d = self.execute_command("MULTI") 1564 d.addCallback(self._tx_started) 1565 return d 1566 1567 def _tx_started(self, response): 1568 if response != 'OK': 1569 raise RedisError('Invalid response: %s' % response) 1570 self.inTransaction = True 1571 return self 1572 1573 def _commit_check(self, response): 1574 if response is None: 1575 self.transactions = 0 1576 self._clear_txstate() 1577 raise WatchError("Transaction failed") 1578 else: 1579 return response 1580 1581 def commit(self): 1582 if self.inMulti is False: 1583 raise RedisError("Not in transaction") 1584 return self.execute_command("EXEC").addCallback(self._commit_check) 1585 1586 def discard(self): 1587 if self.inMulti is False: 1588 raise RedisError("Not in transaction") 1589 self.post_proc = [] 1590 self.transactions = 0 1591 self._clear_txstate() 1592 return self.execute_command("DISCARD") 1593 1594 # Returns a proxy that works just like .multi() except that commands 1595 # are simply buffered to be written all at once in a pipeline. 1596 # http://redis.io/topics/pipelining 1597 @_blocking_command(release_on_callback=False) 1598 def pipeline(self): 1599 1600 # Return a deferred that returns self (rather than simply self) to allow 1601 # ConnectionHandler to wrap this method with async connection retrieval. 1602 self.pipelining = True 1603 self.pipelined_commands = [] 1604 self.pipelined_replies = [] 1605 return defer.succeed(self) 1606 1607 @defer.inlineCallbacks 1608 def execute_pipeline(self): 1609 if not self.pipelining: 1610 err = "Not currently pipelining commands, " \ 1611 "please use pipeline() first" 1612 raise RedisError(err) 1613 1614 # Flush all the commands at once to redis. Wait for all replies 1615 # to come back using a deferred list. 1616 self.transport.write(six.b("").join(self.pipelined_commands)) 1617 1618 d = defer.DeferredList( 1619 deferredList=self.pipelined_replies, 1620 fireOnOneErrback=True, 1621 consumeErrors=True, 1622 ) 1623 1624 d.addBoth(self._clear_pipeline_state) 1625 1626 results = yield d 1627 1628 defer.returnValue([value for success, value in results]) 1629 1630 def _clear_pipeline_state(self, response): 1631 if self.pipelining: 1632 self.pipelining = False 1633 self.pipelined_commands = [] 1634 self.pipelined_replies = [] 1635 self.factory.connectionQueue.put(self) 1636 1637 return response 1638 1639 # Publish/Subscribe 1640 # see the SubscriberProtocol for subscribing to channels 1641 def publish(self, channel, message): 1642 """ 1643 Publish message to a channel 1644 """ 1645 return self.execute_command("PUBLISH", channel, message) 1646 1647 # Persistence control commands 1648 def save(self): 1649 """ 1650 Synchronously save the DB on disk 1651 """ 1652 return self.execute_command("SAVE") 1653 1654 def bgsave(self): 1655 """ 1656 Asynchronously save the DB on disk 1657 """ 1658 return self.execute_command("BGSAVE") 1659 1660 def lastsave(self): 1661 """ 1662 Return the UNIX time stamp of the last successfully saving of the 1663 dataset on disk 1664 """ 1665 return self.execute_command("LASTSAVE") 1666 1667 def shutdown(self): 1668 """ 1669 Synchronously save the DB on disk, then shutdown the server 1670 """ 1671 self.factory.continueTrying = False 1672 return self.execute_command("SHUTDOWN") 1673 1674 def bgrewriteaof(self): 1675 """ 1676 Rewrite the append only file in background when it gets too big 1677 """ 1678 return self.execute_command("BGREWRITEAOF") 1679 1680 def _process_info(self, r): 1681 if isinstance(r, six.binary_type): 1682 r = r.decode() 1683 keypairs = [x for x in r.split('\r\n') if 1684 ':' in x and not x.startswith('#')] 1685 d = {} 1686 for kv in keypairs: 1687 k, v = kv.split(':') 1688 d[k] = v 1689 return d 1690 1691 # Remote server control commands 1692 def info(self, type=None): 1693 """ 1694 Provide information and statistics about the server 1695 """ 1696 if type is None: 1697 return self.execute_command("INFO") 1698 else: 1699 r = self.execute_command("INFO", type) 1700 return r.addCallback(self._process_info) 1701 1702 # slaveof is missing 1703 1704 # Redis 2.6 scripting commands 1705 def _eval(self, script, script_hash, keys, args): 1706 n = len(keys) 1707 keys_and_args = tuple(keys) + tuple(args) 1708 r = self.execute_command("EVAL", script, n, *keys_and_args) 1709 if script_hash in self.script_hashes: 1710 return r 1711 return r.addCallback(self._eval_success, script_hash) 1712 1713 def _eval_success(self, r, script_hash): 1714 self.script_hashes.add(script_hash) 1715 return r 1716 1717 def _evalsha_failed(self, err, script, script_hash, keys, args): 1718 if err.check(ScriptDoesNotExist): 1719 return self._eval(script, script_hash, keys, args) 1720 return err 1721 1722 def eval(self, script, keys=[], args=[]): 1723 if isinstance(script, six.text_type): 1724 script = script.encode() 1725 h = hashlib.sha1(script).hexdigest() 1726 if h in self.script_hashes: 1727 return self.evalsha(h, keys, args).addErrback( 1728 self._evalsha_failed, script, h, keys, args) 1729 return self._eval(script, h, keys, args) 1730 1731 def _evalsha_errback(self, err, script_hash): 1732 if err.check(ResponseError): 1733 if err.value.args[0].startswith(u'NOSCRIPT'): 1734 if script_hash in self.script_hashes: 1735 self.script_hashes.remove(script_hash) 1736 raise ScriptDoesNotExist("No script matching hash: %s found" % 1737 script_hash) 1738 return err 1739 1740 def evalsha(self, sha1_hash, keys=[], args=[]): 1741 n = len(keys) 1742 keys_and_args = tuple(keys) + tuple(args) 1743 r = self.execute_command("EVALSHA", 1744 sha1_hash, n, 1745 *keys_and_args) 1746 r.addErrback(self._evalsha_errback, sha1_hash) 1747 if sha1_hash not in self.script_hashes: 1748 r.addCallback(self._eval_success, sha1_hash) 1749 return r 1750 1751 def _script_exists_success(self, r): 1752 l = [bool(x) for x in r] 1753 if len(l) == 1: 1754 return l[0] 1755 else: 1756 return l 1757 1758 def script_exists(self, *hashes): 1759 return self.execute_command("SCRIPT", "EXISTS", 1760 post_proc=self._script_exists_success, 1761 *hashes) 1762 1763 def _script_flush_success(self, r): 1764 self.script_hashes.clear() 1765 return r 1766 1767 def script_flush(self): 1768 return self.execute_command("SCRIPT", "FLUSH").addCallback( 1769 self._script_flush_success) 1770 1771 def _handle_script_kill(self, r): 1772 if isinstance(r, Failure): 1773 if r.check(ResponseError): 1774 if r.value.args[0].startswith(u'NOTBUSY'): 1775 raise NoScriptRunning("No script running") 1776 return r 1777 1778 def script_kill(self): 1779 return self.execute_command("SCRIPT", 1780 "KILL").addBoth(self._handle_script_kill) 1781 1782 def script_load(self, script): 1783 return self.execute_command("SCRIPT", "LOAD", script) 1784 1785 # Redis 2.8.9 HyperLogLog commands 1786 def pfadd(self, key, elements, *args): 1787 elements = list_or_args("pfadd", elements, args) 1788 return self.execute_command("PFADD", key, *elements) 1789 1790 def pfcount(self, keys, *args): 1791 keys = list_or_args("pfcount", keys, args) 1792 return self.execute_command("PFCOUNT", *keys) 1793 1794 def pfmerge(self, destKey, sourceKeys, *args): 1795 sourceKeys = list_or_args("pfmerge", sourceKeys, args) 1796 return self.execute_command("PFMERGE", destKey, *sourceKeys) 1797 1798 _SENTINEL_NODE_FLAGS = (("is_master", "master"), ("is_slave", "slave"), 1799 ("is_sdown", "s_down"), ("is_odown", "o_down"), 1800 ("is_sentinel", "sentinel"), 1801 ("is_disconnected", "disconnected"), 1802 ("is_master_down", "master_down")) 1803 1804 def _parse_sentinel_state(self, state_array): 1805 as_dict = dict( 1806 (self.tryConvertData(key), self.tryConvertData(value)) 1807 for key, value in zip(state_array[::2], state_array[1::2]) 1808 ) 1809 flags = set(as_dict['flags'].split(',')) 1810 for bool_name, flag_name in self._SENTINEL_NODE_FLAGS: 1811 as_dict[bool_name] = flag_name in flags 1812 return as_dict 1813 1814 def sentinel_masters(self): 1815 def convert(raw): 1816 result = {} 1817 for array in raw: 1818 as_dict = self._parse_sentinel_state(array) 1819 result[as_dict['name']] = as_dict 1820 return result 1821 return self.execute_command("SENTINEL", "MASTERS").addCallback(convert) 1822 1823 def sentinel_slaves(self, service_name): 1824 def convert(raw): 1825 return [ 1826 self._parse_sentinel_state(array) 1827 for array in raw 1828 ] 1829 return self.execute_command("SENTINEL", "SLAVES", service_name)\ 1830 .addCallback(convert) 1831 1832 def sentinel_get_master_addr_by_name(self, service_name): 1833 return self.execute_command("SENTINEL", "GET-MASTER-ADDR-BY-NAME", service_name) 1834 1835 def role(self): 1836 return self.execute_command("ROLE") 1837 1838 1839class HiredisProtocol(BaseRedisProtocol): 1840 def __init__(self, *args, **kwargs): 1841 BaseRedisProtocol.__init__(self, *args, **kwargs) 1842 self._reader = hiredis.Reader(protocolError=InvalidData, 1843 replyError=ResponseError) 1844 1845 def dataReceived(self, data, unpause=False): 1846 if data: 1847 self._reader.feed(data) 1848 res = self._reader.gets() 1849 while res is not False: 1850 if isinstance(res, (six.text_type, six.binary_type, list)): 1851 res = self.tryConvertData(res) 1852 if res == "QUEUED": 1853 self.transactions += 1 1854 else: 1855 res = self.handleTransactionData(res) 1856 1857 self.replyReceived(res) 1858 res = self._reader.gets() 1859 1860 def _convert_bin_values(self, result): 1861 if isinstance(result, list): 1862 return [self._convert_bin_values(x) for x in result] 1863 elif isinstance(result, dict): 1864 return dict((self._convert_bin_values(k), self._convert_bin_values(v)) 1865 for k, v in six.iteritems(result)) 1866 elif isinstance(result, six.binary_type): 1867 return self.tryConvertData(result) 1868 return result 1869 1870 def commit(self): 1871 r = BaseRedisProtocol.commit(self) 1872 return r.addCallback(self._convert_bin_values) 1873 1874 def scan(self, cursor=0, pattern=None, count=None): 1875 r = BaseRedisProtocol.scan(self, cursor, pattern, count) 1876 return r.addCallback(self._convert_bin_values) 1877 1878 def sscan(self, key, cursor=0, pattern=None, count=None): 1879 r = BaseRedisProtocol.sscan(self, key, cursor, pattern, count) 1880 return r.addCallback(self._convert_bin_values) 1881 1882if hiredis is not None: 1883 RedisProtocol = HiredisProtocol 1884else: 1885 RedisProtocol = BaseRedisProtocol 1886 1887 1888class MonitorProtocol(RedisProtocol): 1889 """ 1890 monitor has the same behavior as subscribe: hold the connection until 1891 something happens. 1892 1893 take care with the performance impact: http://redis.io/commands/monitor 1894 """ 1895 1896 def messageReceived(self, message): 1897 pass 1898 1899 def replyReceived(self, reply): 1900 self.messageReceived(reply) 1901 1902 def monitor(self): 1903 return self.execute_command("MONITOR", apply_timeout=False) 1904 1905 def stop(self): 1906 self.transport.loseConnection() 1907 1908 1909class SubscriberProtocol(RedisProtocol): 1910 _sub_unsub_reponses = set([u"subscribe", u"unsubscribe", u"psubscribe", u"punsubscribe"]) 1911 1912 def messageReceived(self, pattern, channel, message): 1913 pass 1914 1915 def replyReceived(self, reply): 1916 if isinstance(reply, list): 1917 reply_len = len(reply) 1918 if reply_len >= 3 and reply[-3] == u"message": 1919 self.messageReceived(None, *reply[-2:]) 1920 elif reply_len >= 4 and reply[-4] == u"pmessage": 1921 self.messageReceived(*reply[-3:]) 1922 elif reply_len >= 3 and reply[-3] in self._sub_unsub_reponses and len(self.replyQueue.waiting) == 0: 1923 pass 1924 else: 1925 self.replyQueue.put(reply[-3:]) 1926 else: 1927 self.replyQueue.put(reply) 1928 1929 def subscribe(self, channels): 1930 if isinstance(channels, six.string_types): 1931 channels = [channels] 1932 return self.execute_command("SUBSCRIBE", *channels, apply_timeout=False) 1933 1934 def unsubscribe(self, channels): 1935 if isinstance(channels, six.string_types): 1936 channels = [channels] 1937 return self.execute_command("UNSUBSCRIBE", *channels) 1938 1939 def psubscribe(self, patterns): 1940 if isinstance(patterns, six.string_types): 1941 patterns = [patterns] 1942 return self.execute_command("PSUBSCRIBE", *patterns, apply_timeout=False) 1943 1944 def punsubscribe(self, patterns): 1945 if isinstance(patterns, six.string_types): 1946 patterns = [patterns] 1947 return self.execute_command("PUNSUBSCRIBE", *patterns) 1948 1949 1950class ConnectionHandler(object): 1951 def __init__(self, factory): 1952 self._factory = factory 1953 self._connected = factory.deferred 1954 1955 def disconnect(self): 1956 self._factory.continueTrying = 0 1957 self._factory.disconnectCalled = True 1958 for conn in self._factory.pool: 1959 try: 1960 conn.transport.loseConnection() 1961 except: 1962 pass 1963 1964 return self._factory.waitForEmptyPool() 1965 1966 def __getattr__(self, method): 1967 def wrapper(*args, **kwargs): 1968 protocol_method = getattr(self._factory.protocol, method) 1969 blocking = getattr(protocol_method, '_blocking', False) 1970 release_on_callback = getattr(protocol_method, '_release_on_callback', True) 1971 1972 d = self._factory.getConnection(peek=not blocking) 1973 1974 def callback(connection): 1975 try: 1976 d = protocol_method(connection, *args, **kwargs) 1977 except: 1978 if blocking: 1979 self._factory.connectionQueue.put(connection) 1980 raise 1981 1982 def put_back(reply): 1983 self._factory.connectionQueue.put(connection) 1984 return reply 1985 1986 if blocking and release_on_callback: 1987 d.addBoth(put_back) 1988 1989 return d 1990 d.addCallback(callback) 1991 return d 1992 return wrapper 1993 1994 def __repr__(self): 1995 try: 1996 cli = self._factory.pool[0].transport.getPeer() 1997 except: 1998 return "<Redis Connection: Not connected>" 1999 else: 2000 return "<Redis Connection: %s:%s - %d connection(s)>" % \ 2001 (cli.host, cli.port, self._factory.size) 2002 2003 2004class UnixConnectionHandler(ConnectionHandler): 2005 def __repr__(self): 2006 try: 2007 cli = self._factory.pool[0].transport.getPeer() 2008 except: 2009 return "<Redis Connection: Not connected>" 2010 else: 2011 return "<Redis Unix Connection: %s - %d connection(s)>" % \ 2012 (cli.name, self._factory.size) 2013 2014 2015ShardedMethods = frozenset([ 2016 "decr", 2017 "delete", 2018 "exists", 2019 "expire", 2020 "get", 2021 "get_type", 2022 "getset", 2023 "hdel", 2024 "hexists", 2025 "hget", 2026 "hgetall", 2027 "hincrby", 2028 "hkeys", 2029 "hlen", 2030 "hmget", 2031 "hmset", 2032 "hset", 2033 "hvals", 2034 "incr", 2035 "lindex", 2036 "llen", 2037 "lrange", 2038 "lrem", 2039 "lset", 2040 "ltrim", 2041 "pop", 2042 "publish", 2043 "push", 2044 "rename", 2045 "sadd", 2046 "set", 2047 "setex", 2048 "setnx", 2049 "sismember", 2050 "smembers", 2051 "srem", 2052 "ttl", 2053 "zadd", 2054 "zcard", 2055 "zcount", 2056 "zdecr", 2057 "zincr", 2058 "zincrby", 2059 "zrange", 2060 "zrangebyscore", 2061 "zrevrangebyscore", 2062 "zrevrank", 2063 "zrank", 2064 "zrem", 2065 "zremrangebyscore", 2066 "zremrangebyrank", 2067 "zrevrange", 2068 "zscore" 2069]) 2070 2071_findhash = re.compile(r'.+\{(.*)\}.*') 2072 2073 2074class HashRing(object): 2075 """Consistent hash for redis API""" 2076 def __init__(self, nodes=[], replicas=160): 2077 self.nodes = [] 2078 self.replicas = replicas 2079 self.ring = {} 2080 self.sorted_keys = [] 2081 2082 for n in nodes: 2083 self.add_node(n) 2084 2085 def add_node(self, node): 2086 self.nodes.append(node) 2087 for x in range(self.replicas): 2088 uuid = node._factory.uuid 2089 if isinstance(uuid, six.text_type): 2090 uuid = uuid.encode() 2091 crckey = zlib.crc32(six.b(":").join( 2092 [uuid, str(x).format().encode()])) 2093 self.ring[crckey] = node 2094 self.sorted_keys.append(crckey) 2095 2096 self.sorted_keys.sort() 2097 2098 def remove_node(self, node): 2099 self.nodes.remove(node) 2100 for x in range(self.replicas): 2101 crckey = zlib.crc32(six.b(":").join( 2102 [node, str(x).format().encode()])) 2103 self.ring.remove(crckey) 2104 self.sorted_keys.remove(crckey) 2105 2106 def get_node(self, key): 2107 n, i = self.get_node_pos(key) 2108 return n 2109 # self.get_node_pos(key)[0] 2110 2111 def get_node_pos(self, key): 2112 if len(self.ring) == 0: 2113 return [None, None] 2114 crc = zlib.crc32(key) 2115 idx = bisect.bisect(self.sorted_keys, crc) 2116 # prevents out of range index 2117 idx = min(idx, (self.replicas * len(self.nodes)) - 1) 2118 return [self.ring[self.sorted_keys[idx]], idx] 2119 2120 def iter_nodes(self, key): 2121 if len(self.ring) == 0: 2122 yield None, None 2123 node, pos = self.get_node_pos(key) 2124 for k in self.sorted_keys[pos:]: 2125 yield k, self.ring[k] 2126 2127 def __call__(self, key): 2128 return self.get_node(key) 2129 2130 2131class ShardedConnectionHandler(object): 2132 def __init__(self, connections): 2133 if isinstance(connections, defer.DeferredList): 2134 self._ring = None 2135 connections.addCallback(self._makeRing) 2136 else: 2137 self._ring = HashRing(connections) 2138 2139 def _makeRing(self, connections): 2140 connections = list(map(operator.itemgetter(1), connections)) 2141 self._ring = HashRing(connections) 2142 return self 2143 2144 @defer.inlineCallbacks 2145 def disconnect(self): 2146 if not self._ring: 2147 raise ConnectionError("Not connected") 2148 2149 for conn in self._ring.nodes: 2150 yield conn.disconnect() 2151 defer.returnValue(True) 2152 2153 def _wrap(self, method, *args, **kwargs): 2154 try: 2155 key = args[0] 2156 assert isinstance(key, six.string_types) 2157 except: 2158 raise ValueError( 2159 "Method '%s' requires a key as the first argument" % method) 2160 2161 m = _findhash.match(key) 2162 if m is not None and len(m.groups()) >= 1: 2163 node = self._ring(m.groups()[0]) 2164 else: 2165 node = self._ring(key) 2166 2167 return getattr(node, method)(*args, **kwargs) 2168 2169 def pipeline(self): 2170 raise NotImplementedError("Pipelining is not supported across shards") 2171 2172 def __getattr__(self, method): 2173 if method in ShardedMethods: 2174 return functools.partial(self._wrap, method) 2175 else: 2176 raise NotImplementedError("Method '%s' cannot be sharded" % method) 2177 2178 @defer.inlineCallbacks 2179 def mget(self, keys, *args): 2180 """ 2181 high-level mget, required because of the sharding support 2182 """ 2183 2184 keys = list_or_args("mget", keys, args) 2185 group = collections.defaultdict(lambda: []) 2186 for k in keys: 2187 node = self._ring(k) 2188 group[node].append(k) 2189 2190 deferreds = [] 2191 for node, keys in six.iteritems(group.items): 2192 nd = node.mget(keys) 2193 deferreds.append(nd) 2194 2195 result = [] 2196 response = yield defer.DeferredList(deferreds) 2197 for (success, values) in response: 2198 if success: 2199 result += values 2200 2201 defer.returnValue(result) 2202 2203 def __repr__(self): 2204 nodes = [] 2205 for conn in self._ring.nodes: 2206 try: 2207 cli = conn._factory.pool[0].transport.getPeer() 2208 except: 2209 pass 2210 else: 2211 nodes.append(six.b("%s:%s/%d") % 2212 (cli.host, cli.port, conn._factory.size)) 2213 return "<Redis Sharded Connection: %s>" % ", ".join(nodes) 2214 2215 2216class ShardedUnixConnectionHandler(ShardedConnectionHandler): 2217 def __repr__(self): 2218 nodes = [] 2219 for conn in self._ring.nodes: 2220 try: 2221 cli = conn._factory.pool[0].transport.getPeer() 2222 except: 2223 pass 2224 else: 2225 nodes.append(six.b("%s/%d") % 2226 (cli.name, conn._factory.size)) 2227 return "<Redis Sharded Connection: %s>" % ", ".join(nodes) 2228 2229 2230class PeekableQueue(defer.DeferredQueue): 2231 """ 2232 A DeferredQueue that supports peeking, accessing random item without 2233 removing them from the queue. 2234 """ 2235 def __init__(self, *args, **kwargs): 2236 defer.DeferredQueue.__init__(self, *args, **kwargs) 2237 2238 self.peekers = [] 2239 2240 def peek(self): 2241 if self.pending: 2242 return defer.succeed(random.choice(self.pending)) 2243 else: 2244 d = defer.Deferred() 2245 self.peekers.append(d) 2246 return d 2247 2248 def remove(self, item): 2249 self.pending.remove(item) 2250 2251 def put(self, obj): 2252 for d in self.peekers: 2253 d.callback(obj) 2254 self.peekers = [] 2255 2256 defer.DeferredQueue.put(self, obj) 2257 2258 2259class RedisFactory(protocol.ReconnectingClientFactory): 2260 maxDelay = 10 2261 protocol = RedisProtocol 2262 2263 noisy = False 2264 2265 def __init__(self, uuid, dbid, poolsize, isLazy=False, 2266 handler=ConnectionHandler, charset="utf-8", password=None, 2267 replyTimeout=None, convertNumbers=True): 2268 if not isinstance(poolsize, int): 2269 raise ValueError("Redis poolsize must be an integer, not %s" % 2270 repr(poolsize)) 2271 2272 if not isinstance(dbid, (int, type(None))): 2273 raise ValueError("Redis dbid must be an integer, not %s" % 2274 repr(dbid)) 2275 2276 self.uuid = uuid 2277 self.dbid = dbid 2278 self.poolsize = poolsize 2279 self.isLazy = isLazy 2280 self.charset = charset 2281 self.password = password 2282 self.replyTimeout = replyTimeout 2283 self.convertNumbers = convertNumbers 2284 2285 self.idx = 0 2286 self.size = 0 2287 self.pool = [] 2288 self.deferred = defer.Deferred() 2289 self.handler = handler(self) 2290 self.connectionQueue = PeekableQueue() 2291 self._waitingForEmptyPool = set() 2292 self.disconnectCalled = False 2293 2294 def buildProtocol(self, addr): 2295 p = self.protocol(self.charset, replyTimeout=self.replyTimeout, 2296 password=self.password, dbid=self.dbid, 2297 convertNumbers=self.convertNumbers) 2298 p.factory = self 2299 p.whenConnected().addCallback(self.addConnection) 2300 return p 2301 2302 def addConnection(self, conn): 2303 if self.disconnectCalled: 2304 conn.transport.loseConnection() 2305 return 2306 2307 conn.whenDisconnected().addCallback(self.delConnection) 2308 self.connectionQueue.put(conn) 2309 self.pool.append(conn) 2310 self.size = len(self.pool) 2311 if self.deferred: 2312 if self.size == self.poolsize: 2313 self.deferred.callback(self.handler) 2314 self.deferred = None 2315 2316 def delConnection(self, conn): 2317 try: 2318 self.pool.remove(conn) 2319 except Exception as e: 2320 log.msg("Could not remove connection from pool: %s" % str(e)) 2321 2322 self.size = len(self.pool) 2323 if not self.size and self._waitingForEmptyPool: 2324 deferreds = self._waitingForEmptyPool 2325 self._waitingForEmptyPool = set() 2326 for d in deferreds: 2327 d.callback(None) 2328 2329 def _cancelWaitForEmptyPool(self, deferred): 2330 self._waitingForEmptyPool.discard(deferred) 2331 deferred.errback(defer.CancelledError()) 2332 2333 def waitForEmptyPool(self): 2334 """ 2335 Returns a Deferred which fires when the pool size has reached 0. 2336 """ 2337 if not self.size: 2338 return defer.succeed(None) 2339 d = defer.Deferred(self._cancelWaitForEmptyPool) 2340 self._waitingForEmptyPool.add(d) 2341 return d 2342 2343 def connectionError(self, why): 2344 if self.deferred: 2345 self.deferred.errback(ValueError(why)) 2346 self.deferred = None 2347 2348 @defer.inlineCallbacks 2349 def getConnection(self, peek=False): 2350 if not self.continueTrying and not self.size: 2351 raise ConnectionError("Not connected") 2352 2353 while True: 2354 if peek: 2355 conn = yield self.connectionQueue.peek() 2356 else: 2357 conn = yield self.connectionQueue.get() 2358 if conn.connected == 0: 2359 log.msg('Discarding dead connection.') 2360 if peek: 2361 self.connectionQueue.remove(conn) 2362 else: 2363 defer.returnValue(conn) 2364 2365 2366class SubscriberFactory(RedisFactory): 2367 protocol = SubscriberProtocol 2368 2369 def __init__(self, isLazy=False, handler=ConnectionHandler): 2370 RedisFactory.__init__(self, None, None, 1, isLazy=isLazy, 2371 handler=handler) 2372 2373 2374class MonitorFactory(RedisFactory): 2375 protocol = MonitorProtocol 2376 2377 def __init__(self, isLazy=False, handler=ConnectionHandler): 2378 RedisFactory.__init__(self, None, None, 1, isLazy=isLazy, 2379 handler=handler) 2380 2381 2382def makeConnection(host, port, dbid, poolsize, reconnect, isLazy, 2383 charset, password, connectTimeout, replyTimeout, 2384 convertNumbers): 2385 uuid = "%s:%d" % (host, port) 2386 factory = RedisFactory(uuid, dbid, poolsize, isLazy, ConnectionHandler, 2387 charset, password, replyTimeout, convertNumbers) 2388 factory.continueTrying = reconnect 2389 for x in range(poolsize): 2390 reactor.connectTCP(host, port, factory, connectTimeout) 2391 2392 if isLazy: 2393 return factory.handler 2394 else: 2395 return factory.deferred 2396 2397 2398def makeShardedConnection(hosts, dbid, poolsize, reconnect, isLazy, 2399 charset, password, connectTimeout, replyTimeout, 2400 convertNumbers): 2401 err = "Please use a list or tuple of host:port for sharded connections" 2402 if not isinstance(hosts, (list, tuple)): 2403 raise ValueError(err) 2404 2405 connections = [] 2406 for item in hosts: 2407 try: 2408 host, port = item.split(":") 2409 port = int(port) 2410 except: 2411 raise ValueError(err) 2412 2413 c = makeConnection(host, port, dbid, poolsize, reconnect, isLazy, 2414 charset, password, connectTimeout, replyTimeout, 2415 convertNumbers) 2416 connections.append(c) 2417 2418 if isLazy: 2419 return ShardedConnectionHandler(connections) 2420 else: 2421 deferred = defer.DeferredList(connections) 2422 ShardedConnectionHandler(deferred) 2423 return deferred 2424 2425 2426def Connection(host="localhost", port=6379, dbid=None, reconnect=True, 2427 charset="utf-8", password=None, 2428 connectTimeout=None, replyTimeout=None, convertNumbers=True): 2429 return makeConnection(host, port, dbid, 1, reconnect, False, 2430 charset, password, connectTimeout, replyTimeout, 2431 convertNumbers) 2432 2433 2434def lazyConnection(host="localhost", port=6379, dbid=None, reconnect=True, 2435 charset="utf-8", password=None, 2436 connectTimeout=None, replyTimeout=None, convertNumbers=True): 2437 return makeConnection(host, port, dbid, 1, reconnect, True, 2438 charset, password, connectTimeout, replyTimeout, 2439 convertNumbers) 2440 2441 2442def ConnectionPool(host="localhost", port=6379, dbid=None, 2443 poolsize=10, reconnect=True, charset="utf-8", password=None, 2444 connectTimeout=None, replyTimeout=None, 2445 convertNumbers=True): 2446 return makeConnection(host, port, dbid, poolsize, reconnect, False, 2447 charset, password, connectTimeout, replyTimeout, 2448 convertNumbers) 2449 2450 2451def lazyConnectionPool(host="localhost", port=6379, dbid=None, 2452 poolsize=10, reconnect=True, charset="utf-8", 2453 password=None, connectTimeout=None, replyTimeout=None, 2454 convertNumbers=True): 2455 return makeConnection(host, port, dbid, poolsize, reconnect, True, 2456 charset, password, connectTimeout, replyTimeout, 2457 convertNumbers) 2458 2459 2460def ShardedConnection(hosts, dbid=None, reconnect=True, charset="utf-8", 2461 password=None, connectTimeout=None, replyTimeout=None, 2462 convertNumbers=True): 2463 return makeShardedConnection(hosts, dbid, 1, reconnect, False, 2464 charset, password, connectTimeout, 2465 replyTimeout, convertNumbers) 2466 2467 2468def lazyShardedConnection(hosts, dbid=None, reconnect=True, charset="utf-8", 2469 password=None, 2470 connectTimeout=None, replyTimeout=None, 2471 convertNumbers=True): 2472 return makeShardedConnection(hosts, dbid, 1, reconnect, True, 2473 charset, password, connectTimeout, 2474 replyTimeout, convertNumbers) 2475 2476 2477def ShardedConnectionPool(hosts, dbid=None, poolsize=10, reconnect=True, 2478 charset="utf-8", password=None, 2479 connectTimeout=None, replyTimeout=None, 2480 convertNumbers=True): 2481 return makeShardedConnection(hosts, dbid, poolsize, reconnect, False, 2482 charset, password, connectTimeout, 2483 replyTimeout, convertNumbers) 2484 2485 2486def lazyShardedConnectionPool(hosts, dbid=None, poolsize=10, reconnect=True, 2487 charset="utf-8", password=None, 2488 connectTimeout=None, replyTimeout=None, 2489 convertNumbers=True): 2490 return makeShardedConnection(hosts, dbid, poolsize, reconnect, True, 2491 charset, password, connectTimeout, 2492 replyTimeout, convertNumbers) 2493 2494 2495def makeUnixConnection(path, dbid, poolsize, reconnect, isLazy, 2496 charset, password, connectTimeout, replyTimeout, 2497 convertNumbers): 2498 factory = RedisFactory(path, dbid, poolsize, isLazy, UnixConnectionHandler, 2499 charset, password, replyTimeout, convertNumbers) 2500 factory.continueTrying = reconnect 2501 for x in range(poolsize): 2502 reactor.connectUNIX(path, factory, connectTimeout) 2503 2504 if isLazy: 2505 return factory.handler 2506 else: 2507 return factory.deferred 2508 2509 2510def makeShardedUnixConnection(paths, dbid, poolsize, reconnect, isLazy, 2511 charset, password, connectTimeout, replyTimeout, 2512 convertNumbers): 2513 err = "Please use a list or tuple of paths for sharded unix connections" 2514 if not isinstance(paths, (list, tuple)): 2515 raise ValueError(err) 2516 2517 connections = [] 2518 for path in paths: 2519 c = makeUnixConnection(path, dbid, poolsize, reconnect, isLazy, 2520 charset, password, connectTimeout, replyTimeout, 2521 convertNumbers) 2522 connections.append(c) 2523 2524 if isLazy: 2525 return ShardedUnixConnectionHandler(connections) 2526 else: 2527 deferred = defer.DeferredList(connections) 2528 ShardedUnixConnectionHandler(deferred) 2529 return deferred 2530 2531 2532def UnixConnection(path="/tmp/redis.sock", dbid=None, reconnect=True, 2533 charset="utf-8", password=None, 2534 connectTimeout=None, replyTimeout=None, convertNumbers=True): 2535 return makeUnixConnection(path, dbid, 1, reconnect, False, 2536 charset, password, connectTimeout, replyTimeout, 2537 convertNumbers) 2538 2539 2540def lazyUnixConnection(path="/tmp/redis.sock", dbid=None, reconnect=True, 2541 charset="utf-8", password=None, 2542 connectTimeout=None, replyTimeout=None, 2543 convertNumbers=True): 2544 return makeUnixConnection(path, dbid, 1, reconnect, True, 2545 charset, password, connectTimeout, replyTimeout, 2546 convertNumbers) 2547 2548 2549def UnixConnectionPool(path="/tmp/redis.sock", dbid=None, poolsize=10, 2550 reconnect=True, charset="utf-8", password=None, 2551 connectTimeout=None, replyTimeout=None, 2552 convertNumbers=True): 2553 return makeUnixConnection(path, dbid, poolsize, reconnect, False, 2554 charset, password, connectTimeout, replyTimeout, 2555 convertNumbers) 2556 2557 2558def lazyUnixConnectionPool(path="/tmp/redis.sock", dbid=None, poolsize=10, 2559 reconnect=True, charset="utf-8", password=None, 2560 connectTimeout=None, replyTimeout=None, 2561 convertNumbers=True): 2562 return makeUnixConnection(path, dbid, poolsize, reconnect, True, 2563 charset, password, connectTimeout, replyTimeout, 2564 convertNumbers) 2565 2566 2567def ShardedUnixConnection(paths, dbid=None, reconnect=True, charset="utf-8", 2568 password=None, connectTimeout=None, replyTimeout=None, 2569 convertNumbers=True): 2570 return makeShardedUnixConnection(paths, dbid, 1, reconnect, False, 2571 charset, password, connectTimeout, 2572 replyTimeout, convertNumbers) 2573 2574 2575def lazyShardedUnixConnection(paths, dbid=None, reconnect=True, 2576 charset="utf-8", password=None, 2577 connectTimeout=None, replyTimeout=None, 2578 convertNumbers=True): 2579 return makeShardedUnixConnection(paths, dbid, 1, reconnect, True, 2580 charset, password, connectTimeout, 2581 replyTimeout, convertNumbers) 2582 2583 2584def ShardedUnixConnectionPool(paths, dbid=None, poolsize=10, reconnect=True, 2585 charset="utf-8", password=None, 2586 connectTimeout=None, replyTimeout=None, 2587 convertNumbers=True): 2588 return makeShardedUnixConnection(paths, dbid, poolsize, reconnect, False, 2589 charset, password, connectTimeout, 2590 replyTimeout, convertNumbers) 2591 2592 2593def lazyShardedUnixConnectionPool(paths, dbid=None, poolsize=10, 2594 reconnect=True, charset="utf-8", 2595 password=None, connectTimeout=None, 2596 replyTimeout=None, convertNumbers=True): 2597 return makeShardedUnixConnection(paths, dbid, poolsize, reconnect, True, 2598 charset, password, connectTimeout, 2599 replyTimeout, convertNumbers) 2600 2601 2602class MasterNotFoundError(ConnectionError): 2603 pass 2604 2605 2606class SentinelRedisProtocol(RedisProtocol): 2607 2608 def connectionMade(self): 2609 self.factory.resetDelay() 2610 2611 def check_role(role): 2612 if self.factory.is_master and role[0] != "master": 2613 self.transport.loseConnection() 2614 return defer.succeed(None) 2615 else: 2616 self.factory.resetDelay() 2617 return RedisProtocol.connectionMade(self) 2618 2619 if self.password is not None: 2620 self.auth(self.password) 2621 2622 return self.role().addCallback(check_role) 2623 2624 2625class SentinelConnectionFactory(RedisFactory): 2626 2627 initialDelay = 0.1 2628 protocol = SentinelRedisProtocol 2629 2630 def __init__(self, sentinel_manager, service_name, is_master, *args, **kwargs): 2631 RedisFactory.__init__(self, *args, **kwargs) 2632 2633 self.sentinel_manager = sentinel_manager 2634 self.service_name = service_name 2635 self.is_master = is_master 2636 2637 self._current_master_addr = None 2638 self._slave_no = 0 2639 2640 def clientConnectionFailed(self, connector, reason): 2641 self.try_to_connect(connector) 2642 2643 def clientConnectionLost(self, connector, unused_reason): 2644 self.try_to_connect(connector, nodelay=True) 2645 2646 def try_to_connect(self, connector, force_master=False, nodelay=False): 2647 if not self.continueTrying: 2648 return 2649 2650 def on_discovery_err(failure): 2651 failure.trap(MasterNotFoundError) 2652 log.msg("txredisapi: Can't get address from Sentinel: {0}".format(failure.value)) 2653 reactor.callLater(self.delay, self.try_to_connect, connector) 2654 self.resetDelay() 2655 2656 def on_master_addr(addr): 2657 if self._current_master_addr is not None and \ 2658 self._current_master_addr != addr: 2659 self.resetDelay() 2660 # master has changed, dropping all alive connections 2661 for conn in self.pool: 2662 conn.transport.loseConnection() 2663 2664 self._current_master_addr = addr 2665 connector.host, connector.port = addr 2666 if nodelay: 2667 connector.connect() 2668 else: 2669 self.retry(connector) 2670 2671 def on_slave_addrs(addrs): 2672 if addrs: 2673 connector.host, connector.port = addrs[self._slave_no % len(addrs)] 2674 self._slave_no += 1 2675 if nodelay: 2676 connector.connect() 2677 else: 2678 self.retry(connector) 2679 else: 2680 log.msg("txredisapi: No slaves discovered, falling back to master") 2681 self.try_to_connect(connector, force_master=True, nodelay=True) 2682 2683 if self.is_master or force_master: 2684 self.sentinel_manager.discover_master(self.service_name) \ 2685 .addCallbacks(on_master_addr, on_discovery_err) 2686 else: 2687 self.sentinel_manager.discover_slaves(self.service_name) \ 2688 .addCallback(on_slave_addrs) 2689 2690 2691class Sentinel(object): 2692 2693 discovery_timeout = 10 2694 2695 def __init__(self, sentinel_addresses, min_other_sentinels=0, **connection_kwargs): 2696 self.sentinels = [ 2697 lazyConnection(host, port, **connection_kwargs) 2698 for host, port in sentinel_addresses 2699 ] 2700 2701 self.min_other_sentinels = min_other_sentinels 2702 2703 def disconnect(self): 2704 return defer.gatherResults([sentinel.disconnect() for sentinel in self.sentinels], 2705 consumeErrors = True) 2706 2707 def check_master_state(self, state): 2708 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 2709 return False 2710 2711 if int(state["num-other-sentinels"]) < self.min_other_sentinels: 2712 return False 2713 2714 return True 2715 2716 def discover_master(self, service_name): 2717 result = defer.Deferred() 2718 2719 def on_response(response): 2720 if result.called: 2721 return 2722 2723 state = response.get(service_name) 2724 if state and self.check_master_state(state): 2725 result.callback((state["ip"], int(state["port"]))) 2726 timeout_call.cancel() 2727 2728 def on_timeout(): 2729 if not result.called: 2730 result.errback(MasterNotFoundError( 2731 "No master found for {0}".format(service_name))) 2732 2733 # Ignoring errors 2734 for sentinel in self.sentinels: 2735 sentinel.sentinel_masters().addCallbacks(on_response, lambda _: None) 2736 2737 timeout_call = reactor.callLater(self.discovery_timeout, on_timeout) 2738 2739 return result 2740 2741 @staticmethod 2742 def filter_slaves(slaves): 2743 """Remove slaves that are in ODOWN or SDOWN state""" 2744 return [ 2745 (slave["ip"], int(slave["port"])) 2746 for slave in slaves 2747 if not slave["is_odown"] and not slave["is_sdown"] 2748 ] 2749 2750 def discover_slaves(self, service_name): 2751 result = defer.Deferred() 2752 2753 def on_response(response): 2754 if result.called: 2755 return 2756 2757 slaves = self.filter_slaves(response) 2758 if slaves: 2759 result.callback(slaves) 2760 timeout_call.cancel() 2761 2762 def on_timeout(): 2763 if not result.called: 2764 result.callback([]) 2765 2766 for sentinel in self.sentinels: 2767 sentinel.sentinel_slaves(service_name).addCallbacks(on_response, lambda _: None) 2768 2769 timeout_call = reactor.callLater(self.discovery_timeout, on_timeout) 2770 2771 return result 2772 2773 @staticmethod 2774 def _connect_factory_and_return_handler(factory, poolsize): 2775 for _ in range(poolsize): 2776 # host and port will be rewritten by try_to_connect 2777 connector = Connector("0.0.0.0", None, factory, factory.maxDelay, None, reactor) 2778 factory.try_to_connect(connector, nodelay=True) 2779 return factory.handler 2780 2781 def master_for(self, service_name, factory_class=SentinelConnectionFactory, 2782 dbid=None, poolsize=1, **connection_kwargs): 2783 factory = factory_class(sentinel_manager=self, service_name=service_name, 2784 is_master=True, uuid=None, dbid=dbid, 2785 poolsize=poolsize, **connection_kwargs) 2786 return self._connect_factory_and_return_handler(factory, poolsize) 2787 2788 def slave_for(self, service_name, factory_class=SentinelConnectionFactory, 2789 dbid=None, poolsize=1, **connection_kwargs): 2790 factory = factory_class(sentinel_manager=self, service_name=service_name, 2791 is_master=False, uuid=None, dbid=dbid, 2792 poolsize=poolsize, **connection_kwargs) 2793 return self._connect_factory_and_return_handler(factory, poolsize) 2794 2795 2796__all__ = [ 2797 Connection, lazyConnection, 2798 ConnectionPool, lazyConnectionPool, 2799 ShardedConnection, lazyShardedConnection, 2800 ShardedConnectionPool, lazyShardedConnectionPool, 2801 UnixConnection, lazyUnixConnection, 2802 UnixConnectionPool, lazyUnixConnectionPool, 2803 ShardedUnixConnection, lazyShardedUnixConnection, 2804 ShardedUnixConnectionPool, lazyShardedUnixConnectionPool, 2805 Sentinel, MasterNotFoundError 2806] 2807 2808__author__ = "Alexandre Fiori" 2809__version__ = version = "1.4.7" 2810