1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3
4 #include <store/memory/ipc.h>
5 #include <store/memory/store-private.h>
6 #include <store/memory/ipc-handlers.h>
7 #include <store/memory/store.h>
8
9 #include <store/redis/store.h>
10 #include <store/redis/store-private.h>
11 #include <store/redis/redis_nodeset.h>
12
13 #include <util/nchan_msg.h>
14 #include "internal.h"
15 #include "memstore_redis.h"
16 #include <assert.h>
17
//#define DEBUG_LEVEL NGX_LOG_WARN
#define DEBUG_LEVEL NGX_LOG_DEBUG

/*
 * Logging helpers for this file. Both write to the cycle log and prefix every
 * message with "SUB:MEM-REDIS:"; DBG logs at DEBUG_LEVEL, ERR at NGX_LOG_ERR.
 */
#define DBG(fmt, ...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:MEM-REDIS:" fmt, ##__VA_ARGS__)
#define ERR(fmt, ...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:MEM-REDIS:" fmt, ##__VA_ARGS__)
23
24
25 typedef struct sub_data_s sub_data_t;
26
27 struct sub_data_s {
28 subscriber_t *sub;
29 memstore_channel_head_t *chanhead;
30 ngx_str_t *chid;
31 ngx_event_t timeout_ev;
32 nchan_msg_status_t last_msg_status;
33 sub_data_t **onconnect_callback_pd;
34 }; //sub_data_t
35
36 /*
37 static ngx_int_t empty_callback(){
38 return NGX_OK;
39 }
40 */
41
/*
 * Enqueue handler registered in memstore_redis_subscriber_create().
 * If the memstore channel head is still attached, it is marked READY and its
 * spooler is told about the status change. `status` and `ptr` are unused.
 * Always returns NGX_OK.
 */
static ngx_int_t sub_enqueue(ngx_int_t status, void *ptr, sub_data_t *d) {
  memstore_channel_head_t  *head;

  DBG("%p memstore-redis subsriber enqueued ok", d->sub);

  head = d->chanhead;
  if(head == NULL) {
    // nothing to update without a channel head
    return NGX_OK;
  }

  head->status = READY;
  head->spooler.fn->handle_channel_status_change(&head->spooler);
  return NGX_OK;
}
51
memstore_redis_subscriber_destroy(subscriber_t * sub)52 ngx_int_t memstore_redis_subscriber_destroy(subscriber_t *sub) {
53 DBG("%p destroy", sub);
54 sub_data_t *d = internal_subscriber_get_privdata(sub);
55 d->chanhead = NULL; //memstore chanhead should be presumed missing
56 return internal_subscriber_destroy(sub);
57 }
58
/*
 * Dequeue handler: only logs the event.
 * `status` and `ptr` are unused. Always returns NGX_OK.
 */
static ngx_int_t sub_dequeue(ngx_int_t status, void *ptr, sub_data_t *d) {
  subscriber_t  *self = d->sub;

  DBG("%p dequeue", self);
  return NGX_OK;
}
63
/*
 * Message handler: forwards a message (passed in `ptr` as nchan_msg_t *) to
 * the memstore channel head.
 *
 * A stack-local location config is filled with only the handful of fields set
 * below (redis disabled, limits taken from the channel head and the message)
 * and handed to the deflate and publish calls.
 *
 * The message is published when the channel is in REDIS_MODE_DISTRIBUTED_NOSTORE
 * mode, or when its id is newer than the channel head's latest message id.
 * Anything else is silently dropped.
 *
 * Returns NGX_DECLINED when the channel head is gone, NGX_ERROR when the
 * temporary deflate pool cannot be created, NGX_OK otherwise.
 */
