1 #ifndef NCHAN_REDIS_STORE_PRIVATE_H 2 #define NCHAN_REDIS_STORE_PRIVATE_H 3 4 #define NCHAN_CHANHEAD_EXPIRE_SEC 1 5 #define NCHAN_CHANHEAD_CLUSTER_ORPHAN_EXPIRE_SEC 15 6 #define NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE 0xB00F 7 8 #define NCHAN_REDIS_UNIQUE_REQUEST_ID_KEY "nchan:unique_request_id" 9 10 #include <nchan_module.h> 11 #include "uthash.h" 12 #if NCHAN_HAVE_HIREDIS_WITH_SOCKADDR 13 #include <hiredis/hiredis.h> 14 #include <hiredis/async.h> 15 #else 16 #include <store/redis/hiredis/hiredis.h> 17 #include <store/redis/hiredis/async.h> 18 #endif 19 #include <util/nchan_reaper.h> 20 #include <util/nchan_rbtree.h> 21 #include <util/nchan_list.h> 22 #include <store/spool.h> 23 24 #include "redis_nodeset.h" 25 #define REDIS_LUA_HASH_LENGTH 40 26 #define REDIS_NODESET_NOT_READY_MAX_RETRIES 2 27 28 //OBSOLETE 29 typedef struct { 30 unsigned min:16; 31 unsigned max:16; 32 } redis_cluster_slot_range_t; 33 34 35 typedef struct rdstore_channel_head_s rdstore_channel_head_t; 36 37 typedef enum {REDIS_PUBSUB_SUBSCRIBING, REDIS_PUBSUB_SUBSCRIBED, REDIS_PUBSUB_UNSUBSCRIBED} redis_pubsub_status_t; 38 39 struct rdstore_channel_head_s { 40 ngx_str_t id; //channel id 41 channel_spooler_t spooler; 42 ngx_uint_t generation; //subscriber pool generation. 43 chanhead_pubsub_status_t status; 44 ngx_uint_t sub_count; 45 ngx_int_t fetching_message_count; 46 ngx_uint_t internal_sub_count; 47 ngx_event_t keepalive_timer; 48 ngx_uint_t keepalive_times_sent; 49 nchan_msg_id_t last_msgid; 50 51 void *redis_subscriber_privdata; 52 //rdstore_channel_head_cluster_data_t cluster; 53 54 ngx_int_t reserved; 55 56 struct { //redis 57 int generation; 58 redis_nodeset_t *nodeset; 59 struct { //node 60 redis_node_t *cmd; 61 redis_node_t *pubsub; 62 } node; 63 64 struct { //linked list links 65 struct { 66 rdstore_channel_head_t *prev; 67 rdstore_channel_head_t *next; 68 } nodeset; 69 struct { 70 rdstore_channel_head_t *prev; 71 rdstore_channel_head_t *next; 72 } node_cmd; 73 struct { 74 rdstore_channel_head_t *prev; 75 rdstore_channel_head_t *next; 76 } node_pubsub; 77 unsigned in_disconnected_cmd_list:1; 78 unsigned in_disconnected_pubsub_list:1; 79 } slist; 80 81 } redis; 82 83 struct { //gc 84 rdstore_channel_head_t *prev; 85 rdstore_channel_head_t *next; 86 time_t time; 87 unsigned in_reaper:1; 88 } gc; 89 90 91 92 redis_pubsub_status_t pubsub_status; 93 unsigned meta:1; 94 unsigned shutting_down:1; 95 UT_hash_handle hh; 96 }; 97 98 99 void redisCheckErrorCallback(redisAsyncContext *c, void *r, void *privdata); 100 int redisReplyOk(redisAsyncContext *c, void *r); 101 ngx_int_t parse_redis_url(ngx_str_t *url, redis_connect_params_t *rcp); 102 ngx_int_t rdstore_initialize_chanhead_reaper(nchan_reaper_t *reaper, char *name); 103 104 ngx_int_t redis_chanhead_gc_add(rdstore_channel_head_t *head, ngx_int_t expire, const char *reason); 105 ngx_int_t redis_chanhead_gc_withdraw(rdstore_channel_head_t *head); 106 ngx_int_t redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t *ch); 107 108 ngx_int_t ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t *ch); 109 110 111 #endif //NCHAN_REDIS_STORE_PRIVATE_H 112