1 #include "php_redis.h"
2 #include "common.h"
3 #include "library.h"
4 #include "redis_commands.h"
5 #include "cluster_library.h"
6 #include "crc16.h"
7 #include <zend_exceptions.h>
8
/* Class entry used for cluster exceptions; declared extern here, so it is
 * presumably defined elsewhere in the extension (TODO confirm). */
extern zend_class_entry *redis_cluster_exception_ce;
/* NOTE(review): presumably the resource list entry id used for the
 * persistent cluster slot cache; confirm where it is registered. */
int le_cluster_slot_cache;
11
/* Debugging methods (disabled; kept for reference):
13 static void cluster_dump_nodes(redisCluster *c) {
14 redisClusterNode *p;
15
16 ZEND_HASH_FOREACH_PTR(c->nodes, p) {
17 if (p == NULL) {
18 continue;
19 }
20
21 const char *slave = (p->slave) ? "slave" : "master";
22 php_printf("%d %s %d %d", p->sock->port, slave,p->sock->prefix_len,
23 p->slot);
24
25 php_printf("\n");
26 } ZEND_HASH_FOREACH_END();
27 }
28
29 static void cluster_log(char *fmt, ...)
30 {
31 va_list args;
32 char buffer[1024];
33
34 va_start(args, fmt);
35 vsnprintf(buffer,sizeof(buffer),fmt,args);
36 va_end(args);
37
38 fprintf(stderr, "%s\n", buffer);
39 }
40
41 // Debug function to dump a clusterReply structure recursively
42 static void dump_reply(clusterReply *reply, int indent) {
43 smart_string buf = {0};
44 int i;
45
46 switch(reply->type) {
47 case TYPE_ERR:
48 smart_string_appendl(&buf, "(error) ", sizeof("(error) ")-1);
49 smart_string_appendl(&buf, reply->str, reply->len);
50 break;
51 case TYPE_LINE:
52 smart_string_appendl(&buf, reply->str, reply->len);
53 break;
54 case TYPE_INT:
55 smart_string_appendl(&buf, "(integer) ", sizeof("(integer) ")-1);
56 smart_string_append_long(&buf, reply->integer);
57 break;
58 case TYPE_BULK:
59 smart_string_appendl(&buf,"\"", 1);
60 smart_string_appendl(&buf, reply->str, reply->len);
61 smart_string_appendl(&buf, "\"", 1);
62 break;
63 case TYPE_MULTIBULK:
64 if (reply->elements < 0) {
65 smart_string_appendl(&buf, "(nil)", sizeof("(nil)")-1);
66 } else {
67 for (i = 0; i < reply->elements; i++) {
68 dump_reply(reply->element[i], indent+2);
69 }
70 }
71 break;
72 default:
73 break;
74 }
75
76 if (buf.len > 0) {
77 for (i = 0; i < indent; i++) {
78 php_printf(" ");
79 }
80
81 smart_string_0(&buf);
82 php_printf("%s", buf.c);
83 php_printf("\n");
84
85 efree(buf.c);
86 }
87 }
88 */
89
90
91 /* Recursively free our reply object. If free_data is non-zero we'll also free
92 * the payload data (strings) themselves. If not, we just free the structs */
cluster_free_reply(clusterReply * reply,int free_data)93 void cluster_free_reply(clusterReply *reply, int free_data) {
94 long long i;
95
96 switch(reply->type) {
97 case TYPE_ERR:
98 case TYPE_LINE:
99 case TYPE_BULK:
100 if (free_data && reply->str)
101 efree(reply->str);
102 break;
103 case TYPE_MULTIBULK:
104 if (reply->element) {
105 if (reply->elements > 0) {
106 for (i = 0; i < reply->elements && reply->element[i]; i++) {
107 cluster_free_reply(reply->element[i], free_data);
108 }
109 }
110 efree(reply->element);
111 }
112 break;
113 default:
114 break;
115 }
116 efree(reply);
117 }
118
119 static int
cluster_multibulk_resp_recursive(RedisSock * sock,size_t elements,clusterReply ** element,int status_strings)120 cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
121 clusterReply **element, int status_strings)
122 {
123 int i;
124 size_t sz;
125 clusterReply *r;
126 long len;
127 char buf[1024];
128
129 for (i = 0; i < elements; i++) {
130 r = element[i] = ecalloc(1, sizeof(clusterReply));
131
132 // Bomb out, flag error condition on a communication failure
133 if (redis_read_reply_type(sock, &r->type, &len) < 0) {
134 return FAILURE;
135 }
136
137 /* Set our reply len */
138 r->len = len;
139
140 switch(r->type) {
141 case TYPE_ERR:
142 case TYPE_LINE:
143 if (redis_sock_gets(sock,buf,sizeof(buf),&sz) < 0) {
144 return FAILURE;
145 }
146 r->len = (long long)sz;
147 if (status_strings) r->str = estrndup(buf, r->len);
148 break;
149 case TYPE_INT:
150 r->integer = len;
151 break;
152 case TYPE_BULK:
153 if (r->len >= 0) {
154 r->str = redis_sock_read_bulk_reply(sock,r->len);
155 if (!r->str) {
156 return FAILURE;
157 }
158 }
159 break;
160 case TYPE_MULTIBULK:
161 r->elements = r->len;
162 if (r->elements > 0) {
163 r->element = ecalloc(r->len, sizeof(*r->element));
164 if (cluster_multibulk_resp_recursive(sock, r->elements, r->element, status_strings) < 0) {
165 return FAILURE;
166 }
167 }
168 break;
169 default:
170 return FAILURE;
171 }
172 }
173 return SUCCESS;
174 }
175
176 /* Return the socket for a slot and slave index */
cluster_slot_sock(redisCluster * c,unsigned short slot,zend_ulong slaveidx)177 static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot,
178 zend_ulong slaveidx)
179 {
180 redisClusterNode *node;
181
182 /* Return the master if we're not looking for a slave */
183 if (slaveidx == 0) {
184 return SLOT_SOCK(c, slot);
185 }
186
187 /* Abort if we can't find this slave */
188 if (!SLOT_SLAVES(c, slot) ||
189 (node = zend_hash_index_find_ptr(SLOT_SLAVES(c,slot), slaveidx)) == NULL
190 ) {
191 return NULL;
192 }
193
194 /* Success, return the slave */
195 return node->sock;
196 }
197
198 /* Read the response from a cluster */
cluster_read_resp(redisCluster * c,int status_strings)199 clusterReply *cluster_read_resp(redisCluster *c, int status_strings) {
200 return cluster_read_sock_resp(c->cmd_sock, c->reply_type,
201 status_strings ? c->line_reply : NULL,
202 c->reply_len);
203 }
204
205 /* Read any sort of response from the socket, having already issued the
206 * command and consumed the reply type and meta info (length) */
207 clusterReply*
cluster_read_sock_resp(RedisSock * redis_sock,REDIS_REPLY_TYPE type,char * line_reply,long long len)208 cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
209 char *line_reply, long long len)
210 {
211 clusterReply *r;
212
213 r = ecalloc(1, sizeof(clusterReply));
214 r->type = type;
215
216 switch(r->type) {
217 case TYPE_INT:
218 r->integer = len;
219 break;
220 case TYPE_LINE:
221 if (line_reply) {
222 r->str = estrndup(line_reply, len);
223 r->len = len;
224 }
225 case TYPE_ERR:
226 return r;
227 case TYPE_BULK:
228 r->len = len;
229 r->str = redis_sock_read_bulk_reply(redis_sock, len);
230 if (r->len != -1 && !r->str) {
231 cluster_free_reply(r, 1);
232 return NULL;
233 }
234 break;
235 case TYPE_MULTIBULK:
236 r->elements = len;
237 if (r->elements > 0) {
238 r->element = ecalloc(len, sizeof(clusterReply*));
239 if (cluster_multibulk_resp_recursive(redis_sock, len, r->element, line_reply != NULL) < 0) {
240 cluster_free_reply(r, 1);
241 return NULL;
242 }
243 }
244 break;
245 default:
246 cluster_free_reply(r, 1);
247 return NULL;
248 }
249
250 // Success, return the reply
251 return r;
252 }
253
254 /*
 * Helpers to send various control-type commands to a specific node, e.g.
256 * MULTI, ASKING, READONLY, READWRITE, etc
257 */
258
259 /* Send a command to the specific socket and validate reply type */
cluster_send_direct(RedisSock * redis_sock,char * cmd,int cmd_len,REDIS_REPLY_TYPE type)260 static int cluster_send_direct(RedisSock *redis_sock, char *cmd, int cmd_len,
261 REDIS_REPLY_TYPE type)
262 {
263 char buf[1024];
264
265 /* Connect to the socket if we aren't yet and send our command, validate the reply type, and consume the first line */
266 if (!CLUSTER_SEND_PAYLOAD(redis_sock,cmd,cmd_len) ||
267 !CLUSTER_VALIDATE_REPLY_TYPE(redis_sock, type) ||
268 !php_stream_gets(redis_sock->stream, buf, sizeof(buf))) return -1;
269
270 /* Success! */
271 return 0;
272 }
273
/* Send ASKING on redis_sock, expecting a status-line reply.
 * Returns 0 on success, -1 on failure. */
