1 #include "redis_nodeset.h"
2
3 #include <assert.h>
4 #include "store-private.h"
5 #include <store/store_common.h>
6 #include "store.h"
7 #include "redis_nginx_adapter.h"
8 #include "redis_nodeset_parser.h"
9 #include "redis_lua_commands.h"
10
// Log level used by DBG(). The commented-out line below is the alternative setting.
11 //#define DEBUG_LEVEL NGX_LOG_WARN
12 #define DEBUG_LEVEL NGX_LOG_DEBUG
13
// Logging helpers: both write to ngx_cycle->log with a "REDIS NODESET: " prefix.
// DBG logs at DEBUG_LEVEL, ERR at NGX_LOG_ERR.
14 #define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "REDIS NODESET: " fmt, ##args)
15 #define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "REDIS NODESET: " fmt, ##args)
16
// Fixed-capacity table of nodesets; redis_nodeset_count is the number of slots in use
// (nodeset_create() fills slot [redis_nodeset_count] and then increments the count).
17 static redis_nodeset_t redis_nodeset[NCHAN_MAX_NODESETS];
18 static int redis_nodeset_count = 0;
// Set by nodeset_initialize(); also used as the channel name of the master ping PUBLISH.
19 static char *redis_worker_id = NULL;
// Shared placeholder assigned to a nodeset's name when the generated name is empty.
20 static char *nchan_redis_blankname = "";
// Set by nodeset_initialize().
21 static redisCallbackFn *redis_subscribe_callback = NULL;
22
// Element type of a nodeset's onready_callbacks list (see nodeset_create()).
// NOTE(review): presumably `cb` is invoked with (`ns`, `pd`) from the `ev` timer once the
// nodeset is ready -- the code that fires it is not in this part of the file; confirm.
23 typedef struct {
24 ngx_event_t ev;
25 ngx_int_t (*cb)(redis_nodeset_t *, void *);
26 void *pd;
27 redis_nodeset_t *ns;
28 } nodeset_onready_callback_t;
29
30
// URL used when a redis configuration has an empty url.
31 static ngx_str_t default_redis_url = ngx_string(NCHAN_REDIS_DEFAULT_URL);
32
// Forward declarations of file-local functions defined later in the file.
33 static void node_connector_callback(redisAsyncContext *ac, void *rep, void *privdata);
34 static int nodeset_cluster_keyslot_space_complete(redis_nodeset_t *ns);
35 static nchan_redis_ip_range_t *node_ip_blacklisted(redis_nodeset_t *ns, redis_connect_params_t *rcp);
36 static char *nodeset_name_cstr(redis_nodeset_t *nodeset, char *buf, size_t maxlen);
37
rbtree_cluster_keyslots_node_id(void * data)38 static void *rbtree_cluster_keyslots_node_id(void *data) {
39 return &((redis_nodeset_slot_range_node_t *)data)->range;
40 }
/* rbtree bucketer callback: the id is ignored and every entry gets the same bucket. */
static uint32_t rbtree_cluster_keyslots_bucketer(void *vid) {
  (void)vid;
  return 1; //no buckets
}
rbtree_cluster_keyslots_compare(void * v1,void * v2)44 static ngx_int_t rbtree_cluster_keyslots_compare(void *v1, void *v2) {
45 redis_slot_range_t *r1 = v1;
46 redis_slot_range_t *r2 = v2;
47
48 if(r1->max < r2->min) //r1 is strictly left of r2
49 return -1;
50 else if(r1->min > r2->max) //r1 is strictly right of r2
51 return 1;
52 else //there's an overlap
53 return 0;
54 }
55
/* Returns nonzero only for a non-NULL, non-error reply of type REDIS_REPLY_STRING. */
static int reply_str_ok(redisReply *reply) {
  if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
    return 0;
  }
  return reply->type == REDIS_REPLY_STRING;
}
/* Returns nonzero only for a non-NULL status reply whose text is exactly "OK". */
static int reply_status_ok(redisReply *reply) {
  if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
    return 0;
  }
  if(reply->type != REDIS_REPLY_STATUS || !reply->str) {
    return 0;
  }
  return strcmp(reply->str, "OK") == 0;
}
68
/*
 * Remove from the nodeset's keyslot tree the entries created for the first
 * `count` slot ranges of `node`. Used to undo a partially completed indexing
 * pass. Only entries that actually point back at `node` are removed.
 */
static void nodeset_cluster_node_rollback_keyslot_ranges(redis_node_t *node, unsigned count) {
  rbtree_seed_t                    *tree = &node->nodeset->cluster.keyslots;
  ngx_rbtree_node_t                *rbtree_node;
  redis_nodeset_slot_range_node_t  *keyslot_tree_node;
  unsigned                          i;

  for(i = 0; i < count; i++) {
    rbtree_node = rbtree_find_node(tree, &node->cluster.slot_range.range[i]);
    if(rbtree_node == NULL) {
      continue;
    }
    keyslot_tree_node = rbtree_data_from_node(rbtree_node);
    if(keyslot_tree_node->node != node) {
      continue; //not ours -- leave it alone
    }
    rbtree_remove_node(tree, rbtree_node);
    rbtree_destroy_node(tree, rbtree_node);
  }
}

/*
 * Insert every slot range owned by `node` into its nodeset's keyslot tree.
 *
 * Returns 1 on success (and marks the node's ranges as indexed), 0 when
 *  - the node's ranges are already indexed,
 *  - any of the ranges overlaps a range already present in the tree, or
 *  - a tree node cannot be allocated or inserted.
 * On an allocation/insertion failure the ranges inserted earlier in the same
 * call are removed again, so a 0 return never leaves this node half-indexed.
 */
static int nodeset_cluster_node_index_keyslot_ranges(redis_node_t *node) {
  unsigned                          i;
  ngx_rbtree_node_t                *rbtree_node;
  redis_nodeset_slot_range_node_t  *keyslot_tree_node;
  rbtree_seed_t                    *tree = &node->nodeset->cluster.keyslots;

  if(node->cluster.slot_range.indexed) {
    node_log_error(node, "cluster keyslot range already indexed");
    return 0;
  }

  //refuse to touch the tree at all if any range collides with an existing entry
  for(i=0; i<node->cluster.slot_range.n; i++) {
    if(nodeset_node_find_by_range(node->nodeset, &node->cluster.slot_range.range[i])) { //overlap!
      return 0;
    }
  }

  for(i=0; i<node->cluster.slot_range.n; i++) {
    rbtree_node = rbtree_create_node(tree, sizeof(*keyslot_tree_node));
    if(rbtree_node == NULL) {
      //allocation failed: nothing to dereference, undo what was inserted so far
      node_log_error(node, "couldn't allocate keyslot node range %d-%d", node->cluster.slot_range.range[i].min, node->cluster.slot_range.range[i].max);
      nodeset_cluster_node_rollback_keyslot_ranges(node, i);
      return 0;
    }
    keyslot_tree_node = rbtree_data_from_node(rbtree_node);
    keyslot_tree_node->range = node->cluster.slot_range.range[i];
    keyslot_tree_node->node = node;
    if(rbtree_insert_node(tree, rbtree_node) != NGX_OK) {
      node_log_error(node, "couldn't insert keyslot node range %d-%d", keyslot_tree_node->range.min, keyslot_tree_node->range.max);
      rbtree_destroy_node(tree, rbtree_node);
      nodeset_cluster_node_rollback_keyslot_ranges(node, i);
      return 0;
    }
    else {
      node_log_info(node, "inserted keyslot node range %d-%d", keyslot_tree_node->range.min, keyslot_tree_node->range.max);
    }
  }
  node->cluster.slot_range.indexed = 1;
  return 1;
}
102 /*
103 static ngx_int_t print_slot_range_node(rbtree_seed_t *tree, void *node_data, void *privdata) {
104 redis_nodeset_slot_range_node_t *rangenode = node_data;
105 node_log_notice(rangenode->node, "slots [%d - %d]", rangenode->range.min, rangenode->range.max);
106 return NGX_OK;
107 }
108 */
/*
 * Remove every slot range of `node` from its nodeset's keyslot tree and clear
 * the node's "indexed" flag. A node that is not indexed is left untouched.
 * A range that cannot be found in the tree is logged and SIGABRT is raised.
 * Always returns 1.
 */
static int nodeset_cluster_node_unindex_keyslot_ranges(redis_node_t *node) {
  rbtree_seed_t       *keyslots = &node->nodeset->cluster.keyslots;
  ngx_rbtree_node_t   *found;
  redis_slot_range_t  *cur_range;
  unsigned             idx;

  if(!node->cluster.slot_range.indexed) {
    return 1; //nothing to do
  }

  for(idx = 0; idx < node->cluster.slot_range.n; idx++) {
    cur_range = &node->cluster.slot_range.range[idx];
    found = rbtree_find_node(keyslots, cur_range);
    if(found == NULL) {
      node_log_error(node, "unable to unindex keyslot range %d-%d: range not found in tree", cur_range->min, cur_range->max);
      raise(SIGABRT);
      continue;
    }
    rbtree_remove_node(keyslots, found);
    rbtree_destroy_node(keyslots, found);
  }

  node->cluster.slot_range.indexed = 0;
  return 1;
}
137
138 #if REDIS_NODESET_DBG
139
140
nodeset_debug_rangetree_collector(rbtree_seed_t * tree,void * node_data,void * privdata)141 static ngx_int_t nodeset_debug_rangetree_collector(rbtree_seed_t *tree, void *node_data, void *privdata) {
142 redis_nodeset_slot_range_node_t *rangenode = node_data;
143 redis_nodeset_dbg_range_tree_t *dbg = privdata;
144 dbg->node[dbg->n].range = rangenode->range;
145 dbg->node[dbg->n].node = rangenode->node;
146 dbg->n++;
147 return NGX_OK;
148 }
149
nodeset_update_debuginfo(redis_nodeset_t * nodeset)150 static redis_node_dbg_list_t *nodeset_update_debuginfo(redis_nodeset_t *nodeset) {
151 rbtree_seed_t *tree = &nodeset->cluster.keyslots;
152 redis_node_dbg_list_t *node_dbg = &nodeset->dbg.nodes;
153 redis_node_t *cur;
154 ngx_memzero(&nodeset->dbg, sizeof(nodeset->dbg));
155 for(cur = nchan_list_first(&nodeset->nodes); cur != NULL; cur = nchan_list_next(cur)) {
156 node_dbg->node[node_dbg->n++]=cur;
157 }
158 nodeset->dbg.keyspace_complete = nodeset_cluster_keyslot_space_complete(nodeset);
159 rbtree_walk_incr(tree, nodeset_debug_rangetree_collector, &nodeset->dbg.ranges);
160 return NGX_OK;
161 }
162 #endif
163
/*
 * Format connect params as "hostname:port" into buf and return buf.
 * buf must be at least 512 bytes long (every caller in this file passes a
 * 512-byte buffer).
 *
 * ngx_snprintf() stops at the size limit without appending a terminator, so
 * formatting is limited to all but the last byte and that byte is set to NUL:
 * the result is a valid C string even if the hostname is long enough to be
 * truncated. The format consumes only hostname and port; the unused extra
 * argument previously passed here has been dropped.
 */
static const char *__rcp_cstr(redis_connect_params_t *rcp, char *buf) {
  const size_t buflen = 512;

  ngx_snprintf((u_char *)buf, buflen - 1, "%V:%d%Z", &rcp->hostname, rcp->port);
  buf[buflen - 1] = '\0';
  return buf;
}
168
/* Format a node's connect params into the caller-supplied buffer (see __rcp_cstr). */
static const char *__node_cstr(redis_node_t *node, char *buf) {
  redis_connect_params_t *params = &node->connect_params;

  return __rcp_cstr(params, buf);
}
172
/*
 * Format connect params into a function-local static buffer and return it.
 * Every call overwrites the result of the previous one.
 */
