1 #include "redis_nodeset.h"
2 
3 #include <assert.h>
4 #include "store-private.h"
5 #include <store/store_common.h>
6 #include "store.h"
7 #include "redis_nginx_adapter.h"
8 #include "redis_nodeset_parser.h"
9 #include "redis_lua_commands.h"
10 
// Log level used by DBG(). Swap in the commented-out definition to make the
// debug messages show up at warning level.
//#define DEBUG_LEVEL NGX_LOG_WARN
#define DEBUG_LEVEL NGX_LOG_DEBUG

// printf-style logging helpers. Both write to the cycle log and prefix every
// message with "REDIS NODESET: "; DBG logs at DEBUG_LEVEL, ERR at NGX_LOG_ERR.
#define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "REDIS NODESET: " fmt, ##args)
#define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "REDIS NODESET: " fmt, ##args)
16 
// Fixed pool of nodesets; nodeset_create() hands out slots in index order.
static redis_nodeset_t  redis_nodeset[NCHAN_MAX_NODESETS];
// Number of slots of redis_nodeset[] currently in use.
static int              redis_nodeset_count = 0;
// Worker id stored by nodeset_initialize(); used as the channel name of the master ping PUBLISH.
static char            *redis_worker_id = NULL;
// Shared placeholder assigned to a nodeset's name when its computed name is empty.
static char            *nchan_redis_blankname = "";
// Subscribe handler stored by nodeset_initialize().
static redisCallbackFn *redis_subscribe_callback = NULL;
22 
// Element type of a nodeset's onready_callbacks list (see nodeset_create()).
typedef struct {
  ngx_event_t      ev;  // event object -- presumably a timer bounding the wait; TODO confirm against its users
  ngx_int_t      (*cb)(redis_nodeset_t *, void *); // callback; receives a nodeset and an opaque pointer
  void            *pd;  // opaque pointer -- presumably the second argument handed to cb
  redis_nodeset_t *ns;  // nodeset this callback is associated with
} nodeset_onready_callback_t;
29 
30 
// URL used by nodeset_create() and nodeset_find() when a location's redis url is empty.
static ngx_str_t       default_redis_url = ngx_string(NCHAN_REDIS_DEFAULT_URL);

// Forward declarations of file-local functions that are referenced before their definitions.
static void node_connector_callback(redisAsyncContext *ac, void *rep, void *privdata);
static int nodeset_cluster_keyslot_space_complete(redis_nodeset_t *ns);
static nchan_redis_ip_range_t *node_ip_blacklisted(redis_nodeset_t *ns, redis_connect_params_t *rcp);
static char *nodeset_name_cstr(redis_nodeset_t *nodeset, char *buf, size_t maxlen);
37 
rbtree_cluster_keyslots_node_id(void * data)38 static void *rbtree_cluster_keyslots_node_id(void *data) {
39   return &((redis_nodeset_slot_range_node_t *)data)->range;
40 }
// Bucketing function for the keyslot tree. Buckets are not used, so every id
// gets the same constant value.
static uint32_t rbtree_cluster_keyslots_bucketer(void *vid) {
  (void)vid; // the id plays no part in the result
  return 1;
}
rbtree_cluster_keyslots_compare(void * v1,void * v2)44 static ngx_int_t rbtree_cluster_keyslots_compare(void *v1, void *v2) {
45   redis_slot_range_t   *r1 = v1;
46   redis_slot_range_t   *r2 = v2;
47 
48   if(r1->max < r2->min) //r1 is strictly left of r2
49     return -1;
50   else if(r1->min > r2->max) //r1 is strictly right of r2
51     return 1;
52   else //there's an overlap
53     return 0;
54 }
55 
// Returns 1 when the reply exists, is not an error, and is a string reply; 0 otherwise.
static int reply_str_ok(redisReply *reply) {
  if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
    return 0;
  }
  return reply->type == REDIS_REPLY_STRING;
}
// Returns 1 when the reply is a non-error status reply whose text is exactly "OK"; 0 otherwise.
static int reply_status_ok(redisReply *reply) {
  if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
    return 0;
  }
  if(reply->type != REDIS_REPLY_STATUS) {
    return 0;
  }
  if(!reply->str) {
    return 0;
  }
  return strcmp(reply->str, "OK") == 0;
}
68 
// Adds every keyslot range owned by `node` to its nodeset's keyslot tree.
//
// Returns 1 on success, after marking the node's ranges as indexed.
// Returns 0, leaving the tree as it was found, when:
//   - the node's ranges are already indexed,
//   - any of the node's ranges overlaps a range already in the tree, or
//   - a tree node cannot be created or inserted.
static int nodeset_cluster_node_index_keyslot_ranges(redis_node_t *node) {
  unsigned                         i, j;
  ngx_rbtree_node_t               *rbtree_node;
  redis_nodeset_slot_range_node_t *keyslot_tree_node;
  rbtree_seed_t                   *tree = &node->nodeset->cluster.keyslots;

  if(node->cluster.slot_range.indexed) {
    node_log_error(node, "cluster keyslot range already indexed");
    return 0;
  }

  //refuse to index anything if even one range collides with what's already in the tree
  for(i=0; i<node->cluster.slot_range.n; i++) {
    if(nodeset_node_find_by_range(node->nodeset, &node->cluster.slot_range.range[i])) { //overlap!
      return 0;
    }
  }

  for(i=0; i<node->cluster.slot_range.n; i++) {
    rbtree_node = rbtree_create_node(tree, sizeof(*keyslot_tree_node));
    if(rbtree_node == NULL) {
      //NOTE(review): assumes rbtree_create_node reports allocation failure with NULL -- confirm
      node_log_error(node, "couldn't create keyslot node range %d-%d", node->cluster.slot_range.range[i].min, node->cluster.slot_range.range[i].max);
      goto rollback;
    }
    keyslot_tree_node = rbtree_data_from_node(rbtree_node);
    keyslot_tree_node->range = node->cluster.slot_range.range[i];
    keyslot_tree_node->node = node;
    if(rbtree_insert_node(tree, rbtree_node) != NGX_OK) {
      node_log_error(node, "couldn't insert keyslot node range %d-%d", keyslot_tree_node->range.min, keyslot_tree_node->range.max);
      rbtree_destroy_node(tree, rbtree_node);
      goto rollback;
    }
    node_log_info(node, "inserted keyslot node range %d-%d", keyslot_tree_node->range.min, keyslot_tree_node->range.max);
  }
  node->cluster.slot_range.indexed = 1;
  return 1;

rollback:
  //ranges [0, i) were inserted. since 'indexed' stays 0, the unindex function would
  //never remove them, so take them back out here.
  for(j=0; j<i; j++) {
    rbtree_node = rbtree_find_node(tree, &node->cluster.slot_range.range[j]);
    if(rbtree_node == NULL) {
      continue;
    }
    keyslot_tree_node = rbtree_data_from_node(rbtree_node);
    if(keyslot_tree_node->node != node) {
      //not ours; leave it alone
      continue;
    }
    rbtree_remove_node(tree, rbtree_node);
    rbtree_destroy_node(tree, rbtree_node);
  }
  return 0;
}
102 /*
103 static ngx_int_t print_slot_range_node(rbtree_seed_t *tree, void *node_data, void *privdata) {
104   redis_nodeset_slot_range_node_t        *rangenode = node_data;
105   node_log_notice(rangenode->node, "slots [%d - %d]", rangenode->range.min, rangenode->range.max);
106   return NGX_OK;
107 }
108 */
// Removes every keyslot range owned by `node` from its nodeset's keyslot tree
// and clears the node's 'indexed' flag. Always returns 1.
// A node that isn't indexed is left untouched. A range that should be in the
// tree but isn't is logged and followed by raise(SIGABRT).
static int nodeset_cluster_node_unindex_keyslot_ranges(redis_node_t *node) {
  rbtree_seed_t       *keyslot_tree = &node->nodeset->cluster.keyslots;
  unsigned             idx;

  if(!node->cluster.slot_range.indexed) {
    //nothing to do
    return 1;
  }

  for(idx = 0; idx < node->cluster.slot_range.n; idx++) {
    redis_slot_range_t  *range = &node->cluster.slot_range.range[idx];
    ngx_rbtree_node_t   *found = rbtree_find_node(keyslot_tree, range);

    if(found == NULL) {
      node_log_error(node, "unable to unindex keyslot range %d-%d: range not found in tree", range->min, range->max);
      raise(SIGABRT);
      continue;
    }
    rbtree_remove_node(keyslot_tree, found);
    rbtree_destroy_node(keyslot_tree, found);
  }

  node->cluster.slot_range.indexed = 0;
  return 1;
}
137 
138 #if REDIS_NODESET_DBG
139 
140 
nodeset_debug_rangetree_collector(rbtree_seed_t * tree,void * node_data,void * privdata)141 static ngx_int_t nodeset_debug_rangetree_collector(rbtree_seed_t *tree, void *node_data, void *privdata) {
142   redis_nodeset_slot_range_node_t     *rangenode = node_data;
143   redis_nodeset_dbg_range_tree_t      *dbg = privdata;
144   dbg->node[dbg->n].range = rangenode->range;
145   dbg->node[dbg->n].node = rangenode->node;
146   dbg->n++;
147   return NGX_OK;
148 }
149 
nodeset_update_debuginfo(redis_nodeset_t * nodeset)150 static redis_node_dbg_list_t *nodeset_update_debuginfo(redis_nodeset_t *nodeset) {
151   rbtree_seed_t         *tree = &nodeset->cluster.keyslots;
152   redis_node_dbg_list_t *node_dbg = &nodeset->dbg.nodes;
153   redis_node_t  *cur;
154   ngx_memzero(&nodeset->dbg, sizeof(nodeset->dbg));
155   for(cur = nchan_list_first(&nodeset->nodes); cur != NULL; cur = nchan_list_next(cur)) {
156     node_dbg->node[node_dbg->n++]=cur;
157   }
158   nodeset->dbg.keyspace_complete = nodeset_cluster_keyslot_space_complete(nodeset);
159   rbtree_walk_incr(tree, nodeset_debug_rangetree_collector, &nodeset->dbg.ranges);
160   return NGX_OK;
161 }
162 #endif
163 
// Formats connection parameters as a NUL-terminated "hostname:port" string into buf.
// At most 512 bytes are written, so buf must be at least that large.
// Returns buf.
// NOTE(review): %d in ngx_snprintf consumes an int -- confirm that matches the type of rcp->port.
static const char *__rcp_cstr(redis_connect_params_t *rcp, char *buf) {
  //the format has exactly two conversions (%V, %d); %Z only appends the terminating NUL.
  //a third argument (&rcp->peername) used to be passed here and was never consumed.
  ngx_snprintf((u_char *)buf, 512, "%V:%d%Z", &rcp->hostname, rcp->port);
  return buf;
}
168 
// Formats the node's connection parameters into the caller-supplied buf
// (see __rcp_cstr for the size requirement). Returns buf.
static const char *__node_cstr(redis_node_t *node, char *buf) {
  redis_connect_params_t *params = &node->connect_params;
  return __rcp_cstr(params, buf);
}
172 
// Formats connection parameters into a function-local static buffer and returns it.
// The buffer is shared by all calls, so each call overwrites the previous result.
static const char *rcp_cstr(redis_connect_params_t *rcp) {
  static char    shared_buf[512];
  const char    *formatted = __rcp_cstr(rcp, shared_buf);
  return formatted;
}
// Formats the node's connection parameters using rcp_cstr's shared static buffer.
static const char *node_cstr(redis_node_t *node) {
  redis_connect_params_t *params = &node->connect_params;
  return rcp_cstr(params);
}
180 
// Capacities, in bytes, of the fixed-size buffers embedded in node_blob_t.
#define MAX_RUN_ID_LENGTH 64
#define MAX_CLUSTER_ID_LENGTH 64
#define MAX_VERSION_LENGTH 16
// Element type of a nodeset's node list: a redis_node_t followed by the backing
// storage for some of its string fields (wired up in nodeset_node_create_with_space).
typedef struct {
  redis_node_t    node;  // must stay first: nodeset_node_create_with_space asserts the blob and the node share an address
  u_char          peername[INET6_ADDRSTRLEN + 2]; // storage behind node.connect_params.peername
  u_char          run_id[MAX_RUN_ID_LENGTH]; // storage behind node.run_id
  u_char          cluster_id[MAX_CLUSTER_ID_LENGTH]; // storage behind node.cluster.id
  u_char          version[MAX_VERSION_LENGTH]; // not referenced in this part of the file -- presumably storage for a version string
} node_blob_t;

