1 #ifndef _PHPREDIS_CLUSTER_LIBRARY_H
2 #define _PHPREDIS_CLUSTER_LIBRARY_H
3 
4 #include "common.h"
5 
6 #ifdef ZTS
7 #include "TSRM.h"
8 #endif
9 
/* Redis Cluster splits the keyspace into 16384 (2^14) hash slots.  Since the
 * count is a power of two, REDIS_CLUSTER_MOD (count - 1) can be used as a bit
 * mask to reduce a hash to a slot number. */
#define REDIS_CLUSTER_SLOTS (1 << 14)
#define REDIS_CLUSTER_MOD   (REDIS_CLUSTER_SLOTS - 1)

/* Pre-encoded RESP requests for fixed, argument-free commands.  Every literal
 * is written as <array header> <bulk length> <bulk payload> pieces; adjacent
 * string literals concatenate, so the resulting bytes are unchanged. */
#define RESP_MULTI_CMD         "*1\r\n" "$5\r\n" "MULTI\r\n"
#define RESP_EXEC_CMD          "*1\r\n" "$4\r\n" "EXEC\r\n"
#define RESP_DISCARD_CMD       "*1\r\n" "$7\r\n" "DISCARD\r\n"
#define RESP_UNWATCH_CMD       "*1\r\n" "$7\r\n" "UNWATCH\r\n"
#define RESP_CLUSTER_SLOTS_CMD "*2\r\n" "$7\r\n" "CLUSTER\r\n" "$5\r\n" "SLOTS\r\n"
#define RESP_ASKING_CMD        "*1\r\n" "$6\r\n" "ASKING\r\n"
#define RESP_READONLY_CMD      "*1\r\n" "$8\r\n" "READONLY\r\n"
#define RESP_READWRITE_CMD     "*1\r\n" "$9\r\n" "READWRITE\r\n"

/* Byte length of RESP_READONLY_CMD, not counting the terminating NUL */
#define RESP_READONLY_CMD_LEN (sizeof(RESP_READONLY_CMD) - 1)
25 
/* MOVED/ASK comparison macros: true when `p` begins with "MOVED " or "ASK ".
 * The argument is parenthesized so pointer expressions such as `buf + 1`
 * expand correctly.  Matching short-circuits on the first differing byte, so
 * a NUL-terminated buffer shorter than the prefix is never over-read. */
#define IS_MOVED(p) ((p)[0] == 'M' && (p)[1] == 'O' && (p)[2] == 'V' && \
                     (p)[3] == 'E' && (p)[4] == 'D' && (p)[5] == ' ')
#define IS_ASK(p)   ((p)[0] == 'A' && (p)[1] == 'S' && (p)[2] == 'K' && \
                     (p)[3] == ' ')

/* Lengths of the "MOVED " / "ASK " prefixes matched above */
#define MOVED_LEN (sizeof("MOVED ") - 1)
#define ASK_LEN   (sizeof("ASK ") - 1)

/* Initial allocation size for key distribution container */
#define CLUSTER_KEYDIST_ALLOC 8
37 
/* Macros to access the node, socket, stream and replicas for slot `s`.
 * No NULL checking is done: slot `s` must already map to a node (and that
 * node to a socket) before these are used.  Arguments are parenthesized so
 * any pointer expression may be passed for `c` and `s`. */
#define SLOT(c, s)        ((c)->master[(s)])
#define SLOT_SOCK(c, s)   (SLOT(c, s)->sock)
#define SLOT_STREAM(c, s) (SLOT_SOCK(c, s)->stream)
#define SLOT_SLAVES(c, s) (SLOT(c, s)->slaves)

/* Macros to access the socket and stream for the node we're communicating with */
#define CMD_SOCK(c)   ((c)->cmd_sock)
#define CMD_STREAM(c) (CMD_SOCK(c)->stream)
47 
/* Compare the last redirection target stored in `c` (redir_port, and
 * redir_host_len bytes of redir_host) with the host/port of `sock`.
 * Evaluates to nonzero when they DIFFER and to 0 when they match.  The
 * int length is cast explicitly to size_t to match ZSTR_LEN and memcmp. */
