1 /*
2 +----------------------------------------------------------------------+
3 | Copyright (c) 1997-2009 The PHP Group |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 3.01 of the PHP license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.php.net/license/3_01.txt |
9 | If you did not receive a copy of the PHP license and are unable to |
10 | obtain it through the world-wide-web, please send a note to |
11 | license@php.net so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | Author: Nicolas Favre-Felix <n.favre-felix@owlient.eu> |
14 | Maintainer: Michael Grunder <michael.grunder@gmail.com> |
15 +----------------------------------------------------------------------+
16 */
17 #include "redis_array_impl.h"
18 #include "php_redis.h"
19 #include "library.h"
20
21 #include "php_variables.h"
22 #include "SAPI.h"
23 #include "ext/standard/url.h"
24 #include "ext/standard/crc32.h"
25 #include "ext/standard/md5.h"
26
27 #include "ext/hash/php_hash.h"
28
29 #define PHPREDIS_INDEX_NAME "__phpredis_array_index__"
30
31 extern zend_class_entry *redis_ce;
32
33 static RedisArray *
ra_load_hosts(RedisArray * ra,HashTable * hosts,zend_string * user,zend_string * pass,long retry_interval,zend_bool b_lazy_connect)34 ra_load_hosts(RedisArray *ra, HashTable *hosts, zend_string *user,
35 zend_string *pass, long retry_interval, zend_bool b_lazy_connect)
36 {
37 int i = 0, host_len;
38 char *host, *p;
39 short port;
40 zval *zpData;
41 redis_object *redis;
42
43 /* init connections */
44 ZEND_HASH_FOREACH_VAL(hosts, zpData) {
45 if (Z_TYPE_P(zpData) != IS_STRING) {
46 return NULL;
47 }
48
49 /* default values */
50 host = Z_STRVAL_P(zpData);
51 host_len = Z_STRLEN_P(zpData);
52 ra->hosts[i] = zend_string_init(host, host_len, 0);
53 port = 6379;
54
55 if((p = strrchr(host, ':'))) { /* found port */
56 host_len = p - host;
57 port = (short)atoi(p+1);
58 } else if(strchr(host,'/') != NULL) { /* unix socket */
59 port = -1;
60 }
61
62 /* create Redis object */
63 object_init_ex(&ra->redis[i], redis_ce);
64 redis = PHPREDIS_ZVAL_GET_OBJECT(redis_object, &ra->redis[i]);
65
66 /* create socket */
67 redis->sock = redis_sock_create(host, host_len, port, ra->connect_timeout,
68 ra->read_timeout, ra->pconnect, NULL,
69 retry_interval);
70
71 redis_sock_set_auth(redis->sock, user, pass);
72
73 if (!b_lazy_connect) {
74 if (redis_sock_server_open(redis->sock) < 0) {
75 ra->count = ++i;
76 return NULL;
77 }
78 }
79
80 ra->count = ++i;
81 } ZEND_HASH_FOREACH_END();
82
83 return ra;
84 }
85
86 /* List pure functions */
87 void
ra_init_function_table(RedisArray * ra)88 ra_init_function_table(RedisArray *ra)
89 {
90 ALLOC_HASHTABLE(ra->pure_cmds);
91 zend_hash_init(ra->pure_cmds, 0, NULL, NULL, 0);
92
93 zend_hash_str_update_ptr(ra->pure_cmds, "EXISTS", sizeof("EXISTS") - 1, NULL);
94 zend_hash_str_update_ptr(ra->pure_cmds, "GET", sizeof("GET") - 1, NULL);
95 zend_hash_str_update_ptr(ra->pure_cmds, "GETBIT", sizeof("GETBIT") - 1, NULL);
96 zend_hash_str_update_ptr(ra->pure_cmds, "GETRANGE", sizeof("GETRANGE") - 1, NULL);
97 zend_hash_str_update_ptr(ra->pure_cmds, "HEXISTS", sizeof("HEXISTS") - 1, NULL);
98 zend_hash_str_update_ptr(ra->pure_cmds, "HGET", sizeof("HGET") - 1, NULL);
99 zend_hash_str_update_ptr(ra->pure_cmds, "HGETALL", sizeof("HGETALL") - 1, NULL);
100 zend_hash_str_update_ptr(ra->pure_cmds, "HKEYS", sizeof("HKEYS") - 1, NULL);
101 zend_hash_str_update_ptr(ra->pure_cmds, "HLEN", sizeof("HLEN") - 1, NULL);
102 zend_hash_str_update_ptr(ra->pure_cmds, "HMGET", sizeof("HMGET") - 1, NULL);
103 zend_hash_str_update_ptr(ra->pure_cmds, "HVALS", sizeof("HVALS") - 1, NULL);
104 zend_hash_str_update_ptr(ra->pure_cmds, "LINDEX", sizeof("LINDEX") - 1, NULL);
105 zend_hash_str_update_ptr(ra->pure_cmds, "LLEN", sizeof("LLEN") - 1, NULL);
106 zend_hash_str_update_ptr(ra->pure_cmds, "LRANGE", sizeof("LRANGE") - 1, NULL);
107 zend_hash_str_update_ptr(ra->pure_cmds, "OBJECT", sizeof("OBJECT") - 1, NULL);
108 zend_hash_str_update_ptr(ra->pure_cmds, "SCARD", sizeof("SCARD") - 1, NULL);
109 zend_hash_str_update_ptr(ra->pure_cmds, "SDIFF", sizeof("SDIFF") - 1, NULL);
110 zend_hash_str_update_ptr(ra->pure_cmds, "SINTER", sizeof("SINTER") - 1, NULL);
111 zend_hash_str_update_ptr(ra->pure_cmds, "SISMEMBER", sizeof("SISMEMBER") - 1, NULL);
112 zend_hash_str_update_ptr(ra->pure_cmds, "SMEMBERS", sizeof("SMEMBERS") - 1, NULL);
113 zend_hash_str_update_ptr(ra->pure_cmds, "SRANDMEMBER", sizeof("SRANDMEMBER") - 1, NULL);
114 zend_hash_str_update_ptr(ra->pure_cmds, "STRLEN", sizeof("STRLEN") - 1, NULL);
115 zend_hash_str_update_ptr(ra->pure_cmds, "SUNION", sizeof("SUNION") - 1, NULL);
116 zend_hash_str_update_ptr(ra->pure_cmds, "TYPE", sizeof("TYPE") - 1, NULL);
117 zend_hash_str_update_ptr(ra->pure_cmds, "ZCARD", sizeof("ZCARD") - 1, NULL);
118 zend_hash_str_update_ptr(ra->pure_cmds, "ZCOUNT", sizeof("ZCOUNT") - 1, NULL);
119 zend_hash_str_update_ptr(ra->pure_cmds, "ZRANGE", sizeof("ZRANGE") - 1, NULL);
120 zend_hash_str_update_ptr(ra->pure_cmds, "ZRANK", sizeof("ZRANK") - 1, NULL);
121 zend_hash_str_update_ptr(ra->pure_cmds, "ZREVRANGE", sizeof("ZREVRANGE") - 1, NULL);
122 zend_hash_str_update_ptr(ra->pure_cmds, "ZREVRANGEBYSCORE", sizeof("ZREVRANGEBYSCORE") - 1, NULL);
123 zend_hash_str_update_ptr(ra->pure_cmds, "ZREVRANK", sizeof("ZREVRANK") - 1, NULL);
124 zend_hash_str_update_ptr(ra->pure_cmds, "ZSCORE", sizeof("ZSCORE") - 1, NULL);
125 }
126
127 static int
ra_find_name(const char * name)128 ra_find_name(const char *name) {
129
130 const char *ini_names, *p, *next;
131 /* php_printf("Loading redis array with name=[%s]\n", name); */
132
133 ini_names = INI_STR("redis.arrays.names");
134 for(p = ini_names; p;) {
135 next = strchr(p, ',');
136 if(next) {
137 if(strncmp(p, name, next - p) == 0) {
138 return 1;
139 }
140 } else {
141 if(strcmp(p, name) == 0) {
142 return 1;
143 }
144 break;
145 }
146 p = next + 1;
147 }
148
149 return 0;
150 }
151
152 /* load array from INI settings */
ra_load_array(const char * name)153 RedisArray *ra_load_array(const char *name) {
154 zval *z_data, z_tmp, z_fun, z_dist;
155 zval z_params_hosts;
156 zval z_params_prev;
157 RedisArray *ra = NULL;
158
159 zend_string *algorithm = NULL, *user = NULL, *pass = NULL;
160 zend_bool b_index = 0, b_autorehash = 0, b_pconnect = 0, consistent = 0;
161 zend_long l_retry_interval = 0;
162 zend_bool b_lazy_connect = 0;
163 double d_connect_timeout = 0, read_timeout = 0.0;
164 HashTable *hHosts = NULL, *hPrev = NULL;
165 size_t name_len = strlen(name);
166 char *iptr;
167
168 /* find entry */
169 if(!ra_find_name(name))
170 return ra;
171
172 ZVAL_NULL(&z_fun);
173 ZVAL_NULL(&z_dist);
174
175 /* find hosts */
176 array_init(&z_params_hosts);
177 if ((iptr = INI_STR("redis.arrays.hosts")) != NULL) {
178 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_params_hosts);
179 if ((z_data = zend_hash_str_find(Z_ARRVAL(z_params_hosts), name, name_len)) != NULL) {
180 hHosts = Z_ARRVAL_P(z_data);
181 }
182 }
183
184 /* find previous hosts */
185 array_init(&z_params_prev);
186 if ((iptr = INI_STR("redis.arrays.previous")) != NULL) {
187 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_params_prev);
188 if ((z_data = zend_hash_str_find(Z_ARRVAL(z_params_prev), name, name_len)) != NULL) {
189 if (Z_TYPE_P(z_data) == IS_ARRAY) {
190 hPrev = Z_ARRVAL_P(z_data);
191 }
192 }
193 }
194
195 /* find function */
196 if ((iptr = INI_STR("redis.arrays.functions")) != NULL) {
197 array_init(&z_tmp);
198 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
199 redis_conf_zval(Z_ARRVAL(z_tmp), name, name_len, &z_fun, 1, 0);
200 zval_dtor(&z_tmp);
201 }
202
203 /* find distributor */
204 if ((iptr = INI_STR("redis.arrays.distributor")) != NULL) {
205 array_init(&z_tmp);
206 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
207 redis_conf_zval(Z_ARRVAL(z_tmp), name, name_len, &z_dist, 1, 0);
208 zval_dtor(&z_tmp);
209 }
210
211 /* find hash algorithm */
212 if ((iptr = INI_STR("redis.arrays.algorithm")) != NULL) {
213 array_init(&z_tmp);
214 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
215 redis_conf_string(Z_ARRVAL(z_tmp), name, name_len, &algorithm);
216 zval_dtor(&z_tmp);
217 }
218
219 /* find index option */
220 if ((iptr = INI_STR("redis.arrays.index")) != NULL) {
221 array_init(&z_tmp);
222 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
223 redis_conf_zend_bool(Z_ARRVAL(z_tmp), name, name_len, &b_index);
224 zval_dtor(&z_tmp);
225 }
226
227 /* find autorehash option */
228 if ((iptr = INI_STR("redis.arrays.autorehash")) != NULL) {
229 array_init(&z_tmp);
230 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
231 redis_conf_zend_bool(Z_ARRVAL(z_tmp), name, name_len, &b_autorehash);
232 zval_dtor(&z_tmp);
233 }
234
235 /* find retry interval option */
236 if ((iptr = INI_STR("redis.arrays.retryinterval")) != NULL) {
237 array_init(&z_tmp);
238 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
239 redis_conf_long(Z_ARRVAL(z_tmp), name, name_len, &l_retry_interval);
240 zval_dtor(&z_tmp);
241 }
242
243 /* find pconnect option */
244 if ((iptr = INI_STR("redis.arrays.pconnect")) != NULL) {
245 array_init(&z_tmp);
246 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
247 redis_conf_zend_bool(Z_ARRVAL(z_tmp), name, name_len, &b_pconnect);
248 zval_dtor(&z_tmp);
249 }
250
251 /* find lazy connect option */
252 if ((iptr = INI_STR("redis.arrays.lazyconnect")) != NULL) {
253 array_init(&z_tmp);
254 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
255 redis_conf_zend_bool(Z_ARRVAL(z_tmp), name, name_len, &b_lazy_connect);
256 zval_dtor(&z_tmp);
257 }
258
259 /* find connect timeout option */
260 if ((iptr = INI_STR("redis.arrays.connecttimeout")) != NULL) {
261 array_init(&z_tmp);
262 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
263 redis_conf_double(Z_ARRVAL(z_tmp), name, name_len, &d_connect_timeout);
264 zval_dtor(&z_tmp);
265 }
266
267 /* find read timeout option */
268 if ((iptr = INI_STR("redis.arrays.readtimeout")) != NULL) {
269 array_init(&z_tmp);
270 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
271 redis_conf_double(Z_ARRVAL(z_tmp), name, name_len, &read_timeout);
272 zval_dtor(&z_tmp);
273 }
274
275 /* find consistent option */
276 if ((iptr = INI_STR("redis.arrays.consistent")) != NULL) {
277 array_init(&z_tmp);
278 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
279 if ((z_data = zend_hash_str_find(Z_ARRVAL(z_tmp), name, name_len)) != NULL) {
280 consistent = Z_TYPE_P(z_data) == IS_STRING && strncmp(Z_STRVAL_P(z_data), "1", 1) == 0;
281 }
282 zval_dtor(&z_tmp);
283 }
284
285 /* find auth option */
286 if ((iptr = INI_STR("redis.arrays.auth")) != NULL) {
287 array_init(&z_tmp);
288 sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_tmp);
289 redis_conf_auth(Z_ARRVAL(z_tmp), name, name_len, &user, &pass);
290 zval_dtor(&z_tmp);
291 }
292
293 /* create RedisArray object */
294 ra = ra_make_array(hHosts, &z_fun, &z_dist, hPrev, b_index, b_pconnect, l_retry_interval,
295 b_lazy_connect, d_connect_timeout, read_timeout, consistent, algorithm,
296 user, pass);
297 if (ra) {
298 ra->auto_rehash = b_autorehash;
299 if(ra->prev) ra->prev->auto_rehash = b_autorehash;
300 }
301
302 if (algorithm) zend_string_release(algorithm);
303 if (user) zend_string_release(user);
304 if (pass) zend_string_release(pass);
305
306 zval_dtor(&z_params_hosts);
307 zval_dtor(&z_params_prev);
308 zval_dtor(&z_dist);
309 zval_dtor(&z_fun);
310
311 return ra;
312 }
313
314 static int
ra_points_cmp(const void * v1,const void * v2)315 ra_points_cmp(const void *v1, const void *v2)
316 {
317 const ContinuumPoint *p1 = v1, *p2 = v2;
318
319 return p1->value < p2->value ? - 1 : p1->value > p2->value;
320 }
321
322 static Continuum *
ra_make_continuum(zend_string ** hosts,int nb_hosts)323 ra_make_continuum(zend_string **hosts, int nb_hosts)
324 {
325 int i, j, k, len, idx = 0;
326 char host[HOST_NAME_MAX];
327 unsigned char digest[16];
328 PHP_MD5_CTX ctx;
329 Continuum *c;
330
331 c = ecalloc(1, sizeof(*c));
332 c->nb_points = nb_hosts * 160; /* 40 hashes, 4 numbers per hash = 160 points per server */
333 c->points = ecalloc(c->nb_points, sizeof(*c->points));
334
335 for (i = 0; i < nb_hosts; ++i) {
336 for (j = 0; j < 40; ++j) {
337 len = snprintf(host, sizeof(host), "%.*s-%u", (int)ZSTR_LEN(hosts[i]), ZSTR_VAL(hosts[i]), j);
338 PHP_MD5Init(&ctx);
339 PHP_MD5Update(&ctx, host, len);
340 PHP_MD5Final(digest, &ctx);
341 for (k = 0; k < 4; ++k) {
342 c->points[idx].index = i;
343 c->points[idx++].value = (digest[3 + k * 4] << 24)
344 | (digest[2 + k * 4] << 16)
345 | (digest[1 + k * 4] << 8)
346 | (digest[k * 4]);
347 }
348 }
349 }
350 qsort(c->points, c->nb_points, sizeof(*c->points), ra_points_cmp);
351 return c;
352 }
353
354 RedisArray *
ra_make_array(HashTable * hosts,zval * z_fun,zval * z_dist,HashTable * hosts_prev,zend_bool b_index,zend_bool b_pconnect,long retry_interval,zend_bool b_lazy_connect,double connect_timeout,double read_timeout,zend_bool consistent,zend_string * algorithm,zend_string * user,zend_string * pass)355 ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev,
356 zend_bool b_index, zend_bool b_pconnect, long retry_interval,
357 zend_bool b_lazy_connect, double connect_timeout, double read_timeout,
358 zend_bool consistent, zend_string *algorithm, zend_string *user,
359 zend_string *pass)
360 {
361 int i, count;
362 RedisArray *ra;
363
364 if (!hosts || (count = zend_hash_num_elements(hosts)) == 0) return NULL;
365
366 /* create object */
367 ra = emalloc(sizeof(RedisArray));
368 ra->hosts = ecalloc(count, sizeof(*ra->hosts));
369 ra->redis = ecalloc(count, sizeof(*ra->redis));
370 ra->count = 0;
371 ra->z_multi_exec = NULL;
372 ra->index = b_index;
373 ra->auto_rehash = 0;
374 ra->pconnect = b_pconnect;
375 ra->connect_timeout = connect_timeout;
376 ra->read_timeout = read_timeout;
377 ra->continuum = NULL;
378 ra->algorithm = NULL;
379
380 if (ra_load_hosts(ra, hosts, user, pass, retry_interval, b_lazy_connect) == NULL || !ra->count) {
381 for (i = 0; i < ra->count; ++i) {
382 zval_dtor(&ra->redis[i]);
383 zend_string_release(ra->hosts[i]);
384 }
385 efree(ra->redis);
386 efree(ra->hosts);
387 efree(ra);
388 return NULL;
389 }
390
391 ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index, b_pconnect, retry_interval, b_lazy_connect, connect_timeout, read_timeout, consistent, algorithm, user, pass) : NULL;
392
393 /* init array data structures */
394 ra_init_function_table(ra);
395
396 /* Set hash function and distribtor if provided */
397 ZVAL_ZVAL(&ra->z_fun, z_fun, 1, 0);
398 ZVAL_ZVAL(&ra->z_dist, z_dist, 1, 0);
399 if (algorithm) ra->algorithm = zend_string_copy(algorithm);
400
401 /* init continuum */
402 if (consistent) {
403 ra->continuum = ra_make_continuum(ra->hosts, ra->count);
404 }
405
406 return ra;
407 }
408
409
410 /* call userland key extraction function */
411 zend_string *
ra_call_extractor(RedisArray * ra,const char * key,int key_len)412 ra_call_extractor(RedisArray *ra, const char *key, int key_len)
413 {
414 zend_string *out = NULL;
415 zval z_ret, z_argv;
416
417 /* check that we can call the extractor function */
418 if (!zend_is_callable_ex(&ra->z_fun, NULL, 0, NULL, NULL, NULL)) {
419 php_error_docref(NULL, E_ERROR, "Could not call extractor function");
420 return NULL;
421 }
422
423 ZVAL_NULL(&z_ret);
424 /* call extraction function */
425 ZVAL_STRINGL(&z_argv, key, key_len);
426 call_user_function(EG(function_table), NULL, &ra->z_fun, &z_ret, 1, &z_argv);
427
428 if (Z_TYPE(z_ret) == IS_STRING) {
429 out = zval_get_string(&z_ret);
430 }
431
432 zval_dtor(&z_argv);
433 zval_dtor(&z_ret);
434 return out;
435 }
436
437 static zend_string *
ra_extract_key(RedisArray * ra,const char * key,int key_len)438 ra_extract_key(RedisArray *ra, const char *key, int key_len)
439 {
440 char *start, *end;
441
442 if (Z_TYPE(ra->z_fun) != IS_NULL) {
443 return ra_call_extractor(ra, key, key_len);
444 } else if ((start = strchr(key, '{')) == NULL || (end = strchr(start + 1, '}')) == NULL) {
445 return zend_string_init(key, key_len, 0);
446 }
447 /* found substring */
448 return zend_string_init(start + 1, end - start - 1, 0);
449 }
450
451 /* call userland key distributor function */
452 int
ra_call_distributor(RedisArray * ra,const char * key,int key_len)453 ra_call_distributor(RedisArray *ra, const char *key, int key_len)
454 {
455 int ret;
456 zval z_ret, z_argv;
457
458 /* check that we can call the extractor function */
459 if (!zend_is_callable_ex(&ra->z_dist, NULL, 0, NULL, NULL, NULL)) {
460 php_error_docref(NULL, E_ERROR, "Could not call distributor function");
461 return -1;
462 }
463
464 ZVAL_NULL(&z_ret);
465 /* call extraction function */
466 ZVAL_STRINGL(&z_argv, key, key_len);
467 call_user_function(EG(function_table), NULL, &ra->z_dist, &z_ret, 1, &z_argv);
468
469 ret = (Z_TYPE(z_ret) == IS_LONG) ? Z_LVAL(z_ret) : -1;
470
471 zval_dtor(&z_argv);
472 zval_dtor(&z_ret);
473 return ret;
474 }
475
476 zval *
ra_find_node(RedisArray * ra,const char * key,int key_len,int * out_pos)477 ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos)
478 {
479 int pos;
480 zend_string *out;
481
482 /* extract relevant part of the key */
483 if ((out = ra_extract_key(ra, key, key_len)) == NULL) {
484 return NULL;
485 }
486
487 if (Z_TYPE(ra->z_dist) == IS_NULL) {
488 int i;
489 unsigned long ret = 0xffffffff;
490 const php_hash_ops *ops;
491
492 /* hash */
493 if (ra->algorithm && (ops = redis_hash_fetch_ops(ra->algorithm))) {
494 void *ctx = emalloc(ops->context_size);
495 unsigned char *digest = emalloc(ops->digest_size);
496
497 #if PHP_VERSION_ID >= 80100
498 ops->hash_init(ctx,NULL);
499 #else
500 ops->hash_init(ctx);
501 #endif
502 ops->hash_update(ctx, (const unsigned char *)ZSTR_VAL(out), ZSTR_LEN(out));
503 ops->hash_final(digest, ctx);
504
505 memcpy(&ret, digest, MIN(sizeof(ret), ops->digest_size));
506 ret %= 0xffffffff;
507
508 efree(digest);
509 efree(ctx);
510 } else {
511 for (i = 0; i < ZSTR_LEN(out); ++i) {
512 CRC32(ret, ZSTR_VAL(out)[i]);
513 }
514 }
515
516 /* get position on ring */
517 if (ra->continuum) {
518 int left = 0, right = ra->continuum->nb_points;
519 while (left < right) {
520 i = (int)((left + right) / 2);
521 if (ra->continuum->points[i].value < ret) {
522 left = i + 1;
523 } else {
524 right = i;
525 }
526 }
527 if (right == ra->continuum->nb_points) {
528 right = 0;
529 }
530 pos = ra->continuum->points[right].index;
531 } else {
532 pos = (int)((ret ^ 0xffffffff) * ra->count / 0xffffffff);
533 }
534 } else {
535 pos = ra_call_distributor(ra, key, key_len);
536 if (pos < 0 || pos >= ra->count) {
537 zend_string_release(out);
538 return NULL;
539 }
540 }
541 zend_string_release(out);
542
543 if(out_pos) *out_pos = pos;
544
545 return &ra->redis[pos];
546 }
547
548 zval *
ra_find_node_by_name(RedisArray * ra,zend_string * host)549 ra_find_node_by_name(RedisArray *ra, zend_string *host) {
550
551 int i;
552 for(i = 0; i < ra->count; ++i) {
553 if (zend_string_equals(host, ra->hosts[i])) {
554 return &ra->redis[i];
555 }
556 }
557 return NULL;
558 }
559
560 void
ra_index_multi(zval * z_redis,long multi_value)561 ra_index_multi(zval *z_redis, long multi_value) {
562
563 zval z_fun_multi, z_ret;
564 zval z_args[1];
565
566 /* run MULTI */
567 ZVAL_STRINGL(&z_fun_multi, "MULTI", 5);
568 ZVAL_LONG(&z_args[0], multi_value);
569 call_user_function(&redis_ce->function_table, z_redis, &z_fun_multi, &z_ret, 1, z_args);
570 zval_dtor(&z_fun_multi);
571 zval_dtor(&z_ret);
572 }
573
574 static void
ra_index_change_keys(const char * cmd,zval * z_keys,zval * z_redis)575 ra_index_change_keys(const char *cmd, zval *z_keys, zval *z_redis) {
576
577 int i, argc;
578 zval z_fun, z_ret, *z_args;
579
580 /* alloc */
581 argc = 1 + zend_hash_num_elements(Z_ARRVAL_P(z_keys));
582 z_args = ecalloc(argc, sizeof(zval));
583
584 /* prepare first parameters */
585 ZVAL_STRING(&z_fun, cmd);
586 ZVAL_STRINGL(&z_args[0], PHPREDIS_INDEX_NAME, sizeof(PHPREDIS_INDEX_NAME) - 1);
587
588 /* prepare keys */
589 for(i = 0; i < argc - 1; ++i) {
590 zval *zv = zend_hash_index_find(Z_ARRVAL_P(z_keys), i);
591 if (zv == NULL) {
592 ZVAL_NULL(&z_args[i+1]);
593 } else {
594 z_args[i+1] = *zv;
595 }
596 }
597
598 /* run cmd */
599 call_user_function(&redis_ce->function_table, z_redis, &z_fun, &z_ret, argc, z_args);
600
601 zval_dtor(&z_args[0]);
602 zval_dtor(&z_fun);
603 zval_dtor(&z_ret);
604 efree(z_args); /* free container */
605 }
606
607 void
ra_index_del(zval * z_keys,zval * z_redis)608 ra_index_del(zval *z_keys, zval *z_redis) {
609 ra_index_change_keys("SREM", z_keys, z_redis);
610 }
611
612 void
ra_index_keys(zval * z_pairs,zval * z_redis)613 ra_index_keys(zval *z_pairs, zval *z_redis) {
614
615 zval z_keys, *z_val;
616 zend_string *zkey;
617 zend_ulong idx;
618
619 /* Initialize key array */
620 array_init_size(&z_keys, zend_hash_num_elements(Z_ARRVAL_P(z_pairs)));
621
622 /* Go through input array and add values to the key array */
623 ZEND_HASH_FOREACH_KEY_VAL(Z_ARRVAL_P(z_pairs), idx, zkey, z_val) {
624 zval z_new;
625
626 PHPREDIS_NOTUSED(z_val);
627
628 if (zkey) {
629 ZVAL_STRINGL(&z_new, ZSTR_VAL(zkey), ZSTR_LEN(zkey));
630 } else {
631 ZVAL_LONG(&z_new, idx);
632 }
633 zend_hash_next_index_insert(Z_ARRVAL(z_keys), &z_new);
634 } ZEND_HASH_FOREACH_END();
635
636 /* add keys to index */
637 ra_index_change_keys("SADD", &z_keys, z_redis);
638
639 /* cleanup */
640 zval_dtor(&z_keys);
641 }
642
643 void
ra_index_key(const char * key,int key_len,zval * z_redis)644 ra_index_key(const char *key, int key_len, zval *z_redis) {
645
646 zval z_fun_sadd, z_ret, z_args[2];
647
648 /* prepare args */
649 ZVAL_STRINGL(&z_fun_sadd, "SADD", 4);
650
651 ZVAL_STRINGL(&z_args[0], PHPREDIS_INDEX_NAME, sizeof(PHPREDIS_INDEX_NAME) - 1);
652 ZVAL_STRINGL(&z_args[1], key, key_len);
653
654 /* run SADD */
655 call_user_function(&redis_ce->function_table, z_redis, &z_fun_sadd, &z_ret, 2, z_args);
656 zval_dtor(&z_fun_sadd);
657 zval_dtor(&z_args[1]);
658 zval_dtor(&z_args[0]);
659 zval_dtor(&z_ret);
660 }
661
662 void
ra_index_exec(zval * z_redis,zval * return_value,int keep_all)663 ra_index_exec(zval *z_redis, zval *return_value, int keep_all) {
664
665 zval z_fun_exec, z_ret, *zp_tmp;
666
667 /* run EXEC */
668 ZVAL_STRINGL(&z_fun_exec, "EXEC", 4);
669 call_user_function(&redis_ce->function_table, z_redis, &z_fun_exec, &z_ret, 0, NULL);
670 zval_dtor(&z_fun_exec);
671
672 /* extract first element of exec array and put into return_value. */
673 if(Z_TYPE(z_ret) == IS_ARRAY) {
674 if(return_value) {
675 if(keep_all) {
676 zp_tmp = &z_ret;
677 RETVAL_ZVAL(zp_tmp, 1, 0);
678 } else if ((zp_tmp = zend_hash_index_find(Z_ARRVAL(z_ret), 0)) != NULL) {
679 RETVAL_ZVAL(zp_tmp, 1, 0);
680 }
681 }
682 }
683 zval_dtor(&z_ret);
684
685 /* zval *zptr = &z_ret; */
686 /* php_var_dump(&zptr, 0); */
687 }
688
689 void
ra_index_discard(zval * z_redis,zval * return_value)690 ra_index_discard(zval *z_redis, zval *return_value) {
691
692 zval z_fun_discard, z_ret;
693
694 /* run DISCARD */
695 ZVAL_STRINGL(&z_fun_discard, "DISCARD", 7);
696 call_user_function(&redis_ce->function_table, z_redis, &z_fun_discard, &z_ret, 0, NULL);
697
698 zval_dtor(&z_fun_discard);
699 zval_dtor(&z_ret);
700 }
701
702 void
ra_index_unwatch(zval * z_redis,zval * return_value)703 ra_index_unwatch(zval *z_redis, zval *return_value) {
704
705 zval z_fun_unwatch, z_ret;
706
707 /* run UNWATCH */
708 ZVAL_STRINGL(&z_fun_unwatch, "UNWATCH", 7);
709 call_user_function(&redis_ce->function_table, z_redis, &z_fun_unwatch, &z_ret, 0, NULL);
710
711 zval_dtor(&z_fun_unwatch);
712 zval_dtor(&z_ret);
713 }
714
715 zend_bool
ra_is_write_cmd(RedisArray * ra,const char * cmd,int cmd_len)716 ra_is_write_cmd(RedisArray *ra, const char *cmd, int cmd_len) {
717
718 zend_bool ret;
719 int i;
720 char *cmd_up = emalloc(1 + cmd_len);
721 /* convert to uppercase */
722 for(i = 0; i < cmd_len; ++i)
723 cmd_up[i] = toupper(cmd[i]);
724 cmd_up[cmd_len] = 0;
725
726 ret = zend_hash_str_exists(ra->pure_cmds, cmd_up, cmd_len);
727
728 efree(cmd_up);
729 return !ret;
730 }
731
732 /* run TYPE to find the type */
733 static zend_bool
ra_get_key_type(zval * z_redis,const char * key,int key_len,zval * z_from,long * res)734 ra_get_key_type(zval *z_redis, const char *key, int key_len, zval *z_from, long *res) {
735
736 int i = 0;
737 zval z_fun, z_ret, z_arg, *z_data;
738 long success = 1;
739
740 /* Pipelined */
741 ra_index_multi(z_from, PIPELINE);
742
743 /* prepare args */
744 ZVAL_STRINGL(&z_arg, key, key_len);
745
746 /* run TYPE */
747 ZVAL_NULL(&z_ret);
748 ZVAL_STRINGL(&z_fun, "TYPE", 4);
749 call_user_function(&redis_ce->function_table, z_redis, &z_fun, &z_ret, 1, &z_arg);
750 zval_dtor(&z_fun);
751 zval_dtor(&z_ret);
752
753 /* run TYPE */
754 ZVAL_NULL(&z_ret);
755 ZVAL_STRINGL(&z_fun, "TTL", 3);
756 call_user_function(&redis_ce->function_table, z_redis, &z_fun, &z_ret, 1, &z_arg);
757 zval_dtor(&z_fun);
758 zval_dtor(&z_ret);
759
760 /* Get the result from the pipeline. */
761 ra_index_exec(z_from, &z_ret, 1);
762 if (Z_TYPE(z_ret) == IS_ARRAY) {
763 ZEND_HASH_FOREACH_VAL(Z_ARRVAL(z_ret), z_data) {
764 if (z_data == NULL || Z_TYPE_P(z_data) != IS_LONG) {
765 success = 0;
766 break;
767 }
768 /* Get the result - Might change in the future to handle doubles as well */
769 res[i++] = Z_LVAL_P(z_data);
770 } ZEND_HASH_FOREACH_END();
771 }
772 zval_dtor(&z_arg);
773 zval_dtor(&z_ret);
774 return success;
775 }
776
777 /* delete key from source server index during rehashing */
778 static void
ra_remove_from_index(zval * z_redis,const char * key,int key_len)779 ra_remove_from_index(zval *z_redis, const char *key, int key_len) {
780
781 zval z_fun_srem, z_ret, z_args[2];
782
783 /* run SREM on source index */
784 ZVAL_STRINGL(&z_fun_srem, "SREM", 4);
785 ZVAL_STRINGL(&z_args[0], PHPREDIS_INDEX_NAME, sizeof(PHPREDIS_INDEX_NAME) - 1);
786 ZVAL_STRINGL(&z_args[1], key, key_len);
787
788 call_user_function(&redis_ce->function_table, z_redis, &z_fun_srem, &z_ret, 2, z_args);
789
790 /* cleanup */
791 zval_dtor(&z_fun_srem);
792 zval_dtor(&z_args[1]);
793 zval_dtor(&z_args[0]);
794 zval_dtor(&z_ret);
795 }
796
797
798 /* delete key from source server during rehashing */
799 static zend_bool
ra_del_key(const char * key,int key_len,zval * z_from)800 ra_del_key(const char *key, int key_len, zval *z_from) {
801
802 zval z_fun_del, z_ret, z_args[1];
803
804 /* in a transaction */
805 ra_index_multi(z_from, MULTI);
806
807 /* run DEL on source */
808 ZVAL_STRINGL(&z_fun_del, "DEL", 3);
809 ZVAL_STRINGL(&z_args[0], key, key_len);
810 call_user_function(&redis_ce->function_table, z_from, &z_fun_del, &z_ret, 1, z_args);
811 zval_dtor(&z_fun_del);
812 zval_dtor(&z_args[0]);
813 zval_dtor(&z_ret);
814
815 /* remove key from index */
816 ra_remove_from_index(z_from, key, key_len);
817
818 /* close transaction */
819 ra_index_exec(z_from, NULL, 0);
820
821 return 1;
822 }
823
824 static zend_bool
ra_expire_key(const char * key,int key_len,zval * z_to,long ttl)825 ra_expire_key(const char *key, int key_len, zval *z_to, long ttl) {
826
827 zval z_fun_expire, z_ret, z_args[2];
828
829 if (ttl > 0)
830 {
831 /* run EXPIRE on target */
832 ZVAL_STRINGL(&z_fun_expire, "EXPIRE", 6);
833 ZVAL_STRINGL(&z_args[0], key, key_len);
834 ZVAL_LONG(&z_args[1], ttl);
835 call_user_function(&redis_ce->function_table, z_to, &z_fun_expire, &z_ret, 2, z_args);
836 zval_dtor(&z_fun_expire);
837 zval_dtor(&z_args[0]);
838 zval_dtor(&z_ret);
839 }
840
841 return 1;
842 }
843
844 static zend_bool
ra_move_zset(const char * key,int key_len,zval * z_from,zval * z_to,long ttl)845 ra_move_zset(const char *key, int key_len, zval *z_from, zval *z_to, long ttl) {
846
847 zval z_fun_zrange, z_fun_zadd, z_ret, z_ret_dest, z_args[4], *z_zadd_args, *z_score_p;
848 int i, count;
849 HashTable *h_zset_vals;
850 zend_string *zkey;
851 zend_ulong idx;
852
853 /* run ZRANGE key 0 -1 WITHSCORES on source */
854 ZVAL_STRINGL(&z_fun_zrange, "ZRANGE", 6);
855 ZVAL_STRINGL(&z_args[0], key, key_len);
856 ZVAL_STRINGL(&z_args[1], "0", 1);
857 ZVAL_STRINGL(&z_args[2], "-1", 2);
858 ZVAL_BOOL(&z_args[3], 1);
859 call_user_function(&redis_ce->function_table, z_from, &z_fun_zrange, &z_ret, 4, z_args);
860 zval_dtor(&z_fun_zrange);
861 zval_dtor(&z_args[2]);
862 zval_dtor(&z_args[1]);
863 zval_dtor(&z_args[0]);
864
865
866 if(Z_TYPE(z_ret) != IS_ARRAY) { /* key not found or replaced */
867 /* TODO: report? */
868 zval_dtor(&z_ret);
869 return 0;
870 }
871
872 /* we now have an array of value → score pairs in z_ret. */
873 h_zset_vals = Z_ARRVAL(z_ret);
874
875 /* allocate argument array for ZADD */
876 count = zend_hash_num_elements(h_zset_vals);
877 z_zadd_args = ecalloc((1 + 2*count), sizeof(zval));
878
879 ZVAL_STRINGL(&z_zadd_args[0], key, key_len);
880
881 i = 1;
882 ZEND_HASH_FOREACH_KEY_VAL(h_zset_vals, idx, zkey, z_score_p) {
883 /* add score */
884 ZVAL_DOUBLE(&z_zadd_args[i], Z_DVAL_P(z_score_p));
885
886 /* add value */
887 if (zkey) {
888 ZVAL_STRINGL(&z_zadd_args[i+1], ZSTR_VAL(zkey), ZSTR_LEN(zkey));
889 } else {
890 ZVAL_LONG(&z_zadd_args[i+1], (long)idx);
891 }
892 i += 2;
893 } ZEND_HASH_FOREACH_END();
894
895 /* run ZADD on target */
896 ZVAL_STRINGL(&z_fun_zadd, "ZADD", 4);
897 call_user_function(&redis_ce->function_table, z_to, &z_fun_zadd, &z_ret_dest, 1 + 2 * count, z_zadd_args);
898
899 /* Expire if needed */
900 ra_expire_key(key, key_len, z_to, ttl);
901
902 /* cleanup */
903 zval_dtor(&z_fun_zadd);
904 zval_dtor(&z_ret_dest);
905 zval_dtor(&z_ret);
906
907 /* Free the array itself */
908 for (i = 0; i < 1 + 2 * count; i++) {
909 zval_dtor(&z_zadd_args[i]);
910 }
911 efree(z_zadd_args);
912
913 return 1;
914 }
915
916 static zend_bool
ra_move_string(const char * key,int key_len,zval * z_from,zval * z_to,long ttl)917 ra_move_string(const char *key, int key_len, zval *z_from, zval *z_to, long ttl) {
918
919 zval z_fun_get, z_fun_set, z_ret, z_args[3];
920
921 /* run GET on source */
922 ZVAL_STRINGL(&z_fun_get, "GET", 3);
923 ZVAL_STRINGL(&z_args[0], key, key_len);
924 call_user_function(&redis_ce->function_table, z_from, &z_fun_get, &z_ret, 1, z_args);
925 zval_dtor(&z_fun_get);
926
927 if(Z_TYPE(z_ret) != IS_STRING) { /* key not found or replaced */
928 /* TODO: report? */
929 zval_dtor(&z_args[0]);
930 zval_dtor(&z_ret);
931 return 0;
932 }
933
934 /* run SET on target */
935 if (ttl > 0) {
936 ZVAL_STRINGL(&z_fun_set, "SETEX", 5);
937 ZVAL_LONG(&z_args[1], ttl);
938 ZVAL_STRINGL(&z_args[2], Z_STRVAL(z_ret), Z_STRLEN(z_ret)); /* copy z_ret to arg 1 */
939 zval_dtor(&z_ret); /* free memory from our previous call */
940 call_user_function(&redis_ce->function_table, z_to, &z_fun_set, &z_ret, 3, z_args);
941 /* cleanup */
942 zval_dtor(&z_args[2]);
943 } else {
944 ZVAL_STRINGL(&z_fun_set, "SET", 3);
945 ZVAL_STRINGL(&z_args[1], Z_STRVAL(z_ret), Z_STRLEN(z_ret)); /* copy z_ret to arg 1 */
946 zval_dtor(&z_ret); /* free memory from our previous return value */
947 call_user_function(&redis_ce->function_table, z_to, &z_fun_set, &z_ret, 2, z_args);
948 /* cleanup */
949 zval_dtor(&z_args[1]);
950 }
951 zval_dtor(&z_fun_set);
952 zval_dtor(&z_args[0]);
953 zval_dtor(&z_ret);
954
955 return 1;
956 }
957
958 static zend_bool
ra_move_hash(const char * key,int key_len,zval * z_from,zval * z_to,long ttl)959 ra_move_hash(const char *key, int key_len, zval *z_from, zval *z_to, long ttl) {
960 zval z_fun_hgetall, z_fun_hmset, z_ret_dest, z_args[2];
961
962 /* run HGETALL on source */
963 ZVAL_STRINGL(&z_args[0], key, key_len);
964 ZVAL_STRINGL(&z_fun_hgetall, "HGETALL", 7);
965 call_user_function(&redis_ce->function_table, z_from, &z_fun_hgetall, &z_args[1], 1, z_args);
966 zval_dtor(&z_fun_hgetall);
967
968 if (Z_TYPE(z_args[1]) != IS_ARRAY) { /* key not found or replaced */
969 /* TODO: report? */
970 zval_dtor(&z_args[1]);
971 zval_dtor(&z_args[0]);
972 return 0;
973 }
974
975 /* run HMSET on target */
976 ZVAL_STRINGL(&z_fun_hmset, "HMSET", 5);
977 call_user_function(&redis_ce->function_table, z_to, &z_fun_hmset, &z_ret_dest, 2, z_args);
978 zval_dtor(&z_fun_hmset);
979 zval_dtor(&z_ret_dest);
980
981 /* Expire if needed */
982 ra_expire_key(key, key_len, z_to, ttl);
983
984 /* cleanup */
985 zval_dtor(&z_args[1]);
986 zval_dtor(&z_args[0]);
987
988 return 1;
989 }
990
991 static zend_bool
ra_move_collection(const char * key,int key_len,zval * z_from,zval * z_to,int list_count,const char ** cmd_list,int add_count,const char ** cmd_add,long ttl)992 ra_move_collection(const char *key, int key_len, zval *z_from, zval *z_to,
993 int list_count, const char **cmd_list,
994 int add_count, const char **cmd_add, long ttl) {
995
996 zval z_fun_retrieve, z_fun_sadd, z_ret, *z_retrieve_args, *z_sadd_args, *z_data_p;
997 int count, i;
998 HashTable *h_set_vals;
999
1000 /* run retrieval command on source */
1001 ZVAL_STRING(&z_fun_retrieve, cmd_list[0]); /* set the command */
1002
1003 z_retrieve_args = ecalloc(list_count, sizeof(zval));
1004 /* set the key */
1005 ZVAL_STRINGL(&z_retrieve_args[0], key, key_len);
1006
1007 /* possibly add some other args if they were provided. */
1008 for(i = 1; i < list_count; ++i) {
1009 ZVAL_STRING(&z_retrieve_args[i], cmd_list[i]);
1010 }
1011
1012 call_user_function(&redis_ce->function_table, z_from, &z_fun_retrieve, &z_ret, list_count, z_retrieve_args);
1013
1014 /* cleanup */
1015 zval_dtor(&z_fun_retrieve);
1016 for(i = 0; i < list_count; ++i) {
1017 zval_dtor(&z_retrieve_args[i]);
1018 }
1019 efree(z_retrieve_args);
1020
1021 if(Z_TYPE(z_ret) != IS_ARRAY) { /* key not found or replaced */
1022 /* TODO: report? */
1023 zval_dtor(&z_ret);
1024 return 0;
1025 }
1026
1027 /* run SADD/RPUSH on target */
1028 h_set_vals = Z_ARRVAL(z_ret);
1029 count = 1 + zend_hash_num_elements(h_set_vals);
1030 ZVAL_STRING(&z_fun_sadd, cmd_add[0]);
1031 z_sadd_args = ecalloc(count, sizeof(zval));
1032 ZVAL_STRINGL(&z_sadd_args[0], key, key_len);
1033
1034 i = 1;
1035 ZEND_HASH_FOREACH_VAL(h_set_vals, z_data_p) {
1036 /* add set elements */
1037 ZVAL_ZVAL(&z_sadd_args[i], z_data_p, 1, 0);
1038 i++;
1039 } ZEND_HASH_FOREACH_END();
1040
1041 /* Clean up our input return value */
1042 zval_dtor(&z_ret);
1043
1044 call_user_function(&redis_ce->function_table, z_to, &z_fun_sadd, &z_ret, count, z_sadd_args);
1045
1046 /* cleanup */
1047 zval_dtor(&z_fun_sadd);
1048 for (i = 0; i < count; i++) {
1049 zval_dtor(&z_sadd_args[i]);
1050 }
1051 efree(z_sadd_args);
1052
1053 /* Clean up our output return value */
1054 zval_dtor(&z_ret);
1055
1056 /* Expire if needed */
1057 ra_expire_key(key, key_len, z_to, ttl);
1058
1059 return 1;
1060 }
1061
1062 static zend_bool
ra_move_set(const char * key,int key_len,zval * z_from,zval * z_to,long ttl)1063 ra_move_set(const char *key, int key_len, zval *z_from, zval *z_to, long ttl) {
1064
1065 const char *cmd_list[] = {"SMEMBERS"};
1066 const char *cmd_add[] = {"SADD"};
1067 return ra_move_collection(key, key_len, z_from, z_to, 1, cmd_list, 1, cmd_add, ttl);
1068 }
1069
1070 static zend_bool
ra_move_list(const char * key,int key_len,zval * z_from,zval * z_to,long ttl)1071 ra_move_list(const char *key, int key_len, zval *z_from, zval *z_to, long ttl) {
1072
1073 const char *cmd_list[] = {"LRANGE", "0", "-1"};
1074 const char *cmd_add[] = {"RPUSH"};
1075 return ra_move_collection(key, key_len, z_from, z_to, 3, cmd_list, 1, cmd_add, ttl);
1076 }
1077
1078 void
ra_move_key(const char * key,int key_len,zval * z_from,zval * z_to)1079 ra_move_key(const char *key, int key_len, zval *z_from, zval *z_to) {
1080
1081 long res[2] = {0}, type, ttl;
1082 zend_bool success = 0;
1083 if (ra_get_key_type(z_from, key, key_len, z_from, res)) {
1084 type = res[0];
1085 ttl = res[1];
1086 /* open transaction on target server */
1087 ra_index_multi(z_to, MULTI);
1088 switch(type) {
1089 case REDIS_STRING:
1090 success = ra_move_string(key, key_len, z_from, z_to, ttl);
1091 break;
1092
1093 case REDIS_SET:
1094 success = ra_move_set(key, key_len, z_from, z_to, ttl);
1095 break;
1096
1097 case REDIS_LIST:
1098 success = ra_move_list(key, key_len, z_from, z_to, ttl);
1099 break;
1100
1101 case REDIS_ZSET:
1102 success = ra_move_zset(key, key_len, z_from, z_to, ttl);
1103 break;
1104
1105 case REDIS_HASH:
1106 success = ra_move_hash(key, key_len, z_from, z_to, ttl);
1107 break;
1108
1109 default:
1110 /* TODO: report? */
1111 break;
1112 }
1113 }
1114
1115 if(success) {
1116 ra_del_key(key, key_len, z_from);
1117 ra_index_key(key, key_len, z_to);
1118 }
1119
1120 /* close transaction */
1121 ra_index_exec(z_to, NULL, 0);
1122 }
1123
1124 /* callback with the current progress, with hostname and count */
1125 static void
zval_rehash_callback(zend_fcall_info * z_cb,zend_fcall_info_cache * z_cb_cache,zend_string * hostname,long count)1126 zval_rehash_callback(zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache,
1127 zend_string *hostname, long count) {
1128
1129 zval zv, *z_ret = &zv;
1130
1131 ZVAL_NULL(z_ret);
1132
1133 zval z_args[2];
1134
1135 ZVAL_STRINGL(&z_args[0], ZSTR_VAL(hostname), ZSTR_LEN(hostname));
1136 ZVAL_LONG(&z_args[1], count);
1137
1138 z_cb->params = z_args;
1139 z_cb->retval = z_ret;
1140
1141 z_cb->param_count = 2;
1142
1143 /* run cb(hostname, count) */
1144 zend_call_function(z_cb, z_cb_cache);
1145
1146 /* cleanup */
1147 zval_dtor(&z_args[0]);
1148 zval_dtor(z_ret);
1149 }
1150
1151 static void
ra_rehash_server(RedisArray * ra,zval * z_redis,zend_string * hostname,zend_bool b_index,zend_fcall_info * z_cb,zend_fcall_info_cache * z_cb_cache)1152 ra_rehash_server(RedisArray *ra, zval *z_redis, zend_string *hostname, zend_bool b_index,
1153 zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache) {
1154
1155 HashTable *h_keys;
1156 long count = 0;
1157 zval z_fun, z_ret, z_argv, *z_ele;
1158
1159 /* list all keys */
1160 if (b_index) {
1161 ZVAL_STRING(&z_fun, "SMEMBERS");
1162 ZVAL_STRING(&z_argv, PHPREDIS_INDEX_NAME);
1163 } else {
1164 ZVAL_STRING(&z_fun, "KEYS");
1165 ZVAL_STRING(&z_argv, "*");
1166 }
1167 ZVAL_NULL(&z_ret);
1168 call_user_function(&redis_ce->function_table, z_redis, &z_fun, &z_ret, 1, &z_argv);
1169 zval_dtor(&z_argv);
1170 zval_dtor(&z_fun);
1171
1172 if (Z_TYPE(z_ret) == IS_ARRAY) {
1173 h_keys = Z_ARRVAL(z_ret);
1174 count = zend_hash_num_elements(h_keys);
1175 }
1176
1177 if (!count) {
1178 zval_dtor(&z_ret);
1179 return;
1180 }
1181
1182 /* callback */
1183 if(z_cb && z_cb_cache) {
1184 zval_rehash_callback(z_cb, z_cb_cache, hostname, count);
1185 }
1186
1187 /* for each key, redistribute */
1188 ZEND_HASH_FOREACH_VAL(h_keys, z_ele) {
1189 int pos = 0;
1190 /* check that we're not moving to the same node. */
1191 zval *z_target = ra_find_node(ra, Z_STRVAL_P(z_ele), Z_STRLEN_P(z_ele), &pos);
1192
1193 if (z_target && !zend_string_equals(hostname, ra->hosts[pos])) { /* different host */
1194 ra_move_key(Z_STRVAL_P(z_ele), Z_STRLEN_P(z_ele), z_redis, z_target);
1195 }
1196
1197 } ZEND_HASH_FOREACH_END();
1198
1199 /* cleanup */
1200 zval_dtor(&z_ret);
1201 }
1202
1203 void
ra_rehash(RedisArray * ra,zend_fcall_info * z_cb,zend_fcall_info_cache * z_cb_cache)1204 ra_rehash(RedisArray *ra, zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache) {
1205 int i;
1206
1207 /* redistribute the data, server by server. */
1208 if(!ra->prev)
1209 return; /* TODO: compare the two rings for equality */
1210
1211 for(i = 0; i < ra->prev->count; ++i) {
1212 ra_rehash_server(ra, &ra->prev->redis[i], ra->prev->hosts[i], ra->index, z_cb, z_cb_cache);
1213 }
1214 }
1215
1216