1 #include <nchan_module.h>
2 
3 #include <assert.h>
4 #include <netinet/ip.h>
5 #include "store-private.h"
6 #include "store.h"
7 
8 #include "redis_nginx_adapter.h"
9 
10 #include <util/nchan_msg.h>
11 #include <util/nchan_rbtree.h>
12 #include <store/store_common.h>
13 
14 #include <store/memory/store.h>
15 
16 #include "redis_nodeset.h"
17 #include "redis_lua_commands.h"
18 
19 #define REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP 600 //10min
20 #define REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX 2628000 //whole month
21 
22 #define REDIS_RECONNECT_TIME 5000
23 
24 #define REDIS_STALL_CHECK_TIME 0 //disable for now
25 
26 //#define DEBUG_LEVEL NGX_LOG_WARN
27 #define DEBUG_LEVEL NGX_LOG_DEBUG
28 
29 #define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "REDISTORE: " fmt, ##args)
30 #define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "REDISTORE: " fmt, ##args)
31 
32 #define REDIS_CONNECTION_FOR_PUBLISH_WAIT 5000
33 
34 u_char            redis_subscriber_id[255];
35 size_t            redis_subscriber_id_len;
36 
37 static rdstore_channel_head_t    *chanhead_hash = NULL;
38 static size_t                     redis_publish_message_msgkey_size;
39 
40 
41 #define CHANNEL_HASH_FIND(id_buf, p)    HASH_FIND( hh, chanhead_hash, (id_buf)->data, (id_buf)->len, p)
42 #define CHANNEL_HASH_ADD(chanhead)      HASH_ADD_KEYPTR( hh, chanhead_hash, (chanhead->id).data, (chanhead->id).len, chanhead)
43 #define CHANNEL_HASH_DEL(chanhead)      HASH_DEL( chanhead_hash, chanhead)
44 
45 #include <stdbool.h>
46 #include "cmp.h"
47 
48 
49 #define redis_subscriber_command(node, cb, pd, fmt, args...)        \
50   do {                                                               \
51     if((node)->state >= REDIS_NODE_READY) {                    \
52       redisAsyncCommand((node)->ctx.pubsub, cb, pd, fmt, ##args);      \
53     } else {                                                         \
54       ERR("Can't run redis command: no connection to redis server.");\
55     }                                                                \
56   }while(0)                                                          \
57 
58 
59 
60 #define redis_sync_command(node, fmt, args...)                       \
61   do {                                                               \
62     if((node)->ctx.sync == NULL) {                                   \
63       redis_nginx_open_sync_context(((node)->connect_params.peername.len > 0 ? &(node)->connect_params.peername : &(node)->connect_params.hostname), (node)->connect_params.port, (node)->connect_params.db, &(node)->connect_params.password, &(node)->ctx.sync); \
64     }                                                                \
65     if((node)->ctx.sync) {                                          \
66       redisCommand((node)->ctx.sync, fmt, ##args);                  \
67     } else {                                                         \
68       ERR("Can't run redis command: no connection to redis server.");\
69     }                                                                \
70   }while(0)
71 
72 #define redis_sync_script(script_name, node, fmt, args...)                    \
73   redis_sync_command(node, "EVALSHA %s " fmt, redis_lua_scripts.script_name.hash, ##args)
74 
75 #define nchan_redis_sync_script(script_name, node, channel_id, fmt, args...)  \
76   redis_sync_script(script_name, node, "0 %b %b " fmt, STR(node->nodeset->settings.namespace), STR(channel_id), ##args)
77 
78 
79 
80 #define redis_command(node, cb, pd, fmt, args...)                 \
81   do {                                                               \
82     if(node->state >= REDIS_NODE_READY) {                            \
83       if((cb) != NULL) {                                               \
84         /* a reply is expected, so track this command */             \
85         node->pending_commands++;                                    \
86         nchan_update_stub_status(redis_pending_commands, 1);         \
87       }                                                              \
88       redisAsyncCommand((node)->ctx.cmd, cb, pd, fmt, ##args);       \
89     } else {                                                         \
90       node_log_error(node, "Can't run redis command: no connection to redis server.");\
91     }                                                                \
92   }while(0)                                                          \
93 
94 #define redis_script(script_name, node, cb, pd, fmt, args...)                         \
95   redis_command(node, cb, pd, "EVALSHA %s " fmt, redis_lua_scripts.script_name.hash, ##args)
96 
97 #define nchan_redis_script(script_name, node, cb, pd, channel_id, fmt, args...)       \
98   redis_script(script_name, node, cb, pd, "0 %b %b " fmt, STR((node)->nodeset->settings.namespace), STR(channel_id), ##args)
99 
100 
101 #define CHECK_REPLY_STR(reply) ((reply)->type == REDIS_REPLY_STRING)
102 #define CHECK_REPLY_STRVAL(reply, v) ( CHECK_REPLY_STR(reply) && ngx_strcmp((reply)->str, v) == 0 )
103 #define CHECK_REPLY_STRNVAL(reply, v, n) ( CHECK_REPLY_STR(reply) && ngx_strncmp((reply)->str, v, n) == 0 )
104 #define CHECK_REPLY_STATUSVAL(reply, v) ( (reply)->type == REDIS_REPLY_STATUS && ngx_strcmp((reply)->str, v) == 0 )
105 #define CHECK_REPLY_INT(reply) ((reply)->type == REDIS_REPLY_INTEGER)
106 #define CHECK_REPLY_INTVAL(reply, v) ( CHECK_REPLY_INT(reply) && (reply)->integer == v )
107 #define CHECK_REPLY_ARRAY_MIN_SIZE(reply, size) ( (reply)->type == REDIS_REPLY_ARRAY && (reply)->elements >= (unsigned )size )
108 #define CHECK_REPLY_NIL(reply) ((reply)->type == REDIS_REPLY_NIL)
109 #define CHECK_REPLY_INT_OR_STR(reply) ((reply)->type == REDIS_REPLY_INTEGER || (reply)->type == REDIS_REPLY_STRING)
110 
111 static ngx_int_t nchan_store_publish_generic(ngx_str_t *, redis_nodeset_t *, nchan_msg_t *, ngx_int_t, const ngx_str_t *);
112 
113 static rdstore_channel_head_t * nchan_store_get_chanhead(ngx_str_t *channel_id, redis_nodeset_t *);
114 
set_buf(ngx_buf_t * buf,u_char * start,off_t len)115 static ngx_buf_t *set_buf(ngx_buf_t *buf, u_char *start, off_t len){
116   ngx_memzero(buf, sizeof(*buf));
117   buf->start = start;
118   buf->end = start + len;
119   buf->pos = buf->start;
120   buf->last = buf->end;
121   return buf;
122 }
123 
ngx_strmatch(ngx_str_t * str,char * match)124 static int ngx_strmatch(ngx_str_t *str, char *match) {
125   return ngx_strncmp(str->data, match, str->len) == 0;
126 }
127 
ngx_str_chop_if_startswith(ngx_str_t * str,char * match)128 static int ngx_str_chop_if_startswith(ngx_str_t *str, char *match) {
129   char *cur, *max = (char *)str->data + str->len;
130   for(cur = (char *)str->data; cur < max; cur++, match++) {
131     if(*match == '\0') {
132       str->len -= (u_char *)cur - str->data;
133       str->data = (u_char *)cur;
134       return 1;
135     }
136     else if(*match != *cur)
137       break;
138   }
139   return 0;
140 }
141 
fwd_buf(ngx_buf_t * buf,size_t sz)142 static u_char *fwd_buf(ngx_buf_t *buf, size_t sz) {
143   u_char *ret = buf->pos;
144   buf->pos += sz;
145   return ret;
146 }
147 
fwd_buf_to_str(ngx_buf_t * buf,size_t sz,ngx_str_t * str)148 static void fwd_buf_to_str(ngx_buf_t *buf, size_t sz, ngx_str_t *str) {
149   str->data = fwd_buf(buf, sz);
150   str->len = sz;;
151 }
152 
ngx_buf_reader(cmp_ctx_t * ctx,void * data,size_t limit)153 static bool ngx_buf_reader(cmp_ctx_t *ctx, void *data, size_t limit) {
154   ngx_buf_t   *buf=(ngx_buf_t *)ctx->buf;
155   if(buf->pos + limit > buf->last){
156     return false;
157   }
158   ngx_memcpy(data, buf->pos, limit);
159   buf->pos += limit;
160   return true;
161 }
ngx_buf_writer(cmp_ctx_t * ctx,const void * data,size_t count)162 static size_t ngx_buf_writer(cmp_ctx_t *ctx, const void *data, size_t count) {
163   return 0;
164 }
165 
nchan_store_redis_validate_url(ngx_str_t * url)166 int nchan_store_redis_validate_url(ngx_str_t *url) {
167   redis_connect_params_t rcp;
168   return parse_redis_url(url, &rcp) == NGX_OK;
169 }
170 
parse_redis_url(ngx_str_t * url,redis_connect_params_t * rcp)171 ngx_int_t parse_redis_url(ngx_str_t *url, redis_connect_params_t *rcp) {
172   u_char                  *cur, *last, *ret;
173 
174   cur = url->data;
175   last = url->data + url->len;
176 
177 
178   //ignore redis://
179   if(ngx_strnstr(cur, "redis://", 8) != NULL) {
180     cur += 8;
181   }
182 
183   if(cur[0] == ':') {
184     cur++;
185     if((ret = ngx_strlchr(cur, last, '@')) == NULL) {
186       rcp->password.data = NULL;
187       rcp->password.len = 0;
188       return NGX_ERROR;
189     }
190     else {
191       rcp->password.data = cur;
192       rcp->password.len = ret - cur;
193       cur = ret + 1;
194     }
195   }
196   else {
197     rcp->password.data = NULL;
198     rcp->password.len = 0;
199   }
200 
201   ///port:host
202   if((ret = ngx_strlchr(cur, last, ':')) == NULL) {
203     //just host
204     rcp->port = 6379;
205     if((ret = ngx_strlchr(cur, last, '/')) == NULL) {
206       ret = last;
207     }
208     rcp->hostname.data = cur;
209     rcp->hostname.len = ret - cur;
210   }
211   else {
212     rcp->hostname.data = cur;
213     rcp->hostname.len = ret - cur;
214     cur = ret + 1;
215 
216     //port
217     if((ret = ngx_strlchr(cur, last, '/')) == NULL) {
218       ret = last;
219     }
220     rcp->port = ngx_atoi(cur, ret-cur);
221     if(rcp->port == NGX_ERROR) {
222       return NGX_ERROR;
223     }
224   }
225   cur = ret;
226 
227   if(cur[0] == '/') {
228     cur++;
229     rcp->db = ngx_atoi(cur, last-cur);
230     if(rcp->db == NGX_ERROR) {
231       rcp->db = 0;
232     }
233   }
234   else {
235     rcp->db = 0;
236   }
237 
238   return NGX_OK;
239 }
240 
redis_store_reap_chanhead(rdstore_channel_head_t * ch)241 static void redis_store_reap_chanhead(rdstore_channel_head_t *ch) {
242   if(!ch->shutting_down) {
243     assert(ch->sub_count == 0 && ch->fetching_message_count == 0);
244   }
245 
246   DBG("reap channel %V", &ch->id);
247 
248   if(ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBED) {
249     assert(ch->redis.nodeset->settings.storage_mode >= REDIS_MODE_DISTRIBUTED);
250     assert(ch->redis.node.pubsub);
251     ch->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;
252     redis_subscriber_command(ch->redis.node.pubsub, NULL, NULL, "UNSUBSCRIBE %b{channel:%b}:pubsub", STR(ch->redis.nodeset->settings.namespace), STR(&ch->id));
253   }
254 
255   /*
256   redis_nodeset_t *ns = ch->redis.nodeset;
257   redis_node_t *cmd = ch->redis.node.cmd;
258   redis_node_t *pubsub = ch->redis.node.pubsub;
259   */
260 
261   nodeset_dissociate_chanhead(ch);
262 
263   /*
264   rdstore_channel_head_t *cur;
265 
266 
267   for(cur = nchan_slist_first(&ns->channels.all); cur != NULL; cur = nchan_slist_next(&ns->channels.all, cur)) {
268     assert(cur != ch);
269   }
270   for(cur = nchan_slist_first(&ns->channels.disconnected_cmd); cur != NULL; cur = nchan_slist_next(&ns->channels.disconnected_cmd, cur)) {
271     assert(cur != ch);
272   }
273   for(cur = nchan_slist_first(&ns->channels.disconnected_pubsub); cur != NULL; cur = nchan_slist_next(&ns->channels.disconnected_pubsub, cur)) {
274     assert(cur != ch);
275   }
276   if(cmd) {
277     for(cur = nchan_slist_first(&cmd->channels.cmd); cur != NULL; cur = nchan_slist_next(&cmd->channels.cmd, cur)) {
278       assert(cur != ch);
279     }
280     for(cur = nchan_slist_first(&cmd->channels.pubsub); cur != NULL; cur = nchan_slist_next(&cmd->channels.pubsub, cur)) {
281       assert(cur != ch);
282     }
283   }
284   if(pubsub) {
285     for(cur = nchan_slist_first(&pubsub->channels.cmd); cur != NULL; cur = nchan_slist_next(&pubsub->channels.cmd, cur)) {
286       assert(cur != ch);
287     }
288     for(cur = nchan_slist_first(&pubsub->channels.pubsub); cur != NULL; cur = nchan_slist_next(&pubsub->channels.pubsub, cur)) {
289       assert(cur != ch);
290     }
291   }
292 */
293 
294 
295   DBG("chanhead %p (%V) is empty and expired. delete.", ch, &ch->id);
296   if(ch->keepalive_timer.timer_set) {
297     ngx_del_timer(&ch->keepalive_timer);
298   }
299   stop_spooler(&ch->spooler, 1);
300   CHANNEL_HASH_DEL(ch);
301 
302   ngx_free(ch);
303 }
304 
nchan_redis_chanhead_ready_to_reap(rdstore_channel_head_t * ch,uint8_t force)305 static ngx_int_t nchan_redis_chanhead_ready_to_reap(rdstore_channel_head_t *ch, uint8_t force) {
306   if(!force) {
307     if(ch->status != INACTIVE) {
308       return NGX_DECLINED;
309     }
310 
311     if(ch->reserved > 0 ) {
312       DBG("not yet time to reap %V, %i reservations left", &ch->id, ch->reserved);
313       return NGX_DECLINED;
314     }
315 
316     if(ch->gc.time - ngx_time() > 0) {
317       DBG("not yet time to reap %V, %i sec left", &ch->id, ch->gc.time - ngx_time());
318       return NGX_DECLINED;
319     }
320 
321     if (ch->sub_count > 0) { //there are subscribers
322       DBG("not ready to reap %V, %i subs left", &ch->id, ch->sub_count);
323       return NGX_DECLINED;
324     }
325 
326     if (ch->fetching_message_count > 0) { //there are subscribers
327       DBG("not ready to reap %V, fetching %i messages", &ch->id, ch->fetching_message_count);
328       return NGX_DECLINED;
329     }
330 
331     //if(ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBING) {
332     //  return NGX_DECLINED;
333     //}
334 
335     //DBG("ok to delete channel %V", &ch->id);
336     return NGX_OK;
337   }
338   else {
339     //force delete is always ok
340     return NGX_OK;
341   }
342 }
343 
344 static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privdata);
345 
nchan_store_init_worker(ngx_cycle_t * cycle)346 static ngx_int_t nchan_store_init_worker(ngx_cycle_t *cycle) {
347   ngx_int_t rc = NGX_OK;
348   u_char *cur;
349   cur = ngx_snprintf(redis_subscriber_id, 512, "nchan_worker:{%i:time:%i}%Z", ngx_pid, ngx_time());
350   redis_subscriber_id_len = cur - redis_subscriber_id;
351 
352   //DBG("worker id %s len %i", redis_subscriber_id, redis_subscriber_id_len);
353 
354   redis_nginx_init();
355 
356   nodeset_initialize((char *)redis_subscriber_id, redis_subscriber_callback);
357   nodeset_connect_all();
358 
359   //OLD
360   //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_connector, &rc);
361   return rc;
362 }
363 
redisCheckErrorCallback(redisAsyncContext * c,void * r,void * privdata)364 void redisCheckErrorCallback(redisAsyncContext *c, void *r, void *privdata) {
365   redisReplyOk(c, r);
366 }
redisReplyOk(redisAsyncContext * ac,void * r)367 int redisReplyOk(redisAsyncContext *ac, void *r) {
368   static const ngx_str_t script_error_start= ngx_string("ERR Error running script (call to f_");
369   redis_node_t         *node = ac->data;
370   redisReply *reply = (redisReply *)r;
371   if(reply == NULL) { //redis disconnected?...
372     if(ac->err) {
373       node_log_error(node, "connection to redis failed while waiting for reply - %s", ac->errstr);
374     }
375     else {
376       node_log_error(node, "got a NULL redis reply for unknown reason");
377     }
378     return 0;
379   }
380   else if(reply->type == REDIS_REPLY_ERROR) {
381     if(ngx_strncmp(reply->str, script_error_start.data, script_error_start.len) == 0 && (unsigned ) reply->len > script_error_start.len + REDIS_LUA_HASH_LENGTH) {
382       char *hash = &reply->str[script_error_start.len];
383       redis_lua_script_t  *script;
384       REDIS_LUA_SCRIPTS_EACH(script) {
385         if (ngx_strncmp(script->hash, hash, REDIS_LUA_HASH_LENGTH)==0) {
386           node_log_error(node, "REDIS SCRIPT ERROR: %s :%s", script->name, &reply->str[script_error_start.len + REDIS_LUA_HASH_LENGTH + 2]);
387           return 0;
388         }
389       }
390       node_log_error(node, "REDIS SCRIPT ERROR: (unknown): %s", reply->str);
391     }
392     else {
393       node_log_error(node, "REDIS REPLY ERROR: %s", reply->str);
394     }
395     return 0;
396   }
397   else {
398     return 1;
399   }
400 }
401 
redisEchoCallback(redisAsyncContext * ac,void * r,void * privdata)402 static void redisEchoCallback(redisAsyncContext *ac, void *r, void *privdata) {
403   redisReply      *reply = r;
404   redis_node_t    *node = NULL;
405   unsigned    i;
406   //nchan_channel_t * channel = (nchan_channel_t *)privdata;
407   if(ac) {
408     node = ac->data;
409     if(ac->err) {
410       node_log_error(node, "connection to redis failed - %s", ac->errstr);
411       return;
412     }
413   }
414   else {
415     node_log_error(node, "connection to redis was terminated");
416     return;
417   }
418   if(reply == NULL) {
419     node_log_error(node, "REDIS REPLY is NULL");
420     return;
421   }
422 
423   switch(reply->type) {
424     case REDIS_REPLY_STATUS:
425       node_log_error(node, "REDIS_REPLY_STATUS  %s", reply->str);
426       break;
427 
428     case REDIS_REPLY_ERROR:
429       redisCheckErrorCallback(ac, r, privdata);
430       break;
431 
432     case REDIS_REPLY_INTEGER:
433       node_log_error(node, "REDIS_REPLY_INTEGER: %i", reply->integer);
434       break;
435 
436     case REDIS_REPLY_NIL:
437       node_log_error(node, "REDIS_REPLY_NIL: nil");
438       break;
439 
440     case REDIS_REPLY_STRING:
441       node_log_error(node, "REDIS_REPLY_STRING: %s", reply->str);
442       break;
443 
444     case REDIS_REPLY_ARRAY:
445       node_log_error(node, "REDIS_REPLY_ARRAY: %i", reply->elements);
446       for(i=0; i< reply->elements; i++) {
447         redisEchoCallback(ac, reply->element[i], "  ");
448       }
449       break;
450   }
451   //redis_subscriber_command(NULL, NULL, "UNSUBSCRIBE {channel:%b}:pubsub", str(&(channel->id)));
452 }
453 
454 static ngx_int_t msg_from_redis_get_message_reply(nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event, redisReply *r, uint16_t offset);
455 
456 #define SLOW_REDIS_REPLY 100 //ms
457 
log_redis_reply(char * name,ngx_msec_t t)458 static ngx_int_t log_redis_reply(char *name, ngx_msec_t t) {
459   ngx_msec_t   dt = ngx_current_msec - t;
460   if(dt >= SLOW_REDIS_REPLY) {
461     DBG("redis command %s took %i msec", name, dt);
462   }
463   return NGX_OK;
464 }
465 
redisReply_to_ngx_int(redisReply * el,ngx_int_t * integer)466 static ngx_int_t redisReply_to_ngx_int(redisReply *el, ngx_int_t *integer) {
467   if(CHECK_REPLY_INT(el)) {
468     *integer=el->integer;
469   }
470   else if(CHECK_REPLY_STR(el)) {
471     *integer=ngx_atoi((u_char *)el->str, el->len);
472   }
473   else {
474     return NGX_ERROR;
475   }
476   return NGX_OK;
477 }
478 
479 typedef struct {
480   ngx_msec_t                    t;
481   char                         *name;
482   ngx_str_t                     channel_id;
483   nchan_msg_id_t               *msg_id;
484   ngx_str_t                     msg_key;
485 } redis_get_message_from_key_data_t;
486 
487 static void get_msg_from_msgkey_callback(redisAsyncContext *ac, void *r, void *privdata);
488 
get_msg_from_msgkey_send(redis_nodeset_t * ns,void * pd)489 static ngx_int_t get_msg_from_msgkey_send(redis_nodeset_t *ns, void *pd) {
490   redis_get_message_from_key_data_t *d = pd;
491   if(nodeset_ready(ns)) {
492     redis_node_t  *node = nodeset_node_find_by_key(ns, &d->msg_key);
493     redis_script(get_message_from_key, node, &get_msg_from_msgkey_callback, d, "1 %b", STR(&d->msg_key));
494   }
495   else {
496     ngx_free(d);
497   }
498   return NGX_OK;
499 }
500 
get_msg_from_msgkey_callback(redisAsyncContext * ac,void * r,void * privdata)501 static void get_msg_from_msgkey_callback(redisAsyncContext *ac, void *r, void *privdata) {
502   redis_get_message_from_key_data_t *d = (redis_get_message_from_key_data_t *)privdata;
503   redisReply           *reply = r;
504   nchan_msg_t           msg;
505   nchan_compressed_msg_t cmsg;
506   ngx_str_t             content_type;
507   ngx_str_t             eventsource_event;
508   ngx_str_t            *chid = &d->channel_id;
509   redis_node_t         *node = ac->data;
510 
511   node->pending_commands--;
512   nchan_update_stub_status(redis_pending_commands, -1);
513 
514   DBG("get_msg_from_msgkey_callback");
515 
516   log_redis_reply(d->name, d->t);
517 
518   if(!nodeset_node_reply_keyslot_ok(node, reply)) {
519     nodeset_callback_on_ready(node->nodeset, 1000, get_msg_from_msgkey_send, d);
520     return;
521   }
522 
523   if(reply) {
524     if(chid == NULL) {
525       ERR("get_msg_from_msgkey channel id is NULL");
526       return;
527     }
528     if(msg_from_redis_get_message_reply(&msg, &cmsg, &content_type, &eventsource_event, reply, 0) != NGX_OK) {
529       ERR("invalid message or message absent after get_msg_from_key");
530       return;
531     }
532     nchan_store_publish_generic(chid, node->nodeset, &msg, 0, NULL);
533   }
534   else {
535     //reply is NULL
536   }
537   ngx_free(d);
538 }
539 
cmp_err(cmp_ctx_t * cmp)540 static bool cmp_err(cmp_ctx_t *cmp) {
541   ERR("msgpack parsing error: %s", cmp_strerror(cmp));
542   return false;
543 }
544 
cmp_to_str(cmp_ctx_t * cmp,ngx_str_t * str)545 static bool cmp_to_str(cmp_ctx_t *cmp, ngx_str_t *str) {
546   ngx_buf_t      *mpbuf =(ngx_buf_t *)cmp->buf;
547   uint32_t        sz;
548 
549   if(cmp_read_str_size(cmp, &sz)) {
550     fwd_buf_to_str(mpbuf, sz, str);
551     return true;
552   }
553   else {
554     cmp_err(cmp);
555     return false;
556   }
557 }
558 
cmp_to_msg(cmp_ctx_t * cmp,nchan_msg_t * msg,nchan_compressed_msg_t * cmsg,ngx_str_t * content_type,ngx_str_t * eventsource_event)559 static bool cmp_to_msg(cmp_ctx_t *cmp, nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event) {
560   ngx_buf_t  *mpb = (ngx_buf_t *)cmp->buf;
561   uint32_t    sz;
562   uint64_t    msgtag;
563   int32_t     ttl;
564   int32_t     compression;
565   //ttl
566   if(!cmp_read_int(cmp, &ttl)) {
567     return cmp_err(cmp);
568   }
569   assert(ttl >= 0);
570   if(ttl == 0) {
571     ttl++; // less than a second left for this message... give it a second's lease on life
572   }
573   msg->expires = ngx_time() + ttl;
574 
575   //msg id
576   if(!cmp_read_uinteger(cmp, (uint64_t *)&msg->id.time)) {
577     return cmp_err(cmp);
578   }
579   if(!cmp_read_uinteger(cmp, &msgtag)) {
580     return cmp_err(cmp);
581   }
582   else {
583     msg->id.tag.fixed[0] = msgtag;
584     msg->id.tagactive = 0;
585     msg->id.tagcount = 1;
586   }
587 
588   //msg prev_id
589   if(!cmp_read_uinteger(cmp, (uint64_t *)&msg->prev_id.time)) {
590     return cmp_err(cmp);
591   }
592   if(!cmp_read_uinteger(cmp, &msgtag)) {
593     return cmp_err(cmp);
594   }
595   else {
596     msg->prev_id.tag.fixed[0] = msgtag;
597     msg->prev_id.tagactive = 0;
598     msg->prev_id.tagcount = 1;
599   }
600 
601   //message data
602   if(!cmp_read_str_size(cmp, &sz)) {
603     return cmp_err(cmp);
604   }
605   set_buf(&msg->buf, mpb->pos, sz);
606   fwd_buf(mpb, sz);
607   msg->buf.memory = 1;
608   msg->buf.last_buf = 1;
609   msg->buf.last_in_chain = 1;
610 
611   //content-type
612   if(!cmp_read_str_size(cmp, &sz)) {
613     return cmp_err(cmp);
614   }
615   fwd_buf_to_str(mpb, sz, content_type);
616   msg->content_type = sz > 0 ? content_type : NULL;
617 
618   //eventsource_event
619   if(!cmp_read_str_size(cmp, &sz)) {
620     return cmp_err(cmp);
621   }
622   fwd_buf_to_str(mpb, sz, eventsource_event);
623   msg->eventsource_event = sz > 0 ? eventsource_event : NULL;
624 
625   //compression
626   if(!cmp_read_int(cmp, &compression)) {
627     msg->compressed = NULL;
628   }
629   if(compression > 0) {
630     msg->compressed = cmsg;
631     ngx_memzero(&cmsg->buf, sizeof(cmsg->buf));
632     cmsg->compression = compression;
633   }
634   else {
635     msg->compressed = NULL;
636   }
637 
638   return true;
639 }
640 
get_msg_from_msgkey(ngx_str_t * channel_id,redis_nodeset_t * nodeset,nchan_msg_id_t * msgid,ngx_str_t * msg_redis_hash_key)641 static ngx_int_t get_msg_from_msgkey(ngx_str_t *channel_id, redis_nodeset_t *nodeset, nchan_msg_id_t *msgid, ngx_str_t *msg_redis_hash_key) {
642   rdstore_channel_head_t              *head;
643   redis_get_message_from_key_data_t   *d;
644   DBG("Get message from msgkey %V", msg_redis_hash_key);
645 
646   head = nchan_store_get_chanhead(channel_id, nodeset);
647   if(head->sub_count == 0) {
648     DBG("Nobody wants this message we'll need to grab with an HMGET");
649     return NGX_OK;
650   }
651 
652   if((d=ngx_alloc(sizeof(*d) + (u_char)channel_id->len + (u_char)msg_redis_hash_key->len, ngx_cycle->log)) == 0) {
653     ERR("unable to allocate memory for callback data for message hmget");
654     return NGX_ERROR;
655   }
656   d->channel_id.data = (u_char *)&d[1];
657   nchan_strcpy(&d->channel_id, channel_id, 0);
658 
659   d->msg_key.data = d->channel_id.data + d->channel_id.len;
660   nchan_strcpy(&d->msg_key, msg_redis_hash_key, 0);
661 
662   d->t = ngx_current_msec;
663 
664   d->name = "get_message_from_key";
665 
666   //d->hcln = put_current_subscribers_in_limbo(head);
667   //assert(d->hcln != 0);
668   get_msg_from_msgkey_send(nodeset, d);
669 
670   return NGX_OK;
671 }
672 
673 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub);
674 
str_match_redis_subscriber_channel(ngx_str_t * pubsub_channel,ngx_str_t * ns)675 static int str_match_redis_subscriber_channel(ngx_str_t *pubsub_channel, ngx_str_t *ns) {
676   ngx_str_t psch = *pubsub_channel;
677   if(pubsub_channel->len != ns->len + redis_subscriber_id_len || psch.len < ns->len) {
678     return 0;
679   }
680   if(ngx_memcmp(psch.data, ns->data, ns->len) != 0) {
681     return 0;
682   }
683   psch.data += ns->len;
684   psch.len -= ns->len;
685 
686   return ngx_strmatch(&psch, (char *)redis_subscriber_id);
687 }
688 
get_channel_id_from_pubsub_channel(ngx_str_t * pubsub_channel,ngx_str_t * ns,ngx_str_t * str_in)689 static ngx_str_t *get_channel_id_from_pubsub_channel(ngx_str_t *pubsub_channel, ngx_str_t *ns, ngx_str_t *str_in) {
690   if(str_match_redis_subscriber_channel(pubsub_channel, ns)) {
691     return NULL;
692   }
693   str_in->data = pubsub_channel->data + ns->len + 9; //"<namespace>{channel:"
694   str_in->len = pubsub_channel->len - ns->len - 9;
695   str_in->len -= 8;//"}:pubsub"
696   return str_in;
697 }
698 
find_chanhead_for_pubsub_callback(ngx_str_t * chid)699 static rdstore_channel_head_t *find_chanhead_for_pubsub_callback(ngx_str_t *chid) {
700   rdstore_channel_head_t *head;
701   CHANNEL_HASH_FIND(chid, head);
702   return head;
703 }
704 
redis_subscriber_callback(redisAsyncContext * c,void * r,void * privdata)705 static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privdata) {
706   redisReply             *reply = r;
707   redisReply             *el = NULL;
708   nchan_msg_t             msg;
709   nchan_compressed_msg_t  cmsg;
710   ngx_str_t               content_type;
711   ngx_str_t               eventsource_event;
712   ngx_str_t               chid_str;
713   ngx_str_t              *chid = NULL;
714   ngx_str_t               pubsub_channel;
715   ngx_str_t               msg_redis_hash_key = ngx_null_string;
716   ngx_uint_t              subscriber_id;
717 
718   ngx_buf_t               mpbuf;
719   cmp_ctx_t               cmp;
720 
721   rdstore_channel_head_t     *chanhead = NULL;
722   redis_node_t               *node = c->data;
723   redis_nodeset_t            *nodeset = node->nodeset;
724   ngx_str_t                  *namespace = nodeset->settings.namespace;
725 
726   msg.expires = 0;
727   msg.refcount = 0;
728   msg.parent = NULL;
729   msg.storage = NCHAN_MSG_STACK;
730 
731   if(reply == NULL) return;
732 
733   if(CHECK_REPLY_ARRAY_MIN_SIZE(reply, 3)
734    && CHECK_REPLY_STR(reply->element[0])
735    && (CHECK_REPLY_STR(reply->element[1]) || CHECK_REPLY_INT(reply->element[1]))) {
736     pubsub_channel.data = (u_char *)reply->element[1]->str;
737     pubsub_channel.len = reply->element[1]->len;
738     chid = get_channel_id_from_pubsub_channel(&pubsub_channel, namespace, &chid_str);
739   }
740   else {
741     ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "no PUBSUB message, something else");
742     redisEchoCallback(c,r,privdata);
743     return;
744   }
745 
746   if(CHECK_REPLY_STRVAL(reply->element[0], "message") && CHECK_REPLY_STR(reply->element[2])) {
747 
748     //reply->element[1] is the pubsub channel name
749     el = reply->element[2];
750 
751     if(CHECK_REPLY_STRVAL(el, "ping") && str_match_redis_subscriber_channel(&pubsub_channel, namespace)) {
752       DBG("got pinged");
753     }
754     else if(CHECK_REPLY_STR(el)) {
755       uint32_t    array_sz;
756       unsigned    chid_present = 0;
757       ngx_str_t   extracted_channel_id;
758       unsigned    msgbuf_size_changed = 0;
759       int64_t     msgbuf_size = 0;
760       //maybe a message?
761       set_buf(&mpbuf, (u_char *)el->str, el->len);
762       cmp_init(&cmp, &mpbuf, ngx_buf_reader, NULL, ngx_buf_writer);
763       if(cmp_read_array(&cmp, &array_sz)) {
764 
765         if(array_sz != 0) {
766           uint32_t      sz;
767           ngx_str_t     msg_type;
768           cmp_read_str_size(&cmp ,&sz);
769           fwd_buf_to_str(&mpbuf, sz, &msg_type);
770 
771           if(ngx_str_chop_if_startswith(&msg_type, "max_msgs+")) {
772             if(cmp_read_integer(&cmp, &msgbuf_size))
773               msgbuf_size_changed = 1;
774             else
775               cmp_err(&cmp);
776           }
777 
778           if(ngx_str_chop_if_startswith(&msg_type, "ch+")) {
779             if(cmp_read_str_size(&cmp, &sz)) {
780               fwd_buf_to_str(&mpbuf, sz, &extracted_channel_id);
781               chid = &extracted_channel_id;
782             }
783             else {
784               cmp_err(&cmp);
785             }
786           }
787           //else {
788           //  chid already set from the pubsub channel name
789           //}
790 
791           if(msgbuf_size_changed && (chanhead = nchan_store_get_chanhead(chid, nodeset)) != NULL) {
792             chanhead->spooler.fn->broadcast_notice(&chanhead->spooler, NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE, (void *)(intptr_t )msgbuf_size);
793           }
794 
795           if(!chanhead) {
796             chanhead = find_chanhead_for_pubsub_callback(chid);
797           }
798 
799           if(ngx_strmatch(&msg_type, "msg")) {
800             assert(array_sz >= 9 + msgbuf_size_changed + chid_present);
801             if(chanhead && cmp_to_msg(&cmp, &msg, &cmsg, &content_type, &eventsource_event)) {
802               //ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "got msg %V", msgid_to_str(&msg));
803               nchan_store_publish_generic(chid, chanhead ? chanhead->redis.nodeset : nodeset, &msg, 0, NULL);
804             }
805             else {
806               ERR("thought there'd be a channel for msg");
807             }
808           }
809           else if(ngx_strmatch(&msg_type, "msgkey")) {
810             assert(array_sz == 4 + msgbuf_size_changed + chid_present);
811             if(chanhead != NULL) {
812               uint64_t              msgtag;
813               nchan_msg_id_t        msgid;
814 
815               if(!cmp_read_uinteger(&cmp, (uint64_t *)&msgid.time)) {
816                 cmp_err(&cmp);
817                 return;
818               }
819 
820               if(!cmp_read_uinteger(&cmp, &msgtag)) {
821                 cmp_err(&cmp);
822                 return;
823               }
824               else {
825                 msgid.tag.fixed[0] = msgtag;
826                 msgid.tagactive = 0;
827                 msgid.tagcount = 1;
828               }
829 
830               if(cmp_to_str(&cmp, &msg_redis_hash_key)) {
831                 get_msg_from_msgkey(chid, chanhead ? chanhead->redis.nodeset : node->nodeset, &msgid, &msg_redis_hash_key);
832               }
833             }
834             else {
835               ERR("thought there'd be a channel id around for msgkey");
836             }
837           }
838           else if(ngx_strmatch(&msg_type, "alert") && array_sz > 1) {
839             ngx_str_t    alerttype;
840 
841             if(!cmp_to_str(&cmp, &alerttype)) {
842               return;
843             }
844 
845             if(ngx_strmatch(&alerttype, "delete channel") && array_sz > 2) {
846               if(cmp_to_str(&cmp, &extracted_channel_id)) {
847                 rdstore_channel_head_t *doomed_channel;
848                 nchan_store_publish_generic(&extracted_channel_id, nodeset, NULL, NGX_HTTP_GONE, &NCHAN_HTTP_STATUS_410);
849                 doomed_channel = nchan_store_get_chanhead(&extracted_channel_id, nodeset);
850                 redis_chanhead_gc_add(doomed_channel, 0, "channel deleted");
851               }
852               else {
853                 ERR("unexpected \"delete channel\" msgpack message from redis");
854               }
855             }
856             else if(ngx_strmatch(&alerttype, "unsub one") && array_sz > 3) {
857               if(cmp_to_str(&cmp, &extracted_channel_id)) {
858                 cmp_to_str(&cmp, &extracted_channel_id);
859                 cmp_read_uinteger(&cmp, (uint64_t *)&subscriber_id);
860                 //TODO
861               }
862               ERR("unsub one not yet implemented");
863             }
864             else if(ngx_strmatch(&alerttype, "unsub all") && array_sz > 1) {
865               if(cmp_to_str(&cmp, &extracted_channel_id)) {
866                 nchan_store_publish_generic(&extracted_channel_id, nodeset, NULL, NGX_HTTP_CONFLICT, &NCHAN_HTTP_STATUS_409);
867               }
868             }
869             else if(ngx_strmatch(&alerttype, "unsub all except")) {
870               if(cmp_to_str(&cmp, &extracted_channel_id)) {
871                 cmp_read_uinteger(&cmp, (uint64_t *)&subscriber_id);
872                 //TODO
873               }
874               ERR("unsub all except not yet  implemented");
875             }
876             else if(ngx_strmatch(&alerttype, "subscriber info")) {
877               uint64_t request_id;
878               cmp_read_uinteger(&cmp, &request_id);
879 
880               if((chanhead = nchan_store_get_chanhead(chid, nodeset)) == NULL) {
881                 ERR("received invalid subscriber info notice with bad channel name");
882               }
883               else {
884                 chanhead->spooler.fn->broadcast_notice(&chanhead->spooler, NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST, (void *)(intptr_t )request_id);
885               }
886             }
887 
888             else {
889               ERR("unexpected msgpack alert from redis");
890             }
891           }
892           else {
893             ERR("unexpected msgpack message from redis");
894           }
895         }
896         else {
897           ERR("unexpected msgpack object from redis");
898         }
899       }
900       else {
901         ERR("invalid msgpack message from redis: %s", cmp_strerror(&cmp));
902       }
903     }
904 
905     else { //not a string
906       redisEchoCallback(c, el, NULL);
907     }
908   }
909 
910   else if(CHECK_REPLY_STRVAL(reply->element[0], "subscribe") && CHECK_REPLY_INT(reply->element[2])) {
911 
912     if(chid) {
913       chanhead = find_chanhead_for_pubsub_callback(chid);
914       if(chanhead != NULL) {
915         if(chanhead->pubsub_status != REDIS_PUBSUB_SUBSCRIBING) {
916           ERR("expected previous pubsub_status for channel %p (id: %V) to be REDIS_PUBSUB_SUBSCRIBING (%i), was %i", chanhead, &chanhead->id, REDIS_PUBSUB_SUBSCRIBING, chanhead->pubsub_status);
917         }
918         chanhead->pubsub_status = REDIS_PUBSUB_SUBSCRIBED;
919 
920         switch(chanhead->status) {
921           case NOTREADY:
922             chanhead->status = READY;
923             chanhead->spooler.fn->handle_channel_status_change(&chanhead->spooler);
924             //ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "REDIS: PUB/SUB subscribed to %s, chanhead %p now READY.", reply->element[1]->str, chanhead);
925             break;
926           case READY:
927             ERR("REDIS: PUB/SUB already subscribed to %s, chanhead %p (id %V) already READY.", reply->element[1]->str, chanhead, &chanhead->id);
928             break;
929           case INACTIVE:
930             // this is fine, inactive channels can be pubsubbed, they will be garbage collected
931             // later if needed
932             break;
933           default:
934             ERR("REDIS: PUB/SUB really unexpected chanhead status %i", chanhead->status);
935             assert(0);
936             //not sposed to happen
937         }
938       }
939       else {
940         ERR("received SUBSCRIBE acknowledgement for unknown channel %V", chid);
941       }
942     }
943     else {
944       DBG("subscribed to worker channel %s", redis_subscriber_id);
945     }
946 
947     DBG("REDIS: PUB/SUB subscribed to %s (%i total)", reply->element[1]->str, reply->element[2]->integer);
948   }
949   else if(CHECK_REPLY_STRVAL(reply->element[0], "unsubscribe") && CHECK_REPLY_INT(reply->element[2])) {
950 
951     if(chid) {
952       DBG("received UNSUBSCRIBE acknowledgement for channel %V", chid);
953     }
954     else {
955       DBG("received UNSUBSCRIBE acknowledgement for worker channel %s", redis_subscriber_id);
956     }
957   }
958 
959   else {
960     ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "Unexpected PUBSUB message %s", reply->element[0]);
961     redisEchoCallback(c,r,privdata);
962   }
963 }
964 
965 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub);
966 static ngx_int_t redis_subscriber_unregister(rdstore_channel_head_t *chanhead, subscriber_t *sub, uint8_t shutting_down);
spooler_add_handler(channel_spooler_t * spl,subscriber_t * sub,void * privdata)967 static void spooler_add_handler(channel_spooler_t *spl, subscriber_t *sub, void *privdata) {
968   rdstore_channel_head_t *head = (rdstore_channel_head_t *)privdata;
969   head->sub_count++;
970   if(sub->type == INTERNAL) {
971     head->internal_sub_count++;
972   }
973   redis_subscriber_register(head, sub);
974 }
975 
spooler_dequeue_handler(channel_spooler_t * spl,subscriber_t * sub,void * privdata)976 static void spooler_dequeue_handler(channel_spooler_t *spl, subscriber_t *sub, void *privdata) {
977   //need individual subscriber
978   //TODO
979   rdstore_channel_head_t *head = (rdstore_channel_head_t *)privdata;
980 
981   head->sub_count--;
982   if(sub->type == INTERNAL) {
983     head->internal_sub_count--;
984   }
985 
986   redis_subscriber_unregister(head, sub, head->shutting_down);
987 
988   if(head->sub_count == 0 && head->fetching_message_count == 0) {
989     redis_chanhead_gc_add(head, 0, "sub count == 0 and fetching_message_count == 0 after spooler dequeue");
990   }
991 
992 }
993 
spooler_use_handler(channel_spooler_t * spl,void * d)994 static void spooler_use_handler(channel_spooler_t *spl, void *d) {
995   //nothing.
996 }
997 
spooler_get_message_start_handler(channel_spooler_t * spl,void * pd)998 void spooler_get_message_start_handler(channel_spooler_t *spl, void *pd) {
999   ((rdstore_channel_head_t *)pd)->fetching_message_count++;
1000 }
1001 
spooler_get_message_finish_handler(channel_spooler_t * spl,void * pd)1002 void spooler_get_message_finish_handler(channel_spooler_t *spl, void *pd) {
1003   ((rdstore_channel_head_t *)pd)->fetching_message_count--;
1004   assert(((rdstore_channel_head_t *)pd)->fetching_message_count >= 0);
1005 }
1006 
start_chanhead_spooler(rdstore_channel_head_t * head)1007 static ngx_int_t start_chanhead_spooler(rdstore_channel_head_t *head) {
1008   static uint8_t channel_buffer_complete = 1;
1009   spooler_fetching_strategy_t spooling_strategy;
1010   static channel_spooler_handlers_t handlers = {
1011     spooler_add_handler,
1012     spooler_dequeue_handler,
1013     NULL,
1014     spooler_use_handler,
1015     spooler_get_message_start_handler,
1016     spooler_get_message_finish_handler
1017   };
1018   nchan_loc_conf_t *lcf = head->redis.nodeset->first_loc_conf; //any loc_conf that refers to this nodeset will work.
1019   //the spooler needs it to pass to get_message calls, which in rdstore's case only cares about the nodeset referenced in the loc_conf
1020   if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE) {
1021     spooling_strategy = NCHAN_SPOOL_PASSTHROUGH;
1022   }
1023   else {
1024     spooling_strategy = NCHAN_SPOOL_FETCH;
1025   }
1026 
1027   start_spooler(&head->spooler, &head->id, &head->status, &channel_buffer_complete, &nchan_store_redis, lcf, spooling_strategy, &handlers, head);
1028   return NGX_OK;
1029 }
1030 
1031 static void redis_subscriber_register_cb(redisAsyncContext *c, void *vr, void *privdata);
1032 
1033 typedef struct {
1034   rdstore_channel_head_t *chanhead;
1035   unsigned                generation;
1036   subscriber_t           *sub;
1037 } redis_subscriber_register_t;
1038 
redis_subscriber_register_send(redis_nodeset_t * nodeset,void * pd)1039 static ngx_int_t redis_subscriber_register_send(redis_nodeset_t *nodeset, void *pd) {
1040   redis_subscriber_register_t   *d = pd;
1041   if(nodeset_ready(nodeset)) {
1042     d->chanhead->reserved++;
1043     redis_node_t *node = nodeset_node_find_by_chanhead(d->chanhead);
1044 
1045     nchan_redis_script(subscriber_register, node, &redis_subscriber_register_cb, d, &d->chanhead->id,
1046                        "- %i %i 1",
1047                        REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP,
1048                        ngx_time()
1049                       );
1050   }
1051   else {
1052     d->sub->fn->release(d->sub, 0);
1053     ngx_free(d);
1054   }
1055   return NGX_OK;
1056 }
1057 
1058 
redis_subscriber_register(rdstore_channel_head_t * chanhead,subscriber_t * sub)1059 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub) {
1060   redis_subscriber_register_t *sdata=NULL;
1061   if((sdata = ngx_alloc(sizeof(*sdata), ngx_cycle->log)) == NULL) {
1062     ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "No memory for sdata. Part IV, subparagraph 12 of the Cryptic Error Series.");
1063     return NGX_ERROR;
1064   }
1065   sdata->chanhead = chanhead;
1066   sdata->generation = chanhead->generation;
1067   sdata->sub = sub;
1068 
1069   sub->fn->reserve(sub);
1070 
1071   //input: keys: [], values: [channel_id, subscriber_id, active_ttl]
1072   //  'subscriber_id' can be '-' for new id, or an existing id
1073   //  'active_ttl' is channel ttl with non-zero subscribers. -1 to persist, >0 ttl in sec
1074   //output: subscriber_id, num_current_subscribers, next_keepalive_time
1075   redis_subscriber_register_send(chanhead->redis.nodeset, sdata);
1076 
1077   return NGX_OK;
1078 }
1079 
redis_subscriber_register_send_retry_wrapper(redis_nodeset_t * nodeset,void * pd)1080 static ngx_int_t redis_subscriber_register_send_retry_wrapper(redis_nodeset_t *nodeset, void *pd) {
1081   redis_subscriber_register_t   *d = pd;
1082   d->chanhead->reserved--;
1083   return redis_subscriber_register_send(nodeset, pd);
1084 }
1085 
redis_subscriber_register_cb(redisAsyncContext * c,void * vr,void * privdata)1086 static void redis_subscriber_register_cb(redisAsyncContext *c, void *vr, void *privdata) {
1087   redis_subscriber_register_t *sdata= (redis_subscriber_register_t *) privdata;
1088   redisReply                  *reply = (redisReply *)vr;
1089   redis_node_t                *node = c->data;
1090   int                          keepalive_ttl;
1091 
1092   node->pending_commands--;
1093   nchan_update_stub_status(redis_pending_commands, -1);
1094 
1095   sdata->chanhead->reserved--;
1096 
1097   if(!nodeset_node_reply_keyslot_ok(node, reply)) {
1098     sdata->chanhead->reserved++;
1099     nodeset_callback_on_ready(node->nodeset, 1000, redis_subscriber_register_send_retry_wrapper, sdata);
1100     return;
1101   }
1102 
1103   if (!redisReplyOk(c, reply)) {
1104     //TODO: fail less silently, maybe retry subscriber registration?
1105     sdata->sub->fn->release(sdata->sub, 0);
1106     ngx_free(sdata);
1107     return;
1108   }
1109 
1110   if(  CHECK_REPLY_ARRAY_MIN_SIZE(reply, 4)
1111     && CHECK_REPLY_INT(reply->element[3])
1112   ) {
1113     //notify about channel buffer size if it's present
1114     sdata->sub->fn->notify(sdata->sub, NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE, (void *)(intptr_t )reply->element[3]->integer);
1115   }
1116 
1117   if(sdata->generation == sdata->chanhead->generation) {
1118     //is the subscriber
1119     //TODO: set subscriber id
1120     //sdata->sub->id = reply->element[1]->integer;
1121   }
1122 
1123   sdata->sub->fn->release(sdata->sub, 0);
1124   sdata->sub = NULL; //don't use it anymore, it might have been freed
1125 
1126   if ( !CHECK_REPLY_ARRAY_MIN_SIZE(reply, 3) || !CHECK_REPLY_INT(reply->element[1]) || !CHECK_REPLY_INT(reply->element[2])) {
1127     //no good
1128     //TODO: fail less silently, maybe retry subscriber registration?
1129     redisEchoCallback(c,reply,privdata);
1130     ngx_free(sdata);
1131     return;
1132   }
1133 
1134   keepalive_ttl = reply->element[2]->integer;
1135   if(keepalive_ttl > 0) {
1136     if(!sdata->chanhead->keepalive_timer.timer_set) {
1137       ngx_add_timer(&sdata->chanhead->keepalive_timer, keepalive_ttl * 1000);
1138     }
1139   }
1140   ngx_free(sdata);
1141 }
1142 
1143 
1144 typedef struct {
1145   ngx_str_t    *channel_id;
1146   time_t        channel_timeout;
1147   unsigned      allocd:1;
1148 } subscriber_unregister_data_t;
1149 
1150 static void redis_subscriber_unregister_cb(redisAsyncContext *c, void *r, void *privdata);
redis_subscriber_unregister_send(redis_nodeset_t * nodeset,void * pd)1151 static ngx_int_t redis_subscriber_unregister_send(redis_nodeset_t *nodeset, void *pd) {
1152   //input: keys: [], values: [namespace, channel_id, subscriber_id, empty_ttl]
1153   // 'subscriber_id' is an existing id
1154   // 'empty_ttl' is channel ttl when without subscribers. 0 to delete immediately, -1 to persist, >0 ttl in sec
1155   //output: subscriber_id, num_current_subscribers
1156   subscriber_unregister_data_t *d = pd;
1157   if(nodeset_ready(nodeset)) {
1158     redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
1159     nchan_redis_script( subscriber_unregister, node, &redis_subscriber_unregister_cb, NULL,
1160       d->channel_id, "%i %i",
1161                        0/*TODO: sub->id*/,
1162                        d->channel_timeout
1163                       );
1164   }
1165   if(d->allocd) {
1166     ngx_free(d);
1167   }
1168   return NGX_OK;
1169 }
1170 
redis_subscriber_unregister_cb(redisAsyncContext * c,void * r,void * privdata)1171 static void redis_subscriber_unregister_cb(redisAsyncContext *c, void *r, void *privdata) {
1172   redisReply      *reply = r;
1173   redis_node_t    *node = c->data;
1174 
1175   node->pending_commands--;
1176   nchan_update_stub_status(redis_pending_commands, -1);
1177 
1178   if(reply && reply->type == REDIS_REPLY_ERROR) {
1179     ngx_str_t    errstr;
1180     ngx_str_t    countstr;
1181     ngx_str_t    channel_id;
1182     ngx_int_t    channel_timeout;
1183 
1184     errstr.data = (u_char *)reply->str;
1185     errstr.len = strlen(reply->str);
1186 
1187     if(ngx_str_chop_if_startswith(&errstr, "CLUSTER KEYSLOT ERROR. ")) {
1188       nodeset_node_keyslot_changed(node);
1189       nchan_scan_until_chr_on_line(&errstr, &countstr, ' ');
1190       channel_timeout = ngx_atoi(countstr.data, countstr.len);
1191       channel_id = errstr;
1192 
1193       subscriber_unregister_data_t  *d = ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id.len, ngx_cycle->log);
1194       if(!d) {
1195         ERR("can't allocate add_fakesub_data for CLUSTER KEYSLOT ERROR retry");
1196         return;
1197       }
1198       d->channel_timeout = channel_timeout;
1199       d->channel_id = (ngx_str_t *)&d[1];
1200       d->channel_id->data = (u_char *)&d->channel_id[1];
1201       d->allocd = 1;
1202       nchan_strcpy(d->channel_id, &channel_id, 0);
1203       nodeset_callback_on_ready(node->nodeset, 1000, redis_subscriber_unregister_send, d);
1204       return;
1205     }
1206 
1207   }
1208   redisCheckErrorCallback(c, r, privdata);
1209 }
1210 
redis_subscriber_unregister(rdstore_channel_head_t * chanhead,subscriber_t * sub,uint8_t shutting_down)1211 static ngx_int_t redis_subscriber_unregister(rdstore_channel_head_t *chanhead, subscriber_t *sub, uint8_t shutting_down) {
1212   nchan_loc_conf_t  *cf = sub->cf;;
1213 
1214   if(!shutting_down) {
1215     subscriber_unregister_data_t d;
1216     d.channel_id = &chanhead->id;
1217     d.channel_timeout = cf->channel_timeout;
1218     d.allocd = 0;
1219     redis_subscriber_unregister_send(chanhead->redis.nodeset, &d);
1220   }
1221   else {
1222     if(nodeset_ready(chanhead->redis.nodeset)) {
1223       redis_node_t   *node = nodeset_node_find_by_chanhead(chanhead);
1224       nchan_redis_sync_script(subscriber_unregister, node, &chanhead->id, "%i %i",
1225                               0/*TODO: sub->id*/,
1226                               cf->channel_timeout
1227                             );
1228     }
1229   }
1230   return NGX_OK;
1231 }
1232 
1233 
1234 static void redisChannelKeepaliveCallback(redisAsyncContext *c, void *vr, void *privdata);
1235 
redisChannelKeepaliveCallback_send(redis_nodeset_t * ns,void * pd)1236 static ngx_int_t redisChannelKeepaliveCallback_send(redis_nodeset_t *ns, void *pd) {
1237   rdstore_channel_head_t   *head = pd;
1238   time_t                    ttl;
1239   //TODO: optimize this
1240   redis_node_t *node = nodeset_node_find_by_channel_id(head->redis.nodeset, &head->id);
1241   if(nodeset_ready(ns)) {
1242     head->reserved++;
1243     ttl = REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP * (1+head->keepalive_times_sent);
1244     if(ttl > REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX) { //1 week at most
1245       ttl = REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX;
1246     }
1247     nchan_redis_script(channel_keepalive, node, &redisChannelKeepaliveCallback, head, &head->id, "%i", ttl);
1248   }
1249   return NGX_OK;
1250 }
1251 
redisChannelKeepaliveCallback_retry_wrapper(redis_nodeset_t * ns,void * pd)1252 static ngx_int_t redisChannelKeepaliveCallback_retry_wrapper(redis_nodeset_t *ns, void *pd) {
1253   rdstore_channel_head_t   *head = pd;
1254   head->reserved--;
1255   return redisChannelKeepaliveCallback_send(ns, pd);
1256 }
1257 
redisChannelKeepaliveCallback(redisAsyncContext * c,void * vr,void * privdata)1258 static void redisChannelKeepaliveCallback(redisAsyncContext *c, void *vr, void *privdata) {
1259   rdstore_channel_head_t   *head = (rdstore_channel_head_t *)privdata;
1260   redisReply               *reply = (redisReply *)vr;
1261   redis_node_t             *node = c->data;
1262 
1263   head->reserved--;
1264   node->pending_commands--;
1265   nchan_update_stub_status(redis_pending_commands, -1);
1266 
1267   if(!nodeset_node_reply_keyslot_ok(node, reply)) {
1268     head->reserved++;
1269     nodeset_callback_on_ready(node->nodeset, 1000, redisChannelKeepaliveCallback_retry_wrapper, head);
1270     return;
1271   }
1272   else {
1273     head->keepalive_times_sent++;
1274   }
1275 
1276   if(redisReplyOk(c, vr)) {
1277     assert(CHECK_REPLY_INT(reply));
1278 
1279     //reply->integer == -1 means "let it disappear" (see channel_keepalive.lua)
1280 
1281     if(reply->integer != -1 && !head->keepalive_timer.timer_set) {
1282       ngx_add_timer(&head->keepalive_timer, reply->integer * 1000);
1283     }
1284   }
1285 
1286 }
1287 
redis_channel_keepalive_timer_handler(ngx_event_t * ev)1288 static void redis_channel_keepalive_timer_handler(ngx_event_t *ev) {
1289   rdstore_channel_head_t   *head = ev->data;
1290   if(ev->timedout) {
1291     ev->timedout=0;
1292     if(head->pubsub_status != REDIS_PUBSUB_SUBSCRIBED || head->status == NOTREADY) {
1293       //no use trying to keepalive a not-ready (possibly disconnected) chanhead
1294       DBG("Tried sending channel keepalive when channel is not ready");
1295       ngx_add_timer(ev, REDIS_RECONNECT_TIME); //retry after reconnect timeout
1296     }
1297     else {
1298       redisChannelKeepaliveCallback_send(head->redis.nodeset, head);
1299     }
1300   }
1301 }
1302 
ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t * ch)1303 ngx_int_t ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t *ch) {
1304   redis_node_t       *pubsub_node;
1305   ngx_str_t          *namespace;
1306   if( ch->pubsub_status != REDIS_PUBSUB_SUBSCRIBED && ch->pubsub_status != REDIS_PUBSUB_SUBSCRIBING
1307    && ch->redis.nodeset->settings.storage_mode >= REDIS_MODE_DISTRIBUTED
1308    && nodeset_ready(ch->redis.nodeset)
1309   ) {
1310     pubsub_node = nodeset_node_pubsub_find_by_chanhead(ch);
1311     namespace = ch->redis.nodeset->settings.namespace;
1312     DBG("SUBSCRIBING to %V{channel:%V}:pubsub", namespace, &ch->id);
1313     ch->pubsub_status = REDIS_PUBSUB_SUBSCRIBING;
1314     redis_subscriber_command(pubsub_node, redis_subscriber_callback, NULL, "SUBSCRIBE %b{channel:%b}:pubsub", STR(namespace), STR(&ch->id));
1315   }
1316   return NGX_OK;
1317 }
1318 
redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t * ch)1319 ngx_int_t redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t *ch) {
1320   return spooler_catch_up(&ch->spooler);
1321 }
1322 
create_chanhead(ngx_str_t * channel_id,redis_nodeset_t * ns)1323 static rdstore_channel_head_t *create_chanhead(ngx_str_t *channel_id, redis_nodeset_t *ns) {
1324   rdstore_channel_head_t   *head;
1325 
1326   head=ngx_calloc(sizeof(*head) + sizeof(u_char)*(channel_id->len), ngx_cycle->log);
1327   if(head==NULL) {
1328     ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "can't allocate memory for (new) channel subscriber head");
1329     return NULL;
1330   }
1331   head->id.len = channel_id->len;
1332   head->id.data = (u_char *)&head[1];
1333   ngx_memcpy(head->id.data, channel_id->data, channel_id->len);
1334   head->sub_count=0;
1335   head->fetching_message_count=0;
1336   head->redis_subscriber_privdata = NULL;
1337   head->status = NOTREADY;
1338   head->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;
1339   head->generation = 0;
1340   head->last_msgid.time=0;
1341   head->last_msgid.tag.fixed[0]=0;
1342   head->last_msgid.tagcount = 1;
1343   head->last_msgid.tagactive = 0;
1344   head->shutting_down = 0;
1345   head->reserved = 0;
1346   head->keepalive_times_sent = 0;
1347 
1348   head->gc.in_reaper = 0;
1349   head->gc.time = 0;
1350   head->gc.prev = NULL;
1351   head->gc.next = NULL;
1352 
1353   if(head->id.len >= 5 && ngx_strncmp(head->id.data, "meta/", 5) == 0) {
1354     head->meta = 1;
1355   }
1356   else {
1357     head->meta = 0;
1358   }
1359 
1360   ngx_memzero(&head->keepalive_timer, sizeof(head->keepalive_timer));
1361   nchan_init_timer(&head->keepalive_timer, redis_channel_keepalive_timer_handler, head);
1362 
1363   if(channel_id->len > 2) { // absolutely no multiplexed channels allowed
1364     assert(ngx_strncmp(head->id.data, "m/", 2) != 0);
1365   }
1366 
1367   head->redis.nodeset = ns;
1368   head->redis.generation = 0;
1369   head->redis.node.cmd = NULL;
1370   head->redis.node.pubsub = NULL;
1371   ngx_memzero(&head->redis.slist, sizeof(head->redis.slist));
1372 
1373   if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP) {
1374     head->status = READY;
1375   }
1376 
1377   head->spooler.running=0;
1378   start_chanhead_spooler(head);
1379   if(head->meta) {
1380     head->spooler.publish_events = 0;
1381   }
1382 
1383   ensure_chanhead_pubsub_subscribed_if_needed(head);
1384   CHANNEL_HASH_ADD(head);
1385 
1386   return head;
1387 }
1388 
nchan_store_get_chanhead(ngx_str_t * channel_id,redis_nodeset_t * nodeset)1389 static rdstore_channel_head_t * nchan_store_get_chanhead(ngx_str_t *channel_id, redis_nodeset_t *nodeset) {
1390   rdstore_channel_head_t    *head;
1391 
1392   CHANNEL_HASH_FIND(channel_id, head); //BUG: this doesn't account for namespacing!!
1393   if(head==NULL) {
1394     head = create_chanhead(channel_id, nodeset);
1395   }
1396   if(head == NULL) {
1397     ERR("can't create chanhead for redis store");
1398     return NULL;
1399   }
1400 
1401   if (head->status == INACTIVE) { //recycled chanhead
1402     ensure_chanhead_pubsub_subscribed_if_needed(head);
1403     redis_chanhead_gc_withdraw(head);
1404 
1405     if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP) {
1406       head->status = READY;
1407     }
1408     else {
1409       head->status = head->pubsub_status == REDIS_PUBSUB_SUBSCRIBED ? READY : NOTREADY;
1410     }
1411   }
1412 
1413   if(!head->spooler.running) {
1414     DBG("Spooler for channel %p %V wasn't running. start it.", head, &head->id);
1415     start_chanhead_spooler(head);
1416   }
1417 
1418   return head;
1419 }
1420 
redis_chanhead_gc_add(rdstore_channel_head_t * head,ngx_int_t expire,const char * reason)1421 ngx_int_t redis_chanhead_gc_add(rdstore_channel_head_t *head, ngx_int_t expire, const char *reason) {
1422   assert(head->sub_count == 0);
1423   nchan_reaper_t *reaper = &head->redis.nodeset->chanhead_reaper;
1424   if(!head->gc.in_reaper) {
1425     assert(head->status != INACTIVE);
1426     head->status = INACTIVE;
1427     head->gc.time = ngx_time() + (expire == 0 ? NCHAN_CHANHEAD_EXPIRE_SEC : expire);
1428     head->gc.in_reaper = 1;
1429 
1430     nchan_reaper_add(reaper, head);
1431 
1432     DBG("gc_add chanhead %V to %s (%s)", &head->id, reaper->name, reason);
1433   }
1434   else {
1435     ERR("gc_add chanhead %V to %s: already added (%s)", &head->id, reaper->name, reason);
1436   }
1437 
1438   return NGX_OK;
1439 }
1440 
redis_chanhead_gc_withdraw(rdstore_channel_head_t * chanhead)1441 ngx_int_t redis_chanhead_gc_withdraw(rdstore_channel_head_t *chanhead) {
1442   if(chanhead->gc.in_reaper) {
1443     nchan_reaper_t *reaper = &chanhead->redis.nodeset->chanhead_reaper;
1444     DBG("gc_withdraw chanhead %s from %V", reaper->name, &chanhead->id);
1445     assert(chanhead->status == INACTIVE);
1446 
1447     nchan_reaper_withdraw(reaper, chanhead);
1448     chanhead->gc.in_reaper = 0;
1449   }
1450   else {
1451     DBG("gc_withdraw chanhead (%V), but not in gc reaper", &chanhead->id);
1452   }
1453   return NGX_OK;
1454 }
1455 
nchan_store_publish_generic(ngx_str_t * channel_id,redis_nodeset_t * nodeset,nchan_msg_t * msg,ngx_int_t status_code,const ngx_str_t * status_line)1456 static ngx_int_t nchan_store_publish_generic(ngx_str_t *channel_id, redis_nodeset_t *nodeset, nchan_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line){
1457   rdstore_channel_head_t   *head;
1458   ngx_int_t                 ret;
1459   //redis_channel_head_cleanup_t *hcln;
1460 
1461 
1462 
1463   head = nchan_store_get_chanhead(channel_id, nodeset);
1464 
1465   if(head->sub_count > 0) {
1466     if(msg) {
1467       assert(msg->id.tagcount == 1);
1468       head->last_msgid.time = msg->id.time;
1469       head->last_msgid.tag.fixed[0] = msg->id.tag.fixed[0];
1470       head->last_msgid.tagcount = 1;
1471       head->last_msgid.tagactive = 0;
1472 
1473       head->spooler.fn->respond_message(&head->spooler, msg);
1474     }
1475     else {
1476       head->spooler.fn->broadcast_status(&head->spooler, status_code, status_line);
1477     }
1478     ret= NGX_OK;
1479   }
1480   else {
1481     ret= NCHAN_MESSAGE_QUEUED;
1482   }
1483   return ret;
1484 }
1485 
1486 
redis_array_to_channel(redisReply * r,nchan_channel_t * ch)1487 static ngx_int_t redis_array_to_channel(redisReply *r, nchan_channel_t *ch) {
1488   ngx_str_t       msgid;
1489   nchan_msg_id_t  zeroid = NCHAN_OLDEST_MSGID;
1490 
1491   if ( CHECK_REPLY_ARRAY_MIN_SIZE(r, 5)
1492     && CHECK_REPLY_INT(r->element[0])
1493     && CHECK_REPLY_INT(r->element[1])
1494     && CHECK_REPLY_INT(r->element[2])
1495     && CHECK_REPLY_STR(r->element[3])
1496     && CHECK_REPLY_INT(r->element[4])) {
1497 
1498     //channel info
1499     ch->expires = ngx_time() + r->element[0]->integer;
1500     ch->last_seen = r->element[1]->integer;
1501     ch->subscribers = r->element[2]->integer;
1502 
1503     msgid.data = (u_char *)r->element[3]->str;
1504     msgid.len = r->element[3]->len;
1505 
1506     if(msgid.len == 0) {
1507       ch->last_published_msg_id = zeroid;
1508     }
1509     else if(nchan_parse_compound_msgid(&ch->last_published_msg_id, &msgid, 1) != NGX_OK) {
1510       ERR("failed to parse last-msgid %V from redis", &msgid);
1511     }
1512 
1513     ch->messages = r->element[4]->integer;
1514 
1515     //no id?..
1516     ch->id.len=0;
1517     ch->id.data=NULL;
1518 
1519     //queued messages
1520     if( CHECK_REPLY_ARRAY_MIN_SIZE(r, 6)
1521       && CHECK_REPLY_INT(r->element[5])) {
1522 
1523       ch->messages = r->element[5]->integer;
1524     }
1525 
1526     return NGX_OK;
1527   }
1528   else if(CHECK_REPLY_NIL(r)) {
1529     return NGX_DECLINED;
1530   }
1531   else {
1532     return NGX_ERROR;
1533   }
1534 }
1535 
1536 typedef struct {
1537   ngx_msec_t           t;
1538   char                *name;
1539   ngx_str_t           *channel_id;
1540   callback_pt          callback;
1541   void                *privdata;
1542 } redis_channel_callback_data_t;
1543 
1544 #define CREATE_CALLBACK_DATA(d, nodeset, cf, namestr, channel_id, callback, privdata) \
1545   do {                                                                       \
1546     if ((d = ngx_alloc(sizeof(*d) + ((nodeset)->cluster.enabled ? (sizeof(*channel_id) + channel_id->len) : 0), ngx_cycle->log)) == NULL) { \
1547       ERR("Can't allocate redis %s channel callback data", namestr);         \
1548       return NGX_ERROR;                                                      \
1549     }                                                                        \
1550     d->t = ngx_current_msec;                                                 \
1551     d->name = namestr;                                                       \
1552     if((nodeset)->cluster.enabled) {                                            \
1553       /* might need to use channel id later to retry the command */          \
1554       d->channel_id = (ngx_str_t *)&d[1];                                    \
1555       d->channel_id->data = (u_char *)&d->channel_id[1];                     \
1556       nchan_strcpy(d->channel_id, channel_id, 0);                            \
1557     }                                                                        \
1558     else {                                                                   \
1559       d->channel_id = channel_id;                                            \
1560     }                                                                        \
1561     d->callback = callback;                                                  \
1562     d->privdata = privdata;                                                  \
1563   } while(0)
1564 
redisChannelInfoCallback(redisAsyncContext * c,void * r,void * privdata)1565 static void redisChannelInfoCallback(redisAsyncContext *c, void *r, void *privdata) {
1566   redisReply *reply=r;
1567   redis_channel_callback_data_t *d=(redis_channel_callback_data_t *)privdata;
1568   nchan_channel_t channel;
1569   ngx_memzero(&channel, sizeof(channel)); // for ddebugging. this should be removed later.
1570 
1571   log_redis_reply(d->name, d->t);
1572 
1573   if(d->callback) {
1574     if(reply) {
1575       switch(redis_array_to_channel(reply, &channel)) {
1576         case NGX_OK:
1577           d->callback(NGX_OK, &channel, d->privdata);
1578           break;
1579         case NGX_DECLINED: //not found
1580           d->callback(NGX_OK, NULL, d->privdata);
1581           break;
1582         case NGX_ERROR:
1583         default:
1584           redisEchoCallback(c, r, privdata);
1585           d->callback(NGX_ERROR, NULL, d->privdata);
1586       }
1587     }
1588     else {
1589       d->callback(NGX_ERROR, NULL, d->privdata);
1590     }
1591   }
1592 }
1593 
1594 static void redisChannelDeleteCallback(redisAsyncContext *c, void *r, void *privdata);
1595 
nchan_store_delete_channel_send(redis_nodeset_t * ns,void * pd)1596 static ngx_int_t nchan_store_delete_channel_send(redis_nodeset_t *ns, void *pd) {
1597   redis_channel_callback_data_t *d = pd;
1598   if(nodeset_ready(ns)) {
1599     redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1600     nchan_redis_script(delete, node, &redisChannelDeleteCallback, d, d->channel_id, "");
1601     return NGX_OK;
1602   }
1603   else {
1604     redisChannelDeleteCallback(NULL, NULL, d);
1605     return NGX_ERROR;
1606   }
1607 }
1608 
redisChannelDeleteCallback(redisAsyncContext * ac,void * r,void * privdata)1609 static void redisChannelDeleteCallback(redisAsyncContext *ac, void *r, void *privdata) {
1610   redis_node_t  *node;
1611 
1612   nchan_update_stub_status(redis_pending_commands, -1);
1613   if(ac) {
1614     node = ac->data;
1615     node->pending_commands--;
1616 
1617     if(!nodeset_node_reply_keyslot_ok(node, (redisReply *)r)) {
1618       nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_delete_channel_send, privdata);
1619       return;
1620     }
1621   }
1622   redisChannelInfoCallback(ac, r, privdata);
1623   ngx_free(privdata);
1624 }
1625 
nchan_store_delete_channel(ngx_str_t * channel_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1626 static ngx_int_t nchan_store_delete_channel(ngx_str_t *channel_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1627   redis_channel_callback_data_t *d;
1628   redis_nodeset_t               *ns = nodeset_find(&cf->redis);
1629   CREATE_CALLBACK_DATA(d, ns, cf, "delete", channel_id, callback, privdata);
1630 
1631   return nchan_store_delete_channel_send(ns, d);
1632 }
1633 
1634 
1635 static void redisChannelFindCallback(redisAsyncContext *c, void *r, void *privdata);
1636 
nchan_store_find_channel_send(redis_nodeset_t * ns,void * pd)1637 static ngx_int_t nchan_store_find_channel_send(redis_nodeset_t *ns, void *pd) {
1638   redis_channel_callback_data_t *d = pd;
1639   if(nodeset_ready(ns)) {
1640     redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1641     nchan_redis_script(find_channel, node, &redisChannelFindCallback, d, d->channel_id, "");
1642   }
1643   else {
1644     redisChannelFindCallback(NULL, NULL, d);
1645   }
1646   return NGX_OK;
1647 }
1648 
redisChannelFindCallback(redisAsyncContext * ac,void * r,void * privdata)1649 static void redisChannelFindCallback(redisAsyncContext *ac, void *r, void *privdata) {
1650   redis_node_t                 *node = NULL;
1651 
1652   if(ac) {
1653     node = ac->data;
1654     node->pending_commands--;
1655     nchan_update_stub_status(redis_pending_commands, -1);
1656 
1657     if(!nodeset_node_reply_keyslot_ok(node, (redisReply *)r)) {
1658       nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_find_channel_send, privdata);
1659       return;
1660     }
1661   }
1662 
1663   redisChannelInfoCallback(ac, r, privdata);
1664   ngx_free(privdata);
1665 }
1666 
nchan_store_find_channel(ngx_str_t * channel_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1667 static ngx_int_t nchan_store_find_channel(ngx_str_t *channel_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1668   redis_channel_callback_data_t *d;
1669   redis_nodeset_t               *ns = nodeset_find(&cf->redis);
1670   CREATE_CALLBACK_DATA(d, ns, cf, "find_channel", channel_id, callback, privdata);
1671 
1672   nchan_store_find_channel_send(ns, d);
1673 
1674   return NGX_OK;
1675 }
1676 
msg_from_redis_get_message_reply(nchan_msg_t * msg,nchan_compressed_msg_t * cmsg,ngx_str_t * content_type,ngx_str_t * eventsource_event,redisReply * r,uint16_t offset)1677 static ngx_int_t msg_from_redis_get_message_reply(nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event, redisReply *r, uint16_t offset) {
1678 
1679   redisReply         **els = r->element;
1680   size_t               content_type_len = 0, es_event_len = 0;
1681   ngx_int_t            time_int = 0, ttl;
1682   ngx_int_t            compression;
1683 
1684   if(CHECK_REPLY_ARRAY_MIN_SIZE(r, offset + 8)
1685    && CHECK_REPLY_INT(els[offset])            //msg TTL
1686    && CHECK_REPLY_INT_OR_STR(els[offset+1])   //id - time
1687    && CHECK_REPLY_INT_OR_STR(els[offset+2])   //id - tag
1688    && CHECK_REPLY_INT_OR_STR(els[offset+3])   //prev_id - time
1689    && CHECK_REPLY_INT_OR_STR(els[offset+4])   //prev_id - tag
1690    && CHECK_REPLY_STR(els[offset+5])   //message
1691    && CHECK_REPLY_STR(els[offset+6])   //content-type
1692    && CHECK_REPLY_STR(els[offset+7])  //eventsource event
1693   ) {
1694 
1695     content_type_len=els[offset+6]->len;
1696     es_event_len = els[offset+7]->len;
1697 
1698     ngx_memzero(msg, sizeof(*msg));
1699 
1700     msg->buf.start = msg->buf.pos = (u_char *)els[offset+5]->str;
1701     msg->buf.end = msg->buf.last = msg->buf.start + els[offset+5]->len;
1702     msg->buf.memory = 1;
1703     msg->buf.last_buf = 1;
1704     msg->buf.last_in_chain = 1;
1705 
1706     if(redisReply_to_ngx_int(els[offset], &ttl) != NGX_OK) {
1707       ERR("invalid ttl integer value in msg response from redis");
1708       return NGX_ERROR;
1709     }
1710     assert(ttl >= 0);
1711     if(ttl == 0)
1712       ttl++; // less than a second left for this message... give it a second's lease on life
1713 
1714     msg->expires = ngx_time() + ttl;
1715 
1716     msg->compressed = NULL;
1717     if(r->elements >= (uint16_t )(offset + 8)) {
1718       if(!CHECK_REPLY_INT_OR_STR(els[offset+8]) || redisReply_to_ngx_int(els[offset+8], &compression) != NGX_OK) {
1719         ERR("invalid compression type integer value in msg response from redis");
1720         return NGX_ERROR;
1721       }
1722       if((nchan_msg_compression_type_t )compression != NCHAN_MSG_COMPRESSION_INVALID && (nchan_msg_compression_type_t )compression != NCHAN_MSG_NO_COMPRESSION) {
1723         msg->compressed = cmsg;
1724         ngx_memzero(&cmsg->buf, sizeof(cmsg->buf));
1725         cmsg->compression = (nchan_msg_compression_type_t )compression;
1726       }
1727     }
1728 
1729     if(content_type_len > 0) {
1730       msg->content_type = content_type;
1731       msg->content_type->len=content_type_len;
1732       msg->content_type->data=(u_char *)els[offset+6]->str;
1733     }
1734     else {
1735       msg->content_type = NULL;
1736     }
1737 
1738     if(es_event_len > 0) {
1739       msg->eventsource_event = eventsource_event;
1740       msg->eventsource_event->len=es_event_len;
1741       msg->eventsource_event->data=(u_char *)els[offset+7]->str;
1742     }
1743     else {
1744       msg->eventsource_event = NULL;
1745     }
1746 
1747     if(redisReply_to_ngx_int(els[offset+1], &time_int) == NGX_OK) {
1748       msg->id.time = time_int;
1749     }
1750     else {
1751       msg->id.time = 0;
1752       ERR("invalid msg time from redis");
1753     }
1754 
1755     redisReply_to_ngx_int(els[offset+2], (ngx_int_t *)&msg->id.tag.fixed[0]); // tag is a uint, meh.
1756     msg->id.tagcount = 1;
1757     msg->id.tagactive = 0;
1758 
1759     redisReply_to_ngx_int(els[offset+3], &time_int);
1760     msg->prev_id.time = time_int;
1761     redisReply_to_ngx_int(els[offset+4], (ngx_int_t *)&msg->prev_id.tag.fixed[0]);
1762     msg->prev_id.tagcount = 1;
1763     msg->prev_id.tagactive = 0;
1764 
1765     return NGX_OK;
1766   }
1767   else {
1768     ERR("invalid message redis reply");
1769     return NGX_ERROR;
1770   }
1771 }
1772 
1773 typedef struct {
1774   ngx_msec_t              t;
1775   char                   *name;
1776   ngx_str_t              *channel_id;
1777   nchan_msg_tiny_id_t     msg_id;
1778   callback_pt             callback;
1779   void                   *privdata;
1780 } redis_get_message_data_t;
1781 
1782 static void redis_get_message_callback(redisAsyncContext *c, void *r, void *privdata);
1783 
nchan_store_async_get_message_send(redis_nodeset_t * ns,void * pd)1784 static ngx_int_t nchan_store_async_get_message_send(redis_nodeset_t *ns, void *pd) {
1785   redis_get_message_data_t           *d = pd;
1786   //input:  keys: [], values: [namespace, channel_id, msg_time, msg_tag, no_msgid_order, create_channel_ttl]
1787   //output: result_code, msg_ttl, msg_time, msg_tag, prev_msg_time, prev_msg_tag, message, content_type, eventsource_event, channel_subscriber_count
1788   if(nodeset_ready(ns)) {
1789     redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1790     nchan_redis_script(get_message, node, &redis_get_message_callback, d, d->channel_id, "%i %i FILO 0",
1791                        d->msg_id.time,
1792                        d->msg_id.tag
1793                       );
1794   }
1795   else {
1796     //TODO: pass on a get_msg error status maybe?
1797     ngx_free(d);
1798   }
1799   return NGX_OK;
1800 }
1801 
redis_get_message_callback(redisAsyncContext * ac,void * r,void * privdata)1802 static void redis_get_message_callback(redisAsyncContext *ac, void *r, void *privdata) {
1803   redisReply                *reply= r;
1804   redis_get_message_data_t  *d= (redis_get_message_data_t *)privdata;
1805   nchan_msg_t                msg;
1806   nchan_compressed_msg_t     cmsg;
1807   ngx_str_t                  content_type;
1808   ngx_str_t                  eventsource_event;
1809   redis_node_t              *node;
1810 
1811   if(d == NULL) {
1812     ERR("redis_get_mesage_callback has NULL userdata");
1813     return;
1814   }
1815 
1816   if(ac) {
1817     node = ac->data;
1818 
1819     node->pending_commands--;
1820     nchan_update_stub_status(redis_pending_commands, -1);
1821 
1822     if(!nodeset_ready(node->nodeset) || !nodeset_node_reply_keyslot_ok(node, reply)) {
1823       nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_async_get_message_send, privdata);
1824       return;
1825     }
1826 
1827     log_redis_reply(d->name, d->t);
1828 
1829     //output: result_code, msg_ttl, msg_time, msg_tag, prev_msg_time, prev_msg_tag, message, content_type, eventsource_event, compression_type, channel_subscriber_count
1830     // result_code can be: 200 - ok, 403 - channel not found, 404 - not found, 410 - gone, 418 - not yet available
1831 
1832     if (!redisReplyOk(ac, r) || !CHECK_REPLY_ARRAY_MIN_SIZE(reply, 1) || !CHECK_REPLY_INT(reply->element[0]) ) {
1833       //no good
1834       ngx_free(d);
1835       return;
1836     }
1837 
1838     switch(reply->element[0]->integer) {
1839       case 200: //ok
1840         if(msg_from_redis_get_message_reply(&msg, &cmsg, &content_type, &eventsource_event, reply, 1) == NGX_OK) {
1841           d->callback(MSG_FOUND, &msg, d->privdata);
1842         }
1843         break;
1844       case 403: //channel not found
1845       case 404: //not found
1846         d->callback(MSG_NOTFOUND, NULL, d->privdata);
1847         break;
1848       case 410: //gone
1849         d->callback(MSG_EXPIRED, NULL, d->privdata);
1850         break;
1851       case 418: //not yet available
1852         d->callback(MSG_EXPECTED, NULL, d->privdata);
1853         break;
1854     }
1855   }
1856   else {
1857     ERR("redisAsyncContext NULL for redis_get_message_callback");
1858   }
1859 
1860   ngx_free(d);
1861 }
1862 
nchan_store_async_get_message(ngx_str_t * channel_id,nchan_msg_id_t * msg_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1863 static ngx_int_t nchan_store_async_get_message(ngx_str_t *channel_id, nchan_msg_id_t *msg_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1864   redis_get_message_data_t      *d;
1865   redis_nodeset_t               *ns = nodeset_find(&cf->redis);
1866   if(callback==NULL) {
1867     ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "no callback given for async get_message. someone's using the API wrong!");
1868     return NGX_ERROR;
1869   }
1870 
1871   assert(msg_id->tagcount == 1);
1872 
1873   CREATE_CALLBACK_DATA(d, ns, cf, "get_message", channel_id, callback, privdata);
1874   d->msg_id.time = msg_id->time;
1875   d->msg_id.tag = msg_id->tag.fixed[0];
1876 
1877   nchan_store_async_get_message_send(ns, d);
1878   return NGX_OK; //async only now!
1879 }
1880 
1881 
1882 typedef struct nchan_redis_conf_ll_s nchan_redis_conf_ll_t;
1883 struct nchan_redis_conf_ll_s {
1884   nchan_loc_conf_t       *lcf;
1885   nchan_redis_conf_ll_t  *next;
1886 };
1887 
1888 nchan_redis_conf_ll_t   *redis_conf_head;
1889 
nchan_store_redis_add_active_loc_conf(ngx_conf_t * cf,nchan_loc_conf_t * loc_conf)1890 ngx_int_t nchan_store_redis_add_active_loc_conf(ngx_conf_t *cf, nchan_loc_conf_t *loc_conf) {
1891   nchan_redis_conf_ll_t  *rcf_ll = ngx_palloc(cf->pool, sizeof(*rcf_ll));
1892   rcf_ll->lcf = loc_conf;
1893   rcf_ll->next = redis_conf_head;
1894   redis_conf_head = rcf_ll;
1895   return NGX_OK;
1896 }
1897 
nchan_store_redis_remove_active_loc_conf(ngx_conf_t * cf,nchan_loc_conf_t * loc_conf)1898 ngx_int_t nchan_store_redis_remove_active_loc_conf(ngx_conf_t *cf, nchan_loc_conf_t *loc_conf) {
1899   nchan_redis_conf_ll_t  *cur, *prev;
1900 
1901   for(cur = redis_conf_head, prev = NULL; cur != NULL; prev = cur, cur = cur->next) {
1902     if(cur->lcf == loc_conf) { //found it
1903       if(prev == NULL) {
1904         redis_conf_head = cur->next;
1905       }
1906       else {
1907         prev->next = cur->next;
1908       }
1909       //don't need to ngx_pfree
1910       return NGX_OK;
1911     }
1912   }
1913   return NGX_OK;
1914 }
1915 
1916 //initialization
nchan_store_init_module(ngx_cycle_t * cycle)1917 static ngx_int_t nchan_store_init_module(ngx_cycle_t *cycle) {
1918   ngx_core_conf_t                *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
1919   nchan_worker_processes = ccf->worker_processes;
1920   return NGX_OK;
1921 }
1922 
1923 
rdstore_initialize_chanhead_reaper(nchan_reaper_t * reaper,char * name)1924 ngx_int_t rdstore_initialize_chanhead_reaper(nchan_reaper_t *reaper, char *name) {
1925 
1926   nchan_reaper_start(reaper,
1927               name,
1928               offsetof(rdstore_channel_head_t, gc.prev),
1929               offsetof(rdstore_channel_head_t, gc.next),
1930   (ngx_int_t (*)(void *, uint8_t)) nchan_redis_chanhead_ready_to_reap,
1931   (void (*)(void *)) redis_store_reap_chanhead,
1932               4
1933   );
1934 
1935   return NGX_OK;
1936 }
1937 
nchan_store_init_redis_loc_conf_postconfig(nchan_loc_conf_t * lcf)1938 static ngx_int_t nchan_store_init_redis_loc_conf_postconfig(nchan_loc_conf_t *lcf) {
1939   nchan_redis_conf_t    *rcf = &lcf->redis;
1940 
1941   assert(rcf->enabled);
1942 
1943   //server-scope loc_conf may have some undefined values (because it was never merged with a prev)
1944   //thus we must reduntantly check for unser values
1945   if(rcf->ping_interval == NGX_CONF_UNSET) {
1946     rcf->ping_interval = NCHAN_REDIS_DEFAULT_PING_INTERVAL_TIME;
1947   }
1948   if(rcf->storage_mode == REDIS_MODE_CONF_UNSET) {
1949     rcf->storage_mode = REDIS_MODE_DISTRIBUTED;
1950   }
1951   if(rcf->nostore_fastpublish == NGX_CONF_UNSET) {
1952     rcf->nostore_fastpublish = 0;
1953   }
1954 
1955   return NGX_OK;
1956 }
1957 
nchan_store_init_postconfig(ngx_conf_t * cf)1958 static ngx_int_t nchan_store_init_postconfig(ngx_conf_t *cf) {
1959   nchan_loc_conf_t      *lcf;
1960   nchan_redis_conf_ll_t *cur;
1961   nchan_main_conf_t     *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_nchan_module);
1962   redis_nodeset_t       *nodeset;
1963 
1964   if(mcf->redis_publish_message_msgkey_size == NGX_CONF_UNSET_SIZE) {
1965     mcf->redis_publish_message_msgkey_size = NCHAN_REDIS_DEFAULT_PUBSUB_MESSAGE_MSGKEY_SIZE;
1966   }
1967   redis_publish_message_msgkey_size = mcf->redis_publish_message_msgkey_size;
1968 
1969   for(cur = redis_conf_head; cur != NULL; cur = cur->next) {
1970     lcf = cur->lcf;
1971     nchan_store_init_redis_loc_conf_postconfig(lcf);
1972 
1973     if((nodeset = nodeset_find(&lcf->redis)) == NULL) {
1974       nodeset = nodeset_create(lcf);
1975       rdstore_initialize_chanhead_reaper(&nodeset->chanhead_reaper, "Redis channel reaper");
1976     }
1977     if(!nodeset) {
1978       ERR("Unable to create Redis nodeset.");
1979       continue;
1980     }
1981   }
1982 
1983   return NGX_OK;
1984 }
1985 
nchan_store_create_main_conf(ngx_conf_t * cf,nchan_main_conf_t * mcf)1986 static void nchan_store_create_main_conf(ngx_conf_t *cf, nchan_main_conf_t *mcf) {
1987   mcf->redis_publish_message_msgkey_size=NGX_CONF_UNSET_SIZE;
1988 
1989   //reset redis_conf_head for reloads
1990   redis_conf_head = NULL;
1991   nodeset_destroy_all(); //reset all nodesets before loading config
1992 }
1993 
redis_store_prepare_to_exit_worker()1994 void redis_store_prepare_to_exit_worker() {
1995   rdstore_channel_head_t    *cur, *tmp;
1996   HASH_ITER(hh, chanhead_hash, cur, tmp) {
1997     cur->shutting_down = 1;
1998   }
1999 }
2000 
nodeset_exiter_stage1(redis_nodeset_t * ns,void * pd)2001 void nodeset_exiter_stage1(redis_nodeset_t *ns, void *pd) {
2002   nodeset_abort_on_ready_callbacks(ns);
2003 }
nodeset_exiter_stage2(redis_nodeset_t * ns,void * pd)2004 void nodeset_exiter_stage2(redis_nodeset_t *ns, void *pd) {
2005   unsigned *chanheads = pd;
2006   *chanheads += ns->chanhead_reaper.count;
2007   nchan_reaper_stop(&ns->chanhead_reaper);
2008 }
2009 
nodeset_exiter_stage3(redis_nodeset_t * ns,void * pd)2010 void nodeset_exiter_stage3(redis_nodeset_t *ns, void *pd) {
2011   nodeset_disconnect(ns);
2012 }
2013 
nchan_store_exit_worker(ngx_cycle_t * cycle)2014 static void nchan_store_exit_worker(ngx_cycle_t *cycle) {
2015   rdstore_channel_head_t     *cur, *tmp;
2016   unsigned                    chanheads = 0;
2017   DBG("redis exit worker");
2018 
2019   //old
2020   //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage1, NULL);
2021 
2022   nodeset_each(nodeset_exiter_stage1, NULL);
2023 
2024   HASH_ITER(hh, chanhead_hash, cur, tmp) {
2025     cur->shutting_down = 1;
2026     if(!cur->gc.in_reaper) {
2027       cur->spooler.fn->broadcast_status(&cur->spooler, NGX_HTTP_GONE, &NCHAN_HTTP_STATUS_410);
2028       redis_chanhead_gc_add(cur, 0, "exit worker");
2029     }
2030   }
2031 
2032   nodeset_each(nodeset_exiter_stage2, &chanheads);
2033 
2034   //OLD
2035   //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage2, &chanheads);
2036   nodeset_destroy_all();
2037 
2038   //OLD
2039   //rbtree_empty(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage3, NULL);
2040 
2041   nchan_exit_notice_about_remaining_things("redis channel", "", chanheads);
2042 }
2043 
nchan_store_exit_master(ngx_cycle_t * cycle)2044 static void nchan_store_exit_master(ngx_cycle_t *cycle) {
2045   nodeset_destroy_all();
2046 }
2047 
2048 typedef struct {
2049   ngx_str_t                   *channel_id;
2050   subscriber_t                *sub;
2051   unsigned                     allocd:1;
2052 } redis_subscribe_data_t;
2053 
2054 static ngx_int_t nchan_store_subscribe_continued(redis_subscribe_data_t *d);
2055 
subscribe_existing_channel_callback(ngx_int_t status,void * ch,void * d)2056 static ngx_int_t subscribe_existing_channel_callback(ngx_int_t status, void *ch, void *d) {
2057   nchan_channel_t              *channel = (nchan_channel_t *)ch;
2058   redis_subscribe_data_t       *data = (redis_subscribe_data_t *)d;
2059 
2060   if(channel == NULL) {
2061     data->sub->fn->respond_status(data->sub, NGX_HTTP_FORBIDDEN, NULL, NULL);
2062     data->sub->fn->release(data->sub, 0);
2063   }
2064   else {
2065     nchan_store_subscribe_continued(d);
2066   }
2067   assert(data->allocd);
2068   ngx_free(data);
2069 
2070   return NGX_OK;
2071 }
2072 
nchan_store_subscribe(ngx_str_t * channel_id,subscriber_t * sub)2073 static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) {
2074   redis_subscribe_data_t        d_data;
2075   redis_subscribe_data_t       *d = NULL;
2076 
2077   assert(sub->last_msgid.tagcount == 1);
2078 
2079   if(!sub->cf->subscribe_only_existing_channel) {
2080     d_data.allocd = 0;
2081     d_data.channel_id = channel_id;
2082     d_data.sub = sub;
2083     nchan_store_subscribe_continued(&d_data);
2084   }
2085   else {
2086     if((d=ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id->len, ngx_cycle->log))==NULL) {
2087       ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "can't allocate redis get_message callback data");
2088       return NGX_ERROR;
2089     }
2090     d->allocd = 1;
2091     d->channel_id=(ngx_str_t *)&d[1];
2092     d->channel_id->len = channel_id->len;
2093     d->channel_id->data = (u_char *)&(d->channel_id)[1];
2094     ngx_memcpy(d->channel_id->data, channel_id->data, channel_id->len);
2095     d->sub = sub;
2096     nchan_store_find_channel(channel_id, sub->cf, subscribe_existing_channel_callback, d);
2097   }
2098 
2099   return NGX_OK;
2100 }
2101 
nchan_store_subscribe_continued(redis_subscribe_data_t * d)2102 static ngx_int_t nchan_store_subscribe_continued(redis_subscribe_data_t *d) {
2103   //nchan_loc_conf_t           *cf = d->sub->cf;
2104   rdstore_channel_head_t       *ch;
2105   //ngx_int_t                   create_channel_ttl = cf->subscribe_only_existing_channel==1 ? 0 : cf->channel_timeout;
2106   assert(d->sub->cf->redis.enabled);
2107   redis_nodeset_t              *nodeset = nodeset_find(&d->sub->cf->redis);
2108   ngx_int_t                     rc;
2109 
2110   ch = nchan_store_get_chanhead(d->channel_id, nodeset);
2111 
2112   assert(ch != NULL);
2113 
2114   rc = ch->spooler.fn->add(&ch->spooler, d->sub);
2115   //redisAsyncCommand(rds_ctx(), &redis_getmessage_callback, (void *)d, "EVALSHA %s 0 %b %i %i %s %i", redis_lua_scripts.get_message.hash, STR(d->channel_id), d->msg_id->time, d->msg_id->tag[0], "FILO", create_channel_ttl);
2116   return rc;
2117 }
2118 
2119 typedef struct {
2120   ngx_msec_t            t;
2121   char                 *name;
2122   ngx_str_t            *channel_id;
2123   time_t                msg_time;
2124   nchan_msg_t          *msg;
2125   unsigned              shared_msg:1;
2126   unsigned              cluster_move_error:1;
2127   time_t                message_timeout;
2128   ngx_int_t             max_messages;
2129   nchan_msg_compression_type_t compression;
2130   ngx_int_t             msglen;
2131   callback_pt           callback;
2132   void                 *privdata;
2133   uint8_t               retry;
2134 } redis_publish_callback_data_t;
2135 
2136 static void redisPublishCallback(redisAsyncContext *, void *, void *);
2137 static void redisPublishNostoreCallback(redisAsyncContext *, void *, void *);
2138 static void redisPublishNostoreQueuedCheckCallback(redisAsyncContext *, void *, void *);
2139 static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd);
2140 
redis_publish_message_nodeset_maybe_retry(redis_nodeset_t * ns,redis_publish_callback_data_t * d)2141 static ngx_int_t redis_publish_message_nodeset_maybe_retry(redis_nodeset_t *ns, redis_publish_callback_data_t *d) {
2142   //retry maybe
2143   if(d->retry < REDIS_NODESET_NOT_READY_MAX_RETRIES) {
2144     d->retry++;
2145     nodeset_callback_on_ready(ns, 1000, redis_publish_message_send, d);
2146   }
2147   else {
2148     d->callback(NGX_HTTP_SERVICE_UNAVAILABLE, NULL, d->privdata);
2149     ngx_free(d);
2150   }
2151   return NGX_DECLINED;
2152 }
2153 
redis_publish_message_send(redis_nodeset_t * nodeset,void * pd)2154 static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd) {
2155   redis_publish_callback_data_t  *d = pd;
2156   ngx_int_t                       mmapped = 0;
2157   ngx_buf_t                      *buf;
2158   ngx_str_t                       msgstr;
2159   nchan_msg_t                    *msg = d->msg;
2160   const ngx_str_t                 empty=ngx_string("");
2161 
2162   if(!nodeset_ready(nodeset)) {
2163     return redis_publish_message_nodeset_maybe_retry(nodeset, d);
2164   }
2165 
2166   redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
2167 
2168   buf = &msg->buf;
2169   if(ngx_buf_in_memory(buf)) {
2170     msgstr.data = buf->pos;
2171     msgstr.len = buf->last - msgstr.data;
2172   }
2173   else { //in a file
2174     ngx_fd_t fd = buf->file->fd == NGX_INVALID_FILE ? nchan_fdcache_get(&buf->file->name) : buf->file->fd;
2175 
2176     msgstr.len = buf->file_last - buf->file_pos;
2177     msgstr.data = mmap(NULL, msgstr.len, PROT_READ, MAP_SHARED, fd, 0);
2178     if (msgstr.data != MAP_FAILED) {
2179       mmapped = 1;
2180     }
2181     else {
2182       msgstr.data = NULL;
2183       msgstr.len = 0;
2184       ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, ngx_errno, "Redis store: Couldn't mmap file %V", &buf->file->name);
2185     }
2186   }
2187   d->msglen = msgstr.len;
2188 
2189   if(nodeset->settings.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE) {
2190     //hand-roll the msgpacked message
2191     /*
2192     9A A3"msg" CE<ttl> CE<time> 00 00 00 DB<len><str> D9<len><str> D9<len><str> 0X
2193     |  |       |       |        |  |  |  |            |            |            |
2194     |  |       |       |        |  |  |  |            |            |    fixint0/1 compression
2195     |  |       |       |        |  |  |  |            |    bin8 <uint8 len> eventsource-event
2196     |  |       |       |        |  |  |  |     bin8 <uint8 len> content type
2197     |  |       |       |        |  |  |  bin32 <uint32 BE len> msg data
2198     |  |       |       |      fixint=0 tag=0, prev_time=0, prev_tag=0
2199     |  |       |    uint32 (BE) message time
2200     |  |     uint32 (BE) message ttl
2201     |  fixstr[3]
2202     fixarray[10]
2203     */
2204 
2205     uint32_t ttl, time;
2206     uint32_t msglen;
2207     uint8_t  content_type_len, eventsource_event_len, compression;
2208     char     zero='\0';
2209     int      fastpublish = nodeset->settings.nostore_fastpublish;
2210     void   (*publish_callback)(redisAsyncContext *, void *, void *) = NULL;
2211 
2212 
2213     ttl = htonl(d->message_timeout);
2214     time = htonl(msg->id.time);
2215     msglen = htonl(d->msglen);
2216     content_type_len = msg->content_type ? (msg->content_type->len > 255 ? 255 : msg->content_type->len) : 0;
2217     eventsource_event_len = msg->eventsource_event ? (msg->eventsource_event->len > 255 ? 255 : msg->eventsource_event->len) : 0;
2218     compression = d->compression;
2219 
2220     if(!fastpublish) {
2221       redis_command(node, NULL, d, "MULTI");
2222       publish_callback = redisPublishNostoreQueuedCheckCallback;
2223     }
2224     else {
2225       publish_callback = redisPublishNostoreCallback;
2226     }
2227     redis_command(node, publish_callback, d,
2228       "PUBLISH %b{channel:%b}:pubsub "
2229       "\x9A\xA3msg\xCE%b\xCE%b%b%b%b\xDB%b%b\xD9%b%b\xD9%b%b%b",
2230 
2231       STR(nodeset->settings.namespace),
2232       STR(d->channel_id),
2233 
2234       (char *)&ttl, (size_t )4,
2235       (char *)&time, (size_t )4,
2236       (char *)&zero, (size_t )1,
2237       (char *)&zero, (size_t )1,
2238       (char *)&zero, (size_t )1,
2239       (char *)&msglen, (size_t )4,
2240       STR(&msgstr),
2241       (char *)&content_type_len, (size_t )1,
2242       STR((msg->content_type ? msg->content_type : &empty)),
2243       (char *)&eventsource_event_len, (size_t )1,
2244       STR((msg->eventsource_event ? msg->eventsource_event : &empty)),
2245       (char *)&compression, (size_t )1
2246     );
2247     if(!fastpublish) {
2248       redis_command(node, publish_callback, d, "HMGET %b{channel:%b} last_seen_fake_subscriber fake_subscribers",
2249         STR(nodeset->settings.namespace),
2250         STR(d->channel_id)
2251       );
2252       redis_command(node, &redisPublishNostoreCallback, d, "EXEC");
2253     }
2254 
2255   }
2256   else {
2257     //input:  keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff]
2258     //output: message_time, message_tag, channel_hash {ttl, time_last_seen, subscribers, messages}
2259     nchan_redis_script(publish, node, &redisPublishCallback, d, d->channel_id,
2260                       "%i %b %b %b %i %i %i %i %i",
2261                       msg->id.time,
2262                       STR(&msgstr),
2263                       STR((msg->content_type ? msg->content_type : &empty)),
2264                       STR((msg->eventsource_event ? msg->eventsource_event : &empty)),
2265                       d->compression,
2266                       d->message_timeout,
2267                       d->max_messages,
2268                       redis_publish_message_msgkey_size,
2269                       nodeset->settings.optimize_target
2270                       );
2271   }
2272   if(mmapped && munmap(msgstr.data, msgstr.len) == -1) {
2273     ERR("munmap was a problem");
2274     return NGX_ERROR;
2275   }
2276   return NGX_OK;
2277 }
2278 
nchan_store_publish_message(ngx_str_t * channel_id,nchan_msg_t * msg,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)2279 static ngx_int_t nchan_store_publish_message(ngx_str_t *channel_id, nchan_msg_t *msg, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
2280   redis_publish_callback_data_t  *d=NULL;
2281   redis_nodeset_t                *ns = nodeset_find(&cf->redis);
2282   assert(callback != NULL);
2283 
2284   CREATE_CALLBACK_DATA(d, ns, cf, "publish_message", channel_id, callback, privdata);
2285 
2286   d->msg_time=msg->id.time;
2287   if(d->msg_time == 0) {
2288     d->msg_time = ngx_time();
2289   }
2290   d->msg = msg;
2291   d->shared_msg = msg->storage == NCHAN_MSG_SHARED;
2292   d->message_timeout = nchan_loc_conf_message_timeout(cf);
2293   d->max_messages = nchan_loc_conf_max_messages(cf);
2294   d->compression = cf->message_compression;
2295   d->retry = 0;
2296   d->cluster_move_error = 0;
2297 
2298   assert(msg->id.tagcount == 1);
2299 
2300   if(d->shared_msg) {
2301     msg_reserve(d->msg, "redis publish");
2302   }
2303   redis_publish_message_send(ns, d);
2304 
2305   return NGX_OK;
2306 }
2307 
redisReply_to_int(redisReply * reply,int nil_value,int wrong_datatype_value)2308 static int64_t redisReply_to_int(redisReply *reply, int nil_value, int wrong_datatype_value) {
2309   switch(reply->type) {
2310     case REDIS_REPLY_INTEGER:
2311       return reply->integer;
2312     case REDIS_REPLY_STRING:
2313       return atol(reply->str);
2314     case REDIS_REPLY_NIL:
2315       return nil_value;
2316     default:
2317       return wrong_datatype_value;
2318   }
2319 }
2320 
redisPublishNostoreCallback(redisAsyncContext * c,void * r,void * privdata)2321 static void redisPublishNostoreCallback(redisAsyncContext *c, void *r, void *privdata) {
2322   redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2323   redisReply                    *reply=r;
2324   redisReply                   **els;
2325   nchan_channel_t                ch;
2326 
2327 
2328 
2329   redis_node_t                 *node = c->data;
2330   node->pending_commands--;
2331   nchan_update_stub_status(redis_pending_commands, -1);
2332 
2333   if(d->shared_msg) {
2334     msg_release(d->msg, "redis publish");
2335   }
2336 
2337   ngx_memzero(&ch, sizeof(ch)); //for debugging basically. should be removed in the future and zeroed as-needed
2338   if(d->cluster_move_error) {
2339     nodeset_node_keyslot_changed(node);
2340     d->callback(NGX_HTTP_SERVICE_UNAVAILABLE, NULL, d->privdata);
2341   }
2342   else if(reply) {
2343     if(reply->type == REDIS_REPLY_ARRAY && reply->elements == 2 && reply->element[1]->type == REDIS_REPLY_ARRAY && reply->element[1]->elements == 2) {
2344       els = reply->element[1]->element;
2345       ch.last_seen = redisReply_to_int(els[0], 0, 0);
2346       ch.subscribers = redisReply_to_int(els[1], 0, 0);
2347       d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2348     }
2349     else if(reply->type == REDIS_REPLY_INTEGER) {
2350       ch.last_seen = 0;
2351       ch.subscribers = redisReply_to_int(reply, 0, 0);
2352       d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2353     }
2354     else {
2355       redisEchoCallback(c, r, privdata);
2356       d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2357     }
2358   }
2359   else {
2360     redisEchoCallback(c, r, privdata);
2361     d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2362   }
2363 
2364   ngx_free(d);
2365 }
2366 
redisPublishNostoreQueuedCheckCallback(redisAsyncContext * c,void * r,void * privdata)2367 static void redisPublishNostoreQueuedCheckCallback(redisAsyncContext *c, void *r, void *privdata) {
2368   redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2369   redisReply                    *reply=r;
2370 
2371   redis_node_t                 *node = c->data;
2372 
2373   if(reply && !CHECK_REPLY_STATUSVAL(reply, "QUEUED")) {
2374     if(!nodeset_node_reply_keyslot_ok(node, reply)) {
2375       d->cluster_move_error = 1;
2376     }
2377     else {
2378       redisEchoCallback(c, r, privdata);
2379     }
2380   }
2381 }
2382 
redisPublishCallback(redisAsyncContext * c,void * r,void * privdata)2383 static void redisPublishCallback(redisAsyncContext *c, void *r, void *privdata) {
2384   redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2385   redisReply                    *reply=r;
2386   redisReply                    *cur;
2387   nchan_channel_t                ch;
2388 
2389   redis_node_t                 *node = c->data;
2390   node->pending_commands--;
2391   nchan_update_stub_status(redis_pending_commands, -1);
2392 
2393   if(!nodeset_node_reply_keyslot_ok(node, reply)) {
2394     if(d->shared_msg) {
2395       redis_publish_message_nodeset_maybe_retry(node->nodeset, d);
2396     }
2397     else {
2398       //message probably isn't available anymore...
2399       ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "redis store received cluster MOVE/ASK error while publishing, and can't retry publishing after reconfiguring cluster.");
2400       d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2401       ngx_free(d);
2402     }
2403     return;
2404   }
2405 
2406   if(d->shared_msg) {
2407     msg_release(d->msg, "redis publish");
2408   }
2409 
2410   ngx_memzero(&ch, sizeof(ch)); //for debugging basically. should be removed in the future and zeroed as-needed
2411 
2412   if(reply && CHECK_REPLY_ARRAY_MIN_SIZE(reply, 2)) {
2413     cur=reply->element[0];
2414     switch(redis_array_to_channel(cur, &ch)) {
2415       case NGX_OK:
2416         d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2417         break;
2418       case NGX_DECLINED: //not found
2419         d->callback(NGX_OK, NULL, d->privdata);
2420         break;
2421       case NGX_ERROR:
2422       default:
2423         redisEchoCallback(c, r, privdata);
2424         d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2425     }
2426   }
2427   else {
2428     redisEchoCallback(c, r, privdata);
2429     d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2430   }
2431   ngx_free(d);
2432 }
2433 
2434 
2435 
2436 typedef struct {
2437   ngx_str_t      *channel_id;
2438   ngx_int_t       count;
2439 } add_fakesub_data_t;
2440 
2441 
2442 static void nchan_store_redis_add_fakesub_callback(redisAsyncContext *c, void *r, void *privdata);
nchan_store_redis_add_fakesub_send(redis_nodeset_t * nodeset,void * pd)2443 static ngx_int_t nchan_store_redis_add_fakesub_send(redis_nodeset_t *nodeset, void *pd) {
2444   add_fakesub_data_t *d = pd;
2445   if(nodeset_ready(nodeset)) {
2446     redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
2447     nchan_redis_script(add_fakesub, node, &nchan_store_redis_add_fakesub_callback, NULL,
2448                        d->channel_id,
2449                        "%i %i",
2450                        d->count,
2451                        ngx_time()
2452                       );
2453     return NGX_OK;
2454   }
2455   else {
2456     return NGX_ERROR;
2457   }
2458 }
2459 
nchan_store_redis_add_fakesub_send_retry_wrapper(redis_nodeset_t * nodeset,void * pd)2460 static ngx_int_t nchan_store_redis_add_fakesub_send_retry_wrapper(redis_nodeset_t *nodeset, void *pd) {
2461   add_fakesub_data_t *d = pd;
2462   ngx_int_t rc = nchan_store_redis_add_fakesub_send(nodeset, pd);
2463   ngx_free(d);
2464   return rc;
2465 }
2466 
nchan_store_redis_add_fakesub_callback(redisAsyncContext * c,void * r,void * privdata)2467 static void nchan_store_redis_add_fakesub_callback(redisAsyncContext *c, void *r, void *privdata) {
2468   redisReply      *reply = r;
2469   redis_node_t    *node = c->data;
2470 
2471   node->pending_commands--;
2472   nchan_update_stub_status(redis_pending_commands, -1);
2473 
2474   if(reply && reply->type == REDIS_REPLY_ERROR) {
2475     ngx_str_t    errstr;
2476     ngx_str_t    countstr;
2477     ngx_str_t    channel_id;
2478     intptr_t     count;
2479 
2480     errstr.data = (u_char *)reply->str;
2481     errstr.len = strlen(reply->str);
2482 
2483     if(ngx_str_chop_if_startswith(&errstr, "CLUSTER KEYSLOT ERROR. ")) {
2484       nodeset_node_keyslot_changed(node);
2485       nchan_scan_until_chr_on_line(&errstr, &countstr, ' ');
2486       count = ngx_atoi(countstr.data, countstr.len);
2487       channel_id = errstr;
2488 
2489       add_fakesub_data_t  *d = ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id.len, ngx_cycle->log);
2490       if(!d) {
2491         ERR("can't allocate add_fakesub_data for CLUSTER KEYSLOT ERROR retry");
2492         return;
2493       }
2494       d->count = count;
2495       d->channel_id = (ngx_str_t *)&d[1];
2496       d->channel_id->data = (u_char *)&d->channel_id[1];
2497       nchan_strcpy(d->channel_id, &channel_id, 0);
2498       nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_redis_add_fakesub_send_retry_wrapper, d);
2499 
2500       return;
2501     }
2502 
2503   }
2504   redisCheckErrorCallback(c, r, privdata);
2505 }
2506 
nchan_store_redis_fakesub_add(ngx_str_t * channel_id,nchan_loc_conf_t * cf,ngx_int_t count,uint8_t shutting_down)2507 ngx_int_t nchan_store_redis_fakesub_add(ngx_str_t *channel_id, nchan_loc_conf_t *cf, ngx_int_t count, uint8_t shutting_down) {
2508   redis_nodeset_t  *nodeset = nodeset_find(&cf->redis);
2509 
2510   if(!shutting_down) {
2511     add_fakesub_data_t   data = {channel_id, count};
2512     nchan_store_redis_add_fakesub_send(nodeset, &data);
2513   }
2514   else {
2515     if(nodeset_ready(nodeset)) {
2516       redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, channel_id);
2517       redis_sync_command(node, "EVALSHA %s 0 %b %i", redis_lua_scripts.add_fakesub.hash, STR(channel_id), count);
2518     }
2519   }
2520   return NGX_OK;
2521 }
2522 
nchan_store_redis_ready(nchan_loc_conf_t * cf)2523 int nchan_store_redis_ready(nchan_loc_conf_t *cf) {
2524   redis_nodeset_t   *nodeset = nodeset_find(&cf->redis);
2525   return nodeset && nodeset_ready(nodeset);
2526 }
2527 
2528 
2529 typedef struct {
2530   callback_pt  cb;
2531   void        *pd;
2532 } redis_subscriber_info_id_data_t;
2533 
2534 static void get_subscriber_info_id_callback(redisAsyncContext *c, void *r, void *privdata);
2535 
nchan_store_get_subscriber_info_id(nchan_loc_conf_t * cf,callback_pt cb,void * pd)2536 static ngx_int_t nchan_store_get_subscriber_info_id(nchan_loc_conf_t *cf, callback_pt cb, void *pd) {
2537   redis_nodeset_t  *nodeset = nodeset_find(&cf->redis);
2538 
2539   if(!nodeset_ready(nodeset)) {
2540     return NGX_ERROR;
2541   }
2542 
2543   ngx_str_t request_id_key = ngx_string(NCHAN_REDIS_UNIQUE_REQUEST_ID_KEY);
2544   redis_node_t  *node = nodeset_node_find_by_key(nodeset, &request_id_key);
2545   if(!node) {
2546     return NGX_ERROR;
2547   }
2548 
2549   redis_subscriber_info_id_data_t *d = ngx_alloc(sizeof(*d), ngx_cycle->log);
2550   if(d == NULL) {
2551     return NGX_ERROR;
2552   }
2553 
2554   d->cb = cb;
2555   d->pd = pd;
2556 
2557   redis_script(get_subscriber_info_id, node, &get_subscriber_info_id_callback, d, "1 %b", STR(&request_id_key));
2558 
2559   return NGX_DONE;
2560 }
2561 
get_subscriber_info_id_callback(redisAsyncContext * c,void * r,void * privdata)2562 static void get_subscriber_info_id_callback(redisAsyncContext *c, void *r, void *privdata) {
2563   redis_subscriber_info_id_data_t *d = privdata;
2564   redisReply                    *reply = r;
2565 
2566   redis_node_t                 *node = c->data;
2567   node->pending_commands--;
2568 
2569   callback_pt  cb = d->cb;
2570   void        *cb_pd = d->pd;
2571 
2572   ngx_free(d);
2573 
2574   if (!redisReplyOk(c, reply)) {
2575     cb(NGX_ERROR, NULL, cb_pd);
2576     return;
2577   }
2578 
2579   int64_t new_id = redisReply_to_int(reply, 0, 0);
2580   cb(NGX_OK, (void *)(uintptr_t )new_id, cb_pd);
2581 }
2582 
nchan_store_request_subscriber_info(ngx_str_t * channel_id,ngx_int_t request_id,nchan_loc_conf_t * cf,callback_pt cb,void * pd)2583 static ngx_int_t nchan_store_request_subscriber_info(ngx_str_t *channel_id, ngx_int_t request_id, nchan_loc_conf_t *cf, callback_pt cb, void *pd) {
2584   if(nchan_channel_id_is_multi(channel_id)) {
2585     ERR("redis nchan_store_request_subscriber_info can't handle multi-channel ids");
2586     return NGX_ERROR;
2587   }
2588 
2589   redis_nodeset_t               *nodeset = nodeset_find(&cf->redis);
2590   if(!nodeset) {
2591     ERR("redis nodeset not found for nchan_store_request_subscriber_info");
2592     return NGX_ERROR;
2593   }
2594 
2595   if(!nodeset_ready(nodeset)) {
2596     ERR("redis nodeset not ready for nchan_store_request_subscriber_info");
2597     return NGX_ERROR;
2598   }
2599 
2600   redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, channel_id);
2601   if(!node) {
2602     ERR("couldn't find Redis node for nchan_store_request_subscriber_info");
2603     return NGX_ERROR;
2604   }
2605 
2606   nchan_redis_script( request_subscriber_info, node, &redisCheckErrorCallback, NULL, channel_id, "%i", (int )request_id);
2607 
2608   return NGX_DONE;
2609 }
2610 
2611 nchan_store_t nchan_store_redis = {
2612   //init
2613   &nchan_store_init_module,
2614   &nchan_store_init_worker,
2615   &nchan_store_init_postconfig,
2616   &nchan_store_create_main_conf,
2617 
2618   //shutdown
2619   &nchan_store_exit_worker,
2620   &nchan_store_exit_master,
2621 
2622   //async-friendly functions with callbacks
2623   &nchan_store_async_get_message, //+callback
2624   &nchan_store_subscribe, //+callback
2625   &nchan_store_publish_message, //+callback
2626 
2627   &nchan_store_delete_channel, //+callback
2628   &nchan_store_find_channel, //+callback
2629 
2630   NULL, //get_group
2631   NULL, //set group
2632   NULL, //delete group
2633 
2634   &nchan_store_get_subscriber_info_id, //+callback
2635   &nchan_store_request_subscriber_info //+callback
2636 
2637 };
2638 
2639