#define CLUSTER_REDIR_CMP(c, sock) \
    ((sock)->port != (c)->redir_port || \
     ZSTR_LEN((sock)->host) != (size_t)(c)->redir_host_len || \
     memcmp(ZSTR_VAL((sock)->host), (c)->redir_host, \
            (size_t)(c)->redir_host_len) != 0)
53 
/* Clear out our "last error": release the stored error string (if any),
 * reset the pointer to NULL, and clear the CLUSTERDOWN flag.  `c` is
 * parenthesized so expressions such as `&cluster` can be passed. */
#define CLUSTER_CLEAR_ERROR(c) do { \
    if ((c)->err) { \
        zend_string_release((c)->err); \
        (c)->err = NULL; \
    } \
    (c)->clusterdown = 0; \
} while (0)
62 
/* Protected sending of `len` bytes from `buf` down a RedisSock->stream.
 * Nonzero only if `sock` is non-NULL, redis_sock_server_open() returns 0, a
 * stream is attached, redis_check_eof() returns 0, and php_stream_write()
 * reports exactly `len` bytes written.  Arguments may be evaluated more than
 * once, so pass side-effect-free expressions. */
#define CLUSTER_SEND_PAYLOAD(sock, buf, len) \
    ((sock) && !redis_sock_server_open(sock) && (sock)->stream && \
     !redis_check_eof((sock), 1) && \
     php_stream_write((sock)->stream, (buf), (len)) == (len))

/* Read our reply type character: when redis_check_eof() returns 0, consume one
 * byte from the stream and compare it with `type` (the byte is consumed
 * whether or not it matches). */
#define CLUSTER_VALIDATE_REPLY_TYPE(sock, type) \
    (redis_check_eof((sock), 1) == 0 && \
     php_stream_getc((sock)->stream) == (type))
72 
/* Reset our last single-line reply buffer and length.  Wrapped in
 * do { } while (0) so both assignments stay together under an unbraced `if`
 * (the bare two-statement form only made the first one conditional). */
#define CLUSTER_CLEAR_REPLY(c) do { \
    *(c)->line_reply = '\0'; \
    (c)->reply_len = 0; \
} while (0)
76 
/* Helper to determine if we're in MULTI mode: true unless the mode stored in
 * the cluster's RedisSock flags is MULTI.  `c` is parenthesized so any
 * pointer expression may be passed. */
#define CLUSTER_IS_ATOMIC(c) ((c)->flags->mode != MULTI)
79 
/* Multi-aware reply helpers.  Outside MULTI (CLUSTER_IS_ATOMIC) each helper
 * sets the PHP return value; inside MULTI it appends the value to
 * c->multi_resp instead.  Each one expands to a single do { } while (0)
 * statement, so it is safe under an unbraced if/else and takes a trailing
 * semicolon at the call site.
 *
 * Exit behavior differs between helpers:
 *  - CLUSTER_RETURN_FALSE returns from the caller in both modes.
 *  - CLUSTER_RETURN_BOOL / _DOUBLE / _LONG return only in atomic mode (via
 *    RETURN_*); in MULTI mode execution continues after the macro.
 *  - CLUSTER_RETURN_STRING never returns: it only sets the value (via
 *    RETVAL_STRINGL). */

/* Either return false or add false to the MULTI response */
#define CLUSTER_RETURN_FALSE(c) do { \
    if (CLUSTER_IS_ATOMIC((c))) { \
        RETURN_FALSE; \
    } else { \
        add_next_index_bool(&(c)->multi_resp, 0); \
        return; \
    } \
} while (0)

/* Either return a bool value or add it to the MULTI response */
#define CLUSTER_RETURN_BOOL(c, b) do { \
    if (CLUSTER_IS_ATOMIC((c))) { \
        RETURN_BOOL((b)); \
    } else { \
        add_next_index_bool(&(c)->multi_resp, (b)); \
    } \
} while (0)

/* Either return a double or add it to the MULTI response */
#define CLUSTER_RETURN_DOUBLE(c, d) do { \
    if (CLUSTER_IS_ATOMIC((c))) { \
        RETURN_DOUBLE((d)); \
    } else { \
        add_next_index_double(&(c)->multi_resp, (d)); \
    } \
} while (0)