static const char *rcp_cstr(redis_connect_params_t *rcp) {
  static char shared_buf[512];
  const char *formatted = __rcp_cstr(rcp, shared_buf);

  return formatted;
}
/* Format a node's connect params via rcp_cstr() (shared static buffer). */
static const char *node_cstr(redis_node_t *node) {
  redis_connect_params_t *params = &node->connect_params;

  return rcp_cstr(params);
}
180
// Capacities, in bytes, of the fixed-size buffers embedded in node_blob_t.
181 #define MAX_RUN_ID_LENGTH 64
182 #define MAX_CLUSTER_ID_LENGTH 64
183 #define MAX_VERSION_LENGTH 16
// Element type of a nodeset's node list: a redis_node_t followed by the inline
// storage that some of its string fields point into. `node` must stay the
// first member -- nodeset_node_create_with_space() asserts that a blob pointer
// and its node pointer are the same address. Any extra space requested at
// creation time is placed directly after the blob.
184 typedef struct {
185 redis_node_t node;
186 u_char peername[INET6_ADDRSTRLEN + 2]; // backs node.connect_params.peername.data
187 u_char run_id[MAX_RUN_ID_LENGTH]; // backs node.run_id.data
188 u_char cluster_id[MAX_CLUSTER_ID_LENGTH]; // backs node.cluster.id.data
189 u_char version[MAX_VERSION_LENGTH]; // NOTE(review): no user visible in this part of the file
190 } node_blob_t;
191
// Forward declaration: timer handler installed on a nodeset's status_check_ev by nodeset_create().
192 static void nodeset_check_status_event(ngx_event_t *ev);
193
/* Returns 1 when nodeset is non-NULL and its status is REDIS_NODESET_READY, else 0. */
int nodeset_ready(redis_nodeset_t *nodeset) {
  if(nodeset == NULL) {
    return 0;
  }
  return nodeset->status == REDIS_NODESET_READY;
}
197
nodeset_initialize(char * worker_id,redisCallbackFn * subscribe_handler)198 ngx_int_t nodeset_initialize(char *worker_id, redisCallbackFn *subscribe_handler) {
199 redis_worker_id = worker_id;
200 redis_subscribe_callback = subscribe_handler;
201 return NGX_OK;
202 }
203
nodeset_create(nchan_loc_conf_t * lcf)204 redis_nodeset_t *nodeset_create(nchan_loc_conf_t *lcf) {
205 nchan_redis_conf_t *rcf = &lcf->redis;
206 redis_nodeset_t *ns = &redis_nodeset[redis_nodeset_count]; //incremented once everything is ok
207 assert(rcf->enabled);
208 assert(!rcf->nodeset);
209
210 ns->first_loc_conf = lcf;
211
212 if(redis_nodeset_count >= NCHAN_MAX_NODESETS) {
213 nchan_log_error("Cannot create more than %d Redis nodesets", NCHAN_MAX_NODESETS);
214 return NULL;
215 }
216
217 assert(!nodeset_find(rcf)); //must be unique
218
219 nchan_list_init(&ns->urls, sizeof(ngx_str_t *), "redis urls");
220 nchan_list_init(&ns->nodes, sizeof(node_blob_t), "redis nodes");
221 nchan_list_init(&ns->onready_callbacks, sizeof(nodeset_onready_callback_t), "nodeset onReady callbacks");
222
223 nchan_slist_init(&ns->channels.all, rdstore_channel_head_t, redis.slist.nodeset.prev, redis.slist.nodeset.next);
224 nchan_slist_init(&ns->channels.disconnected_cmd, rdstore_channel_head_t, redis.slist.node_cmd.prev, redis.slist.node_cmd.next);
225 nchan_slist_init(&ns->channels.disconnected_pubsub, rdstore_channel_head_t, redis.slist.node_pubsub.prev, redis.slist.node_pubsub.next);
226
227 ns->reconnect_delay_sec = 5;
228 ns->current_status_times_checked = 0;
229 ns->current_status_start = 0;
230 ns->generation = 0;
231 ns->settings.namespace = &rcf->namespace;
232 ns->settings.storage_mode = rcf->storage_mode;
233 ns->settings.nostore_fastpublish = rcf->nostore_fastpublish;
234
235 ns->settings.ping_interval = rcf->ping_interval;
236 ns->settings.cluster_check_interval = rcf->cluster_check_interval;
237
238 ns->status = REDIS_NODESET_DISCONNECTED;
239 ngx_memzero(&ns->status_check_ev, sizeof(ns->status_check_ev));
240 ns->status_msg = NULL;
241 nchan_init_timer(&ns->status_check_ev, nodeset_check_status_event, ns);
242
243 //init cluster stuff
244 ns->cluster.enabled = 0;
245 rbtree_init(&ns->cluster.keyslots, "redis cluster node (by keyslot) data", rbtree_cluster_keyslots_node_id, rbtree_cluster_keyslots_bucketer, rbtree_cluster_keyslots_compare);
246
247 //urls
248 if(rcf->upstream) {
249 nchan_srv_conf_t *scf = NULL;
250 scf = ngx_http_conf_upstream_srv_conf(rcf->upstream, ngx_nchan_module);
251
252 ngx_uint_t i;
253 ngx_array_t *servers = rcf->upstream->servers;
254 ngx_http_upstream_server_t *usrv = servers->elts;
255 ngx_str_t *upstream_url, **urlref;
256 ns->upstream = rcf->upstream;
257
258 ns->settings.connect_timeout = scf->redis.connect_timeout == NGX_CONF_UNSET_MSEC ? NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC : scf->redis.connect_timeout;
259 ns->settings.node_weight.master = scf->redis.master_weight == NGX_CONF_UNSET ? 1 : scf->redis.master_weight;
260 ns->settings.node_weight.slave = scf->redis.slave_weight == NGX_CONF_UNSET ? 1 : scf->redis.slave_weight;
261
262 ns->settings.optimize_target = scf->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET ? NCHAN_REDIS_OPTIMIZE_CPU : scf->redis.optimize_target;
263
264 ns->settings.blacklist.count = scf->redis.blacklist_count;
265 ns->settings.blacklist.list = scf->redis.blacklist;
266
267 for(i=0; i < servers->nelts; i++) {
268 #if nginx_version >= 1007002
269 upstream_url = &usrv[i].name;
270 #else
271 upstream_url = &usrv[i].addrs->name;
272 #endif
273 urlref = nchan_list_append(&ns->urls);
274 *urlref = upstream_url;
275 }
276 }
277 else {
278 ns->upstream = NULL;
279 ns->settings.connect_timeout = NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC;
280 ns->settings.node_weight.master = 1;
281 ns->settings.node_weight.slave = 1;
282 ns->settings.blacklist.count = 0;
283 ns->settings.blacklist.list = NULL;
284 ngx_str_t **urlref = nchan_list_append(&ns->urls);
285 *urlref = rcf->url.len > 0 ? &rcf->url : &default_redis_url;
286 }
287 DBG("nodeset created");
288
289 char buf[1024];
290 nodeset_name_cstr(ns, buf, 1024);
291 if(strlen(buf)>0) {
292 ns->name = ngx_alloc(strlen(buf)+1, ngx_cycle->log);
293 strcpy(ns->name, buf);
294 }
295 else {
296 ns->name = nchan_redis_blankname;
297 }
298 redis_nodeset_count++;
299 rcf->nodeset = ns;
300 return ns;
301 }
302
nodeset_find(nchan_redis_conf_t * rcf)303 redis_nodeset_t *nodeset_find(nchan_redis_conf_t *rcf) {
304 if(rcf->nodeset) {
305 return rcf->nodeset;
306 }
307 else {
308 int i;
309 redis_nodeset_t *ns;
310 for(i=0; i<redis_nodeset_count; i++) {
311 ns = &redis_nodeset[i];
312 if(nchan_ngx_str_match(&rcf->namespace, ns->settings.namespace) && rcf->storage_mode == ns->settings.storage_mode) {
313 if(rcf->upstream) {
314 if(ns->upstream == rcf->upstream)
315 return ns;
316 }
317 else {
318 ngx_str_t *search_url = rcf->url.len > 0 ? &rcf->url : &default_redis_url;
319 ngx_str_t **first_url = nchan_list_first(&ns->urls);
320
321 if(first_url && nchan_ngx_str_match(search_url, *first_url)) {
322 //cache it
323 rcf->nodeset = ns;
324 if(rcf->ping_interval > 0 && ns->settings.ping_interval > rcf->ping_interval) {
325 //use the smallest ping interval found in the settings
326 ns->settings.ping_interval = rcf->ping_interval;
327 }
328 return ns;
329 }
330 }
331 }
332 }
333 return NULL;
334 }
335 }
336
/*
 * Point every slave listed under src at dst as its master and register it
 * in dst's slave list. Returns the number of slaves visited.
 */
static int node_transfer_slaves(redis_node_t *src, redis_node_t *dst) {
  redis_node_t  **slave_ref = nchan_list_first(&src->peers.slaves);
  int             moved = 0;

  while(slave_ref != NULL) {
    node_set_master_node(*slave_ref, dst);
    node_add_slave_node(dst, *slave_ref); //won't be added if it's already there
    moved++;
    slave_ref = nchan_list_next(slave_ref);
  }
  return moved;
}
347
equal_redis_connect_params(void * d1,void * d2)348 static int equal_redis_connect_params(void *d1, void *d2) {
349 redis_connect_params_t *cp1 = d1;
350 redis_connect_params_t *cp2 = d2;
351 if(cp1->port != cp2->port || cp1->db != cp2->db) {
352 return 0;
353 }
354 if( nchan_ngx_str_nonzero_match(&cp1->hostname, &cp2->hostname)
355 || nchan_ngx_str_nonzero_match(&cp1->peername, &cp2->peername)
356 || nchan_ngx_str_nonzero_match(&cp1->peername, &cp2->hostname)
357 || nchan_ngx_str_nonzero_match(&cp1->hostname, &cp2->peername)) {
358 return 1;
359 }
360 else {
361 return 0;
362 }
363 }
364
equal_nonzero_strings(void * s1,void * s2)365 static int equal_nonzero_strings(void *s1, void *s2) {
366 return ((ngx_str_t *)s1)->len > 0 && ((ngx_str_t *)s2)->len > 0 &&
367 nchan_ngx_str_match((ngx_str_t *)s1, (ngx_str_t *)s2);
368 }
369
370 typedef struct {
371 char *name;
372 off_t offset;
373 int (*match)(void *, void *);
374 } node_match_t;
375
376 static struct {
377 node_match_t run_id;
378 node_match_t cluster_id;
379 node_match_t connect_params;
380 } _node_match = {
381 .run_id = {"run_id", offsetof(redis_node_t, run_id), equal_nonzero_strings},
382 .cluster_id = {"cluster_id", offsetof(redis_node_t, cluster.id), equal_nonzero_strings},
383 .connect_params = {"url", offsetof(redis_node_t, connect_params), equal_redis_connect_params}
384 };
385
/*
 * If another node of the same nodeset matches `node` under the given
 * criterion, hand node's slaves over to that other node, destroy `node`,
 * and return 1. Returns 0 (node untouched) when no duplicate exists.
 */
static int nodeset_node_deduplicate_by(redis_node_t *node, node_match_t *match) {
  void          *node_field = (char *)node + match->offset;
  void          *other_field;
  redis_node_t  *other;

  for(other = nchan_list_first(&node->nodeset->nodes); other != NULL; other = nchan_list_next(other)) {
    if(other == node) {
      continue;
    }
    other_field = (char *)other + match->offset;
    if(!match->match(node_field, other_field)) {
      continue;
    }
    node_log_notice(node, "deduplicated by %s", match->name);
    node_transfer_slaves(node, other); //node->other
    nodeset_node_destroy(node);
    return 1;
  }
  return 0;
}
401
/* Deduplicate node against its nodeset using the connect-params criterion. */
int nodeset_node_deduplicate_by_connect_params(redis_node_t *node) {
  node_match_t *criterion = &_node_match.connect_params;

  return nodeset_node_deduplicate_by(node, criterion);
}
/* Deduplicate node against its nodeset using the run_id criterion. */
int nodeset_node_deduplicate_by_run_id(redis_node_t *node) {
  node_match_t *criterion = &_node_match.run_id;

  return nodeset_node_deduplicate_by(node, criterion);
}
/* Deduplicate node against its nodeset using the cluster id criterion. */
int nodeset_node_deduplicate_by_cluster_id(redis_node_t *node) {
  node_match_t *criterion = &_node_match.cluster_id;

  return nodeset_node_deduplicate_by(node, criterion);
}
411
nodeset_node_find_by(redis_nodeset_t * ns,node_match_t * match,void * data)412 static redis_node_t *nodeset_node_find_by(redis_nodeset_t *ns, node_match_t *match, void *data) {
413 redis_node_t *cur;
414 void *d2;
415 for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
416 d2 = &((char *)cur)[match->offset];
417 if(match->match(data, d2)) {
418 return cur;
419 }
420 }
421 return NULL;
422 }
nodeset_node_find_by_connect_params(redis_nodeset_t * ns,redis_connect_params_t * rcp)423 redis_node_t *nodeset_node_find_by_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
424 return nodeset_node_find_by(ns, &_node_match.connect_params, rcp);
425 }
nodeset_node_find_by_run_id(redis_nodeset_t * ns,ngx_str_t * run_id)426 redis_node_t *nodeset_node_find_by_run_id(redis_nodeset_t *ns, ngx_str_t *run_id) {
427 return nodeset_node_find_by(ns, &_node_match.run_id, run_id);
428 }
nodeset_node_find_by_cluster_id(redis_nodeset_t * ns,ngx_str_t * cluster_id)429 redis_node_t *nodeset_node_find_by_cluster_id(redis_nodeset_t *ns, ngx_str_t *cluster_id) {
430 return nodeset_node_find_by(ns, &_node_match.cluster_id, cluster_id);
431 }
432
/* Two slot ranges overlap exactly when the tree comparator treats them as equal. */
static int keyslot_ranges_overlap(redis_slot_range_t *r1, redis_slot_range_t *r2) {
  ngx_int_t ordering = rbtree_cluster_keyslots_compare(r1, r2);

  return ordering == 0;
}
436
nodeset_node_find_by_range(redis_nodeset_t * ns,redis_slot_range_t * range)437 redis_node_t *nodeset_node_find_by_range(redis_nodeset_t *ns, redis_slot_range_t *range) {
438 ngx_rbtree_node_t *rbtree_node;
439 redis_nodeset_slot_range_node_t *keyslot_tree_node;
440
441 if((rbtree_node = rbtree_find_node(&ns->cluster.keyslots, range)) != NULL) {
442 keyslot_tree_node = rbtree_data_from_node(rbtree_node);
443 assert(keyslot_ranges_overlap(range, &keyslot_tree_node->range));
444 return keyslot_tree_node->node;
445 }
446 else {
447 return NULL;
448 }
449 }
450
nodeset_node_find_by_slot(redis_nodeset_t * ns,uint16_t slot)451 redis_node_t *nodeset_node_find_by_slot(redis_nodeset_t *ns, uint16_t slot) {
452 redis_slot_range_t range;
453 range.min = slot;
454 range.max = slot;
455 return nodeset_node_find_by_range(ns, &range);
456 }
nodeset_node_find_any_ready_master(redis_nodeset_t * ns)457 redis_node_t *nodeset_node_find_any_ready_master(redis_nodeset_t *ns) {
458 redis_node_t *cur;
459 for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
460 if(cur->state >= REDIS_NODE_READY && cur->role == REDIS_NODE_ROLE_MASTER) {
461 return cur;
462 }
463 }
464 return NULL;
465 }
466
nodeset_node_find_by_channel_id(redis_nodeset_t * ns,ngx_str_t * channel_id)467 redis_node_t *nodeset_node_find_by_channel_id(redis_nodeset_t *ns, ngx_str_t *channel_id) {
468 redis_node_t *node;
469 static uint16_t prefix_crc = 0;
470 uint16_t slot;
471
472 if(!ns->cluster.enabled) {
473 node = nodeset_node_find_any_ready_master(ns);
474 }
475 else {
476 if(prefix_crc == 0) {
477 prefix_crc = redis_crc16(0, "channel:", 8);
478 }
479 slot = redis_crc16(prefix_crc, (const char *)channel_id->data, channel_id->len) % 16384;
480 //DBG("channel id %V (key {channel:%V}) slot %i", str, str, slot);
481
482 node = nodeset_node_find_by_slot(ns, slot);
483 }
484
485 #if REDIS_NODESET_DBG
486 if(node == NULL) {
487 nodeset_update_debuginfo(ns);
488 raise(SIGABRT);
489 }
490 #endif
491
492 return node;
493 }
494
nodeset_node_find_by_key(redis_nodeset_t * ns,ngx_str_t * key)495 redis_node_t *nodeset_node_find_by_key(redis_nodeset_t *ns, ngx_str_t *key) {
496 if(!ns->cluster.enabled) {
497 return nodeset_node_find_any_ready_master(ns);
498 }
499
500 char *start, *end;
501 ngx_str_t hashable;
502 uint16_t slot;
503
504 if(((start = memchr(key->data, '{', key->len))) != NULL) {
505 start++;
506 end = memchr(start, '}', key->len - ((u_char *)start - key->data));
507 if(end && end - start > 1) {
508 hashable.data = (u_char *)start;
509 hashable.len = (end - start);
510 }
511 else {
512 hashable = *key;
513 // not quite right -- need to ignore zero-length {} and scan to the next {
514 // but it's good enough for the keys we're using
515 }
516 }
517 else {
518 hashable = *key;
519 }
520 slot = redis_crc16(0, (const char *)hashable.data, hashable.len) % 16384;
521
522 return nodeset_node_find_by_slot(ns, slot);
523 }
524
/* Reply handler for the periodic ping command: only logs the outcome. */
static void ping_command_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redis_node_t  *node = privdata;
  redisReply    *reply = rep;
  int            failed;

  failed = reply == NULL || reply->type == REDIS_REPLY_ERROR || ac == NULL || ac->err;
  if(failed) {
    node_log_error(node, "node ping failed");
    return;
  }
  node_log_debug(node, "node ping command reply ok");
}
534
/*
 * Ping timer handler for a node.
 *
 * Does nothing unless the timer actually expired and the worker is not
 * exiting or quitting. For a node in state READY it sends a keepalive on the
 * command connection -- a PUBLISH to the worker id channel on masters, a
 * plain PING otherwise -- and re-arms the timer when the nodeset's ping
 * interval is positive. For any other state the timer is not re-armed here.
 */