// Timer handler installed on a nodeset's status_check_ev by nodeset_create();
// its definition is not in this part of the file.
static void nodeset_check_status_event(ngx_event_t *ev);
193 
// Returns 1 when nodeset is non-NULL and its status is REDIS_NODESET_READY, 0 otherwise.
int nodeset_ready(redis_nodeset_t *nodeset) {
  if(nodeset == NULL) {
    return 0;
  }
  return nodeset->status == REDIS_NODESET_READY;
}
197 
nodeset_initialize(char * worker_id,redisCallbackFn * subscribe_handler)198 ngx_int_t nodeset_initialize(char *worker_id, redisCallbackFn *subscribe_handler) {
199   redis_worker_id = worker_id;
200   redis_subscribe_callback = subscribe_handler;
201   return NGX_OK;
202 }
203 
nodeset_create(nchan_loc_conf_t * lcf)204 redis_nodeset_t *nodeset_create(nchan_loc_conf_t *lcf) {
205   nchan_redis_conf_t  *rcf = &lcf->redis;
206   redis_nodeset_t     *ns = &redis_nodeset[redis_nodeset_count]; //incremented once everything is ok
207   assert(rcf->enabled);
208   assert(!rcf->nodeset);
209 
210   ns->first_loc_conf = lcf;
211 
212   if(redis_nodeset_count >= NCHAN_MAX_NODESETS) {
213     nchan_log_error("Cannot create more than %d Redis nodesets", NCHAN_MAX_NODESETS);
214     return NULL;
215   }
216 
217   assert(!nodeset_find(rcf)); //must be unique
218 
219   nchan_list_init(&ns->urls, sizeof(ngx_str_t *), "redis urls");
220   nchan_list_init(&ns->nodes, sizeof(node_blob_t), "redis nodes");
221   nchan_list_init(&ns->onready_callbacks, sizeof(nodeset_onready_callback_t), "nodeset onReady callbacks");
222 
223   nchan_slist_init(&ns->channels.all, rdstore_channel_head_t, redis.slist.nodeset.prev, redis.slist.nodeset.next);
224   nchan_slist_init(&ns->channels.disconnected_cmd, rdstore_channel_head_t, redis.slist.node_cmd.prev, redis.slist.node_cmd.next);
225   nchan_slist_init(&ns->channels.disconnected_pubsub, rdstore_channel_head_t, redis.slist.node_pubsub.prev, redis.slist.node_pubsub.next);
226 
227   ns->reconnect_delay_sec = 5;
228   ns->current_status_times_checked = 0;
229   ns->current_status_start = 0;
230   ns->generation = 0;
231   ns->settings.namespace = &rcf->namespace;
232   ns->settings.storage_mode = rcf->storage_mode;
233   ns->settings.nostore_fastpublish = rcf->nostore_fastpublish;
234 
235   ns->settings.ping_interval = rcf->ping_interval;
236   ns->settings.cluster_check_interval = rcf->cluster_check_interval;
237 
238   ns->status = REDIS_NODESET_DISCONNECTED;
239   ngx_memzero(&ns->status_check_ev, sizeof(ns->status_check_ev));
240   ns->status_msg = NULL;
241   nchan_init_timer(&ns->status_check_ev, nodeset_check_status_event, ns);
242 
243   //init cluster stuff
244   ns->cluster.enabled = 0;
245   rbtree_init(&ns->cluster.keyslots, "redis cluster node (by keyslot) data", rbtree_cluster_keyslots_node_id, rbtree_cluster_keyslots_bucketer, rbtree_cluster_keyslots_compare);
246 
247   //urls
248   if(rcf->upstream) {
249     nchan_srv_conf_t           *scf = NULL;
250     scf = ngx_http_conf_upstream_srv_conf(rcf->upstream, ngx_nchan_module);
251 
252     ngx_uint_t                   i;
253     ngx_array_t                 *servers = rcf->upstream->servers;
254     ngx_http_upstream_server_t  *usrv = servers->elts;
255     ngx_str_t                   *upstream_url, **urlref;
256     ns->upstream = rcf->upstream;
257 
258     ns->settings.connect_timeout = scf->redis.connect_timeout == NGX_CONF_UNSET_MSEC ? NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC : scf->redis.connect_timeout;
259     ns->settings.node_weight.master = scf->redis.master_weight == NGX_CONF_UNSET ? 1 : scf->redis.master_weight;
260     ns->settings.node_weight.slave = scf->redis.slave_weight == NGX_CONF_UNSET ? 1 : scf->redis.slave_weight;
261 
262     ns->settings.optimize_target = scf->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET ? NCHAN_REDIS_OPTIMIZE_CPU : scf->redis.optimize_target;
263 
264     ns->settings.blacklist.count = scf->redis.blacklist_count;
265     ns->settings.blacklist.list = scf->redis.blacklist;
266 
267     for(i=0; i < servers->nelts; i++) {
268 #if nginx_version >= 1007002
269       upstream_url = &usrv[i].name;
270 #else
271       upstream_url = &usrv[i].addrs->name;
272 #endif
273       urlref = nchan_list_append(&ns->urls);
274       *urlref = upstream_url;
275     }
276   }
277   else {
278     ns->upstream = NULL;
279     ns->settings.connect_timeout = NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC;
280     ns->settings.node_weight.master = 1;
281     ns->settings.node_weight.slave = 1;
282     ns->settings.blacklist.count = 0;
283     ns->settings.blacklist.list = NULL;
284     ngx_str_t **urlref = nchan_list_append(&ns->urls);
285     *urlref = rcf->url.len > 0 ? &rcf->url : &default_redis_url;
286   }
287   DBG("nodeset created");
288 
289   char buf[1024];
290   nodeset_name_cstr(ns, buf, 1024);
291   if(strlen(buf)>0) {
292     ns->name = ngx_alloc(strlen(buf)+1, ngx_cycle->log);
293     strcpy(ns->name, buf);
294   }
295   else {
296     ns->name = nchan_redis_blankname;
297   }
298   redis_nodeset_count++;
299   rcf->nodeset = ns;
300   return ns;
301 }
302 
nodeset_find(nchan_redis_conf_t * rcf)303 redis_nodeset_t *nodeset_find(nchan_redis_conf_t *rcf) {
304   if(rcf->nodeset) {
305     return rcf->nodeset;
306   }
307   else {
308     int              i;
309     redis_nodeset_t *ns;
310     for(i=0; i<redis_nodeset_count; i++) {
311       ns = &redis_nodeset[i];
312       if(nchan_ngx_str_match(&rcf->namespace, ns->settings.namespace) && rcf->storage_mode == ns->settings.storage_mode) {
313         if(rcf->upstream) {
314           if(ns->upstream == rcf->upstream)
315             return ns;
316         }
317         else {
318           ngx_str_t *search_url = rcf->url.len > 0 ? &rcf->url : &default_redis_url;
319           ngx_str_t **first_url = nchan_list_first(&ns->urls);
320 
321           if(first_url && nchan_ngx_str_match(search_url, *first_url)) {
322             //cache it
323             rcf->nodeset = ns;
324             if(rcf->ping_interval > 0 && ns->settings.ping_interval > rcf->ping_interval) {
325               //use the smallest ping interval found in the settings
326               ns->settings.ping_interval = rcf->ping_interval;
327             }
328             return ns;
329           }
330         }
331       }
332     }
333     return NULL;
334   }
335 }
336 
// Re-parents every entry of src's slave list onto dst: each slave gets dst set as
// its master and is added to dst's slave list.
// Returns the number of slave entries visited.
static int node_transfer_slaves(redis_node_t *src, redis_node_t *dst) {
  redis_node_t  **slave_ref = nchan_list_first(&src->peers.slaves);
  int             count = 0;

  while(slave_ref != NULL) {
    node_set_master_node(*slave_ref, dst);
    node_add_slave_node(dst, *slave_ref); //a no-op if dst already has it
    count++;
    slave_ref = nchan_list_next(slave_ref);
  }
  return count;
}
347 
equal_redis_connect_params(void * d1,void * d2)348 static int equal_redis_connect_params(void *d1, void *d2) {
349   redis_connect_params_t *cp1 = d1;
350   redis_connect_params_t *cp2 = d2;
351   if(cp1->port != cp2->port || cp1->db != cp2->db) {
352     return 0;
353   }
354   if( nchan_ngx_str_nonzero_match(&cp1->hostname, &cp2->hostname)
355    || nchan_ngx_str_nonzero_match(&cp1->peername, &cp2->peername)
356    || nchan_ngx_str_nonzero_match(&cp1->peername, &cp2->hostname)
357    || nchan_ngx_str_nonzero_match(&cp1->hostname, &cp2->peername)) {
358     return 1;
359   }
360   else {
361     return 0;
362   }
363 }
364 
equal_nonzero_strings(void * s1,void * s2)365 static int equal_nonzero_strings(void *s1, void *s2) {
366   return ((ngx_str_t *)s1)->len > 0 && ((ngx_str_t *)s2)->len > 0 &&
367     nchan_ngx_str_match((ngx_str_t *)s1, (ngx_str_t *)s2);
368 }
369 
// Describes one way of deciding that two nodes are "the same": which field of
// redis_node_t to look at and how to compare it.
typedef struct {
  char          *name;    // label used in the "deduplicated by %s" log message
  off_t          offset;  // byte offset of the compared field within redis_node_t
  int          (*match)(void *, void *); // comparator; nonzero means the two fields match
} node_match_t;