static ngx_int_t sub_respond_message(ngx_int_t status, void *ptr, sub_data_t* d) {
  nchan_msg_t       *msg = (nchan_msg_t *) ptr;
  ngx_pool_t        *deflate_pool = NULL;
  nchan_loc_conf_t   cf;
  nchan_msg_id_t    *lastid;
  int                nostore_mode;
  int                should_publish;

  if(d->chanhead == NULL) {
    DBG("memstore chanhead gone");
    return NGX_DECLINED;
  }

  nostore_mode = (d->chanhead->cf->redis.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE);
  DBG("%p memstore-redis subscriber respond with message", d->sub);

  // stand-in config: only these fields are initialized
  cf.max_messages = d->chanhead->max_messages;
  cf.redis.enabled = 0;
  cf.message_timeout = msg->expires - ngx_time();
  cf.complex_max_messages = NULL;
  cf.complex_message_timeout = NULL;
  if(msg->compressed) {
    cf.message_compression = msg->compressed->compression;
  }
  else {
    cf.message_compression = NCHAN_MSG_NO_COMPRESSION;
  }

  if(cf.message_compression == NCHAN_MSG_NO_COMPRESSION) {
    msg->compressed = NULL;
  }
  else {
    // the pool lives only for the duration of this call; it is destroyed below
    deflate_pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE / 2, ngx_cycle->log);
    if(deflate_pool == NULL) {
      ERR("unable to create deflatepool");
      return NGX_ERROR;
    }
    nchan_deflate_message_if_needed(msg, &cf, NULL, deflate_pool);
  }

  lastid = &d->chanhead->latest_msgid;
  assert(lastid->tagcount == 1 && msg->id.tagcount == 1);

  should_publish = nostore_mode
                || lastid->time < msg->id.time
                || (lastid->time == msg->id.time && lastid->tag.fixed[0] < msg->id.tag.fixed[0]);

  if(should_publish) {
    if(nostore_mode) {
      //out of order messages are ok, we don't care. publish them like they just got here. we can't lose messages due to network lag
      msg->expires = 0;
      msg->id.time = 0;
    }
    memstore_ensure_chanhead_is_ready(d->chanhead, 1);
    nchan_store_chanhead_publish_message_generic(d->chanhead, msg, 0, &cf, NULL, NULL);
  }
  //otherwise: meh, this message has already been delivered probably hopefully

  if(deflate_pool != NULL) {
    ngx_destroy_pool(deflate_pool);
  }
  return NGX_OK;
}
117
/*
 * Destroy handler: if a nodeset on-ready callback still holds a cell pointing
 * at this private data, the cell is set to NULL so that reconnect_callback()
 * sees a NULL entry instead of a pointer to destroyed data.
 * `status` and `d` are unused. Always returns NGX_OK.
 */
static ngx_int_t sub_destroy_handler(ngx_int_t status, void *d, sub_data_t *pd) {
  sub_data_t  **cell;

  DBG("%p sub_destroy_handler", pd->sub);

  cell = pd->onconnect_callback_pd;
  if(cell != NULL) {
    *cell = NULL;
  }
  return NGX_OK;
}
124
reconnect_callback(redis_nodeset_t * ns,void * pd)125 static ngx_int_t reconnect_callback(redis_nodeset_t *ns, void *pd) {
126 sub_data_t *sd = *((sub_data_t **) pd);
127 if(!sd->chanhead || !nodeset_ready(ns)) {
128 return NGX_ERROR;
129 }
130 if(sd) {
131 DBG("%reconnect callback");
132 assert(sd->chanhead->redis_sub == sd->sub);
133 assert(&sd->chanhead->id == sd->chid);
134 nchan_store_redis.subscribe(sd->chid, sd->chanhead->redis_sub);
135 sd->onconnect_callback_pd = NULL;
136 sd->sub->dequeue_after_response = 0;
137 ((internal_subscriber_t *)sd->sub)->already_dequeued = 0;
138 ngx_free(pd);
139 }
140 else {
141 DBG("%reconnect callback skipped"); //probably because the channel was deleted while we were waiting
142 }
143 return NGX_OK;
144 }
145
/*
 * Status handler.
 *
 *  - NGX_HTTP_GONE / NGX_HTTP_CLOSE: the channel is deleted from the memory
 *    store (using a copy of the subscriber's config with redis disabled), the
 *    subscriber is flagged destroy_after_dequeue and detached from the channel
 *    head. If the redis nodeset is not ready and no on-ready callback is
 *    pending yet, reconnect_callback() is registered with a heap cell pointing
 *    back at `d`.
 *  - NGX_HTTP_NO_CONTENT: publishes NCHAN_NOTICE_BUFFER_LOADED unless the last
 *    message status was already MSG_EXPECTED, then records MSG_EXPECTED.
 *  - anything else is ignored.
 *
 * `ptr` is unused. Returns NGX_DECLINED when the channel head is gone,
 * NGX_ERROR when the callback cell cannot be allocated, NGX_OK otherwise.
 */