/* Either set a string return value or add it to the MULTI response */
#define CLUSTER_RETURN_STRING(c, str, len) do { \
    if (CLUSTER_IS_ATOMIC((c))) { \
        RETVAL_STRINGL((str), (len)); \
    } else { \
        add_next_index_stringl(&(c)->multi_resp, (str), (len)); \
    } \
} while (0)

/* Either return a LONG value or add it to the MULTI response */
#define CLUSTER_RETURN_LONG(c, val) do { \
    if (CLUSTER_IS_ATOMIC((c))) { \
        RETURN_LONG((val)); \
    } else { \
        add_next_index_long(&(c)->multi_resp, (val)); \
    } \
} while (0)
120 
/* Clear out a clusterMultiCmd, passed by POINTER: the cmd/args lengths and the
 * argument count go back to zero.  Only lengths are reset; the buffers
 * themselves are left untouched. */
#define CLUSTER_MULTI_CLEAR(mc) do { \
    (mc)->cmd.len  = 0; \
    (mc)->args.len = 0; \
    (mc)->argc     = 0; \
} while (0)

/* Initialize a clusterMultiCmd, passed as an lvalue (not a pointer), with a
 * keyword and its length */
#define CLUSTER_MULTI_INIT(mc, keyword, keyword_len) do { \
    (mc).kw     = (keyword); \
    (mc).kw_len = (keyword_len); \
} while (0)
131 
/* Slot-map caching is on only when the redis.clusters.cache_slots INI value
 * is exactly 1 */
#define CLUSTER_CACHING_ENABLED() \
    (1 == INI_INT("redis.clusters.cache_slots"))
133 
/* Kind of redirection described by the redir_* fields of redisCluster */
typedef enum CLUSTER_REDIR_TYPE {
    REDIR_NONE  = 0, /* no redirection */
    REDIR_MOVED = 1, /* MOVED redirection */
    REDIR_ASK   = 2  /* ASK redirection */
} CLUSTER_REDIR_TYPE;
140 
141 /* MULTI BULK response callback typedef */
142 typedef int  (*mbulk_cb)(RedisSock*,zval*,long long, void*);
143 
/* One contiguous range of covered hash slots, from `low` up to `high` */
typedef struct redisSlotRange {
    unsigned short low, high;
} redisSlotRange;
149 
/* Simple host/port information for one node stored in our slot cache */
typedef struct redisCachedHost {
    zend_string *addr;    /* Node address */
    unsigned short port;  /* Node port */
} redisCachedHost;

/* Storage for a cached master node: its own address, the slot ranges it
 * serves, and its slaves */
typedef struct redisCachedMaster {
    redisCachedHost host;   /* The master's own address */

    redisSlotRange *slot;   /* Array of slot ranges ... */
    size_t slots;           /* ... and the number of entries in `slot` */

    redisCachedHost *slave; /* Array of slave addresses ... */
    size_t slaves;          /* ... and the number of entries in `slave` */
} redisCachedMaster;

/* A complete cached cluster map: every master (with its slot ranges and
 * slaves), stored under the key `hash` */
typedef struct redisCachedCluster {
    zend_string *hash;         /* What we're cached by */
    redisCachedMaster *master; /* Array of masters */
    size_t count;              /* Number of entries in `master` */
} redisCachedCluster;
173 
/* A Redis Cluster node as tracked by the client.  Although described as a
 * master node, the `slave` flag lets the same structure represent a slave. */
typedef struct redisClusterNode {
    RedisSock *sock;      /* Our Redis socket in question */
    short slot;           /* One slot we believe this node serves */
    zend_llist slots;     /* List of all slots we believe this node serves
                           * (element type not visible here -- see the code
                           * that populates it) */
    unsigned short slave; /* Nonzero when this node is a slave */
    HashTable *slaves;    /* Hash table of this node's slaves */
} redisClusterNode;
182 
/* Forward declaration: redisCluster holds pointers to transaction items whose
 * full definition (struct clusterFoldItem) appears later in this header */
typedef struct clusterFoldItem clusterFoldItem;