// The matchers used by the nodeset_node_deduplicate_by_* and
// nodeset_node_find_by_* functions.
static struct {
  node_match_t    run_id;
  node_match_t    cluster_id;
  node_match_t    connect_params;
} _node_match = {
  .run_id =          {"run_id",      offsetof(redis_node_t, run_id),          equal_nonzero_strings},
  .cluster_id =      {"cluster_id",  offsetof(redis_node_t, cluster.id),      equal_nonzero_strings},
  .connect_params =  {"url",         offsetof(redis_node_t, connect_params),  equal_redis_connect_params}
};
385 
// Checks whether another node in the same nodeset matches `node` under `match`.
// If so, node's slaves are handed over to the first such node, `node` is
// destroyed, and 1 is returned. Returns 0 when no other node matches.
static int nodeset_node_deduplicate_by(redis_node_t *node, node_match_t *match) {
  void           *own_field = &((char *)node)[match->offset];
  redis_node_t   *other = nchan_list_first(&node->nodeset->nodes);

  while(other != NULL) {
    void *other_field = &((char *)other)[match->offset];

    if(other != node && match->match(own_field, other_field)) {
      node_log_notice(node, "deduplicated by %s", match->name);
      node_transfer_slaves(node, other); //node -> other
      nodeset_node_destroy(node);
      return 1;
    }
    other = nchan_list_next(other);
  }
  return 0;
}
401 
// Destroys `node` if another node in its nodeset has matching connection parameters.
// Returns 1 if the node was destroyed, 0 otherwise.
int nodeset_node_deduplicate_by_connect_params(redis_node_t *node) {
  node_match_t *by_connect_params = &_node_match.connect_params;
  return nodeset_node_deduplicate_by(node, by_connect_params);
}
// Destroys `node` if another node in its nodeset has the same non-empty run_id.
// Returns 1 if the node was destroyed, 0 otherwise.
int nodeset_node_deduplicate_by_run_id(redis_node_t *node) {
  node_match_t *by_run_id = &_node_match.run_id;
  return nodeset_node_deduplicate_by(node, by_run_id);
}
// Destroys `node` if another node in its nodeset has the same non-empty cluster id.
// Returns 1 if the node was destroyed, 0 otherwise.
int nodeset_node_deduplicate_by_cluster_id(redis_node_t *node) {
  node_match_t *by_cluster_id = &_node_match.cluster_id;
  return nodeset_node_deduplicate_by(node, by_cluster_id);
}
411 
nodeset_node_find_by(redis_nodeset_t * ns,node_match_t * match,void * data)412 static redis_node_t *nodeset_node_find_by(redis_nodeset_t *ns, node_match_t *match, void *data) {
413   redis_node_t *cur;
414   void *d2;
415   for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
416     d2 = &((char *)cur)[match->offset];
417     if(match->match(data, d2)) {
418       return cur;
419     }
420   }
421   return NULL;
422 }
nodeset_node_find_by_connect_params(redis_nodeset_t * ns,redis_connect_params_t * rcp)423 redis_node_t *nodeset_node_find_by_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
424   return nodeset_node_find_by(ns, &_node_match.connect_params, rcp);
425 }
nodeset_node_find_by_run_id(redis_nodeset_t * ns,ngx_str_t * run_id)426 redis_node_t *nodeset_node_find_by_run_id(redis_nodeset_t *ns, ngx_str_t *run_id) {
427   return nodeset_node_find_by(ns, &_node_match.run_id, run_id);
428 }
nodeset_node_find_by_cluster_id(redis_nodeset_t * ns,ngx_str_t * cluster_id)429 redis_node_t *nodeset_node_find_by_cluster_id(redis_nodeset_t *ns, ngx_str_t *cluster_id) {
430   return nodeset_node_find_by(ns, &_node_match.cluster_id, cluster_id);
431 }
432 
// Returns 1 when the two slot ranges share at least one slot, 0 otherwise.
static int keyslot_ranges_overlap(redis_slot_range_t *r1, redis_slot_range_t *r2) {
  //the tree comparator reports 0 exactly when neither range is strictly to one side of the other
  ngx_int_t ordering = rbtree_cluster_keyslots_compare(r1, r2);
  return ordering == 0;
}
436 
nodeset_node_find_by_range(redis_nodeset_t * ns,redis_slot_range_t * range)437 redis_node_t *nodeset_node_find_by_range(redis_nodeset_t *ns, redis_slot_range_t *range) {
438   ngx_rbtree_node_t                   *rbtree_node;
439   redis_nodeset_slot_range_node_t     *keyslot_tree_node;
440 
441   if((rbtree_node = rbtree_find_node(&ns->cluster.keyslots, range)) != NULL) {
442     keyslot_tree_node = rbtree_data_from_node(rbtree_node);
443     assert(keyslot_ranges_overlap(range, &keyslot_tree_node->range));
444     return keyslot_tree_node->node;
445   }
446   else {
447     return NULL;
448   }
449 }
450 
nodeset_node_find_by_slot(redis_nodeset_t * ns,uint16_t slot)451 redis_node_t *nodeset_node_find_by_slot(redis_nodeset_t *ns, uint16_t slot) {
452   redis_slot_range_t range;
453   range.min = slot;
454   range.max = slot;
455   return nodeset_node_find_by_range(ns, &range);
456 }
nodeset_node_find_any_ready_master(redis_nodeset_t * ns)457 redis_node_t *nodeset_node_find_any_ready_master(redis_nodeset_t *ns) {
458   redis_node_t *cur;
459   for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
460     if(cur->state >= REDIS_NODE_READY && cur->role == REDIS_NODE_ROLE_MASTER) {
461       return cur;
462     }
463   }
464   return NULL;
465 }
466 
nodeset_node_find_by_channel_id(redis_nodeset_t * ns,ngx_str_t * channel_id)467 redis_node_t *nodeset_node_find_by_channel_id(redis_nodeset_t *ns, ngx_str_t *channel_id) {
468   redis_node_t      *node;
469   static uint16_t    prefix_crc = 0;
470   uint16_t           slot;
471 
472   if(!ns->cluster.enabled) {
473     node = nodeset_node_find_any_ready_master(ns);
474   }
475   else {
476     if(prefix_crc == 0) {
477       prefix_crc = redis_crc16(0, "channel:", 8);
478     }
479     slot = redis_crc16(prefix_crc, (const char *)channel_id->data, channel_id->len) % 16384;
480     //DBG("channel id %V (key {channel:%V}) slot %i", str, str, slot);
481 
482     node = nodeset_node_find_by_slot(ns, slot);
483   }
484 
485 #if REDIS_NODESET_DBG
486   if(node == NULL) {
487     nodeset_update_debuginfo(ns);
488     raise(SIGABRT);
489   }
490 #endif
491 
492   return node;
493 }
494 
nodeset_node_find_by_key(redis_nodeset_t * ns,ngx_str_t * key)495 redis_node_t *nodeset_node_find_by_key(redis_nodeset_t *ns, ngx_str_t *key) {
496   if(!ns->cluster.enabled) {
497     return nodeset_node_find_any_ready_master(ns);
498   }
499 
500   char        *start, *end;
501   ngx_str_t    hashable;
502   uint16_t     slot;
503 
504   if(((start = memchr(key->data, '{', key->len))) != NULL) {
505     start++;
506     end = memchr(start, '}', key->len - ((u_char *)start - key->data));
507     if(end && end - start > 1) {
508       hashable.data = (u_char *)start;
509       hashable.len = (end - start);
510     }
511     else {
512       hashable = *key;
513       // not quite right -- need to ignore zero-length {} and scan to the next {
514       // but it's good enough for the keys we're using
515     }
516   }
517   else {
518     hashable = *key;
519   }
520   slot = redis_crc16(0, (const char *)hashable.data, hashable.len) % 16384;
521 
522   return nodeset_node_find_by_slot(ns, slot);
523 }
524 
// Reply handler for the periodic ping command (privdata is the node).
// It only logs: an error when the reply is missing or an error reply, or when
// the async context is missing or in an error state; a debug line otherwise.
static void ping_command_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redis_node_t               *node = privdata;
  redisReply                 *reply = rep;
  int                         ping_failed = !reply || reply->type == REDIS_REPLY_ERROR || !ac || ac->err;

  if(ping_failed) {
    node_log_error(node, "node ping failed");
  }
  else {
    node_log_debug(node, "node ping command reply ok");
  }
}
534 
// Timer handler for a node's ping_timer (ev->data is the node).
// Does nothing unless the timer actually expired and nginx isn't shutting down.
// For a node in the READY state it sends a ping on the command connection and
// re-arms the timer if the nodeset has a positive ping interval. A node in any
// other state is not pinged and the timer is not re-armed here.
static void node_ping_event(ngx_event_t *ev) {
  redis_node_t       *node = ev->data;
  redis_nodeset_t    *nodeset = node->nodeset;

  if(ngx_exiting || ngx_quit || !ev->timedout) {
    return;
  }

  node_log_debug(node, "node ping event");

  ev->timedout = 0;
  if(node->state != REDIS_NODE_READY) {
    return;
  }

  assert(node->ctx.cmd);

  //PUBSUB isn't sharded by keyslot in a Redis cluster, so there's no need to pick
  //a particular node: masters get a PUBLISH to this worker's channel, slaves a plain PING
  if(node->role == REDIS_NODE_ROLE_MASTER) {
    redisAsyncCommand(node->ctx.cmd, ping_command_callback, node, "PUBLISH %s ping", redis_worker_id);
  }
  else {
    redisAsyncCommand(node->ctx.cmd, ping_command_callback, node, "PING");
  }

  if(nodeset->settings.ping_interval > 0) {
    ngx_add_timer(ev, nodeset->settings.ping_interval * 1000);
  }
}
562 
// Schedules the next cluster check for `node` on timer `ev`.
// The delay is whatever remains of the nodeset's cluster_check_interval since the
// node's last successful check. A full interval is used when that elapsed time is
// not positive or has already reached the interval.
static void rearm_cluster_check_event(ngx_event_t *ev, redis_node_t *node) {
  time_t              interval = node->nodeset->settings.cluster_check_interval;
  time_t              elapsed = ngx_time() - node->cluster.last_successful_check;
  time_t              delay_sec;

  if(elapsed > 0 && elapsed < interval) {
    delay_sec = interval - elapsed;
  }
  else {
    delay_sec = interval;
  }
  ngx_add_timer(ev, delay_sec * 1000);
}
574 
node_cluster_check_event_callback(redisAsyncContext * ac,void * rep,void * privdata)575 static void node_cluster_check_event_callback(redisAsyncContext *ac, void *rep, void *privdata) {
576   redisReply                 *reply = rep;
577   redis_node_t               *node = privdata;
578 
579   ngx_str_t                   epoch_str;
580   int                         epoch;
581 
582   if(reply == NULL) {
583     node_log_error(node, "CLUSTER INFO command reply is NULL. Node should have already disconnected");
584     return;
585   }
586 
587   if(!reply_str_ok(reply)) {
588     node_log_error(node, "CLUSTER INFO command failed");
589     if(node->state >= REDIS_NODE_READY) {
590       node_disconnect(node, REDIS_NODE_FAILED);
591       nodeset_examine(node->nodeset);
592     }
593     return;
594   }
595   if(!nchan_get_rest_of_line_in_cstr(reply->str, "cluster_current_epoch:", &epoch_str)) {
596     node_log_error(node, "CLUSTER INFO command reply is weird");
597     //why would this happen? dunno, fail node just in case
598     if(node->state >= REDIS_NODE_READY) {
599       node_disconnect(node, REDIS_NODE_FAILED);
600       nodeset_examine(node->nodeset);
601     }
602     return;
603   }
604   if((epoch = ngx_atoi(epoch_str.data, epoch_str.len)) == NGX_ERROR) {
605     node_log_error(node, "CLUSTER INFO command failed to parse current epoch number");
606     //why would this happen? dunno, fail node just in case
607     if(node->state >= REDIS_NODE_READY) {
608       node_disconnect(node, REDIS_NODE_FAILED);
609       nodeset_examine(node->nodeset);
610     }
611     return;
612   }
613 
614   if(node->cluster.current_epoch < epoch) {
615     node_disconnect(node, REDIS_NODE_FAILED);
616     char errstr[512];
617     ngx_snprintf((u_char *)errstr, 512, "config epoch has changed on node %V:%d. Disconnecting to reconfigure.%Z", &(node)->connect_params.hostname, node->connect_params.port);
618     nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, errstr);
619   }
620   else {
621     rearm_cluster_check_event(&node->cluster.check_timer, node);
622   }
623 }
624 
// Timer handler for a node's cluster check timer (ev->data is the node).
// Does nothing unless the timer actually expired and nginx isn't shutting down.
// The check is postponed (timer re-armed) when the node isn't READY, its cluster
// isn't flagged ok, or the last successful check is more recent than the
// nodeset's cluster_check_interval. Otherwise "CLUSTER INFO" is sent on the
// command connection.
static void node_cluster_check_event(ngx_event_t *ev) {
  redis_node_t       *node;
  time_t              interval, elapsed;
  int                 node_checkable;

  if(ngx_exiting || ngx_quit || !ev->timedout) {
    return;
  }

  node = ev->data;
  interval = node->nodeset->settings.cluster_check_interval;
  elapsed = ngx_time() - node->cluster.last_successful_check;
  ev->timedout = 0;

  node_checkable = node->state == REDIS_NODE_READY && node->cluster.ok;
  if(!node_checkable || elapsed < interval) {
    rearm_cluster_check_event(ev, node);
    return;
  }

  redisAsyncCommand(node->ctx.cmd, node_cluster_check_event_callback, node, "CLUSTER INFO");
}
645 
nodeset_node_create_with_space(redis_nodeset_t * ns,redis_connect_params_t * rcp,size_t extra_space,void ** extraspace_ptr)646 redis_node_t *nodeset_node_create_with_space(redis_nodeset_t *ns, redis_connect_params_t *rcp, size_t extra_space, void **extraspace_ptr) {
647   assert(!nodeset_node_find_by_connect_params(ns, rcp));
648   node_blob_t      *node_blob;
649   if(extra_space == 0) {
650     assert(extraspace_ptr == NULL);
651     node_blob = nchan_list_append(&ns->nodes);
652   }
653   else {
654     assert(extraspace_ptr);
655     node_blob = nchan_list_append_sized(&ns->nodes, sizeof(*node_blob)+extra_space);
656     if(extra_space) {
657       *extraspace_ptr = (void *)(&node_blob[1]);
658     }
659   }
660   redis_node_t     *node = &node_blob->node;
661 
662   assert((void *)node_blob == (void *)node);
663   assert(node);
664   node->role = REDIS_NODE_ROLE_UNKNOWN,
665   node->state = REDIS_NODE_DISCONNECTED;
666   node->discovered = 0;
667   node->connect_timeout = NULL;
668   node->connect_params = *rcp;
669   node->connect_params.peername.data = node_blob->peername;
670   node->connect_params.peername.len = 0;
671   node->cluster.id.len = 0;
672   node->cluster.id.data = node_blob->cluster_id;
673   node->cluster.enabled = 0;
674   node->cluster.ok = 0;
675   node->cluster.slot_range.indexed = 0;
676   node->cluster.slot_range.n = 0;
677   node->cluster.slot_range.range = NULL;
678   node->cluster.last_successful_check = 0;
679   node->cluster.current_epoch = 0;
680   ngx_memzero(&node->cluster.check_timer, sizeof(node->cluster.check_timer));
681   nchan_init_timer(&node->cluster.check_timer, node_cluster_check_event, node);
682   node->pending_commands = 0;
683   node->run_id.len = 0;
684   node->run_id.data = node_blob->run_id;
685   node->nodeset = ns;
686   node->generation = 0;
687 
688   nchan_slist_init(&node->channels.cmd, rdstore_channel_head_t, redis.slist.node_cmd.prev, redis.slist.node_cmd.next);
689   nchan_slist_init(&node->channels.pubsub, rdstore_channel_head_t, redis.slist.node_pubsub.prev, redis.slist.node_pubsub.next);
690 
691   node->peers.master = NULL;
692   nchan_list_init(&node->peers.slaves, sizeof(redis_node_t *), "node slaves");
693 
694   ngx_memzero(&node->ping_timer, sizeof(node->ping_timer));
695   nchan_init_timer(&node->ping_timer, node_ping_event, node);
696 
697   node->ctx.cmd = NULL;
698   node->ctx.pubsub = NULL;
699   node->ctx.sync = NULL;
700 
701   assert(nodeset_node_find_by_connect_params(ns, rcp));
702   return node;
703 }
704 
nodeset_node_create(redis_nodeset_t * ns,redis_connect_params_t * rcp)705 redis_node_t *nodeset_node_create(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
706   return nodeset_node_create_with_space(ns, rcp, 0, NULL);
707 }
708 
nodeset_node_create_with_connect_params(redis_nodeset_t * ns,redis_connect_params_t * rcp)709 redis_node_t *nodeset_node_create_with_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
710   redis_node_t  *node;
711   u_char        *space;
712   size_t         sz = rcp->hostname.len + rcp->password.len;
713   node = nodeset_node_create_with_space(ns, rcp, sz, (void **)&space);
714   assert(node);
715   node->connect_params.hostname.data = space;
716   node->connect_params.hostname.len = 0;
717   nchan_strcpy(&node->connect_params.hostname, &rcp->hostname, 0);
718   node->connect_params.password.data = &space[rcp->hostname.len];
719   nchan_strcpy(&node->connect_params.password, &rcp->password, 0);
720   return node;
721 }
722 
// Forgets `peer` as a peer of `node`: clears node's master pointer if it is
// `peer`, and removes the first entry for `peer` from node's slave list.
static void node_remove_peer(redis_node_t *node, redis_node_t *peer) {
  redis_node_t  **slave_ref;

  if(node->peers.master == peer) {
    node->peers.master = NULL;
  }

  slave_ref = nchan_list_first(&node->peers.slaves);
  while(slave_ref != NULL && *slave_ref != peer) {
    slave_ref = nchan_list_next(slave_ref);
  }
  if(slave_ref != NULL) {
    nchan_list_remove(&node->peers.slaves, slave_ref);
  }
}
736 
nodeset_node_destroy(redis_node_t * node)737 ngx_int_t nodeset_node_destroy(redis_node_t *node) {
738   redisAsyncContext *ac;
739   redisContext      *c;
740   node_set_role(node, REDIS_NODE_ROLE_UNKNOWN); //removes from all peer lists, and clears own slave list
741   if((ac = node->ctx.cmd) != NULL) {
742     node->ctx.cmd = NULL;
743     redisAsyncFree(ac);
744   }
745   if((ac = node->ctx.pubsub) != NULL) {
746     node->ctx.pubsub = NULL;
747     redisAsyncFree(ac);
748   }
749   if((c = node->ctx.sync) != NULL) {
750     node->ctx.sync = NULL;
751     redisFree(c);
752   }
753   if(node->connect_timeout) {
754     nchan_abort_oneshot_timer(node->connect_timeout);
755     node->connect_timeout = NULL;
756   }
757   nchan_list_remove(&node->nodeset->nodes, node);
758   return NGX_OK;
759 }
760 
// Register the slave described by `rcp` as a slave of `master`.
//
// An unknown slave gets a new node marked as discovered. A known node that is
// past the GET_INFO state but does not have the slave role is reset and
// reconnected. In every case the master/slave links are (re)established and a
// connection attempt is started if the slave is not connected or connecting.
static void node_discover_slave(redis_node_t *master, redis_connect_params_t *rcp) {
  redis_node_t    *slave = nodeset_node_find_by_connect_params(master->nodeset, rcp);

  if(slave == NULL) {
    slave = nodeset_node_create_with_connect_params(master->nodeset, rcp);
    slave->discovered = 1;
    node_set_role(slave, REDIS_NODE_ROLE_SLAVE);
    node_log_notice(master, "Discovering own slave %s", rcp_cstr(rcp));
  }
  else if(slave->role != REDIS_NODE_ROLE_SLAVE && slave->state > REDIS_NODE_GET_INFO) {
    //already known, but its role is out of date
    node_log_notice(slave, "Node appears to have changed to slave -- need to update");
    node_set_role(slave, REDIS_NODE_ROLE_UNKNOWN);
    node_disconnect(slave, REDIS_NODE_FAILED);
    node_connect(slave);
  }

  node_set_master_node(slave, master); //this is idempotent
  node_add_slave_node(master, slave);  //so is this

  //try to connect
  if(slave->state <= REDIS_NODE_DISCONNECTED) {
    node_connect(slave);
  }
}
788 
// Register the master described by `rcp` as the master of `slave`.
//
// An unknown master gets a new node marked as discovered. A known node that is
// past the GET_INFO state but does not have the master role is reset and
// reconnected. In every case the master/slave links are (re)established and a
// connection attempt is started if the master is not connected or connecting.
static void node_discover_master(redis_node_t  *slave, redis_connect_params_t *rcp) {
  redis_node_t *master = nodeset_node_find_by_connect_params(slave->nodeset, rcp);

  if(master == NULL) {
    master = nodeset_node_create_with_connect_params(slave->nodeset, rcp);
    master->discovered = 1;
    node_set_role(master, REDIS_NODE_ROLE_MASTER);
    node_log_notice(slave, "Discovering own master %s", rcp_cstr(rcp));
  }
  else if(master->role != REDIS_NODE_ROLE_MASTER && master->state > REDIS_NODE_GET_INFO) {
    //already known, but its role is out of date
    node_log_notice(master, "Node appears to have changed to master -- need to update");
    node_set_role(master, REDIS_NODE_ROLE_UNKNOWN);
    node_disconnect(master, REDIS_NODE_FAILED);
    node_connect(master);
  }

  node_set_master_node(slave, master);
  node_add_slave_node(master, slave);

  //try to connect
  if(master->state <= REDIS_NODE_DISCONNECTED) {
    node_connect(master);
  }
}
814 
// Decide whether the cluster peer described by the parsed CLUSTER NODES line
// `l` should be skipped instead of discovered.
//
// Returns 1 if the peer is to be skipped, 0 if it may be used. Every skip
// except the no-address one is logged on the nodeset together with the reason;
// skipping "self" is logged at INFO level, everything else at NOTICE.
// The peer inherits the db and password of `node` for the blacklist lookup
// and for the log message.
static int node_skip_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l) {
  redis_connect_params_t   rcp;
  char                    *description = "";
  char                    *detail = "";
  char                     detail_buf[128];
  char                    *role = NULL;
  ngx_uint_t               loglevel = NGX_LOG_NOTICE;
  nchan_redis_ip_range_t  *matched;

  if(l->noaddr) {
    //skipped silently, no log line is produced for address-less peers
    return 1;
  }

  rcp.hostname = l->hostname;
  rcp.port = l->port;
  rcp.peername.len = 0;
  rcp.db = node->connect_params.db;
  rcp.password = node->connect_params.password;

  if(l->handshake) {
    role = "node";
    description = "handshaking";
  }
  else if(l->hostname.len == 0) {
    role = "node";
    description = "empty hostname";
  }
  else if(l->failed) {
    description = "failed";
  }
  else if(!l->connected) {
    description = "disconnected";
  }
  else if(l->self) {
    description = "self";
    loglevel = NGX_LOG_INFO;
  }
  else if((matched = node_ip_blacklisted(node->nodeset, &rcp)) != NULL) {
    u_char  *end;
    description = "blacklisted";
    detail = detail_buf;
    //ngx_snprintf() does not terminate the output when it truncates, so keep
    //one byte in reserve and write the terminator explicitly
    end = ngx_snprintf((u_char *)detail_buf, sizeof(detail_buf) - 1, " (matched blacklist entry %V)", &matched->str);
    *end = '\0';
  }
  else {
    return 0;
  }

  if(!role) role = l->master ? "master" : "slave";
  nodeset_log(node->nodeset, loglevel, "Skipping %s %s node %s%s", description, role, rcp_cstr(&rcp), detail);
  return 1;
}
865 
// Create and start connecting a node for the cluster peer described by the
// parsed CLUSTER NODES line `l`, unless it is unusable or already known.
//
// Returns 1 if a new node was created, 0 otherwise. A peer counts as known
// when a node with the same connect params or the same cluster id exists.
// The new node inherits the db and password of `node`.
static int node_discover_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l) {
  redis_connect_params_t   rcp;
  redis_node_t            *peer;

  assert(!l->self);
  if(l->failed || !l->connected || l->noaddr || l->self) {
    return 0;
  }

  rcp.hostname = l->hostname;
  rcp.port = l->port;
  rcp.peername.len = 0;
  rcp.db = node->connect_params.db;
  rcp.password = node->connect_params.password;

  peer = nodeset_node_find_by_connect_params(node->nodeset, &rcp);
  if(peer == NULL) {
    peer = nodeset_node_find_by_cluster_id(node->nodeset, &l->id);
  }
  if(peer != NULL) {
    return 0; //we already know this one.
  }

  nodeset_log_notice(node->nodeset, "Discovering cluster %s %s", (l->master ? "master" : "slave"), rcp_cstr(&rcp));
  peer = nodeset_node_create_with_connect_params(node->nodeset, &rcp);
  peer->discovered = 1;
  nchan_strcpy(&peer->cluster.id, &l->id, MAX_CLUSTER_ID_LENGTH);
  if(l->master) {
    node_set_role(peer, REDIS_NODE_ROLE_MASTER);
  }
  else {
    node_set_role(peer, REDIS_NODE_ROLE_SLAVE);
  }
  //ignore all the other things for now
  node_connect(peer);
  return 1;
}
894 
895 static ngx_int_t set_preallocated_peername(redisAsyncContext *ctx, ngx_str_t *dst);
896 
// Report a failed connection attempt on `node` and put it in the FAILED state.
//
// `err` is the caller's description of the failure. If one of the node's
// contexts (checked in the order cmd, pubsub, sync) has an error set, its
// error string is appended to the log line -- except when the node state is
// REDIS_NODE_CONNECTION_TIMED_OUT, where only `err` is logged.
static void node_connector_fail(redis_node_t *node, const char *err) {
  const char  *ctxerr = NULL;
  int          timed_out = (node->state == REDIS_NODE_CONNECTION_TIMED_OUT);

  if(node->ctx.cmd && node->ctx.cmd->err) {
    ctxerr = node->ctx.cmd->errstr;
  }
  else if(node->ctx.pubsub && node->ctx.pubsub->err) {
    ctxerr = node->ctx.pubsub->errstr;
  }
  else if(node->ctx.sync && node->ctx.sync->err) {
    ctxerr = node->ctx.sync->errstr;
  }

  if(ctxerr != NULL && !timed_out) {
    node_log_error(node, "connection failed: %s (%s)", err, ctxerr);
  }
  else {
    node_log_error(node, "connection failed: %s", err);
  }

  node_disconnect(node, REDIS_NODE_FAILED);
}
919 
// Start connecting `node` by entering the connector state machine.
// The node must not be past the DISCONNECTED state (asserted).
// Always returns 1.
int node_connect(redis_node_t *node) {
  redis_node_t *target = node;

  assert(node->state <= REDIS_NODE_DISCONNECTED);
  //no async context and no reply yet: the connector is driven by the node's state
  node_connector_callback(NULL, NULL, target);
  return 1;
}
925 
// Disconnect `node` and leave it in `disconnected_state`.
//
// Steps, in order:
//  - release the cmd, pubsub and sync contexts (the async ones have their
//    onDisconnect callback cleared before being freed) and abort a pending
//    connect timeout
//  - store the new state; the connected-servers stub status counter is
//    decremented if the previous state was REDIS_NODE_READY or later
//  - unindex the node's keyslot ranges (cluster nodes only) and free its slot
//    range array
//  - stop the ping timer and the cluster check timer
//  - move every channel head attached to this node onto the nodeset's
//    disconnected lists, adjusting their status
// Always returns 1.
int node_disconnect(redis_node_t *node, int disconnected_state) {
  ngx_int_t                state_before = node->state;
  redisAsyncContext       *actx;
  redisContext            *sctx;
  rdstore_channel_head_t  *ch;
  nchan_slist_t           *node_cmd, *node_pubsub;
  nchan_slist_t           *orphaned_cmd, *orphaned_pubsub;

  node_log_debug(node, "disconnect");

  actx = node->ctx.cmd;
  if(actx != NULL) {
    actx->onDisconnect = NULL;
    node->ctx.cmd = NULL;
    //redisAsyncSetDisconnectCallback(ac, NULL); //this only sets the callback if it's currently null...
    redisAsyncFree(actx);
    node_log_debug(node, "redisAsyncFree %p", actx);
  }

  actx = node->ctx.pubsub;
  if(actx != NULL) {
    actx->onDisconnect = NULL;
    node->ctx.pubsub = NULL;
    redisAsyncFree(actx);
    node_log_debug(node, "redisAsyncFree pubsub %p", actx);
  }

  sctx = node->ctx.sync;
  if(sctx != NULL) {
    node->ctx.sync = NULL;
    redisFree(sctx);
  }

  if(node->connect_timeout) {
    nchan_abort_oneshot_timer(node->connect_timeout);
    node->connect_timeout = NULL;
  }

  node->state = disconnected_state;
  if(state_before >= REDIS_NODE_READY) {
    nchan_update_stub_status(redis_connected_servers, -1);
  }

  if(node->cluster.enabled) {
    nodeset_cluster_node_unindex_keyslot_ranges(node);
  }
  if(node->cluster.slot_range.range) {
    ngx_free(node->cluster.slot_range.range);
    node->cluster.slot_range.n=0;
    node->cluster.slot_range.range = NULL;
  }

  if(node->ping_timer.timer_set) {
    ngx_del_timer(&node->ping_timer);
  }
  if(node->cluster.check_timer.timer_set) {
    ngx_del_timer(&node->cluster.check_timer);
  }

  node_cmd = &node->channels.cmd;
  node_pubsub = &node->channels.pubsub;
  orphaned_cmd = &node->nodeset->channels.disconnected_cmd;
  orphaned_pubsub = &node->nodeset->channels.disconnected_pubsub;

  //each pass takes the current head of the node's list; the loop ends once the list has no first element
  while((ch = nchan_slist_first(node_cmd)) != NULL) {
    nodeset_node_dissociate_chanhead(ch);
    nchan_slist_append(orphaned_cmd, ch);
    ch->redis.slist.in_disconnected_cmd_list = 1;
    if(ch->status && ch->status == READY) {
      ch->status = NOTREADY;
    }
  }

  while((ch = nchan_slist_first(node_pubsub)) != NULL) {
    nodeset_node_dissociate_pubsub_chanhead(ch);
    nchan_slist_append(orphaned_pubsub, ch);
    ch->redis.slist.in_disconnected_pubsub_list = 1;

    ch->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;

    //in backup mode a channel that lost its pubsub node is no longer ready
    if(ch->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP && ch->status == READY) {
      ch->status = NOTREADY;
    }
  }

  return 1;
}
1001 
// Change the role of `node` and fix up peer links that no longer fit.
//
// Nothing happens if the role is unchanged. Otherwise:
//  - UNKNOWN: the node is removed from its master's peers, removed from the
//    peers of each of its slaves, and its own slave list is emptied
//  - MASTER:  the node is removed from its former master's peers
//  - SLAVE:   no links are touched
void node_set_role(redis_node_t *node, redis_node_role_t role) {
  redis_node_t  **slot;

  if(node->role == role) {
    return;
  }
  node->role = role;

  if(role == REDIS_NODE_ROLE_UNKNOWN) {
    if(node->peers.master) {
      node_remove_peer(node->peers.master, node);
      DBG("removed %p from peers of %p", node->peers.master, node);
      node->peers.master = NULL;
    }
    slot = nchan_list_first(&node->peers.slaves);
    while(slot != NULL) {
      node_remove_peer(*slot, node);
      slot = nchan_list_next(slot);
    }
    nchan_list_empty(&node->peers.slaves);
  }
  else if(role == REDIS_NODE_ROLE_MASTER) {
    if(node->peers.master) {
      node_remove_peer(node->peers.master, node);
      node->peers.master = NULL;
    }
  }
  //REDIS_NODE_ROLE_SLAVE: do nothing
}
1034 
// Record `master` as the master of `node`.
//
// If `node` currently has a different master, `node` is first taken off that
// previous master's slave list so that no stale entry stays behind there.
// Calling this again with the same master changes nothing.
// Always returns 1.
int node_set_master_node(redis_node_t *node, redis_node_t *master) {
  redis_node_t *previous = node->peers.master;

  if(previous && previous != master) {
    //detach from the master being replaced, not from the new one
    node_remove_slave_node(previous, node);
  }
  node->peers.master = master;
  return 1;
}
// Look for `slave` in the slave list of `node`.
// Returns `slave` if it is listed, NULL otherwise.
redis_node_t *node_find_slave_node(redis_node_t *node, redis_node_t *slave) {
  redis_node_t **slot = nchan_list_first(&node->peers.slaves);

  while(slot != NULL) {
    if(*slot == slave) {
      return slave;
    }
    slot = nchan_list_next(slot);
  }
  return NULL;
}
// Append `slave` to the slave list of `node` unless it is already listed.
// Always returns 1.
int node_add_slave_node(redis_node_t *node, redis_node_t *slave) {
  if(node_find_slave_node(node, slave) == NULL) {
    //each list element holds a pointer to the slave node
    redis_node_t **slot = nchan_list_append(&node->peers.slaves);
    *slot = slave;
  }
  return 1;
}
// Remove `slave` from the slave list of `node`, if it is listed.
//
// The list stores pointers to nodes, so the element handed to
// nchan_list_remove() is the list slot holding the pointer, not the slave
// node itself. Removing a slave that is not listed is a no-op.
// Always returns 1.
int node_remove_slave_node(redis_node_t *node, redis_node_t *slave) {
  redis_node_t **slot;

  for(slot = nchan_list_first(&node->peers.slaves); slot != NULL; slot = nchan_list_next(slot)) {
    if(*slot == slave) {
      nchan_list_remove(&node->peers.slaves, slot);
      break;
    }
  }
  return 1;
}
1066 
// Copy the remainder of the INFO line that begins with `linestart` into the
// preallocated string `target`.
//
// `target->data` must already point at storage for at least `maxlen` bytes.
// Returns 1 on success, 0 if the line is not found in `info` or if its value
// is longer than `maxlen` (the latter is logged as an error on `node`).
static int node_parseinfo_set_preallocd_str(redis_node_t *node, ngx_str_t *target, const char *info, const char *linestart, size_t maxlen) {
  ngx_str_t value;

  if(!nchan_get_rest_of_line_in_cstr(info, linestart, &value)) {
    return 0;
  }
  if(value.len > maxlen) {
    node_log_error(node, "\"%s\" is too long", linestart);
    return 0;
  }

  ngx_memcpy(target->data, value.data, value.len);
  target->len = value.len;
  return 1;
}
1082 
// Store the "run_id:" value found in the INFO text `info` in node->run_id.
// Returns 1 on success, 0 if the line is missing or longer than MAX_RUN_ID_LENGTH.
static int node_parseinfo_set_run_id(redis_node_t *node, const char *info) {
  ngx_str_t *run_id = &node->run_id;
  return node_parseinfo_set_preallocd_str(node, run_id, info, "run_id:", MAX_RUN_ID_LENGTH);
}
1086 
// Check the reply to a SCRIPT LOAD of `script`.
//
// Returns 1 only for a string reply whose first REDIS_LUA_HASH_LENGTH bytes
// equal the script's expected hash. A missing reply, an error reply, a hash
// mismatch or any other reply type is logged as an error on `node` and
// yields 0.
static int node_connector_loadscript_reply_ok(redis_node_t *node, redis_lua_script_t *script, redisReply *reply) {
  if(reply == NULL) {
    node_log_error(node, "missing reply after loading Redis Lua script %s", script->name);
    return 0;
  }

  if(reply->type == REDIS_REPLY_ERROR) {
    node_log_error(node, "failed loading Redis Lua script %s: %s", script->name, reply->str);
    return 0;
  }

  if(reply->type != REDIS_REPLY_STRING) {
    node_log_error(node, "unexpected reply type while loading Redis Lua script %s", script->name);
    return 0;
  }

  if(ngx_strncmp(reply->str, script->hash, REDIS_LUA_HASH_LENGTH) != 0) {
    node_log_error(node, "Lua script %s has unexpected hash %s (expected %s)", script->name, reply->str, script->hash);
    return 0;
  }

  return 1;
}
1112 
redis_nginx_unexpected_disconnect_event_handler(const redisAsyncContext * ac,int status)1113 static void redis_nginx_unexpected_disconnect_event_handler(const redisAsyncContext *ac, int status) {
1114   redis_node_t    *node = ac->data;
1115   //char            *which_ctx;
1116   //DBG("unexpected disconnect event handler ac %p", ac);
1117   if(node) {
1118     if(node->ctx.cmd == ac) {
1119       //which_ctx = "cmd";
1120       node->ctx.cmd = NULL;
1121     }
1122     else if(node->ctx.pubsub == ac) {
1123       node->ctx.pubsub = NULL;
1124       //which_ctx = "pubsub";
1125     }
1126     else {
1127       node_log_error(node, "unknown redisAsyncContext disconnected");
1128       //which_ctx = "unknown";
1129     }
1130 
1131     if(node->state >= REDIS_NODE_READY && !ngx_exiting && !ngx_quit) {
1132       if(ac->err) {
1133         node_log_error(node, "connection lost: %s.", ac->errstr);
1134       }
1135       else {
1136         node_log_error(node, "connection lost");
1137       }
1138     }
1139     node_disconnect(node, REDIS_NODE_FAILED);
1140     nodeset_examine(node->nodeset);
1141   }
1142 }
1143 
redis_nginx_connect_event_handler(const redisAsyncContext * ac,int status)1144 static void redis_nginx_connect_event_handler(const redisAsyncContext *ac, int status) {
1145   node_connector_callback((redisAsyncContext *)ac, NULL, ac->data);
1146 }
1147 
node_connect_context(redis_node_t * node,ngx_str_t * host,ngx_int_t port)1148 static redisAsyncContext *node_connect_context(redis_node_t *node, ngx_str_t *host, ngx_int_t port) {
1149   redisAsyncContext *ctx = redis_nginx_open_context(host, port, node);
1150   if(ctx) {
1151     redisAsyncSetConnectCallback(ctx, redis_nginx_connect_event_handler);
1152     redisAsyncSetDisconnectCallback(ctx, redis_nginx_unexpected_disconnect_event_handler);
1153   }
1154   return ctx;
1155 }
1156 
node_ip_blacklisted(redis_nodeset_t * ns,redis_connect_params_t * rcp)1157 static nchan_redis_ip_range_t *node_ip_blacklisted(redis_nodeset_t *ns, redis_connect_params_t *rcp) {
1158   struct addrinfo *res;
1159   char hostname_buf[128];
1160   ngx_memzero(hostname_buf, sizeof(hostname_buf));
1161   ngx_memcpy(hostname_buf, rcp->hostname.data, rcp->hostname.len);
1162 
1163   if(getaddrinfo(hostname_buf, NULL, NULL, &res) != 0) {
1164     nodeset_log_error(ns, "Failed to getaddrinfo for hostname %V while deciding if IP is blacklisted");
1165     return NULL;
1166   }
1167   int fam = res->ai_family;
1168   union {
1169     struct in_addr  ipv4;
1170 #ifdef AF_INET6
1171     struct in6_addr ipv6;
1172 #endif
1173   } addr;
1174 
1175   if(res->ai_family == AF_INET) {
1176     addr.ipv4 = ((struct sockaddr_in *)res->ai_addr)->sin_addr;
1177   }
1178 #ifdef AF_INET6
1179   else if(res->ai_family == AF_INET6) {
1180     addr.ipv6 = ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr;
1181   }
1182 #endif
1183   freeaddrinfo(res);
1184   int i;
1185   for(i=0; i < ns->settings.blacklist.count; i++) {
1186     nchan_redis_ip_range_t *entry = &ns->settings.blacklist.list[i];
1187     if(entry->family != fam) {
1188       continue;
1189     }
1190     if(fam == AF_INET) {
1191       if(entry->addr_block.ipv4.s_addr == (addr.ipv4.s_addr & entry->mask.ipv4.s_addr)) {
1192         return entry;
1193       }
1194     }
1195 #ifdef AF_INET6
1196     else if(fam == AF_INET6) {
1197       unsigned           j;
1198       struct in6_addr    buf = addr.ipv6;
1199       uint8_t           *masked_addr = entry->addr_block.ipv6.s6_addr;
1200       uint8_t           *mask = entry->mask.ipv6.s6_addr;
1201 
1202       for(j=0; j<sizeof(entry->addr_block.ipv6.s6_addr); j++) {
1203         masked_addr[j] &= mask[j];
1204       }
1205 
1206       if(memcmp(entry->addr_block.ipv6.s6_addr, &buf, sizeof(buf)) == 0) {
1207         return entry;
1208       }
1209     }
1210 #endif
1211   }
1212 
1213   return NULL;
1214 }
1215 
// Discover the slaves listed in the INFO reply of master `node`.
//
// Each slave parsed from `reply->str` is either registered through
// node_discover_slave() or, when its address is blacklisted, skipped with a
// notice naming that slave and the blacklist entry it matched.
// Returns 1 on success, 0 if the slave list could not be parsed.
static int node_discover_slaves_from_info_reply(redis_node_t *node, redisReply *reply) {
  redis_connect_params_t   *rcp;
  size_t                    i, n;

  if(!(rcp = parse_info_slaves(node, reply->str, &n))) {
    return 0;
  }
  for(i=0; i<n; i++) {
    redis_connect_params_t *slave_rcp = &rcp[i];
    nchan_redis_ip_range_t *matched = node_ip_blacklisted(node->nodeset, slave_rcp);
    if(matched) {
      //report the slave being skipped, not the first one in the list
      nodeset_log_notice(node->nodeset, "Skipping slave node %V blacklisted by %V", &slave_rcp->hostname, &matched->str);
    }
    else {
      node_discover_slave(node, slave_rcp);
    }
  }
  return 1;
}
1233 
// React to `node` reporting that the cluster keyspace layout has changed.
//
// A node that had reached REDIS_NODE_READY is disconnected as FAILED, and the
// nodeset status is set to REDIS_NODESET_CLUSTER_FAILING with a message
// naming the reporting node. Always returns 1.
int nodeset_node_keyslot_changed(redis_node_t *node) {
  char                     reason[512];
  redis_connect_params_t  *cp = &node->connect_params;

  if(node->state >= REDIS_NODE_READY) {
    node_disconnect(node, REDIS_NODE_FAILED);
  }

  ngx_snprintf((u_char *)reason, sizeof(reason), "cluster keyspace needs to be updated as reported by node %V:%d%Z", &cp->hostname, cp->port);
  nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, reason);
  return 1;
}
1244 
// Check a command reply for signs that the key was sent to the wrong cluster node.
//
// An error reply counts as a keyslot error if it starts with "MOVED " or
// "ASK ", or if it is a script error mentioning access to a non-local key.
// On such an error 0 is returned: a cluster node goes through
// nodeset_node_keyslot_changed(); a non-cluster node is disconnected as FAILED
// and the nodeset is marked REDIS_NODESET_CLUSTER_FAILING.
// Otherwise 1 is returned and, for cluster nodes, the time of the last
// successful check is refreshed.
int nodeset_node_reply_keyslot_ok(redis_node_t *node, redisReply *reply) {
  int keyslot_error = 0;

  if(reply != NULL && reply->type == REDIS_REPLY_ERROR) {
    keyslot_error =
        (nchan_cstr_startswith(reply->str, "ERR Error running script")
          && nchan_strstrn(reply->str, "Lua script attempted to access a non local key in a cluster node"))
     || nchan_cstr_startswith(reply->str, "MOVED ")
     || nchan_cstr_startswith(reply->str, "ASK ");
  }

  if(keyslot_error) {
    if(node->cluster.enabled) {
      nodeset_node_keyslot_changed(node);
    }
    else {
      node_log_error(node, "got a cluster error on a non-cluster redis connection: %s", reply->str);
      node_disconnect(node, REDIS_NODE_FAILED);
      nodeset_set_status(node->nodeset, REDIS_NODESET_CLUSTER_FAILING, "Strange response from node");
    }
    return 0;
  }

  if(node->cluster.enabled) {
    node->cluster.last_successful_check = ngx_time();
  }
  return 1;
}
1271 
// Reply callback for the worker channel subscription on a node's pubsub context.
//
//  - while the node is in REDIS_NODE_SUBSCRIBING_WORKER, the reply belongs to
//    the connector state machine and is passed on to it
//  - a 3-string array reply {"message", <worker id>, "ping"} is a PUBSUB ping
//    and is only logged at debug level
//  - everything else goes to the registered redis_subscribe_callback
static void node_subscribe_callback(redisAsyncContext *ac, void *rep, void *privdata) {
  redisReply                 *reply = rep;
  redis_node_t               *node = privdata;
  int                         is_ping;

  if(node->state == REDIS_NODE_SUBSCRIBING_WORKER) {
    node_connector_callback(ac, rep, privdata);
    return;
  }

  is_ping = reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3
   && reply->element[0]->type == REDIS_REPLY_STRING
   && reply->element[1]->type == REDIS_REPLY_STRING
   && reply->element[2]->type == REDIS_REPLY_STRING
   && strcmp(reply->element[0]->str, "message") == 0
   && strcmp(reply->element[1]->str, redis_worker_id) == 0
   && strcmp(reply->element[2]->str, "ping") == 0;

  if(is_ping) {
    node_log_debug(node, "received PUBSUB ping message");
    return;
  }

  redis_subscribe_callback(ac, rep, privdata);
}
1292 
node_connector_connect_timeout(void * pd)1293 void node_connector_connect_timeout(void *pd) {
1294   redis_node_t  *node = pd;
1295   node->connect_timeout = NULL;
1296   if(node->state == REDIS_NODE_CMD_CONNECTING || node->state == REDIS_NODE_PUBSUB_CONNECTING) {
1297     //onConnect won't be fired, so the connector must be called manually
1298     node->state = REDIS_NODE_CONNECTION_TIMED_OUT;
1299     node_connector_callback(NULL, NULL, node);
1300   }
1301   else {
1302     node_disconnect(node, REDIS_NODE_CONNECTION_TIMED_OUT);
1303   }
1304 }
1305 
node_connector_callback(redisAsyncContext * ac,void * rep,void * privdata)1306 static void node_connector_callback(redisAsyncContext *ac, void *rep, void *privdata) {
1307   redisReply                 *reply = rep;
1308   redis_node_t               *node = privdata;
1309   redis_nodeset_t            *nodeset = node->nodeset;
1310   char                        errstr[1024];
1311   redis_connect_params_t     *cp = &node->connect_params;
1312   redis_lua_script_t         *next_script = (redis_lua_script_t *)&redis_lua_scripts;
1313   node_log_debug(node, "node_connector_callback state %d", node->state);
1314   ngx_str_t                   rest;
1315 
1316   switch(node->state) {
1317     case REDIS_NODE_CONNECTION_TIMED_OUT:
1318       return node_connector_fail(node, "connection timed out");
1319       break;
1320     case REDIS_NODE_FAILED:
1321     case REDIS_NODE_DISCONNECTED:
1322       assert(!node->connect_timeout);
1323       if((node->ctx.cmd = node_connect_context(node, &cp->hostname, cp->port)) == NULL) { //always connect the cmd ctx to the hostname
1324         return node_connector_fail(node, "failed to open redis async context for cmd");
1325       }
1326       else if(cp->peername.len == 0) { //don't know peername yet
1327         set_preallocated_peername(node->ctx.cmd, &cp->peername);
1328       }
1329       node->connect_timeout = nchan_add_oneshot_timer(node_connector_connect_timeout, node, nodeset->settings.connect_timeout);
1330       node->state = REDIS_NODE_CMD_CONNECTING;
1331       break; //wait until the onConnect callback brings us back
1332 
1333     case REDIS_NODE_CMD_CONNECTING:
1334       if(ac->err || ac->c.err) {
1335         node->ctx.cmd = NULL; //to avoid calling redisAsyncFree on the ctx during node_disconnect()
1336         //(it will be called automatically when this function returns control back to hiredis
1337         return node_connector_fail(node, ac->errstr);
1338       }
1339       if((node->ctx.pubsub = node_connect_context(node, &cp->peername, cp->port)) == NULL) {
1340         return node_connector_fail(node, "failed to open redis async context for pubsub");
1341       }
1342       node->state++;
1343       break; //wait until the onConnect callback brings us back
1344 
1345     case REDIS_NODE_PUBSUB_CONNECTING:
1346       if(ac->err || ac->c.err) {
1347         node->ctx.pubsub = NULL; //to avoid calling redisAsyncFree on the ctx during node_disconnect()
1348         //(it will be called automatically when this function returns control back to hiredis
1349         ngx_snprintf((u_char *)errstr, 1024, "(pubsub) %s%Z", ac->errstr);
1350         return node_connector_fail(node, errstr);
1351       }
1352       //connection established. move on...
1353       node->state++;
1354       /* fall through */
1355     case REDIS_NODE_CONNECTED:
1356       //now we need to authenticate maybe?
1357       if(cp->password.len > 0) {
1358         if(!node->ctx.cmd) {
1359           return node_connector_fail(node, "cmd connection missing, can't send AUTH command");
1360         }
1361         redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "AUTH %b", STR(&cp->password));
1362         node->state++;
1363       }
1364       else {
1365         node->state = REDIS_NODE_SELECT_DB;
1366         return node_connector_callback(NULL, NULL, node); //continue as if authenticated
1367       }
1368       break;
1369 
1370     case REDIS_NODE_CMD_AUTHENTICATING:
1371       if(!reply_status_ok(reply)) {
1372         return node_connector_fail(node, "AUTH command failed");
1373       }
1374       if(!node->ctx.pubsub) {
1375         return node_connector_fail(node, "pubsub connection missing, can't send AUTH command");
1376       }
1377       //now authenticate pubsub ctx
1378       redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "AUTH %b", STR(&cp->password));
1379       node->state++;
1380       break;
1381 
1382     case REDIS_NODE_PUBSUB_AUTHENTICATING:
1383       if(!reply_status_ok(reply)) {
1384         return node_connector_fail(node, "AUTH command failed");
1385       }
1386       node->state++;
1387       /* fall through */
1388     case REDIS_NODE_SELECT_DB:
1389       if(cp->db > 0) {
1390         if(!node->ctx.cmd) {
1391           return node_connector_fail(node, "cmd connection missing, SELECT command");
1392         }
1393         redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SELECT %d", cp->db);
1394         node->state++;
1395       }
1396       else {
1397         node->state = REDIS_NODE_SCRIPTS_LOAD;
1398         return node_connector_callback(NULL, NULL, node);
1399       }
1400       break;
1401 
1402     case REDIS_NODE_CMD_SELECTING_DB:
1403       if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1404         return node_connector_fail(node, "Redis SELECT command failed,");
1405       }
1406       if(!node->ctx.cmd) {
1407         return node_connector_fail(node, "pubsub connection missing, can't send SELECT command");
1408       }
1409       redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "SELECT %d", cp->db);
1410       node->state++;
1411       break;
1412 
1413     case REDIS_NODE_PUBSUB_SELECTING_DB:
1414       if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1415         return node_connector_fail(node, "Redis SELECT command failed,");
1416       }
1417       node->state++; // fallthru
1418       /* fall through */
1419     case REDIS_NODE_SCRIPTS_LOAD:
1420       node->scripts_loaded = 0;
1421       if(!node->ctx.cmd) {
1422         return node_connector_fail(node, "cmd connection missing, can't send SCRIPT LOAD command");
1423       }
1424       redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SCRIPT LOAD %s", next_script->script);
1425       node->state++;
1426       break;
1427 
1428     case REDIS_NODE_SCRIPTS_LOADING:
1429       next_script = &next_script[node->scripts_loaded];
1430 
1431       if(!node_connector_loadscript_reply_ok(node, next_script, reply)) {
1432         return node_connector_fail(node, "SCRIPT LOAD failed,");
1433       }
1434       else {
1435         //node_log_debug(node, "loaded script %s", next_script->name);
1436         node->scripts_loaded++;
1437         next_script++;
1438       }
1439       if(node->scripts_loaded < redis_lua_scripts_count) {
1440         //load next script
1441         if(!node->ctx.cmd) {
1442           return node_connector_fail(node, "cmd connection missing, can't send SCRIPT LOAD command");
1443         }
1444         redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "SCRIPT LOAD %s", next_script->script);
1445         return;
1446       }
1447       node_log_debug(node, "all scripts loaded");
1448       node->state++;
1449       /* fall through */
1450     case REDIS_NODE_GET_INFO:
1451       //getinfo time
1452       if(!node->ctx.cmd) {
1453         return node_connector_fail(node, "cmd connection missing, can't send INFO ALL command");
1454       }
1455       redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "INFO ALL");
1456       node->state++;
1457       break;
1458 
1459     case REDIS_NODE_GETTING_INFO:
1460       if(reply && reply->type == REDIS_REPLY_ERROR && nchan_cstr_startswith(reply->str, "NOAUTH")) {
1461         return node_connector_fail(node, "authentication required");
1462       }
1463       else if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1464         return node_connector_fail(node, "INFO command failed");
1465       }
1466       else if(!node_parseinfo_set_run_id(node, reply->str)) {
1467         return node_connector_fail(node, "failed to set node run_id");
1468       }
1469       else if(nodeset_node_deduplicate_by_run_id(node)) {
1470         // this node already exists
1471         // the deduplication has deleted it; we're done here.
1472         // commence rejoicing.
1473         return;
1474       }
1475 
1476       if(nchan_cstr_match_line(reply->str, "loading:1")) {
1477         return node_connector_fail(node, "is busy loading data...");
1478       }
1479 
1480       if(nchan_cstr_match_line(reply->str, "cluster_enabled:1")) {
1481         node->cluster.enabled = 1;
1482       }
1483 
1484       if(nchan_cstr_match_line(reply->str, "role:master")) {
1485         node_set_role(node, REDIS_NODE_ROLE_MASTER);
1486         if(!node->cluster.enabled && !node_discover_slaves_from_info_reply(node, reply)) {
1487           return node_connector_fail(node, "failed parsing slaves from INFO");
1488         }
1489       }
1490       else if(nchan_cstr_match_line(reply->str, "role:slave")) {
1491         redis_connect_params_t   *rcp;
1492         node_set_role(node, REDIS_NODE_ROLE_SLAVE);
1493         if(!(rcp = parse_info_master(node, reply->str))) {
1494           return node_connector_fail(node, "failed parsing master from INFO");
1495         }
1496         if(!node->cluster.enabled) {
1497           node_discover_master(node, rcp);
1498         }
1499       }
1500       else {
1501         return node_connector_fail(node, "can't tell if node is master or slave");
1502       }
1503       node->state++;
1504       /* fall through */
1505     case REDIS_NODE_PUBSUB_GET_INFO:
1506       if(!node->ctx.pubsub) {
1507         return node_connector_fail(node, "cmd connection missing, can't send INFO SERVER command");
1508       }
1509       redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "INFO SERVER");
1510       node->state++;
1511       break;
1512     case REDIS_NODE_PUBSUB_GETTING_INFO:
1513       if(reply && reply->type == REDIS_REPLY_ERROR && nchan_cstr_startswith(reply->str, "NOAUTH")) {
1514         return node_connector_fail(node, "authentication required");
1515       }
1516       else if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1517         return node_connector_fail(node, "INFO command failed");
1518       }
1519       else {
1520         u_char    idbuf[MAX_RUN_ID_LENGTH];
1521         ngx_str_t pubsub_run_id = {0, idbuf};
1522         node_parseinfo_set_preallocd_str(node, &pubsub_run_id, reply->str, "run_id:", MAX_RUN_ID_LENGTH);
1523         if(!nchan_ngx_str_match(&node->run_id, &pubsub_run_id)) {
1524           return node_connector_fail(node, "IP address connects to more than one server. Is Redis behind a proxy?");
1525         }
1526         node->state++;
1527       }
1528       /* fall through */
1529     case REDIS_NODE_SUBSCRIBE_WORKER:
1530       if(!node->ctx.pubsub) {
1531         return node_connector_fail(node, "pubsub connection missing, can't send worker SUBSCRIBE command");
1532       }
1533       redisAsyncCommand(node->ctx.pubsub, node_subscribe_callback, node, "SUBSCRIBE %s", redis_worker_id);
1534       node->state++;
1535       break;
1536 
1537     case REDIS_NODE_SUBSCRIBING_WORKER:
1538       if(!reply) {
1539         return node_connector_fail(node, "disconnected while subscribing to worker PUBSUB channel");
1540       }
1541       if( reply->type != REDIS_REPLY_ARRAY || reply->elements != 3
1542        || reply->element[0]->type != REDIS_REPLY_STRING || reply->element[1]->type != REDIS_REPLY_STRING
1543        || strcmp(reply->element[0]->str, "subscribe") != 0
1544        || strcmp(reply->element[1]->str, redis_worker_id) != 0
1545       ) {
1546         return node_connector_fail(node, "failed to subscribe to worker PUBSUB channel");
1547       }
1548       nchan_update_stub_status(redis_connected_servers, 1);
1549 
1550       node->state++;
1551       /* fall through */
1552     case REDIS_NODE_GET_CLUSTERINFO:
1553       if(!node->cluster.enabled) {
1554         node->state = REDIS_NODE_READY;
1555         return node_connector_callback(NULL, NULL, node);
1556       }
1557       else {
1558         if(!node->ctx.cmd) {
1559           return node_connector_fail(node, "cmd connection missing, can't send CLUSTER INFO command");
1560         }
1561         redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "CLUSTER INFO");
1562         node->state++;
1563       }
1564       break;
1565 
1566     case REDIS_NODE_GETTING_CLUSTERINFO:
1567       if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1568         return node_connector_fail(node, "CLUSTER INFO command failed");
1569       }
1570       if(!nchan_cstr_match_line(reply->str, "cluster_state:ok")) {
1571         node->cluster.ok=0;
1572         return node_connector_fail(node, "cluster_state not ok");
1573       }
1574 
1575       if(!nchan_get_rest_of_line_in_cstr(reply->str, "cluster_current_epoch:", &rest)) {
1576         return node_connector_fail(node, "CLUSTER INFO command failed to get current epoch");
1577       }
1578       if((node->cluster.current_epoch = ngx_atoi(rest.data, rest.len)) == NGX_ERROR) {
1579         return node_connector_fail(node, "CLUSTER INFO command failed to parse current epoch number");
1580       }
1581 
1582       node->cluster.ok=1;
1583       node->state++;
1584       /* fall through */
1585     case REDIS_NODE_GET_CLUSTER_NODES:
1586       if(!node->ctx.cmd) {
1587         return node_connector_fail(node, "cmd connection missing, can't send CLUSTER NODES command");
1588       }
1589       redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "CLUSTER NODES");
1590       node->state++;
1591       break;
1592 
1593     case REDIS_NODE_GETTING_CLUSTER_NODES:
1594       if(!reply_str_ok(reply)) {
1595         return node_connector_fail(node, "CLUSTER NODES command failed");
1596       }
1597       else {
1598         size_t                  i, n;
1599         cluster_nodes_line_t   *l;
1600         if(!(l = parse_cluster_nodes(node, reply->str, &n))) {
1601           return node_connector_fail(node, "parsing CLUSTER NODES command failed");
1602         }
1603         for(i=0; i<n; i++) {
1604           if(l[i].self) {
1605             nchan_strcpy(&node->cluster.id, &l[i].id, MAX_CLUSTER_ID_LENGTH);
1606             if(l[i].slot_ranges_count == 0 && l[i].master) {
1607               node_log_notice(node, "is a master cluster node with no keyslots");
1608             }
1609             else {
1610               redis_node_t       *conflict_node;
1611               node->cluster.slot_range.n = l[i].slot_ranges_count;
1612               size_t                 j;
1613               if(node->cluster.slot_range.range) {
1614                 ngx_free(node->cluster.slot_range.range);
1615               }
1616               node->cluster.slot_range.range = ngx_alloc(sizeof(redis_slot_range_t) * node->cluster.slot_range.n, ngx_cycle->log);
1617               if(!node->cluster.slot_range.range) {
1618                 return node_connector_fail(node, "failed allocating cluster slots range");
1619               }
1620               if(!parse_cluster_node_slots(&l[i], node->cluster.slot_range.range)) {
1621                 return node_connector_fail(node, "failed parsing cluster slots range");
1622               }
1623               for(j = 0; j<node->cluster.slot_range.n; j++) {
1624                 if((conflict_node = nodeset_node_find_by_range(nodeset, &node->cluster.slot_range.range[j]))!=NULL) {
1625                   u_char buf[1024];
1626                   ngx_snprintf(buf, 1024, "keyslot range conflict with node %s. These nodes are probably from different clusters.%Z", node_cstr(conflict_node));
1627                   return node_connector_fail(node, (char *)buf);
1628                 }
1629               }
1630 
1631               if(!nodeset_cluster_node_index_keyslot_ranges(node)) {
1632                 return node_connector_fail(node, "indexing keyslot ranges failed");
1633               }
1634             }
1635           }
1636           else if(!node_skip_cluster_peer(node, &l[i])) {
1637             node_discover_cluster_peer(node, &l[i]);
1638           }
1639         }
1640       }
1641       node->state = REDIS_NODE_READY;
1642       //intentional fall-through is affirmatively consensual
1643       //yes, i consent to being fallen through.
1644       //                               Signed,
1645       //                                 REDIS_NODE_GETTING_CLUSTER_NODES
1646       //NOTE: consent required each time a fallthrough is imposed
1647       /* fall through */
1648     case REDIS_NODE_READY:
1649       if(node->connect_timeout) {
1650         nchan_abort_oneshot_timer(node->connect_timeout);
1651         node->connect_timeout = NULL;
1652       }
1653       if(!node->ping_timer.timer_set && nodeset->settings.ping_interval > 0) {
1654         ngx_add_timer(&node->ping_timer, nodeset->settings.ping_interval * 1000);
1655       }
1656       if(node->cluster.enabled) {
1657         if(!node->cluster.check_timer.timer_set && nodeset->settings.cluster_check_interval > 0) {
1658           ngx_add_timer(&node->cluster.check_timer, nodeset->settings.cluster_check_interval * 1000);
1659         }
1660         node->cluster.last_successful_check = ngx_time();
1661       }
1662       node_log_notice(node, "%s", node->generation == 0 ? "connected" : "reconnected");
1663       node->generation++;
1664       nodeset_examine(nodeset);
1665       break;
1666   }
1667 }
1668 
/*
 * Check whether the keyslot index of a nodeset covers every cluster keyslot
 * (0 through 16383) with nodes that are ready masters.
 *
 * Starting at slot 0, the single-slot range {min, min} is looked up in
 * ns->cluster.keyslots; on a hit the search continues right after the max of
 * the range that was found.
 *
 * Returns 1 when the whole keyslot space is covered, 0 otherwise (a missing
 * slot, a node that is not yet ready, or a node that is not a master). The
 * reason for a 0 result is logged.
 */
