1 #include "php_redis.h"
2 #include "common.h"
3 #include "library.h"
4 #include "redis_commands.h"
5 #include "cluster_library.h"
6 #include "crc16.h"
7 #include <zend_exceptions.h>
8 
9 extern zend_class_entry *redis_cluster_exception_ce;
10 int le_cluster_slot_cache;
11 
12 /* Debugging methods/
13 static void cluster_dump_nodes(redisCluster *c) {
14     redisClusterNode *p;
15 
16     ZEND_HASH_FOREACH_PTR(c->nodes, p) {
17         if (p == NULL) {
18             continue;
19         }
20 
21         const char *slave = (p->slave) ? "slave" : "master";
22         php_printf("%d %s %d %d", p->sock->port, slave,p->sock->prefix_len,
23             p->slot);
24 
25         php_printf("\n");
26     } ZEND_HASH_FOREACH_END();
27 }
28 
29 static void cluster_log(char *fmt, ...)
30 {
31     va_list args;
32     char buffer[1024];
33 
34     va_start(args, fmt);
35     vsnprintf(buffer,sizeof(buffer),fmt,args);
36     va_end(args);
37 
38     fprintf(stderr, "%s\n", buffer);
39 }
40 
41 // Debug function to dump a clusterReply structure recursively
42 static void dump_reply(clusterReply *reply, int indent) {
43     smart_string buf = {0};
44     int i;
45 
46     switch(reply->type) {
47         case TYPE_ERR:
48             smart_string_appendl(&buf, "(error) ", sizeof("(error) ")-1);
49             smart_string_appendl(&buf, reply->str, reply->len);
50             break;
51         case TYPE_LINE:
52             smart_string_appendl(&buf, reply->str, reply->len);
53             break;
54         case TYPE_INT:
55             smart_string_appendl(&buf, "(integer) ", sizeof("(integer) ")-1);
56             smart_string_append_long(&buf, reply->integer);
57             break;
58         case TYPE_BULK:
59             smart_string_appendl(&buf,"\"", 1);
60             smart_string_appendl(&buf, reply->str, reply->len);
61             smart_string_appendl(&buf, "\"", 1);
62             break;
63         case TYPE_MULTIBULK:
64             if (reply->elements < 0) {
65                 smart_string_appendl(&buf, "(nil)", sizeof("(nil)")-1);
66             } else {
67                 for (i = 0; i < reply->elements; i++) {
68                     dump_reply(reply->element[i], indent+2);
69                 }
70             }
71             break;
72         default:
73             break;
74     }
75 
76     if (buf.len > 0) {
77         for (i = 0; i < indent; i++) {
78             php_printf(" ");
79         }
80 
81         smart_string_0(&buf);
82         php_printf("%s", buf.c);
83         php_printf("\n");
84 
85         efree(buf.c);
86     }
87 }
88 */
89 
90 
91 /* Recursively free our reply object.  If free_data is non-zero we'll also free
92  * the payload data (strings) themselves.  If not, we just free the structs */
cluster_free_reply(clusterReply * reply,int free_data)93 void cluster_free_reply(clusterReply *reply, int free_data) {
94     long long i;
95 
96     switch(reply->type) {
97         case TYPE_ERR:
98         case TYPE_LINE:
99         case TYPE_BULK:
100             if (free_data && reply->str)
101                 efree(reply->str);
102             break;
103         case TYPE_MULTIBULK:
104             if (reply->element) {
105                 if (reply->elements > 0) {
106                     for (i = 0; i < reply->elements && reply->element[i]; i++) {
107                         cluster_free_reply(reply->element[i], free_data);
108                     }
109                 }
110                 efree(reply->element);
111             }
112             break;
113         default:
114             break;
115     }
116     efree(reply);
117 }
118 
119 static int
cluster_multibulk_resp_recursive(RedisSock * sock,size_t elements,clusterReply ** element,int status_strings)120 cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
121                                  clusterReply **element, int status_strings)
122 {
123     int i;
124     size_t sz;
125     clusterReply *r;
126     long len;
127     char buf[1024];
128 
129     for (i = 0; i < elements; i++) {
130         r = element[i] = ecalloc(1, sizeof(clusterReply));
131 
132         // Bomb out, flag error condition on a communication failure
133         if (redis_read_reply_type(sock, &r->type, &len) < 0) {
134             return FAILURE;
135         }
136 
137         /* Set our reply len */
138         r->len = len;
139 
140         switch(r->type) {
141             case TYPE_ERR:
142             case TYPE_LINE:
143                 if (redis_sock_gets(sock,buf,sizeof(buf),&sz) < 0) {
144                     return FAILURE;
145                 }
146                 r->len = (long long)sz;
147                 if (status_strings) r->str = estrndup(buf, r->len);
148                 break;
149             case TYPE_INT:
150                 r->integer = len;
151                 break;
152             case TYPE_BULK:
153                 if (r->len >= 0) {
154                     r->str = redis_sock_read_bulk_reply(sock,r->len);
155                     if (!r->str) {
156                         return FAILURE;
157                     }
158                 }
159                 break;
160             case TYPE_MULTIBULK:
161                 r->elements = r->len;
162                 if (r->elements > 0) {
163                     r->element = ecalloc(r->len, sizeof(*r->element));
164                     if (cluster_multibulk_resp_recursive(sock, r->elements, r->element, status_strings) < 0) {
165                         return FAILURE;
166                     }
167                 }
168                 break;
169             default:
170                 return FAILURE;
171         }
172     }
173     return SUCCESS;
174 }
175 
176 /* Return the socket for a slot and slave index */
cluster_slot_sock(redisCluster * c,unsigned short slot,zend_ulong slaveidx)177 static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot,
178                                     zend_ulong slaveidx)
179 {
180     redisClusterNode *node;
181 
182     /* Return the master if we're not looking for a slave */
183     if (slaveidx == 0) {
184         return SLOT_SOCK(c, slot);
185     }
186 
187     /* Abort if we can't find this slave */
188     if (!SLOT_SLAVES(c, slot) ||
189         (node = zend_hash_index_find_ptr(SLOT_SLAVES(c,slot), slaveidx)) == NULL
190     ) {
191         return NULL;
192     }
193 
194     /* Success, return the slave */
195     return node->sock;
196 }
197 
198 /* Read the response from a cluster */
cluster_read_resp(redisCluster * c,int status_strings)199 clusterReply *cluster_read_resp(redisCluster *c, int status_strings) {
200     return cluster_read_sock_resp(c->cmd_sock, c->reply_type,
201                                   status_strings ? c->line_reply : NULL,
202                                   c->reply_len);
203 }
204 
205 /* Read any sort of response from the socket, having already issued the
206  * command and consumed the reply type and meta info (length) */
207 clusterReply*
cluster_read_sock_resp(RedisSock * redis_sock,REDIS_REPLY_TYPE type,char * line_reply,long long len)208 cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
209                        char *line_reply, long long len)
210 {
211     clusterReply *r;
212 
213     r = ecalloc(1, sizeof(clusterReply));
214     r->type = type;
215 
216     switch(r->type) {
217         case TYPE_INT:
218             r->integer = len;
219             break;
220         case TYPE_LINE:
221             if (line_reply) {
222                 r->str = estrndup(line_reply, len);
223                 r->len = len;
224             }
225         case TYPE_ERR:
226             return r;
227         case TYPE_BULK:
228             r->len = len;
229             r->str = redis_sock_read_bulk_reply(redis_sock, len);
230             if (r->len != -1 && !r->str) {
231                 cluster_free_reply(r, 1);
232                 return NULL;
233             }
234             break;
235         case TYPE_MULTIBULK:
236             r->elements = len;
237             if (r->elements > 0) {
238                 r->element = ecalloc(len, sizeof(clusterReply*));
239                 if (cluster_multibulk_resp_recursive(redis_sock, len, r->element, line_reply != NULL) < 0) {
240                     cluster_free_reply(r, 1);
241                     return NULL;
242                 }
243             }
244             break;
245         default:
246             cluster_free_reply(r, 1);
247             return NULL;
248     }
249 
250     // Success, return the reply
251     return r;
252 }
253 
/*
 * Helpers to send various 'control' type commands to a specific node, e.g.
 * MULTI, ASKING, READONLY, READWRITE, etc.
 */
258 
259 /* Send a command to the specific socket and validate reply type */
cluster_send_direct(RedisSock * redis_sock,char * cmd,int cmd_len,REDIS_REPLY_TYPE type)260 static int cluster_send_direct(RedisSock *redis_sock, char *cmd, int cmd_len,
261                                REDIS_REPLY_TYPE type)
262 {
263     char buf[1024];
264 
265     /* Connect to the socket if we aren't yet and send our command, validate the reply type, and consume the first line */
266     if (!CLUSTER_SEND_PAYLOAD(redis_sock,cmd,cmd_len) ||
267         !CLUSTER_VALIDATE_REPLY_TYPE(redis_sock, type) ||
268         !php_stream_gets(redis_sock->stream, buf, sizeof(buf))) return -1;
269 
270     /* Success! */
271     return 0;
272 }
273 
/* Send ASKING on redis_sock, expecting a status-line reply.  Returns whatever
 * cluster_send_direct returns: 0 on success, -1 on failure. */
static int cluster_send_asking(RedisSock *redis_sock) {
    const int cmd_len = sizeof(RESP_ASKING_CMD)-1;

    return cluster_send_direct(redis_sock, RESP_ASKING_CMD, cmd_len,
                               TYPE_LINE);
}
278 
279 /* Send READONLY to a specific RedisSock unless it's already flagged as being
280  * in READONLY mode.  If we can send the command, we flag the socket as being
281  * in that mode. */
cluster_send_readonly(RedisSock * redis_sock)282 static int cluster_send_readonly(RedisSock *redis_sock) {
283     int ret;
284 
285     /* We don't have to do anything if we're already in readonly mode */
286     if (redis_sock->readonly) return 0;
287 
288     /* Return success if we can send it */
289     ret = cluster_send_direct(redis_sock, RESP_READONLY_CMD,
290         sizeof(RESP_READONLY_CMD) - 1, TYPE_LINE);
291 
292     /* Flag this socket as READONLY if our command worked */
293     redis_sock->readonly = !ret;
294 
295     /* Return the result of our send */
296     return ret;
297 }
298 
299 /* Send MULTI to a specific ReidsSock */
cluster_send_multi(redisCluster * c,short slot)300 static int cluster_send_multi(redisCluster *c, short slot) {
301     if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_MULTI_CMD,
302         sizeof(RESP_MULTI_CMD) - 1, TYPE_LINE) == 0)
303     {
304         c->cmd_sock->mode = MULTI;
305         return 0;
306     }
307     return -1;
308 }
309 
310 /* Send EXEC to a given slot.  We can use the normal command processing mechanism
311  * here because we know we'll only have sent MULTI to the master nodes.  We can't
312  * failover inside a transaction, as we don't know if the transaction will only
313  * be readonly commands, or contain write commands as well */
cluster_send_exec(redisCluster * c,short slot)314 PHP_REDIS_API int cluster_send_exec(redisCluster *c, short slot) {
315     int retval;
316 
317     /* Send exec */
318     retval = cluster_send_slot(c, slot, RESP_EXEC_CMD, sizeof(RESP_EXEC_CMD)-1,
319         TYPE_MULTIBULK);
320 
321     /* We'll either get a length corresponding to the number of commands sent to
322      * this node, or -1 in the case of EXECABORT or WATCH failure. */
323     c->multi_len[slot] = c->reply_len > 0 ? 1 : -1;
324 
325     /* Return our retval */
326     return retval;
327 }
328 
cluster_send_discard(redisCluster * c,short slot)329 PHP_REDIS_API int cluster_send_discard(redisCluster *c, short slot) {
330     if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_DISCARD_CMD,
331         sizeof(RESP_DISCARD_CMD)-1, TYPE_LINE))
332     {
333         return 0;
334     }
335     return -1;
336 }
337 
/*
 * Cluster key distribution helpers.  For a small handful of commands, we want
 * to distribute them across 1-N nodes.  These methods provide simple containers
 * for the purposes of splitting keys/values in this way.
 */