/* RedisCluster implementation structure: the slot -> node map, connection
 * bookkeeping, MULTI/EXEC state, and the parsed state of the most recent
 * reply and redirection.
 *
 * NOTE(review): PHP custom objects conventionally embed zend_object as the
 * LAST member (its properties table is variable-length and the wrapper is
 * recovered from the zend_object by offset), so keep `std` at the end --
 * confirm against the object handlers before reordering any field. */
typedef struct redisCluster {

    /* One RedisSock struct for serialization and prefix information; its
     * `mode` is what CLUSTER_IS_ATOMIC() compares against MULTI */
    RedisSock *flags;

    /* How long in milliseconds should we wait when being bounced around */
    long waitms;

    /* Are we flagged as being in readonly mode, meaning we could fall back to
     * a given master's slave */
    short readonly;

    /* RedisCluster failover options (never, on error, to load balance) */
    short failover;

    /* Hash table of seed host/ports */
    HashTable *seeds;

    /* RedisCluster masters, indexed directly by slot (see the SLOT() macro) */
    redisClusterNode *master[REDIS_CLUSTER_SLOTS];

    /* All RedisCluster objects we've created/are connected to */
    HashTable *nodes;

    /* Transaction handling linked list (head), and where we are in it as we
     * EXEC (curr) */
    clusterFoldItem *multi_head;
    clusterFoldItem *multi_curr;

    /* When we issue EXEC to nodes, we need to keep track of how many replies
     * we have, as this can fail for various reasons (EXECABORT, watch, etc.).
     * One entry per slot; the element type is `char`, so only small counts
     * are representable. */
    char multi_len[REDIS_CLUSTER_SLOTS];

    /* Variable to store the MULTI response; the CLUSTER_RETURN_* helpers
     * append to it when not in atomic mode */
    zval multi_resp;

    /* Flag for when we get a CLUSTERDOWN error (reset by CLUSTER_CLEAR_ERROR) */
    short clusterdown;

    /* Key to our persistent list cache and number of redirections we've
     * received since construction */
    zend_string *cache_key;
    uint64_t redirections;

    /* The last ERROR we encountered, or NULL (released and reset by
     * CLUSTER_CLEAR_ERROR) */
    zend_string *err;

    /* The slot our command is operating on, as well as its socket
     * (cmd_sock is what CMD_SOCK()/CMD_STREAM() read) */
    unsigned short cmd_slot;
    RedisSock *cmd_sock;

    /* The slot where we're subscribed */
    short subscribed_slot;

    /* The first line of our last reply, not including our reply type byte
     * or the trailing \r\n (emptied by CLUSTER_CLEAR_REPLY) */
    char line_reply[1024];

    /* The last reply type and length or integer response we got */
    REDIS_REPLY_TYPE reply_type;
    long long reply_len;

    /* Last MOVED or ASK redirection response information.  CLUSTER_REDIR_CMP
     * compares exactly redir_host_len bytes of redir_host, so do not rely on
     * redir_host being NUL-terminated. */
    CLUSTER_REDIR_TYPE redir_type;
    char               redir_host[255];
    int                redir_host_len;
    unsigned short     redir_slot;
    unsigned short     redir_port;

    /* Zend object handler (see NOTE above: keep this member last) */
    zend_object std;
} redisCluster;
258 
259 /* RedisCluster response processing callback */
260 typedef void (*cluster_cb)(INTERNAL_FUNCTION_PARAMETERS, redisCluster*, void*);
261 
/* Context for processing transactions: one item of the linked list that
 * redisCluster.multi_head / multi_curr point into */
struct clusterFoldItem {
    /* Response processing callback */
    cluster_cb callback;

    /* The slot whose node this request is sent to (a slot number, not a
     * socket) */
    unsigned short slot;

    /* Any context we need to send to our callback */
    void *ctx;

    /* Next item in our list */
    struct clusterFoldItem *next;
};
276 
/* One key (plus, for commands that carry them, its value) queued for
 * distribution.  key_free / val_free record whether the matching buffer has
 * to be freed by whoever releases the entry. */
typedef struct clusterKeyVal {
    char *key;
    char *val;
    int key_len;
    int val_len;
    int key_free;
    int val_free;
} clusterKeyVal;

/* Growable array of clusterKeyVal entries, used when a command has to be
 * spread across more than one node (e.g. WATCH, MGET, MSET, etc) */