static ngx_int_t sub_respond_status(ngx_int_t status, void *ptr, sub_data_t *d) {
  nchan_loc_conf_t  fake_cf;
  redis_nodeset_t  *nodeset;

  if(!d->chanhead) {
    return NGX_DECLINED;
  }

  DBG("%p memstore-redis subscriber respond with status %i", d->sub, status);
  switch(status) {
    case NGX_HTTP_GONE: //delete
    case NGX_HTTP_CLOSE: //delete
      fake_cf = *d->sub->cf;
      fake_cf.redis.enabled = 0;
      d->sub->destroy_after_dequeue = 1;
      nchan_store_memory.delete_channel(d->chid, &fake_cf, NULL, NULL);
      //now the chanhead will be in the garbage collector
      d->chanhead->redis_sub = NULL;

      nodeset = nodeset_find(&d->sub->cf->redis);
      if(!nodeset_ready(nodeset) && d->onconnect_callback_pd == NULL) {
        // the cell holds a single pointer back to d, so size it by the cell, not by sub_data_t
        sub_data_t **dd = ngx_alloc(sizeof(*dd), ngx_cycle->log);
        if(dd == NULL) {
          ERR("unable to allocate reconnect callback data for subscriber %p", d->sub);
          return NGX_ERROR;
        }
        *dd = d;
        d->onconnect_callback_pd = dd;
        nodeset_callback_on_ready(nodeset, 0, reconnect_callback, dd);
      }
      break;

    case NGX_HTTP_NO_CONTENT:
      if(d->last_msg_status != MSG_EXPECTED) {
        //the message buffer has just been walked start to finish
        nchan_memstore_publish_notice(d->chanhead, NCHAN_NOTICE_BUFFER_LOADED, NULL);
      }
      d->last_msg_status = MSG_EXPECTED;
      //TODO: stuff about REDIS_MODE_BACKUP

      break;

    default:
      //meh, no big deal.
      break;
  }

  return NGX_OK;
}
190
/*
 * Notice handler.
 *
 *  - NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE: `data` carries the
 *    new max_messages value as an integer; it is stored on the channel head and
 *    the head's message garbage collection is run.
 *  - NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST: the notice is re-published on the
 *    memstore channel head with the same `data`.
 *  - any other code is ignored.
 *
 * Returns NGX_DECLINED when the channel head is gone, NGX_OK otherwise.
 */
static ngx_int_t sub_notify_handler(ngx_int_t code, void *data, sub_data_t *d) {
  if(d->chanhead == NULL) {
    return NGX_DECLINED;
  }

  if(code == NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE) {
    intptr_t  new_limit = (intptr_t )data;

    d->chanhead->max_messages = new_limit;
    memstore_chanhead_messages_gc(d->chanhead);
  }
  else if(code == NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST) {
    nchan_memstore_publish_notice(d->chanhead, NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST, data);
  }

  return NGX_OK;
}
209
210 /*
211 static void reset_timer(sub_data_t *data) {
212 if(data->timeout_ev.timer_set) {
213 ngx_del_timer(&data->timeout_ev);
214 }
215 ngx_add_timer(&data->timeout_ev, MEMSTORE_REDIS_SUBSCRIBER_TIMEOUT * 1000);
216 }
217 */
218 /*
219 static ngx_int_t keepalive_reply_handler(ngx_int_t renew, void *_, void* pd) {
220 sub_data_t *d = (sub_data_t *)pd;
221 if(d->sub->release(d->sub) == NGX_OK) {
222 if(renew) {
223 reset_timer(d);
224 }
225 else{
226 d->sub->dequeue(d->sub);
227 }
228 }
229 return NGX_OK;
230 }
231 */
232
233 static ngx_str_t sub_name = ngx_string("memstore-redis");
234
memstore_redis_subscriber_create(memstore_channel_head_t * chanhead)235 subscriber_t *memstore_redis_subscriber_create(memstore_channel_head_t *chanhead) {
236 subscriber_t *sub;
237 sub_data_t *d;
238
239 assert(chanhead->cf);
240
241 sub = internal_subscriber_create_init(&sub_name, chanhead->cf, sizeof(*d), (void **)&d, (callback_pt )sub_enqueue, (callback_pt )sub_dequeue, (callback_pt )sub_respond_message, (callback_pt )sub_respond_status, (callback_pt )sub_notify_handler, (callback_pt )sub_destroy_handler);
242
243
244 sub->destroy_after_dequeue = 0;
245 sub->dequeue_after_response = 0;
246
247 d->sub = sub;
248 d->chanhead = chanhead;
249 d->chid = &chanhead->id;
250 d->last_msg_status = MSG_PENDING;
251 d->onconnect_callback_pd = NULL;
252
253
254 /*
255 ngx_memzero(&d->timeout_ev, sizeof(d->timeout_ev));
256 nchan_init_timer(&d->timeout_ev, timeout_ev_handler, d)
257 reset_timer(d);
258 */
259 DBG("%p created memstore-redis subscriber with privdata %p", d->sub, d);
260 return sub;
261 }
262