static int nodeset_cluster_keyslot_space_complete(redis_nodeset_t *ns) {
  ngx_rbtree_node_t                  *node;
  redis_slot_range_t                  range = {0, 0};
  redis_nodeset_slot_range_node_t    *rangenode;

  while(range.min <= 16383) {
    if((node = rbtree_find_node(&ns->cluster.keyslots, &range)) == NULL) {
      // nginx's "%i" consumes an ngx_int_t; the slot is printed with "%d",
      // the same conversion the other messages below use for this field type
      // NOTE(review): assumes the slot range fields are no wider than int -- confirm
      DBG("cluster slots range incomplete: can't find slot %d", range.min);
      return 0;
    }
    rangenode = rbtree_data_from_node(node);

    if(rangenode->node->state < REDIS_NODE_READY) {
      node_log_notice(rangenode->node, "cluster node for range %d - %d not connected", rangenode->range.min, rangenode->range.max);
      return 0;
    }
    if(rangenode->node->role != REDIS_NODE_ROLE_MASTER) {
      node_log_notice(rangenode->node, "cluster node for range %d - %d is not a master. That's weird.", rangenode->range.min, rangenode->range.max);
      return 0;
    }

    // continue with the first slot past the range that was just verified
    range.min = rangenode->range.max + 1;
    range.max = range.min;
  }
  DBG("cluster range complete");
  //print_cluster_slots(cluster);
  return 1;
}
1697 
/*
 * Pick the delay handed to ngx_add_timer() for the next nodeset status check,
 * based on the nodeset status. Any status not listed below gets 500.
 */