typedef struct clusterDistList {
    clusterKeyVal *entry; /* Entry storage */
    size_t len;           /* Entries currently in use */
    size_t size;          /* Allocated entries (see CLUSTER_KEYDIST_ALLOC) */
} clusterDistList;
290 
/* Context for things like MGET/MSET/MSETNX.  When executing in MULTI mode,
 * we'll want to re-integrate into one running array, except for the last
 * command execution, in which we'll want to return the value (or add it) */
typedef struct clusterMultiCtx {
    /* Our running array, shared by every per-node piece of the command */
    zval *z_multi;

    /* How many keys did we request for this bit */
    int count;

    /* Is this the last entry (nonzero for the final piece) */
    short last;
} clusterMultiCtx;
304 
305 /* Container for things like MGET, MSET, and MSETNX, which split the command
306  * into a header and payload while aggregating to a specific slot. */
307 typedef struct clusterMultiCmd {
308     /* Keyword and keyword length */
309     char *kw;
310     int  kw_len;
311 
312     /* Arguments in our payload */
313     int argc;
314 
315     /* The full command, built into cmd, and args as we aggregate */
316     smart_string cmd;
317     smart_string args;
318 } clusterMultiCmd;
319 
/* Hiredis-like structure for processing any sort of reply Redis Cluster might
 * give us, including N-level-deep nested multi-bulk replies.  Unlike hiredis,
 * we don't encode errors here, as that's handled in the cluster structure. */