343 
344 /* Free cluster distribution list inside a HashTable */
cluster_dist_free_ht(zval * p)345 static void cluster_dist_free_ht(zval *p) {
346     clusterDistList *dl = *(clusterDistList**)p;
347     int i;
348 
349     for (i = 0; i < dl->len; i++) {
350         if (dl->entry[i].key_free)
351             efree(dl->entry[i].key);
352         if (dl->entry[i].val_free)
353             efree(dl->entry[i].val);
354     }
355 
356     efree(dl->entry);
357     efree(dl);
358 }
359 
360 /* Spin up a HashTable that will contain distribution lists */
cluster_dist_create()361 HashTable *cluster_dist_create() {
362     HashTable *ret;
363 
364     ALLOC_HASHTABLE(ret);
365     zend_hash_init(ret, 0, NULL, cluster_dist_free_ht, 0);
366 
367     return ret;
368 }
369 
370 /* Free distribution list */
cluster_dist_free(HashTable * ht)371 void cluster_dist_free(HashTable *ht) {
372     zend_hash_destroy(ht);
373     efree(ht);
374 }
375 
376 /* Create a clusterDistList object */
cluster_dl_create()377 static clusterDistList *cluster_dl_create() {
378     clusterDistList *dl;
379 
380     dl        = emalloc(sizeof(clusterDistList));
381     dl->entry = emalloc(CLUSTER_KEYDIST_ALLOC * sizeof(clusterKeyVal));
382     dl->size  = CLUSTER_KEYDIST_ALLOC;
383     dl->len   = 0;
384 
385     return dl;
386 }
387 
388 /* Add a key to a dist list, returning the keval entry */
cluster_dl_add_key(clusterDistList * dl,char * key,int key_len,int key_free)389 static clusterKeyVal *cluster_dl_add_key(clusterDistList *dl, char *key,
390                                          int key_len, int key_free)
391 {
392     // Reallocate if required
393     if (dl->len == dl->size) {
394         dl->entry = erealloc(dl->entry, sizeof(clusterKeyVal) * dl->size * 2);
395         dl->size *= 2;
396     }
397 
398     // Set key info
399     dl->entry[dl->len].key = key;
400     dl->entry[dl->len].key_len = key_len;
401     dl->entry[dl->len].key_free = key_free;
402 
403     // NULL out any values
404     dl->entry[dl->len].val = NULL;
405     dl->entry[dl->len].val_len = 0;
406     dl->entry[dl->len].val_free = 0;
407 
408     return &(dl->entry[dl->len++]);
409 }
410 
411 /* Add a key, returning a pointer to the entry where passed for easy adding
412  * of values to match this key */
cluster_dist_add_key(redisCluster * c,HashTable * ht,char * key,size_t key_len,clusterKeyVal ** kv)413 int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
414                           size_t key_len, clusterKeyVal **kv)
415 {
416     int key_free;
417     short slot;
418     clusterDistList *dl;
419     clusterKeyVal *retptr;
420 
421     // Prefix our key and hash it
422     key_free = redis_key_prefix(c->flags, &key, &key_len);
423     slot = cluster_hash_key(key, key_len);
424 
425     // We can't do this if we don't fully understand the keyspace
426     if (c->master[slot] == NULL) {
427         if (key_free) efree(key);
428         return FAILURE;
429     }
430 
431     // Look for this slot
432     if ((dl = zend_hash_index_find_ptr(ht, (zend_ulong)slot)) == NULL) {
433         dl = cluster_dl_create();
434         zend_hash_index_update_ptr(ht, (zend_ulong)slot, dl);
435     }
436 
437     // Now actually add this key
438     retptr = cluster_dl_add_key(dl, key, key_len, key_free);
439 
440     // Push our return pointer if requested
441     if (kv) *kv = retptr;
442 
443     return SUCCESS;
444 }
445 
446 /* Provided a clusterKeyVal, add a value */
cluster_dist_add_val(redisCluster * c,clusterKeyVal * kv,zval * z_val)447 void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *z_val
448                         )
449 {
450     char *val;
451     size_t val_len;
452     int val_free;
453 
454     // Serialize our value
455     val_free = redis_pack(c->flags, z_val, &val, &val_len);
456 
457     // Attach it to the provied keyval entry
458     kv->val = val;
459     kv->val_len = val_len;
460     kv->val_free = val_free;
461 }
462 
463 /* Free allocated memory for a clusterMultiCmd */
cluster_multi_free(clusterMultiCmd * mc)464 void cluster_multi_free(clusterMultiCmd *mc) {
465     efree(mc->cmd.c);
466     efree(mc->args.c);
467 }
468 
469 /* Add an argument to a clusterMultiCmd */
cluster_multi_add(clusterMultiCmd * mc,char * data,int data_len)470 void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len) {
471     mc->argc++;
472     redis_cmd_append_sstr(&(mc->args), data, data_len);
473 }
474 
475 /* Finalize a clusterMutliCmd by constructing the whole thing */
cluster_multi_fini(clusterMultiCmd * mc)476 void cluster_multi_fini(clusterMultiCmd *mc) {
477     mc->cmd.len = 0;
478     redis_cmd_init_sstr(&(mc->cmd), mc->argc, mc->kw, mc->kw_len);
479     smart_string_appendl(&(mc->cmd), mc->args.c, mc->args.len);
480 }
481 
482 /* Set our last error string encountered */
483 static void
cluster_set_err(redisCluster * c,char * err,int err_len)484 cluster_set_err(redisCluster *c, char *err, int err_len)
485 {
486     // Free our last error
487     if (c->err != NULL) {
488         zend_string_release(c->err);
489         c->err = NULL;
490     }
491     if (err != NULL && err_len > 0) {
492         c->err = zend_string_init(err, err_len, 0);
493         if (err_len >= sizeof("CLUSTERDOWN") - 1 &&
494             !memcmp(err, "CLUSTERDOWN", sizeof("CLUSTERDOWN") - 1)
495         ) {
496             c->clusterdown = 1;
497         }
498     }
499 }
500 
501 /* Destructor for slaves */
ht_free_slave(zval * data)502 static void ht_free_slave(zval *data) {
503     if (*(redisClusterNode**)data) {
504         cluster_free_node(*(redisClusterNode**)data);
505     }
506 }
507 
508 /* Get the hash slot for a given key */
cluster_hash_key(const char * key,int len)509 unsigned short cluster_hash_key(const char *key, int len) {
510     int s, e;
511 
512     // Find first occurrence of {, if any
513     for (s = 0; s < len; s++) {
514         if (key[s]=='{') break;
515     }
516 
517     // There is no '{', hash everything
518     if (s == len) return crc16(key, len) & REDIS_CLUSTER_MOD;
519 
520     // Found it, look for a tailing '}'
521     for (e =s + 1; e < len; e++) {
522         if (key[e] == '}') break;
523     }
524 
525     // Hash the whole key if we don't find a tailing } or if {} is empty
526     if (e == len || e == s+1) return crc16(key, len) & REDIS_CLUSTER_MOD;
527 
528     // Hash just the bit between { and }
529     return crc16((char*)key+s+1,e-s-1) & REDIS_CLUSTER_MOD;
530 }
531 
/* Current wall-clock time from gettimeofday(), expressed in milliseconds. */
long long mstime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);

    /* Whole seconds scaled up, plus the millisecond part of the microseconds */
    return ((long long)now.tv_sec) * 1000 + now.tv_usec / 1000;
}
543 
544 /* Hash a key from a ZVAL */
cluster_hash_key_zval(zval * z_key)545 unsigned short cluster_hash_key_zval(zval *z_key) {
546     const char *kptr;
547     char buf[255];
548     int klen;
549 
550     // Switch based on ZVAL type
551     switch(Z_TYPE_P(z_key)) {
552         case IS_STRING:
553             kptr = Z_STRVAL_P(z_key);
554             klen = Z_STRLEN_P(z_key);
555             break;
556         case IS_LONG:
557             klen = snprintf(buf,sizeof(buf),ZEND_LONG_FMT,Z_LVAL_P(z_key));
558             kptr = (const char *)buf;
559             break;
560         case IS_DOUBLE:
561             klen = snprintf(buf,sizeof(buf),"%f",Z_DVAL_P(z_key));
562             kptr = (const char *)buf;
563             break;
564         case IS_ARRAY:
565             kptr = "Array";
566             klen = sizeof("Array")-1;
567             break;
568         case IS_OBJECT:
569             kptr = "Object";
570             klen = sizeof("Object")-1;
571             break;
572         default:
573             kptr = "";
574             klen = 0;
575     }
576 
577     // Hash the string representation
578     return cluster_hash_key(kptr, klen);
579 }
580 
/* Shuffle an int array in place, drawing from rand().  Walks from the end of
 * the array towards the front, swapping each position with a randomly chosen
 * position at or before it.  Arrays of length 0 or 1 are left untouched. */
static void fyshuffle(int *array, size_t len) {
    int remaining = len;
    size_t pick;
    int held;

    while (remaining > 1) {
        /* Index in [0, remaining), chosen before the range shrinks */
        pick = ((int)((double)remaining * (rand() / (RAND_MAX+1.0))));
        remaining--;

        held = array[remaining];
        array[remaining] = array[pick];
        array[pick] = held;
    }
}
594 
595 /* Execute a CLUSTER SLOTS command against the seed socket, and return the
596  * reply or NULL on failure. */
cluster_get_slots(RedisSock * redis_sock)597 clusterReply* cluster_get_slots(RedisSock *redis_sock)
598 {
599     clusterReply *r;
600     REDIS_REPLY_TYPE type;
601     long len;
602 
603     // Send the command to the socket and consume reply type
604     if (redis_sock_write(redis_sock, RESP_CLUSTER_SLOTS_CMD,
605                         sizeof(RESP_CLUSTER_SLOTS_CMD)-1) < 0 ||
606                         redis_read_reply_type(redis_sock, &type, &len) < 0)
607     {
608         return NULL;
609     }
610 
611     // Consume the rest of our response
612     if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len)) == NULL ||
613        r->type != TYPE_MULTIBULK || r->elements < 1)
614     {
615         if (r) cluster_free_reply(r, 1);
616         return NULL;
617     }
618 
619     // Return our reply
620     return r;
621 }
622 
623 /* Create a cluster node */
624 static redisClusterNode*
cluster_node_create(redisCluster * c,char * host,size_t host_len,unsigned short port,unsigned short slot,short slave)625 cluster_node_create(redisCluster *c, char *host, size_t host_len,
626                     unsigned short port, unsigned short slot, short slave)
627 {
628     redisClusterNode *node = emalloc(sizeof(redisClusterNode));
629 
630     // It lives in at least this slot, flag slave status
631     node->slot   = slot;
632     node->slave  = slave;
633     node->slaves = NULL;
634 
635     /* Initialize our list of slot ranges */
636     zend_llist_init(&node->slots, sizeof(redisSlotRange), NULL, 0);
637 
638     // Attach socket
639     node->sock = redis_sock_create(host, host_len, port,
640                                    c->flags->timeout, c->flags->read_timeout,
641                                    c->flags->persistent, NULL, 0);
642 
643     /* Stream context */
644     node->sock->stream_ctx = c->flags->stream_ctx;
645 
646     redis_sock_set_auth(node->sock, c->flags->user, c->flags->pass);
647 
648     return node;
649 }
650 
651 /* Attach a slave to a master */
652 PHP_REDIS_API int
cluster_node_add_slave(redisClusterNode * master,redisClusterNode * slave)653 cluster_node_add_slave(redisClusterNode *master, redisClusterNode *slave)
654 {
655     zend_ulong index;
656 
657     // Allocate our slaves hash table if we haven't yet
658     if (!master->slaves) {
659         ALLOC_HASHTABLE(master->slaves);
660         zend_hash_init(master->slaves, 0, NULL, ht_free_slave, 0);
661         index = 1;
662     } else {
663         index = master->slaves->nNextFreeElement;
664     }
665 
666     return zend_hash_index_update_ptr(master->slaves, index, slave) != NULL;
667 }
668 
/* Sanity check/validation for CLUSTER SLOTS command.
 *
 * VALIDATE_SLOTS_OUTER: the entry has at least three elements and its first
 * two are integers (the slot bounds).
 * VALIDATE_SLOTS_INNER: the node description is a multi-bulk of at least two
 * elements, a bulk string followed by an integer (host and port).
 *
 * Both macros evaluate their argument several times, so pass an expression
 * without side effects. */
#define VALIDATE_SLOTS_OUTER(r) \
    ((r)->elements >= 3 && (r)->element[0]->type == TYPE_INT && \
     (r)->element[1]->type == TYPE_INT)
#define VALIDATE_SLOTS_INNER(r) \
    ((r)->type == TYPE_MULTIBULK && (r)->elements >= 2 && \
     (r)->element[0]->type == TYPE_BULK && (r)->element[1]->type == TYPE_INT)