static int nodeset_status_timer_interval(redis_nodeset_status_t status) {
  int interval = 500; // fallback for a status not handled below

  switch(status) {
    case REDIS_NODESET_READY:
      interval = 10000;
      break;
    case REDIS_NODESET_CONNECTING:
      interval = 1000;
      break;
    case REDIS_NODESET_CLUSTER_FAILING:
    case REDIS_NODESET_FAILING:
      interval = 300;
      break;
    case REDIS_NODESET_DISCONNECTED:
    case REDIS_NODESET_INVALID:
    case REDIS_NODESET_FAILED:
      interval = 2000;
      break;
  }
  return interval;
}
1714 
/*
 * Timer handler installed by nodeset_callback_on_ready(): the wait for the
 * nodeset ran out, so the queued callback is invoked and its entry is removed
 * from the nodeset's onready_callbacks list.
 */
void nodeset_onready_expire_event(ngx_event_t *ev) {
  nodeset_onready_callback_t *expired = ev->data;

  // the callback's return value is not inspected
  expired->cb(expired->ns, expired->pd);

  nchan_list_remove(&expired->ns->onready_callbacks, expired);
}
1720 
/*
 * Run cb(ns, pd) once the nodeset is ready.
 *
 * When ns is already REDIS_NODESET_READY the callback runs immediately.
 * Otherwise it is appended to ns->onready_callbacks; with max_wait > 0 a
 * timer is also armed that fires nodeset_onready_expire_event after max_wait.
 *
 * Returns NGX_OK, or NGX_ERROR when the list entry could not be appended.
 * The return value of cb is ignored.
 */