typedef struct clusterReply {
    REDIS_REPLY_TYPE type;         /* Our reply type */
    /* NOTE(review): RESP integer replies are signed 64-bit (e.g. TTL can be
     * -1 or -2), but this field is an unsigned size_t, which is only 32 bits
     * wide on 32-bit builds.  Negative values survive only through conversion
     * back to a signed type; consider `long long` if this is revisited. */
    size_t integer;                /* Integer reply */
    long long len;                 /* Length of our string */
    char *str;                     /* String reply */
    long long elements;            /* Count of array elements */
    struct clusterReply **element; /* Array of `elements` child replies */
} clusterReply;
331 
332 /* Direct variant response handler */
333 clusterReply *cluster_read_resp(redisCluster *c, int status_strings);
334 clusterReply *cluster_read_sock_resp(RedisSock *redis_sock,
335     REDIS_REPLY_TYPE type, char *line_reply, long long reply_len);
336 void cluster_free_reply(clusterReply *reply, int free_data);
337 
338 /* Cluster distribution helpers for WATCH */
339 HashTable *cluster_dist_create();
340 void cluster_dist_free(HashTable *ht);
341 int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
342     size_t key_len, clusterKeyVal **kv);
343 void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val
344    );
345 
346 /* Aggregation for multi commands like MGET, MSET, and MSETNX */
347 void cluster_multi_init(clusterMultiCmd *mc, char *kw, int kw_len);
348 void cluster_multi_free(clusterMultiCmd *mc);
349 void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len);
350 void cluster_multi_fini(clusterMultiCmd *mc);
351 
352 /* Hash a key to it's slot, using the Redis Cluster hash algorithm */
353 unsigned short cluster_hash_key_zval(zval *key);
354 unsigned short cluster_hash_key(const char *key, int len);
355 
356 /* Validate and sanitize cluster construction args */
357 zend_string** cluster_validate_args(double timeout, double read_timeout,
358     HashTable *seeds, uint32_t *nseeds, char **errstr);
359 
360 void free_seed_array(zend_string **seeds, uint32_t nseeds);
361 
362 /* Generate a unique hash string from seeds array */
363 zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t nseeds);
364 
365 /* Get the current time in milliseconds */
366 long long mstime(void);
367 
368 PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
369     int cmd_len);
370 
371 PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force);
372 
373 PHP_REDIS_API int cluster_send_exec(redisCluster *c, short slot);
374 PHP_REDIS_API int cluster_send_discard(redisCluster *c, short slot);
375 PHP_REDIS_API int cluster_abort_exec(redisCluster *c);
376 PHP_REDIS_API int cluster_reset_multi(redisCluster *c);
377 
378 PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host,
379     unsigned short port);
380 PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
381     int cmd_len, REDIS_REPLY_TYPE rtype);
382 
383 PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
384     int failover, int persistent);
385 PHP_REDIS_API void cluster_free(redisCluster *c, int free_ctx);
386 PHP_REDIS_API void cluster_init_seeds(redisCluster *c, zend_string **seeds, uint32_t nseeds);
387 PHP_REDIS_API int cluster_map_keyspace(redisCluster *c);
388 PHP_REDIS_API void cluster_free_node(redisClusterNode *node);
389 
390 /* Functions for interacting with cached slots maps */
391 PHP_REDIS_API redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes);
392 PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc);
393 PHP_REDIS_API void cluster_init_cache(redisCluster *c, redisCachedCluster *rcc);
394 
395 /* Functions to facilitate cluster slot caching */
396 
397 PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock, int *len);
398 
399 PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes);
400 PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash);
401 
402 /*
403  * Redis Cluster response handlers.  Our response handlers generally take the
404  * following form:
405  *      PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
406  *          void *ctx)
407  *
408  * Reply handlers are responsible for setting the PHP return value (either to
409  * something valid, or FALSE in the case of some failures).
410  */
411 
412 PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
413     void *ctx);
414 PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
415     void *ctx);
416 PHP_REDIS_API void cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
417     void *ctx);
418 PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
419     void *ctx);
420 PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
421     void *ctx);
422 PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
423     void *ctx);
424 PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
425     void *ctx);
426 PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
427     void *ctx);
428 PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
429     void *ctx);
430 PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
431     void *ctx);
432 PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
433     void *ctx);
434 
435 PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS,
436     redisCluster *c, void *ctx);
437 
438 PHP_REDIS_API void cluster_variant_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
439     redisCluster *c, void *ctx);
440 
441 PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS,
442     redisCluster *c, void *ctx);
443 
444 /* MULTI BULK response functions */
445 PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
446     redisCluster *c, mbulk_cb func, void *ctx);
447 PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
448     redisCluster *c, void *ctx);
449 PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
450     redisCluster *c, void *ctx);
451 PHP_REDIS_API void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS,
452     redisCluster *c, void *ctx);
453 PHP_REDIS_API void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS,
454     redisCluster *c, void *ctx);
455 PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
456     redisCluster *c, void *ctx);
457 PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
458     redisCluster *c, void *ctx);
459 PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
460     redisCluster *c, int pull, mbulk_cb cb, zval *z_ret);
461 
462 /* Handlers for things like DEL/MGET/MSET/MSETNX */
463 PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,
464     redisCluster *c, void *ctx);
465 PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
466     redisCluster *c, void *ctx);
467 PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS,
468     redisCluster *c, void *ctx);
469 PHP_REDIS_API void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS,
470     redisCluster *c, void *ctx);
471 
472 /* Response handler for ZSCAN, SSCAN, and HSCAN */
473 PHP_REDIS_API int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS,
474     redisCluster *c, REDIS_SCAN_TYPE type, long *it);
475 
476 /* INFO response handler */
477 PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS,
478     redisCluster *c, void *ctx);
479 
480 /* CLIENT LIST response handler */
481 PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS,
482     redisCluster *c, void *ctx);
483 
484 /* Custom STREAM handlers */
485 PHP_REDIS_API void cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS,
486     redisCluster *c, void *ctx);
487 PHP_REDIS_API void cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS,
488     redisCluster *c, void *ctx);
489 PHP_REDIS_API void cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS,
490     redisCluster *c, void *ctx);
491 PHP_REDIS_API void cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS,
492     redisCluster *c, void *ctx);
493 
494 /* Custom ACL handlers */
495 PHP_REDIS_API void cluster_acl_getuser_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx);
496 PHP_REDIS_API void cluster_acl_log_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx);
497 
498 /* MULTI BULK processing callbacks */
499 int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
500     long long count, void *ctx);
501 int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
502     long long count, void *ctx);
503 int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result,
504     long long count, void *ctx);
505 int mbulk_resp_loop_zipdbl(RedisSock *redis_sock, zval *z_result,
506     long long count, void *ctx);
507 int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
508     long long count, void *ctx);
509 
510 #endif
511 
512 /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */
513