static void node_ping_event(ngx_event_t *ev) {
  redis_node_t     *node = ev->data;
  redis_nodeset_t  *nodeset = node->nodeset;

  if(!ev->timedout || ngx_exiting || ngx_quit) {
    return;
  }

  node_log_debug(node, "node ping event");
  ev->timedout = 0;

  if(node->state != REDIS_NODE_READY) {
    return;
  }
  assert(node->ctx.cmd);

  //Redis clusters don't shard the PUBSUB keyspace, so there is no need to pick
  //a keyslot-mapped node: masters get a PUBLISH, everything else a PING
  if(node->role == REDIS_NODE_ROLE_MASTER) {
    redisAsyncCommand(node->ctx.cmd, ping_command_callback, node, "PUBLISH %s ping", redis_worker_id);
  }
  else {
    redisAsyncCommand(node->ctx.cmd, ping_command_callback, node, "PING");
  }

  if(nodeset->settings.ping_interval > 0) {
    ngx_add_timer(ev, nodeset->settings.ping_interval * 1000);
  }
}
562
/*
 * Schedule the next cluster check for a node. The delay is whatever remains
 * of the nodeset's cluster_check_interval since the last successful check;
 * when that elapsed time is non-positive or already spans a full interval,
 * the full interval is used.
 */
static void rearm_cluster_check_event(ngx_event_t *ev, redis_node_t *node) {
  time_t  period = node->nodeset->settings.cluster_check_interval;
  time_t  elapsed = ngx_time() - node->cluster.last_successful_check;
  time_t  delay_sec;

  if(elapsed > 0 && elapsed < period) {
    delay_sec = period - elapsed;
  }
  else {
    delay_sec = period;
  }
  ngx_add_timer(ev, delay_sec * 1000);
}
574
/*
 * Reply handler for the periodic CLUSTER INFO command.
 *
 * A NULL reply is only logged. A reply that is not a string, lacks the
 * "cluster_current_epoch:" line, or carries an unparsable epoch is logged and,
 * if the node is READY or beyond, the node is disconnected as FAILED and the
 * nodeset re-examined. When the reported epoch is newer than the node's
 * recorded one, the node is disconnected as FAILED and the nodeset is put
 * into CLUSTER_FAILING status; otherwise the check timer is re-armed.
 */
static void node_cluster_check_event_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redis_node_t  *node = privdata;
  redisReply    *reply = rep;
  ngx_str_t      epoch_field;
  int            reported_epoch;

  if(reply == NULL) {
    node_log_error(node, "CLUSTER INFO command reply is NULL. Node should have already disconnected");
    return;
  }

  if(!reply_str_ok(reply)) {
    node_log_error(node, "CLUSTER INFO command failed");
    goto fail_node;
  }

  if(!nchan_get_rest_of_line_in_cstr(reply->str, "cluster_current_epoch:", &epoch_field)) {
    node_log_error(node, "CLUSTER INFO command reply is weird");
    //why would this happen? dunno, fail node just in case
    goto fail_node;
  }

  reported_epoch = ngx_atoi(epoch_field.data, epoch_field.len);
  if(reported_epoch == NGX_ERROR) {
    node_log_error(node, "CLUSTER INFO command failed to parse current epoch number");
    //why would this happen? dunno, fail node just in case
    goto fail_node;
  }

  if(node->cluster.current_epoch >= reported_epoch) {
    //nothing changed, check again later
    rearm_cluster_check_event(&node->cluster.check_timer, node);
    return;
  }

  {
    char errstr[512];

    node_disconnect(node, REDIS_NODE_FAILED);
    ngx_snprintf((u_char *)errstr, 512, "config epoch has changed on node %V:%d. Disconnecting to reconfigure.%Z", &(node)->connect_params.hostname, node->connect_params.port);
    nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, errstr);
  }
  return;

fail_node:
  if(node->state >= REDIS_NODE_READY) {
    node_disconnect(node, REDIS_NODE_FAILED);
    nodeset_examine(node->nodeset);
  }
}
624
/*
 * Cluster check timer handler for a node.
 *
 * Ignored unless the timer actually expired and the worker is not exiting or
 * quitting. If the node is not READY, its cluster state is not ok, or less
 * than a full check interval has passed since the last successful check, the
 * timer is simply re-armed. Otherwise CLUSTER INFO is sent on the command
 * connection and handled by node_cluster_check_event_callback().
 */
static void node_cluster_check_event(ngx_event_t *ev) {
  redis_node_t  *node;
  time_t         period;
  time_t         elapsed;

  if(!ev->timedout || ngx_exiting || ngx_quit) {
    return;
  }

  node = ev->data;
  period = node->nodeset->settings.cluster_check_interval;
  elapsed = ngx_time() - node->cluster.last_successful_check;
  ev->timedout = 0;

  if(node->state != REDIS_NODE_READY || !node->cluster.ok || elapsed < period) {
    rearm_cluster_check_event(ev, node);
    return;
  }

  redisAsyncCommand(node->ctx.cmd, node_cluster_check_event_callback, node, "CLUSTER INFO");
}
645
nodeset_node_create_with_space(redis_nodeset_t * ns,redis_connect_params_t * rcp,size_t extra_space,void ** extraspace_ptr)646 redis_node_t *nodeset_node_create_with_space(redis_nodeset_t *ns, redis_connect_params_t *rcp, size_t extra_space, void **extraspace_ptr) {
647 assert(!nodeset_node_find_by_connect_params(ns, rcp));
648 node_blob_t *node_blob;
649 if(extra_space == 0) {
650 assert(extraspace_ptr == NULL);
651 node_blob = nchan_list_append(&ns->nodes);
652 }
653 else {
654 assert(extraspace_ptr);
655 node_blob = nchan_list_append_sized(&ns->nodes, sizeof(*node_blob)+extra_space);
656 if(extra_space) {
657 *extraspace_ptr = (void *)(&node_blob[1]);
658 }
659 }
660 redis_node_t *node = &node_blob->node;
661
662 assert((void *)node_blob == (void *)node);
663 assert(node);
664 node->role = REDIS_NODE_ROLE_UNKNOWN,
665 node->state = REDIS_NODE_DISCONNECTED;
666 node->discovered = 0;
667 node->connect_timeout = NULL;
668 node->connect_params = *rcp;
669 node->connect_params.peername.data = node_blob->peername;
670 node->connect_params.peername.len = 0;
671 node->cluster.id.len = 0;
672 node->cluster.id.data = node_blob->cluster_id;
673 node->cluster.enabled = 0;
674 node->cluster.ok = 0;
675 node->cluster.slot_range.indexed = 0;
676 node->cluster.slot_range.n = 0;
677 node->cluster.slot_range.range = NULL;
678 node->cluster.last_successful_check = 0;
679 node->cluster.current_epoch = 0;
680 ngx_memzero(&node->cluster.check_timer, sizeof(node->cluster.check_timer));
681 nchan_init_timer(&node->cluster.check_timer, node_cluster_check_event, node);
682 node->pending_commands = 0;
683 node->run_id.len = 0;
684 node->run_id.data = node_blob->run_id;
685 node->nodeset = ns;
686 node->generation = 0;
687
688 nchan_slist_init(&node->channels.cmd, rdstore_channel_head_t, redis.slist.node_cmd.prev, redis.slist.node_cmd.next);
689 nchan_slist_init(&node->channels.pubsub, rdstore_channel_head_t, redis.slist.node_pubsub.prev, redis.slist.node_pubsub.next);
690
691 node->peers.master = NULL;
692 nchan_list_init(&node->peers.slaves, sizeof(redis_node_t *), "node slaves");
693
694 ngx_memzero(&node->ping_timer, sizeof(node->ping_timer));
695 nchan_init_timer(&node->ping_timer, node_ping_event, node);
696
697 node->ctx.cmd = NULL;
698 node->ctx.pubsub = NULL;
699 node->ctx.sync = NULL;
700
701 assert(nodeset_node_find_by_connect_params(ns, rcp));
702 return node;
703 }
704
nodeset_node_create(redis_nodeset_t * ns,redis_connect_params_t * rcp)705 redis_node_t *nodeset_node_create(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
706 return nodeset_node_create_with_space(ns, rcp, 0, NULL);
707 }
708
nodeset_node_create_with_connect_params(redis_nodeset_t * ns,redis_connect_params_t * rcp)709 redis_node_t *nodeset_node_create_with_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
710 redis_node_t *node;
711 u_char *space;
712 size_t sz = rcp->hostname.len + rcp->password.len;
713 node = nodeset_node_create_with_space(ns, rcp, sz, (void **)&space);
714 assert(node);
715 node->connect_params.hostname.data = space;
716 node->connect_params.hostname.len = 0;
717 nchan_strcpy(&node->connect_params.hostname, &rcp->hostname, 0);
718 node->connect_params.password.data = &space[rcp->hostname.len];
719 nchan_strcpy(&node->connect_params.password, &rcp->password, 0);
720 return node;
721 }
722
/*
 * Forget `peer` as a peer of `node`: clear node's master pointer if it is
 * `peer`, and remove the first slave-list entry that refers to `peer`.
 */
static void node_remove_peer(redis_node_t *node, redis_node_t *peer) {
  redis_node_t **slave_ref;

  if(node->peers.master == peer) {
    node->peers.master = NULL;
  }

  slave_ref = nchan_list_first(&node->peers.slaves);
  while(slave_ref != NULL) {
    if(*slave_ref == peer) {
      nchan_list_remove(&node->peers.slaves, slave_ref);
      return;
    }
    slave_ref = nchan_list_next(slave_ref);
  }
}
736
nodeset_node_destroy(redis_node_t * node)737 ngx_int_t nodeset_node_destroy(redis_node_t *node) {
738 redisAsyncContext *ac;
739 redisContext *c;
740 node_set_role(node, REDIS_NODE_ROLE_UNKNOWN); //removes from all peer lists, and clears own slave list
741 if((ac = node->ctx.cmd) != NULL) {
742 node->ctx.cmd = NULL;
743 redisAsyncFree(ac);
744 }
745 if((ac = node->ctx.pubsub) != NULL) {
746 node->ctx.pubsub = NULL;
747 redisAsyncFree(ac);
748 }
749 if((c = node->ctx.sync) != NULL) {
750 node->ctx.sync = NULL;
751 redisFree(c);
752 }
753 if(node->connect_timeout) {
754 nchan_abort_oneshot_timer(node->connect_timeout);
755 node->connect_timeout = NULL;
756 }
757 nchan_list_remove(&node->nodeset->nodes, node);
758 return NGX_OK;
759 }
760
/*
 * Register the slave reachable at rcp under `master`.
 *
 * An unknown slave is created, flagged as discovered and given the SLAVE
 * role. A known node that is past the GET_INFO state but does not have the
 * SLAVE role is reset (role UNKNOWN, disconnected as FAILED) and reconnected.
 * In every case the master/slave links are (re)established and a slave that
 * is not connected gets a connection attempt.
 */
static void node_discover_slave(redis_node_t *master, redis_connect_params_t *rcp) {
  redis_node_t *slave = nodeset_node_find_by_connect_params(master->nodeset, rcp);

  if(slave == NULL) {
    slave = nodeset_node_create_with_connect_params(master->nodeset, rcp);
    slave->discovered = 1;
    node_set_role(slave, REDIS_NODE_ROLE_SLAVE);
    node_log_notice(master, "Discovering own slave %s", rcp_cstr(rcp));
  }
  else if(slave->role != REDIS_NODE_ROLE_SLAVE && slave->state > REDIS_NODE_GET_INFO) {
    //we know about it already, but its role is out of date
    node_log_notice(slave, "Node appears to have changed to slave -- need to update");
    node_set_role(slave, REDIS_NODE_ROLE_UNKNOWN);
    node_disconnect(slave, REDIS_NODE_FAILED);
    node_connect(slave);
  }

  node_set_master_node(slave, master); //this is idempotent
  node_add_slave_node(master, slave); //so is this

  //try to connect
  if(slave->state <= REDIS_NODE_DISCONNECTED) {
    node_connect(slave);
  }
}
788
/*
 * Register the master reachable at rcp for `slave`.
 *
 * An unknown master is created, flagged as discovered and given the MASTER
 * role. A known node that is past the GET_INFO state but does not have the
 * MASTER role is reset (role UNKNOWN, disconnected as FAILED) and
 * reconnected. In every case the master/slave links are (re)established and
 * a master that is not connected gets a connection attempt.
 */
static void node_discover_master(redis_node_t *slave, redis_connect_params_t *rcp) {
  redis_node_t *master = nodeset_node_find_by_connect_params(slave->nodeset, rcp);

  if(master == NULL) {
    master = nodeset_node_create_with_connect_params(slave->nodeset, rcp);
    master->discovered = 1;
    node_set_role(master, REDIS_NODE_ROLE_MASTER);
    node_log_notice(slave, "Discovering own master %s", rcp_cstr(rcp));
  }
  else if(master->role != REDIS_NODE_ROLE_MASTER && master->state > REDIS_NODE_GET_INFO) {
    //already known, but its role is out of date
    node_log_notice(master, "Node appears to have changed to master -- need to update");
    node_set_role(master, REDIS_NODE_ROLE_UNKNOWN);
    node_disconnect(master, REDIS_NODE_FAILED);
    node_connect(master);
  }

  node_set_master_node(slave, master);
  node_add_slave_node(master, slave);

  //try to connect
  if(master->state <= REDIS_NODE_DISCONNECTED) {
    node_connect(master);
  }
}
814
/*
 * Decide whether a peer listed in a CLUSTER NODES line should be ignored.
 *
 * Returns 1 (skip) for peers that have no address, are handshaking, have an
 * empty hostname, are failed or disconnected, are this node itself, or whose
 * address matches the nodeset's blacklist; returns 0 otherwise. Every skip
 * except the no-address case is logged (at INFO for "self", NOTICE for the
 * rest).
 *
 * Fixes over the previous version:
 *  - the blacklist detail text is now always NUL-terminated: ngx_snprintf()
 *    stops at the size limit without terminating, so a long blacklist entry
 *    string could leave detail_buf unterminated before it was printed with %s.
 *  - the temporary connect params are zero-initialised before being handed to
 *    the blacklist check and the formatter (peername.data was indeterminate).
 *  - the no-address case returns before any unused setup work.
 */
