1 #ifndef NCHAN_REDIS_STORE_PRIVATE_H
2 #define NCHAN_REDIS_STORE_PRIVATE_H
3 
// Tunables and identifiers for the Redis store. Their consumers are not visible
// in this header, so the notes below are inferred from the names -- TODO confirm.

// Presumably the delay, in seconds, before an unused channel head is expired.
4 #define NCHAN_CHANHEAD_EXPIRE_SEC 1
// Presumably the (longer) expiry, in seconds, applied to channel heads orphaned
// by a cluster change -- TODO confirm against the garbage-collection code.
5 #define NCHAN_CHANHEAD_CLUSTER_ORPHAN_EXPIRE_SEC 15
// Numeric notice code; by its name it signals that a channel's message buffer
// size changed. NOTE(review): who sends/receives it is not shown here.
6 #define NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE 0xB00F
7 
// String key "nchan:unique_request_id"; presumably a Redis key name -- TODO confirm.
8 #define NCHAN_REDIS_UNIQUE_REQUEST_ID_KEY "nchan:unique_request_id"
9 
10 #include <nchan_module.h>
11 #include "uthash.h"
12 #if NCHAN_HAVE_HIREDIS_WITH_SOCKADDR
13 #include <hiredis/hiredis.h>
14 #include <hiredis/async.h>
15 #else
16 #include <store/redis/hiredis/hiredis.h>
17 #include <store/redis/hiredis/async.h>
18 #endif
19 #include <util/nchan_reaper.h>
20 #include <util/nchan_rbtree.h>
21 #include <util/nchan_list.h>
22 #include <store/spool.h>
23 
24 #include "redis_nodeset.h"
// Presumably the length of a Redis Lua script hash: Redis identifies scripts by
// their SHA-1 digest, which is 40 hex characters -- TODO confirm against usage.
25 #define REDIS_LUA_HASH_LENGTH 40
// Presumably the retry cap for operations attempted while a nodeset is not yet
// ready -- TODO confirm; the retry logic is not visible in this header.
26 #define REDIS_NODESET_NOT_READY_MAX_RETRIES 2
27 
28 //OBSOLETE (marked so by the original author; kept as-is in this header).
// A pair of 16-bit unsigned bit-fields, `min` and `max`. By the type name this
// describes a Redis cluster slot range -- presumably inclusive bounds; TODO confirm.
29 typedef struct {
30   unsigned         min:16; // lower bound, 16-bit bit-field
31   unsigned         max:16; // upper bound, 16-bit bit-field
32 } redis_cluster_slot_range_t;
33 
34 
// Forward typedef so that struct rdstore_channel_head_s can refer to itself
// through rdstore_channel_head_t pointers in its own list links.
35 typedef struct rdstore_channel_head_s rdstore_channel_head_t;
36 
// Pub/sub subscription state, stored in rdstore_channel_head_s.pubsub_status.
// Enumerator order fixes the values: SUBSCRIBING = 0, SUBSCRIBED = 1, UNSUBSCRIBED = 2.
// NOTE(review): the transitions between states are not visible in this header.
37 typedef enum {REDIS_PUBSUB_SUBSCRIBING, REDIS_PUBSUB_SUBSCRIBED, REDIS_PUBSUB_UNSUBSCRIBED} redis_pubsub_status_t;
38 
// Per-channel record ("channel head") of the Redis store.
// Groups, for one channel id: a subscriber spooler and counters, a keepalive
// timer, pointers to the Redis nodeset/nodes the channel is attached to,
// intrusive prev/next links for several lists, garbage-collection bookkeeping,
// and a uthash handle. Field order and types are part of the layout shared
// with other translation units; do not reorder.
// Notes marked "presumably"/"TODO confirm" are inferred from names only.
39 struct rdstore_channel_head_s {
40   ngx_str_t                    id; //channel id
41   channel_spooler_t            spooler; // embedded (by value) subscriber spooler
42   ngx_uint_t                   generation; //subscriber pool generation.
43   chanhead_pubsub_status_t     status; // channel-head status; a different type from pubsub_status below
44   ngx_uint_t                   sub_count; // presumably the number of subscribers -- TODO confirm
45   ngx_int_t                    fetching_message_count; // signed; presumably messages currently being fetched -- TODO confirm
46   ngx_uint_t                   internal_sub_count; // presumably the count of internal subscribers -- TODO confirm
47   ngx_event_t                  keepalive_timer; // nginx event, used as a timer judging by the name
48   ngx_uint_t                   keepalive_times_sent; // presumably how many keepalives were sent -- TODO confirm
49   nchan_msg_id_t               last_msgid; // presumably the id of the most recent message seen -- TODO confirm
50 
51   void                        *redis_subscriber_privdata; // opaque pointer; owner and type are not visible here
52   //rdstore_channel_head_cluster_data_t cluster;
53 
54   ngx_int_t                    reserved; // NOTE(review): looks like a reservation counter rather than padding -- confirm
55 
56   struct {                   //redis: state tying this channel to Redis nodes
57     int                          generation; // separate from the outer `generation` (int vs ngx_uint_t); meaning TODO confirm
58     redis_nodeset_t             *nodeset; // nodeset this channel belongs to (type from redis_nodeset.h)
59     struct {                   //node: the two node pointers for this channel
60       redis_node_t                *cmd; // presumably the node used for commands -- TODO confirm
61       redis_node_t                *pubsub; // presumably the node used for pub/sub -- TODO confirm
62     }                            node;
63 
64     struct {                  //linked list links (intrusive prev/next pairs, one pair per list)
65       struct {
66         rdstore_channel_head_t      *prev;
67         rdstore_channel_head_t      *next;
68       }                            nodeset; // links for a list kept per nodeset, by the name
69       struct {
70         rdstore_channel_head_t      *prev;
71         rdstore_channel_head_t      *next;
72       }                            node_cmd; // links for a list kept per command node, by the name
73       struct {
74         rdstore_channel_head_t      *prev;
75         rdstore_channel_head_t      *next;
76       }                            node_pubsub; // links for a list kept per pub/sub node, by the name
77       unsigned                     in_disconnected_cmd_list:1; // 1-bit flag; presumably set while on a "disconnected" cmd list
78       unsigned                     in_disconnected_pubsub_list:1; // 1-bit flag; presumably set while on a "disconnected" pubsub list
79     }                            slist;
80 
81   }                            redis;
82 
83   struct {                   //gc: garbage-collection bookkeeping
84     rdstore_channel_head_t      *prev; // intrusive list links
85     rdstore_channel_head_t      *next;
86     time_t                       time; // presumably the time at which this head becomes collectable -- TODO confirm
87     unsigned                     in_reaper:1; // 1-bit flag; presumably set while the head is held by a reaper
88   }                            gc;
89 
90 
91 
92   redis_pubsub_status_t        pubsub_status; // one of the REDIS_PUBSUB_* values
93   unsigned                     meta:1; // 1-bit flag; meaning not visible in this header
94   unsigned                     shutting_down:1; // 1-bit flag; presumably set during shutdown -- TODO confirm
95   UT_hash_handle               hh; // uthash handle; uthash requires it for the struct to be stored in a hash table
96 };
97 
98 
// Redis reply helpers and setup routines. Only the prototypes are visible here;
// behaviour notes are inferred from names and signatures -- TODO confirm.

