1 #include <nchan_module.h>
2
3 #include <assert.h>
4 #include <netinet/ip.h>
5 #include "store-private.h"
6 #include "store.h"
7
8 #include "redis_nginx_adapter.h"
9
10 #include <util/nchan_msg.h>
11 #include <util/nchan_rbtree.h>
12 #include <store/store_common.h>
13
14 #include <store/memory/store.h>
15
16 #include "redis_nodeset.h"
17 #include "redis_lua_commands.h"
18
19 #define REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP 600 //10min
20 #define REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX 2628000 //whole month
21
22 #define REDIS_RECONNECT_TIME 5000
23
24 #define REDIS_STALL_CHECK_TIME 0 //disable for now
25
26 //#define DEBUG_LEVEL NGX_LOG_WARN
27 #define DEBUG_LEVEL NGX_LOG_DEBUG
28
29 #define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "REDISTORE: " fmt, ##args)
30 #define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "REDISTORE: " fmt, ##args)
31
32 #define REDIS_CONNECTION_FOR_PUBLISH_WAIT 5000
33
34 u_char redis_subscriber_id[255];
35 size_t redis_subscriber_id_len;
36
37 static rdstore_channel_head_t *chanhead_hash = NULL;
38 static size_t redis_publish_message_msgkey_size;
39
40
41 #define CHANNEL_HASH_FIND(id_buf, p) HASH_FIND( hh, chanhead_hash, (id_buf)->data, (id_buf)->len, p)
42 #define CHANNEL_HASH_ADD(chanhead) HASH_ADD_KEYPTR( hh, chanhead_hash, (chanhead->id).data, (chanhead->id).len, chanhead)
43 #define CHANNEL_HASH_DEL(chanhead) HASH_DEL( chanhead_hash, chanhead)
44
45 #include <stdbool.h>
46 #include "cmp.h"
47
48
49 #define redis_subscriber_command(node, cb, pd, fmt, args...) \
50 do { \
51 if((node)->state >= REDIS_NODE_READY) { \
52 redisAsyncCommand((node)->ctx.pubsub, cb, pd, fmt, ##args); \
53 } else { \
54 ERR("Can't run redis command: no connection to redis server.");\
55 } \
56 }while(0) \
57
58
59
60 #define redis_sync_command(node, fmt, args...) \
61 do { \
62 if((node)->ctx.sync == NULL) { \
63 redis_nginx_open_sync_context(((node)->connect_params.peername.len > 0 ? &(node)->connect_params.peername : &(node)->connect_params.hostname), (node)->connect_params.port, (node)->connect_params.db, &(node)->connect_params.password, &(node)->ctx.sync); \
64 } \
65 if((node)->ctx.sync) { \
66 redisCommand((node)->ctx.sync, fmt, ##args); \
67 } else { \
68 ERR("Can't run redis command: no connection to redis server.");\
69 } \
70 }while(0)
71
72 #define redis_sync_script(script_name, node, fmt, args...) \
73 redis_sync_command(node, "EVALSHA %s " fmt, redis_lua_scripts.script_name.hash, ##args)
74
75 #define nchan_redis_sync_script(script_name, node, channel_id, fmt, args...) \
76 redis_sync_script(script_name, node, "0 %b %b " fmt, STR(node->nodeset->settings.namespace), STR(channel_id), ##args)
77
78
79
80 #define redis_command(node, cb, pd, fmt, args...) \
81 do { \
82 if(node->state >= REDIS_NODE_READY) { \
83 if((cb) != NULL) { \
84 /* a reply is expected, so track this command */ \
85 node->pending_commands++; \
86 nchan_update_stub_status(redis_pending_commands, 1); \
87 } \
88 redisAsyncCommand((node)->ctx.cmd, cb, pd, fmt, ##args); \
89 } else { \
90 node_log_error(node, "Can't run redis command: no connection to redis server.");\
91 } \
92 }while(0) \
93
94 #define redis_script(script_name, node, cb, pd, fmt, args...) \
95 redis_command(node, cb, pd, "EVALSHA %s " fmt, redis_lua_scripts.script_name.hash, ##args)
96
97 #define nchan_redis_script(script_name, node, cb, pd, channel_id, fmt, args...) \
98 redis_script(script_name, node, cb, pd, "0 %b %b " fmt, STR((node)->nodeset->settings.namespace), STR(channel_id), ##args)
99
100
101 #define CHECK_REPLY_STR(reply) ((reply)->type == REDIS_REPLY_STRING)
102 #define CHECK_REPLY_STRVAL(reply, v) ( CHECK_REPLY_STR(reply) && ngx_strcmp((reply)->str, v) == 0 )
103 #define CHECK_REPLY_STRNVAL(reply, v, n) ( CHECK_REPLY_STR(reply) && ngx_strncmp((reply)->str, v, n) == 0 )
104 #define CHECK_REPLY_STATUSVAL(reply, v) ( (reply)->type == REDIS_REPLY_STATUS && ngx_strcmp((reply)->str, v) == 0 )
105 #define CHECK_REPLY_INT(reply) ((reply)->type == REDIS_REPLY_INTEGER)
106 #define CHECK_REPLY_INTVAL(reply, v) ( CHECK_REPLY_INT(reply) && (reply)->integer == v )
107 #define CHECK_REPLY_ARRAY_MIN_SIZE(reply, size) ( (reply)->type == REDIS_REPLY_ARRAY && (reply)->elements >= (unsigned )size )
108 #define CHECK_REPLY_NIL(reply) ((reply)->type == REDIS_REPLY_NIL)
109 #define CHECK_REPLY_INT_OR_STR(reply) ((reply)->type == REDIS_REPLY_INTEGER || (reply)->type == REDIS_REPLY_STRING)
110
111 static ngx_int_t nchan_store_publish_generic(ngx_str_t *, redis_nodeset_t *, nchan_msg_t *, ngx_int_t, const ngx_str_t *);
112
113 static rdstore_channel_head_t * nchan_store_get_chanhead(ngx_str_t *channel_id, redis_nodeset_t *);
114
set_buf(ngx_buf_t * buf,u_char * start,off_t len)115 static ngx_buf_t *set_buf(ngx_buf_t *buf, u_char *start, off_t len){
116 ngx_memzero(buf, sizeof(*buf));
117 buf->start = start;
118 buf->end = start + len;
119 buf->pos = buf->start;
120 buf->last = buf->end;
121 return buf;
122 }
123
ngx_strmatch(ngx_str_t * str,char * match)124 static int ngx_strmatch(ngx_str_t *str, char *match) {
125 return ngx_strncmp(str->data, match, str->len) == 0;
126 }
127
ngx_str_chop_if_startswith(ngx_str_t * str,char * match)128 static int ngx_str_chop_if_startswith(ngx_str_t *str, char *match) {
129 char *cur, *max = (char *)str->data + str->len;
130 for(cur = (char *)str->data; cur < max; cur++, match++) {
131 if(*match == '\0') {
132 str->len -= (u_char *)cur - str->data;
133 str->data = (u_char *)cur;
134 return 1;
135 }
136 else if(*match != *cur)
137 break;
138 }
139 return 0;
140 }
141
fwd_buf(ngx_buf_t * buf,size_t sz)142 static u_char *fwd_buf(ngx_buf_t *buf, size_t sz) {
143 u_char *ret = buf->pos;
144 buf->pos += sz;
145 return ret;
146 }
147
fwd_buf_to_str(ngx_buf_t * buf,size_t sz,ngx_str_t * str)148 static void fwd_buf_to_str(ngx_buf_t *buf, size_t sz, ngx_str_t *str) {
149 str->data = fwd_buf(buf, sz);
150 str->len = sz;;
151 }
152
ngx_buf_reader(cmp_ctx_t * ctx,void * data,size_t limit)153 static bool ngx_buf_reader(cmp_ctx_t *ctx, void *data, size_t limit) {
154 ngx_buf_t *buf=(ngx_buf_t *)ctx->buf;
155 if(buf->pos + limit > buf->last){
156 return false;
157 }
158 ngx_memcpy(data, buf->pos, limit);
159 buf->pos += limit;
160 return true;
161 }
ngx_buf_writer(cmp_ctx_t * ctx,const void * data,size_t count)162 static size_t ngx_buf_writer(cmp_ctx_t *ctx, const void *data, size_t count) {
163 return 0;
164 }
165
nchan_store_redis_validate_url(ngx_str_t * url)166 int nchan_store_redis_validate_url(ngx_str_t *url) {
167 redis_connect_params_t rcp;
168 return parse_redis_url(url, &rcp) == NGX_OK;
169 }
170
parse_redis_url(ngx_str_t * url,redis_connect_params_t * rcp)171 ngx_int_t parse_redis_url(ngx_str_t *url, redis_connect_params_t *rcp) {
172 u_char *cur, *last, *ret;
173
174 cur = url->data;
175 last = url->data + url->len;
176
177
178 //ignore redis://
179 if(ngx_strnstr(cur, "redis://", 8) != NULL) {
180 cur += 8;
181 }
182
183 if(cur[0] == ':') {
184 cur++;
185 if((ret = ngx_strlchr(cur, last, '@')) == NULL) {
186 rcp->password.data = NULL;
187 rcp->password.len = 0;
188 return NGX_ERROR;
189 }
190 else {
191 rcp->password.data = cur;
192 rcp->password.len = ret - cur;
193 cur = ret + 1;
194 }
195 }
196 else {
197 rcp->password.data = NULL;
198 rcp->password.len = 0;
199 }
200
201 ///port:host
202 if((ret = ngx_strlchr(cur, last, ':')) == NULL) {
203 //just host
204 rcp->port = 6379;
205 if((ret = ngx_strlchr(cur, last, '/')) == NULL) {
206 ret = last;
207 }
208 rcp->hostname.data = cur;
209 rcp->hostname.len = ret - cur;
210 }
211 else {
212 rcp->hostname.data = cur;
213 rcp->hostname.len = ret - cur;
214 cur = ret + 1;
215
216 //port
217 if((ret = ngx_strlchr(cur, last, '/')) == NULL) {
218 ret = last;
219 }
220 rcp->port = ngx_atoi(cur, ret-cur);
221 if(rcp->port == NGX_ERROR) {
222 return NGX_ERROR;
223 }
224 }
225 cur = ret;
226
227 if(cur[0] == '/') {
228 cur++;
229 rcp->db = ngx_atoi(cur, last-cur);
230 if(rcp->db == NGX_ERROR) {
231 rcp->db = 0;
232 }
233 }
234 else {
235 rcp->db = 0;
236 }
237
238 return NGX_OK;
239 }
240
redis_store_reap_chanhead(rdstore_channel_head_t * ch)241 static void redis_store_reap_chanhead(rdstore_channel_head_t *ch) {
242 if(!ch->shutting_down) {
243 assert(ch->sub_count == 0 && ch->fetching_message_count == 0);
244 }
245
246 DBG("reap channel %V", &ch->id);
247
248 if(ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBED) {
249 assert(ch->redis.nodeset->settings.storage_mode >= REDIS_MODE_DISTRIBUTED);
250 assert(ch->redis.node.pubsub);
251 ch->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;
252 redis_subscriber_command(ch->redis.node.pubsub, NULL, NULL, "UNSUBSCRIBE %b{channel:%b}:pubsub", STR(ch->redis.nodeset->settings.namespace), STR(&ch->id));
253 }
254
255 /*
256 redis_nodeset_t *ns = ch->redis.nodeset;
257 redis_node_t *cmd = ch->redis.node.cmd;
258 redis_node_t *pubsub = ch->redis.node.pubsub;
259 */
260
261 nodeset_dissociate_chanhead(ch);
262
263 /*
264 rdstore_channel_head_t *cur;
265
266
267 for(cur = nchan_slist_first(&ns->channels.all); cur != NULL; cur = nchan_slist_next(&ns->channels.all, cur)) {
268 assert(cur != ch);
269 }
270 for(cur = nchan_slist_first(&ns->channels.disconnected_cmd); cur != NULL; cur = nchan_slist_next(&ns->channels.disconnected_cmd, cur)) {
271 assert(cur != ch);
272 }
273 for(cur = nchan_slist_first(&ns->channels.disconnected_pubsub); cur != NULL; cur = nchan_slist_next(&ns->channels.disconnected_pubsub, cur)) {
274 assert(cur != ch);
275 }
276 if(cmd) {
277 for(cur = nchan_slist_first(&cmd->channels.cmd); cur != NULL; cur = nchan_slist_next(&cmd->channels.cmd, cur)) {
278 assert(cur != ch);
279 }
280 for(cur = nchan_slist_first(&cmd->channels.pubsub); cur != NULL; cur = nchan_slist_next(&cmd->channels.pubsub, cur)) {
281 assert(cur != ch);
282 }
283 }
284 if(pubsub) {
285 for(cur = nchan_slist_first(&pubsub->channels.cmd); cur != NULL; cur = nchan_slist_next(&pubsub->channels.cmd, cur)) {
286 assert(cur != ch);
287 }
288 for(cur = nchan_slist_first(&pubsub->channels.pubsub); cur != NULL; cur = nchan_slist_next(&pubsub->channels.pubsub, cur)) {
289 assert(cur != ch);
290 }
291 }
292 */
293
294
295 DBG("chanhead %p (%V) is empty and expired. delete.", ch, &ch->id);
296 if(ch->keepalive_timer.timer_set) {
297 ngx_del_timer(&ch->keepalive_timer);
298 }
299 stop_spooler(&ch->spooler, 1);
300 CHANNEL_HASH_DEL(ch);
301
302 ngx_free(ch);
303 }
304
nchan_redis_chanhead_ready_to_reap(rdstore_channel_head_t * ch,uint8_t force)305 static ngx_int_t nchan_redis_chanhead_ready_to_reap(rdstore_channel_head_t *ch, uint8_t force) {
306 if(!force) {
307 if(ch->status != INACTIVE) {
308 return NGX_DECLINED;
309 }
310
311 if(ch->reserved > 0 ) {
312 DBG("not yet time to reap %V, %i reservations left", &ch->id, ch->reserved);
313 return NGX_DECLINED;
314 }
315
316 if(ch->gc.time - ngx_time() > 0) {
317 DBG("not yet time to reap %V, %i sec left", &ch->id, ch->gc.time - ngx_time());
318 return NGX_DECLINED;
319 }
320
321 if (ch->sub_count > 0) { //there are subscribers
322 DBG("not ready to reap %V, %i subs left", &ch->id, ch->sub_count);
323 return NGX_DECLINED;
324 }
325
326 if (ch->fetching_message_count > 0) { //there are subscribers
327 DBG("not ready to reap %V, fetching %i messages", &ch->id, ch->fetching_message_count);
328 return NGX_DECLINED;
329 }
330
331 //if(ch->pubsub_status == REDIS_PUBSUB_SUBSCRIBING) {
332 // return NGX_DECLINED;
333 //}
334
335 //DBG("ok to delete channel %V", &ch->id);
336 return NGX_OK;
337 }
338 else {
339 //force delete is always ok
340 return NGX_OK;
341 }
342 }
343
344 static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privdata);
345
nchan_store_init_worker(ngx_cycle_t * cycle)346 static ngx_int_t nchan_store_init_worker(ngx_cycle_t *cycle) {
347 ngx_int_t rc = NGX_OK;
348 u_char *cur;
349 cur = ngx_snprintf(redis_subscriber_id, 512, "nchan_worker:{%i:time:%i}%Z", ngx_pid, ngx_time());
350 redis_subscriber_id_len = cur - redis_subscriber_id;
351
352 //DBG("worker id %s len %i", redis_subscriber_id, redis_subscriber_id_len);
353
354 redis_nginx_init();
355
356 nodeset_initialize((char *)redis_subscriber_id, redis_subscriber_callback);
357 nodeset_connect_all();
358
359 //OLD
360 //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_connector, &rc);
361 return rc;
362 }
363
redisCheckErrorCallback(redisAsyncContext * c,void * r,void * privdata)364 void redisCheckErrorCallback(redisAsyncContext *c, void *r, void *privdata) {
365 redisReplyOk(c, r);
366 }
redisReplyOk(redisAsyncContext * ac,void * r)367 int redisReplyOk(redisAsyncContext *ac, void *r) {
368 static const ngx_str_t script_error_start= ngx_string("ERR Error running script (call to f_");
369 redis_node_t *node = ac->data;
370 redisReply *reply = (redisReply *)r;
371 if(reply == NULL) { //redis disconnected?...
372 if(ac->err) {
373 node_log_error(node, "connection to redis failed while waiting for reply - %s", ac->errstr);
374 }
375 else {
376 node_log_error(node, "got a NULL redis reply for unknown reason");
377 }
378 return 0;
379 }
380 else if(reply->type == REDIS_REPLY_ERROR) {
381 if(ngx_strncmp(reply->str, script_error_start.data, script_error_start.len) == 0 && (unsigned ) reply->len > script_error_start.len + REDIS_LUA_HASH_LENGTH) {
382 char *hash = &reply->str[script_error_start.len];
383 redis_lua_script_t *script;
384 REDIS_LUA_SCRIPTS_EACH(script) {
385 if (ngx_strncmp(script->hash, hash, REDIS_LUA_HASH_LENGTH)==0) {
386 node_log_error(node, "REDIS SCRIPT ERROR: %s :%s", script->name, &reply->str[script_error_start.len + REDIS_LUA_HASH_LENGTH + 2]);
387 return 0;
388 }
389 }
390 node_log_error(node, "REDIS SCRIPT ERROR: (unknown): %s", reply->str);
391 }
392 else {
393 node_log_error(node, "REDIS REPLY ERROR: %s", reply->str);
394 }
395 return 0;
396 }
397 else {
398 return 1;
399 }
400 }
401
redisEchoCallback(redisAsyncContext * ac,void * r,void * privdata)402 static void redisEchoCallback(redisAsyncContext *ac, void *r, void *privdata) {
403 redisReply *reply = r;
404 redis_node_t *node = NULL;
405 unsigned i;
406 //nchan_channel_t * channel = (nchan_channel_t *)privdata;
407 if(ac) {
408 node = ac->data;
409 if(ac->err) {
410 node_log_error(node, "connection to redis failed - %s", ac->errstr);
411 return;
412 }
413 }
414 else {
415 node_log_error(node, "connection to redis was terminated");
416 return;
417 }
418 if(reply == NULL) {
419 node_log_error(node, "REDIS REPLY is NULL");
420 return;
421 }
422
423 switch(reply->type) {
424 case REDIS_REPLY_STATUS:
425 node_log_error(node, "REDIS_REPLY_STATUS %s", reply->str);
426 break;
427
428 case REDIS_REPLY_ERROR:
429 redisCheckErrorCallback(ac, r, privdata);
430 break;
431
432 case REDIS_REPLY_INTEGER:
433 node_log_error(node, "REDIS_REPLY_INTEGER: %i", reply->integer);
434 break;
435
436 case REDIS_REPLY_NIL:
437 node_log_error(node, "REDIS_REPLY_NIL: nil");
438 break;
439
440 case REDIS_REPLY_STRING:
441 node_log_error(node, "REDIS_REPLY_STRING: %s", reply->str);
442 break;
443
444 case REDIS_REPLY_ARRAY:
445 node_log_error(node, "REDIS_REPLY_ARRAY: %i", reply->elements);
446 for(i=0; i< reply->elements; i++) {
447 redisEchoCallback(ac, reply->element[i], " ");
448 }
449 break;
450 }
451 //redis_subscriber_command(NULL, NULL, "UNSUBSCRIBE {channel:%b}:pubsub", str(&(channel->id)));
452 }
453
454 static ngx_int_t msg_from_redis_get_message_reply(nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event, redisReply *r, uint16_t offset);
455
456 #define SLOW_REDIS_REPLY 100 //ms
457
log_redis_reply(char * name,ngx_msec_t t)458 static ngx_int_t log_redis_reply(char *name, ngx_msec_t t) {
459 ngx_msec_t dt = ngx_current_msec - t;
460 if(dt >= SLOW_REDIS_REPLY) {
461 DBG("redis command %s took %i msec", name, dt);
462 }
463 return NGX_OK;
464 }
465
redisReply_to_ngx_int(redisReply * el,ngx_int_t * integer)466 static ngx_int_t redisReply_to_ngx_int(redisReply *el, ngx_int_t *integer) {
467 if(CHECK_REPLY_INT(el)) {
468 *integer=el->integer;
469 }
470 else if(CHECK_REPLY_STR(el)) {
471 *integer=ngx_atoi((u_char *)el->str, el->len);
472 }
473 else {
474 return NGX_ERROR;
475 }
476 return NGX_OK;
477 }
478
479 typedef struct {
480 ngx_msec_t t;
481 char *name;
482 ngx_str_t channel_id;
483 nchan_msg_id_t *msg_id;
484 ngx_str_t msg_key;
485 } redis_get_message_from_key_data_t;
486
487 static void get_msg_from_msgkey_callback(redisAsyncContext *ac, void *r, void *privdata);
488
get_msg_from_msgkey_send(redis_nodeset_t * ns,void * pd)489 static ngx_int_t get_msg_from_msgkey_send(redis_nodeset_t *ns, void *pd) {
490 redis_get_message_from_key_data_t *d = pd;
491 if(nodeset_ready(ns)) {
492 redis_node_t *node = nodeset_node_find_by_key(ns, &d->msg_key);
493 redis_script(get_message_from_key, node, &get_msg_from_msgkey_callback, d, "1 %b", STR(&d->msg_key));
494 }
495 else {
496 ngx_free(d);
497 }
498 return NGX_OK;
499 }
500
get_msg_from_msgkey_callback(redisAsyncContext * ac,void * r,void * privdata)501 static void get_msg_from_msgkey_callback(redisAsyncContext *ac, void *r, void *privdata) {
502 redis_get_message_from_key_data_t *d = (redis_get_message_from_key_data_t *)privdata;
503 redisReply *reply = r;
504 nchan_msg_t msg;
505 nchan_compressed_msg_t cmsg;
506 ngx_str_t content_type;
507 ngx_str_t eventsource_event;
508 ngx_str_t *chid = &d->channel_id;
509 redis_node_t *node = ac->data;
510
511 node->pending_commands--;
512 nchan_update_stub_status(redis_pending_commands, -1);
513
514 DBG("get_msg_from_msgkey_callback");
515
516 log_redis_reply(d->name, d->t);
517
518 if(!nodeset_node_reply_keyslot_ok(node, reply)) {
519 nodeset_callback_on_ready(node->nodeset, 1000, get_msg_from_msgkey_send, d);
520 return;
521 }
522
523 if(reply) {
524 if(chid == NULL) {
525 ERR("get_msg_from_msgkey channel id is NULL");
526 return;
527 }
528 if(msg_from_redis_get_message_reply(&msg, &cmsg, &content_type, &eventsource_event, reply, 0) != NGX_OK) {
529 ERR("invalid message or message absent after get_msg_from_key");
530 return;
531 }
532 nchan_store_publish_generic(chid, node->nodeset, &msg, 0, NULL);
533 }
534 else {
535 //reply is NULL
536 }
537 ngx_free(d);
538 }
539
cmp_err(cmp_ctx_t * cmp)540 static bool cmp_err(cmp_ctx_t *cmp) {
541 ERR("msgpack parsing error: %s", cmp_strerror(cmp));
542 return false;
543 }
544
cmp_to_str(cmp_ctx_t * cmp,ngx_str_t * str)545 static bool cmp_to_str(cmp_ctx_t *cmp, ngx_str_t *str) {
546 ngx_buf_t *mpbuf =(ngx_buf_t *)cmp->buf;
547 uint32_t sz;
548
549 if(cmp_read_str_size(cmp, &sz)) {
550 fwd_buf_to_str(mpbuf, sz, str);
551 return true;
552 }
553 else {
554 cmp_err(cmp);
555 return false;
556 }
557 }
558
cmp_to_msg(cmp_ctx_t * cmp,nchan_msg_t * msg,nchan_compressed_msg_t * cmsg,ngx_str_t * content_type,ngx_str_t * eventsource_event)559 static bool cmp_to_msg(cmp_ctx_t *cmp, nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event) {
560 ngx_buf_t *mpb = (ngx_buf_t *)cmp->buf;
561 uint32_t sz;
562 uint64_t msgtag;
563 int32_t ttl;
564 int32_t compression;
565 //ttl
566 if(!cmp_read_int(cmp, &ttl)) {
567 return cmp_err(cmp);
568 }
569 assert(ttl >= 0);
570 if(ttl == 0) {
571 ttl++; // less than a second left for this message... give it a second's lease on life
572 }
573 msg->expires = ngx_time() + ttl;
574
575 //msg id
576 if(!cmp_read_uinteger(cmp, (uint64_t *)&msg->id.time)) {
577 return cmp_err(cmp);
578 }
579 if(!cmp_read_uinteger(cmp, &msgtag)) {
580 return cmp_err(cmp);
581 }
582 else {
583 msg->id.tag.fixed[0] = msgtag;
584 msg->id.tagactive = 0;
585 msg->id.tagcount = 1;
586 }
587
588 //msg prev_id
589 if(!cmp_read_uinteger(cmp, (uint64_t *)&msg->prev_id.time)) {
590 return cmp_err(cmp);
591 }
592 if(!cmp_read_uinteger(cmp, &msgtag)) {
593 return cmp_err(cmp);
594 }
595 else {
596 msg->prev_id.tag.fixed[0] = msgtag;
597 msg->prev_id.tagactive = 0;
598 msg->prev_id.tagcount = 1;
599 }
600
601 //message data
602 if(!cmp_read_str_size(cmp, &sz)) {
603 return cmp_err(cmp);
604 }
605 set_buf(&msg->buf, mpb->pos, sz);
606 fwd_buf(mpb, sz);
607 msg->buf.memory = 1;
608 msg->buf.last_buf = 1;
609 msg->buf.last_in_chain = 1;
610
611 //content-type
612 if(!cmp_read_str_size(cmp, &sz)) {
613 return cmp_err(cmp);
614 }
615 fwd_buf_to_str(mpb, sz, content_type);
616 msg->content_type = sz > 0 ? content_type : NULL;
617
618 //eventsource_event
619 if(!cmp_read_str_size(cmp, &sz)) {
620 return cmp_err(cmp);
621 }
622 fwd_buf_to_str(mpb, sz, eventsource_event);
623 msg->eventsource_event = sz > 0 ? eventsource_event : NULL;
624
625 //compression
626 if(!cmp_read_int(cmp, &compression)) {
627 msg->compressed = NULL;
628 }
629 if(compression > 0) {
630 msg->compressed = cmsg;
631 ngx_memzero(&cmsg->buf, sizeof(cmsg->buf));
632 cmsg->compression = compression;
633 }
634 else {
635 msg->compressed = NULL;
636 }
637
638 return true;
639 }
640
get_msg_from_msgkey(ngx_str_t * channel_id,redis_nodeset_t * nodeset,nchan_msg_id_t * msgid,ngx_str_t * msg_redis_hash_key)641 static ngx_int_t get_msg_from_msgkey(ngx_str_t *channel_id, redis_nodeset_t *nodeset, nchan_msg_id_t *msgid, ngx_str_t *msg_redis_hash_key) {
642 rdstore_channel_head_t *head;
643 redis_get_message_from_key_data_t *d;
644 DBG("Get message from msgkey %V", msg_redis_hash_key);
645
646 head = nchan_store_get_chanhead(channel_id, nodeset);
647 if(head->sub_count == 0) {
648 DBG("Nobody wants this message we'll need to grab with an HMGET");
649 return NGX_OK;
650 }
651
652 if((d=ngx_alloc(sizeof(*d) + (u_char)channel_id->len + (u_char)msg_redis_hash_key->len, ngx_cycle->log)) == 0) {
653 ERR("unable to allocate memory for callback data for message hmget");
654 return NGX_ERROR;
655 }
656 d->channel_id.data = (u_char *)&d[1];
657 nchan_strcpy(&d->channel_id, channel_id, 0);
658
659 d->msg_key.data = d->channel_id.data + d->channel_id.len;
660 nchan_strcpy(&d->msg_key, msg_redis_hash_key, 0);
661
662 d->t = ngx_current_msec;
663
664 d->name = "get_message_from_key";
665
666 //d->hcln = put_current_subscribers_in_limbo(head);
667 //assert(d->hcln != 0);
668 get_msg_from_msgkey_send(nodeset, d);
669
670 return NGX_OK;
671 }
672
673 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub);
674
str_match_redis_subscriber_channel(ngx_str_t * pubsub_channel,ngx_str_t * ns)675 static int str_match_redis_subscriber_channel(ngx_str_t *pubsub_channel, ngx_str_t *ns) {
676 ngx_str_t psch = *pubsub_channel;
677 if(pubsub_channel->len != ns->len + redis_subscriber_id_len || psch.len < ns->len) {
678 return 0;
679 }
680 if(ngx_memcmp(psch.data, ns->data, ns->len) != 0) {
681 return 0;
682 }
683 psch.data += ns->len;
684 psch.len -= ns->len;
685
686 return ngx_strmatch(&psch, (char *)redis_subscriber_id);
687 }
688
get_channel_id_from_pubsub_channel(ngx_str_t * pubsub_channel,ngx_str_t * ns,ngx_str_t * str_in)689 static ngx_str_t *get_channel_id_from_pubsub_channel(ngx_str_t *pubsub_channel, ngx_str_t *ns, ngx_str_t *str_in) {
690 if(str_match_redis_subscriber_channel(pubsub_channel, ns)) {
691 return NULL;
692 }
693 str_in->data = pubsub_channel->data + ns->len + 9; //"<namespace>{channel:"
694 str_in->len = pubsub_channel->len - ns->len - 9;
695 str_in->len -= 8;//"}:pubsub"
696 return str_in;
697 }
698
find_chanhead_for_pubsub_callback(ngx_str_t * chid)699 static rdstore_channel_head_t *find_chanhead_for_pubsub_callback(ngx_str_t *chid) {
700 rdstore_channel_head_t *head;
701 CHANNEL_HASH_FIND(chid, head);
702 return head;
703 }
704
redis_subscriber_callback(redisAsyncContext * c,void * r,void * privdata)705 static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privdata) {
706 redisReply *reply = r;
707 redisReply *el = NULL;
708 nchan_msg_t msg;
709 nchan_compressed_msg_t cmsg;
710 ngx_str_t content_type;
711 ngx_str_t eventsource_event;
712 ngx_str_t chid_str;
713 ngx_str_t *chid = NULL;
714 ngx_str_t pubsub_channel;
715 ngx_str_t msg_redis_hash_key = ngx_null_string;
716 ngx_uint_t subscriber_id;
717
718 ngx_buf_t mpbuf;
719 cmp_ctx_t cmp;
720
721 rdstore_channel_head_t *chanhead = NULL;
722 redis_node_t *node = c->data;
723 redis_nodeset_t *nodeset = node->nodeset;
724 ngx_str_t *namespace = nodeset->settings.namespace;
725
726 msg.expires = 0;
727 msg.refcount = 0;
728 msg.parent = NULL;
729 msg.storage = NCHAN_MSG_STACK;
730
731 if(reply == NULL) return;
732
733 if(CHECK_REPLY_ARRAY_MIN_SIZE(reply, 3)
734 && CHECK_REPLY_STR(reply->element[0])
735 && (CHECK_REPLY_STR(reply->element[1]) || CHECK_REPLY_INT(reply->element[1]))) {
736 pubsub_channel.data = (u_char *)reply->element[1]->str;
737 pubsub_channel.len = reply->element[1]->len;
738 chid = get_channel_id_from_pubsub_channel(&pubsub_channel, namespace, &chid_str);
739 }
740 else {
741 ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "no PUBSUB message, something else");
742 redisEchoCallback(c,r,privdata);
743 return;
744 }
745
746 if(CHECK_REPLY_STRVAL(reply->element[0], "message") && CHECK_REPLY_STR(reply->element[2])) {
747
748 //reply->element[1] is the pubsub channel name
749 el = reply->element[2];
750
751 if(CHECK_REPLY_STRVAL(el, "ping") && str_match_redis_subscriber_channel(&pubsub_channel, namespace)) {
752 DBG("got pinged");
753 }
754 else if(CHECK_REPLY_STR(el)) {
755 uint32_t array_sz;
756 unsigned chid_present = 0;
757 ngx_str_t extracted_channel_id;
758 unsigned msgbuf_size_changed = 0;
759 int64_t msgbuf_size = 0;
760 //maybe a message?
761 set_buf(&mpbuf, (u_char *)el->str, el->len);
762 cmp_init(&cmp, &mpbuf, ngx_buf_reader, NULL, ngx_buf_writer);
763 if(cmp_read_array(&cmp, &array_sz)) {
764
765 if(array_sz != 0) {
766 uint32_t sz;
767 ngx_str_t msg_type;
768 cmp_read_str_size(&cmp ,&sz);
769 fwd_buf_to_str(&mpbuf, sz, &msg_type);
770
771 if(ngx_str_chop_if_startswith(&msg_type, "max_msgs+")) {
772 if(cmp_read_integer(&cmp, &msgbuf_size))
773 msgbuf_size_changed = 1;
774 else
775 cmp_err(&cmp);
776 }
777
778 if(ngx_str_chop_if_startswith(&msg_type, "ch+")) {
779 if(cmp_read_str_size(&cmp, &sz)) {
780 fwd_buf_to_str(&mpbuf, sz, &extracted_channel_id);
781 chid = &extracted_channel_id;
782 }
783 else {
784 cmp_err(&cmp);
785 }
786 }
787 //else {
788 // chid already set from the pubsub channel name
789 //}
790
791 if(msgbuf_size_changed && (chanhead = nchan_store_get_chanhead(chid, nodeset)) != NULL) {
792 chanhead->spooler.fn->broadcast_notice(&chanhead->spooler, NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE, (void *)(intptr_t )msgbuf_size);
793 }
794
795 if(!chanhead) {
796 chanhead = find_chanhead_for_pubsub_callback(chid);
797 }
798
799 if(ngx_strmatch(&msg_type, "msg")) {
800 assert(array_sz >= 9 + msgbuf_size_changed + chid_present);
801 if(chanhead && cmp_to_msg(&cmp, &msg, &cmsg, &content_type, &eventsource_event)) {
802 //ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "got msg %V", msgid_to_str(&msg));
803 nchan_store_publish_generic(chid, chanhead ? chanhead->redis.nodeset : nodeset, &msg, 0, NULL);
804 }
805 else {
806 ERR("thought there'd be a channel for msg");
807 }
808 }
809 else if(ngx_strmatch(&msg_type, "msgkey")) {
810 assert(array_sz == 4 + msgbuf_size_changed + chid_present);
811 if(chanhead != NULL) {
812 uint64_t msgtag;
813 nchan_msg_id_t msgid;
814
815 if(!cmp_read_uinteger(&cmp, (uint64_t *)&msgid.time)) {
816 cmp_err(&cmp);
817 return;
818 }
819
820 if(!cmp_read_uinteger(&cmp, &msgtag)) {
821 cmp_err(&cmp);
822 return;
823 }
824 else {
825 msgid.tag.fixed[0] = msgtag;
826 msgid.tagactive = 0;
827 msgid.tagcount = 1;
828 }
829
830 if(cmp_to_str(&cmp, &msg_redis_hash_key)) {
831 get_msg_from_msgkey(chid, chanhead ? chanhead->redis.nodeset : node->nodeset, &msgid, &msg_redis_hash_key);
832 }
833 }
834 else {
835 ERR("thought there'd be a channel id around for msgkey");
836 }
837 }
838 else if(ngx_strmatch(&msg_type, "alert") && array_sz > 1) {
839 ngx_str_t alerttype;
840
841 if(!cmp_to_str(&cmp, &alerttype)) {
842 return;
843 }
844
845 if(ngx_strmatch(&alerttype, "delete channel") && array_sz > 2) {
846 if(cmp_to_str(&cmp, &extracted_channel_id)) {
847 rdstore_channel_head_t *doomed_channel;
848 nchan_store_publish_generic(&extracted_channel_id, nodeset, NULL, NGX_HTTP_GONE, &NCHAN_HTTP_STATUS_410);
849 doomed_channel = nchan_store_get_chanhead(&extracted_channel_id, nodeset);
850 redis_chanhead_gc_add(doomed_channel, 0, "channel deleted");
851 }
852 else {
853 ERR("unexpected \"delete channel\" msgpack message from redis");
854 }
855 }
856 else if(ngx_strmatch(&alerttype, "unsub one") && array_sz > 3) {
857 if(cmp_to_str(&cmp, &extracted_channel_id)) {
858 cmp_to_str(&cmp, &extracted_channel_id);
859 cmp_read_uinteger(&cmp, (uint64_t *)&subscriber_id);
860 //TODO
861 }
862 ERR("unsub one not yet implemented");
863 }
864 else if(ngx_strmatch(&alerttype, "unsub all") && array_sz > 1) {
865 if(cmp_to_str(&cmp, &extracted_channel_id)) {
866 nchan_store_publish_generic(&extracted_channel_id, nodeset, NULL, NGX_HTTP_CONFLICT, &NCHAN_HTTP_STATUS_409);
867 }
868 }
869 else if(ngx_strmatch(&alerttype, "unsub all except")) {
870 if(cmp_to_str(&cmp, &extracted_channel_id)) {
871 cmp_read_uinteger(&cmp, (uint64_t *)&subscriber_id);
872 //TODO
873 }
874 ERR("unsub all except not yet implemented");
875 }
876 else if(ngx_strmatch(&alerttype, "subscriber info")) {
877 uint64_t request_id;
878 cmp_read_uinteger(&cmp, &request_id);
879
880 if((chanhead = nchan_store_get_chanhead(chid, nodeset)) == NULL) {
881 ERR("received invalid subscriber info notice with bad channel name");
882 }
883 else {
884 chanhead->spooler.fn->broadcast_notice(&chanhead->spooler, NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST, (void *)(intptr_t )request_id);
885 }
886 }
887
888 else {
889 ERR("unexpected msgpack alert from redis");
890 }
891 }
892 else {
893 ERR("unexpected msgpack message from redis");
894 }
895 }
896 else {
897 ERR("unexpected msgpack object from redis");
898 }
899 }
900 else {
901 ERR("invalid msgpack message from redis: %s", cmp_strerror(&cmp));
902 }
903 }
904
905 else { //not a string
906 redisEchoCallback(c, el, NULL);
907 }
908 }
909
910 else if(CHECK_REPLY_STRVAL(reply->element[0], "subscribe") && CHECK_REPLY_INT(reply->element[2])) {
911
912 if(chid) {
913 chanhead = find_chanhead_for_pubsub_callback(chid);
914 if(chanhead != NULL) {
915 if(chanhead->pubsub_status != REDIS_PUBSUB_SUBSCRIBING) {
916 ERR("expected previous pubsub_status for channel %p (id: %V) to be REDIS_PUBSUB_SUBSCRIBING (%i), was %i", chanhead, &chanhead->id, REDIS_PUBSUB_SUBSCRIBING, chanhead->pubsub_status);
917 }
918 chanhead->pubsub_status = REDIS_PUBSUB_SUBSCRIBED;
919
920 switch(chanhead->status) {
921 case NOTREADY:
922 chanhead->status = READY;
923 chanhead->spooler.fn->handle_channel_status_change(&chanhead->spooler);
924 //ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "REDIS: PUB/SUB subscribed to %s, chanhead %p now READY.", reply->element[1]->str, chanhead);
925 break;
926 case READY:
927 ERR("REDIS: PUB/SUB already subscribed to %s, chanhead %p (id %V) already READY.", reply->element[1]->str, chanhead, &chanhead->id);
928 break;
929 case INACTIVE:
930 // this is fine, inactive channels can be pubsubbed, they will be garbage collected
931 // later if needed
932 break;
933 default:
934 ERR("REDIS: PUB/SUB really unexpected chanhead status %i", chanhead->status);
935 assert(0);
936 //not sposed to happen
937 }
938 }
939 else {
940 ERR("received SUBSCRIBE acknowledgement for unknown channel %V", chid);
941 }
942 }
943 else {
944 DBG("subscribed to worker channel %s", redis_subscriber_id);
945 }
946
947 DBG("REDIS: PUB/SUB subscribed to %s (%i total)", reply->element[1]->str, reply->element[2]->integer);
948 }
949 else if(CHECK_REPLY_STRVAL(reply->element[0], "unsubscribe") && CHECK_REPLY_INT(reply->element[2])) {
950
951 if(chid) {
952 DBG("received UNSUBSCRIBE acknowledgement for channel %V", chid);
953 }
954 else {
955 DBG("received UNSUBSCRIBE acknowledgement for worker channel %s", redis_subscriber_id);
956 }
957 }
958
959 else {
960 ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "Unexpected PUBSUB message %s", reply->element[0]);
961 redisEchoCallback(c,r,privdata);
962 }
963 }
964
965 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub);
966 static ngx_int_t redis_subscriber_unregister(rdstore_channel_head_t *chanhead, subscriber_t *sub, uint8_t shutting_down);
spooler_add_handler(channel_spooler_t * spl,subscriber_t * sub,void * privdata)967 static void spooler_add_handler(channel_spooler_t *spl, subscriber_t *sub, void *privdata) {
968 rdstore_channel_head_t *head = (rdstore_channel_head_t *)privdata;
969 head->sub_count++;
970 if(sub->type == INTERNAL) {
971 head->internal_sub_count++;
972 }
973 redis_subscriber_register(head, sub);
974 }
975
spooler_dequeue_handler(channel_spooler_t * spl,subscriber_t * sub,void * privdata)976 static void spooler_dequeue_handler(channel_spooler_t *spl, subscriber_t *sub, void *privdata) {
977 //need individual subscriber
978 //TODO
979 rdstore_channel_head_t *head = (rdstore_channel_head_t *)privdata;
980
981 head->sub_count--;
982 if(sub->type == INTERNAL) {
983 head->internal_sub_count--;
984 }
985
986 redis_subscriber_unregister(head, sub, head->shutting_down);
987
988 if(head->sub_count == 0 && head->fetching_message_count == 0) {
989 redis_chanhead_gc_add(head, 0, "sub count == 0 and fetching_message_count == 0 after spooler dequeue");
990 }
991
992 }
993
spooler_use_handler(channel_spooler_t * spl,void * d)994 static void spooler_use_handler(channel_spooler_t *spl, void *d) {
995 //nothing.
996 }
997
spooler_get_message_start_handler(channel_spooler_t * spl,void * pd)998 void spooler_get_message_start_handler(channel_spooler_t *spl, void *pd) {
999 ((rdstore_channel_head_t *)pd)->fetching_message_count++;
1000 }
1001
spooler_get_message_finish_handler(channel_spooler_t * spl,void * pd)1002 void spooler_get_message_finish_handler(channel_spooler_t *spl, void *pd) {
1003 ((rdstore_channel_head_t *)pd)->fetching_message_count--;
1004 assert(((rdstore_channel_head_t *)pd)->fetching_message_count >= 0);
1005 }
1006
start_chanhead_spooler(rdstore_channel_head_t * head)1007 static ngx_int_t start_chanhead_spooler(rdstore_channel_head_t *head) {
1008 static uint8_t channel_buffer_complete = 1;
1009 spooler_fetching_strategy_t spooling_strategy;
1010 static channel_spooler_handlers_t handlers = {
1011 spooler_add_handler,
1012 spooler_dequeue_handler,
1013 NULL,
1014 spooler_use_handler,
1015 spooler_get_message_start_handler,
1016 spooler_get_message_finish_handler
1017 };
1018 nchan_loc_conf_t *lcf = head->redis.nodeset->first_loc_conf; //any loc_conf that refers to this nodeset will work.
1019 //the spooler needs it to pass to get_message calls, which in rdstore's case only cares about the nodeset referenced in the loc_conf
1020 if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE) {
1021 spooling_strategy = NCHAN_SPOOL_PASSTHROUGH;
1022 }
1023 else {
1024 spooling_strategy = NCHAN_SPOOL_FETCH;
1025 }
1026
1027 start_spooler(&head->spooler, &head->id, &head->status, &channel_buffer_complete, &nchan_store_redis, lcf, spooling_strategy, &handlers, head);
1028 return NGX_OK;
1029 }
1030
1031 static void redis_subscriber_register_cb(redisAsyncContext *c, void *vr, void *privdata);
1032
1033 typedef struct {
1034 rdstore_channel_head_t *chanhead;
1035 unsigned generation;
1036 subscriber_t *sub;
1037 } redis_subscriber_register_t;
1038
redis_subscriber_register_send(redis_nodeset_t * nodeset,void * pd)1039 static ngx_int_t redis_subscriber_register_send(redis_nodeset_t *nodeset, void *pd) {
1040 redis_subscriber_register_t *d = pd;
1041 if(nodeset_ready(nodeset)) {
1042 d->chanhead->reserved++;
1043 redis_node_t *node = nodeset_node_find_by_chanhead(d->chanhead);
1044
1045 nchan_redis_script(subscriber_register, node, &redis_subscriber_register_cb, d, &d->chanhead->id,
1046 "- %i %i 1",
1047 REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP,
1048 ngx_time()
1049 );
1050 }
1051 else {
1052 d->sub->fn->release(d->sub, 0);
1053 ngx_free(d);
1054 }
1055 return NGX_OK;
1056 }
1057
1058
redis_subscriber_register(rdstore_channel_head_t * chanhead,subscriber_t * sub)1059 static ngx_int_t redis_subscriber_register(rdstore_channel_head_t *chanhead, subscriber_t *sub) {
1060 redis_subscriber_register_t *sdata=NULL;
1061 if((sdata = ngx_alloc(sizeof(*sdata), ngx_cycle->log)) == NULL) {
1062 ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "No memory for sdata. Part IV, subparagraph 12 of the Cryptic Error Series.");
1063 return NGX_ERROR;
1064 }
1065 sdata->chanhead = chanhead;
1066 sdata->generation = chanhead->generation;
1067 sdata->sub = sub;
1068
1069 sub->fn->reserve(sub);
1070
1071 //input: keys: [], values: [channel_id, subscriber_id, active_ttl]
1072 // 'subscriber_id' can be '-' for new id, or an existing id
1073 // 'active_ttl' is channel ttl with non-zero subscribers. -1 to persist, >0 ttl in sec
1074 //output: subscriber_id, num_current_subscribers, next_keepalive_time
1075 redis_subscriber_register_send(chanhead->redis.nodeset, sdata);
1076
1077 return NGX_OK;
1078 }
1079
redis_subscriber_register_send_retry_wrapper(redis_nodeset_t * nodeset,void * pd)1080 static ngx_int_t redis_subscriber_register_send_retry_wrapper(redis_nodeset_t *nodeset, void *pd) {
1081 redis_subscriber_register_t *d = pd;
1082 d->chanhead->reserved--;
1083 return redis_subscriber_register_send(nodeset, pd);
1084 }
1085
redis_subscriber_register_cb(redisAsyncContext * c,void * vr,void * privdata)1086 static void redis_subscriber_register_cb(redisAsyncContext *c, void *vr, void *privdata) {
1087 redis_subscriber_register_t *sdata= (redis_subscriber_register_t *) privdata;
1088 redisReply *reply = (redisReply *)vr;
1089 redis_node_t *node = c->data;
1090 int keepalive_ttl;
1091
1092 node->pending_commands--;
1093 nchan_update_stub_status(redis_pending_commands, -1);
1094
1095 sdata->chanhead->reserved--;
1096
1097 if(!nodeset_node_reply_keyslot_ok(node, reply)) {
1098 sdata->chanhead->reserved++;
1099 nodeset_callback_on_ready(node->nodeset, 1000, redis_subscriber_register_send_retry_wrapper, sdata);
1100 return;
1101 }
1102
1103 if (!redisReplyOk(c, reply)) {
1104 //TODO: fail less silently, maybe retry subscriber registration?
1105 sdata->sub->fn->release(sdata->sub, 0);
1106 ngx_free(sdata);
1107 return;
1108 }
1109
1110 if( CHECK_REPLY_ARRAY_MIN_SIZE(reply, 4)
1111 && CHECK_REPLY_INT(reply->element[3])
1112 ) {
1113 //notify about channel buffer size if it's present
1114 sdata->sub->fn->notify(sdata->sub, NCHAN_NOTICE_REDIS_CHANNEL_MESSAGE_BUFFER_SIZE_CHANGE, (void *)(intptr_t )reply->element[3]->integer);
1115 }
1116
1117 if(sdata->generation == sdata->chanhead->generation) {
1118 //is the subscriber
1119 //TODO: set subscriber id
1120 //sdata->sub->id = reply->element[1]->integer;
1121 }
1122
1123 sdata->sub->fn->release(sdata->sub, 0);
1124 sdata->sub = NULL; //don't use it anymore, it might have been freed
1125
1126 if ( !CHECK_REPLY_ARRAY_MIN_SIZE(reply, 3) || !CHECK_REPLY_INT(reply->element[1]) || !CHECK_REPLY_INT(reply->element[2])) {
1127 //no good
1128 //TODO: fail less silently, maybe retry subscriber registration?
1129 redisEchoCallback(c,reply,privdata);
1130 ngx_free(sdata);
1131 return;
1132 }
1133
1134 keepalive_ttl = reply->element[2]->integer;
1135 if(keepalive_ttl > 0) {
1136 if(!sdata->chanhead->keepalive_timer.timer_set) {
1137 ngx_add_timer(&sdata->chanhead->keepalive_timer, keepalive_ttl * 1000);
1138 }
1139 }
1140 ngx_free(sdata);
1141 }
1142
1143
1144 typedef struct {
1145 ngx_str_t *channel_id;
1146 time_t channel_timeout;
1147 unsigned allocd:1;
1148 } subscriber_unregister_data_t;
1149
1150 static void redis_subscriber_unregister_cb(redisAsyncContext *c, void *r, void *privdata);
redis_subscriber_unregister_send(redis_nodeset_t * nodeset,void * pd)1151 static ngx_int_t redis_subscriber_unregister_send(redis_nodeset_t *nodeset, void *pd) {
1152 //input: keys: [], values: [namespace, channel_id, subscriber_id, empty_ttl]
1153 // 'subscriber_id' is an existing id
1154 // 'empty_ttl' is channel ttl when without subscribers. 0 to delete immediately, -1 to persist, >0 ttl in sec
1155 //output: subscriber_id, num_current_subscribers
1156 subscriber_unregister_data_t *d = pd;
1157 if(nodeset_ready(nodeset)) {
1158 redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
1159 nchan_redis_script( subscriber_unregister, node, &redis_subscriber_unregister_cb, NULL,
1160 d->channel_id, "%i %i",
1161 0/*TODO: sub->id*/,
1162 d->channel_timeout
1163 );
1164 }
1165 if(d->allocd) {
1166 ngx_free(d);
1167 }
1168 return NGX_OK;
1169 }
1170
redis_subscriber_unregister_cb(redisAsyncContext * c,void * r,void * privdata)1171 static void redis_subscriber_unregister_cb(redisAsyncContext *c, void *r, void *privdata) {
1172 redisReply *reply = r;
1173 redis_node_t *node = c->data;
1174
1175 node->pending_commands--;
1176 nchan_update_stub_status(redis_pending_commands, -1);
1177
1178 if(reply && reply->type == REDIS_REPLY_ERROR) {
1179 ngx_str_t errstr;
1180 ngx_str_t countstr;
1181 ngx_str_t channel_id;
1182 ngx_int_t channel_timeout;
1183
1184 errstr.data = (u_char *)reply->str;
1185 errstr.len = strlen(reply->str);
1186
1187 if(ngx_str_chop_if_startswith(&errstr, "CLUSTER KEYSLOT ERROR. ")) {
1188 nodeset_node_keyslot_changed(node);
1189 nchan_scan_until_chr_on_line(&errstr, &countstr, ' ');
1190 channel_timeout = ngx_atoi(countstr.data, countstr.len);
1191 channel_id = errstr;
1192
1193 subscriber_unregister_data_t *d = ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id.len, ngx_cycle->log);
1194 if(!d) {
1195 ERR("can't allocate add_fakesub_data for CLUSTER KEYSLOT ERROR retry");
1196 return;
1197 }
1198 d->channel_timeout = channel_timeout;
1199 d->channel_id = (ngx_str_t *)&d[1];
1200 d->channel_id->data = (u_char *)&d->channel_id[1];
1201 d->allocd = 1;
1202 nchan_strcpy(d->channel_id, &channel_id, 0);
1203 nodeset_callback_on_ready(node->nodeset, 1000, redis_subscriber_unregister_send, d);
1204 return;
1205 }
1206
1207 }
1208 redisCheckErrorCallback(c, r, privdata);
1209 }
1210
redis_subscriber_unregister(rdstore_channel_head_t * chanhead,subscriber_t * sub,uint8_t shutting_down)1211 static ngx_int_t redis_subscriber_unregister(rdstore_channel_head_t *chanhead, subscriber_t *sub, uint8_t shutting_down) {
1212 nchan_loc_conf_t *cf = sub->cf;;
1213
1214 if(!shutting_down) {
1215 subscriber_unregister_data_t d;
1216 d.channel_id = &chanhead->id;
1217 d.channel_timeout = cf->channel_timeout;
1218 d.allocd = 0;
1219 redis_subscriber_unregister_send(chanhead->redis.nodeset, &d);
1220 }
1221 else {
1222 if(nodeset_ready(chanhead->redis.nodeset)) {
1223 redis_node_t *node = nodeset_node_find_by_chanhead(chanhead);
1224 nchan_redis_sync_script(subscriber_unregister, node, &chanhead->id, "%i %i",
1225 0/*TODO: sub->id*/,
1226 cf->channel_timeout
1227 );
1228 }
1229 }
1230 return NGX_OK;
1231 }
1232
1233
1234 static void redisChannelKeepaliveCallback(redisAsyncContext *c, void *vr, void *privdata);
1235
redisChannelKeepaliveCallback_send(redis_nodeset_t * ns,void * pd)1236 static ngx_int_t redisChannelKeepaliveCallback_send(redis_nodeset_t *ns, void *pd) {
1237 rdstore_channel_head_t *head = pd;
1238 time_t ttl;
1239 //TODO: optimize this
1240 redis_node_t *node = nodeset_node_find_by_channel_id(head->redis.nodeset, &head->id);
1241 if(nodeset_ready(ns)) {
1242 head->reserved++;
1243 ttl = REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_STEP * (1+head->keepalive_times_sent);
1244 if(ttl > REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX) { //1 week at most
1245 ttl = REDIS_CHANNEL_EMPTY_BUT_SUBSCRIBED_TTL_MAX;
1246 }
1247 nchan_redis_script(channel_keepalive, node, &redisChannelKeepaliveCallback, head, &head->id, "%i", ttl);
1248 }
1249 return NGX_OK;
1250 }
1251
redisChannelKeepaliveCallback_retry_wrapper(redis_nodeset_t * ns,void * pd)1252 static ngx_int_t redisChannelKeepaliveCallback_retry_wrapper(redis_nodeset_t *ns, void *pd) {
1253 rdstore_channel_head_t *head = pd;
1254 head->reserved--;
1255 return redisChannelKeepaliveCallback_send(ns, pd);
1256 }
1257
redisChannelKeepaliveCallback(redisAsyncContext * c,void * vr,void * privdata)1258 static void redisChannelKeepaliveCallback(redisAsyncContext *c, void *vr, void *privdata) {
1259 rdstore_channel_head_t *head = (rdstore_channel_head_t *)privdata;
1260 redisReply *reply = (redisReply *)vr;
1261 redis_node_t *node = c->data;
1262
1263 head->reserved--;
1264 node->pending_commands--;
1265 nchan_update_stub_status(redis_pending_commands, -1);
1266
1267 if(!nodeset_node_reply_keyslot_ok(node, reply)) {
1268 head->reserved++;
1269 nodeset_callback_on_ready(node->nodeset, 1000, redisChannelKeepaliveCallback_retry_wrapper, head);
1270 return;
1271 }
1272 else {
1273 head->keepalive_times_sent++;
1274 }
1275
1276 if(redisReplyOk(c, vr)) {
1277 assert(CHECK_REPLY_INT(reply));
1278
1279 //reply->integer == -1 means "let it disappear" (see channel_keepalive.lua)
1280
1281 if(reply->integer != -1 && !head->keepalive_timer.timer_set) {
1282 ngx_add_timer(&head->keepalive_timer, reply->integer * 1000);
1283 }
1284 }
1285
1286 }
1287
redis_channel_keepalive_timer_handler(ngx_event_t * ev)1288 static void redis_channel_keepalive_timer_handler(ngx_event_t *ev) {
1289 rdstore_channel_head_t *head = ev->data;
1290 if(ev->timedout) {
1291 ev->timedout=0;
1292 if(head->pubsub_status != REDIS_PUBSUB_SUBSCRIBED || head->status == NOTREADY) {
1293 //no use trying to keepalive a not-ready (possibly disconnected) chanhead
1294 DBG("Tried sending channel keepalive when channel is not ready");
1295 ngx_add_timer(ev, REDIS_RECONNECT_TIME); //retry after reconnect timeout
1296 }
1297 else {
1298 redisChannelKeepaliveCallback_send(head->redis.nodeset, head);
1299 }
1300 }
1301 }
1302
ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t * ch)1303 ngx_int_t ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t *ch) {
1304 redis_node_t *pubsub_node;
1305 ngx_str_t *namespace;
1306 if( ch->pubsub_status != REDIS_PUBSUB_SUBSCRIBED && ch->pubsub_status != REDIS_PUBSUB_SUBSCRIBING
1307 && ch->redis.nodeset->settings.storage_mode >= REDIS_MODE_DISTRIBUTED
1308 && nodeset_ready(ch->redis.nodeset)
1309 ) {
1310 pubsub_node = nodeset_node_pubsub_find_by_chanhead(ch);
1311 namespace = ch->redis.nodeset->settings.namespace;
1312 DBG("SUBSCRIBING to %V{channel:%V}:pubsub", namespace, &ch->id);
1313 ch->pubsub_status = REDIS_PUBSUB_SUBSCRIBING;
1314 redis_subscriber_command(pubsub_node, redis_subscriber_callback, NULL, "SUBSCRIBE %b{channel:%b}:pubsub", STR(namespace), STR(&ch->id));
1315 }
1316 return NGX_OK;
1317 }
1318
redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t * ch)1319 ngx_int_t redis_chanhead_catch_up_after_reconnect(rdstore_channel_head_t *ch) {
1320 return spooler_catch_up(&ch->spooler);
1321 }
1322
create_chanhead(ngx_str_t * channel_id,redis_nodeset_t * ns)1323 static rdstore_channel_head_t *create_chanhead(ngx_str_t *channel_id, redis_nodeset_t *ns) {
1324 rdstore_channel_head_t *head;
1325
1326 head=ngx_calloc(sizeof(*head) + sizeof(u_char)*(channel_id->len), ngx_cycle->log);
1327 if(head==NULL) {
1328 ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "can't allocate memory for (new) channel subscriber head");
1329 return NULL;
1330 }
1331 head->id.len = channel_id->len;
1332 head->id.data = (u_char *)&head[1];
1333 ngx_memcpy(head->id.data, channel_id->data, channel_id->len);
1334 head->sub_count=0;
1335 head->fetching_message_count=0;
1336 head->redis_subscriber_privdata = NULL;
1337 head->status = NOTREADY;
1338 head->pubsub_status = REDIS_PUBSUB_UNSUBSCRIBED;
1339 head->generation = 0;
1340 head->last_msgid.time=0;
1341 head->last_msgid.tag.fixed[0]=0;
1342 head->last_msgid.tagcount = 1;
1343 head->last_msgid.tagactive = 0;
1344 head->shutting_down = 0;
1345 head->reserved = 0;
1346 head->keepalive_times_sent = 0;
1347
1348 head->gc.in_reaper = 0;
1349 head->gc.time = 0;
1350 head->gc.prev = NULL;
1351 head->gc.next = NULL;
1352
1353 if(head->id.len >= 5 && ngx_strncmp(head->id.data, "meta/", 5) == 0) {
1354 head->meta = 1;
1355 }
1356 else {
1357 head->meta = 0;
1358 }
1359
1360 ngx_memzero(&head->keepalive_timer, sizeof(head->keepalive_timer));
1361 nchan_init_timer(&head->keepalive_timer, redis_channel_keepalive_timer_handler, head);
1362
1363 if(channel_id->len > 2) { // absolutely no multiplexed channels allowed
1364 assert(ngx_strncmp(head->id.data, "m/", 2) != 0);
1365 }
1366
1367 head->redis.nodeset = ns;
1368 head->redis.generation = 0;
1369 head->redis.node.cmd = NULL;
1370 head->redis.node.pubsub = NULL;
1371 ngx_memzero(&head->redis.slist, sizeof(head->redis.slist));
1372
1373 if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP) {
1374 head->status = READY;
1375 }
1376
1377 head->spooler.running=0;
1378 start_chanhead_spooler(head);
1379 if(head->meta) {
1380 head->spooler.publish_events = 0;
1381 }
1382
1383 ensure_chanhead_pubsub_subscribed_if_needed(head);
1384 CHANNEL_HASH_ADD(head);
1385
1386 return head;
1387 }
1388
nchan_store_get_chanhead(ngx_str_t * channel_id,redis_nodeset_t * nodeset)1389 static rdstore_channel_head_t * nchan_store_get_chanhead(ngx_str_t *channel_id, redis_nodeset_t *nodeset) {
1390 rdstore_channel_head_t *head;
1391
1392 CHANNEL_HASH_FIND(channel_id, head); //BUG: this doesn't account for namespacing!!
1393 if(head==NULL) {
1394 head = create_chanhead(channel_id, nodeset);
1395 }
1396 if(head == NULL) {
1397 ERR("can't create chanhead for redis store");
1398 return NULL;
1399 }
1400
1401 if (head->status == INACTIVE) { //recycled chanhead
1402 ensure_chanhead_pubsub_subscribed_if_needed(head);
1403 redis_chanhead_gc_withdraw(head);
1404
1405 if(head->redis.nodeset->settings.storage_mode == REDIS_MODE_BACKUP) {
1406 head->status = READY;
1407 }
1408 else {
1409 head->status = head->pubsub_status == REDIS_PUBSUB_SUBSCRIBED ? READY : NOTREADY;
1410 }
1411 }
1412
1413 if(!head->spooler.running) {
1414 DBG("Spooler for channel %p %V wasn't running. start it.", head, &head->id);
1415 start_chanhead_spooler(head);
1416 }
1417
1418 return head;
1419 }
1420
redis_chanhead_gc_add(rdstore_channel_head_t * head,ngx_int_t expire,const char * reason)1421 ngx_int_t redis_chanhead_gc_add(rdstore_channel_head_t *head, ngx_int_t expire, const char *reason) {
1422 assert(head->sub_count == 0);
1423 nchan_reaper_t *reaper = &head->redis.nodeset->chanhead_reaper;
1424 if(!head->gc.in_reaper) {
1425 assert(head->status != INACTIVE);
1426 head->status = INACTIVE;
1427 head->gc.time = ngx_time() + (expire == 0 ? NCHAN_CHANHEAD_EXPIRE_SEC : expire);
1428 head->gc.in_reaper = 1;
1429
1430 nchan_reaper_add(reaper, head);
1431
1432 DBG("gc_add chanhead %V to %s (%s)", &head->id, reaper->name, reason);
1433 }
1434 else {
1435 ERR("gc_add chanhead %V to %s: already added (%s)", &head->id, reaper->name, reason);
1436 }
1437
1438 return NGX_OK;
1439 }
1440
redis_chanhead_gc_withdraw(rdstore_channel_head_t * chanhead)1441 ngx_int_t redis_chanhead_gc_withdraw(rdstore_channel_head_t *chanhead) {
1442 if(chanhead->gc.in_reaper) {
1443 nchan_reaper_t *reaper = &chanhead->redis.nodeset->chanhead_reaper;
1444 DBG("gc_withdraw chanhead %s from %V", reaper->name, &chanhead->id);
1445 assert(chanhead->status == INACTIVE);
1446
1447 nchan_reaper_withdraw(reaper, chanhead);
1448 chanhead->gc.in_reaper = 0;
1449 }
1450 else {
1451 DBG("gc_withdraw chanhead (%V), but not in gc reaper", &chanhead->id);
1452 }
1453 return NGX_OK;
1454 }
1455
nchan_store_publish_generic(ngx_str_t * channel_id,redis_nodeset_t * nodeset,nchan_msg_t * msg,ngx_int_t status_code,const ngx_str_t * status_line)1456 static ngx_int_t nchan_store_publish_generic(ngx_str_t *channel_id, redis_nodeset_t *nodeset, nchan_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line){
1457 rdstore_channel_head_t *head;
1458 ngx_int_t ret;
1459 //redis_channel_head_cleanup_t *hcln;
1460
1461
1462
1463 head = nchan_store_get_chanhead(channel_id, nodeset);
1464
1465 if(head->sub_count > 0) {
1466 if(msg) {
1467 assert(msg->id.tagcount == 1);
1468 head->last_msgid.time = msg->id.time;
1469 head->last_msgid.tag.fixed[0] = msg->id.tag.fixed[0];
1470 head->last_msgid.tagcount = 1;
1471 head->last_msgid.tagactive = 0;
1472
1473 head->spooler.fn->respond_message(&head->spooler, msg);
1474 }
1475 else {
1476 head->spooler.fn->broadcast_status(&head->spooler, status_code, status_line);
1477 }
1478 ret= NGX_OK;
1479 }
1480 else {
1481 ret= NCHAN_MESSAGE_QUEUED;
1482 }
1483 return ret;
1484 }
1485
1486
redis_array_to_channel(redisReply * r,nchan_channel_t * ch)1487 static ngx_int_t redis_array_to_channel(redisReply *r, nchan_channel_t *ch) {
1488 ngx_str_t msgid;
1489 nchan_msg_id_t zeroid = NCHAN_OLDEST_MSGID;
1490
1491 if ( CHECK_REPLY_ARRAY_MIN_SIZE(r, 5)
1492 && CHECK_REPLY_INT(r->element[0])
1493 && CHECK_REPLY_INT(r->element[1])
1494 && CHECK_REPLY_INT(r->element[2])
1495 && CHECK_REPLY_STR(r->element[3])
1496 && CHECK_REPLY_INT(r->element[4])) {
1497
1498 //channel info
1499 ch->expires = ngx_time() + r->element[0]->integer;
1500 ch->last_seen = r->element[1]->integer;
1501 ch->subscribers = r->element[2]->integer;
1502
1503 msgid.data = (u_char *)r->element[3]->str;
1504 msgid.len = r->element[3]->len;
1505
1506 if(msgid.len == 0) {
1507 ch->last_published_msg_id = zeroid;
1508 }
1509 else if(nchan_parse_compound_msgid(&ch->last_published_msg_id, &msgid, 1) != NGX_OK) {
1510 ERR("failed to parse last-msgid %V from redis", &msgid);
1511 }
1512
1513 ch->messages = r->element[4]->integer;
1514
1515 //no id?..
1516 ch->id.len=0;
1517 ch->id.data=NULL;
1518
1519 //queued messages
1520 if( CHECK_REPLY_ARRAY_MIN_SIZE(r, 6)
1521 && CHECK_REPLY_INT(r->element[5])) {
1522
1523 ch->messages = r->element[5]->integer;
1524 }
1525
1526 return NGX_OK;
1527 }
1528 else if(CHECK_REPLY_NIL(r)) {
1529 return NGX_DECLINED;
1530 }
1531 else {
1532 return NGX_ERROR;
1533 }
1534 }
1535
1536 typedef struct {
1537 ngx_msec_t t;
1538 char *name;
1539 ngx_str_t *channel_id;
1540 callback_pt callback;
1541 void *privdata;
1542 } redis_channel_callback_data_t;
1543
1544 #define CREATE_CALLBACK_DATA(d, nodeset, cf, namestr, channel_id, callback, privdata) \
1545 do { \
1546 if ((d = ngx_alloc(sizeof(*d) + ((nodeset)->cluster.enabled ? (sizeof(*channel_id) + channel_id->len) : 0), ngx_cycle->log)) == NULL) { \
1547 ERR("Can't allocate redis %s channel callback data", namestr); \
1548 return NGX_ERROR; \
1549 } \
1550 d->t = ngx_current_msec; \
1551 d->name = namestr; \
1552 if((nodeset)->cluster.enabled) { \
1553 /* might need to use channel id later to retry the command */ \
1554 d->channel_id = (ngx_str_t *)&d[1]; \
1555 d->channel_id->data = (u_char *)&d->channel_id[1]; \
1556 nchan_strcpy(d->channel_id, channel_id, 0); \
1557 } \
1558 else { \
1559 d->channel_id = channel_id; \
1560 } \
1561 d->callback = callback; \
1562 d->privdata = privdata; \
1563 } while(0)
1564
redisChannelInfoCallback(redisAsyncContext * c,void * r,void * privdata)1565 static void redisChannelInfoCallback(redisAsyncContext *c, void *r, void *privdata) {
1566 redisReply *reply=r;
1567 redis_channel_callback_data_t *d=(redis_channel_callback_data_t *)privdata;
1568 nchan_channel_t channel;
1569 ngx_memzero(&channel, sizeof(channel)); // for ddebugging. this should be removed later.
1570
1571 log_redis_reply(d->name, d->t);
1572
1573 if(d->callback) {
1574 if(reply) {
1575 switch(redis_array_to_channel(reply, &channel)) {
1576 case NGX_OK:
1577 d->callback(NGX_OK, &channel, d->privdata);
1578 break;
1579 case NGX_DECLINED: //not found
1580 d->callback(NGX_OK, NULL, d->privdata);
1581 break;
1582 case NGX_ERROR:
1583 default:
1584 redisEchoCallback(c, r, privdata);
1585 d->callback(NGX_ERROR, NULL, d->privdata);
1586 }
1587 }
1588 else {
1589 d->callback(NGX_ERROR, NULL, d->privdata);
1590 }
1591 }
1592 }
1593
1594 static void redisChannelDeleteCallback(redisAsyncContext *c, void *r, void *privdata);
1595
nchan_store_delete_channel_send(redis_nodeset_t * ns,void * pd)1596 static ngx_int_t nchan_store_delete_channel_send(redis_nodeset_t *ns, void *pd) {
1597 redis_channel_callback_data_t *d = pd;
1598 if(nodeset_ready(ns)) {
1599 redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1600 nchan_redis_script(delete, node, &redisChannelDeleteCallback, d, d->channel_id, "");
1601 return NGX_OK;
1602 }
1603 else {
1604 redisChannelDeleteCallback(NULL, NULL, d);
1605 return NGX_ERROR;
1606 }
1607 }
1608
redisChannelDeleteCallback(redisAsyncContext * ac,void * r,void * privdata)1609 static void redisChannelDeleteCallback(redisAsyncContext *ac, void *r, void *privdata) {
1610 redis_node_t *node;
1611
1612 nchan_update_stub_status(redis_pending_commands, -1);
1613 if(ac) {
1614 node = ac->data;
1615 node->pending_commands--;
1616
1617 if(!nodeset_node_reply_keyslot_ok(node, (redisReply *)r)) {
1618 nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_delete_channel_send, privdata);
1619 return;
1620 }
1621 }
1622 redisChannelInfoCallback(ac, r, privdata);
1623 ngx_free(privdata);
1624 }
1625
nchan_store_delete_channel(ngx_str_t * channel_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1626 static ngx_int_t nchan_store_delete_channel(ngx_str_t *channel_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1627 redis_channel_callback_data_t *d;
1628 redis_nodeset_t *ns = nodeset_find(&cf->redis);
1629 CREATE_CALLBACK_DATA(d, ns, cf, "delete", channel_id, callback, privdata);
1630
1631 return nchan_store_delete_channel_send(ns, d);
1632 }
1633
1634
1635 static void redisChannelFindCallback(redisAsyncContext *c, void *r, void *privdata);
1636
nchan_store_find_channel_send(redis_nodeset_t * ns,void * pd)1637 static ngx_int_t nchan_store_find_channel_send(redis_nodeset_t *ns, void *pd) {
1638 redis_channel_callback_data_t *d = pd;
1639 if(nodeset_ready(ns)) {
1640 redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1641 nchan_redis_script(find_channel, node, &redisChannelFindCallback, d, d->channel_id, "");
1642 }
1643 else {
1644 redisChannelFindCallback(NULL, NULL, d);
1645 }
1646 return NGX_OK;
1647 }
1648
redisChannelFindCallback(redisAsyncContext * ac,void * r,void * privdata)1649 static void redisChannelFindCallback(redisAsyncContext *ac, void *r, void *privdata) {
1650 redis_node_t *node = NULL;
1651
1652 if(ac) {
1653 node = ac->data;
1654 node->pending_commands--;
1655 nchan_update_stub_status(redis_pending_commands, -1);
1656
1657 if(!nodeset_node_reply_keyslot_ok(node, (redisReply *)r)) {
1658 nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_find_channel_send, privdata);
1659 return;
1660 }
1661 }
1662
1663 redisChannelInfoCallback(ac, r, privdata);
1664 ngx_free(privdata);
1665 }
1666
nchan_store_find_channel(ngx_str_t * channel_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1667 static ngx_int_t nchan_store_find_channel(ngx_str_t *channel_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1668 redis_channel_callback_data_t *d;
1669 redis_nodeset_t *ns = nodeset_find(&cf->redis);
1670 CREATE_CALLBACK_DATA(d, ns, cf, "find_channel", channel_id, callback, privdata);
1671
1672 nchan_store_find_channel_send(ns, d);
1673
1674 return NGX_OK;
1675 }
1676
msg_from_redis_get_message_reply(nchan_msg_t * msg,nchan_compressed_msg_t * cmsg,ngx_str_t * content_type,ngx_str_t * eventsource_event,redisReply * r,uint16_t offset)1677 static ngx_int_t msg_from_redis_get_message_reply(nchan_msg_t *msg, nchan_compressed_msg_t *cmsg, ngx_str_t *content_type, ngx_str_t *eventsource_event, redisReply *r, uint16_t offset) {
1678
1679 redisReply **els = r->element;
1680 size_t content_type_len = 0, es_event_len = 0;
1681 ngx_int_t time_int = 0, ttl;
1682 ngx_int_t compression;
1683
1684 if(CHECK_REPLY_ARRAY_MIN_SIZE(r, offset + 8)
1685 && CHECK_REPLY_INT(els[offset]) //msg TTL
1686 && CHECK_REPLY_INT_OR_STR(els[offset+1]) //id - time
1687 && CHECK_REPLY_INT_OR_STR(els[offset+2]) //id - tag
1688 && CHECK_REPLY_INT_OR_STR(els[offset+3]) //prev_id - time
1689 && CHECK_REPLY_INT_OR_STR(els[offset+4]) //prev_id - tag
1690 && CHECK_REPLY_STR(els[offset+5]) //message
1691 && CHECK_REPLY_STR(els[offset+6]) //content-type
1692 && CHECK_REPLY_STR(els[offset+7]) //eventsource event
1693 ) {
1694
1695 content_type_len=els[offset+6]->len;
1696 es_event_len = els[offset+7]->len;
1697
1698 ngx_memzero(msg, sizeof(*msg));
1699
1700 msg->buf.start = msg->buf.pos = (u_char *)els[offset+5]->str;
1701 msg->buf.end = msg->buf.last = msg->buf.start + els[offset+5]->len;
1702 msg->buf.memory = 1;
1703 msg->buf.last_buf = 1;
1704 msg->buf.last_in_chain = 1;
1705
1706 if(redisReply_to_ngx_int(els[offset], &ttl) != NGX_OK) {
1707 ERR("invalid ttl integer value in msg response from redis");
1708 return NGX_ERROR;
1709 }
1710 assert(ttl >= 0);
1711 if(ttl == 0)
1712 ttl++; // less than a second left for this message... give it a second's lease on life
1713
1714 msg->expires = ngx_time() + ttl;
1715
1716 msg->compressed = NULL;
1717 if(r->elements >= (uint16_t )(offset + 8)) {
1718 if(!CHECK_REPLY_INT_OR_STR(els[offset+8]) || redisReply_to_ngx_int(els[offset+8], &compression) != NGX_OK) {
1719 ERR("invalid compression type integer value in msg response from redis");
1720 return NGX_ERROR;
1721 }
1722 if((nchan_msg_compression_type_t )compression != NCHAN_MSG_COMPRESSION_INVALID && (nchan_msg_compression_type_t )compression != NCHAN_MSG_NO_COMPRESSION) {
1723 msg->compressed = cmsg;
1724 ngx_memzero(&cmsg->buf, sizeof(cmsg->buf));
1725 cmsg->compression = (nchan_msg_compression_type_t )compression;
1726 }
1727 }
1728
1729 if(content_type_len > 0) {
1730 msg->content_type = content_type;
1731 msg->content_type->len=content_type_len;
1732 msg->content_type->data=(u_char *)els[offset+6]->str;
1733 }
1734 else {
1735 msg->content_type = NULL;
1736 }
1737
1738 if(es_event_len > 0) {
1739 msg->eventsource_event = eventsource_event;
1740 msg->eventsource_event->len=es_event_len;
1741 msg->eventsource_event->data=(u_char *)els[offset+7]->str;
1742 }
1743 else {
1744 msg->eventsource_event = NULL;
1745 }
1746
1747 if(redisReply_to_ngx_int(els[offset+1], &time_int) == NGX_OK) {
1748 msg->id.time = time_int;
1749 }
1750 else {
1751 msg->id.time = 0;
1752 ERR("invalid msg time from redis");
1753 }
1754
1755 redisReply_to_ngx_int(els[offset+2], (ngx_int_t *)&msg->id.tag.fixed[0]); // tag is a uint, meh.
1756 msg->id.tagcount = 1;
1757 msg->id.tagactive = 0;
1758
1759 redisReply_to_ngx_int(els[offset+3], &time_int);
1760 msg->prev_id.time = time_int;
1761 redisReply_to_ngx_int(els[offset+4], (ngx_int_t *)&msg->prev_id.tag.fixed[0]);
1762 msg->prev_id.tagcount = 1;
1763 msg->prev_id.tagactive = 0;
1764
1765 return NGX_OK;
1766 }
1767 else {
1768 ERR("invalid message redis reply");
1769 return NGX_ERROR;
1770 }
1771 }
1772
1773 typedef struct {
1774 ngx_msec_t t;
1775 char *name;
1776 ngx_str_t *channel_id;
1777 nchan_msg_tiny_id_t msg_id;
1778 callback_pt callback;
1779 void *privdata;
1780 } redis_get_message_data_t;
1781
1782 static void redis_get_message_callback(redisAsyncContext *c, void *r, void *privdata);
1783
nchan_store_async_get_message_send(redis_nodeset_t * ns,void * pd)1784 static ngx_int_t nchan_store_async_get_message_send(redis_nodeset_t *ns, void *pd) {
1785 redis_get_message_data_t *d = pd;
1786 //input: keys: [], values: [namespace, channel_id, msg_time, msg_tag, no_msgid_order, create_channel_ttl]
1787 //output: result_code, msg_ttl, msg_time, msg_tag, prev_msg_time, prev_msg_tag, message, content_type, eventsource_event, channel_subscriber_count
1788 if(nodeset_ready(ns)) {
1789 redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
1790 nchan_redis_script(get_message, node, &redis_get_message_callback, d, d->channel_id, "%i %i FILO 0",
1791 d->msg_id.time,
1792 d->msg_id.tag
1793 );
1794 }
1795 else {
1796 //TODO: pass on a get_msg error status maybe?
1797 ngx_free(d);
1798 }
1799 return NGX_OK;
1800 }
1801
redis_get_message_callback(redisAsyncContext * ac,void * r,void * privdata)1802 static void redis_get_message_callback(redisAsyncContext *ac, void *r, void *privdata) {
1803 redisReply *reply= r;
1804 redis_get_message_data_t *d= (redis_get_message_data_t *)privdata;
1805 nchan_msg_t msg;
1806 nchan_compressed_msg_t cmsg;
1807 ngx_str_t content_type;
1808 ngx_str_t eventsource_event;
1809 redis_node_t *node;
1810
1811 if(d == NULL) {
1812 ERR("redis_get_mesage_callback has NULL userdata");
1813 return;
1814 }
1815
1816 if(ac) {
1817 node = ac->data;
1818
1819 node->pending_commands--;
1820 nchan_update_stub_status(redis_pending_commands, -1);
1821
1822 if(!nodeset_ready(node->nodeset) || !nodeset_node_reply_keyslot_ok(node, reply)) {
1823 nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_async_get_message_send, privdata);
1824 return;
1825 }
1826
1827 log_redis_reply(d->name, d->t);
1828
1829 //output: result_code, msg_ttl, msg_time, msg_tag, prev_msg_time, prev_msg_tag, message, content_type, eventsource_event, compression_type, channel_subscriber_count
1830 // result_code can be: 200 - ok, 403 - channel not found, 404 - not found, 410 - gone, 418 - not yet available
1831
1832 if (!redisReplyOk(ac, r) || !CHECK_REPLY_ARRAY_MIN_SIZE(reply, 1) || !CHECK_REPLY_INT(reply->element[0]) ) {
1833 //no good
1834 ngx_free(d);
1835 return;
1836 }
1837
1838 switch(reply->element[0]->integer) {
1839 case 200: //ok
1840 if(msg_from_redis_get_message_reply(&msg, &cmsg, &content_type, &eventsource_event, reply, 1) == NGX_OK) {
1841 d->callback(MSG_FOUND, &msg, d->privdata);
1842 }
1843 break;
1844 case 403: //channel not found
1845 case 404: //not found
1846 d->callback(MSG_NOTFOUND, NULL, d->privdata);
1847 break;
1848 case 410: //gone
1849 d->callback(MSG_EXPIRED, NULL, d->privdata);
1850 break;
1851 case 418: //not yet available
1852 d->callback(MSG_EXPECTED, NULL, d->privdata);
1853 break;
1854 }
1855 }
1856 else {
1857 ERR("redisAsyncContext NULL for redis_get_message_callback");
1858 }
1859
1860 ngx_free(d);
1861 }
1862
nchan_store_async_get_message(ngx_str_t * channel_id,nchan_msg_id_t * msg_id,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)1863 static ngx_int_t nchan_store_async_get_message(ngx_str_t *channel_id, nchan_msg_id_t *msg_id, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
1864 redis_get_message_data_t *d;
1865 redis_nodeset_t *ns = nodeset_find(&cf->redis);
1866 if(callback==NULL) {
1867 ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "no callback given for async get_message. someone's using the API wrong!");
1868 return NGX_ERROR;
1869 }
1870
1871 assert(msg_id->tagcount == 1);
1872
1873 CREATE_CALLBACK_DATA(d, ns, cf, "get_message", channel_id, callback, privdata);
1874 d->msg_id.time = msg_id->time;
1875 d->msg_id.tag = msg_id->tag.fixed[0];
1876
1877 nchan_store_async_get_message_send(ns, d);
1878 return NGX_OK; //async only now!
1879 }
1880
1881
1882 typedef struct nchan_redis_conf_ll_s nchan_redis_conf_ll_t;
1883 struct nchan_redis_conf_ll_s {
1884 nchan_loc_conf_t *lcf;
1885 nchan_redis_conf_ll_t *next;
1886 };
1887
1888 nchan_redis_conf_ll_t *redis_conf_head;
1889
nchan_store_redis_add_active_loc_conf(ngx_conf_t * cf,nchan_loc_conf_t * loc_conf)1890 ngx_int_t nchan_store_redis_add_active_loc_conf(ngx_conf_t *cf, nchan_loc_conf_t *loc_conf) {
1891 nchan_redis_conf_ll_t *rcf_ll = ngx_palloc(cf->pool, sizeof(*rcf_ll));
1892 rcf_ll->lcf = loc_conf;
1893 rcf_ll->next = redis_conf_head;
1894 redis_conf_head = rcf_ll;
1895 return NGX_OK;
1896 }
1897
nchan_store_redis_remove_active_loc_conf(ngx_conf_t * cf,nchan_loc_conf_t * loc_conf)1898 ngx_int_t nchan_store_redis_remove_active_loc_conf(ngx_conf_t *cf, nchan_loc_conf_t *loc_conf) {
1899 nchan_redis_conf_ll_t *cur, *prev;
1900
1901 for(cur = redis_conf_head, prev = NULL; cur != NULL; prev = cur, cur = cur->next) {
1902 if(cur->lcf == loc_conf) { //found it
1903 if(prev == NULL) {
1904 redis_conf_head = cur->next;
1905 }
1906 else {
1907 prev->next = cur->next;
1908 }
1909 //don't need to ngx_pfree
1910 return NGX_OK;
1911 }
1912 }
1913 return NGX_OK;
1914 }
1915
1916 //initialization
nchan_store_init_module(ngx_cycle_t * cycle)1917 static ngx_int_t nchan_store_init_module(ngx_cycle_t *cycle) {
1918 ngx_core_conf_t *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
1919 nchan_worker_processes = ccf->worker_processes;
1920 return NGX_OK;
1921 }
1922
1923
rdstore_initialize_chanhead_reaper(nchan_reaper_t * reaper,char * name)1924 ngx_int_t rdstore_initialize_chanhead_reaper(nchan_reaper_t *reaper, char *name) {
1925
1926 nchan_reaper_start(reaper,
1927 name,
1928 offsetof(rdstore_channel_head_t, gc.prev),
1929 offsetof(rdstore_channel_head_t, gc.next),
1930 (ngx_int_t (*)(void *, uint8_t)) nchan_redis_chanhead_ready_to_reap,
1931 (void (*)(void *)) redis_store_reap_chanhead,
1932 4
1933 );
1934
1935 return NGX_OK;
1936 }
1937
nchan_store_init_redis_loc_conf_postconfig(nchan_loc_conf_t * lcf)1938 static ngx_int_t nchan_store_init_redis_loc_conf_postconfig(nchan_loc_conf_t *lcf) {
1939 nchan_redis_conf_t *rcf = &lcf->redis;
1940
1941 assert(rcf->enabled);
1942
1943 //server-scope loc_conf may have some undefined values (because it was never merged with a prev)
1944 //thus we must reduntantly check for unser values
1945 if(rcf->ping_interval == NGX_CONF_UNSET) {
1946 rcf->ping_interval = NCHAN_REDIS_DEFAULT_PING_INTERVAL_TIME;
1947 }
1948 if(rcf->storage_mode == REDIS_MODE_CONF_UNSET) {
1949 rcf->storage_mode = REDIS_MODE_DISTRIBUTED;
1950 }
1951 if(rcf->nostore_fastpublish == NGX_CONF_UNSET) {
1952 rcf->nostore_fastpublish = 0;
1953 }
1954
1955 return NGX_OK;
1956 }
1957
nchan_store_init_postconfig(ngx_conf_t * cf)1958 static ngx_int_t nchan_store_init_postconfig(ngx_conf_t *cf) {
1959 nchan_loc_conf_t *lcf;
1960 nchan_redis_conf_ll_t *cur;
1961 nchan_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_nchan_module);
1962 redis_nodeset_t *nodeset;
1963
1964 if(mcf->redis_publish_message_msgkey_size == NGX_CONF_UNSET_SIZE) {
1965 mcf->redis_publish_message_msgkey_size = NCHAN_REDIS_DEFAULT_PUBSUB_MESSAGE_MSGKEY_SIZE;
1966 }
1967 redis_publish_message_msgkey_size = mcf->redis_publish_message_msgkey_size;
1968
1969 for(cur = redis_conf_head; cur != NULL; cur = cur->next) {
1970 lcf = cur->lcf;
1971 nchan_store_init_redis_loc_conf_postconfig(lcf);
1972
1973 if((nodeset = nodeset_find(&lcf->redis)) == NULL) {
1974 nodeset = nodeset_create(lcf);
1975 rdstore_initialize_chanhead_reaper(&nodeset->chanhead_reaper, "Redis channel reaper");
1976 }
1977 if(!nodeset) {
1978 ERR("Unable to create Redis nodeset.");
1979 continue;
1980 }
1981 }
1982
1983 return NGX_OK;
1984 }
1985
nchan_store_create_main_conf(ngx_conf_t * cf,nchan_main_conf_t * mcf)1986 static void nchan_store_create_main_conf(ngx_conf_t *cf, nchan_main_conf_t *mcf) {
1987 mcf->redis_publish_message_msgkey_size=NGX_CONF_UNSET_SIZE;
1988
1989 //reset redis_conf_head for reloads
1990 redis_conf_head = NULL;
1991 nodeset_destroy_all(); //reset all nodesets before loading config
1992 }
1993
redis_store_prepare_to_exit_worker()1994 void redis_store_prepare_to_exit_worker() {
1995 rdstore_channel_head_t *cur, *tmp;
1996 HASH_ITER(hh, chanhead_hash, cur, tmp) {
1997 cur->shutting_down = 1;
1998 }
1999 }
2000
nodeset_exiter_stage1(redis_nodeset_t * ns,void * pd)2001 void nodeset_exiter_stage1(redis_nodeset_t *ns, void *pd) {
2002 nodeset_abort_on_ready_callbacks(ns);
2003 }
nodeset_exiter_stage2(redis_nodeset_t * ns,void * pd)2004 void nodeset_exiter_stage2(redis_nodeset_t *ns, void *pd) {
2005 unsigned *chanheads = pd;
2006 *chanheads += ns->chanhead_reaper.count;
2007 nchan_reaper_stop(&ns->chanhead_reaper);
2008 }
2009
nodeset_exiter_stage3(redis_nodeset_t * ns,void * pd)2010 void nodeset_exiter_stage3(redis_nodeset_t *ns, void *pd) {
2011 nodeset_disconnect(ns);
2012 }
2013
nchan_store_exit_worker(ngx_cycle_t * cycle)2014 static void nchan_store_exit_worker(ngx_cycle_t *cycle) {
2015 rdstore_channel_head_t *cur, *tmp;
2016 unsigned chanheads = 0;
2017 DBG("redis exit worker");
2018
2019 //old
2020 //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage1, NULL);
2021
2022 nodeset_each(nodeset_exiter_stage1, NULL);
2023
2024 HASH_ITER(hh, chanhead_hash, cur, tmp) {
2025 cur->shutting_down = 1;
2026 if(!cur->gc.in_reaper) {
2027 cur->spooler.fn->broadcast_status(&cur->spooler, NGX_HTTP_GONE, &NCHAN_HTTP_STATUS_410);
2028 redis_chanhead_gc_add(cur, 0, "exit worker");
2029 }
2030 }
2031
2032 nodeset_each(nodeset_exiter_stage2, &chanheads);
2033
2034 //OLD
2035 //rbtree_walk(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage2, &chanheads);
2036 nodeset_destroy_all();
2037
2038 //OLD
2039 //rbtree_empty(&redis_data_tree, (rbtree_walk_callback_pt )redis_data_tree_exiter_stage3, NULL);
2040
2041 nchan_exit_notice_about_remaining_things("redis channel", "", chanheads);
2042 }
2043
nchan_store_exit_master(ngx_cycle_t * cycle)2044 static void nchan_store_exit_master(ngx_cycle_t *cycle) {
2045 nodeset_destroy_all();
2046 }
2047
2048 typedef struct {
2049 ngx_str_t *channel_id;
2050 subscriber_t *sub;
2051 unsigned allocd:1;
2052 } redis_subscribe_data_t;
2053
2054 static ngx_int_t nchan_store_subscribe_continued(redis_subscribe_data_t *d);
2055
subscribe_existing_channel_callback(ngx_int_t status,void * ch,void * d)2056 static ngx_int_t subscribe_existing_channel_callback(ngx_int_t status, void *ch, void *d) {
2057 nchan_channel_t *channel = (nchan_channel_t *)ch;
2058 redis_subscribe_data_t *data = (redis_subscribe_data_t *)d;
2059
2060 if(channel == NULL) {
2061 data->sub->fn->respond_status(data->sub, NGX_HTTP_FORBIDDEN, NULL, NULL);
2062 data->sub->fn->release(data->sub, 0);
2063 }
2064 else {
2065 nchan_store_subscribe_continued(d);
2066 }
2067 assert(data->allocd);
2068 ngx_free(data);
2069
2070 return NGX_OK;
2071 }
2072
nchan_store_subscribe(ngx_str_t * channel_id,subscriber_t * sub)2073 static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) {
2074 redis_subscribe_data_t d_data;
2075 redis_subscribe_data_t *d = NULL;
2076
2077 assert(sub->last_msgid.tagcount == 1);
2078
2079 if(!sub->cf->subscribe_only_existing_channel) {
2080 d_data.allocd = 0;
2081 d_data.channel_id = channel_id;
2082 d_data.sub = sub;
2083 nchan_store_subscribe_continued(&d_data);
2084 }
2085 else {
2086 if((d=ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id->len, ngx_cycle->log))==NULL) {
2087 ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "can't allocate redis get_message callback data");
2088 return NGX_ERROR;
2089 }
2090 d->allocd = 1;
2091 d->channel_id=(ngx_str_t *)&d[1];
2092 d->channel_id->len = channel_id->len;
2093 d->channel_id->data = (u_char *)&(d->channel_id)[1];
2094 ngx_memcpy(d->channel_id->data, channel_id->data, channel_id->len);
2095 d->sub = sub;
2096 nchan_store_find_channel(channel_id, sub->cf, subscribe_existing_channel_callback, d);
2097 }
2098
2099 return NGX_OK;
2100 }
2101
nchan_store_subscribe_continued(redis_subscribe_data_t * d)2102 static ngx_int_t nchan_store_subscribe_continued(redis_subscribe_data_t *d) {
2103 //nchan_loc_conf_t *cf = d->sub->cf;
2104 rdstore_channel_head_t *ch;
2105 //ngx_int_t create_channel_ttl = cf->subscribe_only_existing_channel==1 ? 0 : cf->channel_timeout;
2106 assert(d->sub->cf->redis.enabled);
2107 redis_nodeset_t *nodeset = nodeset_find(&d->sub->cf->redis);
2108 ngx_int_t rc;
2109
2110 ch = nchan_store_get_chanhead(d->channel_id, nodeset);
2111
2112 assert(ch != NULL);
2113
2114 rc = ch->spooler.fn->add(&ch->spooler, d->sub);
2115 //redisAsyncCommand(rds_ctx(), &redis_getmessage_callback, (void *)d, "EVALSHA %s 0 %b %i %i %s %i", redis_lua_scripts.get_message.hash, STR(d->channel_id), d->msg_id->time, d->msg_id->tag[0], "FILO", create_channel_ttl);
2116 return rc;
2117 }
2118
2119 typedef struct {
2120 ngx_msec_t t;
2121 char *name;
2122 ngx_str_t *channel_id;
2123 time_t msg_time;
2124 nchan_msg_t *msg;
2125 unsigned shared_msg:1;
2126 unsigned cluster_move_error:1;
2127 time_t message_timeout;
2128 ngx_int_t max_messages;
2129 nchan_msg_compression_type_t compression;
2130 ngx_int_t msglen;
2131 callback_pt callback;
2132 void *privdata;
2133 uint8_t retry;
2134 } redis_publish_callback_data_t;
2135
2136 static void redisPublishCallback(redisAsyncContext *, void *, void *);
2137 static void redisPublishNostoreCallback(redisAsyncContext *, void *, void *);
2138 static void redisPublishNostoreQueuedCheckCallback(redisAsyncContext *, void *, void *);
2139 static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd);
2140
redis_publish_message_nodeset_maybe_retry(redis_nodeset_t * ns,redis_publish_callback_data_t * d)2141 static ngx_int_t redis_publish_message_nodeset_maybe_retry(redis_nodeset_t *ns, redis_publish_callback_data_t *d) {
2142 //retry maybe
2143 if(d->retry < REDIS_NODESET_NOT_READY_MAX_RETRIES) {
2144 d->retry++;
2145 nodeset_callback_on_ready(ns, 1000, redis_publish_message_send, d);
2146 }
2147 else {
2148 d->callback(NGX_HTTP_SERVICE_UNAVAILABLE, NULL, d->privdata);
2149 ngx_free(d);
2150 }
2151 return NGX_DECLINED;
2152 }
2153
redis_publish_message_send(redis_nodeset_t * nodeset,void * pd)2154 static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd) {
2155 redis_publish_callback_data_t *d = pd;
2156 ngx_int_t mmapped = 0;
2157 ngx_buf_t *buf;
2158 ngx_str_t msgstr;
2159 nchan_msg_t *msg = d->msg;
2160 const ngx_str_t empty=ngx_string("");
2161
2162 if(!nodeset_ready(nodeset)) {
2163 return redis_publish_message_nodeset_maybe_retry(nodeset, d);
2164 }
2165
2166 redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
2167
2168 buf = &msg->buf;
2169 if(ngx_buf_in_memory(buf)) {
2170 msgstr.data = buf->pos;
2171 msgstr.len = buf->last - msgstr.data;
2172 }
2173 else { //in a file
2174 ngx_fd_t fd = buf->file->fd == NGX_INVALID_FILE ? nchan_fdcache_get(&buf->file->name) : buf->file->fd;
2175
2176 msgstr.len = buf->file_last - buf->file_pos;
2177 msgstr.data = mmap(NULL, msgstr.len, PROT_READ, MAP_SHARED, fd, 0);
2178 if (msgstr.data != MAP_FAILED) {
2179 mmapped = 1;
2180 }
2181 else {
2182 msgstr.data = NULL;
2183 msgstr.len = 0;
2184 ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, ngx_errno, "Redis store: Couldn't mmap file %V", &buf->file->name);
2185 }
2186 }
2187 d->msglen = msgstr.len;
2188
2189 if(nodeset->settings.storage_mode == REDIS_MODE_DISTRIBUTED_NOSTORE) {
2190 //hand-roll the msgpacked message
2191 /*
2192 9A A3"msg" CE<ttl> CE<time> 00 00 00 DB<len><str> D9<len><str> D9<len><str> 0X
2193 | | | | | | | | | | |
2194 | | | | | | | | | | fixint0/1 compression
2195 | | | | | | | | | bin8 <uint8 len> eventsource-event
2196 | | | | | | | | bin8 <uint8 len> content type
2197 | | | | | | | bin32 <uint32 BE len> msg data
2198 | | | | fixint=0 tag=0, prev_time=0, prev_tag=0
2199 | | | uint32 (BE) message time
2200 | | uint32 (BE) message ttl
2201 | fixstr[3]
2202 fixarray[10]
2203 */
2204
2205 uint32_t ttl, time;
2206 uint32_t msglen;
2207 uint8_t content_type_len, eventsource_event_len, compression;
2208 char zero='\0';
2209 int fastpublish = nodeset->settings.nostore_fastpublish;
2210 void (*publish_callback)(redisAsyncContext *, void *, void *) = NULL;
2211
2212
2213 ttl = htonl(d->message_timeout);
2214 time = htonl(msg->id.time);
2215 msglen = htonl(d->msglen);
2216 content_type_len = msg->content_type ? (msg->content_type->len > 255 ? 255 : msg->content_type->len) : 0;
2217 eventsource_event_len = msg->eventsource_event ? (msg->eventsource_event->len > 255 ? 255 : msg->eventsource_event->len) : 0;
2218 compression = d->compression;
2219
2220 if(!fastpublish) {
2221 redis_command(node, NULL, d, "MULTI");
2222 publish_callback = redisPublishNostoreQueuedCheckCallback;
2223 }
2224 else {
2225 publish_callback = redisPublishNostoreCallback;
2226 }
2227 redis_command(node, publish_callback, d,
2228 "PUBLISH %b{channel:%b}:pubsub "
2229 "\x9A\xA3msg\xCE%b\xCE%b%b%b%b\xDB%b%b\xD9%b%b\xD9%b%b%b",
2230
2231 STR(nodeset->settings.namespace),
2232 STR(d->channel_id),
2233
2234 (char *)&ttl, (size_t )4,
2235 (char *)&time, (size_t )4,
2236 (char *)&zero, (size_t )1,
2237 (char *)&zero, (size_t )1,
2238 (char *)&zero, (size_t )1,
2239 (char *)&msglen, (size_t )4,
2240 STR(&msgstr),
2241 (char *)&content_type_len, (size_t )1,
2242 STR((msg->content_type ? msg->content_type : &empty)),
2243 (char *)&eventsource_event_len, (size_t )1,
2244 STR((msg->eventsource_event ? msg->eventsource_event : &empty)),
2245 (char *)&compression, (size_t )1
2246 );
2247 if(!fastpublish) {
2248 redis_command(node, publish_callback, d, "HMGET %b{channel:%b} last_seen_fake_subscriber fake_subscribers",
2249 STR(nodeset->settings.namespace),
2250 STR(d->channel_id)
2251 );
2252 redis_command(node, &redisPublishNostoreCallback, d, "EXEC");
2253 }
2254
2255 }
2256 else {
2257 //input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff]
2258 //output: message_time, message_tag, channel_hash {ttl, time_last_seen, subscribers, messages}
2259 nchan_redis_script(publish, node, &redisPublishCallback, d, d->channel_id,
2260 "%i %b %b %b %i %i %i %i %i",
2261 msg->id.time,
2262 STR(&msgstr),
2263 STR((msg->content_type ? msg->content_type : &empty)),
2264 STR((msg->eventsource_event ? msg->eventsource_event : &empty)),
2265 d->compression,
2266 d->message_timeout,
2267 d->max_messages,
2268 redis_publish_message_msgkey_size,
2269 nodeset->settings.optimize_target
2270 );
2271 }
2272 if(mmapped && munmap(msgstr.data, msgstr.len) == -1) {
2273 ERR("munmap was a problem");
2274 return NGX_ERROR;
2275 }
2276 return NGX_OK;
2277 }
2278
nchan_store_publish_message(ngx_str_t * channel_id,nchan_msg_t * msg,nchan_loc_conf_t * cf,callback_pt callback,void * privdata)2279 static ngx_int_t nchan_store_publish_message(ngx_str_t *channel_id, nchan_msg_t *msg, nchan_loc_conf_t *cf, callback_pt callback, void *privdata) {
2280 redis_publish_callback_data_t *d=NULL;
2281 redis_nodeset_t *ns = nodeset_find(&cf->redis);
2282 assert(callback != NULL);
2283
2284 CREATE_CALLBACK_DATA(d, ns, cf, "publish_message", channel_id, callback, privdata);
2285
2286 d->msg_time=msg->id.time;
2287 if(d->msg_time == 0) {
2288 d->msg_time = ngx_time();
2289 }
2290 d->msg = msg;
2291 d->shared_msg = msg->storage == NCHAN_MSG_SHARED;
2292 d->message_timeout = nchan_loc_conf_message_timeout(cf);
2293 d->max_messages = nchan_loc_conf_max_messages(cf);
2294 d->compression = cf->message_compression;
2295 d->retry = 0;
2296 d->cluster_move_error = 0;
2297
2298 assert(msg->id.tagcount == 1);
2299
2300 if(d->shared_msg) {
2301 msg_reserve(d->msg, "redis publish");
2302 }
2303 redis_publish_message_send(ns, d);
2304
2305 return NGX_OK;
2306 }
2307
redisReply_to_int(redisReply * reply,int nil_value,int wrong_datatype_value)2308 static int64_t redisReply_to_int(redisReply *reply, int nil_value, int wrong_datatype_value) {
2309 switch(reply->type) {
2310 case REDIS_REPLY_INTEGER:
2311 return reply->integer;
2312 case REDIS_REPLY_STRING:
2313 return atol(reply->str);
2314 case REDIS_REPLY_NIL:
2315 return nil_value;
2316 default:
2317 return wrong_datatype_value;
2318 }
2319 }
2320
redisPublishNostoreCallback(redisAsyncContext * c,void * r,void * privdata)2321 static void redisPublishNostoreCallback(redisAsyncContext *c, void *r, void *privdata) {
2322 redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2323 redisReply *reply=r;
2324 redisReply **els;
2325 nchan_channel_t ch;
2326
2327
2328
2329 redis_node_t *node = c->data;
2330 node->pending_commands--;
2331 nchan_update_stub_status(redis_pending_commands, -1);
2332
2333 if(d->shared_msg) {
2334 msg_release(d->msg, "redis publish");
2335 }
2336
2337 ngx_memzero(&ch, sizeof(ch)); //for debugging basically. should be removed in the future and zeroed as-needed
2338 if(d->cluster_move_error) {
2339 nodeset_node_keyslot_changed(node);
2340 d->callback(NGX_HTTP_SERVICE_UNAVAILABLE, NULL, d->privdata);
2341 }
2342 else if(reply) {
2343 if(reply->type == REDIS_REPLY_ARRAY && reply->elements == 2 && reply->element[1]->type == REDIS_REPLY_ARRAY && reply->element[1]->elements == 2) {
2344 els = reply->element[1]->element;
2345 ch.last_seen = redisReply_to_int(els[0], 0, 0);
2346 ch.subscribers = redisReply_to_int(els[1], 0, 0);
2347 d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2348 }
2349 else if(reply->type == REDIS_REPLY_INTEGER) {
2350 ch.last_seen = 0;
2351 ch.subscribers = redisReply_to_int(reply, 0, 0);
2352 d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2353 }
2354 else {
2355 redisEchoCallback(c, r, privdata);
2356 d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2357 }
2358 }
2359 else {
2360 redisEchoCallback(c, r, privdata);
2361 d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2362 }
2363
2364 ngx_free(d);
2365 }
2366
redisPublishNostoreQueuedCheckCallback(redisAsyncContext * c,void * r,void * privdata)2367 static void redisPublishNostoreQueuedCheckCallback(redisAsyncContext *c, void *r, void *privdata) {
2368 redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2369 redisReply *reply=r;
2370
2371 redis_node_t *node = c->data;
2372
2373 if(reply && !CHECK_REPLY_STATUSVAL(reply, "QUEUED")) {
2374 if(!nodeset_node_reply_keyslot_ok(node, reply)) {
2375 d->cluster_move_error = 1;
2376 }
2377 else {
2378 redisEchoCallback(c, r, privdata);
2379 }
2380 }
2381 }
2382
redisPublishCallback(redisAsyncContext * c,void * r,void * privdata)2383 static void redisPublishCallback(redisAsyncContext *c, void *r, void *privdata) {
2384 redis_publish_callback_data_t *d=(redis_publish_callback_data_t *)privdata;
2385 redisReply *reply=r;
2386 redisReply *cur;
2387 nchan_channel_t ch;
2388
2389 redis_node_t *node = c->data;
2390 node->pending_commands--;
2391 nchan_update_stub_status(redis_pending_commands, -1);
2392
2393 if(!nodeset_node_reply_keyslot_ok(node, reply)) {
2394 if(d->shared_msg) {
2395 redis_publish_message_nodeset_maybe_retry(node->nodeset, d);
2396 }
2397 else {
2398 //message probably isn't available anymore...
2399 ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "redis store received cluster MOVE/ASK error while publishing, and can't retry publishing after reconfiguring cluster.");
2400 d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2401 ngx_free(d);
2402 }
2403 return;
2404 }
2405
2406 if(d->shared_msg) {
2407 msg_release(d->msg, "redis publish");
2408 }
2409
2410 ngx_memzero(&ch, sizeof(ch)); //for debugging basically. should be removed in the future and zeroed as-needed
2411
2412 if(reply && CHECK_REPLY_ARRAY_MIN_SIZE(reply, 2)) {
2413 cur=reply->element[0];
2414 switch(redis_array_to_channel(cur, &ch)) {
2415 case NGX_OK:
2416 d->callback(ch.subscribers > 0 ? NCHAN_MESSAGE_RECEIVED : NCHAN_MESSAGE_QUEUED, &ch, d->privdata);
2417 break;
2418 case NGX_DECLINED: //not found
2419 d->callback(NGX_OK, NULL, d->privdata);
2420 break;
2421 case NGX_ERROR:
2422 default:
2423 redisEchoCallback(c, r, privdata);
2424 d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2425 }
2426 }
2427 else {
2428 redisEchoCallback(c, r, privdata);
2429 d->callback(NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, d->privdata);
2430 }
2431 ngx_free(d);
2432 }
2433
2434
2435
2436 typedef struct {
2437 ngx_str_t *channel_id;
2438 ngx_int_t count;
2439 } add_fakesub_data_t;
2440
2441
2442 static void nchan_store_redis_add_fakesub_callback(redisAsyncContext *c, void *r, void *privdata);
nchan_store_redis_add_fakesub_send(redis_nodeset_t * nodeset,void * pd)2443 static ngx_int_t nchan_store_redis_add_fakesub_send(redis_nodeset_t *nodeset, void *pd) {
2444 add_fakesub_data_t *d = pd;
2445 if(nodeset_ready(nodeset)) {
2446 redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, d->channel_id);
2447 nchan_redis_script(add_fakesub, node, &nchan_store_redis_add_fakesub_callback, NULL,
2448 d->channel_id,
2449 "%i %i",
2450 d->count,
2451 ngx_time()
2452 );
2453 return NGX_OK;
2454 }
2455 else {
2456 return NGX_ERROR;
2457 }
2458 }
2459
nchan_store_redis_add_fakesub_send_retry_wrapper(redis_nodeset_t * nodeset,void * pd)2460 static ngx_int_t nchan_store_redis_add_fakesub_send_retry_wrapper(redis_nodeset_t *nodeset, void *pd) {
2461 add_fakesub_data_t *d = pd;
2462 ngx_int_t rc = nchan_store_redis_add_fakesub_send(nodeset, pd);
2463 ngx_free(d);
2464 return rc;
2465 }
2466
nchan_store_redis_add_fakesub_callback(redisAsyncContext * c,void * r,void * privdata)2467 static void nchan_store_redis_add_fakesub_callback(redisAsyncContext *c, void *r, void *privdata) {
2468 redisReply *reply = r;
2469 redis_node_t *node = c->data;
2470
2471 node->pending_commands--;
2472 nchan_update_stub_status(redis_pending_commands, -1);
2473
2474 if(reply && reply->type == REDIS_REPLY_ERROR) {
2475 ngx_str_t errstr;
2476 ngx_str_t countstr;
2477 ngx_str_t channel_id;
2478 intptr_t count;
2479
2480 errstr.data = (u_char *)reply->str;
2481 errstr.len = strlen(reply->str);
2482
2483 if(ngx_str_chop_if_startswith(&errstr, "CLUSTER KEYSLOT ERROR. ")) {
2484 nodeset_node_keyslot_changed(node);
2485 nchan_scan_until_chr_on_line(&errstr, &countstr, ' ');
2486 count = ngx_atoi(countstr.data, countstr.len);
2487 channel_id = errstr;
2488
2489 add_fakesub_data_t *d = ngx_alloc(sizeof(*d) + sizeof(ngx_str_t) + channel_id.len, ngx_cycle->log);
2490 if(!d) {
2491 ERR("can't allocate add_fakesub_data for CLUSTER KEYSLOT ERROR retry");
2492 return;
2493 }
2494 d->count = count;
2495 d->channel_id = (ngx_str_t *)&d[1];
2496 d->channel_id->data = (u_char *)&d->channel_id[1];
2497 nchan_strcpy(d->channel_id, &channel_id, 0);
2498 nodeset_callback_on_ready(node->nodeset, 1000, nchan_store_redis_add_fakesub_send_retry_wrapper, d);
2499
2500 return;
2501 }
2502
2503 }
2504 redisCheckErrorCallback(c, r, privdata);
2505 }
2506
nchan_store_redis_fakesub_add(ngx_str_t * channel_id,nchan_loc_conf_t * cf,ngx_int_t count,uint8_t shutting_down)2507 ngx_int_t nchan_store_redis_fakesub_add(ngx_str_t *channel_id, nchan_loc_conf_t *cf, ngx_int_t count, uint8_t shutting_down) {
2508 redis_nodeset_t *nodeset = nodeset_find(&cf->redis);
2509
2510 if(!shutting_down) {
2511 add_fakesub_data_t data = {channel_id, count};
2512 nchan_store_redis_add_fakesub_send(nodeset, &data);
2513 }
2514 else {
2515 if(nodeset_ready(nodeset)) {
2516 redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, channel_id);
2517 redis_sync_command(node, "EVALSHA %s 0 %b %i", redis_lua_scripts.add_fakesub.hash, STR(channel_id), count);
2518 }
2519 }
2520 return NGX_OK;
2521 }
2522
nchan_store_redis_ready(nchan_loc_conf_t * cf)2523 int nchan_store_redis_ready(nchan_loc_conf_t *cf) {
2524 redis_nodeset_t *nodeset = nodeset_find(&cf->redis);
2525 return nodeset && nodeset_ready(nodeset);
2526 }
2527
2528
2529 typedef struct {
2530 callback_pt cb;
2531 void *pd;
2532 } redis_subscriber_info_id_data_t;
2533
2534 static void get_subscriber_info_id_callback(redisAsyncContext *c, void *r, void *privdata);
2535
nchan_store_get_subscriber_info_id(nchan_loc_conf_t * cf,callback_pt cb,void * pd)2536 static ngx_int_t nchan_store_get_subscriber_info_id(nchan_loc_conf_t *cf, callback_pt cb, void *pd) {
2537 redis_nodeset_t *nodeset = nodeset_find(&cf->redis);
2538
2539 if(!nodeset_ready(nodeset)) {
2540 return NGX_ERROR;
2541 }
2542
2543 ngx_str_t request_id_key = ngx_string(NCHAN_REDIS_UNIQUE_REQUEST_ID_KEY);
2544 redis_node_t *node = nodeset_node_find_by_key(nodeset, &request_id_key);
2545 if(!node) {
2546 return NGX_ERROR;
2547 }
2548
2549 redis_subscriber_info_id_data_t *d = ngx_alloc(sizeof(*d), ngx_cycle->log);
2550 if(d == NULL) {
2551 return NGX_ERROR;
2552 }
2553
2554 d->cb = cb;
2555 d->pd = pd;
2556
2557 redis_script(get_subscriber_info_id, node, &get_subscriber_info_id_callback, d, "1 %b", STR(&request_id_key));
2558
2559 return NGX_DONE;
2560 }
2561
get_subscriber_info_id_callback(redisAsyncContext * c,void * r,void * privdata)2562 static void get_subscriber_info_id_callback(redisAsyncContext *c, void *r, void *privdata) {
2563 redis_subscriber_info_id_data_t *d = privdata;
2564 redisReply *reply = r;
2565
2566 redis_node_t *node = c->data;
2567 node->pending_commands--;
2568
2569 callback_pt cb = d->cb;
2570 void *cb_pd = d->pd;
2571
2572 ngx_free(d);
2573
2574 if (!redisReplyOk(c, reply)) {
2575 cb(NGX_ERROR, NULL, cb_pd);
2576 return;
2577 }
2578
2579 int64_t new_id = redisReply_to_int(reply, 0, 0);
2580 cb(NGX_OK, (void *)(uintptr_t )new_id, cb_pd);
2581 }
2582
nchan_store_request_subscriber_info(ngx_str_t * channel_id,ngx_int_t request_id,nchan_loc_conf_t * cf,callback_pt cb,void * pd)2583 static ngx_int_t nchan_store_request_subscriber_info(ngx_str_t *channel_id, ngx_int_t request_id, nchan_loc_conf_t *cf, callback_pt cb, void *pd) {
2584 if(nchan_channel_id_is_multi(channel_id)) {
2585 ERR("redis nchan_store_request_subscriber_info can't handle multi-channel ids");
2586 return NGX_ERROR;
2587 }
2588
2589 redis_nodeset_t *nodeset = nodeset_find(&cf->redis);
2590 if(!nodeset) {
2591 ERR("redis nodeset not found for nchan_store_request_subscriber_info");
2592 return NGX_ERROR;
2593 }
2594
2595 if(!nodeset_ready(nodeset)) {
2596 ERR("redis nodeset not ready for nchan_store_request_subscriber_info");
2597 return NGX_ERROR;
2598 }
2599
2600 redis_node_t *node = nodeset_node_find_by_channel_id(nodeset, channel_id);
2601 if(!node) {
2602 ERR("couldn't find Redis node for nchan_store_request_subscriber_info");
2603 return NGX_ERROR;
2604 }
2605
2606 nchan_redis_script( request_subscriber_info, node, &redisCheckErrorCallback, NULL, channel_id, "%i", (int )request_id);
2607
2608 return NGX_DONE;
2609 }
2610
2611 nchan_store_t nchan_store_redis = {
2612 //init
2613 &nchan_store_init_module,
2614 &nchan_store_init_worker,
2615 &nchan_store_init_postconfig,
2616 &nchan_store_create_main_conf,
2617
2618 //shutdown
2619 &nchan_store_exit_worker,
2620 &nchan_store_exit_master,
2621
2622 //async-friendly functions with callbacks
2623 &nchan_store_async_get_message, //+callback
2624 &nchan_store_subscribe, //+callback
2625 &nchan_store_publish_message, //+callback
2626
2627 &nchan_store_delete_channel, //+callback
2628 &nchan_store_find_channel, //+callback
2629
2630 NULL, //get_group
2631 NULL, //set group
2632 NULL, //delete group
2633
2634 &nchan_store_get_subscriber_info_id, //+callback
2635 &nchan_store_request_subscriber_info //+callback
2636
2637 };
2638
2639