static int cluster_send_asking(RedisSock *redis_sock) {
    int cmd_len = sizeof(RESP_ASKING_CMD) - 1;

    return cluster_send_direct(redis_sock, RESP_ASKING_CMD, cmd_len, TYPE_LINE);
}
278
279 /* Send READONLY to a specific RedisSock unless it's already flagged as being
280 * in READONLY mode. If we can send the command, we flag the socket as being
281 * in that mode. */
cluster_send_readonly(RedisSock * redis_sock)282 static int cluster_send_readonly(RedisSock *redis_sock) {
283 int ret;
284
285 /* We don't have to do anything if we're already in readonly mode */
286 if (redis_sock->readonly) return 0;
287
288 /* Return success if we can send it */
289 ret = cluster_send_direct(redis_sock, RESP_READONLY_CMD,
290 sizeof(RESP_READONLY_CMD) - 1, TYPE_LINE);
291
292 /* Flag this socket as READONLY if our command worked */
293 redis_sock->readonly = !ret;
294
295 /* Return the result of our send */
296 return ret;
297 }
298
299 /* Send MULTI to a specific ReidsSock */
cluster_send_multi(redisCluster * c,short slot)300 static int cluster_send_multi(redisCluster *c, short slot) {
301 if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_MULTI_CMD,
302 sizeof(RESP_MULTI_CMD) - 1, TYPE_LINE) == 0)
303 {
304 c->cmd_sock->mode = MULTI;
305 return 0;
306 }
307 return -1;
308 }
309
310 /* Send EXEC to a given slot. We can use the normal command processing mechanism
311 * here because we know we'll only have sent MULTI to the master nodes. We can't
312 * failover inside a transaction, as we don't know if the transaction will only
313 * be readonly commands, or contain write commands as well */
cluster_send_exec(redisCluster * c,short slot)314 PHP_REDIS_API int cluster_send_exec(redisCluster *c, short slot) {
315 int retval;
316
317 /* Send exec */
318 retval = cluster_send_slot(c, slot, RESP_EXEC_CMD, sizeof(RESP_EXEC_CMD)-1,
319 TYPE_MULTIBULK);
320
321 /* We'll either get a length corresponding to the number of commands sent to
322 * this node, or -1 in the case of EXECABORT or WATCH failure. */
323 c->multi_len[slot] = c->reply_len > 0 ? 1 : -1;
324
325 /* Return our retval */
326 return retval;
327 }
328
cluster_send_discard(redisCluster * c,short slot)329 PHP_REDIS_API int cluster_send_discard(redisCluster *c, short slot) {
330 if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_DISCARD_CMD,
331 sizeof(RESP_DISCARD_CMD)-1, TYPE_LINE))
332 {
333 return 0;
334 }
335 return -1;
336 }
337
338 /*
 * Cluster key distribution helpers. For a small handful of commands, we want
340 * to distribute them across 1-N nodes. These methods provide simple containers
341 * for the purposes of splitting keys/values in this way
342 * */
343
344 /* Free cluster distribution list inside a HashTable */
cluster_dist_free_ht(zval * p)345 static void cluster_dist_free_ht(zval *p) {
346 clusterDistList *dl = *(clusterDistList**)p;
347 int i;
348
349 for (i = 0; i < dl->len; i++) {
350 if (dl->entry[i].key_free)
351 efree(dl->entry[i].key);
352 if (dl->entry[i].val_free)
353 efree(dl->entry[i].val);
354 }
355
356 efree(dl->entry);
357 efree(dl);
358 }
359
360 /* Spin up a HashTable that will contain distribution lists */
cluster_dist_create()361 HashTable *cluster_dist_create() {
362 HashTable *ret;
363
364 ALLOC_HASHTABLE(ret);
365 zend_hash_init(ret, 0, NULL, cluster_dist_free_ht, 0);
366
367 return ret;
368 }
369
370 /* Free distribution list */
cluster_dist_free(HashTable * ht)371 void cluster_dist_free(HashTable *ht) {
372 zend_hash_destroy(ht);
373 efree(ht);
374 }
375
376 /* Create a clusterDistList object */
cluster_dl_create()377 static clusterDistList *cluster_dl_create() {
378 clusterDistList *dl;
379
380 dl = emalloc(sizeof(clusterDistList));
381 dl->entry = emalloc(CLUSTER_KEYDIST_ALLOC * sizeof(clusterKeyVal));
382 dl->size = CLUSTER_KEYDIST_ALLOC;
383 dl->len = 0;
384
385 return dl;
386 }
387
388 /* Add a key to a dist list, returning the keval entry */
cluster_dl_add_key(clusterDistList * dl,char * key,int key_len,int key_free)389 static clusterKeyVal *cluster_dl_add_key(clusterDistList *dl, char *key,
390 int key_len, int key_free)
391 {
392 // Reallocate if required
393 if (dl->len == dl->size) {
394 dl->entry = erealloc(dl->entry, sizeof(clusterKeyVal) * dl->size * 2);
395 dl->size *= 2;
396 }
397
398 // Set key info
399 dl->entry[dl->len].key = key;
400 dl->entry[dl->len].key_len = key_len;
401 dl->entry[dl->len].key_free = key_free;
402
403 // NULL out any values
404 dl->entry[dl->len].val = NULL;
405 dl->entry[dl->len].val_len = 0;
406 dl->entry[dl->len].val_free = 0;
407
408 return &(dl->entry[dl->len++]);
409 }
410
411 /* Add a key, returning a pointer to the entry where passed for easy adding
412 * of values to match this key */
cluster_dist_add_key(redisCluster * c,HashTable * ht,char * key,size_t key_len,clusterKeyVal ** kv)413 int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
414 size_t key_len, clusterKeyVal **kv)
415 {
416 int key_free;
417 short slot;
418 clusterDistList *dl;
419 clusterKeyVal *retptr;
420
421 // Prefix our key and hash it
422 key_free = redis_key_prefix(c->flags, &key, &key_len);
423 slot = cluster_hash_key(key, key_len);
424
425 // We can't do this if we don't fully understand the keyspace
426 if (c->master[slot] == NULL) {
427 if (key_free) efree(key);
428 return FAILURE;
429 }
430
431 // Look for this slot
432 if ((dl = zend_hash_index_find_ptr(ht, (zend_ulong)slot)) == NULL) {
433 dl = cluster_dl_create();
434 zend_hash_index_update_ptr(ht, (zend_ulong)slot, dl);
435 }
436
437 // Now actually add this key
438 retptr = cluster_dl_add_key(dl, key, key_len, key_free);
439
440 // Push our return pointer if requested
441 if (kv) *kv = retptr;
442
443 return SUCCESS;
444 }
445
446 /* Provided a clusterKeyVal, add a value */
cluster_dist_add_val(redisCluster * c,clusterKeyVal * kv,zval * z_val)447 void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *z_val
448 )
449 {
450 char *val;
451 size_t val_len;
452 int val_free;
453
454 // Serialize our value
455 val_free = redis_pack(c->flags, z_val, &val, &val_len);
456
457 // Attach it to the provied keyval entry
458 kv->val = val;
459 kv->val_len = val_len;
460 kv->val_free = val_free;
461 }
462
463 /* Free allocated memory for a clusterMultiCmd */
cluster_multi_free(clusterMultiCmd * mc)464 void cluster_multi_free(clusterMultiCmd *mc) {
465 efree(mc->cmd.c);
466 efree(mc->args.c);
467 }
468
469 /* Add an argument to a clusterMultiCmd */
cluster_multi_add(clusterMultiCmd * mc,char * data,int data_len)470 void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len) {
471 mc->argc++;
472 redis_cmd_append_sstr(&(mc->args), data, data_len);
473 }
474
475 /* Finalize a clusterMutliCmd by constructing the whole thing */
cluster_multi_fini(clusterMultiCmd * mc)476 void cluster_multi_fini(clusterMultiCmd *mc) {
477 mc->cmd.len = 0;
478 redis_cmd_init_sstr(&(mc->cmd), mc->argc, mc->kw, mc->kw_len);
479 smart_string_appendl(&(mc->cmd), mc->args.c, mc->args.len);
480 }
481
482 /* Set our last error string encountered */
483 static void
cluster_set_err(redisCluster * c,char * err,int err_len)484 cluster_set_err(redisCluster *c, char *err, int err_len)
485 {
486 // Free our last error
487 if (c->err != NULL) {
488 zend_string_release(c->err);
489 c->err = NULL;
490 }
491 if (err != NULL && err_len > 0) {
492 c->err = zend_string_init(err, err_len, 0);
493 if (err_len >= sizeof("CLUSTERDOWN") - 1 &&
494 !memcmp(err, "CLUSTERDOWN", sizeof("CLUSTERDOWN") - 1)
495 ) {
496 c->clusterdown = 1;
497 }
498 }
499 }
500
501 /* Destructor for slaves */
ht_free_slave(zval * data)502 static void ht_free_slave(zval *data) {
503 if (*(redisClusterNode**)data) {
504 cluster_free_node(*(redisClusterNode**)data);
505 }
506 }
507
508 /* Get the hash slot for a given key */
cluster_hash_key(const char * key,int len)509 unsigned short cluster_hash_key(const char *key, int len) {
510 int s, e;
511
512 // Find first occurrence of {, if any
513 for (s = 0; s < len; s++) {
514 if (key[s]=='{') break;
515 }
516
517 // There is no '{', hash everything
518 if (s == len) return crc16(key, len) & REDIS_CLUSTER_MOD;
519
520 // Found it, look for a tailing '}'
521 for (e =s + 1; e < len; e++) {
522 if (key[e] == '}') break;
523 }
524
525 // Hash the whole key if we don't find a tailing } or if {} is empty
526 if (e == len || e == s+1) return crc16(key, len) & REDIS_CLUSTER_MOD;
527
528 // Hash just the bit between { and }
529 return crc16((char*)key+s+1,e-s-1) & REDIS_CLUSTER_MOD;
530 }
531
/* Current wall-clock time (gettimeofday) in milliseconds. */
long long mstime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return (long long)now.tv_sec * 1000 + now.tv_usec / 1000;
}
543
544 /* Hash a key from a ZVAL */
cluster_hash_key_zval(zval * z_key)545 unsigned short cluster_hash_key_zval(zval *z_key) {
546 const char *kptr;
547 char buf[255];
548 int klen;
549
550 // Switch based on ZVAL type
551 switch(Z_TYPE_P(z_key)) {
552 case IS_STRING:
553 kptr = Z_STRVAL_P(z_key);
554 klen = Z_STRLEN_P(z_key);
555 break;
556 case IS_LONG:
557 klen = snprintf(buf,sizeof(buf),ZEND_LONG_FMT,Z_LVAL_P(z_key));
558 kptr = (const char *)buf;
559 break;
560 case IS_DOUBLE:
561 klen = snprintf(buf,sizeof(buf),"%f",Z_DVAL_P(z_key));
562 kptr = (const char *)buf;
563 break;
564 case IS_ARRAY:
565 kptr = "Array";
566 klen = sizeof("Array")-1;
567 break;
568 case IS_OBJECT:
569 kptr = "Object";
570 klen = sizeof("Object")-1;
571 break;
572 default:
573 kptr = "";
574 klen = 0;
575 }
576
577 // Hash the string representation
578 return cluster_hash_key(kptr, klen);
579 }
580
/* In-place Fisher-Yates shuffle of `len` ints. Indices are drawn from
 * rand(), so the result follows the current rand() seed. */
static void fyshuffle(int *array, size_t len) {
    int last, tmp;
    size_t pick;

    for (last = (int)len - 1; last > 0; last--) {
        /* Index in [0, last] */
        pick = (int)((double)(last + 1) * (rand() / (RAND_MAX + 1.0)));
        tmp = array[last];
        array[last] = array[pick];
        array[pick] = tmp;
    }
}
594
595 /* Execute a CLUSTER SLOTS command against the seed socket, and return the
596 * reply or NULL on failure. */
cluster_get_slots(RedisSock * redis_sock)597 clusterReply* cluster_get_slots(RedisSock *redis_sock)
598 {
599 clusterReply *r;
600 REDIS_REPLY_TYPE type;
601 long len;
602
603 // Send the command to the socket and consume reply type
604 if (redis_sock_write(redis_sock, RESP_CLUSTER_SLOTS_CMD,
605 sizeof(RESP_CLUSTER_SLOTS_CMD)-1) < 0 ||
606 redis_read_reply_type(redis_sock, &type, &len) < 0)
607 {
608 return NULL;
609 }
610
611 // Consume the rest of our response
612 if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len)) == NULL ||
613 r->type != TYPE_MULTIBULK || r->elements < 1)
614 {
615 if (r) cluster_free_reply(r, 1);
616 return NULL;
617 }
618
619 // Return our reply
620 return r;
621 }
622
623 /* Create a cluster node */
624 static redisClusterNode*
cluster_node_create(redisCluster * c,char * host,size_t host_len,unsigned short port,unsigned short slot,short slave)625 cluster_node_create(redisCluster *c, char *host, size_t host_len,
626 unsigned short port, unsigned short slot, short slave)
627 {
628 redisClusterNode *node = emalloc(sizeof(redisClusterNode));
629
630 // It lives in at least this slot, flag slave status
631 node->slot = slot;
632 node->slave = slave;
633 node->slaves = NULL;
634
635 /* Initialize our list of slot ranges */
636 zend_llist_init(&node->slots, sizeof(redisSlotRange), NULL, 0);
637
638 // Attach socket
639 node->sock = redis_sock_create(host, host_len, port,
640 c->flags->timeout, c->flags->read_timeout,
641 c->flags->persistent, NULL, 0);
642
643 /* Stream context */
644 node->sock->stream_ctx = c->flags->stream_ctx;
645
646 redis_sock_set_auth(node->sock, c->flags->user, c->flags->pass);
647
648 return node;
649 }
650
651 /* Attach a slave to a master */
652 PHP_REDIS_API int
cluster_node_add_slave(redisClusterNode * master,redisClusterNode * slave)653 cluster_node_add_slave(redisClusterNode *master, redisClusterNode *slave)
654 {
655 zend_ulong index;
656
657 // Allocate our slaves hash table if we haven't yet
658 if (!master->slaves) {
659 ALLOC_HASHTABLE(master->slaves);
660 zend_hash_init(master->slaves, 0, NULL, ht_free_slave, 0);
661 index = 1;
662 } else {
663 index = master->slaves->nNextFreeElement;
664 }
665
666 return zend_hash_index_update_ptr(master->slaves, index, slave) != NULL;
667 }
668
/* Sanity checks for one CLUSTER SLOTS entry.
 *
 * VALIDATE_SLOTS_OUTER(r): the entry has at least three elements, and the
 * first two (start and end slot) are integers.
 *
 * VALIDATE_SLOTS_INNER(r): a node description (element 2 onwards) is a
 * multi-bulk whose first two elements are a bulk host and an integer port.
 *
 * Both macros use only their parameter; the OUTER check previously
 * hard-coded `r2`. The argument is evaluated several times, so it must be
 * free of side effects. */
#define VALIDATE_SLOTS_OUTER(r) \
    ((r)->elements >= 3 && (r)->element[0]->type == TYPE_INT && \
     (r)->element[1]->type == TYPE_INT)
#define VALIDATE_SLOTS_INNER(r) \
    ((r)->type == TYPE_MULTIBULK && (r)->elements >= 2 && \
     (r)->element[0]->type == TYPE_BULK && (r)->element[1]->type == TYPE_INT)
