1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 
4 #include <store/memory/ipc.h>
5 #include <store/memory/store-private.h>
6 #include <store/memory/ipc-handlers.h>
7 #include <store/memory/store.h>
8 
9 #include <store/redis/store.h>
10 #include <store/redis/store-private.h>
11 #include <store/redis/redis_nodeset.h>
12 
13 #include <util/nchan_msg.h>
14 #include "internal.h"
15 #include "memstore_redis.h"
16 #include <assert.h>
17 
18 //#define DEBUG_LEVEL NGX_LOG_WARN
19 #define DEBUG_LEVEL NGX_LOG_DEBUG
20 
21 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:MEM-REDIS:" fmt, ##arg)
22 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:MEM-REDIS:" fmt, ##arg)
23 
24 
25 typedef struct sub_data_s sub_data_t;
26 
27 struct sub_data_s {
28   subscriber_t                 *sub;
29   memstore_channel_head_t      *chanhead;
30   ngx_str_t                    *chid;
31   ngx_event_t                   timeout_ev;
32   nchan_msg_status_t            last_msg_status;
33   sub_data_t                  **onconnect_callback_pd;
34 }; //sub_data_t
35 
36 /*
37 static ngx_int_t empty_callback(){
38   return NGX_OK;
39 }
40 */
41 
sub_enqueue(ngx_int_t status,void * ptr,sub_data_t * d)42 static ngx_int_t sub_enqueue(ngx_int_t status, void *ptr, sub_data_t *d) {
43   DBG("%p memstore-redis subsriber enqueued ok", d->sub);
44   if(d->chanhead) {
45     d->chanhead->status = READY;
46     d->chanhead->spooler.fn->handle_channel_status_change(&d->chanhead->spooler);
47   }
48 
49   return NGX_OK;
50 }
51 
memstore_redis_subscriber_destroy(subscriber_t * sub)52 ngx_int_t memstore_redis_subscriber_destroy(subscriber_t *sub) {
53   DBG("%p destroy", sub);
54   sub_data_t  *d = internal_subscriber_get_privdata(sub);
55   d->chanhead = NULL; //memstore chanhead should be presumed missing
56   return internal_subscriber_destroy(sub);
57 }
58 
sub_dequeue(ngx_int_t status,void * ptr,sub_data_t * d)59 static ngx_int_t sub_dequeue(ngx_int_t status, void *ptr, sub_data_t* d) {
60   DBG("%p dequeue", d->sub);
61   return NGX_OK;
62 }
63 
sub_respond_message(ngx_int_t status,void * ptr,sub_data_t * d)64 static ngx_int_t sub_respond_message(ngx_int_t status, void *ptr, sub_data_t* d) {
65   nchan_msg_t       *msg = (nchan_msg_t *) ptr;
66   nchan_loc_conf_t   cf;
67   nchan_msg_id_t    *lastid;
68   ngx_pool_t        *deflate_pool = NULL;
69   int                nostore_mode;
70   if(!d->chanhead) {
71     DBG("memstore chanhead gone");
72     return NGX_DECLINED;
73   }
74   nostore_mode = d->chanhead->cf->redis.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE;
75   DBG("%p memstore-redis subscriber respond with message", d->sub);
76 
77   cf.max_messages = d->chanhead->max_messages;
78   cf.redis.enabled = 0;
79   cf.message_timeout = msg->expires - ngx_time();
80   cf.complex_max_messages = NULL;
81   cf.complex_message_timeout = NULL;
82   cf.message_compression = msg->compressed ? msg->compressed->compression : NCHAN_MSG_NO_COMPRESSION;
83 
84   if(cf.message_compression != NCHAN_MSG_NO_COMPRESSION) {
85     deflate_pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE / 2, ngx_cycle->log);
86     if(!deflate_pool) {
87       ERR("unable to create deflatepool");
88       return NGX_ERROR;
89     }
90     nchan_deflate_message_if_needed(msg, &cf, NULL, deflate_pool);
91   }
92   else {
93     msg->compressed = NULL;
94   }
95 
96   lastid = &d->chanhead->latest_msgid;
97 
98   assert(lastid->tagcount == 1 && msg->id.tagcount == 1);
99   if(nostore_mode || lastid->time < msg->id.time ||
100     (lastid->time == msg->id.time && lastid->tag.fixed[0] < msg->id.tag.fixed[0])) {
101     if(nostore_mode) {
102       //out of order messages are ok, we don't care. publish them like they just got here. we can't lose messages due to network lag
103       msg->expires = 0;
104       msg->id.time = 0;
105     }
106     memstore_ensure_chanhead_is_ready(d->chanhead, 1);
107     nchan_store_chanhead_publish_message_generic(d->chanhead, msg, 0, &cf, NULL, NULL);
108   }
109   else {
110     //meh, this message has already been delivered probably hopefully
111   }
112   if(deflate_pool) {
113     ngx_destroy_pool(deflate_pool);
114   }
115   return NGX_OK;
116 }
117 
sub_destroy_handler(ngx_int_t status,void * d,sub_data_t * pd)118 static ngx_int_t sub_destroy_handler(ngx_int_t status, void *d, sub_data_t *pd) {
119   DBG("%p sub_destroy_handler", pd->sub);
120   if(pd->onconnect_callback_pd)
121     (*pd->onconnect_callback_pd) = NULL;
122   return NGX_OK;
123 }
124 
reconnect_callback(redis_nodeset_t * ns,void * pd)125 static ngx_int_t reconnect_callback(redis_nodeset_t *ns, void *pd) {
126   sub_data_t *sd = *((sub_data_t **) pd);
127   if(!sd->chanhead || !nodeset_ready(ns)) {
128     return NGX_ERROR;
129   }
130   if(sd) {
131     DBG("%reconnect callback");
132     assert(sd->chanhead->redis_sub == sd->sub);
133     assert(&sd->chanhead->id == sd->chid);
134     nchan_store_redis.subscribe(sd->chid, sd->chanhead->redis_sub);
135     sd->onconnect_callback_pd = NULL;
136     sd->sub->dequeue_after_response = 0;
137     ((internal_subscriber_t *)sd->sub)->already_dequeued = 0;
138     ngx_free(pd);
139   }
140   else {
141     DBG("%reconnect callback skipped"); //probably because the channel was deleted while we were waiting
142   }
143   return NGX_OK;
144 }
145 
sub_respond_status(ngx_int_t status,void * ptr,sub_data_t * d)146 static ngx_int_t sub_respond_status(ngx_int_t status, void *ptr, sub_data_t *d) {
147   nchan_loc_conf_t  fake_cf;
148   redis_nodeset_t   *nodeset;
149   if(!d->chanhead) {
150     return NGX_DECLINED;
151   }
152 
153   DBG("%p memstore-redis subscriber respond with status %i", d->sub, status);
154   switch(status) {
155     case NGX_HTTP_GONE: //delete
156     case NGX_HTTP_CLOSE: //delete
157       fake_cf = *d->sub->cf;
158       fake_cf.redis.enabled = 0;
159       d->sub->destroy_after_dequeue = 1;
160       nchan_store_memory.delete_channel(d->chid, &fake_cf, NULL, NULL);
161       //now the chanhead will be in the garbage collector
162       d->chanhead->redis_sub = NULL;
163 
164       nodeset = nodeset_find(&d->sub->cf->redis);
165       if(!nodeset_ready(nodeset) && d->onconnect_callback_pd == NULL) {
166         sub_data_t **dd = ngx_alloc(sizeof(*d), ngx_cycle->log);
167         *dd = d;
168         d->onconnect_callback_pd = dd;
169         nodeset_callback_on_ready(nodeset, 0, reconnect_callback, dd);
170       }
171       break;
172 
173     case NGX_HTTP_NO_CONTENT:
174       if(d->last_msg_status != MSG_EXPECTED) {
175         //the message buffer has just been walked start to finish
176         nchan_memstore_publish_notice(d->chanhead, NCHAN_NOTICE_BUFFER_LOADED, NULL);
177       }
178       d->last_msg_status = MSG_EXPECTED;
179       //TODO: stuff about REDIS_MODE_BACKUP
180 
181       break;
182 
183     default:
184       //meh, no big deal.
185       break;
186   }
187 
188   return NGX_OK;
189 }
190 
sub_notify_handler(ngx_int_t code,void * data,sub_data_t * d)191 static ngx_int_t sub_notify_handler(ngx_int_t code, void *data, sub_data_t *d) {
192   intptr_t  max_messages;
193   if(!d->chanhead) {
194     return NGX_DECLINED;
195   }
196   switch(code) {
197     case NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE:
198       max_messages = (intptr_t )data;
199       d->chanhead->max_messages = max_messages;
200       memstore_chanhead_messages_gc(d->chanhead);
201       break;
202 
203     case NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST:
204       nchan_memstore_publish_notice(d->chanhead, NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST, data);
205       break;
206   }
207   return NGX_OK;
208 }
209 
210 /*
211 static void reset_timer(sub_data_t *data) {
212   if(data->timeout_ev.timer_set) {
213     ngx_del_timer(&data->timeout_ev);
214   }
215   ngx_add_timer(&data->timeout_ev, MEMSTORE_REDIS_SUBSCRIBER_TIMEOUT * 1000);
216 }
217 */
218 /*
219 static ngx_int_t keepalive_reply_handler(ngx_int_t renew, void *_, void* pd) {
220   sub_data_t *d = (sub_data_t *)pd;
221   if(d->sub->release(d->sub) == NGX_OK) {
222     if(renew) {
223       reset_timer(d);
224     }
225     else{
226       d->sub->dequeue(d->sub);
227     }
228   }
229   return NGX_OK;
230 }
231 */
232 
233 static ngx_str_t   sub_name = ngx_string("memstore-redis");
234 
memstore_redis_subscriber_create(memstore_channel_head_t * chanhead)235 subscriber_t *memstore_redis_subscriber_create(memstore_channel_head_t *chanhead) {
236   subscriber_t               *sub;
237   sub_data_t                 *d;
238 
239   assert(chanhead->cf);
240 
241   sub = internal_subscriber_create_init(&sub_name, chanhead->cf, sizeof(*d), (void **)&d, (callback_pt )sub_enqueue, (callback_pt )sub_dequeue, (callback_pt )sub_respond_message, (callback_pt )sub_respond_status, (callback_pt )sub_notify_handler, (callback_pt )sub_destroy_handler);
242 
243 
244   sub->destroy_after_dequeue = 0;
245   sub->dequeue_after_response = 0;
246 
247   d->sub = sub;
248   d->chanhead = chanhead;
249   d->chid = &chanhead->id;
250   d->last_msg_status = MSG_PENDING;
251   d->onconnect_callback_pd = NULL;
252 
253 
254   /*
255   ngx_memzero(&d->timeout_ev, sizeof(d->timeout_ev));
256   nchan_init_timer(&d->timeout_ev, timeout_ev_handler, d)
257   reset_timer(d);
258   */
259   DBG("%p created memstore-redis subscriber with privdata %p", d->sub, d);
260   return sub;
261 }
262