ngx_int_t nodeset_callback_on_ready(redis_nodeset_t *ns, ngx_msec_t max_wait, ngx_int_t (*cb)(redis_nodeset_t *, void *), void *pd) {
  nodeset_onready_callback_t *pending;

  if(ns->status != REDIS_NODESET_READY) {
    pending = nchan_list_append(&ns->onready_callbacks);
    if(!pending) {
      ERR("failed to add to onready_callback list");
      return NGX_ERROR;
    }

    pending->cb = cb;
    pending->pd = pd;
    pending->ns = ns;
    // start from a blank event so timer_set reads as 0 when no timer is armed
    ngx_memzero(&pending->ev, sizeof(pending->ev));

    if(max_wait > 0) {
      nchan_init_timer(&pending->ev, nodeset_onready_expire_event, pending);
      ngx_add_timer(&pending->ev, max_wait);
    }
    return NGX_OK;
  }

  // already ready: no need to queue anything
  cb(ns, pd);
  return NGX_OK;
}
1746 
nodeset_run_on_ready_callbacks(redis_nodeset_t * ns)1747 ngx_int_t nodeset_run_on_ready_callbacks(redis_nodeset_t *ns) {
1748   nodeset_onready_callback_t *rcb;
1749   for(rcb = nchan_list_first(&ns->onready_callbacks); rcb != NULL; rcb = nchan_list_next(rcb)) {
1750     if(rcb->ev.timer_set) {
1751       ngx_del_timer(&rcb->ev);
1752     }
1753     rcb->cb(ns, rcb->pd);
1754   }
1755   nchan_list_empty(&ns->onready_callbacks);
1756   return NGX_OK;
1757 }
1758 
nodeset_abort_on_ready_callbacks(redis_nodeset_t * ns)1759 ngx_int_t nodeset_abort_on_ready_callbacks(redis_nodeset_t *ns) {
1760   nodeset_onready_callback_t *rcb;
1761   for(rcb = nchan_list_first(&ns->onready_callbacks); rcb != NULL; rcb = nchan_list_next(rcb)) {
1762     if(rcb->ev.timer_set) {
1763       ngx_del_timer(&rcb->ev);
1764     }
1765     rcb->cb(ns, rcb->pd);
1766   }
1767   nchan_list_empty(&ns->onready_callbacks);
1768   return NGX_OK;
1769 }
1770 
update_chanhead_status_on_reconnect(rdstore_channel_head_t * ch)1771 static ngx_int_t update_chanhead_status_on_reconnect(rdstore_channel_head_t *ch) {
1772   if(ch->redis.node.cmd && ch->redis.node.pubsub && ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBED && ch->status == NOTREADY) {
1773     ch->status = READY;
1774   }
1775   return NGX_OK;
1776 }
1777 
nodeset_reconnect_disconnected_channels(redis_nodeset_t * ns)1778 ngx_int_t nodeset_reconnect_disconnected_channels(redis_nodeset_t *ns) {
1779   rdstore_channel_head_t *cur;
1780   nchan_slist_t *disconnected_cmd = &ns->channels.disconnected_cmd;
1781   nchan_slist_t *disconnected_pubsub = &ns->channels.disconnected_pubsub;
1782   assert(nodeset_ready(ns));
1783 
1784   while((cur = nchan_slist_pop(disconnected_cmd)) != NULL) {
1785     assert(cur->redis.node.cmd == NULL);
1786     cur->redis.slist.in_disconnected_cmd_list = 0;
1787     assert(nodeset_node_find_by_chanhead(cur)); // this reuses the linked-list fields
1788     update_chanhead_status_on_reconnect(cur);
1789   }
1790 
1791   while((cur = nchan_slist_pop(disconnected_pubsub)) != NULL) {
1792     assert(cur->redis.node.pubsub == NULL);
1793     cur->redis.slist.in_disconnected_pubsub_list = 0;
1794     assert(nodeset_node_pubsub_find_by_chanhead(cur)); // this reuses the linked-list fields
1795     redis_chanhead_catch_up_after_reconnect(cur);
1796     ensure_chanhead_pubsub_subscribed_if_needed(cur);
1797     update_chanhead_status_on_reconnect(cur);
1798   }
1799 
1800   return NGX_OK;
1801 }
1802 
/*
 * Write a short, NUL-terminated description of the nodeset into buf (at most
 * maxlen bytes are passed to ngx_snprintf) and return buf.
 *
 * A nodeset with an upstream is described as "upstream <host>". Otherwise it
 * is described as "host <first url>", or just "host" when the url list has no
 * usable first entry.
 */