676
677 /* Use the output of CLUSTER SLOTS to map our nodes */
cluster_map_slots(redisCluster * c,clusterReply * r)678 static int cluster_map_slots(redisCluster *c, clusterReply *r) {
679 redisClusterNode *pnode, *master, *slave;
680 redisSlotRange range;
681 int i,j, hlen, klen;
682 short low, high;
683 clusterReply *r2, *r3;
684 unsigned short port;
685 char *host, key[1024];
686 zend_hash_clean(c->nodes);
687 for (i = 0; i < r->elements; i++) {
688 // Inner response
689 r2 = r->element[i];
690
691 // Validate outer and master slot
692 if (!VALIDATE_SLOTS_OUTER(r2) || !VALIDATE_SLOTS_INNER(r2->element[2])) {
693 return -1;
694 }
695
696 // Master
697 r3 = r2->element[2];
698
699 // Grab our slot range, as well as master host/port
700 low = (unsigned short)r2->element[0]->integer;
701 high = (unsigned short)r2->element[1]->integer;
702 host = r3->element[0]->str;
703 hlen = r3->element[0]->len;
704 port = (unsigned short)r3->element[1]->integer;
705
706 // If the node is new, create and add to nodes. Otherwise use it.
707 klen = snprintf(key, sizeof(key), "%s:%d", host, port);
708 if ((pnode = zend_hash_str_find_ptr(c->nodes, key, klen)) == NULL) {
709 master = cluster_node_create(c, host, hlen, port, low, 0);
710 zend_hash_str_update_ptr(c->nodes, key, klen, master);
711 } else {
712 master = pnode;
713 }
714
715 // Attach slaves
716 for (j = 3; j< r2->elements; j++) {
717 r3 = r2->element[j];
718 if (!VALIDATE_SLOTS_INNER(r3)) {
719 return -1;
720 }
721
722 // Skip slaves where the host is ""
723 if (r3->element[0]->len == 0) continue;
724
725 // Attach this node to our slave
726 slave = cluster_node_create(c, r3->element[0]->str,
727 (int)r3->element[0]->len,
728 (unsigned short)r3->element[1]->integer, low, 1);
729 cluster_node_add_slave(master, slave);
730 }
731
732 // Attach this node to each slot in the range
733 for (j = low; j<= high; j++) {
734 c->master[j] = master;
735 }
736
737 /* Append to our list of slot ranges */
738 range.low = low; range.high = high;
739 zend_llist_add_element(&master->slots, &range);
740 }
741
742 // Success
743 return 0;
744 }
745
746 /* Free a redisClusterNode structure */
cluster_free_node(redisClusterNode * node)747 PHP_REDIS_API void cluster_free_node(redisClusterNode *node) {
748 if (node->slaves) {
749 zend_hash_destroy(node->slaves);
750 efree(node->slaves);
751 }
752
753 zend_llist_destroy(&node->slots);
754 redis_free_socket(node->sock);
755
756 efree(node);
757 }
758
759 /* Get or create a redisClusterNode that corresponds to the asking redirection */
cluster_get_asking_node(redisCluster * c)760 static redisClusterNode *cluster_get_asking_node(redisCluster *c) {
761 redisClusterNode *pNode;
762 char key[1024];
763 int key_len;
764
765 /* Hashed by host:port */
766 key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host, c->redir_port);
767
768 /* See if we've already attached to it */
769 if ((pNode = zend_hash_str_find_ptr(c->nodes, key, key_len)) != NULL) {
770 return pNode;
771 }
772
773 /* This host:port is unknown to us, so add it */
774 pNode = cluster_node_create(c, c->redir_host, c->redir_host_len,
775 c->redir_port, c->redir_slot, 0);
776
777 /* Return the node */
778 return pNode;
779 }
780
781 /* Get or create a node at the host:port we were asked to check, and return the
782 * redis_sock for it. */
cluster_get_asking_sock(redisCluster * c)783 static RedisSock *cluster_get_asking_sock(redisCluster *c) {
784 return cluster_get_asking_node(c)->sock;
785 }
786
787 /* Our context seeds will be a hash table with RedisSock* pointers */
ht_free_seed(zval * data)788 static void ht_free_seed(zval *data) {
789 RedisSock *redis_sock = *(RedisSock**)data;
790 if (redis_sock) redis_free_socket(redis_sock);
791 }
792
793 /* Free redisClusterNode objects we've stored */
ht_free_node(zval * data)794 static void ht_free_node(zval *data) {
795 redisClusterNode *node = *(redisClusterNode**)data;
796 cluster_free_node(node);
797 }
798
799 /* zend_llist of slot ranges -> persistent array */
slot_range_list_clone(zend_llist * src,size_t * count)800 static redisSlotRange *slot_range_list_clone(zend_llist *src, size_t *count) {
801 redisSlotRange *dst, *range;
802 size_t i = 0;
803
804 *count = zend_llist_count(src);
805 dst = pemalloc(*count * sizeof(*dst), 1);
806
807 range = zend_llist_get_first(src);
808 while (range) {
809 memcpy(&dst[i++], range, sizeof(*range));
810 range = zend_llist_get_next(src);
811 }
812
813 return dst;
814 }
815
816 /* Construct a redisCluster object */
cluster_create(double timeout,double read_timeout,int failover,int persistent)817 PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
818 int failover, int persistent)
819 {
820 redisCluster *c;
821
822 /* Actual our actual cluster structure */
823 c = ecalloc(1, sizeof(redisCluster));
824
825 /* Initialize flags and settings */
826 c->flags = ecalloc(1, sizeof(RedisSock));
827 c->flags->timeout = timeout;
828 c->flags->read_timeout = read_timeout;
829 c->flags->persistent = persistent;
830 c->subscribed_slot = -1;
831 c->clusterdown = 0;
832 c->failover = failover;
833 c->err = NULL;
834
835 /* Set up our waitms based on timeout */
836 c->waitms = (long)(1000 * timeout);
837
838 /* Allocate our seeds hash table */
839 ALLOC_HASHTABLE(c->seeds);
840 zend_hash_init(c->seeds, 0, NULL, ht_free_seed, 0);
841
842 /* Allocate our nodes HashTable */
843 ALLOC_HASHTABLE(c->nodes);
844 zend_hash_init(c->nodes, 0, NULL, ht_free_node, 0);
845
846 return c;
847 }
848
849 PHP_REDIS_API void
cluster_free(redisCluster * c,int free_ctx)850 cluster_free(redisCluster *c, int free_ctx)
851 {
852 /* Disconnect from each node we're connected to */
853 cluster_disconnect(c, 0);
854
855 /* Free any allocated prefix */
856 if (c->flags->prefix) zend_string_release(c->flags->prefix);
857
858 redis_sock_free_auth(c->flags);
859 efree(c->flags);
860
861 /* Call hash table destructors */
862 zend_hash_destroy(c->seeds);
863 zend_hash_destroy(c->nodes);
864
865 /* Free hash tables themselves */
866 efree(c->seeds);
867 efree(c->nodes);
868
869 /* Free any error we've got */
870 if (c->err) zend_string_release(c->err);
871
872 if (c->cache_key) {
873 /* Invalidate persistent cache if the cluster has changed */
874 if (c->redirections) {
875 zend_hash_del(&EG(persistent_list), c->cache_key);
876 }
877
878 /* Release our hold on the cache key */
879 zend_string_release(c->cache_key);
880 }
881
882 /* Free structure itself */
883 if (free_ctx) efree(c);
884 }
885
886 /* Create a cluster slot cache structure */
887 PHP_REDIS_API
cluster_cache_create(zend_string * hash,HashTable * nodes)888 redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes) {
889 redisCachedCluster *cc;
890 redisCachedMaster *cm;
891 redisClusterNode *node, *slave;
892
893 cc = pecalloc(1, sizeof(*cc), 1);
894 cc->hash = zend_string_dup(hash, 1);
895
896 /* Copy nodes */
897 cc->master = pecalloc(zend_hash_num_elements(nodes), sizeof(*cc->master), 1);
898 ZEND_HASH_FOREACH_PTR(nodes, node) {
899 /* Skip slaves */
900 if (node->slave) continue;
901
902 cm = &cc->master[cc->count];
903
904 /* Duplicate host/port and clone slot ranges */
905 cm->host.addr = zend_string_dup(node->sock->host, 1);
906 cm->host.port = node->sock->port;
907
908 /* Copy over slot ranges */
909 cm->slot = slot_range_list_clone(&node->slots, &cm->slots);
910
911 /* Attach any slave nodes we have. */
912 if (node->slaves) {
913 /* Allocate memory for slaves */
914 cm->slave = pecalloc(zend_hash_num_elements(node->slaves), sizeof(*cm->slave), 1);
915
916 /* Copy host/port information for each slave */
917 ZEND_HASH_FOREACH_PTR(node->slaves, slave) {
918 cm->slave[cm->slaves].addr = zend_string_dup(slave->sock->host, 1);
919 cm->slave[cm->slaves].port = slave->sock->port;
920 cm->slaves++;
921 } ZEND_HASH_FOREACH_END();
922 }
923
924 cc->count++;
925 } ZEND_HASH_FOREACH_END();
926
927 return cc;
928 }
929
/* Release the persistent strings and arrays held by one cached master. The
 * redisCachedMaster struct itself lives inside its parent's master array
 * and is not freed here. */