// Has the (context, reply, privdata) shape of a hiredis async reply callback;
// presumably inspects the reply `r` for an error.
99 void redisCheckErrorCallback(redisAsyncContext *c, void *r, void *privdata);
// Presumably returns nonzero when reply `r` on context `c` is not an error.
100 int redisReplyOk(redisAsyncContext *c, void *r);
// Presumably parses `url` into `*rcp`; the return convention is not shown here
// (likely NGX_OK / NGX_ERROR) -- confirm.
101 ngx_int_t parse_redis_url(ngx_str_t *url, redis_connect_params_t *rcp);
// Presumably sets up `reaper` (see <util/nchan_reaper.h>) for channel heads,
// labelled with `name`.
102 ngx_int_t rdstore_initialize_chanhead_reaper(nchan_reaper_t *reaper, char *name);
103 
// Channel-head lifecycle helpers. Semantics are inferred from the names only;
// the definitions are not in this header -- TODO confirm.

// Presumably queues `head` for garbage collection after `expire` (units not
// shown; likely seconds, cf. NCHAN_CHANHEAD_EXPIRE_SEC); `reason` is a
// human-readable string, presumably for logging.
104 ngx_int_t redis_chanhead_gc_add(rdstore_channel_head_t *head, ngx_int_t expire, const char *reason);
// Presumably the inverse of redis_chanhead_gc_add: removes `head` from garbage collection.
105 ngx_int_t redis_chanhead_gc_withdraw(rdstore_channel_head_t *head);
// Presumably brings `ch` back up to date after a Redis reconnect.
106 ngx_int_t redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t *ch);
107 
// Presumably (re)establishes the pub/sub subscription for `ch` when one is
// required. NOTE(review): the "if needed" condition and the return convention
// are not visible in this header -- confirm against the definition.
108 ngx_int_t ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t *ch);
109 
110 
111 #endif //NCHAN_REDIS_STORE_PRIVATE_H
112