676 
677 /* Use the output of CLUSTER SLOTS to map our nodes */
cluster_map_slots(redisCluster * c,clusterReply * r)678 static int cluster_map_slots(redisCluster *c, clusterReply *r) {
679     redisClusterNode *pnode, *master, *slave;
680     redisSlotRange range;
681     int i,j, hlen, klen;
682     short low, high;
683     clusterReply *r2, *r3;
684     unsigned short port;
685     char *host, key[1024];
686     zend_hash_clean(c->nodes);
687     for (i = 0; i < r->elements; i++) {
688         // Inner response
689         r2 = r->element[i];
690 
691         // Validate outer and master slot
692         if (!VALIDATE_SLOTS_OUTER(r2) || !VALIDATE_SLOTS_INNER(r2->element[2])) {
693             return -1;
694         }
695 
696         // Master
697         r3 = r2->element[2];
698 
699         // Grab our slot range, as well as master host/port
700         low  = (unsigned short)r2->element[0]->integer;
701         high = (unsigned short)r2->element[1]->integer;
702         host = r3->element[0]->str;
703         hlen = r3->element[0]->len;
704         port = (unsigned short)r3->element[1]->integer;
705 
706         // If the node is new, create and add to nodes.  Otherwise use it.
707         klen = snprintf(key, sizeof(key), "%s:%d", host, port);
708         if ((pnode = zend_hash_str_find_ptr(c->nodes, key, klen)) == NULL) {
709             master = cluster_node_create(c, host, hlen, port, low, 0);
710             zend_hash_str_update_ptr(c->nodes, key, klen, master);
711         } else {
712             master = pnode;
713         }
714 
715         // Attach slaves
716         for (j = 3; j< r2->elements; j++) {
717             r3 = r2->element[j];
718             if (!VALIDATE_SLOTS_INNER(r3)) {
719                 return -1;
720             }
721 
722             // Skip slaves where the host is ""
723             if (r3->element[0]->len == 0) continue;
724 
725             // Attach this node to our slave
726             slave = cluster_node_create(c, r3->element[0]->str,
727                 (int)r3->element[0]->len,
728                 (unsigned short)r3->element[1]->integer, low, 1);
729             cluster_node_add_slave(master, slave);
730         }
731 
732         // Attach this node to each slot in the range
733         for (j = low; j<= high; j++) {
734             c->master[j] = master;
735         }
736 
737         /* Append to our list of slot ranges */
738         range.low = low; range.high = high;
739         zend_llist_add_element(&master->slots, &range);
740     }
741 
742     // Success
743     return 0;
744 }
745 
746 /* Free a redisClusterNode structure */
cluster_free_node(redisClusterNode * node)747 PHP_REDIS_API void cluster_free_node(redisClusterNode *node) {
748     if (node->slaves) {
749         zend_hash_destroy(node->slaves);
750         efree(node->slaves);
751     }
752 
753     zend_llist_destroy(&node->slots);
754     redis_free_socket(node->sock);
755 
756     efree(node);
757 }
758 
759 /* Get or create a redisClusterNode that corresponds to the asking redirection */
cluster_get_asking_node(redisCluster * c)760 static redisClusterNode *cluster_get_asking_node(redisCluster *c) {
761     redisClusterNode *pNode;
762     char key[1024];
763     int key_len;
764 
765     /* Hashed by host:port */
766     key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host, c->redir_port);
767 
768     /* See if we've already attached to it */
769     if ((pNode = zend_hash_str_find_ptr(c->nodes, key, key_len)) != NULL) {
770         return pNode;
771     }
772 
773     /* This host:port is unknown to us, so add it */
774     pNode = cluster_node_create(c, c->redir_host, c->redir_host_len,
775         c->redir_port, c->redir_slot, 0);
776 
777     /* Return the node */
778    return pNode;
779 }
780 
781 /* Get or create a node at the host:port we were asked to check, and return the
782  * redis_sock for it. */
cluster_get_asking_sock(redisCluster * c)783 static RedisSock *cluster_get_asking_sock(redisCluster *c) {
784     return cluster_get_asking_node(c)->sock;
785 }
786 
787 /* Our context seeds will be a hash table with RedisSock* pointers */
ht_free_seed(zval * data)788 static void ht_free_seed(zval *data) {
789     RedisSock *redis_sock = *(RedisSock**)data;
790     if (redis_sock) redis_free_socket(redis_sock);
791 }
792 
793 /* Free redisClusterNode objects we've stored */
ht_free_node(zval * data)794 static void ht_free_node(zval *data) {
795     redisClusterNode *node = *(redisClusterNode**)data;
796     cluster_free_node(node);
797 }
798 
799 /* zend_llist of slot ranges -> persistent array */
slot_range_list_clone(zend_llist * src,size_t * count)800 static redisSlotRange *slot_range_list_clone(zend_llist *src, size_t *count) {
801     redisSlotRange *dst, *range;
802     size_t i = 0;
803 
804     *count = zend_llist_count(src);
805     dst = pemalloc(*count * sizeof(*dst), 1);
806 
807     range = zend_llist_get_first(src);
808     while (range) {
809         memcpy(&dst[i++], range, sizeof(*range));
810         range = zend_llist_get_next(src);
811      }
812 
813     return dst;
814 }
815 
816 /* Construct a redisCluster object */
cluster_create(double timeout,double read_timeout,int failover,int persistent)817 PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
818                                            int failover, int persistent)
819 {
820     redisCluster *c;
821 
822     /* Actual our actual cluster structure */
823     c = ecalloc(1, sizeof(redisCluster));
824 
825     /* Initialize flags and settings */
826     c->flags = ecalloc(1, sizeof(RedisSock));
827     c->flags->timeout = timeout;
828     c->flags->read_timeout = read_timeout;
829     c->flags->persistent = persistent;
830     c->subscribed_slot = -1;
831     c->clusterdown = 0;
832     c->failover = failover;
833     c->err = NULL;
834 
835     /* Set up our waitms based on timeout */
836     c->waitms  = (long)(1000 * timeout);
837 
838     /* Allocate our seeds hash table */
839     ALLOC_HASHTABLE(c->seeds);
840     zend_hash_init(c->seeds, 0, NULL, ht_free_seed, 0);
841 
842     /* Allocate our nodes HashTable */
843     ALLOC_HASHTABLE(c->nodes);
844     zend_hash_init(c->nodes, 0, NULL, ht_free_node, 0);
845 
846     return c;
847 }
848 
849 PHP_REDIS_API void
cluster_free(redisCluster * c,int free_ctx)850 cluster_free(redisCluster *c, int free_ctx)
851 {
852     /* Disconnect from each node we're connected to */
853     cluster_disconnect(c, 0);
854 
855     /* Free any allocated prefix */
856     if (c->flags->prefix) zend_string_release(c->flags->prefix);
857 
858     redis_sock_free_auth(c->flags);
859     efree(c->flags);
860 
861     /* Call hash table destructors */
862     zend_hash_destroy(c->seeds);
863     zend_hash_destroy(c->nodes);
864 
865     /* Free hash tables themselves */
866     efree(c->seeds);
867     efree(c->nodes);
868 
869     /* Free any error we've got */
870     if (c->err) zend_string_release(c->err);
871 
872     if (c->cache_key) {
873         /* Invalidate persistent cache if the cluster has changed */
874         if (c->redirections) {
875             zend_hash_del(&EG(persistent_list), c->cache_key);
876         }
877 
878         /* Release our hold on the cache key */
879         zend_string_release(c->cache_key);
880     }
881 
882     /* Free structure itself */
883     if (free_ctx) efree(c);
884 }
885 
886 /* Create a cluster slot cache structure */
887 PHP_REDIS_API
cluster_cache_create(zend_string * hash,HashTable * nodes)888 redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes) {
889     redisCachedCluster *cc;
890     redisCachedMaster *cm;
891     redisClusterNode *node, *slave;
892 
893     cc = pecalloc(1, sizeof(*cc), 1);
894     cc->hash = zend_string_dup(hash, 1);
895 
896     /* Copy nodes */
897     cc->master = pecalloc(zend_hash_num_elements(nodes), sizeof(*cc->master), 1);
898     ZEND_HASH_FOREACH_PTR(nodes, node) {
899         /* Skip slaves */
900         if (node->slave) continue;
901 
902         cm = &cc->master[cc->count];
903 
904         /* Duplicate host/port and clone slot ranges */
905         cm->host.addr = zend_string_dup(node->sock->host, 1);
906         cm->host.port = node->sock->port;
907 
908         /* Copy over slot ranges */
909         cm->slot = slot_range_list_clone(&node->slots, &cm->slots);
910 
911         /* Attach any slave nodes we have. */
912         if (node->slaves) {
913             /* Allocate memory for slaves */
914             cm->slave = pecalloc(zend_hash_num_elements(node->slaves), sizeof(*cm->slave), 1);
915 
916             /* Copy host/port information for each slave */
917             ZEND_HASH_FOREACH_PTR(node->slaves, slave) {
918                 cm->slave[cm->slaves].addr = zend_string_dup(slave->sock->host, 1);
919                 cm->slave[cm->slaves].port = slave->sock->port;
920                 cm->slaves++;
921             } ZEND_HASH_FOREACH_END();
922         }
923 
924         cc->count++;
925     } ZEND_HASH_FOREACH_END();
926 
927     return cc;
928 }
929 
/* Release the members of one cached master entry: every slave address, the
 * master's own address, and the slave and slot arrays.  The entry itself is
 * not freed here. */
