1from __future__ import with_statement 2from itertools import chain 3import datetime 4import sys 5import warnings 6import time 7import threading 8import time as mod_time 9import hashlib 10from redis._compat import (b, basestring, bytes, imap, iteritems, iterkeys, 11 itervalues, izip, long, nativestr, unicode, 12 safe_unicode) 13from redis.connection import (ConnectionPool, UnixDomainSocketConnection, 14 SSLConnection, Token) 15from redis.lock import Lock, LuaLock 16from redis.exceptions import ( 17 ConnectionError, 18 DataError, 19 ExecAbortError, 20 NoScriptError, 21 PubSubError, 22 RedisError, 23 ResponseError, 24 TimeoutError, 25 WatchError, 26) 27 28SYM_EMPTY = b('') 29 30 31def list_or_args(keys, args): 32 # returns a single list combining keys and args 33 try: 34 iter(keys) 35 # a string or bytes instance can be iterated, but indicates 36 # keys wasn't passed as a list 37 if isinstance(keys, (basestring, bytes)): 38 keys = [keys] 39 except TypeError: 40 keys = [keys] 41 if args: 42 keys.extend(args) 43 return keys 44 45 46def timestamp_to_datetime(response): 47 "Converts a unix timestamp to a Python datetime object" 48 if not response: 49 return None 50 try: 51 response = int(response) 52 except ValueError: 53 return None 54 return datetime.datetime.fromtimestamp(response) 55 56 57def string_keys_to_dict(key_string, callback): 58 return dict.fromkeys(key_string.split(), callback) 59 60 61def dict_merge(*dicts): 62 merged = {} 63 for d in dicts: 64 merged.update(d) 65 return merged 66 67 68def parse_debug_object(response): 69 "Parse the results of Redis's DEBUG OBJECT command into a Python dict" 70 # The 'type' of the object is the first item in the response, but isn't 71 # prefixed with a name 72 response = nativestr(response) 73 response = 'type:' + response 74 response = dict([kv.split(':') for kv in response.split()]) 75 76 # parse some expected int values from the string response 77 # note: this cmd isn't spec'd so these may not appear in all redis 
versions 78 int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle') 79 for field in int_fields: 80 if field in response: 81 response[field] = int(response[field]) 82 83 return response 84 85 86def parse_object(response, infotype): 87 "Parse the results of an OBJECT command" 88 if infotype in ('idletime', 'refcount'): 89 return int_or_none(response) 90 return response 91 92 93def parse_info(response): 94 "Parse the result of Redis's INFO command into a Python dict" 95 info = {} 96 response = nativestr(response) 97 98 def get_value(value): 99 if ',' not in value or '=' not in value: 100 try: 101 if '.' in value: 102 return float(value) 103 else: 104 return int(value) 105 except ValueError: 106 return value 107 else: 108 sub_dict = {} 109 for item in value.split(','): 110 k, v = item.rsplit('=', 1) 111 sub_dict[k] = get_value(v) 112 return sub_dict 113 114 for line in response.splitlines(): 115 if line and not line.startswith('#'): 116 if line.find(':') != -1: 117 key, value = line.split(':', 1) 118 info[key] = get_value(value) 119 else: 120 # if the line isn't splittable, append it to the "__raw__" key 121 info.setdefault('__raw__', []).append(line) 122 123 return info 124 125 126SENTINEL_STATE_TYPES = { 127 'can-failover-its-master': int, 128 'config-epoch': int, 129 'down-after-milliseconds': int, 130 'failover-timeout': int, 131 'info-refresh': int, 132 'last-hello-message': int, 133 'last-ok-ping-reply': int, 134 'last-ping-reply': int, 135 'last-ping-sent': int, 136 'master-link-down-time': int, 137 'master-port': int, 138 'num-other-sentinels': int, 139 'num-slaves': int, 140 'o-down-time': int, 141 'pending-commands': int, 142 'parallel-syncs': int, 143 'port': int, 144 'quorum': int, 145 'role-reported-time': int, 146 's-down-time': int, 147 'slave-priority': int, 148 'slave-repl-offset': int, 149 'voted-leader-epoch': int 150} 151 152 153def parse_sentinel_state(item): 154 result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES) 155 flags = 
set(result['flags'].split(',')) 156 for name, flag in (('is_master', 'master'), ('is_slave', 'slave'), 157 ('is_sdown', 's_down'), ('is_odown', 'o_down'), 158 ('is_sentinel', 'sentinel'), 159 ('is_disconnected', 'disconnected'), 160 ('is_master_down', 'master_down')): 161 result[name] = flag in flags 162 return result 163 164 165def parse_sentinel_master(response): 166 return parse_sentinel_state(imap(nativestr, response)) 167 168 169def parse_sentinel_masters(response): 170 result = {} 171 for item in response: 172 state = parse_sentinel_state(imap(nativestr, item)) 173 result[state['name']] = state 174 return result 175 176 177def parse_sentinel_slaves_and_sentinels(response): 178 return [parse_sentinel_state(imap(nativestr, item)) for item in response] 179 180 181def parse_sentinel_get_master(response): 182 return response and (response[0], int(response[1])) or None 183 184 185def pairs_to_dict(response): 186 "Create a dict given a list of key/value pairs" 187 it = iter(response) 188 return dict(izip(it, it)) 189 190 191def pairs_to_dict_typed(response, type_info): 192 it = iter(response) 193 result = {} 194 for key, value in izip(it, it): 195 if key in type_info: 196 try: 197 value = type_info[key](value) 198 except: 199 # if for some reason the value can't be coerced, just use 200 # the string value 201 pass 202 result[key] = value 203 return result 204 205 206def zset_score_pairs(response, **options): 207 """ 208 If ``withscores`` is specified in the options, return the response as 209 a list of (value, score) pairs 210 """ 211 if not response or not options.get('withscores'): 212 return response 213 score_cast_func = options.get('score_cast_func', float) 214 it = iter(response) 215 return list(izip(it, imap(score_cast_func, it))) 216 217 218def sort_return_tuples(response, **options): 219 """ 220 If ``groups`` is specified, return the response as a list of 221 n-element tuples with n being the value found in options['groups'] 222 """ 223 if not response or 
not options['groups']: 224 return response 225 n = options['groups'] 226 return list(izip(*[response[i::n] for i in range(n)])) 227 228 229def int_or_none(response): 230 if response is None: 231 return None 232 return int(response) 233 234 235def float_or_none(response): 236 if response is None: 237 return None 238 return float(response) 239 240 241def bool_ok(response): 242 return nativestr(response) == 'OK' 243 244 245def parse_client_list(response, **options): 246 clients = [] 247 for c in nativestr(response).splitlines(): 248 clients.append(dict([pair.split('=') for pair in c.split(' ')])) 249 return clients 250 251 252def parse_config_get(response, **options): 253 response = [nativestr(i) if i is not None else None for i in response] 254 return response and pairs_to_dict(response) or {} 255 256 257def parse_scan(response, **options): 258 cursor, r = response 259 return long(cursor), r 260 261 262def parse_hscan(response, **options): 263 cursor, r = response 264 return long(cursor), r and pairs_to_dict(r) or {} 265 266 267def parse_zscan(response, **options): 268 score_cast_func = options.get('score_cast_func', float) 269 cursor, r = response 270 it = iter(r) 271 return long(cursor), list(izip(it, imap(score_cast_func, it))) 272 273 274def parse_slowlog_get(response, **options): 275 return [{ 276 'id': item[0], 277 'start_time': int(item[1]), 278 'duration': int(item[2]), 279 'command': b(' ').join(item[3]) 280 } for item in response] 281 282 283def parse_cluster_info(response, **options): 284 return dict([line.split(':') for line in response.splitlines() if line]) 285 286 287def _parse_node_line(line): 288 line_items = line.split(' ') 289 node_id, addr, flags, master_id, ping, pong, epoch, \ 290 connected = line.split(' ')[:8] 291 slots = [sl.split('-') for sl in line_items[8:]] 292 node_dict = { 293 'node_id': node_id, 294 'flags': flags, 295 'master_id': master_id, 296 'last_ping_sent': ping, 297 'last_pong_rcvd': pong, 298 'epoch': epoch, 299 'slots': 
slots, 300 'connected': True if connected == 'connected' else False 301 } 302 return addr, node_dict 303 304 305def parse_cluster_nodes(response, **options): 306 raw_lines = response 307 if isinstance(response, basestring): 308 raw_lines = response.splitlines() 309 return dict([_parse_node_line(line) for line in raw_lines]) 310 311 312def parse_georadius_generic(response, **options): 313 if options['store'] or options['store_dist']: 314 # `store` and `store_diff` cant be combined 315 # with other command arguments. 316 return response 317 318 if type(response) != list: 319 response_list = [response] 320 else: 321 response_list = response 322 323 if not options['withdist'] and not options['withcoord']\ 324 and not options['withhash']: 325 # just a bunch of places 326 return [nativestr(r) for r in response_list] 327 328 cast = { 329 'withdist': float, 330 'withcoord': lambda ll: (float(ll[0]), float(ll[1])), 331 'withhash': int 332 } 333 334 # zip all output results with each casting functino to get 335 # the properly native Python value. 336 f = [nativestr] 337 f += [cast[o] for o in ['withdist', 'withhash', 'withcoord'] if options[o]] 338 return [ 339 list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list 340 ] 341 342 343def parse_pubsub_numsub(response, **options): 344 return list(zip(response[0::2], response[1::2])) 345 346 347class StrictRedis(object): 348 """ 349 Implementation of the Redis protocol. 350 351 This abstract class provides a Python interface to all Redis commands 352 and an implementation of the Redis protocol. 
353 354 Connection and Pipeline derive from this, implementing how 355 the commands are sent and received to the Redis server 356 """ 357 RESPONSE_CALLBACKS = dict_merge( 358 string_keys_to_dict( 359 'AUTH EXISTS EXPIRE EXPIREAT HEXISTS HMSET MOVE MSETNX PERSIST ' 360 'PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX', 361 bool 362 ), 363 string_keys_to_dict( 364 'BITCOUNT BITPOS DECRBY DEL GETBIT HDEL HLEN HSTRLEN INCRBY ' 365 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 366 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 367 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' 368 'GEOADD', 369 int 370 ), 371 string_keys_to_dict( 372 'INCRBYFLOAT HINCRBYFLOAT GEODIST', 373 float 374 ), 375 string_keys_to_dict( 376 # these return OK, or int if redis-server is >=1.3.4 377 'LPUSH RPUSH', 378 lambda r: isinstance(r, (long, int)) and r or nativestr(r) == 'OK' 379 ), 380 string_keys_to_dict('SORT', sort_return_tuples), 381 string_keys_to_dict('ZSCORE ZINCRBY', float_or_none), 382 string_keys_to_dict( 383 'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE RENAME ' 384 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', 385 bool_ok 386 ), 387 string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None), 388 string_keys_to_dict( 389 'SDIFF SINTER SMEMBERS SUNION', 390 lambda r: r and set(r) or set() 391 ), 392 string_keys_to_dict( 393 'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', 394 zset_score_pairs 395 ), 396 string_keys_to_dict('ZRANK ZREVRANK', int_or_none), 397 string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), 398 { 399 'CLIENT GETNAME': lambda r: r and nativestr(r), 400 'CLIENT KILL': bool_ok, 401 'CLIENT LIST': parse_client_list, 402 'CLIENT SETNAME': bool_ok, 403 'CONFIG GET': parse_config_get, 404 'CONFIG RESETSTAT': bool_ok, 405 'CONFIG SET': bool_ok, 406 'DEBUG OBJECT': parse_debug_object, 407 'HGETALL': lambda r: r and pairs_to_dict(r) or {}, 408 'HSCAN': parse_hscan, 409 'INFO': parse_info, 410 
'LASTSAVE': timestamp_to_datetime, 411 'OBJECT': parse_object, 412 'PING': lambda r: nativestr(r) == 'PONG', 413 'RANDOMKEY': lambda r: r and r or None, 414 'SCAN': parse_scan, 415 'SCRIPT EXISTS': lambda r: list(imap(bool, r)), 416 'SCRIPT FLUSH': bool_ok, 417 'SCRIPT KILL': bool_ok, 418 'SCRIPT LOAD': nativestr, 419 'SENTINEL GET-MASTER-ADDR-BY-NAME': parse_sentinel_get_master, 420 'SENTINEL MASTER': parse_sentinel_master, 421 'SENTINEL MASTERS': parse_sentinel_masters, 422 'SENTINEL MONITOR': bool_ok, 423 'SENTINEL REMOVE': bool_ok, 424 'SENTINEL SENTINELS': parse_sentinel_slaves_and_sentinels, 425 'SENTINEL SET': bool_ok, 426 'SENTINEL SLAVES': parse_sentinel_slaves_and_sentinels, 427 'SET': lambda r: r and nativestr(r) == 'OK', 428 'SLOWLOG GET': parse_slowlog_get, 429 'SLOWLOG LEN': int, 430 'SLOWLOG RESET': bool_ok, 431 'SSCAN': parse_scan, 432 'TIME': lambda x: (int(x[0]), int(x[1])), 433 'ZSCAN': parse_zscan, 434 'CLUSTER ADDSLOTS': bool_ok, 435 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x), 436 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x), 437 'CLUSTER DELSLOTS': bool_ok, 438 'CLUSTER FAILOVER': bool_ok, 439 'CLUSTER FORGET': bool_ok, 440 'CLUSTER INFO': parse_cluster_info, 441 'CLUSTER KEYSLOT': lambda x: int(x), 442 'CLUSTER MEET': bool_ok, 443 'CLUSTER NODES': parse_cluster_nodes, 444 'CLUSTER REPLICATE': bool_ok, 445 'CLUSTER RESET': bool_ok, 446 'CLUSTER SAVECONFIG': bool_ok, 447 'CLUSTER SET-CONFIG-EPOCH': bool_ok, 448 'CLUSTER SETSLOT': bool_ok, 449 'CLUSTER SLAVES': parse_cluster_nodes, 450 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), 451 float(ll[1])) 452 if ll is not None else None, r)), 453 'GEOHASH': lambda r: list(map(nativestr, r)), 454 'GEORADIUS': parse_georadius_generic, 455 'GEORADIUSBYMEMBER': parse_georadius_generic, 456 'PUBSUB NUMSUB': parse_pubsub_numsub, 457 } 458 ) 459 460 @classmethod 461 def from_url(cls, url, db=None, **kwargs): 462 """ 463 Return a Redis client object configured from the given URL, which must 464 
use either `the ``redis://`` scheme 465 <http://www.iana.org/assignments/uri-schemes/prov/redis>`_ for RESP 466 connections or the ``unix://`` scheme for Unix domain sockets. 467 468 For example:: 469 470 redis://[:password]@localhost:6379/0 471 unix://[:password]@/path/to/socket.sock?db=0 472 473 There are several ways to specify a database number. The parse function 474 will return the first specified option: 475 1. A ``db`` querystring option, e.g. redis://localhost?db=0 476 2. If using the redis:// scheme, the path argument of the url, e.g. 477 redis://localhost/0 478 3. The ``db`` argument to this function. 479 480 If none of these options are specified, db=0 is used. 481 482 Any additional querystring arguments and keyword arguments will be 483 passed along to the ConnectionPool class's initializer. In the case 484 of conflicting arguments, querystring arguments always win. 485 """ 486 connection_pool = ConnectionPool.from_url(url, db=db, **kwargs) 487 return cls(connection_pool=connection_pool) 488 489 def __init__(self, host='localhost', port=6379, 490 db=0, password=None, socket_timeout=None, 491 socket_connect_timeout=None, 492 socket_keepalive=None, socket_keepalive_options=None, 493 connection_pool=None, unix_socket_path=None, 494 encoding='utf-8', encoding_errors='strict', 495 charset=None, errors=None, 496 decode_responses=False, retry_on_timeout=False, 497 ssl=False, ssl_keyfile=None, ssl_certfile=None, 498 ssl_cert_reqs=None, ssl_ca_certs=None, 499 max_connections=None): 500 if not connection_pool: 501 if charset is not None: 502 warnings.warn(DeprecationWarning( 503 '"charset" is deprecated. Use "encoding" instead')) 504 encoding = charset 505 if errors is not None: 506 warnings.warn(DeprecationWarning( 507 '"errors" is deprecated. 
Use "encoding_errors" instead')) 508 encoding_errors = errors 509 510 kwargs = { 511 'db': db, 512 'password': password, 513 'socket_timeout': socket_timeout, 514 'encoding': encoding, 515 'encoding_errors': encoding_errors, 516 'decode_responses': decode_responses, 517 'retry_on_timeout': retry_on_timeout, 518 'max_connections': max_connections 519 } 520 # based on input, setup appropriate connection args 521 if unix_socket_path is not None: 522 kwargs.update({ 523 'path': unix_socket_path, 524 'connection_class': UnixDomainSocketConnection 525 }) 526 else: 527 # TCP specific options 528 kwargs.update({ 529 'host': host, 530 'port': port, 531 'socket_connect_timeout': socket_connect_timeout, 532 'socket_keepalive': socket_keepalive, 533 'socket_keepalive_options': socket_keepalive_options, 534 }) 535 536 if ssl: 537 kwargs.update({ 538 'connection_class': SSLConnection, 539 'ssl_keyfile': ssl_keyfile, 540 'ssl_certfile': ssl_certfile, 541 'ssl_cert_reqs': ssl_cert_reqs, 542 'ssl_ca_certs': ssl_ca_certs, 543 }) 544 connection_pool = ConnectionPool(**kwargs) 545 self.connection_pool = connection_pool 546 self._use_lua_lock = None 547 548 self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() 549 550 def __repr__(self): 551 return "%s<%s>" % (type(self).__name__, repr(self.connection_pool)) 552 553 def set_response_callback(self, command, callback): 554 "Set a custom Response Callback" 555 self.response_callbacks[command] = callback 556 557 def pipeline(self, transaction=True, shard_hint=None): 558 """ 559 Return a new pipeline object that can queue multiple commands for 560 later execution. ``transaction`` indicates whether all commands 561 should be executed atomically. Apart from making a group of operations 562 atomic, pipelines are useful for reducing the back-and-forth overhead 563 between the client and server. 
564 """ 565 return StrictPipeline( 566 self.connection_pool, 567 self.response_callbacks, 568 transaction, 569 shard_hint) 570 571 def transaction(self, func, *watches, **kwargs): 572 """ 573 Convenience method for executing the callable `func` as a transaction 574 while watching all keys specified in `watches`. The 'func' callable 575 should expect a single argument which is a Pipeline object. 576 """ 577 shard_hint = kwargs.pop('shard_hint', None) 578 value_from_callable = kwargs.pop('value_from_callable', False) 579 watch_delay = kwargs.pop('watch_delay', None) 580 with self.pipeline(True, shard_hint) as pipe: 581 while 1: 582 try: 583 if watches: 584 pipe.watch(*watches) 585 func_value = func(pipe) 586 exec_value = pipe.execute() 587 return func_value if value_from_callable else exec_value 588 except WatchError: 589 if watch_delay is not None and watch_delay > 0: 590 time.sleep(watch_delay) 591 continue 592 593 def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None, 594 lock_class=None, thread_local=True): 595 """ 596 Return a new Lock object using key ``name`` that mimics 597 the behavior of threading.Lock. 598 599 If specified, ``timeout`` indicates a maximum life for the lock. 600 By default, it will remain locked until release() is called. 601 602 ``sleep`` indicates the amount of time to sleep per loop iteration 603 when the lock is in blocking mode and another client is currently 604 holding the lock. 605 606 ``blocking_timeout`` indicates the maximum amount of time in seconds to 607 spend trying to acquire the lock. A value of ``None`` indicates 608 continue trying forever. ``blocking_timeout`` can be specified as a 609 float or integer, both representing the number of seconds to wait. 610 611 ``lock_class`` forces the specified lock implementation. 612 613 ``thread_local`` indicates whether the lock token is placed in 614 thread-local storage. 
By default, the token is placed in thread local 615 storage so that a thread only sees its token, not a token set by 616 another thread. Consider the following timeline: 617 618 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 619 thread-1 sets the token to "abc" 620 time: 1, thread-2 blocks trying to acquire `my-lock` using the 621 Lock instance. 622 time: 5, thread-1 has not yet completed. redis expires the lock 623 key. 624 time: 5, thread-2 acquired `my-lock` now that it's available. 625 thread-2 sets the token to "xyz" 626 time: 6, thread-1 finishes its work and calls release(). if the 627 token is *not* stored in thread local storage, then 628 thread-1 would see the token value as "xyz" and would be 629 able to successfully release the thread-2's lock. 630 631 In some use cases it's necessary to disable thread local storage. For 632 example, if you have code where one thread acquires a lock and passes 633 that lock instance to a worker thread to release later. If thread 634 local storage isn't disabled in this case, the worker thread won't see 635 the token set by the thread that acquired the lock. Our assumption 636 is that these cases aren't common and as such default to using 637 thread local storage. """ 638 if lock_class is None: 639 if self._use_lua_lock is None: 640 # the first time .lock() is called, determine if we can use 641 # Lua by attempting to register the necessary scripts 642 try: 643 LuaLock.register_scripts(self) 644 self._use_lua_lock = True 645 except ResponseError: 646 self._use_lua_lock = False 647 lock_class = self._use_lua_lock and LuaLock or Lock 648 return lock_class(self, name, timeout=timeout, sleep=sleep, 649 blocking_timeout=blocking_timeout, 650 thread_local=thread_local) 651 652 def pubsub(self, **kwargs): 653 """ 654 Return a Publish/Subscribe object. With this object, you can 655 subscribe to channels and listen for messages that get published to 656 them. 
657 """ 658 return PubSub(self.connection_pool, **kwargs) 659 660 # COMMAND EXECUTION AND PROTOCOL PARSING 661 def execute_command(self, *args, **options): 662 "Execute a command and return a parsed response" 663 pool = self.connection_pool 664 command_name = args[0] 665 connection = pool.get_connection(command_name, **options) 666 try: 667 connection.send_command(*args) 668 return self.parse_response(connection, command_name, **options) 669 except (ConnectionError, TimeoutError) as e: 670 connection.disconnect() 671 if not connection.retry_on_timeout and isinstance(e, TimeoutError): 672 raise 673 connection.send_command(*args) 674 return self.parse_response(connection, command_name, **options) 675 finally: 676 pool.release(connection) 677 678 def parse_response(self, connection, command_name, **options): 679 "Parses a response from the Redis server" 680 response = connection.read_response() 681 if command_name in self.response_callbacks: 682 return self.response_callbacks[command_name](response, **options) 683 return response 684 685 # SERVER INFORMATION 686 def bgrewriteaof(self): 687 "Tell the Redis server to rewrite the AOF file from data in memory." 688 return self.execute_command('BGREWRITEAOF') 689 690 def bgsave(self): 691 """ 692 Tell the Redis server to save its data to disk. Unlike save(), 693 this method is asynchronous and returns immediately. 
694 """ 695 return self.execute_command('BGSAVE') 696 697 def client_kill(self, address): 698 "Disconnects the client at ``address`` (ip:port)" 699 return self.execute_command('CLIENT KILL', address) 700 701 def client_list(self): 702 "Returns a list of currently connected clients" 703 return self.execute_command('CLIENT LIST') 704 705 def client_getname(self): 706 "Returns the current connection name" 707 return self.execute_command('CLIENT GETNAME') 708 709 def client_setname(self, name): 710 "Sets the current connection name" 711 return self.execute_command('CLIENT SETNAME', name) 712 713 def config_get(self, pattern="*"): 714 "Return a dictionary of configuration based on the ``pattern``" 715 return self.execute_command('CONFIG GET', pattern) 716 717 def config_set(self, name, value): 718 "Set config item ``name`` with ``value``" 719 return self.execute_command('CONFIG SET', name, value) 720 721 def config_resetstat(self): 722 "Reset runtime statistics" 723 return self.execute_command('CONFIG RESETSTAT') 724 725 def config_rewrite(self): 726 "Rewrite config file with the minimal change to reflect running config" 727 return self.execute_command('CONFIG REWRITE') 728 729 def dbsize(self): 730 "Returns the number of keys in the current database" 731 return self.execute_command('DBSIZE') 732 733 def debug_object(self, key): 734 "Returns version specific meta information about a given key" 735 return self.execute_command('DEBUG OBJECT', key) 736 737 def echo(self, value): 738 "Echo the string back from the server" 739 return self.execute_command('ECHO', value) 740 741 def flushall(self): 742 "Delete all keys in all databases on the current host" 743 return self.execute_command('FLUSHALL') 744 745 def flushdb(self): 746 "Delete all keys in the current database" 747 return self.execute_command('FLUSHDB') 748 749 def info(self, section=None): 750 """ 751 Returns a dictionary containing information about the Redis server 752 753 The ``section`` option can be used to 
select a specific section 754 of information 755 756 The section option is not supported by older versions of Redis Server, 757 and will generate ResponseError 758 """ 759 if section is None: 760 return self.execute_command('INFO') 761 else: 762 return self.execute_command('INFO', section) 763 764 def lastsave(self): 765 """ 766 Return a Python datetime object representing the last time the 767 Redis database was saved to disk 768 """ 769 return self.execute_command('LASTSAVE') 770 771 def object(self, infotype, key): 772 "Return the encoding, idletime, or refcount about the key" 773 return self.execute_command('OBJECT', infotype, key, infotype=infotype) 774 775 def ping(self): 776 "Ping the Redis server" 777 return self.execute_command('PING') 778 779 def save(self): 780 """ 781 Tell the Redis server to save its data to disk, 782 blocking until the save is complete 783 """ 784 return self.execute_command('SAVE') 785 786 def sentinel(self, *args): 787 "Redis Sentinel's SENTINEL command." 788 warnings.warn( 789 DeprecationWarning('Use the individual sentinel_* methods')) 790 791 def sentinel_get_master_addr_by_name(self, service_name): 792 "Returns a (host, port) pair for the given ``service_name``" 793 return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME', 794 service_name) 795 796 def sentinel_master(self, service_name): 797 "Returns a dictionary containing the specified masters state." 798 return self.execute_command('SENTINEL MASTER', service_name) 799 800 def sentinel_masters(self): 801 "Returns a list of dictionaries containing each master's state." 
802 return self.execute_command('SENTINEL MASTERS') 803 804 def sentinel_monitor(self, name, ip, port, quorum): 805 "Add a new master to Sentinel to be monitored" 806 return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum) 807 808 def sentinel_remove(self, name): 809 "Remove a master from Sentinel's monitoring" 810 return self.execute_command('SENTINEL REMOVE', name) 811 812 def sentinel_sentinels(self, service_name): 813 "Returns a list of sentinels for ``service_name``" 814 return self.execute_command('SENTINEL SENTINELS', service_name) 815 816 def sentinel_set(self, name, option, value): 817 "Set Sentinel monitoring parameters for a given master" 818 return self.execute_command('SENTINEL SET', name, option, value) 819 820 def sentinel_slaves(self, service_name): 821 "Returns a list of slaves for ``service_name``" 822 return self.execute_command('SENTINEL SLAVES', service_name) 823 824 def shutdown(self): 825 "Shutdown the server" 826 try: 827 self.execute_command('SHUTDOWN') 828 except ConnectionError: 829 # a ConnectionError here is expected 830 return 831 raise RedisError("SHUTDOWN seems to have failed.") 832 833 def slaveof(self, host=None, port=None): 834 """ 835 Set the server to be a replicated slave of the instance identified 836 by the ``host`` and ``port``. If called without arguments, the 837 instance is promoted to a master instead. 838 """ 839 if host is None and port is None: 840 return self.execute_command('SLAVEOF', Token.get_token('NO'), 841 Token.get_token('ONE')) 842 return self.execute_command('SLAVEOF', host, port) 843 844 def slowlog_get(self, num=None): 845 """ 846 Get the entries from the slowlog. If ``num`` is specified, get the 847 most recent ``num`` items. 
848 """ 849 args = ['SLOWLOG GET'] 850 if num is not None: 851 args.append(num) 852 return self.execute_command(*args) 853 854 def slowlog_len(self): 855 "Get the number of items in the slowlog" 856 return self.execute_command('SLOWLOG LEN') 857 858 def slowlog_reset(self): 859 "Remove all items in the slowlog" 860 return self.execute_command('SLOWLOG RESET') 861 862 def time(self): 863 """ 864 Returns the server time as a 2-item tuple of ints: 865 (seconds since epoch, microseconds into this second). 866 """ 867 return self.execute_command('TIME') 868 869 def wait(self, num_replicas, timeout): 870 """ 871 Redis synchronous replication 872 That returns the number of replicas that processed the query when 873 we finally have at least ``num_replicas``, or when the ``timeout`` was 874 reached. 875 """ 876 return self.execute_command('WAIT', num_replicas, timeout) 877 878 # BASIC KEY COMMANDS 879 def append(self, key, value): 880 """ 881 Appends the string ``value`` to the value at ``key``. If ``key`` 882 doesn't already exist, create it with a value of ``value``. 883 Returns the new length of the value at ``key``. 884 """ 885 return self.execute_command('APPEND', key, value) 886 887 def bitcount(self, key, start=None, end=None): 888 """ 889 Returns the count of set bits in the value of ``key``. Optional 890 ``start`` and ``end`` paramaters indicate which bytes to consider 891 """ 892 params = [key] 893 if start is not None and end is not None: 894 params.append(start) 895 params.append(end) 896 elif (start is not None and end is None) or \ 897 (end is not None and start is None): 898 raise RedisError("Both start and end must be specified") 899 return self.execute_command('BITCOUNT', *params) 900 901 def bitop(self, operation, dest, *keys): 902 """ 903 Perform a bitwise operation using ``operation`` between ``keys`` and 904 store the result in ``dest``. 
905 """ 906 return self.execute_command('BITOP', operation, dest, *keys) 907 908 def bitpos(self, key, bit, start=None, end=None): 909 """ 910 Return the position of the first bit set to 1 or 0 in a string. 911 ``start`` and ``end`` difines search range. The range is interpreted 912 as a range of bytes and not a range of bits, so start=0 and end=2 913 means to look at the first three bytes. 914 """ 915 if bit not in (0, 1): 916 raise RedisError('bit must be 0 or 1') 917 params = [key, bit] 918 919 start is not None and params.append(start) 920 921 if start is not None and end is not None: 922 params.append(end) 923 elif start is None and end is not None: 924 raise RedisError("start argument is not set, " 925 "when end is specified") 926 return self.execute_command('BITPOS', *params) 927 928 def decr(self, name, amount=1): 929 """ 930 Decrements the value of ``key`` by ``amount``. If no key exists, 931 the value will be initialized as 0 - ``amount`` 932 """ 933 return self.execute_command('DECRBY', name, amount) 934 935 def delete(self, *names): 936 "Delete one or more keys specified by ``names``" 937 return self.execute_command('DEL', *names) 938 939 def __delitem__(self, name): 940 self.delete(name) 941 942 def dump(self, name): 943 """ 944 Return a serialized version of the value stored at the specified key. 945 If key does not exist a nil bulk reply is returned. 946 """ 947 return self.execute_command('DUMP', name) 948 949 def exists(self, name): 950 "Returns a boolean indicating whether key ``name`` exists" 951 return self.execute_command('EXISTS', name) 952 __contains__ = exists 953 954 def expire(self, name, time): 955 """ 956 Set an expire flag on key ``name`` for ``time`` seconds. ``time`` 957 can be represented by an integer or a Python timedelta object. 
958 """ 959 if isinstance(time, datetime.timedelta): 960 time = time.seconds + time.days * 24 * 3600 961 return self.execute_command('EXPIRE', name, time) 962 963 def expireat(self, name, when): 964 """ 965 Set an expire flag on key ``name``. ``when`` can be represented 966 as an integer indicating unix time or a Python datetime object. 967 """ 968 if isinstance(when, datetime.datetime): 969 when = int(mod_time.mktime(when.timetuple())) 970 return self.execute_command('EXPIREAT', name, when) 971 972 def get(self, name): 973 """ 974 Return the value at key ``name``, or None if the key doesn't exist 975 """ 976 return self.execute_command('GET', name) 977 978 def __getitem__(self, name): 979 """ 980 Return the value at key ``name``, raises a KeyError if the key 981 doesn't exist. 982 """ 983 value = self.get(name) 984 if value is not None: 985 return value 986 raise KeyError(name) 987 988 def getbit(self, name, offset): 989 "Returns a boolean indicating the value of ``offset`` in ``name``" 990 return self.execute_command('GETBIT', name, offset) 991 992 def getrange(self, key, start, end): 993 """ 994 Returns the substring of the string value stored at ``key``, 995 determined by the offsets ``start`` and ``end`` (both are inclusive) 996 """ 997 return self.execute_command('GETRANGE', key, start, end) 998 999 def getset(self, name, value): 1000 """ 1001 Sets the value at key ``name`` to ``value`` 1002 and returns the old value at key ``name`` atomically. 1003 """ 1004 return self.execute_command('GETSET', name, value) 1005 1006 def incr(self, name, amount=1): 1007 """ 1008 Increments the value of ``key`` by ``amount``. If no key exists, 1009 the value will be initialized as ``amount`` 1010 """ 1011 return self.execute_command('INCRBY', name, amount) 1012 1013 def incrby(self, name, amount=1): 1014 """ 1015 Increments the value of ``key`` by ``amount``. 
If no key exists, 1016 the value will be initialized as ``amount`` 1017 """ 1018 1019 # An alias for ``incr()``, because it is already implemented 1020 # as INCRBY redis command. 1021 return self.incr(name, amount) 1022 1023 def incrbyfloat(self, name, amount=1.0): 1024 """ 1025 Increments the value at key ``name`` by floating ``amount``. 1026 If no key exists, the value will be initialized as ``amount`` 1027 """ 1028 return self.execute_command('INCRBYFLOAT', name, amount) 1029 1030 def keys(self, pattern='*'): 1031 "Returns a list of keys matching ``pattern``" 1032 return self.execute_command('KEYS', pattern) 1033 1034 def mget(self, keys, *args): 1035 """ 1036 Returns a list of values ordered identically to ``keys`` 1037 """ 1038 args = list_or_args(keys, args) 1039 return self.execute_command('MGET', *args) 1040 1041 def mset(self, *args, **kwargs): 1042 """ 1043 Sets key/values based on a mapping. Mapping can be supplied as a single 1044 dictionary argument or as kwargs. 1045 """ 1046 if args: 1047 if len(args) != 1 or not isinstance(args[0], dict): 1048 raise RedisError('MSET requires **kwargs or a single dict arg') 1049 kwargs.update(args[0]) 1050 items = [] 1051 for pair in iteritems(kwargs): 1052 items.extend(pair) 1053 return self.execute_command('MSET', *items) 1054 1055 def msetnx(self, *args, **kwargs): 1056 """ 1057 Sets key/values based on a mapping if none of the keys are already set. 1058 Mapping can be supplied as a single dictionary argument or as kwargs. 1059 Returns a boolean indicating if the operation was successful. 
1060 """ 1061 if args: 1062 if len(args) != 1 or not isinstance(args[0], dict): 1063 raise RedisError('MSETNX requires **kwargs or a single ' 1064 'dict arg') 1065 kwargs.update(args[0]) 1066 items = [] 1067 for pair in iteritems(kwargs): 1068 items.extend(pair) 1069 return self.execute_command('MSETNX', *items) 1070 1071 def move(self, name, db): 1072 "Moves the key ``name`` to a different Redis database ``db``" 1073 return self.execute_command('MOVE', name, db) 1074 1075 def persist(self, name): 1076 "Removes an expiration on ``name``" 1077 return self.execute_command('PERSIST', name) 1078 1079 def pexpire(self, name, time): 1080 """ 1081 Set an expire flag on key ``name`` for ``time`` milliseconds. 1082 ``time`` can be represented by an integer or a Python timedelta 1083 object. 1084 """ 1085 if isinstance(time, datetime.timedelta): 1086 ms = int(time.microseconds / 1000) 1087 time = (time.seconds + time.days * 24 * 3600) * 1000 + ms 1088 return self.execute_command('PEXPIRE', name, time) 1089 1090 def pexpireat(self, name, when): 1091 """ 1092 Set an expire flag on key ``name``. ``when`` can be represented 1093 as an integer representing unix time in milliseconds (unix time * 1000) 1094 or a Python datetime object. 1095 """ 1096 if isinstance(when, datetime.datetime): 1097 ms = int(when.microsecond / 1000) 1098 when = int(mod_time.mktime(when.timetuple())) * 1000 + ms 1099 return self.execute_command('PEXPIREAT', name, when) 1100 1101 def psetex(self, name, time_ms, value): 1102 """ 1103 Set the value of key ``name`` to ``value`` that expires in ``time_ms`` 1104 milliseconds. 
``time_ms`` can be represented by an integer or a Python 1105 timedelta object 1106 """ 1107 if isinstance(time_ms, datetime.timedelta): 1108 ms = int(time_ms.microseconds / 1000) 1109 time_ms = (time_ms.seconds + time_ms.days * 24 * 3600) * 1000 + ms 1110 return self.execute_command('PSETEX', name, time_ms, value) 1111 1112 def pttl(self, name): 1113 "Returns the number of milliseconds until the key ``name`` will expire" 1114 return self.execute_command('PTTL', name) 1115 1116 def randomkey(self): 1117 "Returns the name of a random key" 1118 return self.execute_command('RANDOMKEY') 1119 1120 def rename(self, src, dst): 1121 """ 1122 Rename key ``src`` to ``dst`` 1123 """ 1124 return self.execute_command('RENAME', src, dst) 1125 1126 def renamenx(self, src, dst): 1127 "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist" 1128 return self.execute_command('RENAMENX', src, dst) 1129 1130 def restore(self, name, ttl, value, replace=False): 1131 """ 1132 Create a key using the provided serialized value, previously obtained 1133 using DUMP. 1134 """ 1135 params = [name, ttl, value] 1136 if replace: 1137 params.append('REPLACE') 1138 return self.execute_command('RESTORE', *params) 1139 1140 def set(self, name, value, ex=None, px=None, nx=False, xx=False): 1141 """ 1142 Set the value at key ``name`` to ``value`` 1143 1144 ``ex`` sets an expire flag on key ``name`` for ``ex`` seconds. 1145 1146 ``px`` sets an expire flag on key ``name`` for ``px`` milliseconds. 1147 1148 ``nx`` if set to True, set the value at key ``name`` to ``value`` only 1149 if it does not exist. 1150 1151 ``xx`` if set to True, set the value at key ``name`` to ``value`` only 1152 if it already exists. 
1153 """ 1154 pieces = [name, value] 1155 if ex is not None: 1156 pieces.append('EX') 1157 if isinstance(ex, datetime.timedelta): 1158 ex = ex.seconds + ex.days * 24 * 3600 1159 pieces.append(ex) 1160 if px is not None: 1161 pieces.append('PX') 1162 if isinstance(px, datetime.timedelta): 1163 ms = int(px.microseconds / 1000) 1164 px = (px.seconds + px.days * 24 * 3600) * 1000 + ms 1165 pieces.append(px) 1166 1167 if nx: 1168 pieces.append('NX') 1169 if xx: 1170 pieces.append('XX') 1171 return self.execute_command('SET', *pieces) 1172 1173 def __setitem__(self, name, value): 1174 self.set(name, value) 1175 1176 def setbit(self, name, offset, value): 1177 """ 1178 Flag the ``offset`` in ``name`` as ``value``. Returns a boolean 1179 indicating the previous value of ``offset``. 1180 """ 1181 value = value and 1 or 0 1182 return self.execute_command('SETBIT', name, offset, value) 1183 1184 def setex(self, name, time, value): 1185 """ 1186 Set the value of key ``name`` to ``value`` that expires in ``time`` 1187 seconds. ``time`` can be represented by an integer or a Python 1188 timedelta object. 1189 """ 1190 if isinstance(time, datetime.timedelta): 1191 time = time.seconds + time.days * 24 * 3600 1192 return self.execute_command('SETEX', name, time, value) 1193 1194 def setnx(self, name, value): 1195 "Set the value of key ``name`` to ``value`` if key doesn't exist" 1196 return self.execute_command('SETNX', name, value) 1197 1198 def setrange(self, name, offset, value): 1199 """ 1200 Overwrite bytes in the value of ``name`` starting at ``offset`` with 1201 ``value``. If ``offset`` plus the length of ``value`` exceeds the 1202 length of the original value, the new value will be larger than before. 1203 If ``offset`` exceeds the length of the original value, null bytes 1204 will be used to pad between the end of the previous value and the start 1205 of what's being injected. 1206 1207 Returns the length of the new string. 
1208 """ 1209 return self.execute_command('SETRANGE', name, offset, value) 1210 1211 def strlen(self, name): 1212 "Return the number of bytes stored in the value of ``name``" 1213 return self.execute_command('STRLEN', name) 1214 1215 def substr(self, name, start, end=-1): 1216 """ 1217 Return a substring of the string at key ``name``. ``start`` and ``end`` 1218 are 0-based integers specifying the portion of the string to return. 1219 """ 1220 return self.execute_command('SUBSTR', name, start, end) 1221 1222 def touch(self, *args): 1223 """ 1224 Alters the last access time of a key(s) ``*args``. A key is ignored 1225 if it does not exist. 1226 """ 1227 return self.execute_command('TOUCH', *args) 1228 1229 def ttl(self, name): 1230 "Returns the number of seconds until the key ``name`` will expire" 1231 return self.execute_command('TTL', name) 1232 1233 def type(self, name): 1234 "Returns the type of key ``name``" 1235 return self.execute_command('TYPE', name) 1236 1237 def watch(self, *names): 1238 """ 1239 Watches the values at keys ``names``, or None if the key doesn't exist 1240 """ 1241 warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object')) 1242 1243 def unwatch(self): 1244 """ 1245 Unwatches the value at key ``name``, or None of the key doesn't exist 1246 """ 1247 warnings.warn( 1248 DeprecationWarning('Call UNWATCH from a Pipeline object')) 1249 1250 # LIST COMMANDS 1251 def blpop(self, keys, timeout=0): 1252 """ 1253 LPOP a value off of the first non-empty list 1254 named in the ``keys`` list. 1255 1256 If none of the lists in ``keys`` has a value to LPOP, then block 1257 for ``timeout`` seconds, or until a value gets pushed on to one 1258 of the lists. 1259 1260 If timeout is 0, then block indefinitely. 
1261 """ 1262 if timeout is None: 1263 timeout = 0 1264 if isinstance(keys, basestring): 1265 keys = [keys] 1266 else: 1267 keys = list(keys) 1268 keys.append(timeout) 1269 return self.execute_command('BLPOP', *keys) 1270 1271 def brpop(self, keys, timeout=0): 1272 """ 1273 RPOP a value off of the first non-empty list 1274 named in the ``keys`` list. 1275 1276 If none of the lists in ``keys`` has a value to RPOP, then block 1277 for ``timeout`` seconds, or until a value gets pushed on to one 1278 of the lists. 1279 1280 If timeout is 0, then block indefinitely. 1281 """ 1282 if timeout is None: 1283 timeout = 0 1284 if isinstance(keys, basestring): 1285 keys = [keys] 1286 else: 1287 keys = list(keys) 1288 keys.append(timeout) 1289 return self.execute_command('BRPOP', *keys) 1290 1291 def brpoplpush(self, src, dst, timeout=0): 1292 """ 1293 Pop a value off the tail of ``src``, push it on the head of ``dst`` 1294 and then return it. 1295 1296 This command blocks until a value is in ``src`` or until ``timeout`` 1297 seconds elapse, whichever is first. A ``timeout`` value of 0 blocks 1298 forever. 1299 """ 1300 if timeout is None: 1301 timeout = 0 1302 return self.execute_command('BRPOPLPUSH', src, dst, timeout) 1303 1304 def lindex(self, name, index): 1305 """ 1306 Return the item from list ``name`` at position ``index`` 1307 1308 Negative indexes are supported and will return an item at the 1309 end of the list 1310 """ 1311 return self.execute_command('LINDEX', name, index) 1312 1313 def linsert(self, name, where, refvalue, value): 1314 """ 1315 Insert ``value`` in list ``name`` either immediately before or after 1316 [``where``] ``refvalue`` 1317 1318 Returns the new length of the list on success or -1 if ``refvalue`` 1319 is not in the list. 
1320 """ 1321 return self.execute_command('LINSERT', name, where, refvalue, value) 1322 1323 def llen(self, name): 1324 "Return the length of the list ``name``" 1325 return self.execute_command('LLEN', name) 1326 1327 def lpop(self, name): 1328 "Remove and return the first item of the list ``name``" 1329 return self.execute_command('LPOP', name) 1330 1331 def lpush(self, name, *values): 1332 "Push ``values`` onto the head of the list ``name``" 1333 return self.execute_command('LPUSH', name, *values) 1334 1335 def lpushx(self, name, value): 1336 "Push ``value`` onto the head of the list ``name`` if ``name`` exists" 1337 return self.execute_command('LPUSHX', name, value) 1338 1339 def lrange(self, name, start, end): 1340 """ 1341 Return a slice of the list ``name`` between 1342 position ``start`` and ``end`` 1343 1344 ``start`` and ``end`` can be negative numbers just like 1345 Python slicing notation 1346 """ 1347 return self.execute_command('LRANGE', name, start, end) 1348 1349 def lrem(self, name, count, value): 1350 """ 1351 Remove the first ``count`` occurrences of elements equal to ``value`` 1352 from the list stored at ``name``. 1353 1354 The count argument influences the operation in the following ways: 1355 count > 0: Remove elements equal to value moving from head to tail. 1356 count < 0: Remove elements equal to value moving from tail to head. 1357 count = 0: Remove all elements equal to value. 
1358 """ 1359 return self.execute_command('LREM', name, count, value) 1360 1361 def lset(self, name, index, value): 1362 "Set ``position`` of list ``name`` to ``value``" 1363 return self.execute_command('LSET', name, index, value) 1364 1365 def ltrim(self, name, start, end): 1366 """ 1367 Trim the list ``name``, removing all values not within the slice 1368 between ``start`` and ``end`` 1369 1370 ``start`` and ``end`` can be negative numbers just like 1371 Python slicing notation 1372 """ 1373 return self.execute_command('LTRIM', name, start, end) 1374 1375 def rpop(self, name): 1376 "Remove and return the last item of the list ``name``" 1377 return self.execute_command('RPOP', name) 1378 1379 def rpoplpush(self, src, dst): 1380 """ 1381 RPOP a value off of the ``src`` list and atomically LPUSH it 1382 on to the ``dst`` list. Returns the value. 1383 """ 1384 return self.execute_command('RPOPLPUSH', src, dst) 1385 1386 def rpush(self, name, *values): 1387 "Push ``values`` onto the tail of the list ``name``" 1388 return self.execute_command('RPUSH', name, *values) 1389 1390 def rpushx(self, name, value): 1391 "Push ``value`` onto the tail of the list ``name`` if ``name`` exists" 1392 return self.execute_command('RPUSHX', name, value) 1393 1394 def sort(self, name, start=None, num=None, by=None, get=None, 1395 desc=False, alpha=False, store=None, groups=False): 1396 """ 1397 Sort and return the list, set or sorted set at ``name``. 1398 1399 ``start`` and ``num`` allow for paging through the sorted data 1400 1401 ``by`` allows using an external key to weight and sort the items. 1402 Use an "*" to indicate where in the key the item value is located 1403 1404 ``get`` allows for returning items from external keys rather than the 1405 sorted data itself. 
Use an "*" to indicate where int he key 1406 the item value is located 1407 1408 ``desc`` allows for reversing the sort 1409 1410 ``alpha`` allows for sorting lexicographically rather than numerically 1411 1412 ``store`` allows for storing the result of the sort into 1413 the key ``store`` 1414 1415 ``groups`` if set to True and if ``get`` contains at least two 1416 elements, sort will return a list of tuples, each containing the 1417 values fetched from the arguments to ``get``. 1418 1419 """ 1420 if (start is not None and num is None) or \ 1421 (num is not None and start is None): 1422 raise RedisError("``start`` and ``num`` must both be specified") 1423 1424 pieces = [name] 1425 if by is not None: 1426 pieces.append(Token.get_token('BY')) 1427 pieces.append(by) 1428 if start is not None and num is not None: 1429 pieces.append(Token.get_token('LIMIT')) 1430 pieces.append(start) 1431 pieces.append(num) 1432 if get is not None: 1433 # If get is a string assume we want to get a single value. 1434 # Otherwise assume it's an interable and we want to get multiple 1435 # values. We can't just iterate blindly because strings are 1436 # iterable. 
1437 if isinstance(get, basestring): 1438 pieces.append(Token.get_token('GET')) 1439 pieces.append(get) 1440 else: 1441 for g in get: 1442 pieces.append(Token.get_token('GET')) 1443 pieces.append(g) 1444 if desc: 1445 pieces.append(Token.get_token('DESC')) 1446 if alpha: 1447 pieces.append(Token.get_token('ALPHA')) 1448 if store is not None: 1449 pieces.append(Token.get_token('STORE')) 1450 pieces.append(store) 1451 1452 if groups: 1453 if not get or isinstance(get, basestring) or len(get) < 2: 1454 raise DataError('when using "groups" the "get" argument ' 1455 'must be specified and contain at least ' 1456 'two keys') 1457 1458 options = {'groups': len(get) if groups else None} 1459 return self.execute_command('SORT', *pieces, **options) 1460 1461 # SCAN COMMANDS 1462 def scan(self, cursor=0, match=None, count=None): 1463 """ 1464 Incrementally return lists of key names. Also return a cursor 1465 indicating the scan position. 1466 1467 ``match`` allows for filtering the keys by pattern 1468 1469 ``count`` allows for hint the minimum number of returns 1470 """ 1471 pieces = [cursor] 1472 if match is not None: 1473 pieces.extend([Token.get_token('MATCH'), match]) 1474 if count is not None: 1475 pieces.extend([Token.get_token('COUNT'), count]) 1476 return self.execute_command('SCAN', *pieces) 1477 1478 def scan_iter(self, match=None, count=None): 1479 """ 1480 Make an iterator using the SCAN command so that the client doesn't 1481 need to remember the cursor position. 1482 1483 ``match`` allows for filtering the keys by pattern 1484 1485 ``count`` allows for hint the minimum number of returns 1486 """ 1487 cursor = '0' 1488 while cursor != 0: 1489 cursor, data = self.scan(cursor=cursor, match=match, count=count) 1490 for item in data: 1491 yield item 1492 1493 def sscan(self, name, cursor=0, match=None, count=None): 1494 """ 1495 Incrementally return lists of elements in a set. Also return a cursor 1496 indicating the scan position. 
1497 1498 ``match`` allows for filtering the keys by pattern 1499 1500 ``count`` allows for hint the minimum number of returns 1501 """ 1502 pieces = [name, cursor] 1503 if match is not None: 1504 pieces.extend([Token.get_token('MATCH'), match]) 1505 if count is not None: 1506 pieces.extend([Token.get_token('COUNT'), count]) 1507 return self.execute_command('SSCAN', *pieces) 1508 1509 def sscan_iter(self, name, match=None, count=None): 1510 """ 1511 Make an iterator using the SSCAN command so that the client doesn't 1512 need to remember the cursor position. 1513 1514 ``match`` allows for filtering the keys by pattern 1515 1516 ``count`` allows for hint the minimum number of returns 1517 """ 1518 cursor = '0' 1519 while cursor != 0: 1520 cursor, data = self.sscan(name, cursor=cursor, 1521 match=match, count=count) 1522 for item in data: 1523 yield item 1524 1525 def hscan(self, name, cursor=0, match=None, count=None): 1526 """ 1527 Incrementally return key/value slices in a hash. Also return a cursor 1528 indicating the scan position. 1529 1530 ``match`` allows for filtering the keys by pattern 1531 1532 ``count`` allows for hint the minimum number of returns 1533 """ 1534 pieces = [name, cursor] 1535 if match is not None: 1536 pieces.extend([Token.get_token('MATCH'), match]) 1537 if count is not None: 1538 pieces.extend([Token.get_token('COUNT'), count]) 1539 return self.execute_command('HSCAN', *pieces) 1540 1541 def hscan_iter(self, name, match=None, count=None): 1542 """ 1543 Make an iterator using the HSCAN command so that the client doesn't 1544 need to remember the cursor position. 
1545 1546 ``match`` allows for filtering the keys by pattern 1547 1548 ``count`` allows for hint the minimum number of returns 1549 """ 1550 cursor = '0' 1551 while cursor != 0: 1552 cursor, data = self.hscan(name, cursor=cursor, 1553 match=match, count=count) 1554 for item in data.items(): 1555 yield item 1556 1557 def zscan(self, name, cursor=0, match=None, count=None, 1558 score_cast_func=float): 1559 """ 1560 Incrementally return lists of elements in a sorted set. Also return a 1561 cursor indicating the scan position. 1562 1563 ``match`` allows for filtering the keys by pattern 1564 1565 ``count`` allows for hint the minimum number of returns 1566 1567 ``score_cast_func`` a callable used to cast the score return value 1568 """ 1569 pieces = [name, cursor] 1570 if match is not None: 1571 pieces.extend([Token.get_token('MATCH'), match]) 1572 if count is not None: 1573 pieces.extend([Token.get_token('COUNT'), count]) 1574 options = {'score_cast_func': score_cast_func} 1575 return self.execute_command('ZSCAN', *pieces, **options) 1576 1577 def zscan_iter(self, name, match=None, count=None, 1578 score_cast_func=float): 1579 """ 1580 Make an iterator using the ZSCAN command so that the client doesn't 1581 need to remember the cursor position. 
1582 1583 ``match`` allows for filtering the keys by pattern 1584 1585 ``count`` allows for hint the minimum number of returns 1586 1587 ``score_cast_func`` a callable used to cast the score return value 1588 """ 1589 cursor = '0' 1590 while cursor != 0: 1591 cursor, data = self.zscan(name, cursor=cursor, match=match, 1592 count=count, 1593 score_cast_func=score_cast_func) 1594 for item in data: 1595 yield item 1596 1597 # SET COMMANDS 1598 def sadd(self, name, *values): 1599 "Add ``value(s)`` to set ``name``" 1600 return self.execute_command('SADD', name, *values) 1601 1602 def scard(self, name): 1603 "Return the number of elements in set ``name``" 1604 return self.execute_command('SCARD', name) 1605 1606 def sdiff(self, keys, *args): 1607 "Return the difference of sets specified by ``keys``" 1608 args = list_or_args(keys, args) 1609 return self.execute_command('SDIFF', *args) 1610 1611 def sdiffstore(self, dest, keys, *args): 1612 """ 1613 Store the difference of sets specified by ``keys`` into a new 1614 set named ``dest``. Returns the number of keys in the new set. 1615 """ 1616 args = list_or_args(keys, args) 1617 return self.execute_command('SDIFFSTORE', dest, *args) 1618 1619 def sinter(self, keys, *args): 1620 "Return the intersection of sets specified by ``keys``" 1621 args = list_or_args(keys, args) 1622 return self.execute_command('SINTER', *args) 1623 1624 def sinterstore(self, dest, keys, *args): 1625 """ 1626 Store the intersection of sets specified by ``keys`` into a new 1627 set named ``dest``. Returns the number of keys in the new set. 
1628 """ 1629 args = list_or_args(keys, args) 1630 return self.execute_command('SINTERSTORE', dest, *args) 1631 1632 def sismember(self, name, value): 1633 "Return a boolean indicating if ``value`` is a member of set ``name``" 1634 return self.execute_command('SISMEMBER', name, value) 1635 1636 def smembers(self, name): 1637 "Return all members of the set ``name``" 1638 return self.execute_command('SMEMBERS', name) 1639 1640 def smove(self, src, dst, value): 1641 "Move ``value`` from set ``src`` to set ``dst`` atomically" 1642 return self.execute_command('SMOVE', src, dst, value) 1643 1644 def spop(self, name): 1645 "Remove and return a random member of set ``name``" 1646 return self.execute_command('SPOP', name) 1647 1648 def srandmember(self, name, number=None): 1649 """ 1650 If ``number`` is None, returns a random member of set ``name``. 1651 1652 If ``number`` is supplied, returns a list of ``number`` random 1653 memebers of set ``name``. Note this is only available when running 1654 Redis 2.6+. 1655 """ 1656 args = (number is not None) and [number] or [] 1657 return self.execute_command('SRANDMEMBER', name, *args) 1658 1659 def srem(self, name, *values): 1660 "Remove ``values`` from set ``name``" 1661 return self.execute_command('SREM', name, *values) 1662 1663 def sunion(self, keys, *args): 1664 "Return the union of sets specified by ``keys``" 1665 args = list_or_args(keys, args) 1666 return self.execute_command('SUNION', *args) 1667 1668 def sunionstore(self, dest, keys, *args): 1669 """ 1670 Store the union of sets specified by ``keys`` into a new 1671 set named ``dest``. Returns the number of keys in the new set. 1672 """ 1673 args = list_or_args(keys, args) 1674 return self.execute_command('SUNIONSTORE', dest, *args) 1675 1676 # SORTED SET COMMANDS 1677 def zadd(self, name, *args, **kwargs): 1678 """ 1679 Set any number of score, element-name pairs to the key ``name``. 
Pairs 1680 can be specified in two ways: 1681 1682 As *args, in the form of: score1, name1, score2, name2, ... 1683 or as **kwargs, in the form of: name1=score1, name2=score2, ... 1684 1685 The following example would add four values to the 'my-key' key: 1686 redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4) 1687 """ 1688 pieces = [] 1689 if args: 1690 if len(args) % 2 != 0: 1691 raise RedisError("ZADD requires an equal number of " 1692 "values and scores") 1693 pieces.extend(args) 1694 for pair in iteritems(kwargs): 1695 pieces.append(pair[1]) 1696 pieces.append(pair[0]) 1697 return self.execute_command('ZADD', name, *pieces) 1698 1699 def zcard(self, name): 1700 "Return the number of elements in the sorted set ``name``" 1701 return self.execute_command('ZCARD', name) 1702 1703 def zcount(self, name, min, max): 1704 """ 1705 Returns the number of elements in the sorted set at key ``name`` with 1706 a score between ``min`` and ``max``. 1707 """ 1708 return self.execute_command('ZCOUNT', name, min, max) 1709 1710 def zincrby(self, name, value, amount=1): 1711 "Increment the score of ``value`` in sorted set ``name`` by ``amount``" 1712 return self.execute_command('ZINCRBY', name, amount, value) 1713 1714 def zinterstore(self, dest, keys, aggregate=None): 1715 """ 1716 Intersect multiple sorted sets specified by ``keys`` into 1717 a new sorted set, ``dest``. Scores in the destination will be 1718 aggregated based on the ``aggregate``, or SUM if none is provided. 1719 """ 1720 return self._zaggregate('ZINTERSTORE', dest, keys, aggregate) 1721 1722 def zlexcount(self, name, min, max): 1723 """ 1724 Return the number of items in the sorted set ``name`` between the 1725 lexicographical range ``min`` and ``max``. 
1726 """ 1727 return self.execute_command('ZLEXCOUNT', name, min, max) 1728 1729 def zrange(self, name, start, end, desc=False, withscores=False, 1730 score_cast_func=float): 1731 """ 1732 Return a range of values from sorted set ``name`` between 1733 ``start`` and ``end`` sorted in ascending order. 1734 1735 ``start`` and ``end`` can be negative, indicating the end of the range. 1736 1737 ``desc`` a boolean indicating whether to sort the results descendingly 1738 1739 ``withscores`` indicates to return the scores along with the values. 1740 The return type is a list of (value, score) pairs 1741 1742 ``score_cast_func`` a callable used to cast the score return value 1743 """ 1744 if desc: 1745 return self.zrevrange(name, start, end, withscores, 1746 score_cast_func) 1747 pieces = ['ZRANGE', name, start, end] 1748 if withscores: 1749 pieces.append(Token.get_token('WITHSCORES')) 1750 options = { 1751 'withscores': withscores, 1752 'score_cast_func': score_cast_func 1753 } 1754 return self.execute_command(*pieces, **options) 1755 1756 def zrangebylex(self, name, min, max, start=None, num=None): 1757 """ 1758 Return the lexicographical range of values from sorted set ``name`` 1759 between ``min`` and ``max``. 1760 1761 If ``start`` and ``num`` are specified, then return a slice of the 1762 range. 1763 """ 1764 if (start is not None and num is None) or \ 1765 (num is not None and start is None): 1766 raise RedisError("``start`` and ``num`` must both be specified") 1767 pieces = ['ZRANGEBYLEX', name, min, max] 1768 if start is not None and num is not None: 1769 pieces.extend([Token.get_token('LIMIT'), start, num]) 1770 return self.execute_command(*pieces) 1771 1772 def zrevrangebylex(self, name, max, min, start=None, num=None): 1773 """ 1774 Return the reversed lexicographical range of values from sorted set 1775 ``name`` between ``max`` and ``min``. 1776 1777 If ``start`` and ``num`` are specified, then return a slice of the 1778 range. 
1779 """ 1780 if (start is not None and num is None) or \ 1781 (num is not None and start is None): 1782 raise RedisError("``start`` and ``num`` must both be specified") 1783 pieces = ['ZREVRANGEBYLEX', name, max, min] 1784 if start is not None and num is not None: 1785 pieces.extend([Token.get_token('LIMIT'), start, num]) 1786 return self.execute_command(*pieces) 1787 1788 def zrangebyscore(self, name, min, max, start=None, num=None, 1789 withscores=False, score_cast_func=float): 1790 """ 1791 Return a range of values from the sorted set ``name`` with scores 1792 between ``min`` and ``max``. 1793 1794 If ``start`` and ``num`` are specified, then return a slice 1795 of the range. 1796 1797 ``withscores`` indicates to return the scores along with the values. 1798 The return type is a list of (value, score) pairs 1799 1800 `score_cast_func`` a callable used to cast the score return value 1801 """ 1802 if (start is not None and num is None) or \ 1803 (num is not None and start is None): 1804 raise RedisError("``start`` and ``num`` must both be specified") 1805 pieces = ['ZRANGEBYSCORE', name, min, max] 1806 if start is not None and num is not None: 1807 pieces.extend([Token.get_token('LIMIT'), start, num]) 1808 if withscores: 1809 pieces.append(Token.get_token('WITHSCORES')) 1810 options = { 1811 'withscores': withscores, 1812 'score_cast_func': score_cast_func 1813 } 1814 return self.execute_command(*pieces, **options) 1815 1816 def zrank(self, name, value): 1817 """ 1818 Returns a 0-based value indicating the rank of ``value`` in sorted set 1819 ``name`` 1820 """ 1821 return self.execute_command('ZRANK', name, value) 1822 1823 def zrem(self, name, *values): 1824 "Remove member ``values`` from sorted set ``name``" 1825 return self.execute_command('ZREM', name, *values) 1826 1827 def zremrangebylex(self, name, min, max): 1828 """ 1829 Remove all elements in the sorted set ``name`` between the 1830 lexicographical range specified by ``min`` and ``max``. 
1831 1832 Returns the number of elements removed. 1833 """ 1834 return self.execute_command('ZREMRANGEBYLEX', name, min, max) 1835 1836 def zremrangebyrank(self, name, min, max): 1837 """ 1838 Remove all elements in the sorted set ``name`` with ranks between 1839 ``min`` and ``max``. Values are 0-based, ordered from smallest score 1840 to largest. Values can be negative indicating the highest scores. 1841 Returns the number of elements removed 1842 """ 1843 return self.execute_command('ZREMRANGEBYRANK', name, min, max) 1844 1845 def zremrangebyscore(self, name, min, max): 1846 """ 1847 Remove all elements in the sorted set ``name`` with scores 1848 between ``min`` and ``max``. Returns the number of elements removed. 1849 """ 1850 return self.execute_command('ZREMRANGEBYSCORE', name, min, max) 1851 1852 def zrevrange(self, name, start, end, withscores=False, 1853 score_cast_func=float): 1854 """ 1855 Return a range of values from sorted set ``name`` between 1856 ``start`` and ``end`` sorted in descending order. 1857 1858 ``start`` and ``end`` can be negative, indicating the end of the range. 1859 1860 ``withscores`` indicates to return the scores along with the values 1861 The return type is a list of (value, score) pairs 1862 1863 ``score_cast_func`` a callable used to cast the score return value 1864 """ 1865 pieces = ['ZREVRANGE', name, start, end] 1866 if withscores: 1867 pieces.append(Token.get_token('WITHSCORES')) 1868 options = { 1869 'withscores': withscores, 1870 'score_cast_func': score_cast_func 1871 } 1872 return self.execute_command(*pieces, **options) 1873 1874 def zrevrangebyscore(self, name, max, min, start=None, num=None, 1875 withscores=False, score_cast_func=float): 1876 """ 1877 Return a range of values from the sorted set ``name`` with scores 1878 between ``min`` and ``max`` in descending order. 1879 1880 If ``start`` and ``num`` are specified, then return a slice 1881 of the range. 
1882 1883 ``withscores`` indicates to return the scores along with the values. 1884 The return type is a list of (value, score) pairs 1885 1886 ``score_cast_func`` a callable used to cast the score return value 1887 """ 1888 if (start is not None and num is None) or \ 1889 (num is not None and start is None): 1890 raise RedisError("``start`` and ``num`` must both be specified") 1891 pieces = ['ZREVRANGEBYSCORE', name, max, min] 1892 if start is not None and num is not None: 1893 pieces.extend([Token.get_token('LIMIT'), start, num]) 1894 if withscores: 1895 pieces.append(Token.get_token('WITHSCORES')) 1896 options = { 1897 'withscores': withscores, 1898 'score_cast_func': score_cast_func 1899 } 1900 return self.execute_command(*pieces, **options) 1901 1902 def zrevrank(self, name, value): 1903 """ 1904 Returns a 0-based value indicating the descending rank of 1905 ``value`` in sorted set ``name`` 1906 """ 1907 return self.execute_command('ZREVRANK', name, value) 1908 1909 def zscore(self, name, value): 1910 "Return the score of element ``value`` in sorted set ``name``" 1911 return self.execute_command('ZSCORE', name, value) 1912 1913 def zunionstore(self, dest, keys, aggregate=None): 1914 """ 1915 Union multiple sorted sets specified by ``keys`` into 1916 a new sorted set, ``dest``. Scores in the destination will be 1917 aggregated based on the ``aggregate``, or SUM if none is provided. 
1918 """ 1919 return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate) 1920 1921 def _zaggregate(self, command, dest, keys, aggregate=None): 1922 pieces = [command, dest, len(keys)] 1923 if isinstance(keys, dict): 1924 keys, weights = iterkeys(keys), itervalues(keys) 1925 else: 1926 weights = None 1927 pieces.extend(keys) 1928 if weights: 1929 pieces.append(Token.get_token('WEIGHTS')) 1930 pieces.extend(weights) 1931 if aggregate: 1932 pieces.append(Token.get_token('AGGREGATE')) 1933 pieces.append(aggregate) 1934 return self.execute_command(*pieces) 1935 1936 # HYPERLOGLOG COMMANDS 1937 def pfadd(self, name, *values): 1938 "Adds the specified elements to the specified HyperLogLog." 1939 return self.execute_command('PFADD', name, *values) 1940 1941 def pfcount(self, *sources): 1942 """ 1943 Return the approximated cardinality of 1944 the set observed by the HyperLogLog at key(s). 1945 """ 1946 return self.execute_command('PFCOUNT', *sources) 1947 1948 def pfmerge(self, dest, *sources): 1949 "Merge N different HyperLogLogs into a single one." 
# HASH COMMANDS
def hdel(self, name, *keys):
    "Remove the fields ``keys`` from the hash ``name``"
    reply = self.execute_command('HDEL', name, *keys)
    return reply


def hexists(self, name, key):
    "Return a boolean telling whether ``key`` exists in the hash ``name``"
    reply = self.execute_command('HEXISTS', name, key)
    return reply


def hget(self, name, key):
    "Return the value stored under ``key`` in the hash ``name``"
    reply = self.execute_command('HGET', name, key)
    return reply


def hgetall(self, name):
    "Return the name/value pairs of the hash as a Python dict"
    reply = self.execute_command('HGETALL', name)
    return reply


def hincrby(self, name, key, amount=1):
    "Add ``amount`` to the value of ``key`` in the hash ``name``"
    reply = self.execute_command('HINCRBY', name, key, amount)
    return reply


def hincrbyfloat(self, name, key, amount=1.0):
    """
    Add the floating point ``amount`` to the value of ``key`` in the
    hash ``name``
    """
    reply = self.execute_command('HINCRBYFLOAT', name, key, amount)
    return reply


def hkeys(self, name):
    "Return the list of keys held by the hash ``name``"
    reply = self.execute_command('HKEYS', name)
    return reply


def hlen(self, name):
    "Return how many elements the hash ``name`` holds"
    reply = self.execute_command('HLEN', name)
    return reply


def hset(self, name, key, value):
    """
    Store ``value`` under ``key`` in the hash ``name``.
    Returns 1 if HSET created a new field, otherwise 0
    """
    reply = self.execute_command('HSET', name, key, value)
    return reply


def hsetnx(self, name, key, value):
    """
    Store ``value`` under ``key`` in the hash ``name`` only when ``key``
    does not exist yet. Returns 1 if HSETNX created a field, otherwise 0.
    """
    reply = self.execute_command('HSETNX', name, key, value)
    return reply


def hmset(self, name, mapping):
    """
    Store every key/value pair of the ``mapping`` dict in the hash
    ``name``. An empty ``mapping`` raises DataError.
    """
    if not mapping:
        raise DataError("'hmset' with 'mapping' of length 0")
    # flatten the (key, value) pairs into key1, value1, key2, value2, ...
    flattened = list(chain.from_iterable(iteritems(mapping)))
    return self.execute_command('HMSET', name, *flattened)
def hmget(self, name, keys, *args):
    "Return a list of values in the same order as ``keys``"
    fields = list_or_args(keys, args)
    return self.execute_command('HMGET', name, *fields)


def hvals(self, name):
    "Return the list of values held by the hash ``name``"
    reply = self.execute_command('HVALS', name)
    return reply


def hstrlen(self, name, key):
    """
    Return the number of bytes stored in the value of ``key``
    within the hash ``name``
    """
    reply = self.execute_command('HSTRLEN', name, key)
    return reply


def publish(self, channel, message):
    """
    Publish ``message`` on ``channel``.
    Returns the number of subscribers the message was delivered to.
    """
    delivered = self.execute_command('PUBLISH', channel, message)
    return delivered


def pubsub_channels(self, pattern='*'):
    """
    Return a list of channels that have at least one subscriber,
    restricted to those matching ``pattern``
    """
    reply = self.execute_command('PUBSUB CHANNELS', pattern)
    return reply


def pubsub_numpat(self):
    """
    Return the number of subscriptions to patterns
    """
    reply = self.execute_command('PUBSUB NUMPAT')
    return reply


def pubsub_numsub(self, *args):
    """
    Return a list of (channel, number of subscribers) tuples
    for each channel given in ``*args``
    """
    reply = self.execute_command('PUBSUB NUMSUB', *args)
    return reply


def cluster(self, cluster_arg, *args):
    """
    Send the CLUSTER subcommand ``cluster_arg`` (upper-cased before it is
    sent) followed by ``args``.
    """
    subcommand = 'CLUSTER %s' % cluster_arg.upper()
    return self.execute_command(subcommand, *args)


def eval(self, script, numkeys, *keys_and_args):
    """
    Execute the Lua ``script``, specifying the ``numkeys`` the script
    will touch and the key names and argument values in
    ``keys_and_args``. Returns the result of the script.

    In practice, use the object returned by ``register_script``. This
    function exists purely for Redis API completion.
    """
    result = self.execute_command('EVAL', script, numkeys, *keys_and_args)
    return result
def evalsha(self, sha, numkeys, *keys_and_args):
    """
    Run the Lua script identified by ``sha``, which must already have
    been registered via EVAL or SCRIPT LOAD. ``numkeys`` is the number of
    keys the script will touch; the key names and argument values follow
    in ``keys_and_args``. Returns the result of the script.

    In practice, use the object returned by ``register_script``. This
    function exists purely for Redis API completion.
    """
    result = self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
    return result


def script_exists(self, *args):
    """
    Check whether scripts exist in the script cache, given the SHA of
    each script as ``args``. Returns a list of boolean values, one per
    SHA, indicating whether that script is already cached.
    """
    reply = self.execute_command('SCRIPT EXISTS', *args)
    return reply


def script_flush(self):
    "Remove every script from the script cache"
    reply = self.execute_command('SCRIPT FLUSH')
    return reply


def script_kill(self):
    "Kill the Lua script that is currently executing"
    reply = self.execute_command('SCRIPT KILL')
    return reply


def script_load(self, script):
    "Load a Lua ``script`` into the script cache. Returns the SHA."
    sha = self.execute_command('SCRIPT LOAD', script)
    return sha


def register_script(self, script):
    """
    Register a Lua ``script`` and return a callable Script object that
    hides the complexity of dealing with scripts, keys, and shas. This is
    the preferred way to work with Lua scripts.
    """
    return Script(self, script)


# GEO COMMANDS
def geoadd(self, name, *values):
    """
    Add geospatial items to the key ``name``. ``values`` is a flat,
    ordered sequence in which every item is the triad longitude,
    latitude and name; a length that is not a multiple of three raises
    RedisError.
    """
    if len(values) % 3:
        raise RedisError("GEOADD requires places with lon, lat and name"
                         " values")
    return self.execute_command('GEOADD', name, *values)
def geodist(self, name, place1, place2, unit=None):
    """
    Return the distance between the members ``place1`` and ``place2`` of
    the key ``name``.

    ``unit`` must be one of m, km, mi or ft; anything else raises
    RedisError. When it is omitted no unit is sent with the command.
    """
    arguments = [name, place1, place2]
    if unit:
        if unit not in ('m', 'km', 'mi', 'ft'):
            raise RedisError("GEODIST invalid unit")
        arguments.append(unit)
    return self.execute_command('GEODIST', *arguments)


def geohash(self, name, *values):
    """
    Return the geo hash string of each member listed in ``values`` for
    the key ``name``.
    """
    reply = self.execute_command('GEOHASH', name, *values)
    return reply


def geopos(self, name, *values):
    """
    Return the position of each member listed in ``values`` for the key
    ``name``. Each position is represented by the pair lon and lat.
    """
    reply = self.execute_command('GEOPOS', name, *values)
    return reply


def georadius(self, name, longitude, latitude, radius, unit=None,
              withdist=False, withcoord=False, withhash=False, count=None,
              sort=None, store=None, store_dist=None):
    """
    Return the members of the key ``name`` that lie within ``radius`` of
    the point given by ``longitude`` and ``latitude``.

    ``unit`` must be one of m, km, mi or ft.

    ``withdist`` indicates to return the distance of each place.

    ``withcoord`` indicates to return the latitude and longitude of
    each place.

    ``withhash`` indicates to return the geohash string of each place.

    ``count`` indicates to return the number of elements up to N.

    ``sort`` indicates to return the places in a sorted way, ASC for
    nearest to farthest and DESC for farthest to nearest.

    ``store`` indicates to save the place names in a sorted set named
    with a specific key; each element of the destination sorted set is
    populated with the score taken from the original geo sorted set.

    ``store_dist`` indicates to save the place names in a sorted set
    named with a specific key; unlike ``store``, the destination score
    is set to the distance.
    """
    # all options travel as keywords to the shared radius helper
    options = dict(unit=unit, withdist=withdist, withcoord=withcoord,
                   withhash=withhash, count=count, sort=sort, store=store,
                   store_dist=store_dist)
    return self._georadiusgeneric('GEORADIUS', name, longitude, latitude,
                                  radius, **options)
By default 2162 2163 ``withdist`` indicates to return the distances of each place. 2164 2165 ``withcoord`` indicates to return the latitude and longitude of 2166 each place. 2167 2168 ``withhash`` indicates to return the geohash string of each place. 2169 2170 ``count`` indicates to return the number of elements up to N. 2171 2172 ``sort`` indicates to return the places in a sorted way, ASC for 2173 nearest to fairest and DESC for fairest to nearest. 2174 2175 ``store`` indicates to save the places names in a sorted set named 2176 with a specific key, each element of the destination sorted set is 2177 populated with the score got from the original geo sorted set. 2178 2179 ``store_dist`` indicates to save the places names in a sorted set 2180 named with a specific key, instead of ``store`` the sorted set 2181 destination score is set with the distance. 2182 """ 2183 return self._georadiusgeneric('GEORADIUS', 2184 name, longitude, latitude, radius, 2185 unit=unit, withdist=withdist, 2186 withcoord=withcoord, withhash=withhash, 2187 count=count, sort=sort, store=store, 2188 store_dist=store_dist) 2189 2190 def georadiusbymember(self, name, member, radius, unit=None, 2191 withdist=False, withcoord=False, withhash=False, 2192 count=None, sort=None, store=None, store_dist=None): 2193 """ 2194 This command is exactly like ``georadius`` with the sole difference 2195 that instead of taking, as the center of the area to query, a longitude 2196 and latitude value, it takes the name of a member already existing 2197 inside the geospatial index represented by the sorted set. 
2198 """ 2199 return self._georadiusgeneric('GEORADIUSBYMEMBER', 2200 name, member, radius, unit=unit, 2201 withdist=withdist, withcoord=withcoord, 2202 withhash=withhash, count=count, 2203 sort=sort, store=store, 2204 store_dist=store_dist) 2205 2206 def _georadiusgeneric(self, command, *args, **kwargs): 2207 pieces = list(args) 2208 if kwargs['unit'] and kwargs['unit'] not in ('m', 'km', 'mi', 'ft'): 2209 raise RedisError("GEORADIUS invalid unit") 2210 elif kwargs['unit']: 2211 pieces.append(kwargs['unit']) 2212 else: 2213 pieces.append('m',) 2214 2215 for token in ('withdist', 'withcoord', 'withhash'): 2216 if kwargs[token]: 2217 pieces.append(Token(token.upper())) 2218 2219 if kwargs['count']: 2220 pieces.extend([Token('COUNT'), kwargs['count']]) 2221 2222 if kwargs['sort'] and kwargs['sort'] not in ('ASC', 'DESC'): 2223 raise RedisError("GEORADIUS invalid sort") 2224 elif kwargs['sort']: 2225 pieces.append(Token(kwargs['sort'])) 2226 2227 if kwargs['store'] and kwargs['store_dist']: 2228 raise RedisError("GEORADIUS store and store_dist cant be set" 2229 " together") 2230 2231 if kwargs['store']: 2232 pieces.extend([Token('STORE'), kwargs['store']]) 2233 2234 if kwargs['store_dist']: 2235 pieces.extend([Token('STOREDIST'), kwargs['store_dist']]) 2236 2237 return self.execute_command(command, *pieces, **kwargs) 2238 2239 2240class Redis(StrictRedis): 2241 """ 2242 Provides backwards compatibility with older versions of redis-py that 2243 changed arguments to some commands to be more Pythonic, sane, or by 2244 accident. 2245 """ 2246 2247 # Overridden callbacks 2248 RESPONSE_CALLBACKS = dict_merge( 2249 StrictRedis.RESPONSE_CALLBACKS, 2250 { 2251 'TTL': lambda r: r >= 0 and r or None, 2252 'PTTL': lambda r: r >= 0 and r or None, 2253 } 2254 ) 2255 2256 def pipeline(self, transaction=True, shard_hint=None): 2257 """ 2258 Return a new pipeline object that can queue multiple commands for 2259 later execution. 
class Redis(StrictRedis):
    """
    Backwards-compatible client for code written against older versions
    of redis-py, which changed the arguments of some commands to be more
    Pythonic, sane, or by accident.
    """

    # Overridden callbacks: only a strictly positive TTL/PTTL reply is
    # passed through; zero and negative replies are reported as None.
    RESPONSE_CALLBACKS = dict_merge(
        StrictRedis.RESPONSE_CALLBACKS,
        {
            'TTL': lambda r: r if r > 0 else None,
            'PTTL': lambda r: r if r > 0 else None,
        }
    )

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Create a pipeline object that queues multiple commands for later
        execution. ``transaction`` indicates whether all commands should
        be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth
        overhead between the client and server.
        """
        pool = self.connection_pool
        callbacks = self.response_callbacks
        return Pipeline(pool, callbacks, transaction, shard_hint)

    def setex(self, name, value, time):
        """
        Store ``value`` under key ``name`` with an expiry of ``time``
        seconds. ``time`` may be an integer or a Python timedelta object.
        """
        if isinstance(time, datetime.timedelta):
            # whole seconds only; the microseconds part is not used
            time = time.days * 24 * 3600 + time.seconds
        return self.execute_command('SETEX', name, time, value)

    def lrem(self, name, value, num=0):
        """
        Remove the first ``num`` occurrences of elements equal to
        ``value`` from the list stored at ``name``.

        The ``num`` argument influences the operation in the following
        ways:
            num > 0: Remove elements equal to value moving from head to
                     tail.
            num < 0: Remove elements equal to value moving from tail to
                     head.
            num = 0: Remove all elements equal to value.
        """
        # the command is sent with ``num`` ahead of ``value``
        return self.execute_command('LREM', name, num, value)

    def zadd(self, name, *args, **kwargs):
        """
        NOTE: The order of arguments differs from that of the official
        ZADD command. For backwards compatibility, this method accepts
        arguments in the form of name1, score1, name2, score2, while the
        official Redis documentation expects score1, name1, score2,
        name2.

        If you're looking to use the standard syntax, consider using the
        StrictRedis class. See the API Reference section of the docs for
        more information.

        Set any number of element-name, score pairs to the key ``name``.
        Pairs can be specified in two ways:

        As *args, in the form of: name1, score1, name2, score2, ...
        or as **kwargs, in the form of: name1=score1, name2=score2, ...

        The following example would add four values to the 'my-key' key:
        redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3,
                   name4=4.4)
        """
        if args and len(args) % 2 != 0:
            raise RedisError("ZADD requires an equal number of "
                             "values and scores")
        # reversing the flat name/score list puts each score ahead of its
        # name, which is the order the command is sent in
        pieces = list(reversed(args))
        for member, score in iteritems(kwargs):
            pieces.extend((score, member))
        return self.execute_command('ZADD', name, *pieces)
class PubSub(object):
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """
    PUBLISH_MESSAGE_TYPES = ('message', 'pmessage')
    UNSUBSCRIBE_MESSAGE_TYPES = ('unsubscribe', 'punsubscribe')

    def __init__(self, connection_pool, shard_hint=None,
                 ignore_subscribe_messages=False):
        """
        ``connection_pool`` supplies the connection and the encoder.
        ``shard_hint`` is passed along when a connection is requested.
        ``ignore_subscribe_messages`` makes handle_message() drop
        subscribe/unsubscribe confirmations instead of returning them.
        """
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = self.connection_pool.get_encoder()
        self.reset()

    def __del__(self):
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            # deliberate best effort: errors cannot be reported from a
            # finalizer
            pass

    def reset(self):
        "Drop the connection, if any, and forget all subscriptions"
        if self.connection:
            self.connection.disconnect()
            self.connection.clear_connect_callbacks()
            self.connection_pool.release(self.connection)
            self.connection = None
        self.channels = {}
        self.patterns = {}

    def close(self):
        "Alias for reset()"
        self.reset()

    def on_connect(self, connection):
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        if self.channels:
            channels = {}
            for k, v in iteritems(self.channels):
                channels[self.encoder.decode(k, force=True)] = v
            self.subscribe(**channels)
        if self.patterns:
            patterns = {}
            for k, v in iteritems(self.patterns):
                patterns[self.encoder.decode(k, force=True)] = v
            self.psubscribe(**patterns)

    @property
    def subscribed(self):
        "Indicates if there are subscriptions to any channels or patterns"
        return bool(self.channels or self.patterns)

    def execute_command(self, *args, **kwargs):
        "Execute a publish/subscribe command"

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection(
                'pubsub',
                self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _execute(self, connection, command, *args):
        """
        Call ``command(*args)``, reconnecting and retrying once after a
        ConnectionError, or after a TimeoutError when the connection has
        ``retry_on_timeout`` set.
        """
        try:
            return command(*args)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # Connect manually here. If the Redis server is down, this will
            # fail and raise a ConnectionError as desired.
            connection.connect()
            # the ``on_connect`` callback should have been called by the
            # connection to resubscribe us to any channels and patterns we
            # were previously listening to
            return command(*args)

    def parse_response(self, block=True, timeout=0):
        "Parse the response from a publish/subscribe command"
        connection = self.connection
        if connection is None:
            raise RuntimeError(
                'pubsub connection not set: '
                'did you forget to call subscribe() or psubscribe()?')
        # non-blocking mode: give up when nothing is readable in time
        if not block and not connection.can_read(timeout=timeout):
            return None
        return self._execute(connection, connection.read_response)

    def _normalize_keys(self, data):
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return dict([(decode(encode(k)), v) for k, v in iteritems(data)])

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command('PSUBSCRIBE', *iterkeys(new_patterns))
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        self.patterns.update(self._normalize_keys(new_patterns))
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        return self.execute_command('PUNSUBSCRIBE', *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        self.channels.update(self._normalize_keys(new_channels))
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        """
        if args:
            args = list_or_args(args[0], args[1:])
        return self.execute_command('UNSUBSCRIBE', *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(self, ignore_subscribe_messages=False, timeout=0):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number.
        """
        response = self.parse_response(block=False, timeout=timeout)
        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        message_type = nativestr(response[0])
        if message_type == 'pmessage':
            message = {
                'type': message_type,
                'pattern': response[1],
                'channel': response[2],
                'data': response[3]
            }
        else:
            message = {
                'type': message_type,
                'pattern': None,
                'channel': response[1],
                'data': response[2]
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            subscribed_dict = None
            if message_type == 'punsubscribe':
                subscribed_dict = self.patterns
            else:
                subscribed_dict = self.channels
            try:
                del subscribed_dict[message['channel']]
            except KeyError:
                pass

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            handler = None
            if message_type == 'pmessage':
                handler = self.patterns.get(message['pattern'], None)
            else:
                handler = self.channels.get(message['channel'], None)
            if handler:
                handler(message)
                return None
        else:
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(self, sleep_time=0, daemon=False):
        """
        Start and return a PubSubWorkerThread that polls this object for
        messages, using ``sleep_time`` as the polling timeout.

        Every subscribed channel and pattern must have a handler; a
        PubSubError naming the first one without a handler is raised
        otherwise.
        """
        for channel, handler in iteritems(self.channels):
            if handler is None:
                # interpolate the name so the error says which channel
                raise PubSubError(
                    "Channel: '%s' has no handler registered" % (channel,))
        for pattern, handler in iteritems(self.patterns):
            if handler is None:
                raise PubSubError(
                    "Pattern: '%s' has no handler registered" % (pattern,))

        thread = PubSubWorkerThread(self, sleep_time, daemon=daemon)
        thread.start()
        return thread


class PubSubWorkerThread(threading.Thread):
    "Thread that keeps polling a PubSub object while it is subscribed"

    def __init__(self, pubsub, sleep_time, daemon=False):
        super(PubSubWorkerThread, self).__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self._running = False

    def run(self):
        "Poll for messages until no subscription is left, then close"
        if self._running:
            return
        self._running = True
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while pubsub.subscribed:
            pubsub.get_message(ignore_subscribe_messages=True,
                               timeout=sleep_time)
        pubsub.close()
        self._running = False

    def stop(self):
        # stopping simply unsubscribes from all channels and patterns.
        # the unsubscribe responses that are generated will short circuit
        # the loop in run(), calling pubsub.close() to clean up the connection
        self.pubsub.unsubscribe()
        self.pubsub.punsubscribe()
class BasePipeline(object):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH'))

    def __init__(self, connection_pool, response_callbacks, transaction,
                 shard_hint):
        self.connection_pool = connection_pool
        self.connection = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint

        self.watching = False
        self.reset()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            # deliberate best effort: errors cannot be reported from a
            # finalizer
            pass

    def __len__(self):
        return len(self.command_stack)

    def reset(self):
        "Clear queued commands and watch state, and release the connection"
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command('UNWATCH')
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False
        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        if self.explicit_transaction:
            raise RedisError('Cannot issue nested calls to MULTI')
        if self.command_stack:
            raise RedisError('Commands without an initial WATCH have already '
                             'been issued')
        self.explicit_transaction = True

    def execute_command(self, *args, **kwargs):
        """
        Run the command immediately while WATCHing (or when it is a WATCH)
        and no explicit MULTI has been issued; otherwise queue it.
        """
        if (self.watching or args[0] == 'WATCH') and \
                not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on a
        ConnectionError if we're already WATCHing a variable. Used when
        issuing WATCH or subsequent commands retrieving their values but before
        MULTI is called.

        Raises WatchError when the connection fails while keys are being
        watched, since the WATCH is lost together with the connection.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection(command_name,
                                                       self.shard_hint)
            self.connection = conn
        try:
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            conn.disconnect()
            if not conn.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # if we were watching a variable, the watch is no longer valid
            # since this connection has died. clean up and raise a
            # WatchError, as execute() does, rather than silently
            # returning None to the caller.
            if self.watching:
                self.reset()
                raise WatchError("A ConnectionError occured on while "
                                 "watching one or more keys")
            # we're not watching anything, so we can safely retry the command
            try:
                conn.send_command(*args)
                return self.parse_response(conn, command_name, **options)
            except ConnectionError:
                # the retry failed so cleanup.
                conn.disconnect()
                self.reset()
                raise

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self

    def _execute_transaction(self, connection, commands, raise_on_error):
        "Send ``commands`` wrapped in MULTI/EXEC and return their results"
        cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})])
        all_cmds = connection.pack_commands([args for args, _ in cmds])
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, '_')
        except ResponseError:
            errors.append((0, sys.exc_info()[1]))

        # and all the other commands
        for i, command in enumerate(commands):
            try:
                self.parse_response(connection, '_')
            except ResponseError:
                ex = sys.exc_info()[1]
                self.annotate_exception(ex, i + 1, command[0])
                errors.append((i, ex))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, '_')
        except ExecAbortError:
            if self.explicit_transaction:
                self.immediate_execute_command('DISCARD')
            if errors:
                raise errors[0][1]
            raise sys.exc_info()[1]

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError("Wrong number of response items from "
                                "pipeline execution")

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in izip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)
        return data

    def _execute_pipeline(self, connection, commands, raise_on_error):
        "Send ``commands`` without MULTI/EXEC and return their results"
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        response = []
        for args, options in commands:
            try:
                response.append(
                    self.parse_response(connection, args[0], **options))
            except ResponseError:
                response.append(sys.exc_info()[1])

        if raise_on_error:
            self.raise_first_error(commands, response)
        return response

    def raise_first_error(self, commands, response):
        "Raise the first ResponseError found in ``response``, annotated"
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        "Prefix the exception message with the command number and text"
        cmd = safe_unicode(' ').join(imap(safe_unicode, command))
        msg = unicode('Command # %d (%s) of pipeline caused error: %s') % (
            number, cmd, safe_unicode(exception.args[0]))
        exception.args = (msg,) + exception.args[1:]

    def parse_response(self, connection, command_name, **options):
        "Parse a response and keep ``self.watching`` in sync with it"
        result = StrictRedis.parse_response(
            self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == 'WATCH':
            self.watching = True
        return result

    def load_scripts(self):
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate('SCRIPT EXISTS', *shas)
        if not all(exists):
            for s, exist in izip(scripts, exists):
                if not exist:
                    s.sha = immediate('SCRIPT LOAD', s.script)

    def execute(self, raise_on_error=True):
        "Execute all the commands in the current pipeline"
        stack = self.command_stack
        if not stack:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection('MULTI',
                                                       self.shard_hint)
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return execute(conn, stack, raise_on_error)
        except (ConnectionError, TimeoutError) as e:
            conn.disconnect()
            if not conn.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # if we were watching a variable, the watch is no longer valid
            # since this connection has died. raise a WatchError, which
            # indicates the user should retry his transaction. If this is more
            # than a temporary failure, the WATCH that the user next issues
            # will fail, propagating the real ConnectionError
            if self.watching:
                raise WatchError("A ConnectionError occured on while watching "
                                 "one or more keys")
            # otherwise, it's safe to retry since the transaction isn't
            # predicated on any state
            return execute(conn, stack, raise_on_error)
        finally:
            self.reset()

    def watch(self, *names):
        "Watches the values at keys ``names``"
        if self.explicit_transaction:
            raise RedisError('Cannot issue a WATCH after a MULTI')
        return self.execute_command('WATCH', *names)

    def unwatch(self):
        "Unwatches all previously specified keys"
        if self.watching:
            # a falsy reply is reported as True, as before
            return self.execute_command('UNWATCH') or True
        return True
raise a WatchError, which 2886 # indicates the user should retry his transaction. If this is more 2887 # than a temporary failure, the WATCH that the user next issues 2888 # will fail, propegating the real ConnectionError 2889 if self.watching: 2890 raise WatchError("A ConnectionError occured on while watching " 2891 "one or more keys") 2892 # otherwise, it's safe to retry since the transaction isn't 2893 # predicated on any state 2894 return execute(conn, stack, raise_on_error) 2895 finally: 2896 self.reset() 2897 2898 def watch(self, *names): 2899 "Watches the values at keys ``names``" 2900 if self.explicit_transaction: 2901 raise RedisError('Cannot issue a WATCH after a MULTI') 2902 return self.execute_command('WATCH', *names) 2903 2904 def unwatch(self): 2905 "Unwatches all previously specified keys" 2906 return self.watching and self.execute_command('UNWATCH') or True 2907 2908 2909class StrictPipeline(BasePipeline, StrictRedis): 2910 "Pipeline for the StrictRedis class" 2911 pass 2912 2913 2914class Pipeline(BasePipeline, Redis): 2915 "Pipeline for the Redis class" 2916 pass 2917 2918 2919class Script(object): 2920 "An executable Lua script object returned by ``register_script``" 2921 2922 def __init__(self, registered_client, script): 2923 self.registered_client = registered_client 2924 self.script = script 2925 # Precalculate and store the SHA1 hex digest of the script. 
2926 2927 if isinstance(script, basestring): 2928 # We need the encoding from the client in order to generate an 2929 # accurate byte representation of the script 2930 encoder = registered_client.connection_pool.get_encoder() 2931 script = encoder.encode(script) 2932 self.sha = hashlib.sha1(script).hexdigest() 2933 2934 def __call__(self, keys=[], args=[], client=None): 2935 "Execute the script, passing any required ``args``" 2936 if client is None: 2937 client = self.registered_client 2938 args = tuple(keys) + tuple(args) 2939 # make sure the Redis server knows about the script 2940 if isinstance(client, BasePipeline): 2941 # Make sure the pipeline can register the script before executing. 2942 client.scripts.add(self) 2943 try: 2944 return client.evalsha(self.sha, len(keys), *args) 2945 except NoScriptError: 2946 # Maybe the client is pointed to a differnet server than the client 2947 # that created this instance? 2948 # Overwrite the sha just in case there was a discrepancy. 2949 self.sha = client.script_load(self.script) 2950 return client.evalsha(self.sha, len(keys), *args) 2951