1 #ifndef _PHPREDIS_CLUSTER_LIBRARY_H 2 #define _PHPREDIS_CLUSTER_LIBRARY_H 3 4 #include "common.h" 5 6 #ifdef ZTS 7 #include "TSRM.h" 8 #endif 9 10 /* Redis cluster hash slots and N-1 which we'll use to find it */ 11 #define REDIS_CLUSTER_SLOTS 16384 12 #define REDIS_CLUSTER_MOD (REDIS_CLUSTER_SLOTS-1) 13 14 /* Complete representation for various commands in RESP */ 15 #define RESP_MULTI_CMD "*1\r\n$5\r\nMULTI\r\n" 16 #define RESP_EXEC_CMD "*1\r\n$4\r\nEXEC\r\n" 17 #define RESP_DISCARD_CMD "*1\r\n$7\r\nDISCARD\r\n" 18 #define RESP_UNWATCH_CMD "*1\r\n$7\r\nUNWATCH\r\n" 19 #define RESP_CLUSTER_SLOTS_CMD "*2\r\n$7\r\nCLUSTER\r\n$5\r\nSLOTS\r\n" 20 #define RESP_ASKING_CMD "*1\r\n$6\r\nASKING\r\n" 21 #define RESP_READONLY_CMD "*1\r\n$8\r\nREADONLY\r\n" 22 #define RESP_READWRITE_CMD "*1\r\n$9\r\nREADWRITE\r\n" 23 24 #define RESP_READONLY_CMD_LEN (sizeof(RESP_READONLY_CMD)-1) 25 26 /* MOVED/ASK comparison macros */ 27 #define IS_MOVED(p) (p[0]=='M' && p[1]=='O' && p[2]=='V' && p[3]=='E' && \ 28 p[4]=='D' && p[5]==' ') 29 #define IS_ASK(p) (p[0]=='A' && p[1]=='S' && p[2]=='K' && p[3]==' ') 30 31 /* MOVED/ASK lengths */ 32 #define MOVED_LEN (sizeof("MOVED ")-1) 33 #define ASK_LEN (sizeof("ASK ")-1) 34 35 /* Initial allocation size for key distribution container */ 36 #define CLUSTER_KEYDIST_ALLOC 8 37 38 /* Macros to access nodes, sockets, and streams for a given slot */ 39 #define SLOT(c,s) (c->master[s]) 40 #define SLOT_SOCK(c,s) (SLOT(c,s)->sock) 41 #define SLOT_STREAM(c,s) (SLOT_SOCK(c,s)->stream) 42 #define SLOT_SLAVES(c,s) (c->master[s]->slaves) 43 44 /* Macros to access socket and stream for the node we're communicating with */ 45 #define CMD_SOCK(c) (c->cmd_sock) 46 #define CMD_STREAM(c) (c->cmd_sock->stream) 47 48 /* Compare redirection slot information with the passed node */ 49 #define CLUSTER_REDIR_CMP(c, sock) \ 50 (sock->port != c->redir_port || \ 51 ZSTR_LEN(sock->host) != c->redir_host_len || \ 52 memcmp(ZSTR_VAL(sock->host),c->redir_host,c->redir_host_len)) 53 54 /* Clear out our "last error" */ 55 #define CLUSTER_CLEAR_ERROR(c) do { \ 56 if (c->err) { \ 57 zend_string_release(c->err); \ 58 c->err = NULL; \ 59 } \ 60 c->clusterdown = 0; \ 61 } while (0) 62 63 /* Protected sending of data down the wire to a RedisSock->stream */ 64 #define CLUSTER_SEND_PAYLOAD(sock, buf, len) \ 65 (sock && !redis_sock_server_open(sock) && sock->stream && !redis_check_eof(sock, 1 ) && \ 66 php_stream_write(sock->stream, buf, len)==len) 67 68 /* Macro to read our reply type character */ 69 #define CLUSTER_VALIDATE_REPLY_TYPE(sock, type) \ 70 (redis_check_eof(sock, 1) == 0 && \ 71 (php_stream_getc(sock->stream) == type)) 72 73 /* Reset our last single line reply buffer and length */ 74 #define CLUSTER_CLEAR_REPLY(c) \ 75 *c->line_reply = '\0'; c->reply_len = 0; 76 77 /* Helper to determine if we're in MULTI mode */ 78 #define CLUSTER_IS_ATOMIC(c) (c->flags->mode != MULTI) 79 80 /* Helper that either returns false or adds false in multi mode */ 81 #define CLUSTER_RETURN_FALSE(c) \ 82 if(CLUSTER_IS_ATOMIC(c)) { \ 83 RETURN_FALSE; \ 84 } else { \ 85 add_next_index_bool(&c->multi_resp, 0); \ 86 return; \ 87 } 88 89 /* Helper to either return a bool value or add it to MULTI response */ 90 #define CLUSTER_RETURN_BOOL(c, b) \ 91 if(CLUSTER_IS_ATOMIC(c)) { \ 92 RETURN_BOOL(b); \ 93 } else { \ 94 add_next_index_bool(&c->multi_resp, b); \ 95 } 96 97 /* Helper to respond with a double or add it to our MULTI response */ 98 #define CLUSTER_RETURN_DOUBLE(c, d) \ 99 if(CLUSTER_IS_ATOMIC(c)) { \ 100 RETURN_DOUBLE(d); \ 101 } else { \ 102 add_next_index_double(&c->multi_resp, d); \ 103 } 104 105 /* Helper to return a string value */ 106 #define CLUSTER_RETURN_STRING(c, str, len) \ 107 if(CLUSTER_IS_ATOMIC(c)) { \ 108 RETVAL_STRINGL(str, len); \ 109 } else { \ 110 add_next_index_stringl(&c->multi_resp, str, len); \ 111 } \ 112 113 /* Return a LONG value */ 114 #define CLUSTER_RETURN_LONG(c, val) \ 115 if(CLUSTER_IS_ATOMIC(c)) { \ 116 RETURN_LONG(val); \ 117 } else { \ 118 add_next_index_long(&c->multi_resp, val); \ 119 } 120 121 /* Macro to clear out a clusterMultiCmd structure */ 122 #define CLUSTER_MULTI_CLEAR(mc) \ 123 mc->cmd.len = 0; \ 124 mc->args.len = 0; \ 125 mc->argc = 0; \ 126 127 /* Initialzie a clusterMultiCmd with a keyword and length */ 128 #define CLUSTER_MULTI_INIT(mc, keyword, keyword_len) \ 129 mc.kw = keyword; \ 130 mc.kw_len = keyword_len; \ 131 132 #define CLUSTER_CACHING_ENABLED() (INI_INT("redis.clusters.cache_slots") == 1) 133 134 /* Cluster redirection enum */ 135 typedef enum CLUSTER_REDIR_TYPE { 136 REDIR_NONE, 137 REDIR_MOVED, 138 REDIR_ASK 139 } CLUSTER_REDIR_TYPE; 140 141 /* MULTI BULK response callback typedef */ 142 typedef int (*mbulk_cb)(RedisSock*,zval*,long long, void*); 143 144 /* A list of covered slot ranges */ 145 typedef struct redisSlotRange { 146 unsigned short low; 147 unsigned short high; 148 } redisSlotRange; 149 150 /* Simple host/port information for our cache */ 151 typedef struct redisCachedHost { 152 zend_string *addr; 153 unsigned short port; 154 } redisCachedHost; 155 156 /* Storage for a cached master node */ 157 typedef struct redisCachedMaster { 158 redisCachedHost host; 159 160 redisSlotRange *slot; /* Slots and count */ 161 size_t slots; 162 163 redisCachedHost *slave; /* Slaves and their count */ 164 size_t slaves; 165 } redisCachedMaster; 166 167 typedef struct redisCachedCluster { 168 // int rsrc_id; /* Zend resource ID */ 169 zend_string *hash; /* What we're cached by */ 170 redisCachedMaster *master; /* Array of masters */ 171 size_t count; /* Number of masters */ 172 } redisCachedCluster; 173 174 /* A Redis Cluster master node */ 175 typedef struct redisClusterNode { 176 RedisSock *sock; /* Our Redis socket in question */ 177 short slot; /* One slot we believe this node serves */ 178 zend_llist slots; /* List of all slots we believe this node serves */ 179 unsigned short slave; /* Are we a slave */ 180 HashTable *slaves; /* Hash table of slaves */ 181 } redisClusterNode; 182 183 /* Forward declarations */ 184 typedef struct clusterFoldItem clusterFoldItem; 185 186 /* RedisCluster implementation structure */ 187 typedef struct redisCluster { 188 189 /* One RedisSock struct for serialization and prefix information */ 190 RedisSock *flags; 191 192 /* How long in milliseconds should we wait when being bounced around */ 193 long waitms; 194 195 /* Are we flagged as being in readonly mode, meaning we could fall back to 196 * a given master's slave */ 197 short readonly; 198 199 /* RedisCluster failover options (never, on error, to load balance) */ 200 short failover; 201 202 /* Hash table of seed host/ports */ 203 HashTable *seeds; 204 205 /* RedisCluster masters, by direct slot */ 206 redisClusterNode *master[REDIS_CLUSTER_SLOTS]; 207 208 /* All RedisCluster objects we've created/are connected to */ 209 HashTable *nodes; 210 211 /* Transaction handling linked list, and where we are as we EXEC */ 212 clusterFoldItem *multi_head; 213 clusterFoldItem *multi_curr; 214 215 /* When we issue EXEC to nodes, we need to keep track of how many replies 216 * we have, as this can fail for various reasons (EXECABORT, watch, etc.) */ 217 char multi_len[REDIS_CLUSTER_SLOTS]; 218 219 /* Variable to store MULTI response */ 220 zval multi_resp; 221 222 /* Flag for when we get a CLUSTERDOWN error */ 223 short clusterdown; 224 225 /* Key to our persistent list cache and number of redirections we've 226 * received since construction */ 227 zend_string *cache_key; 228 uint64_t redirections; 229 230 /* The last ERROR we encountered */ 231 zend_string *err; 232 233 /* The slot our command is operating on, as well as it's socket */ 234 unsigned short cmd_slot; 235 RedisSock *cmd_sock; 236 237 /* The slot where we're subscribed */ 238 short subscribed_slot; 239 240 /* The first line of our last reply, not including our reply type byte 241 * or the trailing \r\n */ 242 char line_reply[1024]; 243 244 /* The last reply type and length or integer response we got */ 245 REDIS_REPLY_TYPE reply_type; 246 long long reply_len; 247 248 /* Last MOVED or ASK redirection response information */ 249 CLUSTER_REDIR_TYPE redir_type; 250 char redir_host[255]; 251 int redir_host_len; 252 unsigned short redir_slot; 253 unsigned short redir_port; 254 255 /* Zend object handler */ 256 zend_object std; 257 } redisCluster; 258 259 /* RedisCluster response processing callback */ 260 typedef void (*cluster_cb)(INTERNAL_FUNCTION_PARAMETERS, redisCluster*, void*); 261 262 /* Context for processing transactions */ 263 struct clusterFoldItem { 264 /* Response processing callback */ 265 cluster_cb callback; 266 267 /* The actual socket where we send this request */ 268 unsigned short slot; 269 270 /* Any context we need to send to our callback */ 271 void *ctx; 272 273 /* Next item in our list */ 274 struct clusterFoldItem *next; 275 }; 276 277 /* Key and value container, with info if they need freeing */ 278 typedef struct clusterKeyVal { 279 char *key, *val; 280 int key_len, val_len; 281 int key_free, val_free; 282 } clusterKeyVal; 283 284 /* Container to hold keys (and possibly values) for when we need to distribute 285 * commands across more than 1 node (e.g. WATCH, MGET, MSET, etc) */ 286 typedef struct clusterDistList { 287 clusterKeyVal *entry; 288 size_t len, size; 289 } clusterDistList; 290 291 /* Context for things like MGET/MSET/MSETNX. When executing in MULTI mode, 292 * we'll want to re-integrate into one running array, except for the last 293 * command execution, in which we'll want to return the value (or add it) */ 294 typedef struct clusterMultiCtx { 295 /* Our running array */ 296 zval *z_multi; 297 298 /* How many keys did we request for this bit */ 299 int count; 300 301 /* Is this the last entry */ 302 short last; 303 } clusterMultiCtx; 304 305 /* Container for things like MGET, MSET, and MSETNX, which split the command 306 * into a header and payload while aggregating to a specific slot. */ 307 typedef struct clusterMultiCmd { 308 /* Keyword and keyword length */ 309 char *kw; 310 int kw_len; 311 312 /* Arguments in our payload */ 313 int argc; 314 315 /* The full command, built into cmd, and args as we aggregate */ 316 smart_string cmd; 317 smart_string args; 318 } clusterMultiCmd; 319 320 /* Hiredis like structure for processing any sort of reply Redis Cluster might 321 * give us, including N level deep nested multi-bulk replies. Unlike hiredis 322 * we don't encode errors, here as that's handled in the cluster structure. */ 323 typedef struct clusterReply { 324 REDIS_REPLY_TYPE type; /* Our reply type */ 325 size_t integer; /* Integer reply */ 326 long long len; /* Length of our string */ 327 char *str; /* String reply */ 328 long long elements; /* Count of array elements */ 329 struct clusterReply **element; /* Array elements */ 330 } clusterReply; 331 332 /* Direct variant response handler */ 333 clusterReply *cluster_read_resp(redisCluster *c, int status_strings); 334 clusterReply *cluster_read_sock_resp(RedisSock *redis_sock, 335 REDIS_REPLY_TYPE type, char *line_reply, long long reply_len); 336 void cluster_free_reply(clusterReply *reply, int free_data); 337 338 /* Cluster distribution helpers for WATCH */ 339 HashTable *cluster_dist_create(); 340 void cluster_dist_free(HashTable *ht); 341 int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key, 342 size_t key_len, clusterKeyVal **kv); 343 void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val 344 ); 345 346 /* Aggregation for multi commands like MGET, MSET, and MSETNX */ 347 void cluster_multi_init(clusterMultiCmd *mc, char *kw, int kw_len); 348 void cluster_multi_free(clusterMultiCmd *mc); 349 void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len); 350 void cluster_multi_fini(clusterMultiCmd *mc); 351 352 /* Hash a key to it's slot, using the Redis Cluster hash algorithm */ 353 unsigned short cluster_hash_key_zval(zval *key); 354 unsigned short cluster_hash_key(const char *key, int len); 355 356 /* Validate and sanitize cluster construction args */ 357 zend_string** cluster_validate_args(double timeout, double read_timeout, 358 HashTable *seeds, uint32_t *nseeds, char **errstr); 359 360 void free_seed_array(zend_string **seeds, uint32_t nseeds); 361 362 /* Generate a unique hash string from seeds array */ 363 zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t nseeds); 364 365 /* Get the current time in milliseconds */ 366 long long mstime(void); 367 368 PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd, 369 int cmd_len); 370 371 PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force); 372 373 PHP_REDIS_API int cluster_send_exec(redisCluster *c, short slot); 374 PHP_REDIS_API int cluster_send_discard(redisCluster *c, short slot); 375 PHP_REDIS_API int cluster_abort_exec(redisCluster *c); 376 PHP_REDIS_API int cluster_reset_multi(redisCluster *c); 377 378 PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host, 379 unsigned short port); 380 PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd, 381 int cmd_len, REDIS_REPLY_TYPE rtype); 382 383 PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout, 384 int failover, int persistent); 385 PHP_REDIS_API void cluster_free(redisCluster *c, int free_ctx); 386 PHP_REDIS_API void cluster_init_seeds(redisCluster *c, zend_string **seeds, uint32_t nseeds); 387 PHP_REDIS_API int cluster_map_keyspace(redisCluster *c); 388 PHP_REDIS_API void cluster_free_node(redisClusterNode *node); 389 390 /* Functions for interacting with cached slots maps */ 391 PHP_REDIS_API redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes); 392 PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc); 393 PHP_REDIS_API void cluster_init_cache(redisCluster *c, redisCachedCluster *rcc); 394 395 /* Functions to facilitate cluster slot caching */ 396 397 PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock, int *len); 398 399 PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes); 400 PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash); 401 402 /* 403 * Redis Cluster response handlers. Our response handlers generally take the 404 * following form: 405 * PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 406 * void *ctx) 407 * 408 * Reply handlers are responsible for setting the PHP return value (either to 409 * something valid, or FALSE in the case of some failures). 410 */ 411 412 PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 413 void *ctx); 414 PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 415 void *ctx); 416 PHP_REDIS_API void cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 417 void *ctx); 418 PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 419 void *ctx); 420 PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 421 void *ctx); 422 PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 423 void *ctx); 424 PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 425 void *ctx); 426 PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 427 void *ctx); 428 PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 429 void *ctx); 430 PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 431 void *ctx); 432 PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 433 void *ctx); 434 435 PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, 436 redisCluster *c, void *ctx); 437 438 PHP_REDIS_API void cluster_variant_raw_resp(INTERNAL_FUNCTION_PARAMETERS, 439 redisCluster *c, void *ctx); 440 441 PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, 442 redisCluster *c, void *ctx); 443 444 /* MULTI BULK response functions */ 445 PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, 446 redisCluster *c, mbulk_cb func, void *ctx); 447 PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, 448 redisCluster *c, void *ctx); 449 PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, 450 redisCluster *c, void *ctx); 451 PHP_REDIS_API void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS, 452 redisCluster *c, void *ctx); 453 PHP_REDIS_API void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS, 454 redisCluster *c, void *ctx); 455 PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, 456 redisCluster *c, void *ctx); 457 PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, 458 redisCluster *c, void *ctx); 459 PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, 460 redisCluster *c, int pull, mbulk_cb cb, zval *z_ret); 461 462 /* Handlers for things like DEL/MGET/MSET/MSETNX */ 463 PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS, 464 redisCluster *c, void *ctx); 465 PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS, 466 redisCluster *c, void *ctx); 467 PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS, 468 redisCluster *c, void *ctx); 469 PHP_REDIS_API void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS, 470 redisCluster *c, void *ctx); 471 472 /* Response handler for ZSCAN, SSCAN, and HSCAN */ 473 PHP_REDIS_API int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, 474 redisCluster *c, REDIS_SCAN_TYPE type, long *it); 475 476 /* INFO response handler */ 477 PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, 478 redisCluster *c, void *ctx); 479 480 /* CLIENT LIST response handler */ 481 PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, 482 redisCluster *c, void *ctx); 483 484 /* Custom STREAM handlers */ 485 PHP_REDIS_API void cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, 486 redisCluster *c, void *ctx); 487 PHP_REDIS_API void cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, 488 redisCluster *c, void *ctx); 489 PHP_REDIS_API void cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, 490 redisCluster *c, void *ctx); 491 PHP_REDIS_API void cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS, 492 redisCluster *c, void *ctx); 493 494 /* Custom ACL handlers */ 495 PHP_REDIS_API void cluster_acl_getuser_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); 496 PHP_REDIS_API void cluster_acl_log_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); 497 498 /* MULTI BULK processing callbacks */ 499 int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result, 500 long long count, void *ctx); 501 int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result, 502 long long count, void *ctx); 503 int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result, 504 long long count, void *ctx); 505 int mbulk_resp_loop_zipdbl(RedisSock *redis_sock, zval *z_result, 506 long long count, void *ctx); 507 int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result, 508 long long count, void *ctx); 509 510 #endif 511 512 /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */ 513