static char* nodeset_name_cstr(redis_nodeset_t *nodeset, char *buf, size_t maxlen) {
  const char *what;
  ngx_str_t  *name = NULL;

  if(nodeset->upstream) {
    what = "upstream";
    name = &nodeset->upstream->host;
  }
  else {
    ngx_str_t **url = nchan_list_first(&nodeset->urls);
    what = "host";
    if(url && *url) {
      name = *url;
    }
  }

  // `what` is set on both paths, so only the presence of a name varies
  if(name) {
    ngx_snprintf((u_char *)buf, maxlen, "%s %V%Z", what, name);
  }
  else {
    ngx_snprintf((u_char *)buf, maxlen, "%s%Z", what);
  }
  return buf;
}
1832 
/*
 * Return a C string naming the node, or "???" for a NULL node.
 * The name is produced by __node_cstr() into a function-local static buffer
 * that is shared by all calls, so each call overwrites the previous result.
 */
const char *__node_nickname_cstr(redis_node_t *node) {
  static char buf[512];

  if(!node) {
    return "???";
  }

  __node_cstr(node, buf);
  return buf;
}
1843 
nodeset_set_status(redis_nodeset_t * nodeset,redis_nodeset_status_t status,const char * msg)1844 ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t status, const char *msg) {
1845   nodeset->status_msg = msg;
1846   if(nodeset->status != status) {
1847     if(msg) {
1848       ngx_uint_t  lvl;
1849       if(status == REDIS_NODESET_INVALID) {
1850         lvl = NGX_LOG_ERR;
1851       }
1852       else if(status == REDIS_NODESET_DISCONNECTED
1853         ||    status == REDIS_NODESET_CLUSTER_FAILING
1854         ||    status == REDIS_NODESET_FAILED
1855       ) {
1856         lvl = NGX_LOG_WARN;
1857       }
1858       else {
1859         lvl = NGX_LOG_NOTICE;
1860       }
1861       nodeset_log(nodeset, lvl, "%s", msg);
1862     }
1863     nodeset->current_status_start = ngx_time();
1864     nodeset->current_status_times_checked = 0;
1865     nodeset->status = status;
1866 
1867     if(nodeset->status_check_ev.timer_set) {
1868       ngx_del_timer(&nodeset->status_check_ev);
1869     }
1870 
1871     switch(status) {
1872       case REDIS_NODESET_FAILED:
1873       case REDIS_NODESET_DISCONNECTED:
1874       case REDIS_NODESET_INVALID:
1875         nodeset_disconnect(nodeset);
1876         break;
1877       case REDIS_NODESET_CLUSTER_FAILING:
1878 
1879         //very intentional fallthrough
1880       case REDIS_NODESET_FAILING:
1881         nodeset_connect(nodeset);
1882         break;
1883       case REDIS_NODESET_CONNECTING:
1884         //no special actions
1885         break;
1886       case REDIS_NODESET_READY:
1887         nodeset_reconnect_disconnected_channels(nodeset);
1888         nodeset_run_on_ready_callbacks(nodeset);
1889         break;
1890     }
1891   }
1892 
1893   if(!nodeset->status_check_ev.timer_set) {
1894     ngx_add_timer(&nodeset->status_check_ev, nodeset_status_timer_interval(status));
1895   }
1896   return NGX_OK;
1897 }
1898 
/*
 * Reply handler for the "INFO REPLICATION" command sent by nodeset_examine().
 * pd is the node the command was sent to. A NULL reply is logged and ignored;
 * any other reply is handed to node_discover_slaves_from_info_reply(), whose
 * result is not inspected.
 */
static void node_find_slaves_callback(redisAsyncContext *ac, void *rep, void *pd) {
  redisReply     *info_reply = rep;
  redis_node_t   *target = pd;

  if(info_reply != NULL) {
    node_discover_slaves_from_info_reply(target, info_reply);
    return;
  }

  node_log_debug(target, "INFO REPLICATION aborted reply");
}
1908 
/*
 * Tally the role and connection state of every node in the nodeset and derive
 * the nodeset status from those tallies via nodeset_set_status().
 *
 * Side effects of the walk: a failed slave node is disconnected and destroyed.
 * Before that, if its master is ready and the nodeset is READY, an
 * "INFO REPLICATION" command is sent to the master (reply handled by
 * node_find_slaves_callback). nodeset->cluster.enabled is set to whether any
 * node reported cluster mode.
 *
 * Always returns NGX_OK.
 */
ngx_int_t nodeset_examine(redis_nodeset_t *nodeset) {
  redis_node_t *cur, *next;
  // Tallies gathered during the walk. Note that slaves, disconnected, discovered,
  // failed_slaves, failed_unknowns and connecting_masters are counted but not
  // consulted by the decision chain below.
  int cluster = 0, masters = 0, slaves = 0, total = 0, connecting = 0, ready = 0, disconnected = 0;
  int discovered = 0, failed_masters=0, failed_slaves = 0, failed_unknowns = 0;
  int ready_cluster = 0, ready_non_cluster = 0, connecting_masters = 0;
  // status as it was on entry; later nodeset_set_status() calls don't affect the checks below
  redis_nodeset_status_t current_status = nodeset->status;
  //ERR("check nodeset %p", nodeset);

  for(cur = nchan_list_first(&nodeset->nodes); cur != NULL; cur = next) {
    // fetch the successor up front: cur may be destroyed further down
    next = nchan_list_next(cur);
    total++;
    if(cur->cluster.enabled == 1) {
      cluster++;
    }
    if(cur->discovered)
      discovered++;
    if(cur->role == REDIS_NODE_ROLE_MASTER) {
      masters++;
      if(cur->state > REDIS_NODE_DISCONNECTED && cur->state < REDIS_NODE_READY) {
        connecting_masters++;
      }
    }
    if(cur->role == REDIS_NODE_ROLE_SLAVE)
      slaves++;
    // NOTE(review): presumably failure states order at or below REDIS_NODE_DISCONNECTED
    // and are counted here too -- the enum is declared elsewhere, confirm
    if(cur->state <= REDIS_NODE_DISCONNECTED)
      disconnected++;
    // anything strictly between DISCONNECTED and READY is treated as "connecting"
    if(cur->state > REDIS_NODE_DISCONNECTED && cur->state < REDIS_NODE_READY)
      connecting++;
    if(cur->state == REDIS_NODE_READY) {
      ready++;
      if(cur->cluster.enabled == 1)
        ready_cluster++;
      else
        ready_non_cluster++;
    }
    if(cur->state == REDIS_NODE_FAILED) {
      if(cur->role == REDIS_NODE_ROLE_MASTER) {
        failed_masters++;
      }
      else if(cur->role == REDIS_NODE_ROLE_SLAVE) {
        failed_slaves++;
        if(cur->peers.master && cur->peers.master->state >= REDIS_NODE_READY && cur->nodeset->status == REDIS_NODESET_READY) {
          //rediscover slaves
          redisAsyncCommand(cur->peers.master->ctx.cmd, node_find_slaves_callback, cur->peers.master, "INFO REPLICATION");
        }
        //remove failed slave
        node_log_notice(cur, "removed failed slave node");
        node_disconnect(cur, REDIS_NODE_FAILED);
        nodeset_node_destroy(cur);
        // the destroyed node no longer counts towards the total (the other tallies are left as they are)
        total--;
      }
      else {
        failed_unknowns++;
      }
    }
  }

  nodeset->cluster.enabled = cluster > 0;

  if(current_status == REDIS_NODESET_CONNECTING && connecting > 0) {
    //still connecting, with a few nodes yet to try to connect
    return NGX_OK;
  }
  // Decision chain: the first matching condition determines the new status.
  if(ready == 0 && total == 0) {
    nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "no reachable servers");
  }
  else if(cluster == 0 && masters > 1) {
    nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "invalid config, more than one master in non-cluster");
  }
  else if(ready_cluster > 0 && ready_non_cluster > 0) {
    nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "invalid config, cluster and non-cluster servers present");
  }
  else if(connecting > 0) {
    nodeset_set_status(nodeset, REDIS_NODESET_CONNECTING, NULL);
  }
  else if(failed_masters > 0) {
    // a master failing on a previously READY nodeset is FAILING; otherwise the nodeset is FAILED
    if(current_status == REDIS_NODESET_READY) {
      nodeset_set_status(nodeset, REDIS_NODESET_FAILING, NULL);
    }
    else {
      nodeset_set_status(nodeset, REDIS_NODESET_FAILED, NULL);
    }
  }
  else if (masters == 0) {
    //this prevents slave-of-slave-of-master lookups
    nodeset_set_status(nodeset, REDIS_NODESET_INVALID, "no reachable masters");
  }
  else if(cluster > 0 && !nodeset_cluster_keyslot_space_complete(nodeset)) {
      nodeset_set_status(nodeset, REDIS_NODESET_CONNECTING, "keyslot space incomplete");
  }
  else if(current_status == REDIS_NODESET_READY && (ready == 0 || ready < total)) {
    // was READY, but not every remaining node is ready anymore
    nodeset_set_status(nodeset, REDIS_NODESET_FAILING, NULL);
  }
  else if(ready == 0) {
    nodeset_set_status(nodeset, REDIS_NODESET_DISCONNECTED, "no connected servers");
  }
  else {
    nodeset_set_status(nodeset, REDIS_NODESET_READY, "ready");
  }

  return NGX_OK;
}
2011 
/*
 * Handler for the nodeset's periodic status-check timer (ev->data is the
 * nodeset). Does nothing unless the event timed out.
 *
 * Depending on the current status it:
 *   - FAILED: reconnects once the status has lasted longer than
 *     REDIS_NODESET_RECONNECT_WAIT_TIME_SEC;
 *   - INVALID / DISCONNECTED: connects right away;
 *   - CONNECTING: gives up with DISCONNECTED after
 *     REDIS_NODESET_MAX_CONNECTING_TIME_SEC, otherwise re-examines the nodeset;
 *   - FAILING / CLUSTER_FAILING: declares the nodeset FAILED after
 *     REDIS_NODESET_MAX_FAILING_TIME_SEC;
 *   - READY: flushes leftover disconnected channels and on-ready callbacks.
 *
 * Finally the timer is re-armed unless one of the actions already did so.
 */
static void nodeset_check_status_event(ngx_event_t *ev) {
  redis_nodeset_t *nodeset = ev->data;

  if(!ev->timedout) {
    return;
  }
  DBG("nodeset %p status check event", nodeset);
  ev->timedout = 0;

  switch(nodeset->status) {
    case REDIS_NODESET_READY:
      nodeset_reconnect_disconnected_channels(nodeset); //in case there are any waiting around
      nodeset_run_on_ready_callbacks(nodeset); //in case there are any that got left behind
      break;

    case REDIS_NODESET_CONNECTING:
      //wait it out, but not forever
      if(ngx_time() - nodeset->current_status_start <= REDIS_NODESET_MAX_CONNECTING_TIME_SEC) {
        nodeset_examine(nodeset); // full status check
      }
      else {
        nodeset_set_status(nodeset, REDIS_NODESET_DISCONNECTED, "Redis node set took too long to connect");
      }
      break;

    case REDIS_NODESET_CLUSTER_FAILING:
    case REDIS_NODESET_FAILING:
      if(ngx_time() - nodeset->current_status_start > REDIS_NODESET_MAX_FAILING_TIME_SEC) {
        nodeset_set_status(nodeset, REDIS_NODESET_FAILED, "Redis node set has failed");
      }
      break;

    case REDIS_NODESET_INVALID:
    case REDIS_NODESET_DISCONNECTED:
      //connect whatever needs to be connected
      nodeset_connect(nodeset);
      break;

    case REDIS_NODESET_FAILED:
      //stay failed for a while before trying again
      if(ngx_time() - nodeset->current_status_start > REDIS_NODESET_RECONNECT_WAIT_TIME_SEC) {
        nodeset_log_notice(nodeset, "reconnecting...");
        nodeset_connect(nodeset);
      }
      break;
  }

  //check again soon!
  if(!ev->timer_set) {
    ngx_add_timer(ev, nodeset_status_timer_interval(nodeset->status));
  }
}
2062 
/*
 * Make sure a node exists for every configured URL of the nodeset, start
 * connecting every node that is at or below REDIS_NODE_DISCONNECTED, and set
 * the nodeset status to REDIS_NODESET_CONNECTING.
 *
 * Always returns 1.
 */