static void cluster_free_cached_master(redisCachedMaster *cm) {
    size_t idx;

    for (idx = 0; idx < cm->slaves; idx++) {
        zend_string_release(cm->slave[idx].addr);
    }

    zend_string_release(cm->host.addr);
    pefree(cm->slave, 1);
    pefree(cm->slot, 1);
}
943
944 static redisClusterNode*
cached_master_clone(redisCluster * c,redisCachedMaster * cm)945 cached_master_clone(redisCluster *c, redisCachedMaster *cm) {
946 redisClusterNode *node;
947 size_t i;
948
949 node = cluster_node_create(c, ZSTR_VAL(cm->host.addr), ZSTR_LEN(cm->host.addr),
950 cm->host.port, cm->slot[0].low, 0);
951
952 /* Now copy in our slot ranges */
953 for (i = 0; i < cm->slots; i++) {
954 zend_llist_add_element(&node->slots, &cm->slot[i]);
955 }
956
957 return node;
958 }
959
960 /* Destroy a persistent cached cluster */
cluster_cache_free(redisCachedCluster * rcc)961 PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc) {
962 size_t i;
963
964 /* Free masters */
965 for (i = 0; i < rcc->count; i++) {
966 cluster_free_cached_master(&rcc->master[i]);
967 }
968
969 zend_string_release(rcc->hash);
970 pefree(rcc->master, 1);
971 pefree(rcc, 1);
972 }
973
974 /* Initialize cluster from cached slots */
975 PHP_REDIS_API
cluster_init_cache(redisCluster * c,redisCachedCluster * cc)976 void cluster_init_cache(redisCluster *c, redisCachedCluster *cc) {
977 RedisSock *sock;
978 redisClusterNode *mnode, *slave;
979 redisCachedMaster *cm;
980 char key[HOST_NAME_MAX];
981 size_t keylen, i, j, s;
982 int *map;
983
984 /* Randomize seeds */
985 map = emalloc(sizeof(*map) * cc->count);
986 for (i = 0; i < cc->count; i++) map[i] = i;
987 fyshuffle(map, cc->count);
988
989 /* Duplicate the hash key so we can invalidate when redirected */
990 c->cache_key = zend_string_copy(cc->hash);
991
992 /* Iterate over masters */
993 for (i = 0; i < cc->count; i++) {
994 /* Grab the next master */
995 cm = &cc->master[map[i]];
996
997 /* Hash our host and port */
998 keylen = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(cm->host.addr), cm->host.port);
999
1000 /* Create socket */
1001 sock = redis_sock_create(ZSTR_VAL(cm->host.addr), ZSTR_LEN(cm->host.addr), cm->host.port,
1002 c->flags->timeout, c->flags->read_timeout, c->flags->persistent,
1003 NULL, 0);
1004
1005 /* Stream context */
1006 sock->stream_ctx = c->flags->stream_ctx;
1007
1008 /* Add to seed nodes */
1009 zend_hash_str_update_ptr(c->seeds, key, keylen, sock);
1010
1011 /* Create master node */
1012 mnode = cached_master_clone(c, cm);
1013
1014 /* Add our master */
1015 zend_hash_str_update_ptr(c->nodes, key, keylen, mnode);
1016
1017 /* Attach any slaves */
1018 for (s = 0; s < cm->slaves; s++) {
1019 zend_string *host = cm->slave[s].addr;
1020 slave = cluster_node_create(c, ZSTR_VAL(host), ZSTR_LEN(host), cm->slave[s].port, 0, 1);
1021 cluster_node_add_slave(mnode, slave);
1022 }
1023
1024 /* Hook up direct slot access */
1025 for (j = 0; j < cm->slots; j++) {
1026 for (s = cm->slot[j].low; s <= cm->slot[j].high; s++) {
1027 c->master[s] = mnode;
1028 }
1029 }
1030 }
1031
1032 efree(map);
1033 }
1034
1035 /* Initialize seeds. By the time we get here we've already validated our
1036 * seeds array and know we have a non-empty array of strings all in
1037 * host:port format. */
1038 PHP_REDIS_API void
cluster_init_seeds(redisCluster * c,zend_string ** seeds,uint32_t nseeds)1039 cluster_init_seeds(redisCluster *c, zend_string **seeds, uint32_t nseeds)
1040 {
1041 RedisSock *sock;
1042 char *seed, *sep, key[1024];
1043 int key_len, i, *map;
1044
1045 /* Get a randomized order to hit our seeds */
1046 map = ecalloc(nseeds, sizeof(*map));
1047 for (i = 0; i < nseeds; i++) map[i] = i;
1048 fyshuffle(map, nseeds);
1049
1050 for (i = 0; i < nseeds; i++) {
1051 seed = ZSTR_VAL(seeds[map[i]]);
1052
1053 sep = strrchr(seed, ':');
1054 ZEND_ASSERT(sep != NULL);
1055
1056 // Allocate a structure for this seed
1057 sock = redis_sock_create(seed, sep - seed, atoi(sep + 1),
1058 c->flags->timeout, c->flags->read_timeout,
1059 c->flags->persistent, NULL, 0);
1060
1061 /* Stream context */
1062 sock->stream_ctx = c->flags->stream_ctx;
1063
1064 /* Credentials */
1065 redis_sock_set_auth(sock, c->flags->user, c->flags->pass);
1066
1067 // Index this seed by host/port
1068 key_len = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(sock->host),
1069 sock->port);
1070
1071 // Add to our seed HashTable
1072 zend_hash_str_update_ptr(c->seeds, key, key_len, sock);
1073 }
1074
1075 efree(map);
1076 }
1077
1078 /* Initial mapping of our cluster keyspace */
cluster_map_keyspace(redisCluster * c)1079 PHP_REDIS_API int cluster_map_keyspace(redisCluster *c) {
1080 RedisSock *seed;
1081 clusterReply *slots = NULL;
1082 int mapped = 0;
1083
1084 // Iterate over seeds until we can get slots
1085 ZEND_HASH_FOREACH_PTR(c->seeds, seed) {
1086 // Attempt to connect to this seed node
1087 if (seed == NULL || redis_sock_server_open(seed) != SUCCESS) {
1088 continue;
1089 }
1090
1091 // Parse out cluster nodes. Flag mapped if we are valid
1092 slots = cluster_get_slots(seed);
1093 if (slots) {
1094 mapped = !cluster_map_slots(c, slots);
1095 // Bin anything mapped, if we failed somewhere
1096 if (!mapped) {
1097 memset(c->master, 0, sizeof(redisClusterNode*)*REDIS_CLUSTER_SLOTS);
1098 }
1099 }
1100 redis_sock_disconnect(seed, 0);
1101 if (mapped) break;
1102 } ZEND_HASH_FOREACH_END();
1103
1104 // Clean up slots reply if we got one
1105 if (slots) cluster_free_reply(slots, 1);
1106
1107 // Throw an exception if we couldn't map
1108 if (!mapped) {
1109 CLUSTER_THROW_EXCEPTION("Couldn't map cluster keyspace using any provided seed", 0);
1110 return FAILURE;
1111 }
1112
1113 return SUCCESS;
1114 }
1115
1116 /* Parse the MOVED OR ASK redirection payload when we get such a response
1117 * and apply this information to our cluster. If we encounter a parse error
1118 * nothing in the cluster will be modified, and -1 is returned. */
cluster_set_redirection(redisCluster * c,char * msg,int moved)1119 static int cluster_set_redirection(redisCluster* c, char *msg, int moved)
1120 {
1121 char *host, *port;
1122
1123 /* Move past "MOVED" or "ASK */
1124 msg += moved ? MOVED_LEN : ASK_LEN;
1125
1126 /* Make sure we can find host */
1127 if ((host = strchr(msg, ' ')) == NULL) return -1;
1128 *host++ = '\0';
1129
1130 /* Find port, searching right to left in case of IPv6 */
1131 if ((port = strrchr(host, ':')) == NULL) return -1;
1132 *port++ = '\0';
1133
1134 // Success, apply it
1135 c->redir_type = moved ? REDIR_MOVED : REDIR_ASK;
1136 strncpy(c->redir_host, host, sizeof(c->redir_host) - 1);
1137 c->redir_host_len = port - host - 1;
1138 c->redir_slot = (unsigned short)atoi(msg);
1139 c->redir_port = (unsigned short)atoi(port);
1140
1141 return 0;
1142 }
1143
1144 /* Once we write a command to a node in our cluster, this function will check
1145 * the reply type and extract information from those that will specify a length
1146 * bit. If we encounter an error condition, we'll check for MOVED or ASK
1147 * redirection, parsing out slot host and port so the caller can take
1148 * appropriate action.
1149 *
1150 * In the case of a non MOVED/ASK error, we wlll set our cluster error
1151 * condition so GetLastError can be queried by the client.
1152 *
1153 * This function will return -1 on a critical error (e.g. parse/communication
1154 * error, 0 if no redirection was encountered, and 1 if the data was moved. */
cluster_check_response(redisCluster * c,REDIS_REPLY_TYPE * reply_type)1155 static int cluster_check_response(redisCluster *c, REDIS_REPLY_TYPE *reply_type)
1156 {
1157 size_t sz;
1158
1159 // Clear out any prior error state and our last line response
1160 CLUSTER_CLEAR_ERROR(c);
1161 CLUSTER_CLEAR_REPLY(c);
1162
1163 if (-1 == redis_check_eof(c->cmd_sock, 1) ||
1164 EOF == (*reply_type = php_stream_getc(c->cmd_sock->stream)))
1165 {
1166 return -1;
1167 }
1168
1169 // In the event of an ERROR, check if it's a MOVED/ASK error
1170 if (*reply_type == TYPE_ERR) {
1171 char inbuf[4096];
1172 int moved;
1173
1174 // Attempt to read the error
1175 if (!php_stream_gets(c->cmd_sock->stream, inbuf, sizeof(inbuf))) {
1176 return -1;
1177 }
1178
1179 // Check for MOVED or ASK redirection
1180 if ((moved = IS_MOVED(inbuf)) || IS_ASK(inbuf)) {
1181 /* The Redis Cluster specification suggests clients do not update
1182 * their slot mapping for an ASK redirection, only for MOVED */
1183 if (moved) c->redirections++;
1184
1185 /* Make sure we can parse the redirection host and port */
1186 if (cluster_set_redirection(c,inbuf,moved) < 0) {
1187 return -1;
1188 }
1189
1190 /* We've been redirected */
1191 return 1;
1192 } else {
1193 // Capture the error string Redis returned
1194 cluster_set_err(c, inbuf, strlen(inbuf)-2);
1195 return 0;
1196 }
1197 }
1198
1199 // Fetch the first line of our response from Redis.
1200 if (redis_sock_gets(c->cmd_sock,c->line_reply,sizeof(c->line_reply),
1201 &sz) < 0)
1202 {
1203 return -1;
1204 }
1205
1206 // For replies that will give us a numeric length, convert it
1207 if (*reply_type != TYPE_LINE) {
1208 c->reply_len = strtol(c->line_reply, NULL, 10);
1209 } else {
1210 c->reply_len = (long long)sz;
1211 }
1212
1213 // Clear out any previous error, and return that the data is here
1214 CLUSTER_CLEAR_ERROR(c);
1215 return 0;
1216 }
1217
1218 /* Disconnect from each node we're connected to */
cluster_disconnect(redisCluster * c,int force)1219 PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force) {
1220 redisClusterNode *node, *slave;
1221
1222 ZEND_HASH_FOREACH_PTR(c->nodes, node) {
1223 if (node == NULL) continue;
1224
1225 /* Disconnect from the master */
1226 redis_sock_disconnect(node->sock, force);
1227
1228 /* We also want to disconnect any slave connections so they will be pooled
1229 * in the event we are using persistent connections and connection pooling. */
1230 if (node->slaves) {
1231 ZEND_HASH_FOREACH_PTR(node->slaves, slave) {
1232 redis_sock_disconnect(slave->sock, force);
1233 } ZEND_HASH_FOREACH_END();
1234 }
1235 } ZEND_HASH_FOREACH_END();
1236 }
1237
1238 /* This method attempts to write our command at random to the master and any
1239 * attached slaves, until we either successufly do so, or fail. */
cluster_dist_write(redisCluster * c,const char * cmd,size_t sz,int nomaster)1240 static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
1241 int nomaster)
1242 {
1243 int i, count = 1, *nodes;
1244 RedisSock *redis_sock;
1245
1246 /* Determine our overall node count */
1247 if (c->master[c->cmd_slot]->slaves) {
1248 count += zend_hash_num_elements(c->master[c->cmd_slot]->slaves);
1249 }
1250
1251 /* Allocate memory for master + slaves or just slaves */
1252 nodes = emalloc(sizeof(int)*count);
1253
1254 /* Populate our array with the master and each of it's slaves, then
1255 * randomize them, so we will pick from the master or some slave. */
1256 for (i = 0; i < count; i++) nodes[i] = i;
1257 fyshuffle(nodes, count);
1258
1259 /* Iterate through our nodes until we find one we can write to or fail */
1260 for (i = 0; i < count; i++) {
1261 /* Skip if this is the master node and we don't want to query that */
1262 if (nomaster && nodes[i] == 0)
1263 continue;
1264
1265 /* Get the slave for this index */
1266 redis_sock = cluster_slot_sock(c, c->cmd_slot, nodes[i]);
1267 if (!redis_sock) continue;
1268
1269 /* If we're not on the master, attempt to send the READONLY command to
1270 * this slave, and skip it if that fails */
1271 if (nodes[i] == 0 || redis_sock->readonly ||
1272 cluster_send_readonly(redis_sock) == 0)
1273 {
1274 /* Attempt to send the command */
1275 if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) {
1276 c->cmd_sock = redis_sock;
1277 efree(nodes);
1278 return 0;
1279 }
1280 }
1281 }
1282
1283 /* Clean up our shuffled array */
1284 efree(nodes);
1285
1286 /* Couldn't send to the master or any slave */
1287 return -1;
1288 }
1289
1290 /* Attempt to write our command to the current c->cmd_sock socket. For write
1291 * commands, we attempt to query the master for this slot, and in the event of
1292 * a failure, try to query every remaining node for a redirection.
1293 *
1294 * If we're issuing a readonly command, we use one of three strategies, depending
1295 * on our redisCluster->failover setting.
1296 *
1297 * REDIS_FAILOVER_NONE:
1298 * The command is treated just like a write command, and will only be executed
1299 * against the known master for this slot.
1300 * REDIS_FAILOVER_ERROR:
1301 * If we're unable to communicate with this slot's master, we attempt the query
1302 * against any slaves (at random) that this master has.
1303 * REDIS_FAILOVER_DISTRIBUTE:
1304 * We pick at random from the master and any slaves it has. This option will
1305 * load balance between masters and slaves
1306 * REDIS_FAILOVER_DISTRIBUTE_SLAVES:
1307 * We pick at random from slave nodes of a given master. This option is
1308 * used to load balance read queries against N slaves.
1309 *
1310 * Once we are able to find a node we can write to, we check for MOVED or
1311 * ASKING redirection, such that the keyspace can be updated.
1312 */
cluster_sock_write(redisCluster * c,const char * cmd,size_t sz,int direct)1313 static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz,
1314 int direct)
1315 {
1316 redisClusterNode *seed_node;
1317 RedisSock *redis_sock;
1318 int failover, nomaster;
1319
1320 /* First try the socket requested */
1321 redis_sock = c->cmd_sock;
1322
1323 /* Readonly is irrelevant if we're not configured to failover */
1324 failover = c->readonly && c->failover != REDIS_FAILOVER_NONE ?
1325 c->failover : REDIS_FAILOVER_NONE;
1326
1327 /* If in ASK redirection, get/create the node for that host:port, otherwise
1328 * just use the command socket. */
1329 if (c->redir_type == REDIR_ASK) {
1330 if (cluster_send_asking(c->cmd_sock) < 0) {
1331 return -1;
1332 }
1333 }
1334
1335 /* Attempt to send our command payload to the cluster. If we're not set up
1336 * to failover, just try the master. If we're configured to failover on
1337 * error, try the master and then fall back to any slaves. When we're set
1338 * up to distribute the commands, try to write to any node on this slot
1339 * at random. */
1340 if (failover == REDIS_FAILOVER_NONE) {
1341 /* Success if we can send our payload to the master */
1342 if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) return 0;
1343 } else if (failover == REDIS_FAILOVER_ERROR) {
1344 /* Try the master, then fall back to any slaves we may have */
1345 if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz) ||
1346 !cluster_dist_write(c, cmd, sz, 1)) return 0;
1347 } else {
1348 /* Include or exclude master node depending on failover option and
1349 * attempt to make our write */
1350 nomaster = failover == REDIS_FAILOVER_DISTRIBUTE_SLAVES;
1351 if (!cluster_dist_write(c, cmd, sz, nomaster)) {
1352 /* We were able to write to a master or slave at random */
1353 return 0;
1354 }
1355 }
1356
1357 /* Don't fall back if direct communication with this slot is required. */
1358 if (direct) return -1;
1359
1360 /* Fall back by attempting the request against every known node */
1361 ZEND_HASH_FOREACH_PTR(c->nodes, seed_node) {
1362 /* Skip this node if it's the one that failed, or if it's a slave */
1363 if (seed_node == NULL || seed_node->sock == redis_sock || seed_node->slave) continue;
1364
1365 /* Connect to this node if we haven't already and attempt to write our request to this node */
1366 if (CLUSTER_SEND_PAYLOAD(seed_node->sock, cmd, sz)) {
1367 c->cmd_slot = seed_node->slot;
1368 c->cmd_sock = seed_node->sock;
1369 return 0;
1370 }
1371 } ZEND_HASH_FOREACH_END();
1372
1373 /* We were unable to write to any node in our cluster */
1374 return -1;
1375 }
1376
1377 /* Helper to find if we've got a host:port mapped in our cluster nodes. */
cluster_find_node(redisCluster * c,const char * host,unsigned short port)1378 static redisClusterNode *cluster_find_node(redisCluster *c, const char *host,
1379 unsigned short port)
1380 {
1381 int key_len;
1382 char key[1024];
1383
1384 key_len = snprintf(key,sizeof(key),"%s:%d", host, port);
1385
1386 return zend_hash_str_find_ptr(c->nodes, key, key_len);
1387 }
1388
1389 /* Provided a redisCluster object, the slot where we thought data was and
1390 * the slot where data was moved, update our node mapping */
cluster_update_slot(redisCluster * c)1391 static void cluster_update_slot(redisCluster *c) {
1392 redisClusterNode *node;
1393 char key[1024];
1394 size_t klen;
1395
1396 /* Do we already have the new slot mapped */
1397 if (c->master[c->redir_slot]) {
1398 /* No need to do anything if it's the same node */
1399 if (!CLUSTER_REDIR_CMP(c, SLOT_SOCK(c,c->redir_slot))) {
1400 return;
1401 }
1402
1403 /* Check to see if we have this new node mapped */
1404 node = cluster_find_node(c, c->redir_host, c->redir_port);
1405
1406 if (node) {
1407 /* Just point to this slot */
1408 c->master[c->redir_slot] = node;
1409 } else {
1410 /* If the redirected node is a replica of the previous slot owner, a failover has taken place.
1411 We must then remap the cluster's keyspace in order to update the cluster's topology. */
1412 redisClusterNode *prev_master = SLOT(c,c->redir_slot);
1413 redisClusterNode *slave;
1414 ZEND_HASH_FOREACH_PTR(prev_master->slaves, slave) {
1415 if (slave == NULL) {
1416 continue;
1417 }
1418 if (!CLUSTER_REDIR_CMP(c, slave->sock)) {
1419 // Detected a failover, the redirected node was a replica
1420 // Remap the cluster's keyspace
1421 cluster_map_keyspace(c);
1422 return;
1423 }
1424 } ZEND_HASH_FOREACH_END();
1425
1426 /* Create our node */
1427 node = cluster_node_create(c, c->redir_host, c->redir_host_len,
1428 c->redir_port, c->redir_slot, 0);
1429
1430 /* Our node is new, so keep track of it for cleanup */
1431 klen = snprintf(key, sizeof(key), "%s:%d", c->redir_host, c->redir_port);
1432 zend_hash_str_update_ptr(c->nodes, key, klen, node);
1433
1434 /* Now point our slot at the node */
1435 c->master[c->redir_slot] = node;
1436 }
1437 } else {
1438 /* Check to see if the ip and port are mapped */
1439 node = cluster_find_node(c, c->redir_host, c->redir_port);
1440 if (!node) {
1441 node = cluster_node_create(c, c->redir_host, c->redir_host_len,
1442 c->redir_port, c->redir_slot, 0);
1443 }
1444
1445 /* Map the slot to this node */
1446 c->master[c->redir_slot] = node;
1447 }
1448
1449 /* Update slot inside of node, so it can be found for command sending */
1450 node->slot = c->redir_slot;
1451
1452 /* Make sure we unflag this node as a slave, as Redis Cluster will only ever
1453 * direct us to master nodes. */
1454 node->slave = 0;
1455 }
1456
1457 /* Abort any transaction in process, by sending DISCARD to any nodes that
1458 * have active transactions in progress. If we can't send DISCARD, we need
1459 * to disconnect as it would leave us in an undefined state. */
cluster_abort_exec(redisCluster * c)1460 PHP_REDIS_API int cluster_abort_exec(redisCluster *c) {
1461 clusterFoldItem *fi = c->multi_head;
1462
1463 /* Loop through our fold items */
1464 while (fi) {
1465 if (SLOT_SOCK(c,fi->slot)->mode == MULTI) {
1466 if (cluster_send_discard(c, fi->slot) < 0) {
1467 cluster_disconnect(c, 0);
1468 return -1;
1469 }
1470 SLOT_SOCK(c,fi->slot)->mode = ATOMIC;
1471 SLOT_SOCK(c,fi->slot)->watching = 0;
1472 }
1473 fi = fi->next;
1474 }
1475
1476 /* Update our overall cluster state */
1477 c->flags->mode = ATOMIC;
1478
1479 /* Success */
1480 return 0;
1481 }
1482
1483 /* Iterate through our slots, looking for the host/port in question. This
1484 * should perform well enough as in almost all situations, a few or a few
1485 * dozen servers will map all the slots */
cluster_find_slot(redisCluster * c,const char * host,unsigned short port)1486 PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host,
1487 unsigned short port)
1488 {
1489 int i;
1490
1491 for (i = 0; i < REDIS_CLUSTER_SLOTS; i++) {
1492 if (c->master[i] && c->master[i]->sock &&
1493 c->master[i]->sock->port == port &&
1494 !strcasecmp(ZSTR_VAL(c->master[i]->sock->host), host))
1495 {
1496 return i;
1497 }
1498 }
1499
1500 // We didn't find it
1501 return -1;
1502 }
1503
1504 /* Send a command to a specific slot */
cluster_send_slot(redisCluster * c,short slot,char * cmd,int cmd_len,REDIS_REPLY_TYPE rtype)1505 PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
1506 int cmd_len, REDIS_REPLY_TYPE rtype)
1507 {
1508 /* Point our cluster to this slot and it's socket */
1509 c->cmd_slot = slot;
1510 c->cmd_sock = SLOT_SOCK(c, slot);
1511
1512 /* Enable multi mode on this slot if we've been directed to but haven't
1513 * send it to this node yet */
1514 if (c->flags->mode == MULTI && c->cmd_sock->mode != MULTI) {
1515 if (cluster_send_multi(c, slot) == -1) {
1516 CLUSTER_THROW_EXCEPTION("Unable to enter MULTI mode on requested slot", 0);
1517 return -1;
1518 }
1519 }
1520
1521 /* Try the slot */
1522 if (cluster_sock_write(c, cmd, cmd_len, 1) == -1) {
1523 return -1;
1524 }
1525
1526 /* Check our response */
1527 if (cluster_check_response(c, &c->reply_type) != 0 ||
1528 (rtype != TYPE_EOF && rtype != c->reply_type)) return -1;
1529
1530 /* Success */
1531 return 0;
1532 }
1533
1534 /* Send a command to given slot in our cluster. If we get a MOVED or ASK error
1535 * we attempt to send the command to the node as directed. */
cluster_send_command(redisCluster * c,short slot,const char * cmd,int cmd_len)1536 PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
1537 int cmd_len)
1538 {
1539 int resp, timedout = 0;
1540 long msstart;
1541
1542 if (!SLOT(c, slot)) {
1543 zend_throw_exception_ex(redis_cluster_exception_ce, 0,
1544 "The slot %d is not covered by any node in this cluster", slot);
1545 return -1;
1546 }
1547 /* Set the slot we're operating against as well as it's socket. These can
1548 * change during our request loop if we have a master failure and are
1549 * configured to fall back to slave nodes, or if we have to fall back to
1550 * a different slot due to no nodes serving this slot being reachable. */
1551 c->cmd_slot = slot;
1552 c->cmd_sock = SLOT_SOCK(c, slot);
1553
1554 /* Get the current time in milliseconds to handle any timeout */
1555 msstart = mstime();
1556
1557 /* Our main cluster request/reply loop. This loop runs until we're able to
1558 * get a valid reply from a node, hit our "request" timeout, or encounter a
1559 * CLUSTERDOWN state from Redis Cluster. */
1560 do {
1561 /* Send MULTI to the socket if we're in MULTI mode but haven't yet */
1562 if (c->flags->mode == MULTI && CMD_SOCK(c)->mode != MULTI) {
1563 /* We have to fail if we can't send MULTI to the node */
1564 if (cluster_send_multi(c, slot) == -1) {
1565 CLUSTER_THROW_EXCEPTION("Unable to enter MULTI mode on requested slot", 0);
1566 return -1;
1567 }
1568 }
1569
1570 /* Attempt to deliver our command to the node, and that failing, to any
1571 * node until we find one that is available. */
1572 if (cluster_sock_write(c, cmd, cmd_len, 0) == -1) {
1573 /* We have to abort, as no nodes are reachable */
1574 CLUSTER_THROW_EXCEPTION("Can't communicate with any node in the cluster", 0);
1575 return -1;
1576 }
1577
1578 /* Check response and short-circuit on success or communication error */
1579 resp = cluster_check_response(c, &c->reply_type);
1580 if (resp <= 0) {
1581 break;
1582 }
1583
1584 /* Handle MOVED or ASKING redirection */
1585 if (resp == 1) {
1586 /* Abort if we're in a transaction as it will be invalid */
1587 if (c->flags->mode == MULTI) {
1588 CLUSTER_THROW_EXCEPTION("Can't process MULTI sequence when cluster is resharding", 0);
1589 return -1;
1590 }
1591
1592 if (c->redir_type == REDIR_MOVED) {
1593 /* For MOVED redirection we want to update our cached mapping */
1594 cluster_update_slot(c);
1595 c->cmd_sock = SLOT_SOCK(c, slot);
1596 } else if (c->redir_type == REDIR_ASK) {
1597 /* For ASK redirection we want to redirect but not update slot mapping */
1598 c->cmd_sock = cluster_get_asking_sock(c);
1599 }
1600 }
1601
1602 /* See if we've timed out in the command loop */
1603 timedout = c->waitms ? mstime() - msstart >= c->waitms : 0;
1604 } while (!c->clusterdown && !timedout);
1605
1606 // If we've detected the cluster is down, throw an exception
1607 if (c->clusterdown) {
1608 CLUSTER_THROW_EXCEPTION("The Redis Cluster is down (CLUSTERDOWN)", 0);
1609 return -1;
1610 } else if (timedout || resp == -1) {
1611 // Make sure the socket is reconnected, it such that it is in a clean state
1612 redis_sock_disconnect(c->cmd_sock, 1);
1613
1614 if (timedout) {
1615 CLUSTER_THROW_EXCEPTION("Timed out attempting to find data in the correct node!", 0);
1616 } else {
1617 CLUSTER_THROW_EXCEPTION("Error processing response from Redis node!", 0);
1618 }
1619
1620 return -1;
1621 }
1622
1623 /* Clear redirection flag */
1624 c->redir_type = REDIR_NONE;
1625
1626 // Success, return the slot where data exists.
1627 return 0;
1628 }
1629
1630 /* RedisCluster response handlers. These methods all have the same prototype
1631 * and set the proper return value for the calling cluster method. These
1632 * methods will never be called in the case of a communication error when
1633 * we try to send the request to the Cluster *or* if a non MOVED or ASK
1634 * error is encountered, in which case our response processing macro will
1635 * short circuit and RETURN_FALSE, as the error will have already been
1636 * consumed. */
1637
1638 /* RAW bulk response handler */
cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1639 PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
1640 redisCluster *c, void *ctx)
1641 {
1642 char *resp;
1643
1644 // Make sure we can read the response
1645 if (c->reply_type != TYPE_BULK ||
1646 (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1647 {
1648 if (c->flags->mode != MULTI) {
1649 RETURN_FALSE;
1650 } else {
1651 add_next_index_bool(&c->multi_resp, 0);
1652 return;
1653 }
1654 }
1655
1656 // Return our response raw
1657 CLUSTER_RETURN_STRING(c, resp, c->reply_len);
1658 efree(resp);
1659 }
1660
1661 PHP_REDIS_API void
cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1662 cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
1663 {
1664 char *p;
1665
1666 /* Cluster already has the reply so abort if this isn't a LINE response *or* if for
1667 * some freaky reason we don't detect a null terminator */
1668 if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) {
1669 CLUSTER_RETURN_FALSE(c);
1670 }
1671
1672 if (CLUSTER_IS_ATOMIC(c)) {
1673 CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply);
1674 } else {
1675 add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply);
1676 }
1677 }
1678
1679 /* BULK response handler */
cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1680 PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1681 void *ctx)
1682 {
1683 char *resp;
1684
1685 // Make sure we can read the response
1686 if (c->reply_type != TYPE_BULK ||
1687 (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1688 {
1689 CLUSTER_RETURN_FALSE(c);
1690 }
1691
1692 if (CLUSTER_IS_ATOMIC(c)) {
1693 if (!redis_unpack(c->flags, resp, c->reply_len, return_value)) {
1694 CLUSTER_RETURN_STRING(c, resp, c->reply_len);
1695 }
1696 } else {
1697 zval z_unpacked;
1698 if (redis_unpack(c->flags, resp, c->reply_len, &z_unpacked)) {
1699 add_next_index_zval(&c->multi_resp, &z_unpacked);
1700 } else {
1701 add_next_index_stringl(&c->multi_resp, resp, c->reply_len);
1702 }
1703 }
1704 efree(resp);
1705 }
1706
1707 /* Bulk response where we expect a double */
cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1708 PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1709 void *ctx)
1710 {
1711 char *resp;
1712 double dbl;
1713
1714 // Make sure we can read the response
1715 if (c->reply_type != TYPE_BULK ||
1716 (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
1717 {
1718 CLUSTER_RETURN_FALSE(c);
1719 }
1720
1721 // Convert to double, free response
1722 dbl = atof(resp);
1723 efree(resp);
1724
1725 CLUSTER_RETURN_DOUBLE(c, dbl);
1726 }
1727
1728 /* A boolean response. If we get here, we've consumed the '+' reply
1729 * type and will now just verify we can read the OK */
cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1730 PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1731 void *ctx)
1732 {
1733 // Check that we have +OK
1734 if (c->reply_type != TYPE_LINE || c->reply_len != 2 ||
1735 c->line_reply[0] != 'O' || c->line_reply[1] != 'K')
1736 {
1737 CLUSTER_RETURN_FALSE(c);
1738 }
1739
1740 CLUSTER_RETURN_BOOL(c, 1);
1741 }
1742
1743 /* Boolean response, specialized for PING */
cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1744 PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1745 void *ctx)
1746 {
1747 if (c->reply_type != TYPE_LINE || c->reply_len != 4 ||
1748 memcmp(c->line_reply,"PONG",sizeof("PONG")-1))
1749 {
1750 CLUSTER_RETURN_FALSE(c);
1751 }
1752
1753 CLUSTER_RETURN_BOOL(c, 1);
1754 }
1755
1756 /* 1 or 0 response, for things like SETNX */
cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1757 PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1758 void *ctx)
1759 {
1760 // Validate our reply type, and check for a zero
1761 if (c->reply_type != TYPE_INT || c->reply_len == 0) {
1762 CLUSTER_RETURN_FALSE(c);
1763 }
1764
1765 CLUSTER_RETURN_BOOL(c, 1);
1766 }
1767
1768 /* Generic integer response */
cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1769 PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1770 void *ctx)
1771 {
1772 if (c->reply_type != TYPE_INT) {
1773 CLUSTER_RETURN_FALSE(c);
1774 }
1775 CLUSTER_RETURN_LONG(c, c->reply_len);
1776 }
1777
1778 /* TYPE response handler */
cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1779 PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1780 void *ctx)
1781 {
1782 // Make sure we got the right kind of response
1783 if (c->reply_type != TYPE_LINE) {
1784 CLUSTER_RETURN_FALSE(c);
1785 }
1786
1787 // Switch on the type
1788 if (strncmp (c->line_reply, "string", 6) == 0) {
1789 CLUSTER_RETURN_LONG(c, REDIS_STRING);
1790 } else if (strncmp(c->line_reply, "set", 3) == 0) {
1791 CLUSTER_RETURN_LONG(c, REDIS_SET);
1792 } else if (strncmp(c->line_reply, "list", 4) == 0) {
1793 CLUSTER_RETURN_LONG(c, REDIS_LIST);
1794 } else if (strncmp(c->line_reply, "hash", 4) == 0) {
1795 CLUSTER_RETURN_LONG(c, REDIS_HASH);
1796 } else if (strncmp(c->line_reply, "zset", 4) == 0) {
1797 CLUSTER_RETURN_LONG(c, REDIS_ZSET);
1798 } else if (strncmp(c->line_reply, "stream", 6) == 0) {
1799 CLUSTER_RETURN_LONG(c, REDIS_STREAM);
1800 } else {
1801 CLUSTER_RETURN_LONG(c, REDIS_NOT_FOUND);
1802 }
1803 }
1804
1805 /* SUBSCRIBE/PSCUBSCRIBE handler */
cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1806 PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
1807 void *ctx)
1808 {
1809 subscribeContext *sctx = (subscribeContext*)ctx;
1810 zval z_tab, *z_tmp;
1811 int pull = 0;
1812
1813
1814 // Consume each MULTI BULK response (one per channel/pattern)
1815 while (sctx->argc--) {
1816 if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
1817 pull, mbulk_resp_loop_raw, &z_tab)
1818 ) {
1819 efree(sctx);
1820 RETURN_FALSE;
1821 }
1822
1823 if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_tab), 0)) == NULL ||
1824 strcasecmp(Z_STRVAL_P(z_tmp), sctx->kw) != 0
1825 ) {
1826 zval_dtor(&z_tab);
1827 efree(sctx);
1828 RETURN_FALSE;
1829 }
1830
1831 zval_dtor(&z_tab);
1832 pull = 1;
1833 }
1834
1835 // Set up our callback pointers
1836 zval z_ret, z_args[4];
1837 sctx->cb.retval = &z_ret;
1838 sctx->cb.params = z_args;
1839
1840 /* We're in a subscribe loop */
1841 c->subscribed_slot = c->cmd_slot;
1842
1843 /* Multibulk response, {[pattern], type, channel, payload} */
1844 while (1) {
1845 /* Arguments */
1846 zval *z_type, *z_chan, *z_pat = NULL, *z_data;
1847 int tab_idx = 1, is_pmsg;
1848
1849 // Get the next subscribe response
1850 if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, mbulk_resp_loop, &z_tab) ||
1851 (z_type = zend_hash_index_find(Z_ARRVAL(z_tab), 0)) == NULL
1852 ) {
1853 break;
1854 }
1855
1856 // Make sure we have a message or pmessage
1857 if (!strncmp(Z_STRVAL_P(z_type), "message", 7) ||
1858 !strncmp(Z_STRVAL_P(z_type), "pmessage", 8)
1859 ) {
1860 is_pmsg = *Z_STRVAL_P(z_type) == 'p';
1861 } else {
1862 zval_dtor(&z_tab);
1863 continue;
1864 }
1865
1866 if (is_pmsg && (z_pat = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL) {
1867 break;
1868 }
1869
1870 // Extract channel and data
1871 if ((z_chan = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL ||
1872 (z_data = zend_hash_index_find(Z_ARRVAL(z_tab), tab_idx++)) == NULL
1873 ) {
1874 break;
1875 }
1876
1877 // Always pass our object through
1878 z_args[0] = *getThis();
1879
1880 // Set up calbacks depending on type
1881 if (is_pmsg) {
1882 z_args[1] = *z_pat;
1883 z_args[2] = *z_chan;
1884 z_args[3] = *z_data;
1885 } else {
1886 z_args[1] = *z_chan;
1887 z_args[2] = *z_data;
1888 }
1889
1890 // Set arg count
1891 sctx->cb.param_count = tab_idx;
1892
1893 // Execute our callback
1894 if (zend_call_function(&(sctx->cb), &(sctx->cb_cache)) !=
1895 SUCCESS)
1896 {
1897 break;
1898 }
1899
1900 // If we have a return value, free it
1901 zval_ptr_dtor(&z_ret);
1902
1903 zval_dtor(&z_tab);
1904 }
1905
1906 // We're no longer subscribing, due to an error
1907 c->subscribed_slot = -1;
1908
1909 // Cleanup
1910 zval_dtor(&z_tab);
1911 efree(sctx);
1912
1913 // Failure
1914 RETURN_FALSE;
1915 }
1916
1917 /* UNSUBSCRIBE/PUNSUBSCRIBE */
cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)1918 PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,
1919 redisCluster *c, void *ctx)
1920 {
1921 subscribeContext *sctx = (subscribeContext*)ctx;
1922 zval z_tab, *z_chan, *z_flag;
1923 int pull = 0, argc = sctx->argc;
1924
1925 efree(sctx);
1926 array_init(return_value);
1927
1928 // Consume each response
1929 while (argc--) {
1930 // Fail if we didn't get an array or can't find index 1
1931 if (!cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, pull, mbulk_resp_loop_raw, &z_tab) ||
1932 (z_chan = zend_hash_index_find(Z_ARRVAL(z_tab), 1)) == NULL
1933 ) {
1934 zval_dtor(&z_tab);
1935 zval_dtor(return_value);
1936 RETURN_FALSE;
1937 }
1938
1939 // Find the flag for this channel/pattern
1940 if ((z_flag = zend_hash_index_find(Z_ARRVAL(z_tab), 2)) == NULL ||
1941 Z_STRLEN_P(z_flag) != 2
1942 ) {
1943 zval_dtor(&z_tab);
1944 zval_dtor(return_value);
1945 RETURN_FALSE;
1946 }
1947
1948 // Redis will give us either :1 or :0 here
1949 char *flag = Z_STRVAL_P(z_flag);
1950
1951 // Add result
1952 add_assoc_bool(return_value, Z_STRVAL_P(z_chan), flag[1] == '1');
1953
1954 zval_dtor(&z_tab);
1955 pull = 1;
1956 }
1957 }
1958
1959 /* Recursive MULTI BULK -> PHP style response handling */
cluster_mbulk_variant_resp(clusterReply * r,int null_mbulk_as_null,zval * z_ret)1960 static void cluster_mbulk_variant_resp(clusterReply *r, int null_mbulk_as_null,
1961 zval *z_ret)
1962 {
1963 zval z_sub_ele;
1964 long long i;
1965
1966 switch(r->type) {
1967 case TYPE_INT:
1968 add_next_index_long(z_ret, r->integer);
1969 break;
1970 case TYPE_LINE:
1971 if (r->str) {
1972 add_next_index_stringl(z_ret, r->str, r->len);
1973 } else {
1974 add_next_index_bool(z_ret, 1);
1975 }
1976 break;
1977 case TYPE_BULK:
1978 if (r->len > -1) {
1979 add_next_index_stringl(z_ret, r->str, r->len);
1980 } else {
1981 add_next_index_null(z_ret);
1982 }
1983 break;
1984 case TYPE_MULTIBULK:
1985 if (r->elements < 0 && null_mbulk_as_null) {
1986 add_next_index_null(z_ret);
1987 } else {
1988 array_init(&z_sub_ele);
1989 for (i = 0; i < r->elements; i++) {
1990 cluster_mbulk_variant_resp(r->element[i], null_mbulk_as_null, &z_sub_ele);
1991 }
1992 add_next_index_zval(z_ret, &z_sub_ele);
1993 }
1994 break;
1995 default:
1996 add_next_index_bool(z_ret, 0);
1997 break;
1998 }
1999 }
2000
2001 /* Variant response handling, for things like EVAL and various other responses
2002 * where we just map the replies from Redis type values to PHP ones directly. */
2003 static void
cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,int status_strings,void * ctx)2004 cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2005 int status_strings, void *ctx)
2006 {
2007 clusterReply *r;
2008 zval zv, *z_arr = &zv;
2009 long long i;
2010
2011 // Make sure we can read it
2012 if ((r = cluster_read_resp(c, status_strings)) == NULL) {
2013 CLUSTER_RETURN_FALSE(c);
2014 }
2015
2016 // Handle ATOMIC vs. MULTI mode in a separate switch
2017 if (CLUSTER_IS_ATOMIC(c)) {
2018 switch(r->type) {
2019 case TYPE_INT:
2020 RETVAL_LONG(r->integer);
2021 break;
2022 case TYPE_ERR:
2023 RETVAL_FALSE;
2024 break;
2025 case TYPE_LINE:
2026 if (status_strings) {
2027 RETVAL_STRINGL(r->str, r->len);
2028 } else {
2029 RETVAL_TRUE;
2030 }
2031 break;
2032 case TYPE_BULK:
2033 if (r->len < 0) {
2034 RETVAL_NULL();
2035 } else {
2036 RETVAL_STRINGL(r->str, r->len);
2037 }
2038 break;
2039 case TYPE_MULTIBULK:
2040 if (r->elements < 0 && c->flags->null_mbulk_as_null) {
2041 RETVAL_NULL();
2042 } else {
2043 array_init(z_arr);
2044 for (i = 0; i < r->elements; i++) {
2045 cluster_mbulk_variant_resp(r->element[i], c->flags->null_mbulk_as_null, z_arr);
2046 }
2047 RETVAL_ZVAL(z_arr, 0, 0);
2048 }
2049 break;
2050 default:
2051 RETVAL_FALSE;
2052 break;
2053 }
2054 } else {
2055 switch(r->type) {
2056 case TYPE_INT:
2057 add_next_index_long(&c->multi_resp, r->integer);
2058 break;
2059 case TYPE_ERR:
2060 add_next_index_bool(&c->multi_resp, 0);
2061 break;
2062 case TYPE_LINE:
2063 if (status_strings) {
2064 add_next_index_stringl(&c->multi_resp, r->str, r->len);
2065 } else {
2066 add_next_index_bool(&c->multi_resp, 1);
2067 }
2068 break;
2069 case TYPE_BULK:
2070 if (r->len < 0) {
2071 add_next_index_null(&c->multi_resp);
2072 } else {
2073 add_next_index_stringl(&c->multi_resp, r->str, r->len);
2074 }
2075 break;
2076 case TYPE_MULTIBULK:
2077 if (r->elements < 0 && c->flags->null_mbulk_as_null) {
2078 add_next_index_null(&c->multi_resp);
2079 } else {
2080 cluster_mbulk_variant_resp(r, c->flags->null_mbulk_as_null, &c->multi_resp);
2081 }
2082 break;
2083 default:
2084 add_next_index_bool(&c->multi_resp, 0);
2085 break;
2086 }
2087 }
2088
2089 // Free our response structs, but not allocated data itself
2090 cluster_free_reply(r, 1);
2091 }
2092
cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2093 PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2094 void *ctx)
2095 {
2096 cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx);
2097 }
2098
cluster_variant_raw_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2099 PHP_REDIS_API void cluster_variant_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2100 void *ctx)
2101 {
2102 cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2103 c->flags->reply_literal, ctx);
2104 }
2105
cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2106 PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2107 void *ctx)
2108 {
2109 cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx);
2110 }
2111
2112 /* Generic MULTI BULK response processor */
cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,mbulk_cb cb,void * ctx)2113 PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2114 redisCluster *c, mbulk_cb cb, void *ctx)
2115 {
2116 zval z_result;
2117
2118 /* Abort if the reply isn't MULTIBULK or has an invalid length */
2119 if (c->reply_type != TYPE_MULTIBULK || c->reply_len < -1) {
2120 CLUSTER_RETURN_FALSE(c);
2121 }
2122
2123 if (c->reply_len == -1 && c->flags->null_mbulk_as_null) {
2124 ZVAL_NULL(&z_result);
2125 } else {
2126 array_init(&z_result);
2127
2128 if (c->reply_len > 0) {
2129 /* Push serialization settings from the cluster into our socket */
2130 c->cmd_sock->serializer = c->flags->serializer;
2131 c->cmd_sock->compression = c->flags->compression;
2132
2133 /* Call our specified callback */
2134 if (cb(c->cmd_sock, &z_result, c->reply_len, ctx) == FAILURE) {
2135 zval_dtor(&z_result);
2136 CLUSTER_RETURN_FALSE(c);
2137 }
2138 }
2139 }
2140
2141 // Success, make this array our return value
2142 if (CLUSTER_IS_ATOMIC(c)) {
2143 RETVAL_ZVAL(&z_result, 0, 1);
2144 } else {
2145 add_next_index_zval(&c->multi_resp, &z_result);
2146 }
2147 }
2148
2149 /* HSCAN, SSCAN, ZSCAN */
cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,REDIS_SCAN_TYPE type,long * it)2150 PHP_REDIS_API int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2151 REDIS_SCAN_TYPE type, long *it)
2152 {
2153 char *pit;
2154
2155 // We always want to see a MULTIBULK response with two elements
2156 if (c->reply_type != TYPE_MULTIBULK || c->reply_len != 2)
2157 {
2158 return FAILURE;
2159 }
2160
2161 // Read the BULK size
2162 if (cluster_check_response(c, &c->reply_type) ||
2163 c->reply_type != TYPE_BULK)
2164 {
2165 return FAILURE;
2166 }
2167
2168 // Read the iterator
2169 if ((pit = redis_sock_read_bulk_reply(c->cmd_sock,c->reply_len)) == NULL)
2170 {
2171 return FAILURE;
2172 }
2173
2174 // Push the new iterator value to our caller
2175 *it = atol(pit);
2176 efree(pit);
2177
2178 // We'll need another MULTIBULK response for the payload
2179 if (cluster_check_response(c, &c->reply_type) < 0)
2180 {
2181 return FAILURE;
2182 }
2183
2184 // Use the proper response callback depending on scan type
2185 switch(type) {
2186 case TYPE_SCAN:
2187 cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2188 break;
2189 case TYPE_SSCAN:
2190 cluster_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2191 break;
2192 case TYPE_HSCAN:
2193 cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2194 break;
2195 case TYPE_ZSCAN:
2196 cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,c,NULL);
2197 break;
2198 default:
2199 return FAILURE;
2200 }
2201
2202 // Success
2203 return SUCCESS;
2204 }
2205
2206 /* INFO response */
cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2207 PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2208 void *ctx)
2209 {
2210 zval z_result;
2211 char *info;
2212
2213 // Read our bulk response
2214 if ((info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len)) == NULL)
2215 {
2216 CLUSTER_RETURN_FALSE(c);
2217 }
2218
2219 /* Parse response, free memory */
2220 redis_parse_info_response(info, &z_result);
2221 efree(info);
2222
2223 // Return our array
2224 if (CLUSTER_IS_ATOMIC(c)) {
2225 RETVAL_ZVAL(&z_result, 0, 1);
2226 } else {
2227 add_next_index_zval(&c->multi_resp, &z_result);
2228 }
2229 }
2230
2231 /* CLIENT LIST response */
cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2232 PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2233 void *ctx)
2234 {
2235 char *info;
2236 zval z_result;
2237
2238 /* Read the bulk response */
2239 info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len);
2240 if (info == NULL) {
2241 CLUSTER_RETURN_FALSE(c);
2242 }
2243
2244 /* Parse it and free the bulk string */
2245 redis_parse_client_list_response(info, &z_result);
2246 efree(info);
2247
2248 if (CLUSTER_IS_ATOMIC(c)) {
2249 RETVAL_ZVAL(&z_result, 0, 1);
2250 } else {
2251 add_next_index_zval(&c->multi_resp, &z_result);
2252 }
2253 }
2254
2255 /* XRANGE */
2256 PHP_REDIS_API void
cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2257 cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2258 zval z_messages;
2259
2260 array_init(&z_messages);
2261
2262 c->cmd_sock->serializer = c->flags->serializer;
2263 c->cmd_sock->compression = c->flags->compression;
2264
2265 if (redis_read_stream_messages(c->cmd_sock, c->reply_len, &z_messages) < 0) {
2266 zval_dtor(&z_messages);
2267 CLUSTER_RETURN_FALSE(c);
2268 }
2269
2270 if (CLUSTER_IS_ATOMIC(c)) {
2271 RETVAL_ZVAL(&z_messages, 0, 1);
2272 } else {
2273 add_next_index_zval(&c->multi_resp, &z_messages);
2274 }
2275 }
2276
2277 /* XREAD */
2278 PHP_REDIS_API void
cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2279 cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2280 zval z_streams;
2281
2282 c->cmd_sock->serializer = c->flags->serializer;
2283 c->cmd_sock->compression = c->flags->compression;
2284
2285 if (c->reply_len == -1 && c->flags->null_mbulk_as_null) {
2286 ZVAL_NULL(&z_streams);
2287 } else {
2288 array_init(&z_streams);
2289 if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, &z_streams) < 0) {
2290 zval_dtor(&z_streams);
2291 CLUSTER_RETURN_FALSE(c);
2292 }
2293 }
2294
2295 if (CLUSTER_IS_ATOMIC(c)) {
2296 RETVAL_ZVAL(&z_streams, 0, 1);
2297 } else {
2298 add_next_index_zval(&c->multi_resp, &z_streams);
2299 }
2300 }
2301
2302 /* XCLAIM */
2303 PHP_REDIS_API void
cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2304 cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2305 zval z_msg;
2306
2307 array_init(&z_msg);
2308
2309 if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, &z_msg) < 0) {
2310 zval_dtor(&z_msg);
2311 CLUSTER_RETURN_FALSE(c);
2312 }
2313
2314 if (CLUSTER_IS_ATOMIC(c)) {
2315 RETVAL_ZVAL(&z_msg, 0, 1);
2316 } else {
2317 add_next_index_zval(&c->multi_resp, &z_msg);
2318 }
2319
2320 }
2321
2322 /* XINFO */
2323 PHP_REDIS_API void
cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2324 cluster_xinfo_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2325 {
2326 zval z_ret;
2327
2328 array_init(&z_ret);
2329 if (redis_read_xinfo_response(c->cmd_sock, &z_ret, c->reply_len) != SUCCESS) {
2330 zval_dtor(&z_ret);
2331 CLUSTER_RETURN_FALSE(c);
2332 }
2333
2334 if (CLUSTER_IS_ATOMIC(c)) {
2335 RETURN_ZVAL(&z_ret, 0, 1);
2336 }
2337 add_next_index_zval(&c->multi_resp, &z_ret);
2338 }
2339
2340 static void
cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx,int (* cb)(RedisSock *,zval *,long))2341 cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx,
2342 int (*cb)(RedisSock*, zval*, long))
2343 {
2344 zval z_ret;
2345
2346 array_init(&z_ret);
2347 if (cb(c->cmd_sock, &z_ret, c->reply_len) != SUCCESS) {
2348 zval_dtor(&z_ret);
2349 CLUSTER_RETURN_FALSE(c);
2350 }
2351
2352 if (CLUSTER_IS_ATOMIC(c)) {
2353 RETURN_ZVAL(&z_ret, 0, 1);
2354 }
2355 add_next_index_zval(&c->multi_resp, &z_ret);
2356 }
2357
2358 PHP_REDIS_API void
cluster_acl_getuser_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2359 cluster_acl_getuser_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2360 cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx, redis_read_acl_getuser_reply);
2361 }
2362
2363 PHP_REDIS_API void
cluster_acl_log_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2364 cluster_acl_log_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
2365 cluster_acl_custom_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx, redis_read_acl_log_reply);
2366 }
2367
2368 /* MULTI BULK response loop where we might pull the next one */
cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,int pull,mbulk_cb cb,zval * z_ret)2369 PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2370 redisCluster *c, int pull, mbulk_cb cb, zval *z_ret)
2371 {
2372 ZVAL_NULL(z_ret);
2373
2374 // Pull our next response if directed
2375 if (pull) {
2376 if (cluster_check_response(c, &c->reply_type) < 0)
2377 {
2378 return NULL;
2379 }
2380 }
2381
2382 // Validate reply type and length
2383 if (c->reply_type != TYPE_MULTIBULK || c->reply_len == -1) {
2384 return NULL;
2385 }
2386
2387 array_init(z_ret);
2388
2389 // Call our callback
2390 if (cb(c->cmd_sock, z_ret, c->reply_len, NULL) == FAILURE) {
2391 zval_dtor(z_ret);
2392 return NULL;
2393 }
2394
2395 return z_ret;
2396 }
2397
2398 /* MULTI MULTI BULK reply (for EXEC) */
cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2399 PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
2400 redisCluster *c, void *ctx)
2401 {
2402 zval *multi_resp = &c->multi_resp;
2403 array_init(multi_resp);
2404
2405 clusterFoldItem *fi = c->multi_head;
2406 while (fi) {
2407 /* Make sure our transaction didn't fail here */
2408 if (c->multi_len[fi->slot] > -1) {
2409 /* Set the slot where we should look for responses. We don't allow
2410 * failover inside a transaction, so it will be the master we have
2411 * mapped. */
2412 c->cmd_slot = fi->slot;
2413 c->cmd_sock = SLOT_SOCK(c, fi->slot);
2414
2415 if (cluster_check_response(c, &c->reply_type) < 0) {
2416 zval_dtor(multi_resp);
2417 RETURN_FALSE;
2418 }
2419
2420 fi->callback(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, fi->ctx);
2421 } else {
2422 /* Just add false */
2423 add_next_index_bool(multi_resp, 0);
2424 }
2425 fi = fi->next;
2426 }
2427
2428 // Set our return array
2429 zval_dtor(return_value);
2430 RETVAL_ZVAL(multi_resp, 0, 1);
2431 }
2432
2433 /* Generic handler for MGET */
cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2434 PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
2435 redisCluster *c, void *ctx)
2436 {
2437 clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2438
2439 /* Protect against an invalid response type, -1 response length, and failure
2440 * to consume the responses. */
2441 c->cmd_sock->serializer = c->flags->serializer;
2442 c->cmd_sock->compression = c->flags->compression;
2443 short fail = c->reply_type != TYPE_MULTIBULK || c->reply_len == -1 ||
2444 mbulk_resp_loop(c->cmd_sock, mctx->z_multi, c->reply_len, NULL) == FAILURE;
2445
2446 // If we had a failure, pad results with FALSE to indicate failure. Non
2447 // existent keys (e.g. for MGET will come back as NULL)
2448 if (fail) {
2449 while (mctx->count--) {
2450 add_next_index_bool(mctx->z_multi, 0);
2451 }
2452 }
2453
2454 // If this is the tail of our multi command, we can set our returns
2455 if (mctx->last) {
2456 if (CLUSTER_IS_ATOMIC(c)) {
2457 RETVAL_ZVAL(mctx->z_multi, 0, 1);
2458 } else {
2459 add_next_index_zval(&c->multi_resp, mctx->z_multi);
2460 }
2461
2462 efree(mctx->z_multi);
2463 }
2464
2465 // Clean up this context item
2466 efree(mctx);
2467 }
2468
2469 /* Handler for MSETNX */
cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2470 PHP_REDIS_API void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2471 void *ctx)
2472 {
2473 clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2474 int real_argc = mctx->count/2;
2475
2476 // Protect against an invalid response type
2477 if (c->reply_type != TYPE_INT) {
2478 php_error_docref(0, E_WARNING,
2479 "Invalid response type for MSETNX");
2480 while (real_argc--) {
2481 add_next_index_bool(mctx->z_multi, 0);
2482 }
2483 return;
2484 }
2485
2486 // Response will be 1/0 per key, so the client can match them up
2487 while (real_argc--) {
2488 add_next_index_long(mctx->z_multi, c->reply_len);
2489 }
2490
2491 // Set return value if it's our last response
2492 if (mctx->last) {
2493 if (CLUSTER_IS_ATOMIC(c)) {
2494 RETVAL_ZVAL(mctx->z_multi, 0, 0);
2495 } else {
2496 add_next_index_zval(&c->multi_resp, mctx->z_multi);
2497 }
2498 efree(mctx->z_multi);
2499 }
2500
2501 // Free multi context
2502 efree(mctx);
2503 }
2504
2505 /* Handler for DEL */
cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2506 PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2507 void *ctx)
2508 {
2509 clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2510
2511 // If we get an invalid reply, inform the client
2512 if (c->reply_type != TYPE_INT) {
2513 php_error_docref(0, E_WARNING,
2514 "Invalid reply type returned for DEL command");
2515 efree(mctx);
2516 return;
2517 }
2518
2519 // Increment by the number of keys deleted
2520 Z_LVAL_P(mctx->z_multi) += c->reply_len;
2521
2522 if (mctx->last) {
2523 if (CLUSTER_IS_ATOMIC(c)) {
2524 ZVAL_LONG(return_value, Z_LVAL_P(mctx->z_multi));
2525 } else {
2526 add_next_index_long(&c->multi_resp, Z_LVAL_P(mctx->z_multi));
2527 }
2528 efree(mctx->z_multi);
2529 }
2530
2531 efree(ctx);
2532 }
2533
2534 /* Handler for MSET */
cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2535 PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2536 void *ctx)
2537 {
2538 clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
2539
2540 // If we get an invalid reply type something very wrong has happened,
2541 // and we have to abort.
2542 if (c->reply_type != TYPE_LINE) {
2543 php_error_docref(0, E_ERROR,
2544 "Invalid reply type returned for MSET command");
2545 zval_dtor(mctx->z_multi);
2546 efree(mctx->z_multi);
2547 efree(mctx);
2548 RETURN_FALSE;
2549 }
2550
2551 // Set our return if it's the last call
2552 if (mctx->last) {
2553 if (CLUSTER_IS_ATOMIC(c)) {
2554 ZVAL_BOOL(return_value, zval_is_true(mctx->z_multi));
2555 } else {
2556 add_next_index_bool(&c->multi_resp, zval_is_true(mctx->z_multi));
2557 }
2558 efree(mctx->z_multi);
2559 }
2560
2561 efree(mctx);
2562 }
2563
2564 /* Raw MULTI BULK reply */
2565 PHP_REDIS_API void
cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2566 cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2567 {
2568 cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2569 mbulk_resp_loop_raw, NULL);
2570 }
2571
2572 /* Unserialize all the things */
2573 PHP_REDIS_API void
cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2574 cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
2575 {
2576 cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2577 mbulk_resp_loop, NULL);
2578 }
2579
2580 /* For handling responses where we get key, value, key, value that
2581 * we will turn into key => value, key => value. */
2582 PHP_REDIS_API void
cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2583 cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2584 void *ctx)
2585 {
2586 cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2587 mbulk_resp_loop_zipstr, NULL);
2588 }
2589
2590 /* Handling key,value to key=>value where the values are doubles */
2591 PHP_REDIS_API void
cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2592 cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2593 void *ctx)
2594 {
2595 cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2596 mbulk_resp_loop_zipdbl, NULL);
2597 }
2598
2599 /* Associate multi bulk response (for HMGET really) */
2600 PHP_REDIS_API void
cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,redisCluster * c,void * ctx)2601 cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
2602 void *ctx)
2603 {
2604 cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
2605 mbulk_resp_loop_assoc, ctx);
2606 }
2607
2608 /*
2609 * Various MULTI BULK reply callback functions
2610 */
2611
2612 /* MULTI BULK response where we don't touch the values (e.g. KEYS) */
mbulk_resp_loop_raw(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2613 int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
2614 long long count, void *ctx)
2615 {
2616 char *line;
2617 int line_len;
2618
2619 // Iterate over the number we have
2620 while (count--) {
2621 // Read the line, which should never come back null
2622 line = redis_sock_read(redis_sock, &line_len);
2623 if (line == NULL) return FAILURE;
2624
2625 // Add to our result array
2626 add_next_index_stringl(z_result, line, line_len);
2627 efree(line);
2628 }
2629
2630 // Success!
2631 return SUCCESS;
2632 }
2633
2634 /* MULTI BULK response where we unserialize everything */
mbulk_resp_loop(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2635 int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
2636 long long count, void *ctx)
2637 {
2638 char *line;
2639 int line_len;
2640
2641 /* Iterate over the lines we have to process */
2642 while (count--) {
2643 /* Read our line */
2644 line = redis_sock_read(redis_sock, &line_len);
2645
2646 if (line != NULL) {
2647 zval z_unpacked;
2648 if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2649 add_next_index_zval(z_result, &z_unpacked);
2650 } else {
2651 add_next_index_stringl(z_result, line, line_len);
2652 }
2653 efree(line);
2654 } else {
2655 add_next_index_bool(z_result, 0);
2656 }
2657 }
2658
2659 return SUCCESS;
2660 }
2661
2662 /* MULTI BULK response where we turn key1,value1 into key1=>value1 */
mbulk_resp_loop_zipstr(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2663 int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result,
2664 long long count, void *ctx)
2665 {
2666 char *line, *key = NULL;
2667 int line_len, key_len = 0;
2668 long long idx = 0;
2669
2670 // Our count will need to be divisible by 2
2671 if (count % 2 != 0) {
2672 return -1;
2673 }
2674
2675 // Iterate through our elements
2676 while (count--) {
2677 // Grab our line, bomb out on failure
2678 line = redis_sock_read(redis_sock, &line_len);
2679 if (!line) return -1;
2680
2681 if (idx++ % 2 == 0) {
2682 // Save our key and length
2683 key = line;
2684 key_len = line_len;
2685 } else {
2686 /* Attempt unpacking */
2687 zval z_unpacked;
2688 if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2689 add_assoc_zval(z_result, key, &z_unpacked);
2690 } else {
2691 add_assoc_stringl_ex(z_result, key, key_len, line, line_len);
2692 }
2693
2694 efree(line);
2695 efree(key);
2696 }
2697 }
2698
2699 return SUCCESS;
2700 }
2701
2702 /* MULTI BULK loop processor where we expect key,score key, score */
mbulk_resp_loop_zipdbl(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2703 int mbulk_resp_loop_zipdbl(RedisSock *redis_sock, zval *z_result,
2704 long long count, void *ctx)
2705 {
2706 char *line, *key = NULL;
2707 int line_len, key_len = 0;
2708 long long idx = 0;
2709
2710 // Our context will need to be divisible by 2
2711 if (count %2 != 0) {
2712 return -1;
2713 }
2714
2715 // While we have elements
2716 while (count--) {
2717 line = redis_sock_read(redis_sock, &line_len);
2718 if (line != NULL) {
2719 if (idx++ % 2 == 0) {
2720 key = line;
2721 key_len = line_len;
2722 } else {
2723 zval zv, *z = &zv;
2724 if (redis_unpack(redis_sock,key,key_len, z)) {
2725 zend_string *zstr = zval_get_string(z);
2726 add_assoc_double_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), atof(line));
2727 zend_string_release(zstr);
2728 zval_dtor(z);
2729 } else {
2730 add_assoc_double_ex(z_result, key, key_len, atof(line));
2731 }
2732
2733 /* Free our key and line */
2734 efree(key);
2735 efree(line);
2736 }
2737 }
2738 }
2739
2740 return SUCCESS;
2741 }
2742
2743 /* MULTI BULK where we're passed the keys, and we attach vals */
mbulk_resp_loop_assoc(RedisSock * redis_sock,zval * z_result,long long count,void * ctx)2744 int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
2745 long long count, void *ctx)
2746 {
2747 char *line;
2748 int line_len, i;
2749 zval *z_keys = ctx;
2750
2751 // Loop while we've got replies
2752 for (i = 0; i < count; ++i) {
2753 zend_string *zstr = zval_get_string(&z_keys[i]);
2754 line = redis_sock_read(redis_sock, &line_len);
2755
2756 if (line != NULL) {
2757 zval z_unpacked;
2758 if (redis_unpack(redis_sock, line, line_len, &z_unpacked)) {
2759 add_assoc_zval_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), &z_unpacked);
2760 } else {
2761 add_assoc_stringl_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), line, line_len);
2762 }
2763 efree(line);
2764 } else {
2765 add_assoc_bool_ex(z_result, ZSTR_VAL(zstr), ZSTR_LEN(zstr), 0);
2766 }
2767
2768 // Clean up key context
2769 zend_string_release(zstr);
2770 zval_dtor(&z_keys[i]);
2771 }
2772
2773 // Clean up our keys overall
2774 efree(z_keys);
2775
2776 // Success!
2777 return SUCCESS;
2778 }
2779
2780 /* Free an array of zend_string seeds */
free_seed_array(zend_string ** seeds,uint32_t nseeds)2781 void free_seed_array(zend_string **seeds, uint32_t nseeds) {
2782 int i;
2783
2784 if (seeds == NULL)
2785 return;
2786
2787 for (i = 0; i < nseeds; i++)
2788 zend_string_release(seeds[i]);
2789
2790 efree(seeds);
2791 }
2792
get_valid_seeds(HashTable * input,uint32_t * nseeds)2793 static zend_string **get_valid_seeds(HashTable *input, uint32_t *nseeds) {
2794 HashTable *valid;
2795 uint32_t count, idx = 0;
2796 zval *z_seed;
2797 zend_string *zkey, **seeds = NULL;
2798
2799 /* Short circuit if we don't have any sees */
2800 count = zend_hash_num_elements(input);
2801 if (count == 0)
2802 return NULL;
2803
2804 ALLOC_HASHTABLE(valid);
2805 zend_hash_init(valid, count, NULL, NULL, 0);
2806
2807 ZEND_HASH_FOREACH_VAL(input, z_seed) {
2808 ZVAL_DEREF(z_seed);
2809
2810 if (Z_TYPE_P(z_seed) != IS_STRING) {
2811 php_error_docref(NULL, E_WARNING, "Skipping non-string entry in seeds array");
2812 continue;
2813 } else if (strrchr(Z_STRVAL_P(z_seed), ':') == NULL) {
2814 php_error_docref(NULL, E_WARNING,
2815 "Seed '%s' not in host:port format, ignoring", Z_STRVAL_P(z_seed));
2816 continue;
2817 }
2818
2819 /* Add as a key to avoid duplicates */
2820 zend_hash_str_update_ptr(valid, Z_STRVAL_P(z_seed), Z_STRLEN_P(z_seed), NULL);
2821 } ZEND_HASH_FOREACH_END();
2822
2823 /* We need at least one valid seed */
2824 count = zend_hash_num_elements(valid);
2825 if (count == 0)
2826 goto cleanup;
2827
2828 /* Populate our return array */
2829 seeds = ecalloc(count, sizeof(*seeds));
2830 ZEND_HASH_FOREACH_STR_KEY(valid, zkey) {
2831 seeds[idx++] = zend_string_copy(zkey);
2832 } ZEND_HASH_FOREACH_END();
2833
2834 *nseeds = idx;
2835
2836 cleanup:
2837 zend_hash_destroy(valid);
2838 FREE_HASHTABLE(valid);
2839
2840 return seeds;
2841 }
2842
2843 /* Validate cluster construction arguments and return a sanitized and validated
2844 * array of seeds */
2845 zend_string**
cluster_validate_args(double timeout,double read_timeout,HashTable * seeds,uint32_t * nseeds,char ** errstr)2846 cluster_validate_args(double timeout, double read_timeout, HashTable *seeds,
2847 uint32_t *nseeds, char **errstr)
2848 {
2849 zend_string **retval;
2850
2851 if (timeout < 0L || timeout > INT_MAX) {
2852 if (errstr) *errstr = "Invalid timeout";
2853 return NULL;
2854 }
2855
2856 if (read_timeout < 0L || read_timeout > INT_MAX) {
2857 if (errstr) *errstr = "Invalid read timeout";
2858 return NULL;
2859 }
2860
2861 retval = get_valid_seeds(seeds, nseeds);
2862 if (retval == NULL && errstr)
2863 *errstr = "No valid seeds detected";
2864
2865 return retval;
2866 }
2867
2868 /* Helper function to compare to host:port seeds */
cluster_cmp_seeds(const void * a,const void * b)2869 static int cluster_cmp_seeds(const void *a, const void *b) {
2870 zend_string *za = *(zend_string **)a;
2871 zend_string *zb = *(zend_string **)b;
2872 return strcmp(ZSTR_VAL(za), ZSTR_VAL(zb));
2873 }
2874
cluster_swap_seeds(void * a,void * b)2875 static void cluster_swap_seeds(void *a, void *b) {
2876 zend_string **za, **zb, *tmp;
2877
2878 za = a;
2879 zb = b;
2880
2881 tmp = *za;
2882 *za = *zb;
2883 *zb = tmp;
2884 }
2885
2886 /* Turn an array of cluster seeds into a string we can cache. If we get here we know
2887 * we have at least one entry and that every entry is a string in the form host:port */
2888 #define SLOT_CACHE_PREFIX "phpredis_slots:"
cluster_hash_seeds(zend_string ** seeds,uint32_t count)2889 zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t count) {
2890 smart_str hash = {0};
2891 size_t i;
2892
2893 /* Sort our seeds so any any array with identical seeds hashes to the same key
2894 * regardless of what order the user gives them to us in. */
2895 zend_sort(seeds, count, sizeof(*seeds), cluster_cmp_seeds, cluster_swap_seeds);
2896
2897 /* Global phpredis hash prefix */
2898 smart_str_appendl(&hash, SLOT_CACHE_PREFIX, sizeof(SLOT_CACHE_PREFIX) - 1);
2899
2900 /* Construct our actual hash */
2901 for (i = 0; i < count; i++) {
2902 smart_str_appendc(&hash, '[');
2903 smart_str_append_ex(&hash, seeds[i], 0);
2904 smart_str_appendc(&hash, ']');
2905 }
2906
2907 /* Null terminate */
2908 smart_str_0(&hash);
2909
2910 /* Return the internal zend_string */
2911 return hash.s;
2912 }
2913
cluster_cache_load(zend_string * hash)2914 PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash) {
2915 zend_resource *le;
2916
2917 /* Look for cached slot information */
2918 le = zend_hash_find_ptr(&EG(persistent_list), hash);
2919
2920 if (le != NULL) {
2921 /* Sanity check on our list type */
2922 if (le->type != le_cluster_slot_cache) {
2923 php_error_docref(0, E_WARNING, "Invalid slot cache resource");
2924 return NULL;
2925 }
2926
2927 /* Success, return the cached entry */
2928 return le->ptr;
2929 }
2930
2931 /* Not found */
2932 return NULL;
2933 }
2934
2935 /* Cache a cluster's slot information in persistent_list if it's enabled */
cluster_cache_store(zend_string * hash,HashTable * nodes)2936 PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes) {
2937 redisCachedCluster *cc = cluster_cache_create(hash, nodes);
2938
2939 redis_register_persistent_resource(cc->hash, cc, le_cluster_slot_cache);
2940
2941 return SUCCESS;
2942 }
2943
2944
2945 /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */
2946