static int node_skip_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l) {
  redis_connect_params_t   rcp;
  char                    *description = "";
  char                    *detail = "";
  char                     detail_buf[64];
  char                    *role = NULL;
  ngx_uint_t               loglevel = NGX_LOG_NOTICE;
  nchan_redis_ip_range_t  *matched;

  if(l->noaddr) {
    //no-address nodes are skipped without logging
    return 1;
  }

  ngx_memzero(&rcp, sizeof(rcp));
  rcp.hostname = l->hostname;
  rcp.port = l->port;
  rcp.peername.len = 0;
  rcp.db = node->connect_params.db;
  rcp.password = node->connect_params.password;

  if(l->handshake) {
    role = "node";
    description = "handshaking";
  }
  else if(l->hostname.len == 0) {
    role = "node";
    description = "empty hostname";
  }
  else if(l->failed) {
    description = "failed";
  }
  else if(!l->connected) {
    description = "disconnected";
  }
  else if(l->self) {
    description = "self";
    loglevel = NGX_LOG_INFO;
  }
  else if((matched = node_ip_blacklisted(node->nodeset, &rcp)) != NULL) {
    description = "blacklisted";
    detail = detail_buf;
    //zero the buffer and keep its last byte out of reach of the formatter,
    //so the text stays terminated even when it gets truncated
    ngx_memzero(detail_buf, sizeof(detail_buf));
    ngx_snprintf((u_char *)detail_buf, sizeof(detail_buf) - 1, " (matched blacklist entry %V)%Z", &matched->str);
  }
  else {
    return 0;
  }

  if(!role) role = l->master ? "master" : "slave";
  nodeset_log(node->nodeset, loglevel, "Skipping %s %s node %s%s", description, role, rcp_cstr(&rcp), detail);
  return 1;
}
865
/*
 * Register and connect to a cluster peer described by one CLUSTER NODES line.
 *
 * Returns 1 when a new node was created and a connection attempt was started,
 * 0 when the peer is unusable (self, no address, disconnected, failed) or is
 * already known by connect params or by cluster id.
 */
static int node_discover_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l) {
  redis_connect_params_t   rcp;
  redis_node_t            *peer;

  assert(!l->self);
  if(l->self || l->noaddr || !l->connected || l->failed) {
    return 0;
  }

  rcp.hostname = l->hostname;
  rcp.port = l->port;
  rcp.peername.len = 0;
  rcp.db = node->connect_params.db;
  rcp.password = node->connect_params.password;

  //look the peer up by address first, then by its cluster id
  peer = nodeset_node_find_by_connect_params(node->nodeset, &rcp);
  if(peer == NULL) {
    peer = nodeset_node_find_by_cluster_id(node->nodeset, &l->id);
  }
  if(peer != NULL) {
    return 0; //we already know this one.
  }

  nodeset_log_notice(node->nodeset, "Discovering cluster %s %s", (l->master ? "master" : "slave"), rcp_cstr(&rcp));
  peer = nodeset_node_create_with_connect_params(node->nodeset, &rcp);
  peer->discovered = 1;
  nchan_strcpy(&peer->cluster.id, &l->id, MAX_CLUSTER_ID_LENGTH);
  node_set_role(peer, l->master ? REDIS_NODE_ROLE_MASTER : REDIS_NODE_ROLE_SLAVE);
  //ignore all the other things for now
  node_connect(peer);
  return 1;
}
894
895 static ngx_int_t set_preallocated_peername(redisAsyncContext *ctx, ngx_str_t *dst);
896
/*
 * Log a connection failure for `node` and tear it down into the FAILED state.
 *
 * `err` is the caller's description. When one of the node's contexts (cmd,
 * then pubsub, then sync) reports an error, its error string is appended in
 * parentheses, except when the node is in the connection-timed-out state.
 */
static void node_connector_fail(redis_node_t *node, const char *err) {
  const char *ctxerr = NULL;

  if(node->ctx.cmd && node->ctx.cmd->err) {
    ctxerr = node->ctx.cmd->errstr;
  }
  else if(node->ctx.pubsub && node->ctx.pubsub->err) {
    ctxerr = node->ctx.pubsub->errstr;
  }
  else if(node->ctx.sync && node->ctx.sync->err) {
    ctxerr = node->ctx.sync->errstr;
  }

  if(ctxerr != NULL && node->state != REDIS_NODE_CONNECTION_TIMED_OUT) {
    node_log_error(node, "connection failed: %s (%s)", err, ctxerr);
  }
  else {
    node_log_error(node, "connection failed: %s", err);
  }

  node_disconnect(node, REDIS_NODE_FAILED);
}
919
/*
 * Start connecting a node by entering the connector state machine.
 * The node must currently be in a disconnected-or-worse state (asserted).
 * Always returns 1.
 */
int node_connect(redis_node_t *node) {
  assert(node->state <= REDIS_NODE_DISCONNECTED);

  //no async context and no reply yet: the connector starts from node->state
  node_connector_callback(NULL, NULL, node);

  return 1;
}
925
/*
 * Tear down every connection of `node` and put it into `disconnected_state`.
 *
 * Frees the cmd and pubsub async contexts (with their disconnect callback
 * cleared first) and the sync context, aborts the connect timeout, updates the
 * connected-servers stub counter if the node had reached READY, drops cluster
 * keyslot indexing and the slot range array, stops the ping and cluster-check
 * timers, and moves all channel heads associated with this node onto the
 * nodeset's disconnected lists.
 * Always returns 1.
 */
int node_disconnect(redis_node_t *node, int disconnected_state) {
  ngx_int_t            state_before = node->state;
  redisAsyncContext   *async_ctx;
  redisContext        *sync_ctx;

  node_log_debug(node, "disconnect");

  async_ctx = node->ctx.cmd;
  if(async_ctx != NULL) {
    async_ctx->onDisconnect = NULL;
    node->ctx.cmd = NULL;
    //redisAsyncSetDisconnectCallback(ac, NULL); //this only sets the callback if it's currently null...
    redisAsyncFree(async_ctx);
    node_log_debug(node, "redisAsyncFree %p", async_ctx);
  }

  async_ctx = node->ctx.pubsub;
  if(async_ctx != NULL) {
    async_ctx->onDisconnect = NULL;
    node->ctx.pubsub = NULL;
    //redisAsyncSetDisconnectCallback(ac, NULL); //this only sets the callback if it's currently null...
    redisAsyncFree(async_ctx);
    node_log_debug(node, "redisAsyncFree pubsub %p", async_ctx);
  }

  sync_ctx = node->ctx.sync;
  if(sync_ctx != NULL) {
    node->ctx.sync = NULL;
    redisFree(sync_ctx);
  }

  if(node->connect_timeout) {
    nchan_abort_oneshot_timer(node->connect_timeout);
    node->connect_timeout = NULL;
  }

  node->state = disconnected_state;
  if(state_before >= REDIS_NODE_READY) {
    nchan_update_stub_status(redis_connected_servers, -1);
  }

  if(node->cluster.enabled) {
    nodeset_cluster_node_unindex_keyslot_ranges(node);
  }
  if(node->cluster.slot_range.range) {
    ngx_free(node->cluster.slot_range.range);
    node->cluster.slot_range.n = 0;
    node->cluster.slot_range.range = NULL;
  }

  if(node->ping_timer.timer_set) {
    ngx_del_timer(&node->ping_timer);
  }
  if(node->cluster.check_timer.timer_set) {
    ngx_del_timer(&node->cluster.check_timer);
  }

  rdstore_channel_head_t   *ch;
  nchan_slist_t            *node_cmd = &node->channels.cmd;
  nchan_slist_t            *node_pubsub = &node->channels.pubsub;
  nchan_slist_t            *orphan_cmd = &node->nodeset->channels.disconnected_cmd;
  nchan_slist_t            *orphan_pubsub = &node->nodeset->channels.disconnected_pubsub;

  //drain the node's cmd channel list; the loop always re-reads the first
  //element after dissociating the previous one
  while((ch = nchan_slist_first(node_cmd)) != NULL) {
    nodeset_node_dissociate_chanhead(ch);
    nchan_slist_append(orphan_cmd, ch);
    ch->redis.slist.in_disconnected_cmd_list = 1;
    if(ch->status && ch->status == READY) {
      ch->status = NOTREADY;
    }
  }

  //same for the pubsub channel list
  while((ch = nchan_slist_first(node_pubsub)) != NULL) {
    nodeset_node_dissociate_pubsub_chanhead(ch);
    nchan_slist_append(orphan_pubsub, ch);
    ch->redis.slist.in_disconnected_pubsub_list = 1;

    ch->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;

    if(ch->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP && ch->status == READY) {
      ch->status = NOTREADY;
    }
  }

  return 1;
}
1001
/*
 * Change the role of `node`, adjusting peer links that no longer make sense.
 *
 * No-op when the role is unchanged. Otherwise:
 *  - UNKNOWN: the node is removed from its master's peers, its master link is
 *    cleared, it is removed from the peers of each of its slaves, and its own
 *    slave list is emptied.
 *  - MASTER:  a master has no master; any existing master link is dropped.
 *  - SLAVE:   no peer links are touched.
 */
void node_set_role(redis_node_t *node, redis_node_role_t role) {
  if(node->role == role) {
    return;
  }
  node->role = role;
  redis_node_t  **cur;
  switch(node->role) {
    case REDIS_NODE_ROLE_UNKNOWN:
      if(node->peers.master) {
        node_remove_peer(node->peers.master, node);
        //log in the order the message reads: <removed node> from peers of <master>
        DBG("removed %p from peers of %p", node, node->peers.master);
        node->peers.master = NULL;
      }
      for(cur = nchan_list_first(&node->peers.slaves); cur != NULL; cur = nchan_list_next(cur)) {
        node_remove_peer(*cur, node);
      }
      nchan_list_empty(&node->peers.slaves);
      break;

    case REDIS_NODE_ROLE_MASTER:
      if(node->peers.master) {
        node_remove_peer(node->peers.master, node);
        node->peers.master = NULL;
      }
      break;

    case REDIS_NODE_ROLE_SLAVE:
      //do nothing
      break;

  }
}
1034
/*
 * Record `master` as the master of `node`.
 *
 * If the node previously pointed at a different master, the node is first
 * removed from that previous master's slave list, so the old master does not
 * keep a stale reference. Calling this again with the same master is a no-op
 * apart from the assignment. Always returns 1.
 */
int node_set_master_node(redis_node_t *node, redis_node_t *master) {
  redis_node_t *previous = node->peers.master;

  if(previous && previous != master) {
    //detach from the OLD master, not from the one being assigned
    node_remove_slave_node(previous, node);
  }
  node->peers.master = master;
  return 1;
}
/*
 * Look for `slave` in the slave list of `node`.
 * Returns `slave` when it is listed, NULL otherwise.
 */
redis_node_t *node_find_slave_node(redis_node_t *node, redis_node_t *slave) {
  redis_node_t **slot = nchan_list_first(&node->peers.slaves);

  while(slot != NULL) {
    if(*slot == slave) {
      return slave;
    }
    slot = nchan_list_next(slot);
  }
  return NULL;
}
/*
 * Add `slave` to the slave list of `node` unless it is already listed.
 * Returns 1 in both cases.
 */
int node_add_slave_node(redis_node_t *node, redis_node_t *slave) {
  redis_node_t **slot;

  if(node_find_slave_node(node, slave)) {
    return 1; //already a known slave of this node
  }

  slot = nchan_list_append(&node->peers.slaves);
  *slot = slave;
  return 1;
}
/*
 * Remove `slave` from the slave list of `node`, if it is listed.
 * Always returns 1, whether or not anything was removed.
 */
int node_remove_slave_node(redis_node_t *node, redis_node_t *slave) {
  redis_node_t **slot;

  for(slot = nchan_list_first(&node->peers.slaves); slot != NULL; slot = nchan_list_next(slot)) {
    if(*slot == slave) {
      //the list is addressed by element slot (as nchan_list_append returns
      //and nchan_list_next consumes), so hand over the slot rather than the
      //node pointer stored in it
      //NOTE(review): assumes nchan_list_remove takes the element slot -- confirm
      nchan_list_remove(&node->peers.slaves, slot);
      break;
    }
  }
  return 1;
}
1066
/*
 * Copy the remainder of the INFO line that starts with `linestart` into the
 * caller-provided storage behind `target->data`.
 *
 * Returns 1 on success (target->len is set and the bytes are copied).
 * Returns 0 when the line is not found, or when the value is longer than
 * `maxlen` (which is also logged as an error); `target` is left untouched then.
 */
static int node_parseinfo_set_preallocd_str(redis_node_t *node, ngx_str_t *target, const char *info, const char *linestart, size_t maxlen) {
  ngx_str_t value;

  if(!nchan_get_rest_of_line_in_cstr(info, linestart, &value)) {
    return 0;
  }
  if(value.len > maxlen) {
    node_log_error(node, "\"%s\" is too long", linestart);
    return 0;
  }

  target->len = value.len;
  ngx_memcpy(target->data, value.data, value.len);
  return 1;
}
1082
/*
 * Extract the "run_id:" value from an INFO reply into node->run_id.
 * Returns 1 on success, 0 when the line is missing or too long.
 */
static int node_parseinfo_set_run_id(redis_node_t *node, const char *info) {
  ngx_str_t *dst = &node->run_id;

  return node_parseinfo_set_preallocd_str(node, dst, info, "run_id:", MAX_RUN_ID_LENGTH);
}
1086
/*
 * Validate the reply to a SCRIPT LOAD command for `script`.
 *
 * Returns 1 only when the reply is a string whose first
 * REDIS_LUA_HASH_LENGTH bytes equal the expected script hash.
 * A missing reply, an error reply, a hash mismatch or any other reply type
 * is logged as an error and yields 0.
 */
