1 #ifndef NCHAN_REDIS_NODESET_H 2 #define NCHAN_REDIS_NODESET_H 3 4 #include <nchan_module.h> 5 #if NCHAN_HAVE_HIREDIS_WITH_SOCKADDR 6 #include <hiredis/hiredis.h> 7 #include <hiredis/async.h> 8 #else 9 #include <store/redis/hiredis/hiredis.h> 10 #include <store/redis/hiredis/async.h> 11 #endif 12 #include <util/nchan_reaper.h> 13 #include <util/nchan_rbtree.h> 14 #include <util/nchan_list.h> 15 #include <util/nchan_slist.h> 16 17 //#include "store-private.h" 18 19 //#define REDIS_NODESET_DBG 1 20 21 #define node_log(node, lvl, fmt, args...) \ 22 ngx_log_error(lvl, ngx_cycle->log, 0, "nchan: Redis node %s " fmt, __node_nickname_cstr(node), ##args) 23 #define node_log_error(node, fmt, args...) node_log((node), NGX_LOG_ERR, fmt, ##args) 24 #define node_log_warning(node, fmt, args...) node_log((node), NGX_LOG_WARN, fmt, ##args) 25 #define node_log_notice(node, fmt, args...) node_log((node), NGX_LOG_NOTICE, fmt, ##args) 26 #define node_log_info(node, fmt, args...) node_log((node), NGX_LOG_INFO, fmt, ##args) 27 #define node_log_debug(node, fmt, args...) node_log((node), NGX_LOG_DEBUG, fmt, ##args) 28 29 #define nodeset_log(nodeset, lvl, fmt, args...) \ 30 ngx_log_error(lvl, ngx_cycle->log, 0, "nchan: Redis %s: " fmt, (nodeset)->name, ##args) 31 #define nodeset_log_error(nodeset, fmt, args...) nodeset_log((nodeset), NGX_LOG_ERR, fmt, ##args) 32 #define nodeset_log_warning(nodeset, fmt, args...) nodeset_log((nodeset), NGX_LOG_WARN, fmt, ##args) 33 #define nodeset_log_notice(nodeset, fmt, args...) nodeset_log((nodeset), NGX_LOG_NOTICE, fmt, ##args) 34 #define nodeset_log_info(nodeset, fmt, args...) nodeset_log((nodeset), NGX_LOG_INFO, fmt, ##args) 35 #define nodeset_log_debug(nodeset, fmt, args...) nodeset_log((nodeset), NGX_LOG_DEBUG, fmt, ##args) 36 37 #define NCHAN_MAX_NODESETS 128 38 #define REDIS_NODESET_STATUS_CHECK_TIME_MSEC 4000 39 #define REDIS_NODESET_MAX_CONNECTING_TIME_SEC 5 40 #define REDIS_NODESET_RECONNECT_WAIT_TIME_SEC 5 41 #define REDIS_NODESET_MAX_FAILING_TIME_SEC 2 42 43 #define REDIS_NODE_DEDUPLICATED -100 44 #define REDIS_NODE_CONNECTION_TIMED_OUT -2 45 #define REDIS_NODE_FAILED -1 46 #define REDIS_NODE_DISCONNECTED 0 47 #define REDIS_NODE_CMD_CONNECTING 1 48 #define REDIS_NODE_PUBSUB_CONNECTING 2 49 #define REDIS_NODE_CONNECTED 3 50 #define REDIS_NODE_CMD_AUTHENTICATING 4 51 #define REDIS_NODE_PUBSUB_AUTHENTICATING 5 52 #define REDIS_NODE_SELECT_DB 6 53 #define REDIS_NODE_CMD_SELECTING_DB 7 54 #define REDIS_NODE_PUBSUB_SELECTING_DB 8 55 #define REDIS_NODE_SCRIPTS_LOAD 9 56 #define REDIS_NODE_SCRIPTS_LOADING 10 57 #define REDIS_NODE_GET_INFO 11 58 #define REDIS_NODE_GETTING_INFO 12 59 #define REDIS_NODE_PUBSUB_GET_INFO 13 60 #define REDIS_NODE_PUBSUB_GETTING_INFO 14 61 #define REDIS_NODE_SUBSCRIBE_WORKER 15 62 #define REDIS_NODE_SUBSCRIBING_WORKER 16 63 #define REDIS_NODE_GET_CLUSTERINFO 17 64 #define REDIS_NODE_GETTING_CLUSTERINFO 18 65 #define REDIS_NODE_GET_CLUSTER_NODES 19 66 #define REDIS_NODE_GETTING_CLUSTER_NODES 20 67 #define REDIS_NODE_READY 100 68 69 typedef struct redis_nodeset_s redis_nodeset_t; 70 typedef struct redis_node_s redis_node_t; 71 72 typedef struct { //redis_nodeset_cluster_t 73 unsigned enabled:1; 74 rbtree_seed_t keyslots; //cluster rbtree seed 75 } redis_nodeset_cluster_t; 76 77 typedef struct { 78 unsigned min:16; 79 unsigned max:16; 80 } redis_slot_range_t; 81 82 typedef enum { 83 REDIS_NODE_ROLE_UNKNOWN = 0, REDIS_NODE_ROLE_MASTER, REDIS_NODE_ROLE_SLAVE 84 } redis_node_role_t; 85 86 typedef enum { 87 REDIS_NODESET_FAILED = -4, 88 REDIS_NODESET_CLUSTER_FAILING = -3, 89 REDIS_NODESET_FAILING = -2, 90 REDIS_NODESET_INVALID = -1, 91 REDIS_NODESET_DISCONNECTED = 0, 92 REDIS_NODESET_CONNECTING, 93 REDIS_NODESET_READY 94 } redis_nodeset_status_t; 95 96 #if REDIS_NODESET_DBG 97 typedef struct { 98 int n; 99 redis_node_t *node[128]; 100 } redis_node_dbg_list_t; 101 typedef struct { 102 redis_slot_range_t range; 103 redis_node_t *node; 104 } redis_node_range_dbg_t; 105 typedef struct { 106 int n; 107 redis_node_range_dbg_t node[128]; 108 } redis_nodeset_dbg_range_tree_t; 109 #endif 110 111 112 113 struct redis_nodeset_s { 114 //a set of redis nodes 115 // maybe just 1 master 116 // maybe a master and its slaves 117 // maybe a cluster of masters and their slaves 118 //slaves of slaves not included 119 char *name; 120 redis_nodeset_status_t status; 121 ngx_event_t status_check_ev; 122 const char *status_msg; 123 time_t current_status_start; 124 ngx_int_t current_status_times_checked; 125 ngx_int_t generation; 126 nchan_list_t urls; 127 nchan_loc_conf_t *first_loc_conf; 128 ngx_http_upstream_srv_conf_t *upstream; 129 nchan_list_t nodes; 130 redis_nodeset_cluster_t cluster; 131 struct { //settings 132 nchan_redis_storage_mode_t storage_mode; 133 ngx_int_t nostore_fastpublish; 134 struct { //pubsub_subscribe_weight 135 ngx_int_t master; 136 ngx_int_t slave; 137 } node_weight; 138 time_t ping_interval; 139 time_t cluster_check_interval; 140 ngx_str_t *namespace; 141 nchan_redis_optimize_t optimize_target; 142 ngx_msec_t connect_timeout; 143 struct { 144 int count; 145 nchan_redis_ip_range_t *list; 146 } blacklist; 147 } settings; 148 149 struct { 150 nchan_slist_t all; 151 nchan_slist_t disconnected_cmd; 152 nchan_slist_t disconnected_pubsub; 153 } channels; 154 155 nchan_reaper_t chanhead_reaper; 156 time_t reconnect_delay_sec; 157 nchan_list_t onready_callbacks; 158 #if REDIS_NODESET_DBG 159 struct { 160 redis_node_dbg_list_t nodes; 161 redis_nodeset_dbg_range_tree_t ranges; 162 int keyspace_complete; 163 } dbg; 164 #endif 165 166 }; //redis_nodeset_t 167 168 struct redis_node_s { 169 int8_t state; 170 unsigned discovered:1; 171 redis_node_role_t role; 172 redis_connect_params_t connect_params; 173 void *connect_timeout; 174 redis_nodeset_t *nodeset; 175 ngx_str_t run_id; 176 ngx_str_t version; 177 int scripts_loaded; 178 int generation; 179 ngx_event_t ping_timer; 180 struct { 181 unsigned enabled:1; 182 unsigned ok:1; 183 ngx_str_t id; 184 ngx_event_t check_timer; 185 time_t last_successful_check; 186 int current_epoch; //as reported on this node 187 struct { 188 redis_slot_range_t *range; 189 size_t n; 190 unsigned indexed:1; 191 } slot_range; 192 char *cluster_nodes; 193 } cluster; 194 struct { 195 redis_node_t *master; 196 nchan_list_t slaves; 197 } peers; 198 struct { 199 redisAsyncContext *cmd; 200 redisAsyncContext *pubsub; 201 redisContext *sync; 202 } ctx; 203 int pending_commands; 204 struct { 205 nchan_slist_t cmd; 206 nchan_slist_t pubsub; 207 } channels; 208 }; //redis_node_t 209 210 typedef struct { 211 redis_slot_range_t range; 212 redis_node_t *node; 213 } redis_nodeset_slot_range_node_t; 214 215 216 217 redis_nodeset_t *nodeset_create(nchan_loc_conf_t *lcf); 218 ngx_int_t nodeset_initialize(char *worker_id, redisCallbackFn *subscribe_handler); 219 redis_nodeset_t *nodeset_find(nchan_redis_conf_t *rcf); 220 ngx_int_t nodeset_examine(redis_nodeset_t *nodeset); 221 222 ngx_int_t nodeset_node_destroy(redis_node_t *node); 223 224 225 int node_disconnect(redis_node_t *node, int disconnected_state); 226 int node_connect(redis_node_t *node); 227 void node_set_role(redis_node_t *node, redis_node_role_t role); 228 int node_set_master_node(redis_node_t *node, redis_node_t *master); 229 redis_node_t *node_find_slave_node(redis_node_t *node, redis_node_t *slave); 230 int node_add_slave_node(redis_node_t *node, redis_node_t *slave); 231 int node_remove_slave_node(redis_node_t *node, redis_node_t *slave); 232 233 ngx_int_t nodeset_connect_all(void); 234 int nodeset_connect(redis_nodeset_t *ns); 235 int nodeset_node_keyslot_changed(redis_node_t *node); 236 int nodeset_disconnect(redis_nodeset_t *ns); 237 ngx_int_t nodeset_destroy_all(void); 238 ngx_int_t nodeset_each(void (*)(redis_nodeset_t *, void *), void *privdata); 239 ngx_int_t nodeset_each_node(redis_nodeset_t *, void (*)(redis_node_t *, void *), void *privdata); 240 ngx_int_t nodeset_callback_on_ready(redis_nodeset_t *ns, ngx_msec_t max_wait, ngx_int_t (*cb)(redis_nodeset_t *, void *), void *pd); 241 ngx_int_t nodeset_abort_on_ready_callbacks(redis_nodeset_t *ns); 242 ngx_int_t nodeset_run_on_ready_callbacks(redis_nodeset_t *ns); 243 244 ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t status, const char *msg); 245 246 int nodeset_node_deduplicate_by_connect_params(redis_node_t *node); 247 int nodeset_node_deduplicate_by_run_id(redis_node_t *node); 248 int nodeset_node_deduplicate_by_cluster_id(redis_node_t *node); 249 250 redis_node_t *nodeset_node_find_by_connect_params(redis_nodeset_t *ns, redis_connect_params_t *rcp); 251 redis_node_t *nodeset_node_find_by_run_id(redis_nodeset_t *ns, ngx_str_t *run_id); 252 redis_node_t *nodeset_node_find_by_cluster_id(redis_nodeset_t *ns, ngx_str_t *cluster_id); 253 redis_node_t *nodeset_node_find_by_range(redis_nodeset_t *ns, redis_slot_range_t *range); 254 redis_node_t *nodeset_node_find_by_slot(redis_nodeset_t *ns, uint16_t slot); 255 redis_node_t *nodeset_node_find_by_channel_id(redis_nodeset_t *ns, ngx_str_t *channel_id); 256 redis_node_t *nodeset_node_find_by_key(redis_nodeset_t *ns, ngx_str_t *key); 257 redis_node_t *nodeset_node_find_any_ready_master(redis_nodeset_t *ns); 258 int nodeset_node_reply_keyslot_ok(redis_node_t *node, redisReply *r); 259 int nodeset_ready(redis_nodeset_t *nodeset); 260 261 //chanheads are (void *) here to avoid circular typedef dependency with store-private.h 262 //it's terrible, and dirty -- but quick 263 ngx_int_t nodeset_associate_chanhead(redis_nodeset_t *, void *chanhead); 264 ngx_int_t nodeset_dissociate_chanhead(void *chanhead); 265 266 ngx_int_t nodeset_node_associate_chanhead(redis_node_t *, void *chanhead); 267 ngx_int_t nodeset_node_associate_pubsub_chanhead(redis_node_t *, void *chanhead); 268 ngx_int_t nodeset_node_dissociate_chanhead(void *chanhead); 269 ngx_int_t nodeset_node_dissociate_pubsub_chanhead(void *chanhead); 270 271 redis_node_t *nodeset_node_find_by_chanhead(void *chanhead); 272 redis_node_t *nodeset_node_pubsub_find_by_chanhead(void *chanhead); 273 274 275 276 redis_node_t *nodeset_node_create(redis_nodeset_t *ns, redis_connect_params_t *rcp); 277 278 uint16_t redis_crc16(uint16_t crc, const char *buf, int len); 279 280 281 const char *__node_nickname_cstr(redis_node_t *node); 282 283 #endif /* NCHAN_REDIS_NODESET_H */ 284