static void cluster_free_cached_master(redisCachedMaster *cm) {
    size_t idx = 0;

    /* Slave addresses first, while the array holding them is still alive */
    while (idx < cm->slaves) {
        zend_string_release(cm->slave[idx].addr);
        idx++;
    }

    /* Then the master's address and the two arrays */
    zend_string_release(cm->host.addr);
    pefree(cm->slave, 1);
    pefree(cm->slot, 1);
}
943 
944 static redisClusterNode*
cached_master_clone(redisCluster * c,redisCachedMaster * cm)945 cached_master_clone(redisCluster *c, redisCachedMaster *cm) {
946     redisClusterNode *node;
947     size_t i;
948 
949     node = cluster_node_create(c, ZSTR_VAL(cm->host.addr), ZSTR_LEN(cm->host.addr),
950                                cm->host.port, cm->slot[0].low, 0);
951 
952     /* Now copy in our slot ranges */
953     for (i = 0; i < cm->slots; i++) {
954         zend_llist_add_element(&node->slots, &cm->slot[i]);
955     }
956 
957     return node;
958 }
959 
960 /* Destroy a persistent cached cluster */
cluster_cache_free(redisCachedCluster * rcc)961 PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc) {
962     size_t i;
963 
964     /* Free masters */
965     for (i = 0; i < rcc->count; i++) {
966         cluster_free_cached_master(&rcc->master[i]);
967     }
968 
969     zend_string_release(rcc->hash);
970     pefree(rcc->master, 1);
971     pefree(rcc, 1);
972 }
973 
974 /* Initialize cluster from cached slots */
975 PHP_REDIS_API
cluster_init_cache(redisCluster * c,redisCachedCluster * cc)976 void cluster_init_cache(redisCluster *c, redisCachedCluster *cc) {
977     RedisSock *sock;
978     redisClusterNode *mnode, *slave;
979     redisCachedMaster *cm;
980     char key[HOST_NAME_MAX];
981     size_t keylen, i, j, s;
982     int *map;
983 
984     /* Randomize seeds */
985     map = emalloc(sizeof(*map) * cc->count);
986     for (i = 0; i < cc->count; i++) map[i] = i;
987     fyshuffle(map, cc->count);
988 
989     /* Duplicate the hash key so we can invalidate when redirected */
990     c->cache_key = zend_string_copy(cc->hash);
991 
992     /* Iterate over masters */
993     for (i = 0; i < cc->count; i++) {
994         /* Grab the next master */
995         cm = &cc->master[map[i]];
996 
997         /* Hash our host and port */
998         keylen = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(cm->host.addr), cm->host.port);
999 
1000         /* Create socket */
1001         sock = redis_sock_create(ZSTR_VAL(cm->host.addr), ZSTR_LEN(cm->host.addr), cm->host.port,
1002                                  c->flags->timeout, c->flags->read_timeout, c->flags->persistent,
1003                                  NULL, 0);
1004 
1005         /* Stream context */
1006         sock->stream_ctx = c->flags->stream_ctx;
1007 
1008         /* Add to seed nodes */
1009         zend_hash_str_update_ptr(c->seeds, key, keylen, sock);
1010 
1011         /* Create master node */
1012         mnode = cached_master_clone(c, cm);
1013 
1014         /* Add our master */
1015         zend_hash_str_update_ptr(c->nodes, key, keylen, mnode);
1016 
1017         /* Attach any slaves */
1018         for (s = 0; s < cm->slaves; s++) {
1019             zend_string *host = cm->slave[s].addr;
1020             slave = cluster_node_create(c, ZSTR_VAL(host), ZSTR_LEN(host), cm->slave[s].port, 0, 1);
1021             cluster_node_add_slave(mnode, slave);
1022         }
1023 
1024         /* Hook up direct slot access */
1025         for (j = 0; j < cm->slots; j++) {
1026             for (s = cm->slot[j].low; s <= cm->slot[j].high; s++) {
1027                 c->master[s] = mnode;
1028             }
1029         }
1030     }
1031 
1032     efree(map);
1033 }
1034 
1035 /* Initialize seeds.  By the time we get here we've already validated our
1036  * seeds array and know we have a non-empty array of strings all in
1037  * host:port format. */
1038 PHP_REDIS_API void
cluster_init_seeds(redisCluster * c,zend_string ** seeds,uint32_t nseeds)1039 cluster_init_seeds(redisCluster *c, zend_string **seeds, uint32_t nseeds)
1040 {
1041     RedisSock *sock;
1042     char *seed, *sep, key[1024];
1043     int key_len, i, *map;
1044 
1045     /* Get a randomized order to hit our seeds */
1046     map = ecalloc(nseeds, sizeof(*map));
1047     for (i = 0; i < nseeds; i++) map[i] = i;
1048     fyshuffle(map, nseeds);
1049 
1050     for (i = 0; i < nseeds; i++) {
1051         seed = ZSTR_VAL(seeds[map[i]]);
1052 
1053         sep = strrchr(seed, ':');
1054         ZEND_ASSERT(sep != NULL);
1055 
1056         // Allocate a structure for this seed
1057         sock = redis_sock_create(seed, sep - seed, atoi(sep + 1),
1058                                  c->flags->timeout, c->flags->read_timeout,
1059                                  c->flags->persistent, NULL, 0);
1060 
1061         /* Stream context */
1062         sock->stream_ctx = c->flags->stream_ctx;
1063 
1064         /* Credentials */
1065         redis_sock_set_auth(sock, c->flags->user, c->flags->pass);
1066 
1067         // Index this seed by host/port
1068         key_len = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(sock->host),
1069             sock->port);
1070 
1071         // Add to our seed HashTable
1072         zend_hash_str_update_ptr(c->seeds, key, key_len, sock);
1073     }
1074 
1075     efree(map);
1076 }
1077 
1078 /* Initial mapping of our cluster keyspace */
cluster_map_keyspace(redisCluster * c)1079 PHP_REDIS_API int cluster_map_keyspace(redisCluster *c) {
1080     RedisSock *seed;
1081     clusterReply *slots = NULL;
1082     int mapped = 0;
1083 
1084     // Iterate over seeds until we can get slots
1085     ZEND_HASH_FOREACH_PTR(c->seeds, seed) {
1086         // Attempt to connect to this seed node
1087         if (seed == NULL || redis_sock_server_open(seed) != SUCCESS) {
1088             continue;
1089         }
1090 
1091         // Parse out cluster nodes.  Flag mapped if we are valid
1092         slots = cluster_get_slots(seed);
1093         if (slots) {
1094             mapped = !cluster_map_slots(c, slots);
1095             // Bin anything mapped, if we failed somewhere
1096             if (!mapped) {
1097                 memset(c->master, 0, sizeof(redisClusterNode*)*REDIS_CLUSTER_SLOTS);
1098             }
1099         }
1100         redis_sock_disconnect(seed, 0);
1101         if (mapped) break;
1102     } ZEND_HASH_FOREACH_END();
1103 
1104     // Clean up slots reply if we got one
1105     if (slots) cluster_free_reply(slots, 1);
1106 
1107     // Throw an exception if we couldn't map
1108     if (!mapped) {
1109         CLUSTER_THROW_EXCEPTION("Couldn't map cluster keyspace using any provided seed", 0);
1110         return FAILURE;
1111     }
1112 
1113     return SUCCESS;
1114 }
1115 
1116 /* Parse the MOVED OR ASK redirection payload when we get such a response
1117  * and apply this information to our cluster.  If we encounter a parse error
1118  * nothing in the cluster will be modified, and -1 is returned. */
cluster_set_redirection(redisCluster * c,char * msg,int moved)1119 static int cluster_set_redirection(redisCluster* c, char *msg, int moved)
1120 {
1121     char *host, *port;
1122 
1123     /* Move past "MOVED" or "ASK */
1124     msg += moved ? MOVED_LEN : ASK_LEN;
1125 
1126     /* Make sure we can find host */
1127     if ((host = strchr(msg, ' ')) == NULL) return -1;
1128     *host++ = '\0';
1129 
1130     /* Find port, searching right to left in case of IPv6 */
1131     if ((port = strrchr(host, ':')) == NULL) return -1;
1132     *port++ = '\0';
1133 
1134     // Success, apply it
1135     c->redir_type = moved ? REDIR_MOVED : REDIR_ASK;
1136     strncpy(c->redir_host, host, sizeof(c->redir_host) - 1);
1137     c->redir_host_len = port - host - 1;
1138     c->redir_slot = (unsigned short)atoi(msg);
1139     c->redir_port = (unsigned short)atoi(port);
1140 
1141     return 0;
1142 }
1143 
1144 /* Once we write a command to a node in our cluster, this function will check
1145  * the reply type and extract information from those that will specify a length
1146  * bit.  If we encounter an error condition, we'll check for MOVED or ASK
1147  * redirection, parsing out slot host and port so the caller can take
1148  * appropriate action.
1149  *
1150  * In the case of a non MOVED/ASK error, we wlll set our cluster error
1151  * condition so GetLastError can be queried by the client.
1152  *
1153  * This function will return -1 on a critical error (e.g. parse/communication
1154  * error, 0 if no redirection was encountered, and 1 if the data was moved. */
cluster_check_response(redisCluster * c,REDIS_REPLY_TYPE * reply_type)1155 static int cluster_check_response(redisCluster *c, REDIS_REPLY_TYPE *reply_type)
1156 {
1157     size_t sz;
1158 
1159     // Clear out any prior error state and our last line response
1160     CLUSTER_CLEAR_ERROR(c);
1161     CLUSTER_CLEAR_REPLY(c);
1162 
1163     if (-1 == redis_check_eof(c->cmd_sock, 1) ||
1164        EOF == (*reply_type = php_stream_getc(c->cmd_sock->stream)))
1165     {
1166         return -1;
1167     }
1168 
1169     // In the event of an ERROR, check if it's a MOVED/ASK error
1170     if (*reply_type == TYPE_ERR) {
1171         char inbuf[4096];
1172         int moved;
1173 
1174         // Attempt to read the error
1175         if (!php_stream_gets(c->cmd_sock->stream, inbuf, sizeof(inbuf))) {
1176             return -1;
1177         }
1178 
1179         // Check for MOVED or ASK redirection
1180         if ((moved = IS_MOVED(inbuf)) || IS_ASK(inbuf)) {
1181             /* The Redis Cluster specification suggests clients do not update
1182              * their slot mapping for an ASK redirection, only for MOVED */
1183             if (moved) c->redirections++;
1184 
1185             /* Make sure we can parse the redirection host and port */
1186             if (cluster_set_redirection(c,inbuf,moved) < 0) {
1187                 return -1;
1188             }
1189 
1190             /* We've been redirected */
1191             return 1;
1192         } else {
1193             // Capture the error string Redis returned
1194             cluster_set_err(c, inbuf, strlen(inbuf)-2);
1195             return 0;
1196         }
1197     }
1198 
1199     // Fetch the first line of our response from Redis.
1200     if (redis_sock_gets(c->cmd_sock,c->line_reply,sizeof(c->line_reply),
1201                        &sz) < 0)
1202     {
1203         return -1;
1204     }
1205 
1206     // For replies that will give us a numeric length, convert it
1207     if (*reply_type != TYPE_LINE) {
1208         c->reply_len = strtol(c->line_reply, NULL, 10);
1209     } else {
1210         c->reply_len = (long long)sz;
1211     }
1212 
1213     // Clear out any previous error, and return that the data is here
1214     CLUSTER_CLEAR_ERROR(c);
1215     return 0;
1216 }
1217 
1218 /* Disconnect from each node we're connected to */
cluster_disconnect(redisCluster * c,int force)1219 PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force) {
1220     redisClusterNode *node, *slave;
1221 
1222     ZEND_HASH_FOREACH_PTR(c->nodes, node) {
1223         if (node == NULL) continue;
1224 
1225         /* Disconnect from the master */
1226         redis_sock_disconnect(node->sock, force);
1227 
1228         /* We also want to disconnect any slave connections so they will be pooled
1229          * in the event we are using persistent connections and connection pooling. */
1230         if (node->slaves) {
1231             ZEND_HASH_FOREACH_PTR(node->slaves, slave) {
1232                 redis_sock_disconnect(slave->sock, force);
1233             } ZEND_HASH_FOREACH_END();
1234         }
1235     } ZEND_HASH_FOREACH_END();
1236 }
1237 
1238 /* This method attempts to write our command at random to the master and any
1239  * attached slaves, until we either successufly do so, or fail. */
cluster_dist_write(redisCluster * c,const char * cmd,size_t sz,int nomaster)1240 static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
1241                               int nomaster)
1242 {
1243     int i, count = 1, *nodes;
1244     RedisSock *redis_sock;
1245 
1246     /* Determine our overall node count */
1247     if (c->master[c->cmd_slot]->slaves) {
1248         count += zend_hash_num_elements(c->master[c->cmd_slot]->slaves);
1249     }
1250 
1251     /* Allocate memory for master + slaves or just slaves */
1252     nodes = emalloc(sizeof(int)*count);
1253 
1254     /* Populate our array with the master and each of it's slaves, then
1255      * randomize them, so we will pick from the master or some slave.  */
1256     for (i = 0; i < count; i++) nodes[i] = i;
1257     fyshuffle(nodes, count);
1258 
1259     /* Iterate through our nodes until we find one we can write to or fail */
1260     for (i = 0; i < count; i++) {
1261         /* Skip if this is the master node and we don't want to query that */
1262         if (nomaster && nodes[i] == 0)
1263            continue;
1264 
1265         /* Get the slave for this index */
1266         redis_sock = cluster_slot_sock(c, c->cmd_slot, nodes[i]);
1267         if (!redis_sock) continue;
1268 
1269         /* If we're not on the master, attempt to send the READONLY command to
1270          * this slave, and skip it if that fails */
1271         if (nodes[i] == 0 || redis_sock->readonly ||
1272             cluster_send_readonly(redis_sock) == 0)
1273         {
1274             /* Attempt to send the command */
1275             if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) {
1276                 c->cmd_sock = redis_sock;
1277                 efree(nodes);
1278                 return 0;
1279             }
1280         }
1281     }
1282 
1283     /* Clean up our shuffled array */
1284     efree(nodes);
1285 
1286     /* Couldn't send to the master or any slave */
1287     return -1;
1288 }
1289 
1290 /* Attempt to write our command to the current c->cmd_sock socket.  For write
1291  * commands, we attempt to query the master for this slot, and in the event of
1292  * a failure, try to query every remaining node for a redirection.
1293  *
1294  * If we're issuing a readonly command, we use one of three strategies, depending
1295  * on our redisCluster->failover setting.
1296  *
1297  * REDIS_FAILOVER_NONE:
1298  *   The command is treated just like a write command, and will only be executed
1299  *   against the known master for this slot.
1300  * REDIS_FAILOVER_ERROR:
1301  *   If we're unable to communicate with this slot's master, we attempt the query
1302  *   against any slaves (at random) that this master has.
1303  * REDIS_FAILOVER_DISTRIBUTE:
1304  *   We pick at random from the master and any slaves it has.  This option will
1305  *   load balance between masters and slaves
1306  * REDIS_FAILOVER_DISTRIBUTE_SLAVES:
1307  *   We pick at random from slave nodes of a given master.  This option is
1308  *   used to load balance read queries against N slaves.
1309  *
1310  * Once we are able to find a node we can write to, we check for MOVED or
1311  * ASKING redirection, such that the keyspace can be updated.
1312 */
cluster_sock_write(redisCluster * c,const char * cmd,size_t sz,int direct)1313 static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz,
1314                               int direct)
1315 {
1316     redisClusterNode *seed_node;
1317     RedisSock *redis_sock;
1318     int failover, nomaster;
1319 
1320     /* First try the socket requested */
1321     redis_sock = c->cmd_sock;
1322 
1323     /* Readonly is irrelevant if we're not configured to failover */
1324     failover = c->readonly && c->failover != REDIS_FAILOVER_NONE ?
1325         c->failover : REDIS_FAILOVER_NONE;
1326 
1327     /* If in ASK redirection, get/create the node for that host:port, otherwise
1328      * just use the command socket. */
1329     if (c->redir_type == REDIR_ASK) {
1330         if (cluster_send_asking(c->cmd_sock) < 0) {
1331             return -1;
1332         }
1333     }
1334 
1335     /* Attempt to send our command payload to the cluster.  If we're not set up
1336      * to failover, just try the master.  If we're configured to failover on
1337      * error, try the master and then fall back to any slaves.  When we're set
1338      * up to distribute the commands, try to write to any node on this slot
1339      * at random. */
1340     if (failover == REDIS_FAILOVER_NONE) {
1341         /* Success if we can send our payload to the master */
1342         if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) return 0;
1343     } else if (failover == REDIS_FAILOVER_ERROR) {
1344         /* Try the master, then fall back to any slaves we may have */
1345         if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz) ||
1346            !cluster_dist_write(c, cmd, sz, 1)) return 0;
1347     } else {
1348         /* Include or exclude master node depending on failover option and
1349          * attempt to make our write */
1350         nomaster = failover == REDIS_FAILOVER_DISTRIBUTE_SLAVES;
1351         if (!cluster_dist_write(c, cmd, sz, nomaster)) {
1352             /* We were able to write to a master or slave at random */
1353             return 0;
1354         }
1355     }
1356 
1357     /* Don't fall back if direct communication with this slot is required. */
1358     if (direct) return -1;
1359 
1360     /* Fall back by attempting the request against every known node */
1361     ZEND_HASH_FOREACH_PTR(c->nodes, seed_node) {
1362         /* Skip this node if it's the one that failed, or if it's a slave */
1363         if (seed_node == NULL || seed_node->sock == redis_sock || seed_node->slave) continue;
1364 
1365         /* Connect to this node if we haven't already and attempt to write our request to this node */
1366         if (CLUSTER_SEND_PAYLOAD(seed_node->sock, cmd, sz)) {
1367             c->cmd_slot = seed_node->slot;
1368             c->cmd_sock = seed_node->sock;
1369             return 0;
1370         }
1371     } ZEND_HASH_FOREACH_END();
1372 
1373     /* We were unable to write to any node in our cluster */
1374     return -1;
1375 }
1376 
1377 /* Helper to find if we've got a host:port mapped in our cluster nodes. */
cluster_find_node(redisCluster * c,const char * host,unsigned short port)1378 static redisClusterNode *cluster_find_node(redisCluster *c, const char *host,
1379                                            unsigned short port)
1380 {
1381     int key_len;
1382     char key[1024];
1383 
1384     key_len = snprintf(key,sizeof(key),"%s:%d", host, port);
1385 
1386     return zend_hash_str_find_ptr(c->nodes, key, key_len);
1387 }
1388 
1389 /* Provided a redisCluster object, the slot where we thought data was and
1390  * the slot where data was moved, update our node mapping */
cluster_update_slot(redisCluster * c)1391 static void cluster_update_slot(redisCluster *c) {
1392     redisClusterNode *node;
1393     char key[1024];
1394     size_t klen;
1395 
1396     /* Do we already have the new slot mapped */
1397     if (c->master[c->redir_slot]) {
1398         /* No need to do anything if it's the same node */
1399         if (!CLUSTER_REDIR_CMP(c, SLOT_SOCK(c,c->redir_slot))) {
1400             return;
1401         }
1402 
1403         /* Check to see if we have this new node mapped */
1404         node = cluster_find_node(c, c->redir_host, c->redir_port);
1405 
1406         if (node) {
1407             /* Just point to this slot */
1408             c->master[c->redir_slot] = node;
1409         } else {
1410             /* If the redirected node is a replica of the previous slot owner, a failover has taken place.
1411             We must then remap the cluster's keyspace in order to update the cluster's topology. */
1412             redisClusterNode *prev_master = SLOT(c,c->redir_slot);
1413             redisClusterNode *slave;
1414             ZEND_HASH_FOREACH_PTR(prev_master->slaves, slave) {
1415                 if (slave == NULL) {
1416                     continue;
1417                 }
1418                 if (!CLUSTER_REDIR_CMP(c, slave->sock)) {
1419                     // Detected a failover, the redirected node was a replica
1420                     // Remap the cluster's keyspace
1421                     cluster_map_keyspace(c);
1422                     return;
1423                 }
1424             } ZEND_HASH_FOREACH_END();
1425 
1426             /* Create our node */
1427             node = cluster_node_create(c, c->redir_host, c->redir_host_len,
1428                 c->redir_port, c->redir_slot, 0);
1429 
1430             /* Our node is new, so keep track of it for cleanup */
1431             klen = snprintf(key, sizeof(key), "%s:%d", c->redir_host, c->redir_port);
1432             zend_hash_str_update_ptr(c->nodes, key, klen, node);
1433 
1434             /* Now point our slot at the node */
1435             c->master[c->redir_slot] = node;
1436         }
1437     } else {
1438         /* Check to see if the ip and port are mapped */
1439         node = cluster_find_node(c, c->redir_host, c->redir_port);
1440         if (!node) {
1441             node = cluster_node_create(c, c->redir_host, c->redir_host_len,
1442                 c->redir_port, c->redir_slot, 0);
1443         }
1444 
1445         /* Map the slot to this node */
1446         c->master[c->redir_slot] = node;
1447     }
1448 
1449     /* Update slot inside of node, so it can be found for command sending */
1450     node->slot = c->redir_slot;
1451 
1452     /* Make sure we unflag this node as a slave, as Redis Cluster will only ever
1453      * direct us to master nodes. */
1454     node->slave = 0;
1455 }
1456 
1457 /* Abort any transaction in process, by sending DISCARD to any nodes that
1458  * have active transactions in progress.  If we can't send DISCARD, we need
1459  * to disconnect as it would leave us in an undefined state. */
cluster_abort_exec(redisCluster * c)1460 PHP_REDIS_API int cluster_abort_exec(redisCluster *c) {
1461     clusterFoldItem *fi = c->multi_head;
1462 
1463     /* Loop through our fold items */
1464     while (fi) {
1465         if (SLOT_SOCK(c,fi->slot)->mode == MULTI) {
1466             if (cluster_send_discard(c, fi->slot) < 0) {
1467                 cluster_disconnect(c, 0);
1468                 return -1;
1469             }
1470             SLOT_SOCK(c,fi->slot)->mode = ATOMIC;
1471             SLOT_SOCK(c,fi->slot)->watching = 0;
1472         }
1473         fi = fi->next;
1474     }
1475 
1476     /* Update our overall cluster state */
1477     c->flags->mode = ATOMIC;
1478 
1479     /* Success */
1480     return 0;
1481 }
1482 
1483 /* Iterate through our slots, looking for the host/port in question.  This
1484  * should perform well enough as in almost all situations, a few or a few
1485  * dozen servers will map all the slots */
cluster_find_slot(redisCluster * c,const char * host,unsigned short port)1486 PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host,
1487                                unsigned short port)
1488 {
1489     int i;
1490 
1491     for (i = 0; i < REDIS_CLUSTER_SLOTS; i++) {
1492         if (c->master[i] && c->master[i]->sock &&
1493            c->master[i]->sock->port == port &&
1494            !strcasecmp(ZSTR_VAL(c->master[i]->sock->host), host))
1495         {
1496             return i;
1497         }
1498     }
1499 
1500     // We didn't find it
1501     return -1;
1502 }
1503 
1504 /* Send a command to a specific slot */
cluster_send_slot(redisCluster * c,short slot,char * cmd,int cmd_len,REDIS_REPLY_TYPE rtype)1505 PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
1506                              int cmd_len, REDIS_REPLY_TYPE rtype)
1507 {
1508     /* Point our cluster to this slot and it's socket */
1509     c->cmd_slot = slot;
1510     c->cmd_sock = SLOT_SOCK(c, slot);
1511 
1512     /* Enable multi mode on this slot if we've been directed to but haven't
1513      * send it to this node yet */
1514     if (c->flags->mode == MULTI && c->cmd_sock->mode != MULTI) {
1515         if (cluster_send_multi(c, slot) == -1) {
1516             CLUSTER_THROW_EXCEPTION("Unable to enter MULTI mode on requested slot", 0);
1517             return -1;
1518         }
1519     }
1520 
1521     /* Try the slot */
1522     if (cluster_sock_write(c, cmd, cmd_len, 1) == -1) {
1523         return -1;
1524     }
1525 
1526     /* Check our response */
1527     if (cluster_check_response(c, &c->reply_type) != 0 ||
1528        (rtype != TYPE_EOF && rtype != c->reply_type)) return -1;
1529 
1530     /* Success */
1531     return 0;
1532 }
1533 
1534 /* Send a command to given slot in our cluster.  If we get a MOVED or ASK error
1535  * we attempt to send the command to the node as directed. */
cluster_send_command(redisCluster * c,short slot,const char * cmd,int cmd_len)1536 PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
1537                                          int cmd_len)
1538 {
1539     int resp, timedout = 0;
1540     long msstart;
1541 
1542     if (!SLOT(c, slot)) {
1543         zend_throw_exception_ex(redis_cluster_exception_ce, 0,
1544             "The slot %d is not covered by any node in this cluster", slot);
1545         return -1;
1546     }
1547     /* Set the slot we're operating against as well as it's socket.  These can
1548      * change during our request loop if we have a master failure and are
1549      * configured to fall back to slave nodes, or if we have to fall back to
1550      * a different slot due to no nodes serving this slot being reachable. */
1551     c->cmd_slot = slot;
1552     c->cmd_sock = SLOT_SOCK(c, slot);
1553 
1554     /* Get the current time in milliseconds to handle any timeout */
1555     msstart = mstime();
1556 
1557     /* Our main cluster request/reply loop.  This loop runs until we're able to
1558      * get a valid reply from a node, hit our "request" timeout, or encounter a
1559      * CLUSTERDOWN state from Redis Cluster. */
1560     do {
1561         /* Send MULTI to the socket if we're in MULTI mode but haven't yet */
1562         if (c->flags->mode == MULTI && CMD_SOCK(c)->mode != MULTI) {
1563             /* We have to fail if we can't send MULTI to the node */
1564             if (cluster_send_multi(c, slot) == -1) {
1565                 CLUSTER_THROW_EXCEPTION("Unable to enter MULTI mode on requested slot", 0);
1566                 return -1;
1567             }
1568         }
1569 
1570         /* Attempt to deliver our command to the node, and that failing, to any
1571          * node until we find one that is available. */
1572         if (cluster_sock_write(c, cmd, cmd_len, 0) == -1) {
1573             /* We have to abort, as no nodes are reachable */
1574             CLUSTER_THROW_EXCEPTION("Can't communicate with any node in the cluster", 0);
1575             return -1;
1576         }
1577 
1578         /* Check response and short-circuit on success or communication error */
1579         resp = cluster_check_response(c, &c->reply_type);
1580         if (resp <= 0) {
1581             break;
1582         }
1583 
1584         /* Handle MOVED or ASKING redirection */
1585         if (resp == 1) {
1586            /* Abort if we're in a transaction as it will be invalid */
1587            if (c->flags->mode == MULTI) {
1588                CLUSTER_THROW_EXCEPTION("Can't process MULTI sequence when cluster is resharding", 0);
1589                return -1;
1590            }
1591 
1592            if (c->redir_type == REDIR_MOVED) {
1593                /* For MOVED redirection we want to update our cached mapping */
1594                cluster_update_slot(c);
1595                c->cmd_sock = SLOT_SOCK(c, slot);
1596            } else if (c->redir_type == REDIR_ASK) {
1597                /* For ASK redirection we want to redirect but not update slot mapping */
1598                c->cmd_sock = cluster_get_asking_sock(c);
1599            }
1600         }
1601 
1602         /* See if we've timed out in the command loop */
1603         timedout = c->waitms ? mstime() - msstart >= c->waitms : 0;
1604     } while (!c->clusterdown && !timedout);
1605 
1606     // If we've detected the cluster is down, throw an exception
1607     if (c->clusterdown) {
1608         CLUSTER_THROW_EXCEPTION("The Redis Cluster is down (CLUSTERDOWN)", 0);
1609         return -1;
1610     } else if (timedout || resp == -1) {
1611         // Make sure the socket is reconnected, it such that it is in a clean state
1612         redis_sock_disconnect(c->cmd_sock, 1);
1613 
1614         if (timedout) {
1615             CLUSTER_THROW_EXCEPTION("Timed out attempting to find data in the correct node!", 0);
1616         } else {
1617             CLUSTER_THROW_EXCEPTION("Error processing response from Redis node!", 0);
1618         }
1619 
1620         return -1;
1621     }
1622 
1623     /* Clear redirection flag */
1624     c->redir_type = REDIR_NONE;
1625 
1626     // Success, return the slot where data exists.
1627     return 0;
1628 }
1629 
1630 /* RedisCluster response handlers.  These methods all have the same prototype
1631  * and set the proper return value for the calling cluster method.  These
1632  * methods will never be called in the case of a communication error when
1633  * we try to send the request to the Cluster *or* if a non MOVED or ASK
1634  * error is encountered, in which case our response processing macro will
1635  * short circuit and RETURN_FALSE, as the error will have already been
1636  * consumed. */
1637 
1638 /* RAW bulk response handler */
cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1639 PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
1640                                   redisCluster *c, void *ctx)
1641 {
1642     char *resp;
1643 
1644     // Make sure we can read the response
1645     if (c->reply_type != TYPE_BULK ||
1646        (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1647     {
1648         if (c->flags->mode != MULTI) {
1649             RETURN_FALSE;
1650         } else {
1651             add_next_index_bool(&c->multi_resp, 0);
1652             return;
1653         }
1654     }
1655 
1656     // Return our response raw
1657     CLUSTER_RETURN_STRING(c, resp, c->reply_len);
1658     efree(resp);
1659 }
1660 
1661 PHP_REDIS_API void
cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1662 cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
1663 {
1664     char *p;
1665 
1666     /* Cluster already has the reply so abort if this isn't a LINE response *or* if for
1667      * some freaky reason we don't detect a null terminator */
1668     if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) {
1669         CLUSTER_RETURN_FALSE(c);
1670     }
1671 
1672     if (CLUSTER_IS_ATOMIC(c)) {
1673         CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply);
1674     } else {
1675         add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply);
1676     }
1677 }
1678 
1679 /* BULK response handler */
cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1680 PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1681                               void *ctx)
1682 {
1683     char *resp;
1684 
1685     // Make sure we can read the response
1686     if (c->reply_type != TYPE_BULK ||
1687        (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1688     {
1689         CLUSTER_RETURN_FALSE(c);
1690     }
1691 
1692     if (CLUSTER_IS_ATOMIC(c)) {
1693         if (!redis_unpack(c->flags, resp, c->reply_len, return_value)) {
1694             CLUSTER_RETURN_STRING(c, resp, c->reply_len);
1695         }
1696     } else {
1697         zval z_unpacked;
1698         if (redis_unpack(c->flags, resp, c->reply_len, &z_unpacked)) {
1699             add_next_index_zval(&c->multi_resp, &z_unpacked);
1700         } else {
1701             add_next_index_stringl(&c->multi_resp, resp, c->reply_len);
1702         }
1703     }
1704     efree(resp);
1705 }
1706 
1707 /* Bulk response where we expect a double */
cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1708 PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1709                              void *ctx)
1710 {
1711     char *resp;
1712     double dbl;
1713 
1714     // Make sure we can read the response
1715     if (c->reply_type != TYPE_BULK ||
1716        (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1717     {
1718         CLUSTER_RETURN_FALSE(c);
1719     }
1720 
1721     // Convert to double, free response
1722     dbl = atof(resp);
1723     efree(resp);
1724 
1725     CLUSTER_RETURN_DOUBLE(c, dbl);
1726 }
1727 
1728 /* A boolean response.  If we get here, we've consumed the '+' reply
1729  * type and will now just verify we can read the OK */
cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1730 PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1731                               void *ctx)
1732 {
1733     // Check that we have +OK
1734     if (c->reply_type != TYPE_LINE || c->reply_len != 2 ||
1735        c->line_reply[0] != 'O' || c->line_reply[1] != 'K')
1736     {
1737         CLUSTER_RETURN_FALSE(c);
1738     }
1739 
1740     CLUSTER_RETURN_BOOL(c, 1);
1741 }
1742 
1743 /* Boolean response, specialized for PING */
cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1744 PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1745                               void *ctx)
1746 {
1747     if (c->reply_type != TYPE_LINE || c->reply_len != 4 ||
1748        memcmp(c->line_reply,"PONG",sizeof("PONG")-1))
1749     {
1750         CLUSTER_RETURN_FALSE(c);
1751     }
1752 
1753     CLUSTER_RETURN_BOOL(c, 1);
1754 }
1755 
1756 /* 1 or 0 response, for things like SETNX */
cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1757 PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1758                            void *ctx)
1759 {
1760     // Validate our reply type, and check for a zero
1761     if (c->reply_type != TYPE_INT || c->reply_len == 0) {
1762         CLUSTER_RETURN_FALSE(c);
1763     }
1764 
1765     CLUSTER_RETURN_BOOL(c, 1);
1766 }
1767 
1768 /* Generic integer response */
cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1769 PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1770                               void *ctx)
1771 {
1772     if (c->reply_type != TYPE_INT) {
1773         CLUSTER_RETURN_FALSE(c);
1774     }
1775     CLUSTER_RETURN_LONG(c, c->reply_len);
1776 }
1777 
1778 /* TYPE response handler */
cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1779 PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1780                               void *ctx)
1781 {
1782     // Make sure we got the right kind of response
1783     if (c->reply_type != TYPE_LINE) {
1784         CLUSTER_RETURN_FALSE(c);
1785     }
1786 
1787     // Switch on the type
1788     if (strncmp (c->line_reply, "string", 6) == 0) {
1789         CLUSTER_RETURN_LONG(c, REDIS_STRING);
1790     } else if (strncmp(c->line_reply, "set", 3) == 0) {
1791         CLUSTER_RETURN_LONG(c, REDIS_SET);
1792     } else if (strncmp(c->line_reply, "list", 4) == 0) {
1793         CLUSTER_RETURN_LONG(c, REDIS_LIST);
1794     } else if (strncmp(c->line_reply, "hash", 4) == 0) {
1795         CLUSTER_RETURN_LONG(c, REDIS_HASH);
1796     } else if (strncmp(c->line_reply, "zset", 4) == 0) {
1797         CLUSTER_RETURN_LONG(c, REDIS_ZSET);
1798     } else if (strncmp(c->line_reply, "stream", 6) == 0) {
1799         CLUSTER_RETURN_LONG(c, REDIS_STREAM);
1800     } else {
1801         CLUSTER_RETURN_LONG(c, REDIS_NOT_FOUND);
1802     }
1803 }
1804 
/* SUBSCRIBE/PSUBSCRIBE handler.
 *
 * Runs in two phases:
 *   1. Consume sctx->argc confirmation replies.  Each must be a multi bulk
 *      whose first element equals sctx->kw (compared case-insensitively).
 *   2. Loop reading multi bulk messages and invoke the callback stored in
 *      sctx->cb for every "message"/"pmessage" entry.
 *
 * The context (ctx) is freed on every exit path.  The function only ever
 * leaves through RETURN_FALSE: phase two has no success exit, it ends when a
 * read fails, a message is malformed, or the callback cannot be invoked. */
PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
                             void *ctx)
{
    subscribeContext *sctx = (subscribeContext*)ctx;
    zval z_tab, *z_tmp;
    /* The first reply header is used as-is; later ones must be pulled */
    int pull = 0;


    // Consume each MULTI BULK response (one per channel/pattern)
    while (sctx->argc--) {
        if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
            pull, mbulk_resp_loop_raw, &z_tab)
        ) {
            efree(sctx);
            RETURN_FALSE;
        }

        // Element 0 must echo the keyword we sent (sctx->kw)
        if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_tab), 0)) == NULL ||
            strcasecmp(Z_STRVAL_P(z_tmp), sctx->kw) != 0
        ) {
            zval_dtor(&z_tab);
            efree(sctx);
            RETURN_FALSE;
        }

        zval_dtor(&z_tab);
        pull = 1;
    }

    // Set up our callback pointers
    zval z_ret, z_args[4];
    sctx->cb.retval = &z_ret;
    sctx->cb.params = z_args;

    /* We're in a subscribe loop */
    c->subscribed_slot = c->cmd_slot;

    /* Multibulk response, {[pattern], type, channel, payload} */
    while (1) {
        /* Arguments */
        zval *z_type, *z_chan, *z_pat = NULL, *z_data;
        /* tab_idx walks the reply elements after the type at index 0; once
         * all elements are fetched it also equals the callback arg count */
        int tab_idx = 1, is_pmsg;

        // Get the next subscribe response
        if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, mbulk_resp_loop, &z_tab) ||
            (z_type = zend_hash_index_find(Z_ARRVAL(z_tab), 0)) == NULL
        ) {
            break;
        }

        // Make sure we have a message or pmessage
        // NOTE(review): z_type is read as a string without a type check;
        // confirm mbulk_resp_loop can only place a string at index 0 here.
        if (!strncmp(Z_STRVAL_P(z_type), "message", 7) ||
            !strncmp(Z_STRVAL_P(z_type), "pmessage", 8)
        ) {
            is_pmsg = *Z_STRVAL_P(z_type) == 'p';
        } else {
            // Any other reply type is dropped and the loop keeps reading
            zval_dtor(&z_tab);
            continue;
        }

        // Pattern messages carry the pattern before the channel
        if (is_pmsg && (z_pat = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL) {
            break;
        }

        // Extract channel and data
        if ((z_chan = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL ||
           (z_data = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL
        ) {
            break;
        }

        // Always pass our object through
        z_args[0] = *getThis();

        // Set up callbacks depending on type
        if (is_pmsg) {
            z_args[1] = *z_pat;
            z_args[2] = *z_chan;
            z_args[3] = *z_data;
        } else {
            z_args[1] = *z_chan;
            z_args[2] = *z_data;
        }

        // Set arg count (3 for message, 4 for pmessage)
        sctx->cb.param_count = tab_idx;

        // Execute our callback
        if (zend_call_function(&(sctx->cb), &(sctx->cb_cache)) !=
                              SUCCESS)
        {
            break;
        }

        // If we have a return value, free it
        zval_ptr_dtor(&z_ret);

        zval_dtor(&z_tab);
    }

    // We're no longer subscribing, due to an error
    c->subscribed_slot = -1;

    // Cleanup.  Every break above leaves z_tab undestroyed, so it is
    // released here; this relies on cluster_zval_mbulk_resp leaving z_tab in
    // a state that is safe to destroy when it fails.
    zval_dtor(&z_tab);
    efree(sctx);

    // Failure
    RETURN_FALSE;
}
1916 
1917 /* UNSUBSCRIBE/PUNSUBSCRIBE */
cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1918 PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,
1919                                redisCluster *c, void *ctx)
1920 {
1921     subscribeContext *sctx = (subscribeContext*)ctx;
1922     zval z_tab, *z_chan, *z_flag;
1923     int pull = 0, argc = sctx->argc;
1924 
1925     efree(sctx);
1926     array_init(return_value);
1927 
1928     // Consume each response
1929     while (argc--) {
1930         // Fail if we didn't get an array or can't find index 1
1931         if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, pull, mbulk_resp_loop_raw, &z_tab) ||
1932             (z_chan = zend_hash_index_find(Z_ARRVAL(z_tab), 1)) == NULL
1933         ) {
1934             zval_dtor(&z_tab);
1935             zval_dtor(return_value);
1936             RETURN_FALSE;
1937         }
1938 
1939         // Find the flag for this channel/pattern
1940         if ((z_flag = zend_hash_index_find(Z_ARRVAL(z_tab), 2)) == NULL ||
1941             Z_STRLEN_P(z_flag) != 2
1942         ) {
1943             zval_dtor(&z_tab);
1944             zval_dtor(return_value);
1945             RETURN_FALSE;
1946         }
1947 
1948         // Redis will give us either :1 or :0 here
1949         char *flag = Z_STRVAL_P(z_flag);
1950 
1951         // Add result
1952         add_assoc_bool(return_value, Z_STRVAL_P(z_chan), flag[1] == '1');
1953 
1954         zval_dtor(&z_tab);
1955         pull = 1;
1956     }
1957 }
1958 
1959 /* Recursive MULTI BULK -> PHP style response handling */
cluster_mbulk_variant_resp(clusterReply * r,int null_mbulk_as_null,zval * z_ret)1960 static void cluster_mbulk_variant_resp(clusterReply *r, int null_mbulk_as_null,
1961                                        zval *z_ret)
1962 {
1963     zval z_sub_ele;
1964     long long i;
1965 
1966     switch(r->type) {
1967         case TYPE_INT:
1968             add_next_index_long(z_ret, r->integer);
1969             break;
1970         case TYPE_LINE:
1971             if (r->str) {
1972                 add_next_index_stringl(z_ret, r->str, r->len);
1973             } else {
1974                 add_next_index_bool(z_ret, 1);
1975             }
1976             break;
1977         case TYPE_BULK:
1978             if (r->len > -1) {
1979                 add_next_index_stringl(z_ret, r->str, r->len);
1980             } else {
1981                 add_next_index_null(z_ret);
1982             }
1983             break;
1984         case TYPE_MULTIBULK:
1985             if (r->elements < 0 && null_mbulk_as_null) {
1986                 add_next_index_null(z_ret);
1987             } else {
1988                 array_init(&z_sub_ele);
1989                 for (i = 0; i < r->elements; i++) {
1990                     cluster_mbulk_variant_resp(r->element[i], null_mbulk_as_null, &z_sub_ele);
1991                 }
1992                 add_next_index_zval(z_ret, &z_sub_ele);
1993             }
1994             break;
1995         default:
1996             add_next_index_bool(z_ret, 0);
1997             break;
1998     }
1999 }
2000 
2001 /* Variant response handling, for things like EVAL and various other responses
2002  * where we just map the replies from Redis type values to PHP ones directly. */
2003 static void
cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,int status_strings,void * ctx)2004 cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2005                              int status_strings, void *ctx)
2006 {
2007     clusterReply *r;
2008     zval zv, *z_arr = &zv;
2009     long long i;
2010 
2011     // Make sure we can read it
2012     if ((r = cluster_read_resp(c, status_strings)) == NULL) {
2013         CLUSTER_RETURN_FALSE(c);
2014     }
2015 
2016     // Handle ATOMIC vs. MULTI mode in a separate switch
2017     if (CLUSTER_IS_ATOMIC(c)) {
2018         switch(r->type) {
2019             case TYPE_INT:
2020                 RETVAL_LONG(r->integer);
2021                 break;
2022             case TYPE_ERR:
2023                 RETVAL_FALSE;
2024                 break;
2025             case TYPE_LINE:
2026                 if (status_strings) {
2027                     RETVAL_STRINGL(r->str, r->len);
2028                 } else {
2029                     RETVAL_TRUE;
2030                 }
2031                 break;
2032             case TYPE_BULK:
2033                 if (r->len < 0) {
2034                     RETVAL_NULL();
2035                 } else {
2036                     RETVAL_STRINGL(r->str, r->len);
2037                 }
2038                 break;
2039             case TYPE_MULTIBULK:
2040                 if (r->elements < 0 && c->flags->null_mbulk_as_null) {
2041                     RETVAL_NULL();
2042                 } else {
2043                     array_init(z_arr);
2044                     for (i = 0; i < r->elements; i++) {
2045                         cluster_mbulk_variant_resp(r->element[i], c->flags->null_mbulk_as_null, z_arr);
2046                     }
2047                     RETVAL_ZVAL(z_arr, 0, 0);
2048                 }
2049                 break;
2050             default:
2051                 RETVAL_FALSE;
2052                 break;
2053         }
2054     } else {
2055         switch(r->type) {
2056             case TYPE_INT:
2057                 add_next_index_long(&c->multi_resp, r->integer);
2058                 break;
2059             case TYPE_ERR:
2060                 add_next_index_bool(&c->multi_resp, 0);
2061                 break;
2062             case TYPE_LINE:
2063                 if (status_strings) {
2064                     add_next_index_stringl(&c->multi_resp, r->str, r->len);
2065                 } else {
2066                     add_next_index_bool(&c->multi_resp, 1);
2067                 }
2068                 break;
2069             case TYPE_BULK:
2070                 if (r->len < 0) {
2071                     add_next_index_null(&c->multi_resp);
2072                 } else {
2073                     add_next_index_stringl(&c->multi_resp, r->str, r->len);
2074                 }
2075                 break;
2076             case TYPE_MULTIBULK:
2077                 if (r->elements < 0 && c->flags->null_mbulk_as_null) {
2078                     add_next_index_null(&c->multi_resp);
2079                 } else {
2080                     cluster_mbulk_variant_resp(r, c->flags->null_mbulk_as_null, &c->multi_resp);
2081                 }
2082                 break;
2083             default:
2084                 add_next_index_bool(&c->multi_resp, 0);
2085                 break;
2086         }
2087     }
2088 
2089     // Free our response structs, but not allocated data itself
2090     cluster_free_reply(r, 1);
2091 }
2092 
/* Variant response handler with status strings disabled: delegates to
 * cluster_variant_resp_generic() passing 0 for status_strings, so a LINE
 * reply is reported as TRUE rather than as its text. */
PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
                                        void *ctx)
{
    cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx);
}
2098 
/* Variant response handler whose treatment of LINE replies follows the
 * cluster's own setting: delegates to cluster_variant_resp_generic() passing
 * c->flags->reply_literal as status_strings. */
PHP_REDIS_API void cluster_variant_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
                                            void *ctx)
{
    cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
                                 c->flags->reply_literal, ctx);
}
2105 
/* Variant response handler with status strings enabled: delegates to
 * cluster_variant_resp_generic() passing 1 for status_strings, so a LINE
 * reply is reported as its text rather than as TRUE. */
PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
                                        void *ctx)
{
    cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx);
}
2111 
2112 /* Generic MULTI BULK response processor */
cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,mbulk_cb cb,void * ctx)2113 PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2114                                    redisCluster *c, mbulk_cb cb, void *ctx)
2115 {
2116     zval z_result;
2117 
2118     /* Abort if the reply isn't MULTIBULK or has an invalid length */
2119     if (c->reply_type != TYPE_MULTIBULK || c->reply_len < -1) {
2120         CLUSTER_RETURN_FALSE(c);
2121     }
2122 
2123     if (c->reply_len == -1 && c->flags->null_mbulk_as_null) {
2124         ZVAL_NULL(&z_result);
2125     } else {
2126         array_init(&z_result);
2127 
2128         if (c->reply_len > 0) {
2129             /* Push serialization settings from the cluster into our socket */
2130             c->cmd_sock->serializer = c->flags->serializer;
2131             c->cmd_sock->compression = c->flags->compression;
2132 
2133             /* Call our specified callback */
2134             if (cb(c->cmd_sock, &z_result, c->reply_len, ctx) == FAILURE) {
2135                 zval_dtor(&z_result);
2136                 CLUSTER_RETURN_FALSE(c);
2137             }
2138         }
2139     }
2140 
2141     // Success, make this array our return value
2142     if (CLUSTER_IS_ATOMIC(c)) {
2143         RETVAL_ZVAL(&z_result, 0, 1);
2144     } else {
2145         add_next_index_zval(&c->multi_resp, &z_result);
2146     }
2147 }
2148 
2149 /* HSCAN, SSCAN, ZSCAN */
cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,REDIS_SCAN_TYPE type,long * it)2150 PHP_REDIS_API int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2151                               REDIS_SCAN_TYPE type, long *it)
2152 {
2153     char *pit;
2154 
2155     // We always want to see a MULTIBULK response with two elements
2156     if (c->reply_type != TYPE_MULTIBULK || c->reply_len != 2)
2157     {
2158         return FAILURE;
2159     }
2160 
2161     // Read the BULK size
2162     if (cluster_check_response(c, &c->reply_type) ||
2163         c->reply_type != TYPE_BULK)
2164     {
2165         return FAILURE;
2166     }
2167 
2168     // Read the iterator
2169     if ((pit = redis_sock_read_bulk_reply(c->cmd_sock,c->reply_len)) == NULL)
2170     {
2171         return FAILURE;
2172     }
2173 
2174     // Push the new iterator value to our caller
2175     *it = atol(pit);
2176     efree(pit);
2177 
2178     // We'll need another MULTIBULK response for the payload
2179     if (cluster_check_response(c, &c->reply_type) < 0)
2180     {
2181         return FAILURE;
2182     }
2183 
2184     // Use the proper response callback depending on scan type
2185     switch(type) {
2186         case TYPE_SCAN:
2187             cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2188             break;
2189         case TYPE_SSCAN:
2190             cluster_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2191             break;
2192         case TYPE_HSCAN:
2193             cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2194             break;
2195         case TYPE_ZSCAN:
2196             cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2197             break;
2198         default:
2199             return FAILURE;
2200     }
2201 
2202     // Success
2203     return SUCCESS;
2204 }
2205 
2206 /* INFO response */
cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2207 PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2208                               void *ctx)
2209 {
2210     zval z_result;
2211     char *info;
2212 
2213     // Read our bulk response
2214     if ((info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
2215     {
2216         CLUSTER_RETURN_FALSE(c);
2217     }
2218 
2219     /* Parse response, free memory */
2220     redis_parse_info_response(info, &z_result);
2221     efree(info);
2222 
2223     // Return our array
2224     if (CLUSTER_IS_ATOMIC(c)) {
2225         RETVAL_ZVAL(&z_result, 0, 1);
2226     } else {
2227         add_next_index_zval(&c->multi_resp, &z_result);
2228     }
2229 }
2230 
2231 /* CLIENT LIST response */
cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2232 PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2233                                      void *ctx)
2234 {
2235     char *info;
2236     zval z_result;
2237 
2238     /* Read the bulk response */
2239     info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len);
2240     if (info == NULL) {
2241         CLUSTER_RETURN_FALSE(c);
2242     }
2243 
2244     /* Parse it and free the bulk string */
2245     redis_parse_client_list_response(info, &z_result);
2246     efree(info);
2247 
2248     if (CLUSTER_IS_ATOMIC(c)) {
2249         RETVAL_ZVAL(&z_result, 0, 1);
2250     } else {
2251         add_next_index_zval(&c->multi_resp, &z_result);
2252     }
2253 }
2254 
2255 /* XRANGE */
2256 PHP_REDIS_API void
cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2257 cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2258     zval z_messages;
2259 
2260     array_init(&z_messages);
2261 
2262     c->cmd_sock->serializer = c->flags->serializer;
2263     c->cmd_sock->compression = c->flags->compression;
2264 
2265     if (redis_read_stream_messages(c->cmd_sock, c->reply_len, &z_messages) < 0) {
2266         zval_dtor(&z_messages);
2267         CLUSTER_RETURN_FALSE(c);
2268     }
2269 
2270     if (CLUSTER_IS_ATOMIC(c)) {
2271         RETVAL_ZVAL(&z_messages, 0, 1);
2272     } else {
2273         add_next_index_zval(&c->multi_resp, &z_messages);
2274     }
2275 }
2276 
2277 /* XREAD */
2278 PHP_REDIS_API void
cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2279 cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2280     zval z_streams;
2281 
2282     c->cmd_sock->serializer = c->flags->serializer;
2283     c->cmd_sock->compression = c->flags->compression;
2284 
2285     if (c->reply_len == -1 && c->flags->null_mbulk_as_null) {
2286         ZVAL_NULL(&z_streams);
2287     } else {
2288         array_init(&z_streams);
2289         if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, &z_streams) < 0) {
2290             zval_dtor(&z_streams);
2291             CLUSTER_RETURN_FALSE(c);
2292         }
2293     }
2294 
2295     if (CLUSTER_IS_ATOMIC(c)) {
2296         RETVAL_ZVAL(&z_streams, 0, 1);
2297     } else {
2298         add_next_index_zval(&c->multi_resp, &z_streams);
2299     }
2300 }
2301 
2302 /* XCLAIM */
2303 PHP_REDIS_API void
cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2304 cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2305     zval z_msg;
2306 
2307     array_init(&z_msg);
2308 
2309     if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, &z_msg) < 0) {
2310         zval_dtor(&z_msg);
2311         CLUSTER_RETURN_FALSE(c);
2312     }
2313 
2314     if (CLUSTER_IS_ATOMIC(c)) {
2315         RETVAL_ZVAL(&z_msg, 0, 1);
2316     } else {
2317         add_next_index_zval(&c->multi_resp, &z_msg);
2318     }
2319 
2320 }
2321 
2322 /* XINFO */
2323 PHP_REDIS_API void
cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2324 cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2325 {
2326     zval z_ret;
2327 
2328     array_init(&z_ret);
2329     if (redis_read_xinfo_response(c->cmd_sock, &z_ret, c->reply_len) != SUCCESS) {
2330         zval_dtor(&z_ret);
2331         CLUSTER_RETURN_FALSE(c);
2332     }
2333 
2334     if (CLUSTER_IS_ATOMIC(c)) {
2335         RETURN_ZVAL(&z_ret, 0, 1);
2336     }
2337     add_next_index_zval(&c->multi_resp, &z_ret);
2338 }
2339 
2340 static void
cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx,int (* cb)(RedisSock *,zval *,long))2341 cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx,
2342                         int (*cb)(RedisSock*, zval*, long))
2343 {
2344     zval z_ret;
2345 
2346     array_init(&z_ret);
2347     if (cb(c->cmd_sock, &z_ret, c->reply_len) != SUCCESS) {
2348         zval_dtor(&z_ret);
2349         CLUSTER_RETURN_FALSE(c);
2350     }
2351 
2352     if (CLUSTER_IS_ATOMIC(c)) {
2353         RETURN_ZVAL(&z_ret, 0, 1);
2354     }
2355     add_next_index_zval(&c->multi_resp, &z_ret);
2356 }
2357 
/* ACL GETUSER response handler: delegates to cluster_acl_custom_resp() with
 * redis_read_acl_getuser_reply as the reader callback. */