static int node_connector_loadscript_reply_ok(redis_node_t *node, redis_lua_script_t *script, redisReply *reply) {
  if(reply == NULL) {
    node_log_error(node, "missing reply after loading Redis Lua script %s", script->name);
    return 0;
  }

  if(reply->type == REDIS_REPLY_ERROR) {
    node_log_error(node, "failed loading Redis Lua script %s: %s", script->name, reply->str);
    return 0;
  }

  if(reply->type != REDIS_REPLY_STRING) {
    node_log_error(node, "unexpected reply type while loading Redis Lua script %s", script->name);
    return 0;
  }

  //string reply: it must carry the hash we computed for this script
  if(ngx_strncmp(reply->str, script->hash, REDIS_LUA_HASH_LENGTH) != 0) {
    node_log_error(node, "Lua script %s has unexpected hash %s (expected %s)", script->name, reply->str, script->hash);
    return 0;
  }
  return 1;
}
1112
redis_nginx_unexpected_disconnect_event_handler(const redisAsyncContext * ac,int status)1113 static void redis_nginx_unexpected_disconnect_event_handler(const redisAsyncContext *ac, int status) {
1114 redis_node_t *node = ac->data;
1115 //char *which_ctx;
1116 //DBG("unexpected disconnect event handler ac %p", ac);
1117 if(node) {
1118 if(node->ctx.cmd == ac) {
1119 //which_ctx = "cmd";
1120 node->ctx.cmd = NULL;
1121 }
1122 else if(node->ctx.pubsub == ac) {
1123 node->ctx.pubsub = NULL;
1124 //which_ctx = "pubsub";
1125 }
1126 else {
1127 node_log_error(node, "unknown redisAsyncContext disconnected");
1128 //which_ctx = "unknown";
1129 }
1130
1131 if(node->state >= REDIS_NODE_READY && !ngx_exiting && !ngx_quit) {
1132 if(ac->err) {
1133 node_log_error(node, "connection lost: %s.", ac->errstr);
1134 }
1135 else {
1136 node_log_error(node, "connection lost");
1137 }
1138 }
1139 node_disconnect(node, REDIS_NODE_FAILED);
1140 nodeset_examine(node->nodeset);
1141 }
1142 }
1143
redis_nginx_connect_event_handler(const redisAsyncContext * ac,int status)1144 static void redis_nginx_connect_event_handler(const redisAsyncContext *ac, int status) {
1145 node_connector_callback((redisAsyncContext *)ac, NULL, ac->data);
1146 }
1147
node_connect_context(redis_node_t * node,ngx_str_t * host,ngx_int_t port)1148 static redisAsyncContext *node_connect_context(redis_node_t *node, ngx_str_t *host, ngx_int_t port) {
1149 redisAsyncContext *ctx = redis_nginx_open_context(host, port, node);
1150 if(ctx) {
1151 redisAsyncSetConnectCallback(ctx, redis_nginx_connect_event_handler);
1152 redisAsyncSetDisconnectCallback(ctx, redis_nginx_unexpected_disconnect_event_handler);
1153 }
1154 return ctx;
1155 }
1156
node_ip_blacklisted(redis_nodeset_t * ns,redis_connect_params_t * rcp)1157 static nchan_redis_ip_range_t *node_ip_blacklisted(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
1158 struct addrinfo *res;
1159 char hostname_buf[128];
1160 ngx_memzero(hostname_buf, sizeof(hostname_buf));
1161 ngx_memcpy(hostname_buf, rcp->hostname.data, rcp->hostname.len);
1162
1163 if(getaddrinfo(hostname_buf, NULL, NULL, &res) != 0) {
1164 nodeset_log_error(ns, "Failed to getaddrinfo for hostname %V while deciding if IP is blacklisted");
1165 return NULL;
1166 }
1167 int fam = res->ai_family;
1168 union {
1169 struct in_addr ipv4;
1170 #ifdef AF_INET6
1171 struct in6_addr ipv6;
1172 #endif
1173 } addr;
1174
1175 if(res->ai_family == AF_INET) {
1176 addr.ipv4 = ((struct sockaddr_in *)res->ai_addr)->sin_addr;
1177 }
1178 #ifdef AF_INET6
1179 else if(res->ai_family == AF_INET6) {
1180 addr.ipv6 = ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr;
1181 }
1182 #endif
1183 freeaddrinfo(res);
1184 int i;
1185 for(i=0; i < ns->settings.blacklist.count; i++) {
1186 nchan_redis_ip_range_t *entry = &ns->settings.blacklist.list[i];
1187 if(entry->family != fam) {
1188 continue;
1189 }
1190 if(fam == AF_INET) {
1191 if(entry->addr_block.ipv4.s_addr == (addr.ipv4.s_addr & entry->mask.ipv4.s_addr)) {
1192 return entry;
1193 }
1194 }
1195 #ifdef AF_INET6
1196 else if(fam == AF_INET6) {
1197 unsigned j;
1198 struct in6_addr buf = addr.ipv6;
1199 uint8_t *masked_addr = entry->addr_block.ipv6.s6_addr;
1200 uint8_t *mask = entry->mask.ipv6.s6_addr;
1201
1202 for(j=0; j<sizeof(entry->addr_block.ipv6.s6_addr); j++) {
1203 masked_addr[j] &= mask[j];
1204 }
1205
1206 if(memcmp(entry->addr_block.ipv6.s6_addr, &buf, sizeof(buf)) == 0) {
1207 return entry;
1208 }
1209 }
1210 #endif
1211 }
1212
1213 return NULL;
1214 }
1215
/*
 * Discover the slaves a master advertises in its INFO reply.
 *
 * Each parsed slave is either skipped (when its address is blacklisted, which
 * is logged with that slave's own hostname) or handed to node_discover_slave().
 * Returns 0 when the slave list could not be parsed, 1 otherwise.
 */
static int node_discover_slaves_from_info_reply(redis_node_t *node, redisReply *reply) {
  redis_connect_params_t   *rcp;
  size_t                    i, n;

  if(!(rcp = parse_info_slaves(node, reply->str, &n))) {
    return 0;
  }
  for(i=0; i<n; i++) {
    nchan_redis_ip_range_t *matched = node_ip_blacklisted(node->nodeset, &rcp[i]);
    if(matched) {
      //report the slave being examined, not the first one in the array
      nodeset_log_notice(node->nodeset, "Skipping slave node %V blacklisted by %V", &rcp[i].hostname, &matched->str);
    }
    else {
      node_discover_slave(node, &rcp[i]);
    }
  }
  return 1;
}
1233
/*
 * React to a node reporting that the cluster keyspace layout has changed.
 *
 * A node that had reached READY is disconnected into the FAILED state, and
 * the nodeset is marked CLUSTER_FAILING with a message naming the reporting
 * node. Always returns 1.
 */
int nodeset_node_keyslot_changed(redis_node_t *node) {
  char reason[512];

  if(node->state >= REDIS_NODE_READY) {
    node_disconnect(node, REDIS_NODE_FAILED);
  }

  ngx_snprintf((u_char *)reason, 512, "cluster keyspace needs to be updated as reported by node %V:%d%Z", &(node)->connect_params.hostname, node->connect_params.port);
  nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, reason);
  return 1;
}
1244
/*
 * Inspect a command reply for signs that the key was sent to the wrong
 * cluster node.
 *
 * Returns 0 when the reply is an error that is either a script error about a
 * non-local key, or starts with "MOVED " or "ASK ". In that case a
 * non-cluster node is disconnected and the nodeset marked CLUSTER_FAILING,
 * while a cluster node goes through nodeset_node_keyslot_changed().
 * Returns 1 for every other reply (including a NULL one); for cluster nodes
 * this also refreshes the time of the last successful check.
 */
int nodeset_node_reply_keyslot_ok(redis_node_t *node, redisReply *reply) {
  if(reply != NULL && reply->type == REDIS_REPLY_ERROR) {
    char *nonlocal_key_msg = "Lua script attempted to access a non local key in a cluster node";
    char *script_err_prefix = "ERR Error running script";
    char *moved_prefix = "MOVED ";
    char *ask_prefix = "ASK ";

    int wrong_node =
         (nchan_cstr_startswith(reply->str, script_err_prefix) && nchan_strstrn(reply->str, nonlocal_key_msg))
      || nchan_cstr_startswith(reply->str, moved_prefix)
      || nchan_cstr_startswith(reply->str, ask_prefix);

    if(wrong_node) {
      if(node->cluster.enabled) {
        nodeset_node_keyslot_changed(node);
      }
      else {
        node_log_error(node, "got a cluster error on a non-cluster redis connection: %s", reply->str);
        node_disconnect(node, REDIS_NODE_FAILED);
        nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, "Strange response from node");
      }
      return 0;
    }
  }

  if(node->cluster.enabled) {
    node->cluster.last_successful_check = ngx_time();
  }
  return 1;
}
1271
/*
 * Reply callback for the worker SUBSCRIBE issued on the pubsub context.
 *
 * While the node is still in the SUBSCRIBING_WORKER state the reply belongs
 * to the connector. Afterwards, a 3-element "message" on this worker's own
 * channel with payload "ping" is only logged; anything else is forwarded to
 * the registered subscribe callback.
 */
static void node_subscribe_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redisReply     *reply = rep;
  redis_node_t   *node = privdata;

  if(node->state == REDIS_NODE_SUBSCRIBING_WORKER) {
    node_connector_callback(ac, rep, privdata);
    return;
  }

  if(reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3
    && reply->element[0]->type == REDIS_REPLY_STRING
    && reply->element[1]->type == REDIS_REPLY_STRING
    && reply->element[2]->type == REDIS_REPLY_STRING
    && strcmp(reply->element[0]->str, "message") == 0
    && strcmp(reply->element[1]->str, redis_worker_id) == 0
    && strcmp(reply->element[2]->str, "ping") == 0
  ) {
    node_log_debug(node, "received PUBSUB ping message");
    return;
  }

  redis_subscribe_callback(ac, rep, privdata);
}
1292
node_connector_connect_timeout(void * pd)1293 void node_connector_connect_timeout(void *pd) {
1294 redis_node_t *node = pd;
1295 node->connect_timeout = NULL;
1296 if(node->state == REDIS_NODE_CMD_CONNECTING || node->state == REDIS_NODE_PUBSUB_CONNECTING) {
1297 //onConnect won't be fired, so the connector must be called manually
1298 node->state = REDIS_NODE_CONNECTION_TIMED_OUT;
1299 node_connector_callback(NULL, NULL, node);
1300 }
1301 else {
1302 node_disconnect(node, REDIS_NODE_CONNECTION_TIMED_OUT);
1303 }
1304 }
1305
/*
 * Connector state machine for a single Redis node.
 *
 * Each call handles the node's current state (node->state) and either issues
 * the next asynchronous step and returns, or falls through into the next
 * state. It is entered in three ways:
 *  - directly with ac == NULL and rep == NULL (node_connect, the connect
 *    timeout handler, and internal jumps that skip optional steps),
 *  - from the connect event handler with the async context in `ac`,
 *  - as the reply callback of a command issued here, with the reply in `rep`.
 * `privdata` is always the redis_node_t being connected.
 *
 * Order of steps: connect cmd ctx, connect pubsub ctx, optional AUTH on both,
 * optional SELECT on both, load every Lua script, INFO ALL on cmd (run_id,
 * role, master/slave discovery), INFO SERVER on pubsub (must report the same
 * run_id), SUBSCRIBE to this worker's channel, then for cluster nodes
 * CLUSTER INFO and CLUSTER NODES (keyslot ranges, peer discovery), and
 * finally READY. Any failure goes through node_connector_fail(), which logs
 * and disconnects the node.
 */