int nodeset_connect(redis_nodeset_t *ns) {
  redis_node_t             *node;
  ngx_str_t               **url;
  redis_connect_params_t    rcp;

  for(url = nchan_list_first(&ns->urls); url != NULL; url = nchan_list_next(url)) {
    parse_redis_url(*url, &rcp);
    if((node = nodeset_node_find_by_connect_params(ns, &rcp)) == NULL) {
      node = nodeset_node_create(ns, &rcp);
      // verify the creation result before the node is handed to the logger
      assert(node);
      node_log_debug(node, "created");
    }
  }

  for(node = nchan_list_first(&ns->nodes); node != NULL; node = nchan_list_next(node)) {
    if(node->state <= REDIS_NODE_DISCONNECTED) {
      node_log_debug(node, "start connecting");
      node_connect(node);
    }
  }

  nodeset_set_status(ns, REDIS_NODESET_CONNECTING, NULL);
  return 1;
}
2085 
/*
 * Tear down every node of the nodeset: the first node of the list is taken
 * repeatedly, disconnected if its state is above REDIS_NODE_DISCONNECTED, and
 * destroyed, until the list has no first node left. Always returns 1.
 */
int nodeset_disconnect(redis_nodeset_t *ns) {
  redis_node_t *head;

  while((head = nchan_list_first(&ns->nodes)) != NULL) {
    node_log_debug(head, "destroy %p", head);
    if(head->state > REDIS_NODE_DISCONNECTED) {
      node_log_debug(head, "intiating disconnect");
      node_disconnect(head, REDIS_NODE_DISCONNECTED);
    }
    nodeset_node_destroy(head);
  }

  return 1;
}
2099 
2100 
nodeset_connect_all(void)2101 ngx_int_t nodeset_connect_all(void) {
2102   int                      i;
2103   redis_nodeset_t         *ns;
2104   DBG("connect all");
2105   for(i=0; i<redis_nodeset_count; i++) {
2106     ns = &redis_nodeset[i];
2107     nodeset_connect(ns);
2108   }
2109   return NGX_OK;
2110 }
2111 
nodeset_destroy_all(void)2112 ngx_int_t nodeset_destroy_all(void) {
2113   int                      i;
2114   redis_nodeset_t         *ns;
2115   DBG("nodeset destroy all");
2116   for(i=0; i<redis_nodeset_count; i++) {
2117     ns = &redis_nodeset[i];
2118     nodeset_disconnect(ns);
2119     if(ns->name && ns->name != nchan_redis_blankname) {
2120       ngx_free(ns->name);
2121     }
2122     nchan_list_empty(&ns->urls);
2123   }
2124   redis_nodeset_count = 0;
2125   return NGX_OK;
2126 }
2127 
nodeset_each(void (* cb)(redis_nodeset_t *,void *),void * privdata)2128 ngx_int_t nodeset_each(void (*cb)(redis_nodeset_t *, void *), void *privdata) {
2129   int                      i;
2130   redis_nodeset_t         *ns;
2131   for(i=0; i<redis_nodeset_count; i++) {
2132     ns = &redis_nodeset[i];
2133     cb(ns, privdata);
2134   }
2135   return NGX_OK;
2136 }
nodeset_each_node(redis_nodeset_t * ns,void (* cb)(redis_node_t *,void *),void * privdata)2137 ngx_int_t nodeset_each_node(redis_nodeset_t *ns, void (*cb)(redis_node_t *, void *), void *privdata) {
2138   redis_node_t             *node, *next;
2139   for(node = nchan_list_first(&ns->nodes); node != NULL; node = next) {
2140     next = nchan_list_next(node);
2141     cb(node, privdata);
2142   }
2143   return NGX_OK;
2144 }
2145 
set_preallocated_peername(redisAsyncContext * ctx,ngx_str_t * dst)2146 static ngx_int_t set_preallocated_peername(redisAsyncContext *ctx, ngx_str_t *dst) {
2147   char                  *ipstr = (char *)dst->data;
2148   struct sockaddr_in    *s4;
2149   struct sockaddr_in6   *s6;
2150   // deal with both IPv4 and IPv6:
2151   switch(ctx->c.sockaddr.sa_family) {
2152     case AF_INET:
2153       s4 = (struct sockaddr_in *)&ctx->c.sockaddr;
2154       inet_ntop(AF_INET, &s4->sin_addr, ipstr, INET6_ADDRSTRLEN);
2155       break;
2156 #ifdef AF_INET6
2157     case AF_INET6:
2158       s6 = (struct sockaddr_in6 *)&ctx->c.sockaddr;
2159       inet_ntop(AF_INET6, &s6->sin6_addr, ipstr, INET6_ADDRSTRLEN);
2160       break;
2161 #endif
2162     case AF_UNSPEC:
2163     default:
2164       DBG("couldn't get sockaddr");
2165       return NGX_ERROR;
2166   }
2167   dst->len = strlen(ipstr);
2168   return NGX_OK;
2169 }
2170 
2171 
2172 //sneaky channel stuff
nodeset_associate_chanhead(redis_nodeset_t * ns,void * chan)2173 ngx_int_t nodeset_associate_chanhead(redis_nodeset_t *ns, void *chan) {
2174   rdstore_channel_head_t *ch = chan;
2175   if(ch->redis.nodeset && ch->redis.nodeset != ns) {
2176     nodeset_dissociate_chanhead(ch);
2177   }
2178   ngx_memzero(&ch->redis.slist, sizeof(ch->redis.slist));
2179   ch->redis.nodeset = ns;
2180   nchan_slist_append(&ns->channels.all, ch);
2181   return NGX_OK;
2182 }
nodeset_dissociate_chanhead(void * chan)2183 ngx_int_t nodeset_dissociate_chanhead(void *chan) {
2184   rdstore_channel_head_t *ch = chan;
2185   redis_nodeset_t *ns = ch->redis.nodeset;
2186 
2187   if(ns) {
2188     if(ch->redis.node.cmd) {
2189       assert(!ch->redis.slist.in_disconnected_cmd_list);
2190       nodeset_node_dissociate_chanhead(ch);
2191     }
2192     else if(ch->redis.slist.in_disconnected_cmd_list) {
2193       ch->redis.slist.in_disconnected_cmd_list = 0;
2194       nchan_slist_remove(&ns->channels.disconnected_cmd, ch);
2195     }
2196 
2197     if(ch->redis.node.pubsub) {
2198       assert(!ch->redis.slist.in_disconnected_pubsub_list);
2199       nodeset_node_dissociate_pubsub_chanhead(ch);
2200     }
2201     else if(ch->redis.slist.in_disconnected_pubsub_list) {
2202       ch->redis.slist.in_disconnected_pubsub_list = 0;
2203       nchan_slist_remove(&ns->channels.disconnected_pubsub, ch);
2204     }
2205 
2206     ch->redis.nodeset = NULL;
2207     nchan_slist_remove(&ns->channels.all, ch);
2208   }
2209   return NGX_OK;
2210 }
nodeset_node_associate_chanhead(redis_node_t * node,void * chan)2211 ngx_int_t nodeset_node_associate_chanhead(redis_node_t *node, void *chan) {
2212   rdstore_channel_head_t *ch = chan;
2213   assert(ch->redis.node.cmd == NULL);
2214   assert(node->nodeset == ch->redis.nodeset);
2215   assert(ch->redis.slist.in_disconnected_cmd_list == 0);
2216   if(ch->redis.node.cmd != node) { //dat idempotence tho
2217     nchan_slist_append(&node->channels.cmd, ch);
2218   }
2219   ch->redis.node.cmd = node;
2220   return NGX_OK;
2221 }
nodeset_node_associate_pubsub_chanhead(redis_node_t * node,void * chan)2222 ngx_int_t nodeset_node_associate_pubsub_chanhead(redis_node_t *node, void *chan) {
2223   rdstore_channel_head_t *ch = chan;
2224   assert(ch->redis.node.pubsub == NULL);
2225   assert(node->nodeset == ch->redis.nodeset);
2226   assert(ch->redis.slist.in_disconnected_pubsub_list == 0);
2227   if(ch->redis.node.pubsub != node) { //dat idempotence tho
2228     nchan_slist_append(&node->channels.pubsub, ch);
2229   }
2230   ch->redis.node.pubsub = node;
2231   return NGX_OK;
2232 }
nodeset_node_dissociate_chanhead(void * chan)2233 ngx_int_t nodeset_node_dissociate_chanhead(void *chan) {
2234   rdstore_channel_head_t *ch = chan;
2235   if(ch->redis.node.cmd) {
2236     nchan_slist_remove(&ch->redis.node.cmd->channels.cmd, ch);
2237   }
2238   ch->redis.node.cmd = NULL;
2239   return NGX_OK;
2240 }
nodeset_node_dissociate_pubsub_chanhead(void * chan)2241 ngx_int_t nodeset_node_dissociate_pubsub_chanhead(void *chan) {
2242   rdstore_channel_head_t *ch = chan;
2243   if(ch->redis.node.pubsub) {
2244     nchan_slist_remove(&ch->redis.node.pubsub->channels.pubsub, ch);
2245   }
2246   ch->redis.node.pubsub = NULL;
2247   return NGX_OK;
2248 }
2249 
/*
 * Pick either the given master or one of its slaves at random, weighted by
 * the nodeset's node_weight settings.
 *
 * The master's share is node_weight.master; the slaves' combined share is
 * (number of slaves) * node_weight.slave. When the slaves win the draw, a
 * second draw selects a position in the master's slave list. The master is
 * returned instead when that slave is missing or its state is below
 * REDIS_NODE_READY, and also when both shares add up to 0.
 *
 * master must have the REDIS_NODE_ROLE_MASTER role (asserted).
 */
static redis_node_t *nodeset_node_random_master_or_slave(redis_node_t *master) {
  redis_nodeset_t  *ns = master->nodeset;
  int               master_share = ns->settings.node_weight.master;
  int               slaves_share = master->peers.slaves.n * ns->settings.node_weight.slave;
  int               pick, pos;
  redis_node_t    **slave;

  assert(master->role == REDIS_NODE_ROLE_MASTER);

  if(master_share + slaves_share == 0) {
    return master;
  }

  // first draw: master or "some slave"?
  pick = ngx_random() % (slaves_share + master_share);
  if(pick < master_share) {
    return master;
  }

  // second draw: which slave?
  pick = ngx_random() % master->peers.slaves.n; //random slave
  slave = nchan_list_first(&master->peers.slaves);
  for(pos = 0; slave != NULL && pos < pick; pos++) {
    slave = nchan_list_next(slave);
  }

  if(slave != NULL && (*slave)->state >= REDIS_NODE_READY) {
    return *slave;
  }

  //not ready? play it safe.
  return master;
}
2284 
nodeset_node_find_by_chanhead(void * chan)2285 redis_node_t *nodeset_node_find_by_chanhead(void *chan) {
2286   rdstore_channel_head_t *ch = chan;
2287   redis_node_t           *node;
2288   if(ch->redis.node.cmd) {
2289     return ch->redis.node.cmd;
2290   }
2291   node = nodeset_node_find_by_channel_id(ch->redis.nodeset, &ch->id);
2292   nodeset_node_associate_chanhead(node, ch);
2293   return node;
2294 }
nodeset_node_pubsub_find_by_chanhead(void * chan)2295 redis_node_t *nodeset_node_pubsub_find_by_chanhead(void *chan) {
2296   rdstore_channel_head_t *ch = chan;
2297   redis_node_t           *node;
2298   if(ch->redis.node.pubsub) {
2299     return ch->redis.node.pubsub;
2300   }
2301   node = nodeset_node_find_by_channel_id(ch->redis.nodeset, &ch->id);
2302   node = nodeset_node_random_master_or_slave(node);
2303   nodeset_node_associate_pubsub_chanhead(node, ch);
2304   return ch->redis.node.pubsub;
2305 }
2306 
2307 /*
2308  * Copyright 2001-2010 Georges Menie (www.menie.org)
2309  * Copyright 2010 Salvatore Sanfilippo (adapted to Redis coding style)
2310  * All rights reserved.
2311  * Redistribution and use in source and binary forms, with or without
2312  * modification, are permitted provided that the following conditions are met:
2313  *
2314  *     * Redistributions of source code must retain the above copyright
2315  *       notice, this list of conditions and the following disclaimer.
2316  *     * Redistributions in binary form must reproduce the above copyright
2317  *       notice, this list of conditions and the following disclaimer in the
2318  *       documentation and/or other materials provided with the distribution.
2319  *     * Neither the name of the University of California, Berkeley nor the
2320  *       names of its contributors may be used to endorse or promote products
2321  *       derived from this software without specific prior written permission.
2322  *
2323  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
2324  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
2325  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
2326  * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
2327  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
2328  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
2329  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
2330  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
2331  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2332  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2333  */
2334 
2335 /* CRC16 implementation according to CCITT standards.
2336  *
2337  * Note by @antirez: this is actually the XMODEM CRC 16 algorithm, using the
2338  * following parameters:
2339  *
2340  * Name                       : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
2341  * Width                      : 16 bit
2342  * Poly                       : 1021 (That is actually x^16 + x^12 + x^5 + 1)
2343  * Initialization             : 0000
2344  * Reflect Input byte         : False
2345  * Reflect Output CRC         : False
2346  * Xor constant to output CRC : 0000
2347  * Output for "123456789"     : 31C3
2348  */
2349 
/*
 * Byte-indexed lookup table for the CRC16 below: entry i is the 16-bit
 * remainder of the single byte i processed MSB-first against polynomial
 * 0x1021 with a zero initial value.
 */
static const uint16_t crc16tab[256]= {
    0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
    0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
    0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
    0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
    0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
    0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
    0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
    0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
    0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
    0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
    0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
    0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
    0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
    0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
    0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
    0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
    0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
    0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
    0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
    0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
    0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
    0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
    0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
    0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
    0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
    0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
    0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
    0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
    0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
    0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
    0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
    0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};

/*
 * Fold len bytes from buf into a running CRC16 and return the updated value.
 *
 * crc - CRC state to continue from; pass 0 to start a fresh checksum.
 * buf - bytes to process.
 * len - number of bytes to read from buf; a zero or negative length returns
 *       crc unchanged and buf is not read.
 *
 * The state is a plain 16-bit value with no final xor, so a buffer can be
 * checksummed in pieces by feeding each call's result into the next.
 */
uint16_t redis_crc16(uint16_t crc, const char *buf, int len) {
    const unsigned char *cursor = (const unsigned char *)buf;
    int                  remaining;

    for (remaining = len; remaining > 0; remaining--) {
        /* the table index is the high byte of the state xor the next input byte */
        unsigned slot = ((unsigned)(crc >> 8) ^ *cursor++) & 0xFFu;
        crc = (uint16_t)((crc << 8) ^ crc16tab[slot]);
    }
    return crc;
}
2391 
2392