1 #ifndef NCHAN_REDIS_NODESET_H
2 #define NCHAN_REDIS_NODESET_H
3 
4 #include <nchan_module.h>
5 #if NCHAN_HAVE_HIREDIS_WITH_SOCKADDR
6 #include <hiredis/hiredis.h>
7 #include <hiredis/async.h>
8 #else
9 #include <store/redis/hiredis/hiredis.h>
10 #include <store/redis/hiredis/async.h>
11 #endif
12 #include <util/nchan_reaper.h>
13 #include <util/nchan_rbtree.h>
14 #include <util/nchan_list.h>
15 #include <util/nchan_slist.h>
16 
17 //#include "store-private.h"
18 
19 //#define REDIS_NODESET_DBG 1
20 
21 #define node_log(node, lvl, fmt, args...) \
22   ngx_log_error(lvl, ngx_cycle->log, 0, "nchan: Redis node %s " fmt, __node_nickname_cstr(node), ##args)
23 #define node_log_error(node, fmt, args...)    node_log((node), NGX_LOG_ERR, fmt, ##args)
24 #define node_log_warning(node, fmt, args...)  node_log((node), NGX_LOG_WARN, fmt, ##args)
25 #define node_log_notice(node, fmt, args...)   node_log((node), NGX_LOG_NOTICE, fmt, ##args)
26 #define node_log_info(node, fmt, args...)     node_log((node), NGX_LOG_INFO, fmt, ##args)
27 #define node_log_debug(node, fmt, args...)    node_log((node), NGX_LOG_DEBUG, fmt, ##args)
28 
29 #define nodeset_log(nodeset, lvl, fmt, args...) \
30   ngx_log_error(lvl, ngx_cycle->log, 0, "nchan: Redis %s: " fmt, (nodeset)->name, ##args)
31 #define nodeset_log_error(nodeset, fmt, args...)    nodeset_log((nodeset), NGX_LOG_ERR, fmt, ##args)
32 #define nodeset_log_warning(nodeset, fmt, args...)  nodeset_log((nodeset), NGX_LOG_WARN, fmt, ##args)
33 #define nodeset_log_notice(nodeset, fmt, args...)   nodeset_log((nodeset), NGX_LOG_NOTICE, fmt, ##args)
34 #define nodeset_log_info(nodeset, fmt, args...)     nodeset_log((nodeset), NGX_LOG_INFO, fmt, ##args)
35 #define nodeset_log_debug(nodeset, fmt, args...)    nodeset_log((nodeset), NGX_LOG_DEBUG, fmt, ##args)
36 
37 #define NCHAN_MAX_NODESETS 128
38 #define REDIS_NODESET_STATUS_CHECK_TIME_MSEC 4000
39 #define REDIS_NODESET_MAX_CONNECTING_TIME_SEC 5
40 #define REDIS_NODESET_RECONNECT_WAIT_TIME_SEC 5
41 #define REDIS_NODESET_MAX_FAILING_TIME_SEC 2
42 
43 #define REDIS_NODE_DEDUPLICATED         -100
44 #define REDIS_NODE_CONNECTION_TIMED_OUT   -2
45 #define REDIS_NODE_FAILED                 -1
46 #define REDIS_NODE_DISCONNECTED            0
47 #define REDIS_NODE_CMD_CONNECTING          1
48 #define REDIS_NODE_PUBSUB_CONNECTING       2
49 #define REDIS_NODE_CONNECTED               3
50 #define REDIS_NODE_CMD_AUTHENTICATING      4
51 #define REDIS_NODE_PUBSUB_AUTHENTICATING   5
52 #define REDIS_NODE_SELECT_DB               6
53 #define REDIS_NODE_CMD_SELECTING_DB        7
54 #define REDIS_NODE_PUBSUB_SELECTING_DB     8
55 #define REDIS_NODE_SCRIPTS_LOAD            9
56 #define REDIS_NODE_SCRIPTS_LOADING        10
57 #define REDIS_NODE_GET_INFO               11
58 #define REDIS_NODE_GETTING_INFO           12
59 #define REDIS_NODE_PUBSUB_GET_INFO        13
60 #define REDIS_NODE_PUBSUB_GETTING_INFO    14
61 #define REDIS_NODE_SUBSCRIBE_WORKER       15
62 #define REDIS_NODE_SUBSCRIBING_WORKER     16
63 #define REDIS_NODE_GET_CLUSTERINFO        17
64 #define REDIS_NODE_GETTING_CLUSTERINFO    18
65 #define REDIS_NODE_GET_CLUSTER_NODES      19
66 #define REDIS_NODE_GETTING_CLUSTER_NODES  20
67 #define REDIS_NODE_READY                  100
68 
69 typedef struct redis_nodeset_s redis_nodeset_t;
70 typedef struct redis_node_s redis_node_t;
71 
72 typedef struct { //redis_nodeset_cluster_t
73   unsigned                    enabled:1;
74   rbtree_seed_t               keyslots; //cluster rbtree seed
75 } redis_nodeset_cluster_t;
76 
77 typedef struct {
78   unsigned         min:16;
79   unsigned         max:16;
80 } redis_slot_range_t;
81 
82 typedef enum {
83   REDIS_NODE_ROLE_UNKNOWN = 0, REDIS_NODE_ROLE_MASTER, REDIS_NODE_ROLE_SLAVE
84 } redis_node_role_t;
85 
86 typedef enum {
87   REDIS_NODESET_FAILED = -4,
88   REDIS_NODESET_CLUSTER_FAILING = -3,
89   REDIS_NODESET_FAILING = -2,
90   REDIS_NODESET_INVALID = -1,
91   REDIS_NODESET_DISCONNECTED = 0,
92   REDIS_NODESET_CONNECTING,
93   REDIS_NODESET_READY
94 } redis_nodeset_status_t;
95 
96 #if REDIS_NODESET_DBG
97 typedef struct {
98   int  n;
99   redis_node_t *node[128];
100 } redis_node_dbg_list_t;
101 typedef struct {
102   redis_slot_range_t range;
103   redis_node_t       *node;
104 } redis_node_range_dbg_t;
105 typedef struct {
106   int  n;
107   redis_node_range_dbg_t node[128];
108 } redis_nodeset_dbg_range_tree_t;
109 #endif
110 
111 
112 
113 struct redis_nodeset_s {
114   //a set of redis nodes
115   //  maybe just 1 master
116   //  maybe a master and its slaves
117   //  maybe a cluster of masters and their slaves
118   //slaves of slaves not included
119   char                       *name;
120   redis_nodeset_status_t      status;
121   ngx_event_t                 status_check_ev;
122   const char                 *status_msg;
123   time_t                      current_status_start;
124   ngx_int_t                   current_status_times_checked;
125   ngx_int_t                   generation;
126   nchan_list_t                urls;
127   nchan_loc_conf_t           *first_loc_conf;
128   ngx_http_upstream_srv_conf_t *upstream;
129   nchan_list_t                nodes;
130   redis_nodeset_cluster_t     cluster;
131   struct {                    //settings
132     nchan_redis_storage_mode_t  storage_mode;
133     ngx_int_t                   nostore_fastpublish;
134     struct {                    //pubsub_subscribe_weight
135       ngx_int_t                   master;
136       ngx_int_t                   slave;
137     }                           node_weight;
138     time_t                      ping_interval;
139     time_t                      cluster_check_interval;
140     ngx_str_t                  *namespace;
141     nchan_redis_optimize_t      optimize_target;
142     ngx_msec_t                  connect_timeout;
143     struct {
144       int                         count;
145       nchan_redis_ip_range_t     *list;
146     }                         blacklist;
147   }                           settings;
148 
149   struct {
150     nchan_slist_t               all;
151     nchan_slist_t               disconnected_cmd;
152     nchan_slist_t               disconnected_pubsub;
153   }                           channels;
154 
155   nchan_reaper_t              chanhead_reaper;
156   time_t                      reconnect_delay_sec;
157   nchan_list_t                onready_callbacks;
158 #if REDIS_NODESET_DBG
159   struct {
160     redis_node_dbg_list_t       nodes;
161     redis_nodeset_dbg_range_tree_t ranges;
162     int                         keyspace_complete;
163   }                           dbg;
164 #endif
165 
166 }; //redis_nodeset_t
167 
168 struct redis_node_s {
169   int8_t                    state;
170   unsigned                  discovered:1;
171   redis_node_role_t         role;
172   redis_connect_params_t    connect_params;
173   void                     *connect_timeout;
174   redis_nodeset_t          *nodeset;
175   ngx_str_t                 run_id;
176   ngx_str_t                 version;
177   int                       scripts_loaded;
178   int                       generation;
179   ngx_event_t               ping_timer;
180   struct {
181     unsigned                  enabled:1;
182     unsigned                  ok:1;
183     ngx_str_t                 id;
184     ngx_event_t               check_timer;
185     time_t                    last_successful_check;
186     int                       current_epoch; //as reported on this node
187     struct {
188       redis_slot_range_t         *range;
189       size_t                      n;
190       unsigned                    indexed:1;
191     }                         slot_range;
192     char                     *cluster_nodes;
193   }                         cluster;
194   struct {
195     redis_node_t              *master;
196     nchan_list_t               slaves;
197   }                         peers;
198   struct {
199     redisAsyncContext         *cmd;
200     redisAsyncContext         *pubsub;
201     redisContext              *sync;
202   }                         ctx;
203   int                       pending_commands;
204   struct {
205   nchan_slist_t               cmd;
206   nchan_slist_t               pubsub;
207   }                         channels;
208 }; //redis_node_t
209 
210 typedef struct {
211   redis_slot_range_t      range;
212   redis_node_t           *node;
213 } redis_nodeset_slot_range_node_t;
214 
215 
216 
217 redis_nodeset_t *nodeset_create(nchan_loc_conf_t *lcf);
218 ngx_int_t nodeset_initialize(char *worker_id, redisCallbackFn *subscribe_handler);
219 redis_nodeset_t *nodeset_find(nchan_redis_conf_t *rcf);
220 ngx_int_t nodeset_examine(redis_nodeset_t *nodeset);
221 
222 ngx_int_t nodeset_node_destroy(redis_node_t *node);
223 
224 
225 int node_disconnect(redis_node_t *node, int disconnected_state);
226 int node_connect(redis_node_t *node);
227 void node_set_role(redis_node_t *node, redis_node_role_t role);
228 int node_set_master_node(redis_node_t *node, redis_node_t *master);
229 redis_node_t *node_find_slave_node(redis_node_t *node, redis_node_t *slave);
230 int node_add_slave_node(redis_node_t *node, redis_node_t *slave);
231 int node_remove_slave_node(redis_node_t *node, redis_node_t *slave);
232 
233 ngx_int_t nodeset_connect_all(void);
234 int nodeset_connect(redis_nodeset_t *ns);
235 int nodeset_node_keyslot_changed(redis_node_t *node);
236 int nodeset_disconnect(redis_nodeset_t *ns);
237 ngx_int_t nodeset_destroy_all(void);
238 ngx_int_t nodeset_each(void (*)(redis_nodeset_t *, void *), void *privdata);
239 ngx_int_t nodeset_each_node(redis_nodeset_t *, void (*)(redis_node_t *, void *), void *privdata);
240 ngx_int_t nodeset_callback_on_ready(redis_nodeset_t *ns, ngx_msec_t max_wait, ngx_int_t (*cb)(redis_nodeset_t *, void *), void *pd);
241 ngx_int_t nodeset_abort_on_ready_callbacks(redis_nodeset_t *ns);
242 ngx_int_t nodeset_run_on_ready_callbacks(redis_nodeset_t *ns);
243 
244 ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t status, const char *msg);
245 
246 int nodeset_node_deduplicate_by_connect_params(redis_node_t *node);
247 int nodeset_node_deduplicate_by_run_id(redis_node_t *node);
248 int nodeset_node_deduplicate_by_cluster_id(redis_node_t *node);
249 
250 redis_node_t *nodeset_node_find_by_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp);
251 redis_node_t *nodeset_node_find_by_run_id(redis_nodeset_t *ns, ngx_str_t *run_id);
252 redis_node_t *nodeset_node_find_by_cluster_id(redis_nodeset_t *ns, ngx_str_t *cluster_id);
253 redis_node_t *nodeset_node_find_by_range(redis_nodeset_t *ns, redis_slot_range_t *range);
254 redis_node_t *nodeset_node_find_by_slot(redis_nodeset_t *ns, uint16_t slot);
255 redis_node_t *nodeset_node_find_by_channel_id(redis_nodeset_t *ns, ngx_str_t *channel_id);
256 redis_node_t *nodeset_node_find_by_key(redis_nodeset_t *ns, ngx_str_t *key);
257 redis_node_t *nodeset_node_find_any_ready_master(redis_nodeset_t *ns);
258 int nodeset_node_reply_keyslot_ok(redis_node_t *node, redisReply *r);
259 int nodeset_ready(redis_nodeset_t *nodeset);
260 
261 //chanheads are (void *) here to avoid circular typedef dependency with store-private.h
262 //it's terrible, and dirty -- but quick
263 ngx_int_t nodeset_associate_chanhead(redis_nodeset_t *, void *chanhead);
264 ngx_int_t nodeset_dissociate_chanhead(void *chanhead);
265 
266 ngx_int_t nodeset_node_associate_chanhead(redis_node_t *, void *chanhead);
267 ngx_int_t nodeset_node_associate_pubsub_chanhead(redis_node_t *, void *chanhead);
268 ngx_int_t nodeset_node_dissociate_chanhead(void *chanhead);
269 ngx_int_t nodeset_node_dissociate_pubsub_chanhead(void *chanhead);
270 
271 redis_node_t *nodeset_node_find_by_chanhead(void *chanhead);
272 redis_node_t *nodeset_node_pubsub_find_by_chanhead(void *chanhead);
273 
274 
275 
276 redis_node_t *nodeset_node_create(redis_nodeset_t *ns, redis_connect_params_t *rcp);
277 
278 uint16_t redis_crc16(uint16_t crc, const char *buf, int len);
279 
280 
281 const char *__node_nickname_cstr(redis_node_t *node);
282 
283 #endif /* NCHAN_REDIS_NODESET_H */
284