static void node_connector_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redisReply                 *reply = rep;
  redis_node_t               *node = privdata;
  redis_nodeset_t            *nodeset = node->nodeset;
  char                        errstr[1024];
  redis_connect_params_t     *cp = &node->connect_params;
  redis_lua_script_t         *next_script = (redis_lua_script_t *)&redis_lua_scripts;
  node_log_debug(node, "node_connector_callback state %d", node->state);
  ngx_str_t                   rest;

  switch(node->state) {
    case REDIS_NODE_CONNECTION_TIMED_OUT:
      return node_connector_fail(node, "connection timed out");
      break;
    case REDIS_NODE_FAILED:
    case REDIS_NODE_DISCONNECTED:
      assert(!node->connect_timeout);
      if((node->ctx.cmd = node_connect_context(node, &cp->hostname, cp->port)) == NULL) { //always connect the cmd ctx to the hostname
        return node_connector_fail(node, "failed to open redis async context for cmd");
      }
      else if(cp->peername.len == 0) { //don't know peername yet
        set_preallocated_peername(node->ctx.cmd, &cp->peername);
      }
      node->connect_timeout = nchan_add_oneshot_timer(node_connector_connect_timeout, node, nodeset->settings.connect_timeout);
      node->state = REDIS_NODE_CMD_CONNECTING;
      break; //wait until the onConnect callback brings us back

    case REDIS_NODE_CMD_CONNECTING:
      if(ac->err || ac->c.err) {
        node->ctx.cmd = NULL; //to avoid calling redisAsyncFree on the ctx during node_disconnect()
        //(it will be called automatically when this function returns control back to hiredis
        return node_connector_fail(node, ac->errstr);
      }
      //the pubsub ctx connects to the peername rather than the hostname
      if((node->ctx.pubsub = node_connect_context(node, &cp->peername, cp->port)) == NULL) {
        return node_connector_fail(node, "failed to open redis async context for pubsub");
      }
      node->state++;
      break; //wait until the onConnect callback brings us back

    case REDIS_NODE_PUBSUB_CONNECTING:
      if(ac->err || ac->c.err) {
        node->ctx.pubsub = NULL; //to avoid calling redisAsyncFree on the ctx during node_disconnect()
        //(it will be called automatically when this function returns control back to hiredis
        ngx_snprintf((u_char *)errstr, 1024, "(pubsub) %s%Z", ac->errstr);
        return node_connector_fail(node, errstr);
      }
      //connection established. move on...
      node->state++;
      /* fall through */
    case REDIS_NODE_CONNECTED:
      //now we need to authenticate maybe?
      if(cp->password.len > 0) {
        if(!node->ctx.cmd) {
          return node_connector_fail(node, "cmd connection missing, can't send AUTH command");
        }
        redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "AUTH %b", STR(&cp->password));
        node->state++;
      }
      else {
        node->state = REDIS_NODE_SELECT_DB;
        return node_connector_callback(NULL, NULL, node); //continue as if authenticated
      }
      break;

    case REDIS_NODE_CMD_AUTHENTICATING:
      if(!reply_status_ok(reply)) {
        return node_connector_fail(node, "AUTH command failed");
      }
      if(!node->ctx.pubsub) {
        return node_connector_fail(node, "pubsub connection missing, can't send AUTH command");
      }
      //now authenticate pubsub ctx
      redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "AUTH %b", STR(&cp->password));
      node->state++;
      break;

    case REDIS_NODE_PUBSUB_AUTHENTICATING:
      if(!reply_status_ok(reply)) {
        return node_connector_fail(node, "AUTH command failed");
      }
      node->state++;
      /* fall through */
    case REDIS_NODE_SELECT_DB:
      if(cp->db > 0) {
        if(!node->ctx.cmd) {
          return node_connector_fail(node, "cmd connection missing, SELECT command");
        }
        redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SELECT %d", cp->db);
        node->state++;
      }
      else {
        //db 0 needs no SELECT; skip straight to script loading
        node->state = REDIS_NODE_SCRIPTS_LOAD;
        return node_connector_callback(NULL, NULL, node);
      }
      break;

    case REDIS_NODE_CMD_SELECTING_DB:
      if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
        return node_connector_fail(node, "Redis SELECT command failed,");
      }
      //the next SELECT goes out on the pubsub ctx, so that is the one that
      //must still be there
      if(!node->ctx.pubsub) {
        return node_connector_fail(node, "pubsub connection missing, can't send SELECT command");
      }
      redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "SELECT %d", cp->db);
      node->state++;
      break;

    case REDIS_NODE_PUBSUB_SELECTING_DB:
      if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
        return node_connector_fail(node, "Redis SELECT command failed,");
      }
      node->state++;
      /* fall through */
    case REDIS_NODE_SCRIPTS_LOAD:
      node->scripts_loaded = 0;
      if(!node->ctx.cmd) {
        return node_connector_fail(node, "cmd connection missing, can't send SCRIPT LOAD command");
      }
      //next_script still points at the first script here
      redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SCRIPT LOAD %s", next_script->script);
      node->state++;
      break;

    case REDIS_NODE_SCRIPTS_LOADING:
      //the reply belongs to the script at index scripts_loaded
      next_script = &next_script[node->scripts_loaded];

      if(!node_connector_loadscript_reply_ok(node, next_script, reply)) {
        return node_connector_fail(node, "SCRIPT LOAD failed,");
      }
      else {
        //node_log_debug(node, "loaded script %s", next_script->name);
        node->scripts_loaded++;
        next_script++;
      }
      if(node->scripts_loaded < redis_lua_scripts_count) {
        //load next script
        if(!node->ctx.cmd) {
          return node_connector_fail(node, "cmd connection missing, can't send SCRIPT LOAD command");
        }
        redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SCRIPT LOAD %s", next_script->script);
        return; //stay in this state until every script has been acknowledged
      }
      node_log_debug(node, "all scripts loaded");
      node->state++;
      /* fall through */
    case REDIS_NODE_GET_INFO:
      //getinfo time
      if(!node->ctx.cmd) {
        return node_connector_fail(node, "cmd connection missing, can't send INFO ALL command");
      }
      redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "INFO ALL");
      node->state++;
      break;

    case REDIS_NODE_GETTING_INFO:
      if(reply && reply->type == REDIS_REPLY_ERROR && nchan_cstr_startswith(reply->str, "NOAUTH")) {
        return node_connector_fail(node, "authentication required");
      }
      else if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
        return node_connector_fail(node, "INFO command failed");
      }
      else if(!node_parseinfo_set_run_id(node, reply->str)) {
        return node_connector_fail(node, "failed to set node run_id");
      }
      else if(nodeset_node_deduplicate_by_run_id(node)) {
        // this node already exists
        // the deduplication has deleted it; we're done here.
        // commence rejoicing.
        return;
      }

      if(nchan_cstr_match_line(reply->str, "loading:1")) {
        return node_connector_fail(node, "is busy loading data...");
      }

      if(nchan_cstr_match_line(reply->str, "cluster_enabled:1")) {
        node->cluster.enabled = 1;
      }

      if(nchan_cstr_match_line(reply->str, "role:master")) {
        node_set_role(node, REDIS_NODE_ROLE_MASTER);
        //outside of cluster mode, slaves are learned from the master's INFO
        if(!node->cluster.enabled && !node_discover_slaves_from_info_reply(node, reply)) {
          return node_connector_fail(node, "failed parsing slaves from INFO");
        }
      }
      else if(nchan_cstr_match_line(reply->str, "role:slave")) {
        redis_connect_params_t   *rcp;
        node_set_role(node, REDIS_NODE_ROLE_SLAVE);
        if(!(rcp = parse_info_master(node, reply->str))) {
          return node_connector_fail(node, "failed parsing master from INFO");
        }
        if(!node->cluster.enabled) {
          node_discover_master(node, rcp);
        }
      }
      else {
        return node_connector_fail(node, "can't tell if node is master or slave");
      }
      node->state++;
      /* fall through */
    case REDIS_NODE_PUBSUB_GET_INFO:
      if(!node->ctx.pubsub) {
        return node_connector_fail(node, "pubsub connection missing, can't send INFO SERVER command");
      }
      redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "INFO SERVER");
      node->state++;
      break;
    case REDIS_NODE_PUBSUB_GETTING_INFO:
      if(reply && reply->type == REDIS_REPLY_ERROR && nchan_cstr_startswith(reply->str, "NOAUTH")) {
        return node_connector_fail(node, "authentication required");
      }
      else if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
        return node_connector_fail(node, "INFO command failed");
      }
      else {
        //both connections must have reached the same server instance
        u_char       idbuf[MAX_RUN_ID_LENGTH];
        ngx_str_t    pubsub_run_id = {0, idbuf};
        node_parseinfo_set_preallocd_str(node, &pubsub_run_id, reply->str, "run_id:", MAX_RUN_ID_LENGTH);
        if(!nchan_ngx_str_match(&node->run_id, &pubsub_run_id)) {
          return node_connector_fail(node, "IP address connects to more than one server. Is Redis behind a proxy?");
        }
        node->state++;
      }
      /* fall through */
    case REDIS_NODE_SUBSCRIBE_WORKER:
      if(!node->ctx.pubsub) {
        return node_connector_fail(node, "pubsub connection missing, can't send worker SUBSCRIBE command");
      }
      //replies to this command arrive through node_subscribe_callback, which
      //routes them back here while the node is in the SUBSCRIBING_WORKER state
      redisAsyncCommand(node->ctx.pubsub, node_subscribe_callback, node, "SUBSCRIBE %s", redis_worker_id);
      node->state++;
      break;

    case REDIS_NODE_SUBSCRIBING_WORKER:
      if(!reply) {
        return node_connector_fail(node, "disconnected while subscribing to worker PUBSUB channel");
      }
      if( reply->type != REDIS_REPLY_ARRAY || reply->elements != 3
       || reply->element[0]->type != REDIS_REPLY_STRING || reply->element[1]->type != REDIS_REPLY_STRING
       || strcmp(reply->element[0]->str, "subscribe") != 0
       || strcmp(reply->element[1]->str, redis_worker_id) != 0
      ) {
        return node_connector_fail(node, "failed to subscribe to worker PUBSUB channel");
      }
      nchan_update_stub_status(redis_connected_servers, 1);

      node->state++;
      /* fall through */
    case REDIS_NODE_GET_CLUSTERINFO:
      if(!node->cluster.enabled) {
        //not a cluster node: nothing more to learn
        node->state = REDIS_NODE_READY;
        return node_connector_callback(NULL, NULL, node);
      }
      else {
        if(!node->ctx.cmd) {
          return node_connector_fail(node, "cmd connection missing, can't send CLUSTER INFO command");
        }
        redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "CLUSTER INFO");
        node->state++;
      }
      break;

    case REDIS_NODE_GETTING_CLUSTERINFO:
      if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
        return node_connector_fail(node, "CLUSTER INFO command failed");
      }
      if(!nchan_cstr_match_line(reply->str, "cluster_state:ok")) {
        node->cluster.ok=0;
        return node_connector_fail(node, "cluster_state not ok");
      }

      if(!nchan_get_rest_of_line_in_cstr(reply->str, "cluster_current_epoch:", &rest)) {
        return node_connector_fail(node, "CLUSTER INFO command failed to get current epoch");
      }
      if((node->cluster.current_epoch = ngx_atoi(rest.data, rest.len)) == NGX_ERROR) {
        return node_connector_fail(node, "CLUSTER INFO command failed to parse current epoch number");
      }

      node->cluster.ok=1;
      node->state++;
      /* fall through */
    case REDIS_NODE_GET_CLUSTER_NODES:
      if(!node->ctx.cmd) {
        return node_connector_fail(node, "cmd connection missing, can't send CLUSTER NODES command");
      }
      redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "CLUSTER NODES");
      node->state++;
      break;

    case REDIS_NODE_GETTING_CLUSTER_NODES:
      if(!reply_str_ok(reply)) {
        return node_connector_fail(node, "CLUSTER NODES command failed");
      }
      else {
        size_t                    i, n;
        cluster_nodes_line_t     *l;
        if(!(l = parse_cluster_nodes(node, reply->str, &n))) {
          return node_connector_fail(node, "parsing CLUSTER NODES command failed");
        }
        for(i=0; i<n; i++) {
          if(l[i].self) {
            //the line describing this very node: take its id and keyslots
            nchan_strcpy(&node->cluster.id, &l[i].id, MAX_CLUSTER_ID_LENGTH);
            if(l[i].slot_ranges_count == 0 && l[i].master) {
              node_log_notice(node, "is a master cluster node with no keyslots");
            }
            else {
              redis_node_t       *conflict_node;
              node->cluster.slot_range.n = l[i].slot_ranges_count;
              size_t                 j;
              if(node->cluster.slot_range.range) {
                ngx_free(node->cluster.slot_range.range);
              }
              node->cluster.slot_range.range = ngx_alloc(sizeof(redis_slot_range_t) * node->cluster.slot_range.n, ngx_cycle->log);
              if(!node->cluster.slot_range.range) {
                return node_connector_fail(node, "failed allocating cluster slots range");
              }
              if(!parse_cluster_node_slots(&l[i], node->cluster.slot_range.range)) {
                return node_connector_fail(node, "failed parsing cluster slots range");
              }
              //none of this node's ranges may already be claimed in the nodeset
              for(j = 0; j<node->cluster.slot_range.n; j++) {
                if((conflict_node = nodeset_node_find_by_range(nodeset, &node->cluster.slot_range.range[j]))!=NULL) {
                  u_char buf[1024];
                  ngx_snprintf(buf, 1024, "keyslot range conflict with node %s. These nodes are probably from different clusters.%Z", node_cstr(conflict_node));
                  return node_connector_fail(node, (char *)buf);
                }
              }

              if(!nodeset_cluster_node_index_keyslot_ranges(node)) {
                return node_connector_fail(node, "indexing keyslot ranges failed");
              }
            }
          }
          else if(!node_skip_cluster_peer(node, &l[i])) {
            node_discover_cluster_peer(node, &l[i]);
          }
        }
      }
      node->state = REDIS_NODE_READY;
      //intentional fall-through is affirmatively consensual
      //yes, i consent to being fallen through.
      //                               Signed,
      //                                 REDIS_NODE_GETTING_CLUSTER_NODES
      //NOTE: consent required each time a fallthrough is imposed
      /* fall through */
    case REDIS_NODE_READY:
      if(node->connect_timeout) {
        nchan_abort_oneshot_timer(node->connect_timeout);
        node->connect_timeout = NULL;
      }
      //interval settings are multiplied by 1000 for ngx_add_timer
      if(!node->ping_timer.timer_set && nodeset->settings.ping_interval > 0) {
        ngx_add_timer(&node->ping_timer, nodeset->settings.ping_interval * 1000);
      }
      if(node->cluster.enabled) {
        if(!node->cluster.check_timer.timer_set && nodeset->settings.cluster_check_interval > 0) {
          ngx_add_timer(&node->cluster.check_timer, nodeset->settings.cluster_check_interval * 1000);
        }
        node->cluster.last_successful_check = ngx_time();
      }
      node_log_notice(node, "%s", node->generation == 0 ? "connected" : "reconnected");
      node->generation++;
      nodeset_examine(nodeset);
      break;
  }
}
1668
// Checks whether the nodeset's cluster keyslot index covers every slot from
// 0 through 16383 with usable nodes.
//
// Starting at slot 0, looks up the indexed range for the current slot and then
// continues at the slot just past that range's max.
//
// Returns 1 when every range found along the way belongs to a node whose state
// is at least REDIS_NODE_READY and whose role is master. Returns 0 as soon as
// a slot has no indexed range, or its node is not ready, or is not a master.
static int nodeset_cluster_keyslot_space_complete(redis_nodeset_t *ns) {
  ngx_rbtree_node_t                *node;
  redis_slot_range_t                range = {0, 0};
  redis_nodeset_slot_range_node_t  *rangenode;
  
  while(range.min <= 16383) {
    if((node = rbtree_find_node(&ns->cluster.keyslots, &range)) == NULL) {
      //slot numbers are passed as int and printed with %d, like the messages
      //below; nginx's %i would read an ngx_int_t argument instead
      DBG("cluster slots range incomplete: can't find slot %d", (int)range.min);
      return 0;
    }
    rangenode = rbtree_data_from_node(node);
    
    if(rangenode->node->state < REDIS_NODE_READY) {
      node_log_notice(rangenode->node, "cluster node for range %d - %d not connected", (int)rangenode->range.min, (int)rangenode->range.max);
      return 0;
    }
    if(rangenode->node->role != REDIS_NODE_ROLE_MASTER) {
      node_log_notice(rangenode->node, "cluster node for range %d - %d is not a master. That's weird.", (int)rangenode->range.min, (int)rangenode->range.max);
      return 0;
    }
    
    //next slot to look for is the one right after this range
    range.min = rangenode->range.max + 1;
    range.max = range.min;
  }
  DBG("cluster range complete");
  return 1;
}
1697
// Maps a nodeset status to the delay used when (re)arming the nodeset's
// status-check timer. The value is handed straight to ngx_add_timer() by the
// callers in this file.
static int nodeset_status_timer_interval(redis_nodeset_status_t status) {
  int interval = 500; //used for any status not listed below
  
  switch(status) {
    case REDIS_NODESET_READY:
      interval = 10000;
      break;
    
    case REDIS_NODESET_CONNECTING:
      interval = 1000;
      break;
    
    case REDIS_NODESET_FAILING:
    case REDIS_NODESET_CLUSTER_FAILING:
      interval = 300;
      break;
    
    case REDIS_NODESET_FAILED:
    case REDIS_NODESET_INVALID:
    case REDIS_NODESET_DISCONNECTED:
      interval = 2000;
      break;
  }
  return interval;
}
1714
// Timer handler for an on-ready callback whose max_wait ran out (armed in
// nodeset_callback_on_ready). ev->data is the pending callback entry.
// The callback is invoked, then its entry is removed from the nodeset's
// onready_callbacks list.
void nodeset_onready_expire_event(ngx_event_t *ev) {
  nodeset_onready_callback_t *expired = ev->data;
  
  expired->cb(expired->ns, expired->pd);
  nchan_list_remove(&expired->ns->onready_callbacks, expired);
}
1720
// Runs cb(ns, pd) once the nodeset is ready.
//
// If the nodeset status is already REDIS_NODESET_READY, cb is called
// immediately. Otherwise the callback is queued on ns->onready_callbacks; when
// max_wait is greater than 0 a timer is also armed that fires
// nodeset_onready_expire_event after max_wait.
//
// Returns NGX_OK, or NGX_ERROR if the callback could not be queued.
ngx_int_t nodeset_callback_on_ready(redis_nodeset_t *ns, ngx_msec_t max_wait, ngx_int_t (*cb)(redis_nodeset_t *, void *), void *pd) {
  nodeset_onready_callback_t *pending;
  
  if(ns->status == REDIS_NODESET_READY) {
    //nothing to wait for
    cb(ns, pd);
    return NGX_OK;
  }
  
  if((pending = nchan_list_append(&ns->onready_callbacks)) == NULL) {
    ERR("failed to add to onready_callback list");
    return NGX_ERROR;
  }
  
  ngx_memzero(&pending->ev, sizeof(pending->ev));
  pending->ns = ns;
  pending->pd = pd;
  pending->cb = cb;
  
  if(max_wait > 0) {
    nchan_init_timer(&pending->ev, nodeset_onready_expire_event, pending);
    ngx_add_timer(&pending->ev, max_wait);
  }
  
  return NGX_OK;
}
1746
nodeset_run_on_ready_callbacks(redis_nodeset_t * ns)1747 ngx_int_t nodeset_run_on_ready_callbacks(redis_nodeset_t *ns) {
1748 nodeset_onready_callback_t *rcb;
1749 for(rcb = nchan_list_first(&ns->onready_callbacks); rcb != NULL; rcb = nchan_list_next(rcb)) {
1750 if(rcb->ev.timer_set) {
1751 ngx_del_timer(&rcb->ev);
1752 }
1753 rcb->cb(ns, rcb->pd);
1754 }
1755 nchan_list_empty(&ns->onready_callbacks);
1756 return NGX_OK;
1757 }
1758
nodeset_abort_on_ready_callbacks(redis_nodeset_t * ns)1759 ngx_int_t nodeset_abort_on_ready_callbacks(redis_nodeset_t *ns) {
1760 nodeset_onready_callback_t *rcb;
1761 for(rcb = nchan_list_first(&ns->onready_callbacks); rcb != NULL; rcb = nchan_list_next(rcb)) {
1762 if(rcb->ev.timer_set) {
1763 ngx_del_timer(&rcb->ev);
1764 }
1765 rcb->cb(ns, rcb->pd);
1766 }
1767 nchan_list_empty(&ns->onready_callbacks);
1768 return NGX_OK;
1769 }
1770
update_chanhead_status_on_reconnect(rdstore_channel_head_t * ch)1771 static ngx_int_t update_chanhead_status_on_reconnect(rdstore_channel_head_t *ch) {
1772 if(ch->redis.node.cmd && ch->redis.node.pubsub && ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBED && ch->status == NOTREADY) {
1773 ch->status = READY;
1774 }
1775 return NGX_OK;
1776 }
1777
nodeset_reconnect_disconnected_channels(redis_nodeset_t * ns)1778 ngx_int_t nodeset_reconnect_disconnected_channels(redis_nodeset_t *ns) {
1779 rdstore_channel_head_t *cur;
1780 nchan_slist_t *disconnected_cmd = &ns->channels.disconnected_cmd;
1781 nchan_slist_t *disconnected_pubsub = &ns->channels.disconnected_pubsub;
1782 assert(nodeset_ready(ns));
1783
1784 while((cur = nchan_slist_pop(disconnected_cmd)) != NULL) {
1785 assert(cur->redis.node.cmd == NULL);
1786 cur->redis.slist.in_disconnected_cmd_list = 0;
1787 assert(nodeset_node_find_by_chanhead(cur)); // this reuses the linked-list fields
1788 update_chanhead_status_on_reconnect(cur);
1789 }
1790
1791 while((cur = nchan_slist_pop(disconnected_pubsub)) != NULL) {
1792 assert(cur->redis.node.pubsub == NULL);
1793 cur->redis.slist.in_disconnected_pubsub_list = 0;
1794 assert(nodeset_node_pubsub_find_by_chanhead(cur)); // this reuses the linked-list fields
1795 redis_chanhead_catch_up_after_reconnect(cur);
1796 ensure_chanhead_pubsub_subscribed_if_needed(cur);
1797 update_chanhead_status_on_reconnect(cur);
1798 }
1799
1800 return NGX_OK;
1801 }
1802
// Writes a short, NUL-terminated description of the nodeset into buf (at most
// maxlen bytes, as limited by ngx_snprintf) and returns buf.
//
// A nodeset with an upstream is described as "upstream <host>". Otherwise it
// is described as "host <first url>", or just "host" when the url list has no
// usable first entry.
static char* nodeset_name_cstr(redis_nodeset_t *nodeset, char *buf, size_t maxlen) {
  const char *what;
  ngx_str_t  *name = NULL;
  
  if(nodeset->upstream) {
    what = "upstream";
    name = &nodeset->upstream->host;
  }
  else {
    ngx_str_t **url = nchan_list_first(&nodeset->urls);
    what = "host";
    if(url && *url) {
      name = *url;
    }
  }
  
  //'what' is always set at this point, so only the name is optional
  if(name) {
    ngx_snprintf((u_char *)buf, maxlen, "%s %V%Z", what, name);
  }
  else {
    ngx_snprintf((u_char *)buf, maxlen, "%s%Z", what);
  }
  return buf;
}
1832
// Returns a C string naming the node, or "???" when node is NULL.
// For a non-NULL node the string is produced by __node_cstr() into a static
// buffer shared by all calls, so the result is only valid until the next call.
const char *__node_nickname_cstr(redis_node_t *node) {
  static char nickname[512];
  
  if(node == NULL) {
    return "???";
  }
  __node_cstr(node, nickname);
  return nickname;
}
1843
nodeset_set_status(redis_nodeset_t * nodeset,redis_nodeset_status_t status,const char * msg)1844 ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t status, const char *msg) {
1845 nodeset->status_msg = msg;
1846 if(nodeset->status != status) {
1847 if(msg) {
1848 ngx_uint_t lvl;
1849 if(status == REDIS_NODESET_INVALID) {
1850 lvl = NGX_LOG_ERR;
1851 }
1852 else if(status == REDIS_NODESET_DISCONNECTED
1853 || status == REDIS_NODESET_CLUSTER_FAILING
1854 || status == REDIS_NODESET_FAILED
1855 ) {
1856 lvl = NGX_LOG_WARN;
1857 }
1858 else {
1859 lvl = NGX_LOG_NOTICE;
1860 }
1861 nodeset_log(nodeset, lvl, "%s", msg);
1862 }
1863 nodeset->current_status_start = ngx_time();
1864 nodeset->current_status_times_checked = 0;
1865 nodeset->status = status;
1866
1867 if(nodeset->status_check_ev.timer_set) {
1868 ngx_del_timer(&nodeset->status_check_ev);
1869 }
1870
1871 switch(status) {
1872 case REDIS_NODESET_FAILED:
1873 case REDIS_NODESET_DISCONNECTED:
1874 case REDIS_NODESET_INVALID:
1875 nodeset_disconnect(nodeset);
1876 break;
1877 case REDIS_NODESET_CLUSTER_FAILING:
1878
1879 //very intentional fallthrough
1880 case REDIS_NODESET_FAILING:
1881 nodeset_connect(nodeset);
1882 break;
1883 case REDIS_NODESET_CONNECTING:
1884 //no special actions
1885 break;
1886 case REDIS_NODESET_READY:
1887 nodeset_reconnect_disconnected_channels(nodeset);
1888 nodeset_run_on_ready_callbacks(nodeset);
1889 break;
1890 }
1891 }
1892
1893 if(!nodeset->status_check_ev.timer_set) {
1894 ngx_add_timer(&nodeset->status_check_ev, nodeset_status_timer_interval(status));
1895 }
1896 return NGX_OK;
1897 }
1898
// Reply callback for the "INFO REPLICATION" command issued from
// nodeset_examine(). pd is the node the command was sent to. A NULL reply is
// only logged; any other reply is passed on to
// node_discover_slaves_from_info_reply().
static void node_find_slaves_callback(redisAsyncContext *ac, void *rep, void *pd) {
  redisReply    *info = rep;
  redis_node_t  *node = pd;
  
  if(info != NULL) {
    node_discover_slaves_from_info_reply(node, info);
  }
  else {
    node_log_debug(node, "INFO REPLICATION aborted reply");
  }
}
1908
nodeset_examine(redis_nodeset_t * nodeset)1909 ngx_int_t nodeset_examine(redis_nodeset_t *nodeset) {
1910 redis_node_t *cur, *next;
1911 int cluster = 0, masters = 0, slaves = 0, total = 0, connecting = 0, ready = 0, disconnected = 0;
1912 int discovered = 0, failed_masters=0, failed_slaves = 0, failed_unknowns = 0;
1913 int ready_cluster = 0, ready_non_cluster = 0, connecting_masters = 0;
1914 redis_nodeset_status_t current_status = nodeset->status;
1915 //ERR("check nodeset %p", nodeset);
1916
1917 for(cur = nchan_list_first(&nodeset->nodes); cur != NULL; cur = next) {
1918 next = nchan_list_next(cur);
1919 total++;
1920 if(cur->cluster.enabled == 1) {
1921 cluster++;
1922 }
1923 if(cur->discovered)
1924 discovered++;
1925 if(cur->role == REDIS_NODE_ROLE_MASTER) {
1926 masters++;
1927 if(cur->state > REDIS_NODE_DISCONNECTED && cur->state < REDIS_NODE_READY) {
1928 connecting_masters++;
1929 }
1930 }
1931 if(cur->role == REDIS_NODE_ROLE_SLAVE)
1932 slaves++;
1933 if(cur->state <= REDIS_NODE_DISCONNECTED)
1934 disconnected++;
1935 if(cur->state > REDIS_NODE_DISCONNECTED && cur->state < REDIS_NODE_READY)
1936 connecting++;
1937 if(cur->state == REDIS_NODE_READY) {
1938 ready++;
1939 if(cur->cluster.enabled == 1)
1940 ready_cluster++;
1941 else
1942 ready_non_cluster++;
1943 }
1944 if(cur->state == REDIS_NODE_FAILED) {
1945 if(cur->role == REDIS_NODE_ROLE_MASTER) {
1946 failed_masters++;
1947 }
1948 else if(cur->role == REDIS_NODE_ROLE_SLAVE) {
1949 failed_slaves++;
1950 if(cur->peers.master && cur->peers.master->state >= REDIS_NODE_READY && cur->nodeset->status == REDIS_NODESET_READY) {
1951 //rediscover slaves
1952 redisAsyncCommand(cur->peers.master->ctx.cmd, node_find_slaves_callback, cur->peers.master, "INFO REPLICATION");
1953 }
1954 //remove failed slave
1955 node_log_notice(cur, "removed failed slave node");
1956 node_disconnect(cur, REDIS_NODE_FAILED);
1957 nodeset_node_destroy(cur);
1958 total--;
1959 }
1960 else {
1961 failed_unknowns++;
1962 }
1963 }
1964 }
1965
1966 nodeset->cluster.enabled = cluster > 0;
1967
1968 if(current_status == REDIS_NODESET_CONNECTING && connecting > 0) {
1969 //still connecting, with a few nodws yet to try to connect
1970 return NGX_OK;
1971 }
1972 if(ready == 0 && total == 0) {
1973 nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "no reachable servers");
1974 }
1975 else if(cluster == 0 && masters > 1) {
1976 nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "invalid config, more than one master in non-cluster");
1977 }
1978 else if(ready_cluster > 0 && ready_non_cluster > 0) {
1979 nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "invalid config, cluster and non-cluster servers present");
1980 }
1981 else if(connecting > 0) {
1982 nodeset_set_status(nodeset, REDIS_NODESET_CONNECTING, NULL);
1983 }
1984 else if(failed_masters > 0) {
1985 if(current_status == REDIS_NODESET_READY) {
1986 nodeset_set_status(nodeset, REDIS_NODESET_FAILING, NULL);
1987 }
1988 else {
1989 nodeset_set_status(nodeset, REDIS_NODESET_FAILED, NULL);
1990 }
1991 }
1992 else if (masters == 0) {
1993 //this prevents slave-of-slave-of-master lookups
1994 nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "no reachable masters");
1995 }
1996 else if(cluster > 0 && !nodeset_cluster_keyslot_space_complete(nodeset)) {
1997 nodeset_set_status(nodeset, REDIS_NODESET_CONNECTING, "keyslot space incomplete");
1998 }
1999 else if(current_status == REDIS_NODESET_READY && (ready == 0 || ready < total)) {
2000 nodeset_set_status(nodeset, REDIS_NODESET_FAILING, NULL);
2001 }
2002 else if(ready == 0) {
2003 nodeset_set_status(nodeset, REDIS_NODESET_DISCONNECTED, "no connected servers");
2004 }
2005 else {
2006 nodeset_set_status(nodeset, REDIS_NODESET_READY, "ready");
2007 }
2008
2009 return NGX_OK;
2010 }
2011
// Timer handler for the periodic nodeset status check; ev->data is the
// nodeset. Does nothing unless the event actually timed out.
//
// What happens depends on the current nodeset status:
//  - READY: flush any channels / on-ready callbacks still waiting
//  - CONNECTING: give up (DISCONNECTED) after
//    REDIS_NODESET_MAX_CONNECTING_TIME_SEC, otherwise re-examine the nodeset
//  - FAILING / CLUSTER_FAILING: declare FAILED after
//    REDIS_NODESET_MAX_FAILING_TIME_SEC
//  - FAILED: reconnect once REDIS_NODESET_RECONNECT_WAIT_TIME_SEC has passed
//  - INVALID / DISCONNECTED: reconnect right away
//
// The event is re-armed at the end unless one of the calls above already did.
static void nodeset_check_status_event(ngx_event_t *ev) {
  redis_nodeset_t *ns = ev->data;
  
  if(!ev->timedout) {
    return;
  }
  DBG("nodeset %p status check event", ns);
  ev->timedout = 0;
  
  switch(ns->status) {
    case REDIS_NODESET_READY:
      nodeset_reconnect_disconnected_channels(ns); //in case there are any waiting around
      nodeset_run_on_ready_callbacks(ns); //in case there are any that got left behind
      break;
    
    case REDIS_NODESET_CONNECTING:
      if(ngx_time() - ns->current_status_start > REDIS_NODESET_MAX_CONNECTING_TIME_SEC) {
        nodeset_set_status(ns, REDIS_NODESET_DISCONNECTED, "Redis node set took too long to connect");
      }
      else {
        //still within the allowed connecting time: do a full status check
        nodeset_examine(ns);
      }
      break;
    
    case REDIS_NODESET_CLUSTER_FAILING:
    case REDIS_NODESET_FAILING:
      if(ngx_time() - ns->current_status_start > REDIS_NODESET_MAX_FAILING_TIME_SEC) {
        nodeset_set_status(ns, REDIS_NODESET_FAILED, "Redis node set has failed");
      }
      break;
    
    case REDIS_NODESET_FAILED:
      //wait a while before trying again
      if(ngx_time() - ns->current_status_start > REDIS_NODESET_RECONNECT_WAIT_TIME_SEC) {
        nodeset_log_notice(ns, "reconnecting...");
        nodeset_connect(ns);
      }
      break;
    
    case REDIS_NODESET_INVALID:
    case REDIS_NODESET_DISCONNECTED:
      //connect whatever needs to be connected
      nodeset_connect(ns);
      break;
  }
  
  //check again soon!
  if(!ev->timer_set) {
    ngx_add_timer(ev, nodeset_status_timer_interval(ns->status));
  }
}
2062
// Starts connecting the nodeset.
//
// Makes sure a node exists for every configured url (creating the ones not
// found by their connect params), starts connecting every node whose state is
// REDIS_NODE_DISCONNECTED or lower, and sets the nodeset status to
// REDIS_NODESET_CONNECTING.
//
// A url whose node could not be created is logged and skipped; the node is
// checked before it is used for logging. Always returns 1.
int nodeset_connect(redis_nodeset_t *ns) {
  redis_node_t             *node;
  ngx_str_t               **url;
  redis_connect_params_t    rcp;
  
  for(url = nchan_list_first(&ns->urls); url != NULL; url = nchan_list_next(url)) {
    parse_redis_url(*url, &rcp);
    if(nodeset_node_find_by_connect_params(ns, &rcp) != NULL) {
      continue; //already have a node for this url
    }
    node = nodeset_node_create(ns, &rcp);
    if(node == NULL) {
      ERR("failed to create node for url %V", *url);
      continue;
    }
    node_log_debug(node, "created");
  }
  for(node = nchan_list_first(&ns->nodes); node != NULL; node = nchan_list_next(node)) {
    if(node->state <= REDIS_NODE_DISCONNECTED) {
      node_log_debug(node, "start connecting");
      node_connect(node);
    }
  }
  nodeset_set_status(ns, REDIS_NODESET_CONNECTING, NULL);
  return 1;
}
2085
// Tears down every node of the nodeset: nodes in a state above
// REDIS_NODE_DISCONNECTED are disconnected first, then each node is destroyed.
// The loop keeps taking the head of the node list, so it relies on
// nodeset_node_destroy() removing the node from ns->nodes.
// Always returns 1.
int nodeset_disconnect(redis_nodeset_t *ns) {
  redis_node_t *node;
  
  while((node = nchan_list_first(&ns->nodes)) != NULL) {
    node_log_debug(node, "destroy %p", node);
    if(node->state > REDIS_NODE_DISCONNECTED) {
      node_log_debug(node, "intiating disconnect");
      node_disconnect(node, REDIS_NODE_DISCONNECTED);
    }
    nodeset_node_destroy(node);
  }
  
  return 1;
}
2099
2100
nodeset_connect_all(void)2101 ngx_int_t nodeset_connect_all(void) {
2102 int i;
2103 redis_nodeset_t *ns;
2104 DBG("connect all");
2105 for(i=0; i<redis_nodeset_count; i++) {
2106 ns = &redis_nodeset[i];
2107 nodeset_connect(ns);
2108 }
2109 return NGX_OK;
2110 }
2111
nodeset_destroy_all(void)2112 ngx_int_t nodeset_destroy_all(void) {
2113 int i;
2114 redis_nodeset_t *ns;
2115 DBG("nodeset destroy all");
2116 for(i=0; i<redis_nodeset_count; i++) {
2117 ns = &redis_nodeset[i];
2118 nodeset_disconnect(ns);
2119 if(ns->name && ns->name != nchan_redis_blankname) {
2120 ngx_free(ns->name);
2121 }
2122 nchan_list_empty(&ns->urls);
2123 }
2124 redis_nodeset_count = 0;
2125 return NGX_OK;
2126 }
2127
nodeset_each(void (* cb)(redis_nodeset_t *,void *),void * privdata)2128 ngx_int_t nodeset_each(void (*cb)(redis_nodeset_t *, void *), void *privdata) {
2129 int i;
2130 redis_nodeset_t *ns;
2131 for(i=0; i<redis_nodeset_count; i++) {
2132 ns = &redis_nodeset[i];
2133 cb(ns, privdata);
2134 }
2135 return NGX_OK;
2136 }
nodeset_each_node(redis_nodeset_t * ns,void (* cb)(redis_node_t *,void *),void * privdata)2137 ngx_int_t nodeset_each_node(redis_nodeset_t *ns, void (*cb)(redis_node_t *, void *), void *privdata) {
2138 redis_node_t *node, *next;
2139 for(node = nchan_list_first(&ns->nodes); node != NULL; node = next) {
2140 next = nchan_list_next(node);
2141 cb(node, privdata);
2142 }
2143 return NGX_OK;
2144 }
2145
set_preallocated_peername(redisAsyncContext * ctx,ngx_str_t * dst)2146 static ngx_int_t set_preallocated_peername(redisAsyncContext *ctx, ngx_str_t *dst) {
2147 char *ipstr = (char *)dst->data;
2148 struct sockaddr_in *s4;
2149 struct sockaddr_in6 *s6;
2150 // deal with both IPv4 and IPv6:
2151 switch(ctx->c.sockaddr.sa_family) {
2152 case AF_INET:
2153 s4 = (struct sockaddr_in *)&ctx->c.sockaddr;
2154 inet_ntop(AF_INET, &s4->sin_addr, ipstr, INET6_ADDRSTRLEN);
2155 break;
2156 #ifdef AF_INET6
2157 case AF_INET6:
2158 s6 = (struct sockaddr_in6 *)&ctx->c.sockaddr;
2159 inet_ntop(AF_INET6, &s6->sin6_addr, ipstr, INET6_ADDRSTRLEN);
2160 break;
2161 #endif
2162 case AF_UNSPEC:
2163 default:
2164 DBG("couldn't get sockaddr");
2165 return NGX_ERROR;
2166 }
2167 dst->len = strlen(ipstr);
2168 return NGX_OK;
2169 }
2170
2171
2172 //sneaky channel stuff
nodeset_associate_chanhead(redis_nodeset_t * ns,void * chan)2173 ngx_int_t nodeset_associate_chanhead(redis_nodeset_t *ns, void *chan) {
2174 rdstore_channel_head_t *ch = chan;
2175 if(ch->redis.nodeset && ch->redis.nodeset != ns) {
2176 nodeset_dissociate_chanhead(ch);
2177 }
2178 ngx_memzero(&ch->redis.slist, sizeof(ch->redis.slist));
2179 ch->redis.nodeset = ns;
2180 nchan_slist_append(&ns->channels.all, ch);
2181 return NGX_OK;
2182 }
nodeset_dissociate_chanhead(void * chan)2183 ngx_int_t nodeset_dissociate_chanhead(void *chan) {
2184 rdstore_channel_head_t *ch = chan;
2185 redis_nodeset_t *ns = ch->redis.nodeset;
2186
2187 if(ns) {
2188 if(ch->redis.node.cmd) {
2189 assert(!ch->redis.slist.in_disconnected_cmd_list);
2190 nodeset_node_dissociate_chanhead(ch);
2191 }
2192 else if(ch->redis.slist.in_disconnected_cmd_list) {
2193 ch->redis.slist.in_disconnected_cmd_list = 0;
2194 nchan_slist_remove(&ns->channels.disconnected_cmd, ch);
2195 }
2196
2197 if(ch->redis.node.pubsub) {
2198 assert(!ch->redis.slist.in_disconnected_pubsub_list);
2199 nodeset_node_dissociate_pubsub_chanhead(ch);
2200 }
2201 else if(ch->redis.slist.in_disconnected_pubsub_list) {
2202 ch->redis.slist.in_disconnected_pubsub_list = 0;
2203 nchan_slist_remove(&ns->channels.disconnected_pubsub, ch);
2204 }
2205
2206 ch->redis.nodeset = NULL;
2207 nchan_slist_remove(&ns->channels.all, ch);
2208 }
2209 return NGX_OK;
2210 }
nodeset_node_associate_chanhead(redis_node_t * node,void * chan)2211 ngx_int_t nodeset_node_associate_chanhead(redis_node_t *node, void *chan) {
2212 rdstore_channel_head_t *ch = chan;
2213 assert(ch->redis.node.cmd == NULL);
2214 assert(node->nodeset == ch->redis.nodeset);
2215 assert(ch->redis.slist.in_disconnected_cmd_list == 0);
2216 if(ch->redis.node.cmd != node) { //dat idempotence tho
2217 nchan_slist_append(&node->channels.cmd, ch);
2218 }
2219 ch->redis.node.cmd = node;
2220 return NGX_OK;
2221 }
nodeset_node_associate_pubsub_chanhead(redis_node_t * node,void * chan)2222 ngx_int_t nodeset_node_associate_pubsub_chanhead(redis_node_t *node, void *chan) {
2223 rdstore_channel_head_t *ch = chan;
2224 assert(ch->redis.node.pubsub == NULL);
2225 assert(node->nodeset == ch->redis.nodeset);
2226 assert(ch->redis.slist.in_disconnected_pubsub_list == 0);
2227 if(ch->redis.node.pubsub != node) { //dat idempotence tho
2228 nchan_slist_append(&node->channels.pubsub, ch);
2229 }
2230 ch->redis.node.pubsub = node;
2231 return NGX_OK;
2232 }
nodeset_node_dissociate_chanhead(void * chan)2233 ngx_int_t nodeset_node_dissociate_chanhead(void *chan) {
2234 rdstore_channel_head_t *ch = chan;
2235 if(ch->redis.node.cmd) {
2236 nchan_slist_remove(&ch->redis.node.cmd->channels.cmd, ch);
2237 }
2238 ch->redis.node.cmd = NULL;
2239 return NGX_OK;
2240 }
nodeset_node_dissociate_pubsub_chanhead(void * chan)2241 ngx_int_t nodeset_node_dissociate_pubsub_chanhead(void *chan) {
2242 rdstore_channel_head_t *ch = chan;
2243 if(ch->redis.node.pubsub) {
2244 nchan_slist_remove(&ch->redis.node.pubsub->channels.pubsub, ch);
2245 }
2246 ch->redis.node.pubsub = NULL;
2247 return NGX_OK;
2248 }
2249
// Picks either the given master or one of its slaves at random, weighted by
// the nodeset's node_weight settings (the slave weight counts once per slave).
//
// The master is returned when both weights add up to 0 or when the weighted
// draw lands on the master. Otherwise a second draw selects a position in the
// master's slave list; that slave is returned if its state is at least
// REDIS_NODE_READY, and the master is returned instead if it is not (or if the
// list ends before that position).
static redis_node_t *nodeset_node_random_master_or_slave(redis_node_t *master) {
  redis_nodeset_t  *ns = master->nodeset;
  int               master_weight = ns->settings.node_weight.master;
  int               slaves_weight = master->peers.slaves.n * ns->settings.node_weight.slave;
  int               pick, pos;
  redis_node_t    **slave;
  
  assert(master->role == REDIS_NODE_ROLE_MASTER);
  
  if(master_weight + slaves_weight == 0) {
    return master;
  }
  
  pick = ngx_random() % (slaves_weight + master_weight);
  if(pick < master_weight) {
    return master;
  }
  
  //random slave: walk the list up to the drawn position
  pick = ngx_random() % master->peers.slaves.n;
  slave = nchan_list_first(&master->peers.slaves);
  for(pos = 0; slave != NULL && pos < pick; pos++) {
    slave = nchan_list_next(slave);
  }
  
  if(slave == NULL || (*slave)->state < REDIS_NODE_READY) {
    //not ready? play it safe.
    return master;
  }
  return *slave;
}
2284
nodeset_node_find_by_chanhead(void * chan)2285 redis_node_t *nodeset_node_find_by_chanhead(void *chan) {
2286 rdstore_channel_head_t *ch = chan;
2287 redis_node_t *node;
2288 if(ch->redis.node.cmd) {
2289 return ch->redis.node.cmd;
2290 }
2291 node = nodeset_node_find_by_channel_id(ch->redis.nodeset, &ch->id);
2292 nodeset_node_associate_chanhead(node, ch);
2293 return node;
2294 }
nodeset_node_pubsub_find_by_chanhead(void * chan)2295 redis_node_t *nodeset_node_pubsub_find_by_chanhead(void *chan) {
2296 rdstore_channel_head_t *ch = chan;
2297 redis_node_t *node;
2298 if(ch->redis.node.pubsub) {
2299 return ch->redis.node.pubsub;
2300 }
2301 node = nodeset_node_find_by_channel_id(ch->redis.nodeset, &ch->id);
2302 node = nodeset_node_random_master_or_slave(node);
2303 nodeset_node_associate_pubsub_chanhead(node, ch);
2304 return ch->redis.node.pubsub;
2305 }
2306
2307 /*
2308 * Copyright 2001-2010 Georges Menie (www.menie.org)
2309 * Copyright 2010 Salvatore Sanfilippo (adapted to Redis coding style)
2310 * All rights reserved.
2311 * Redistribution and use in source and binary forms, with or without
2312 * modification, are permitted provided that the following conditions are met:
2313 *
2314 * * Redistributions of source code must retain the above copyright
2315 * notice, this list of conditions and the following disclaimer.
2316 * * Redistributions in binary form must reproduce the above copyright
2317 * notice, this list of conditions and the following disclaimer in the
2318 * documentation and/or other materials provided with the distribution.
2319 * * Neither the name of the University of California, Berkeley nor the
2320 * names of its contributors may be used to endorse or promote products
2321 * derived from this software without specific prior written permission.
2322 *
2323 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
2324 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
2325 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
2326 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
2327 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
2328 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
2329 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
2330 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
2331 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2332 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2333 */
2334
2335 /* CRC16 implementation according to CCITT standards.
2336 *
2337 * Note by @antirez: this is actually the XMODEM CRC 16 algorithm, using the
2338 * following parameters:
2339 *
2340 * Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
2341 * Width : 16 bit
2342 * Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1)
2343 * Initialization : 0000
2344 * Reflect Input byte : False
2345 * Reflect Output CRC : False
2346 * Xor constant to output CRC : 0000
2347 * Output for "123456789" : 31C3
2348 */
2349
// Lookup table for the CRC16 described above: one 16-bit entry per possible
// value of the table index computed in redis_crc16().
static const uint16_t crc16tab[256]= {
    0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
    0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
    0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
    0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
    0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
    0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
    0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
    0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
    0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
    0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
    0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
    0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
    0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
    0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
    0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
    0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
    0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
    0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
    0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
    0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
    0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
    0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
    0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
    0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
    0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
    0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
    0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
    0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
    0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
    0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
    0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
    0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};

// Feeds len bytes from buf into the running CRC value crc and returns the
// updated CRC. Pass 0 as crc to start a fresh checksum. A len of 0 or less
// leaves crc unchanged.
uint16_t redis_crc16(uint16_t crc, const char *buf, int len) {
  for(; len > 0; len--) {
    //the mask keeps the index within the table whether or not char is signed
    int idx = ((crc >> 8) ^ *buf) & 0x00FF;
    buf++;
    crc = (crc << 8) ^ crc16tab[idx];
  }
  return crc;
}
2391
2392