PHP_REDIS_API void
cluster_acl_getuser_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
    cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx, redis_read_acl_getuser_reply);
}
2362 
/* ACL LOG response handler: delegates to cluster_acl_custom_resp() with
 * redis_read_acl_log_reply as the reader callback. */
PHP_REDIS_API void
cluster_acl_log_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
    cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx, redis_read_acl_log_reply);
}
2367 
2368 /* MULTI BULK response loop where we might pull the next one */
cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,int pull,mbulk_cb cb,zval * z_ret)2369 PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2370                                      redisCluster *c, int pull, mbulk_cb cb, zval *z_ret)
2371 {
2372     ZVAL_NULL(z_ret);
2373 
2374     // Pull our next response if directed
2375     if (pull) {
2376         if (cluster_check_response(c, &c->reply_type) < 0)
2377         {
2378             return NULL;
2379         }
2380     }
2381 
2382     // Validate reply type and length
2383     if (c->reply_type != TYPE_MULTIBULK || c->reply_len == -1) {
2384         return NULL;
2385     }
2386 
2387     array_init(z_ret);
2388 
2389     // Call our callback
2390     if (cb(c->cmd_sock, z_ret, c->reply_len, NULL) == FAILURE) {
2391         zval_dtor(z_ret);
2392         return NULL;
2393     }
2394 
2395     return z_ret;
2396 }
2397 
2398 /* MULTI MULTI BULK reply (for EXEC) */
cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2399 PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2400                                      redisCluster *c, void *ctx)
2401 {
2402     zval *multi_resp = &c->multi_resp;
2403     array_init(multi_resp);
2404 
2405     clusterFoldItem *fi = c->multi_head;
2406     while (fi) {
2407         /* Make sure our transaction didn't fail here */
2408         if (c->multi_len[fi->slot] > -1) {
2409             /* Set the slot where we should look for responses.  We don't allow
2410              * failover inside a transaction, so it will be the master we have
2411              * mapped. */
2412             c->cmd_slot = fi->slot;
2413             c->cmd_sock = SLOT_SOCK(c, fi->slot);
2414 
2415             if (cluster_check_response(c, &c->reply_type) < 0) {
2416                 zval_dtor(multi_resp);
2417                 RETURN_FALSE;
2418             }
2419 
2420             fi->callback(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, fi->ctx);
2421         } else {
2422             /* Just add false */
2423             add_next_index_bool(multi_resp, 0);
2424         }
2425         fi = fi->next;
2426     }
2427 
2428     // Set our return array
2429     zval_dtor(return_value);
2430     RETVAL_ZVAL(multi_resp, 0, 1);
2431 }
2432 
2433 /* Generic handler for MGET */
cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2434 PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
2435                                     redisCluster *c, void *ctx)
2436 {
2437     clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2438 
2439     /* Protect against an invalid response type, -1 response length, and failure
2440      * to consume the responses. */
2441     c->cmd_sock->serializer = c->flags->serializer;
2442     c->cmd_sock->compression = c->flags->compression;
2443     short fail = c->reply_type != TYPE_MULTIBULK || c->reply_len == -1 ||
2444         mbulk_resp_loop(c->cmd_sock, mctx->z_multi, c->reply_len, NULL) == FAILURE;
2445 
2446     // If we had a failure, pad results with FALSE to indicate failure.  Non
2447     // existent keys (e.g. for MGET will come back as NULL)
2448     if (fail) {
2449         while (mctx->count--) {
2450             add_next_index_bool(mctx->z_multi, 0);
2451         }
2452     }
2453 
2454     // If this is the tail of our multi command, we can set our returns
2455     if (mctx->last) {
2456         if (CLUSTER_IS_ATOMIC(c)) {
2457             RETVAL_ZVAL(mctx->z_multi, 0, 1);
2458         } else {
2459             add_next_index_zval(&c->multi_resp, mctx->z_multi);
2460         }
2461 
2462         efree(mctx->z_multi);
2463     }
2464 
2465     // Clean up this context item
2466     efree(mctx);
2467 }
2468 
2469 /* Handler for MSETNX */
cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2470 PHP_REDIS_API void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2471                                 void *ctx)
2472 {
2473     clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2474     int real_argc = mctx->count/2;
2475 
2476     // Protect against an invalid response type
2477     if (c->reply_type != TYPE_INT) {
2478         php_error_docref(0, E_WARNING,
2479             "Invalid response type for MSETNX");
2480         while (real_argc--) {
2481             add_next_index_bool(mctx->z_multi, 0);
2482         }
2483         return;
2484     }
2485 
2486     // Response will be 1/0 per key, so the client can match them up
2487     while (real_argc--) {
2488         add_next_index_long(mctx->z_multi, c->reply_len);
2489     }
2490 
2491     // Set return value if it's our last response
2492     if (mctx->last) {
2493         if (CLUSTER_IS_ATOMIC(c)) {
2494             RETVAL_ZVAL(mctx->z_multi, 0, 0);
2495         } else {
2496             add_next_index_zval(&c->multi_resp, mctx->z_multi);
2497         }
2498         efree(mctx->z_multi);
2499     }
2500 
2501     // Free multi context
2502     efree(mctx);
2503 }
2504 
2505 /* Handler for DEL */
cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2506 PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2507                              void *ctx)
2508 {
2509     clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2510 
2511     // If we get an invalid reply, inform the client
2512     if (c->reply_type != TYPE_INT) {
2513         php_error_docref(0, E_WARNING,
2514             "Invalid reply type returned for DEL command");
2515         efree(mctx);
2516         return;
2517     }
2518 
2519     // Increment by the number of keys deleted
2520     Z_LVAL_P(mctx->z_multi) += c->reply_len;
2521 
2522     if (mctx->last) {
2523         if (CLUSTER_IS_ATOMIC(c)) {
2524             ZVAL_LONG(return_value, Z_LVAL_P(mctx->z_multi));
2525         } else {
2526             add_next_index_long(&c->multi_resp, Z_LVAL_P(mctx->z_multi));
2527         }
2528         efree(mctx->z_multi);
2529     }
2530 
2531     efree(ctx);
2532 }
2533 
2534 /* Handler for MSET */
cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2535 PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2536                               void *ctx)
2537 {
2538     clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2539 
2540     // If we get an invalid reply type something very wrong has happened,
2541     // and we have to abort.
2542     if (c->reply_type != TYPE_LINE) {
2543         php_error_docref(0, E_ERROR,
2544             "Invalid reply type returned for MSET command");
2545         zval_dtor(mctx->z_multi);
2546         efree(mctx->z_multi);
2547         efree(mctx);
2548         RETURN_FALSE;
2549     }
2550 
2551     // Set our return if it's the last call
2552     if (mctx->last) {
2553         if (CLUSTER_IS_ATOMIC(c)) {
2554             ZVAL_BOOL(return_value, zval_is_true(mctx->z_multi));
2555         } else {
2556             add_next_index_bool(&c->multi_resp, zval_is_true(mctx->z_multi));
2557         }
2558         efree(mctx->z_multi);
2559     }
2560 
2561     efree(mctx);
2562 }
2563 
2564 /* Raw MULTI BULK reply */
2565 PHP_REDIS_API void
cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2566 cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2567 {
2568     cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2569         mbulk_resp_loop_raw, NULL);
2570 }
2571 
2572 /* Unserialize all the things */
2573 PHP_REDIS_API void
cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2574 cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2575 {
2576     cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2577         mbulk_resp_loop, NULL);
2578 }
2579 
2580 /* For handling responses where we get key, value, key, value that
2581  * we will turn into key => value, key => value. */
2582 PHP_REDIS_API void
cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2583 cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2584                           void *ctx)
2585 {
2586     cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2587         mbulk_resp_loop_zipstr, NULL);
2588 }
2589 
2590 /* Handling key,value to key=>value where the values are doubles */
2591 PHP_REDIS_API void
cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2592 cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2593                           void *ctx)
2594 {
2595     cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2596         mbulk_resp_loop_zipdbl, NULL);
2597 }
2598 
2599 /* Associate multi bulk response (for HMGET really) */
2600 PHP_REDIS_API void
cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2601 cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2602                          void *ctx)
2603 {
2604     cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2605         mbulk_resp_loop_assoc, ctx);
2606 }
2607 
2608 /*
2609  * Various MULTI BULK reply callback functions
2610  */
2611 
2612 /* MULTI BULK response where we don't touch the values (e.g. KEYS) */
mbulk_resp_loop_raw(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2613 int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
2614                         long long count, void *ctx)
2615 {
2616     char *line;
2617     int line_len;
2618 
2619     // Iterate over the number we have
2620     while (count--) {
2621         // Read the line, which should never come back null
2622         line = redis_sock_read(redis_sock, &line_len);
2623         if (line == NULL) return FAILURE;
2624 
2625         // Add to our result array
2626         add_next_index_stringl(z_result, line, line_len);
2627         efree(line);
2628     }
2629 
2630     // Success!
2631     return SUCCESS;
2632 }
2633 
2634 /* MULTI BULK response where we unserialize everything */
mbulk_resp_loop(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2635 int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
2636                     long long count, void *ctx)
2637 {
2638     char *line;
2639     int line_len;
2640 
2641     /* Iterate over the lines we have to process */
2642     while (count--) {
2643         /* Read our line */
2644         line = redis_sock_read(redis_sock, &line_len);
2645 
2646         if (line != NULL) {
2647             zval z_unpacked;
2648             if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2649                 add_next_index_zval(z_result, &z_unpacked);
2650             } else {
2651                 add_next_index_stringl(z_result, line, line_len);
2652             }
2653             efree(line);
2654         } else {
2655             add_next_index_bool(z_result, 0);
2656         }
2657     }
2658 
2659     return SUCCESS;
2660 }
2661 
2662 /* MULTI BULK response where we turn key1,value1 into key1=>value1 */
mbulk_resp_loop_zipstr(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2663 int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result,
2664                            long long count, void *ctx)
2665 {
2666     char *line, *key = NULL;
2667     int line_len, key_len = 0;
2668     long long idx = 0;
2669 
2670     // Our count will need to be divisible by 2
2671     if (count % 2 != 0) {
2672         return -1;
2673     }
2674 
2675     // Iterate through our elements
2676     while (count--) {
2677         // Grab our line, bomb out on failure
2678         line = redis_sock_read(redis_sock, &line_len);
2679         if (!line) return -1;
2680 
2681         if (idx++ % 2 == 0) {
2682             // Save our key and length
2683             key = line;
2684             key_len = line_len;
2685         } else {
2686             /* Attempt unpacking */
2687             zval z_unpacked;
2688             if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2689                 add_assoc_zval(z_result, key, &z_unpacked);
2690             } else {
2691                 add_assoc_stringl_ex(z_result, key, key_len, line, line_len);
2692             }
2693 
2694             efree(line);
2695             efree(key);
2696         }
2697     }
2698 
2699     return SUCCESS;
2700 }
2701 
2702 /* MULTI BULK loop processor where we expect key,score key, score */
mbulk_resp_loop_zipdbl(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2703 int mbulk_resp_loop_zipdbl(RedisSock *redis_sock, zval *z_result,
2704                            long long count, void *ctx)
2705 {
2706     char *line, *key = NULL;
2707     int line_len, key_len = 0;
2708     long long idx = 0;
2709 
2710     // Our context will need to be divisible by 2
2711     if (count %2 != 0) {
2712         return -1;
2713     }
2714 
2715     // While we have elements
2716     while (count--) {
2717         line = redis_sock_read(redis_sock, &line_len);
2718         if (line != NULL) {
2719             if (idx++ % 2 == 0) {
2720                 key = line;
2721                 key_len = line_len;
2722             } else {
2723                 zval zv, *z = &zv;
2724                 if (redis_unpack(redis_sock,key,key_len, z)) {
2725                     zend_string *zstr = zval_get_string(z);
2726                     add_assoc_double_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), atof(line));
2727                     zend_string_release(zstr);
2728                     zval_dtor(z);
2729                 } else {
2730                     add_assoc_double_ex(z_result, key, key_len, atof(line));
2731                 }
2732 
2733                 /* Free our key and line */
2734                 efree(key);
2735                 efree(line);
2736             }
2737         }
2738     }
2739 
2740     return SUCCESS;
2741 }
2742 
2743 /* MULTI BULK where we're passed the keys, and we attach vals */
mbulk_resp_loop_assoc(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2744 int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
2745                           long long count, void *ctx)
2746 {
2747     char *line;
2748     int line_len, i;
2749     zval *z_keys = ctx;
2750 
2751     // Loop while we've got replies
2752     for (i = 0; i < count; ++i) {
2753         zend_string *zstr = zval_get_string(&z_keys[i]);
2754         line = redis_sock_read(redis_sock, &line_len);
2755 
2756         if (line != NULL) {
2757             zval z_unpacked;
2758             if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2759                 add_assoc_zval_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), &z_unpacked);
2760             } else {
2761                 add_assoc_stringl_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), line, line_len);
2762             }
2763             efree(line);
2764         } else {
2765             add_assoc_bool_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), 0);
2766         }
2767 
2768         // Clean up key context
2769         zend_string_release(zstr);
2770         zval_dtor(&z_keys[i]);
2771     }
2772 
2773     // Clean up our keys overall
2774     efree(z_keys);
2775 
2776     // Success!
2777     return SUCCESS;
2778 }
2779 
2780 /* Free an array of zend_string seeds */
free_seed_array(zend_string ** seeds,uint32_t nseeds)2781 void free_seed_array(zend_string **seeds, uint32_t nseeds) {
2782     int i;
2783 
2784     if (seeds == NULL)
2785         return;
2786 
2787     for (i = 0; i < nseeds; i++)
2788         zend_string_release(seeds[i]);
2789 
2790     efree(seeds);
2791 }
2792 
get_valid_seeds(HashTable * input,uint32_t * nseeds)2793 static zend_string **get_valid_seeds(HashTable *input, uint32_t *nseeds) {
2794     HashTable *valid;
2795     uint32_t count, idx = 0;
2796     zval *z_seed;
2797     zend_string *zkey, **seeds = NULL;
2798 
2799     /* Short circuit if we don't have any sees */
2800     count = zend_hash_num_elements(input);
2801     if (count == 0)
2802         return NULL;
2803 
2804     ALLOC_HASHTABLE(valid);
2805     zend_hash_init(valid, count, NULL, NULL, 0);
2806 
2807     ZEND_HASH_FOREACH_VAL(input, z_seed) {
2808         ZVAL_DEREF(z_seed);
2809 
2810         if (Z_TYPE_P(z_seed) != IS_STRING) {
2811             php_error_docref(NULL, E_WARNING, "Skipping non-string entry in seeds array");
2812             continue;
2813         } else if (strrchr(Z_STRVAL_P(z_seed), ':') == NULL) {
2814             php_error_docref(NULL, E_WARNING,
2815                 "Seed '%s' not in host:port format, ignoring", Z_STRVAL_P(z_seed));
2816             continue;
2817         }
2818 
2819         /* Add as a key to avoid duplicates */
2820         zend_hash_str_update_ptr(valid, Z_STRVAL_P(z_seed), Z_STRLEN_P(z_seed), NULL);
2821     } ZEND_HASH_FOREACH_END();
2822 
2823     /* We need at least one valid seed */
2824     count = zend_hash_num_elements(valid);
2825     if (count == 0)
2826         goto cleanup;
2827 
2828     /* Populate our return array */
2829     seeds = ecalloc(count, sizeof(*seeds));
2830     ZEND_HASH_FOREACH_STR_KEY(valid, zkey) {
2831         seeds[idx++] = zend_string_copy(zkey);
2832     } ZEND_HASH_FOREACH_END();
2833 
2834     *nseeds = idx;
2835 
2836 cleanup:
2837     zend_hash_destroy(valid);
2838     FREE_HASHTABLE(valid);
2839 
2840     return seeds;
2841 }
2842 
2843 /* Validate cluster construction arguments and return a sanitized and validated
2844  * array of seeds */
2845 zend_string**
cluster_validate_args(double timeout,double read_timeout,HashTable * seeds,uint32_t * nseeds,char ** errstr)2846 cluster_validate_args(double timeout, double read_timeout, HashTable *seeds,
2847                       uint32_t *nseeds, char **errstr)
2848 {
2849     zend_string **retval;
2850 
2851     if (timeout < 0L || timeout > INT_MAX) {
2852         if (errstr) *errstr = "Invalid timeout";
2853         return NULL;
2854     }
2855 
2856     if (read_timeout < 0L || read_timeout > INT_MAX) {
2857         if (errstr) *errstr = "Invalid read timeout";
2858         return NULL;
2859     }
2860 
2861     retval = get_valid_seeds(seeds, nseeds);
2862     if (retval == NULL && errstr)
2863         *errstr = "No valid seeds detected";
2864 
2865     return retval;
2866 }
2867 
2868 /* Helper function to compare to host:port seeds */
cluster_cmp_seeds(const void * a,const void * b)2869 static int cluster_cmp_seeds(const void *a, const void *b) {
2870     zend_string *za = *(zend_string **)a;
2871     zend_string *zb = *(zend_string **)b;
2872     return strcmp(ZSTR_VAL(za), ZSTR_VAL(zb));
2873 }
2874 
cluster_swap_seeds(void * a,void * b)2875 static void cluster_swap_seeds(void *a, void *b) {
2876     zend_string **za, **zb, *tmp;
2877 
2878     za = a;
2879     zb = b;
2880 
2881     tmp = *za;
2882     *za = *zb;
2883     *zb = tmp;
2884 }
2885 
2886 /* Turn an array of cluster seeds into a string we can cache.  If we get here we know
2887  * we have at least one entry and that every entry is a string in the form host:port */
2888 #define SLOT_CACHE_PREFIX "phpredis_slots:"
cluster_hash_seeds(zend_string ** seeds,uint32_t count)2889 zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t count) {
2890     smart_str hash = {0};
2891     size_t i;
2892 
2893     /* Sort our seeds so any any array with identical seeds hashes to the same key
2894      * regardless of what order the user gives them to us in. */
2895     zend_sort(seeds, count, sizeof(*seeds), cluster_cmp_seeds, cluster_swap_seeds);
2896 
2897     /* Global phpredis hash prefix */
2898     smart_str_appendl(&hash, SLOT_CACHE_PREFIX, sizeof(SLOT_CACHE_PREFIX) - 1);
2899 
2900     /* Construct our actual hash */
2901     for (i = 0; i < count; i++) {
2902         smart_str_appendc(&hash, '[');
2903         smart_str_append_ex(&hash, seeds[i], 0);
2904         smart_str_appendc(&hash, ']');
2905     }
2906 
2907     /* Null terminate */
2908     smart_str_0(&hash);
2909 
2910     /* Return the internal zend_string */
2911     return hash.s;
2912 }
2913 
cluster_cache_load(zend_string * hash)2914 PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash) {
2915     zend_resource *le;
2916 
2917     /* Look for cached slot information */
2918     le = zend_hash_find_ptr(&EG(persistent_list), hash);
2919 
2920     if (le != NULL) {
2921         /* Sanity check on our list type */
2922         if (le->type != le_cluster_slot_cache) {
2923             php_error_docref(0, E_WARNING, "Invalid slot cache resource");
2924             return NULL;
2925         }
2926 
2927         /* Success, return the cached entry */
2928         return le->ptr;
2929     }
2930 
2931     /* Not found */
2932     return NULL;
2933 }
2934 
2935 /* Cache a cluster's slot information in persistent_list if it's enabled */
cluster_cache_store(zend_string * hash,HashTable * nodes)2936 PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes) {
2937     redisCachedCluster *cc = cluster_cache_create(hash, nodes);
2938 
2939     redis_register_persistent_resource(cc->hash, cc, le_cluster_slot_cache);
2940 
2941     return SUCCESS;
2942 }
2943 
2944 
2945 /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */
2946