1 #include <nchan_module.h>
2 #include "spool.h"
3 #include <assert.h>
4
5 #define DEBUG_LEVEL NGX_LOG_DEBUG
6 //#define DEBUG_LEVEL NGX_LOG_WARN
7
8 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SPOOL:" fmt, ##arg)
9 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SPOOL:" fmt, ##arg)
10
11 #define NCHAN_SPOOL_FETCHMSG_MAX_TIMES 20
12
13 //////// SPOOLs -- Subscriber Pools /////////
14
15 static ngx_int_t spool_remove_subscriber(subscriber_pool_t *, spooled_subscriber_t *);
16 static void spool_bubbleup_dequeue_handler(subscriber_pool_t *spool, subscriber_t *sub, channel_spooler_t *spl);
17 //static void spool_bubbleup_bulk_dequeue_handler(subscriber_pool_t *spool, subscriber_type_t type, ngx_int_t count, channel_spooler_t *spl);
18 static ngx_int_t spool_respond_general(subscriber_pool_t *self, nchan_msg_t *msg, ngx_int_t status_code, void *code_data, unsigned notice);
19 static ngx_int_t spool_transfer_subscribers(subscriber_pool_t *spool, subscriber_pool_t *newspool, uint8_t update_subscriber_last_msgid);
20 static ngx_int_t destroy_spool(subscriber_pool_t *spool);
21 static ngx_int_t remove_spool(subscriber_pool_t *spool);
22 static ngx_int_t spool_fetch_msg(subscriber_pool_t *spool);
23
24 static nchan_msg_id_t latest_msg_id = NCHAN_NEWEST_MSGID;
25 static nchan_msg_id_t oldest_msg_id = NCHAN_OLDEST_MSGID;
26
find_spool(channel_spooler_t * spl,nchan_msg_id_t * id)27 static subscriber_pool_t *find_spool(channel_spooler_t *spl, nchan_msg_id_t *id) {
28 rbtree_seed_t *seed = &spl->spoolseed;
29 ngx_rbtree_node_t *node;
30 subscriber_pool_t *spool = NULL;
31
32 if(id->time == NCHAN_NEWEST_MSGID_TIME || spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
33 spool = &spl->current_msg_spool;
34 spool->msg_status = MSG_EXPECTED;
35 }
36 else if((node = rbtree_find_node(seed, id)) != NULL) {
37 spool = rbtree_data_from_node(node);
38 }
39
40 return spool;
41 }
42 /*
43 typedef struct {
44 int n;
45 subscriber_pool_t *msg_expected_spool;
46
47 int err;
48 } spool_verify_data_t;
49
50
51 static void log_spool_err(spool_verify_data_t *d, subscriber_pool_t *spool, char *str) {
52 ERR("%p %s [%s]", spool, msg_status_to_chr(spool->msg_status), str);
53 d->err = 1;
54 }
55
56 static int validate_spooler_walker(rbtree_seed_t *seed, subscriber_pool_t *spool, spool_verify_data_t *d) {
57 if(spool->msg_status == MSG_EXPECTED) {
58 if(d->msg_expected_spool) {
59 log_spool_err(d, spool, "Found more than 1 spool with MSG_EXPECTED status");
60 }
61 else {
62 d->msg_expected_spool = spool;
63 }
64 }
65 else {
66 if(spool->sub_count == 0) {
67 log_spool_err(d, spool, "empty spool (not MSG_EXPECTED)");
68 }
69 }
70 d->n++;
71
72 return NGX_OK;
73 }
74
75 static int validate_spooler(channel_spooler_t *spl, char *str) {
76 spool_verify_data_t d;
77 ngx_memzero(&d, sizeof(d));
78 rbtree_walk(&spl->spoolseed, (rbtree_walk_callback_pt )validate_spooler_walker, &d);
79 if(d.err == 0) {
80 ERR("%s: validated %i spools in channel spooler %V", str, d.n, spl->chid);
81 }
82 else {
83 ERR("%s: validating %i spools FAILED in channel spooler %V; %i error(s).", str, d.n, spl->chid, d.err);
84 }
85 return d.err == 0;
86 }
87 */
88
spool_reserve(subscriber_pool_t * spool)89 static void spool_reserve(subscriber_pool_t *spool) {
90 spool->reserved++;
91 }
spool_release(subscriber_pool_t * spool)92 static void spool_release(subscriber_pool_t *spool) {
93 spool->reserved--;
94 ngx_int_t immortal_spool = spool->id.time == NCHAN_NEWEST_MSGID_TIME;
95 if(!immortal_spool && spool->reserved == 0 && spool->sub_count == 0) {
96 destroy_spool(spool);
97 }
98 }
99
msg_ids_equal(nchan_msg_id_t * id1,nchan_msg_id_t * id2)100 static int msg_ids_equal(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
101 int i, max;
102 int16_t *tags1, *tags2;
103
104 if(id1->time != id2->time || id1->tagcount != id2->tagcount) return 0;
105 max = id1->tagcount;
106 if(max <= NCHAN_FIXED_MULTITAG_MAX) {
107 tags1 = id1->tag.fixed;
108 tags2 = id2->tag.fixed;
109 }
110 else {
111 tags1 = id1->tag.allocd;
112 tags2 = id2->tag.allocd;
113 }
114
115
116 for(i=0; i < max; i++) {
117 if(tags1[i] != tags2[i]) return 0;
118 }
119 return 1;
120 }
121
spooler_timer_handler(ngx_event_t * ev)122 static void spooler_timer_handler(ngx_event_t *ev) {
123 spooler_event_ll_t *spl_ev = container_of(ev, spooler_event_ll_t, ev);
124 spl_ev->callback(ev->data);
125 if(spl_ev->prev) {
126 spl_ev->prev->next = spl_ev->next;
127 }
128 if(spl_ev->next) {
129 spl_ev->next->prev = spl_ev->prev;
130 }
131 if(spl_ev->spooler->spooler_dependent_events == spl_ev) {
132 spl_ev->spooler->spooler_dependent_events = spl_ev->next;
133 }
134 ngx_free(spl_ev);
135 }
136
spooler_add_timer(channel_spooler_t * spl,ngx_msec_t timeout,void (* cb)(void *),void (* cancel)(void *),void * pd)137 ngx_event_t *spooler_add_timer(channel_spooler_t *spl, ngx_msec_t timeout, void (*cb)(void *), void (*cancel)(void *), void *pd) {
138 spooler_event_ll_t *spl_ev = ngx_alloc(sizeof(*spl_ev), ngx_cycle->log);
139 ngx_memzero(&spl_ev->ev, sizeof(spl_ev->ev));
140 nchan_init_timer(&spl_ev->ev, spooler_timer_handler, pd);
141
142 spl_ev->callback = cb;
143 spl_ev->cancel = cancel;
144
145 spl_ev->spooler = spl;
146 spl_ev->next = spl->spooler_dependent_events;
147 spl_ev->prev = NULL;
148 if(spl->spooler_dependent_events) {
149 spl->spooler_dependent_events->prev = spl_ev;
150 }
151 spl->spooler_dependent_events = spl_ev;
152
153 ngx_add_timer(&spl_ev->ev, timeout);
154 return &spl_ev->ev;
155 }
156
fetchmsg_ev_handler(ngx_event_t * ev)157 static void fetchmsg_ev_handler(ngx_event_t *ev) {
158 subscriber_pool_t *spool = (subscriber_pool_t *)ev->data;
159 DBG("stack-overflow-buster fetchmsg event for spool %p", spool);
160 if(spool->msg_status == MSG_INVALID) {
161 spool_fetch_msg(spool);
162 }
163 }
164
init_spool(channel_spooler_t * spl,subscriber_pool_t * spool,nchan_msg_id_t * id)165 static ngx_inline void init_spool(channel_spooler_t *spl, subscriber_pool_t *spool, nchan_msg_id_t *id) {
166 nchan_copy_new_msg_id(&spool->id, id);
167 spool->msg = NULL;
168 spool->msg_status = MSG_INVALID;
169
170 spool->first = NULL;
171 spool->sub_count = 0;
172 spool->non_internal_sub_count = 0;
173 //spool->generation = 0;
174 //spool->responded_count = 0;
175 spool->reserved = 0;
176 ngx_memzero(&spool->fetchmsg_ev, sizeof(spool->fetchmsg_ev));
177 nchan_init_timer(&spool->fetchmsg_ev, fetchmsg_ev_handler, spool);
178 spool->fetchmsg_current_count=0;
179 spool->fetchmsg_prev_msec=0;
180
181 spool->spooler = spl;
182 }
183
get_spool(channel_spooler_t * spl,nchan_msg_id_t * id)184 static subscriber_pool_t *get_spool(channel_spooler_t *spl, nchan_msg_id_t *id) {
185 rbtree_seed_t *seed = &spl->spoolseed;
186 ngx_rbtree_node_t *node;
187 subscriber_pool_t *spool;
188
189
190 if(id->time == NCHAN_NEWEST_MSGID_TIME || spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
191 spool = &spl->current_msg_spool;
192 spool->msg_status = MSG_EXPECTED;
193 return &spl->current_msg_spool;
194 }
195
196 if((node = rbtree_find_node(seed, id)) == NULL) {
197
198 if((node = rbtree_create_node(seed, sizeof(*spool))) == NULL) {
199 ERR("can't create rbtree node for spool");
200 return NULL;
201 }
202
203 // DBG("CREATED spool node %p for msgid %V", node, msgid_to_str(id));
204 spool = (subscriber_pool_t *)rbtree_data_from_node(node);
205
206 init_spool(spl, spool, id);
207
208 if(rbtree_insert_node(seed, node) != NGX_OK) {
209 ERR("couldn't insert spool node");
210 rbtree_destroy_node(seed, node);
211 return NULL;
212 }
213 }
214 else {
215 spool = (subscriber_pool_t *)rbtree_data_from_node(node);
216 DBG("found spool node %p with msgid %V", node, msgid_to_str(id));
217 assert(spool->id.time == id->time);
218 }
219 return spool;
220 }
221
spool_nextmsg(subscriber_pool_t * spool,nchan_msg_id_t * new_last_id)222 static ngx_int_t spool_nextmsg(subscriber_pool_t *spool, nchan_msg_id_t *new_last_id) {
223 subscriber_pool_t *newspool;
224 channel_spooler_t *spl = spool->spooler;
225
226 ngx_int_t immortal_spool = spool->id.time == NCHAN_NEWEST_MSGID_TIME;
227 int16_t largetags[NCHAN_MULTITAG_MAX];
228 nchan_msg_id_t new_id = NCHAN_ZERO_MSGID;
229
230 if(spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
231 if(immortal_spool) {
232 return NGX_OK; //nothing to do, already on the newest spool
233 }
234 else {
235 new_last_id = &latest_msg_id;
236 }
237 }
238
239 nchan_copy_msg_id(&new_id, &spool->id, largetags);
240 nchan_update_multi_msgid(&new_id, new_last_id, largetags);
241
242 //ERR("spool %p nextmsg (%V) --", spool, msgid_to_str(&spool->id));
243 //ERR(" -- update with (%V) --", msgid_to_str(new_last_id));
244 //ERR(" -- newid %V", msgid_to_str(&new_id));
245
246 if(msg_ids_equal(&spool->id, &new_id)) {
247 ERR("nextmsg id same as curmsg (%V)", msgid_to_str(&spool->id));
248 assert(0);
249 }
250 else {
251 newspool = !immortal_spool ? find_spool(spl, &new_id) : get_spool(spl, &new_id);
252
253 if(newspool != NULL) {
254 //move subs to next, already existing spool
255 assert(spool != newspool);
256 spool_transfer_subscribers(spool, newspool, 0);
257 if(!immortal_spool && spool->reserved == 0) destroy_spool(spool);
258 }
259 else {
260 //next spool doesn't exist. reuse this one as the next
261 ngx_rbtree_node_t *node;
262 assert(!immortal_spool);
263 node = rbtree_node_from_data(spool);
264 rbtree_remove_node(&spl->spoolseed, node);
265 nchan_copy_msg_id(&spool->id, &new_id, NULL);
266 rbtree_insert_node(&spl->spoolseed, node);
267 spool->msg_status = MSG_INVALID;
268 spool->msg = NULL;
269 newspool = spool;
270
271 /*
272 newspool = get_spool(spl, &new_id);
273 assert(spool != newspool);
274 spool_transfer_subscribers(spool, newspool, 0);
275 destroy_spool(spool);
276 */
277 }
278
279
280 if(newspool->non_internal_sub_count > 0 && spl->handlers->use != NULL) {
281 spl->handlers->use(spl, spl->handlers_privdata);
282 }
283
284 if(newspool->sub_count > 0) {
285 switch(newspool->msg_status) {
286 case MSG_CHANNEL_NOTREADY:
287 newspool->msg_status = MSG_INVALID;
288 /*fallthrough*/
289 case MSG_INVALID:
290 spool_fetch_msg(newspool);
291 break;
292 case MSG_EXPECTED:
293 spool_respond_general(newspool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
294 break;
295 default:
296 break;
297 }
298 }
299 }
300
301 return NGX_OK;
302 }
303
spool_fetch_msg_callback(ngx_int_t code,nchan_msg_t * msg,fetchmsg_data_t * data)304 static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetchmsg_data_t *data) {
305 nchan_msg_status_t findmsg_status = code;
306 nchan_msg_status_t prev_status;
307 subscriber_pool_t *spool, *nuspool;
308 channel_spooler_t *spl = data->spooler;
309 int free_msg_id = 1;
310
311 assert(spl->fetching_strategy != NCHAN_SPOOL_PASSTHROUGH);
312
313 if(spl && data == spl->fetchmsg_cb_data_list) {
314 spl->fetchmsg_cb_data_list = data->next;
315 }
316 if(data->next) {
317 data->next->prev = data->prev;
318 }
319 if(data->prev) {
320 data->prev->next = data->next;
321 }
322
323 if(spl == NULL) { //channel already deleted
324 nchan_free_msg_id(&data->msgid);
325 ngx_free(data);
326 return NGX_OK;
327 }
328
329 if(spl->handlers->get_message_finish) {
330 spl->handlers->get_message_finish(spl, spl->handlers_privdata);
331 }
332
333 if((spool = find_spool(spl, &data->msgid)) == NULL) {
334 DBG("spool for msgid %V not found. discarding getmsg callback response.", msgid_to_str(&data->msgid));
335 nchan_free_msg_id(&data->msgid);
336 ngx_free(data);
337 return NGX_ERROR;
338 }
339
340 prev_status = spool->msg_status;
341
342 switch(findmsg_status) {
343 case MSG_FOUND:
344 spool->msg_status = findmsg_status;
345 DBG("fetchmsg callback for spool %p msg FOUND %p %V", spool, msg, msgid_to_str(&msg->id));
346 assert(msg != NULL);
347 spool->msg = msg;
348 spool_respond_general(spool, spool->msg, 0, NULL, 0);
349
350 spool_nextmsg(spool, &msg->id);
351 break;
352
353 case MSG_EXPECTED:
354 // ♫ It's gonna be the future soon ♫
355 if(spool->id.time == NCHAN_NTH_MSGID_TIME) {
356 //wait for message in the NEWEST_ID spool
357 spool_nextmsg(spool, &latest_msg_id);
358 }
359 else {
360 spool->msg_status = findmsg_status;
361 DBG("fetchmsg callback for spool %p msg EXPECTED", spool);
362 spool_respond_general(spool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
363 assert(msg == NULL);
364 spool->msg = NULL;
365 }
366 break;
367
368 case MSG_CHANNEL_NOTREADY:
369 //just wait it out
370 spool->msg = NULL;
371 spool->msg_status = findmsg_status;
372 break;
373
374 case MSG_NOTFOUND:
375 if(spl->fetching_strategy == NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND) {
376 spool->msg_status = prev_status;
377 break;
378 }
379 /*fallthrough*/
380 case MSG_EXPIRED:
381 //is this right?
382 //TODO: maybe message-expired notification
383 spool->msg_status = findmsg_status;
384 spool_respond_general(spool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
385 nuspool = get_spool(spool->spooler, &oldest_msg_id);
386 if(spool != nuspool) {
387 spool_transfer_subscribers(spool, nuspool, 1);
388 if(spool->reserved == 0) {
389 destroy_spool(spool);
390 }
391 spool_fetch_msg(nuspool);
392 }
393 else if(spool->id.tagcount == 1 && nchan_compare_msgids(&spool->id, &oldest_msg_id) == 0) {
394 // oldest msgid not found or expired. that means there are no messages in this channel,
395 // so move these subscribers over to the current_msg_spool
396 nuspool = get_spool(spool->spooler, &latest_msg_id);
397 assert(spool != nuspool);
398 spool_transfer_subscribers(spool, nuspool, 1);
399 if(spool->reserved == 0) {
400 destroy_spool(spool);
401 }
402 }
403 else if(spool == &spool->spooler->current_msg_spool) {
404 //sit there and wait, i guess
405 spool->msg_status = MSG_EXPECTED;
406 }
407 else {
408 ERR("Unexpected spool == nuspool during spool fetch_msg_callback. This is weird, please report this to the developers. findmsg_status: %i", findmsg_status);
409 assert(0);
410 }
411 break;
412
413 case MSG_PENDING:
414 ERR("spool %p set status to MSG_PENDING", spool);
415 break;
416
417 case MSG_ERROR:
418 spool_respond_general(spool, NULL, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, 0);
419 break;
420
421 case MSG_INVALID:
422 assert(0); //should never happen
423 break;
424 }
425
426 if(free_msg_id) {
427 nchan_free_msg_id(&data->msgid);
428 }
429 ngx_free(data);
430 return NGX_OK;
431 }
432
spool_fetch_msg(subscriber_pool_t * spool)433 static ngx_int_t spool_fetch_msg(subscriber_pool_t *spool) {
434 fetchmsg_data_t *data;
435 channel_spooler_t *spl = spool->spooler;
436
437
438 //stack overflow protector
439 //ERR("spool->fetchmsg_prev_msec %i (ngx_timeofday())->msec %i spool->fetchmsg_current_count %i", spool->fetchmsg_prev_msec, (ngx_timeofday())->msec, spool->fetchmsg_current_count);
440 if(spool->fetchmsg_prev_msec == (ngx_timeofday())->msec) {
441 if(spool->fetchmsg_current_count > NCHAN_SPOOL_FETCHMSG_MAX_TIMES) {
442 ngx_add_timer(&spool->fetchmsg_ev, 0);
443 spool->fetchmsg_current_count = 0;
444 return NGX_DONE;
445 }
446 else {
447 spool->fetchmsg_current_count++;
448 }
449 }
450 else {
451 spool->fetchmsg_current_count = 0;
452 spool->fetchmsg_prev_msec = (ngx_timeofday())->msec;
453 }
454
455 if(*spl->channel_status != READY || !*spl->channel_buffer_complete) {
456 //DBG("%p wanted to fetch msg %V, but channel %V not ready or buffer not complete", spool, msgid_to_str(&spool->id), spl->chid);
457 spool->msg_status = MSG_CHANNEL_NOTREADY;
458 //these will be fetch when channel is ready or when buffer is complete
459 return NGX_DECLINED;
460 }
461
462 DBG("%p fetch msg %V for channel %V", spool, msgid_to_str(&spool->id), spl->chid);
463 data = ngx_alloc(sizeof(*data), ngx_cycle->log); //correctness over efficiency (at first).
464 //TODO: optimize this alloc away
465
466 assert(data);
467
468 data->next = spl->fetchmsg_cb_data_list;
469 if(data->next) {
470 data->next->prev = data;
471 }
472 spl->fetchmsg_cb_data_list = data;
473 data->prev = NULL;
474
475 nchan_copy_new_msg_id(&data->msgid, &spool->id);
476 data->spooler = spool->spooler;
477
478 assert(spool->msg == NULL);
479 assert(spool->msg_status == MSG_INVALID);
480 spool->msg_status = MSG_PENDING;
481 if(spl->handlers->get_message_start) {
482 spl->handlers->get_message_start(spl, spl->handlers_privdata);
483 }
484 switch(spl->fetching_strategy) {
485 case NCHAN_SPOOL_FETCH:
486 case NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND:
487 spool->spooler->store->get_message(spool->spooler->chid, &spool->id, spool->spooler->cf, (callback_pt )spool_fetch_msg_callback, data);
488 break;
489 case NCHAN_SPOOL_PASSTHROUGH:
490 spool_fetch_msg_callback(MSG_EXPECTED, NULL, data);
491 break;
492 }
493 return NGX_OK;
494 }
495
spool_sub_dequeue_callback(subscriber_t * sub,void * data)496 static void spool_sub_dequeue_callback(subscriber_t *sub, void *data) {
497 spooled_subscriber_cleanup_t *d = (spooled_subscriber_cleanup_t *)data;
498 subscriber_pool_t *spool = d->spool;
499
500 DBG("sub %p dequeue callback", sub);
501
502 assert(sub == d->ssub->sub);
503 spool_remove_subscriber(spool, d->ssub);
504 spool_bubbleup_dequeue_handler(spool, sub, spool->spooler);
505
506 if(sub->type != INTERNAL && spool->spooler->publish_events) {
507 nchan_maybe_send_channel_event_message(sub->request, SUB_DEQUEUE);
508 }
509 }
510
spool_add_subscriber(subscriber_pool_t * self,subscriber_t * sub,uint8_t enqueue)511 static ngx_int_t spool_add_subscriber(subscriber_pool_t *self, subscriber_t *sub, uint8_t enqueue) {
512 spooled_subscriber_t *ssub;
513 ngx_int_t rc;
514 ngx_int_t internal_sub = sub->type == INTERNAL;
515
516 ssub = ngx_calloc(sizeof(*ssub), ngx_cycle->log);
517 //DBG("add sub %p to spool %p", sub, self);
518
519 if(ssub == NULL) {
520 ERR("failed to allocate new sub for spool");
521 return NGX_ERROR;
522 }
523
524 ssub->next = self->first;
525 ssub->prev = NULL;
526 if(self->first != NULL) {
527 self->first->prev = ssub;
528 }
529 self->first = ssub;
530 self->sub_count++;
531 if(!internal_sub) {
532 self->non_internal_sub_count++;
533 }
534 ssub->dequeue_callback_data.ssub = ssub;
535 ssub->dequeue_callback_data.spool = self;
536
537 if(enqueue) {
538 if((rc = sub->fn->enqueue(sub)) != NGX_OK) {
539 //enqueue failed. undo everything and get out!
540 self->sub_count --;
541 self->first = ssub->next;
542 if(self->first) {
543 assert(self->first->prev == ssub);
544 self->first->prev = NULL;
545 }
546 if(!internal_sub) {
547 self->non_internal_sub_count--;
548 }
549 ngx_free(ssub);
550 return rc;
551 }
552 else if(sub->type != INTERNAL && self->spooler->publish_events) {
553 nchan_maybe_send_channel_event_message(sub->request, SUB_ENQUEUE);
554 }
555 }
556
557 sub->fn->set_dequeue_callback(sub, spool_sub_dequeue_callback, &ssub->dequeue_callback_data);
558 ssub->sub = sub;
559
560 return NGX_OK;
561 }
562
spool_remove_subscriber(subscriber_pool_t * self,spooled_subscriber_t * ssub)563 static ngx_int_t spool_remove_subscriber(subscriber_pool_t *self, spooled_subscriber_t *ssub) {
564 assert(ssub->next != ssub);
565 assert(ssub->prev != ssub);
566 spooled_subscriber_t *prev, *next;
567 prev = ssub->prev;
568 next = ssub->next;
569 if(next) {
570 next->prev = prev;
571 }
572 if(prev) {
573 prev->next = next;
574 }
575 if(self->first == ssub) {
576 self->first = next;
577 }
578
579 if(ssub->sub->type != INTERNAL) {
580 self->non_internal_sub_count--;
581 }
582
583 ngx_free(ssub);
584
585 assert(self->sub_count > 0);
586 self->sub_count--;
587 return NGX_OK;
588 }
589
spool_respond_general(subscriber_pool_t * self,nchan_msg_t * msg,ngx_int_t code,void * code_data,unsigned notice)590 static ngx_int_t spool_respond_general(subscriber_pool_t *self, nchan_msg_t *msg, ngx_int_t code, void *code_data, unsigned notice) {
591 ngx_uint_t numsubs[SUBSCRIBER_TYPES];
592 spooled_subscriber_t *nsub, *nnext;
593 subscriber_t *sub;
594
595 //channel_spooler_t *spl = self->spooler;
596 //validate_spooler(spl, "before respond_general");
597 //nchan_msg_id_t unid;
598 //nchan_msg_id_t unprevid;
599 //int8_t i, max;
600
601 ngx_memzero(numsubs, sizeof(numsubs));
602 //self->generation++;
603
604 DBG("spool %p (%V) (subs: %i) respond with msg %p or code %i", self, msgid_to_str(&self->id), self->sub_count, msg, code);
605 if(msg) {
606 DBG("msgid: %V", msgid_to_str(&msg->id));
607 DBG("prev: %V", msgid_to_str(&msg->prev_id));
608 }
609
610 /*
611 if(msg && msg->prev_id.time > 0 && msg->id.tagcount > 1) {
612 assert(msg->shared == 0);
613 max = msg->id.tagcount;
614 for(i=0; i< max; i++) {
615 unid.tag[i] = msg->id.tag[i];
616 unprevid.tag[i] = msg->prev_id.tag[i];
617 if(unid.tag[i] == -1) msg->id.tag[i] = self->id.tag[i];
618 if(unprevid.tag[i] == -1) msg->prev_id.tag[i] = self->id.tag[i];
619 }
620 }
621 */
622
623 //uint8_t publish_events = self->spooler->publish_events;
624
625 for(nsub = self->first; nsub != NULL; nsub = nnext) {
626 sub = nsub->sub;
627 nnext = nsub->next;
628
629 if(msg) {
630 //self->responded_count++;
631 sub->fn->respond_message(sub, msg);
632 }
633 else if(!notice) {
634 //self->responded_count++;
635 sub->fn->respond_status(sub, code, code_data, NULL);
636 }
637 else {
638 sub->fn->notify(sub, code, code_data);
639 }
640 }
641
642 //if(!notice && code != NGX_HTTP_NO_CONTENT) self->responded_count++;
643 //assert(validate_spooler(spl, "after respond_general"));
644 return NGX_OK;
645 }
646
647 /////////// SPOOLER - container of several spools //////////
648
create_spooler()649 channel_spooler_t *create_spooler() {
650 channel_spooler_t *spooler;
651 if((spooler = ngx_alloc(sizeof(*spooler), ngx_cycle->log))==NULL) {
652 ERR("Can't allocate spooler");
653 return NULL;
654 }
655 return spooler;
656 }
657
spool_bubbleup_dequeue_handler(subscriber_pool_t * spool,subscriber_t * sub,channel_spooler_t * spl)658 static void spool_bubbleup_dequeue_handler(subscriber_pool_t *spool, subscriber_t *sub, channel_spooler_t *spl) {
659 //bubble on up, yeah
660 channel_spooler_handlers_t *h = spl->handlers;
661 if(h->dequeue) {
662 h->dequeue(spl, sub, spl->handlers_privdata);
663 }
664 else if (h->bulk_dequeue){
665 h->bulk_dequeue(spl, sub->type, 1, spl->handlers_privdata);
666 }
667 else {
668 ERR("Neither dequeue_handler not bulk_dequeue_handler present in spooler for spool sub dequeue");
669 }
670 }
671
672 /*
673 static void spool_bubbleup_bulk_dequeue_handler(subscriber_pool_t *spool, subscriber_type_t type, ngx_int_t count, channel_spooler_t *spl) {
674 //bubble on up, yeah
675 if(spl->handlers->bulk_dequeue) {
676 spl->handlers->bulk_dequeue(spl, type, count, spl->handlers_privdata);
677 }
678 }
679 */
680
spooler_add_subscriber(channel_spooler_t * self,subscriber_t * sub)681 static ngx_int_t spooler_add_subscriber(channel_spooler_t *self, subscriber_t *sub) {
682 nchan_msg_id_t *msgid = &sub->last_msgid;
683 subscriber_pool_t *spool;
684 subscriber_type_t subtype;
685 ngx_int_t rc;
686
687 if(self->want_to_stop) {
688 ERR("Not accepting new subscribers right now. want to stop.");
689 return NGX_ERROR;
690 }
691
692 //validate_spooler(self, "before add_subscriber");
693
694 spool = get_spool(self, msgid);
695
696 if(self->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
697 assert(spool->id.time == NCHAN_NEWEST_MSGID_TIME);
698 }
699
700 if(spool == NULL) {
701 return NGX_ERROR;
702 }
703
704 subtype = sub->type;
705
706 if((rc = spool_add_subscriber(spool, sub, 1)) != NGX_OK) {
707 DBG("couldn't add subscriber to spool %p", spool);
708 return rc;
709 }
710 self->handlers->add(self, sub, self->handlers_privdata);
711
712 switch(spool->msg_status) {
713 case MSG_FOUND:
714 assert(spool->msg);
715 spool_respond_general(spool, spool->msg, 0, NULL, 0);
716 break;
717
718 case MSG_INVALID:
719 assert(spool->msg == NULL);
720 spool_fetch_msg(spool);
721 break;
722
723 case MSG_CHANNEL_NOTREADY:
724 case MSG_PENDING:
725 //nothing to do
726 break;
727
728 case MSG_EXPECTED:
729 //notify subscriber
730 sub->fn->respond_status(sub, NGX_HTTP_NO_CONTENT, NULL, NULL);
731 break;
732
733 case MSG_EXPIRED:
734 case MSG_NOTFOUND:
735 case MSG_ERROR:
736 //shouldn't happen
737
738 assert(0);
739 }
740
741 if(self->handlers->use != NULL && subtype != INTERNAL) {
742 self->handlers->use(self, self->handlers_privdata);
743 }
744
745 //validate_spooler(self, "after add_subscriber");
746
747 return NGX_OK;
748 }
749
750
spool_transfer_subscribers(subscriber_pool_t * spool,subscriber_pool_t * newspool,uint8_t update_subscriber_last_msgid)751 static ngx_int_t spool_transfer_subscribers(subscriber_pool_t *spool, subscriber_pool_t *newspool, uint8_t update_subscriber_last_msgid) {
752 ngx_int_t count = 0;
753 subscriber_t *sub;
754 spooled_subscriber_t *cur;
755 channel_spooler_t *spl = spool->spooler;
756
757 assert(spl == newspool->spooler);
758
759 if(spool == NULL || newspool == NULL) {
760 ERR("failed to transfer spool subscribers");
761 return 0;
762 }
763 for(cur = spool->first; cur != NULL; cur = spool->first) {
764 sub = cur->sub;
765 spool_remove_subscriber(spool, cur);
766 if(update_subscriber_last_msgid) {
767 sub->last_msgid=newspool->id;
768 }
769 if(spool_add_subscriber(newspool, sub, 0) == NGX_OK) {
770 count++;
771 }
772 }
773
774 return count;
775 }
776
777 typedef struct spool_collect_overflow_s spool_collect_overflow_t;
778 struct spool_collect_overflow_s {
779 subscriber_pool_t *spool;
780 struct spool_collect_overflow_s *next;
781 };// spool_collect_overflow_t;
782
783 #define SPOOLER_RESPOND_SPOOLARRAY_SIZE 32
784
785 typedef struct {
786 nchan_msg_id_t min;
787 nchan_msg_id_t max;
788 uint8_t multi;
789 ngx_int_t n;
790 nchan_msg_t *msg;
791 subscriber_pool_t *spools[SPOOLER_RESPOND_SPOOLARRAY_SIZE];
792 spool_collect_overflow_t *overflow;
793 } spooler_respond_data_t;
794
795
compare_msgid_onetag_range(nchan_msg_id_t * min,nchan_msg_id_t * max,nchan_msg_id_t * id)796 static rbtree_walk_direction_t compare_msgid_onetag_range(nchan_msg_id_t *min, nchan_msg_id_t *max, nchan_msg_id_t *id) {
797
798 assert(min->tagcount == max->tagcount);
799 assert(max->tagcount == id->tagcount);
800 assert(id->tagcount == 1);
801
802 if(min->time < id->time || (min->time == id->time && min->tag.fixed[0] <= id->tag.fixed[0])) {
803 if(max->time > id->time || (max->time == id->time && max->tag.fixed[0] > id->tag.fixed[0])) {
804 //inrange
805 return RBTREE_WALK_LEFT_RIGHT;
806 }
807 else {
808 //too large
809 return RBTREE_WALK_LEFT;
810 }
811 }
812 else {
813 //too small
814 return RBTREE_WALK_RIGHT;
815 }
816 }
817
compare_msgid_time(nchan_msg_id_t * min,nchan_msg_id_t * max,nchan_msg_id_t * cur)818 static int8_t compare_msgid_time(nchan_msg_id_t *min, nchan_msg_id_t *max, nchan_msg_id_t *cur) {
819 if(min->time <= cur->time) {
820 if(max->time >= cur->time) {
821 return 0;
822 }
823 else {
824 return 1;
825 }
826 }
827 else {
828 return -1;
829 }
830 }
831
832
spoolcollector_addspool(spooler_respond_data_t * data,subscriber_pool_t * spool)833 static void spoolcollector_addspool(spooler_respond_data_t *data, subscriber_pool_t *spool) {
834 spool_collect_overflow_t *overflow;
835 if(data->n < SPOOLER_RESPOND_SPOOLARRAY_SIZE) {
836 data->spools[data->n] = spool;
837 }
838 else {
839 if((overflow = ngx_alloc(sizeof(*overflow), ngx_cycle->log)) == NULL) {
840 ERR("can't allocate spoolcollector overflow");
841 return;
842 }
843 overflow->next = data->overflow;
844 overflow->spool = spool;
845 data->overflow = overflow;
846 }
847 data->n++;
848 }
849
spoolcollector_unwind_nextspool(spooler_respond_data_t * data)850 static subscriber_pool_t *spoolcollector_unwind_nextspool(spooler_respond_data_t *data) {
851 spool_collect_overflow_t *overflow;
852 subscriber_pool_t *spool;
853 if(data->n > SPOOLER_RESPOND_SPOOLARRAY_SIZE) {
854 overflow = data->overflow;
855 spool = overflow->spool;
856 data->overflow = overflow->next;
857 ngx_free(overflow);
858 data->n--;
859 return spool;
860 }
861 else if(data->n > 0) {
862 return data->spools[--data->n];
863 }
864 else {
865 return NULL;
866 }
867 }
868
869
collect_spool_range_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,spooler_respond_data_t * data)870 static rbtree_walk_direction_t collect_spool_range_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, spooler_respond_data_t *data) {
871 rbtree_walk_direction_t dir;
872 uint8_t multi_count = data->multi;
873
874 if(multi_count <= 1) {
875 dir = compare_msgid_onetag_range(&data->min, &data->max, &spool->id);
876 if(dir == RBTREE_WALK_LEFT_RIGHT) {
877 spoolcollector_addspool(data, spool);
878 }
879 return dir;
880 }
881 else {
882 int tc = compare_msgid_time(&data->min, &data->max, &spool->id);
883 if(tc < 0) {
884 return RBTREE_WALK_RIGHT;
885 }
886 else if(tc > 0) {
887 return RBTREE_WALK_LEFT;
888 }
889 else {
890 time_t timmin = data->min.time, timmax = data->max.time, timcur = spool->id.time;
891
892 int max_cmp = -1, min_cmp = -1;
893
894 if( timcur > timmin && timcur < timmax) {
895 spoolcollector_addspool(data, spool);
896 }
897 else if(timcur == timmax && timcur == timmin) {
898 if( nchan_compare_msgid_tags(&spool->id, &data->max) < 0
899 && nchan_compare_msgid_tags(&spool->id, &data->min) >= 0 )
900 {
901 spoolcollector_addspool(data, spool);
902 }
903 }
904 else if((timcur == timmax && (max_cmp = nchan_compare_msgid_tags(&spool->id, &data->max)) < 0)
905 || (timcur == timmin && (min_cmp = nchan_compare_msgid_tags(&spool->id, &data->min)) >= 0))
906 {
907 spoolcollector_addspool(data, spool);
908 }
909 else if( timcur > timmin && timcur < timmax) {
910 spoolcollector_addspool(data, spool);
911 }
912 /*
913 else {
914 ERR("time_min: %i, time_cur: %i, time_max: %i", timmin, timcur, timmax);
915 }
916 */
917
918 return RBTREE_WALK_LEFT_RIGHT;
919 }
920 }
921 }
922
spooler_respond_status(channel_spooler_t * self,nchan_msg_id_t * id,ngx_int_t status_code,ngx_str_t * status_line)923 static ngx_int_t spooler_respond_status(channel_spooler_t *self, nchan_msg_id_t *id, ngx_int_t status_code, ngx_str_t *status_line) {
924 subscriber_pool_t *spool = find_spool(self, id);
925 //validate_spooler(self, "before respond_status");
926 if(spool) {
927 if(status_code == NGX_HTTP_NO_CONTENT) {
928 spool->msg_status = MSG_EXPECTED;
929 }
930 spool_respond_general(spool, NULL, status_code, status_line, 0);
931 destroy_spool(spool);
932 }
933 //validate_spooler(self, "after respond_status");
934 return NGX_OK;
935 }
936
spooler_respond_message(channel_spooler_t * self,nchan_msg_t * msg)937 static ngx_int_t spooler_respond_message(channel_spooler_t *self, nchan_msg_t *msg) {
938 spooler_respond_data_t srdata;
939 subscriber_pool_t *spool;
940 ngx_int_t responded_subs = 0;
941
942
943 if(self->fetching_strategy != NCHAN_SPOOL_PASSTHROUGH) {
944 srdata.min = msg->prev_id;
945 srdata.max = msg->id;
946 srdata.multi = msg->id.tagcount;
947 srdata.overflow = NULL;
948 srdata.msg = msg;
949 srdata.n = 0;
950
951 //spooler_print_contents(self);
952
953 //find all spools between msg->prev_id and msg->id
954 rbtree_conditional_walk(&self->spoolseed, (rbtree_walk_conditional_callback_pt )collect_spool_range_callback, &srdata);
955 /*
956 if(srdata.n == 0) {
957 DBG("no spools in range %V -- ", msgid_to_str(&msg->prev_id));
958 DBG(" -- %V", msgid_to_str(&msg->id));
959 }
960 */
961 while((spool = spoolcollector_unwind_nextspool(&srdata)) != NULL) {
962 responded_subs += spool->sub_count;
963 if(msg->id.tagcount > NCHAN_FIXED_MULTITAG_MAX) {
964 assert(spool->id.tag.allocd != msg->id.tag.allocd);
965 }
966 spool_respond_general(spool, msg, 0, NULL, 0);
967 if(msg->id.tagcount > NCHAN_FIXED_MULTITAG_MAX) {
968 assert(spool->id.tag.allocd != msg->id.tag.allocd);
969 }
970 spool_nextmsg(spool, &msg->id);
971 }
972 }
973 spool = get_spool(self, &latest_msg_id);
974 if(spool->sub_count > 0 && *self->channel_buffer_complete) {
975 spool_respond_general(spool, msg, 0, NULL, 0);
976 spool_nextmsg(spool, &msg->id);
977 }
978
979 nchan_copy_msg_id(&self->prev_msg_id, &msg->id, NULL);
980
981 return NGX_OK;
982 }
983
984 typedef struct {
985 channel_spooler_t *spl;
986 nchan_msg_t *msg;
987 ngx_int_t code;
988 void *code_data;
989 unsigned notice:1;
990 } spooler_respond_generic_data_t;
991
spooler_respond_rbtree_node_spool(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)992 static ngx_int_t spooler_respond_rbtree_node_spool(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
993 spooler_respond_generic_data_t *d = data;
994
995 return spool_respond_general(spool, d->msg, d->code, d->code_data, d->notice);
996 }
997
spooler_respond_generic(channel_spooler_t * self,nchan_msg_t * msg,ngx_int_t code,void * code_data,unsigned notice)998 static ngx_int_t spooler_respond_generic(channel_spooler_t *self, nchan_msg_t *msg, ngx_int_t code, void *code_data, unsigned notice) {
999 spooler_respond_generic_data_t data = {self, msg, code, code_data, notice};
1000 rbtree_walk(&self->spoolseed, (rbtree_walk_callback_pt )spooler_respond_rbtree_node_spool, &data);
1001 spool_respond_general(&self->current_msg_spool, data.msg, data.code, data.code_data, notice);
1002 return NGX_OK;
1003 }
1004
spooler_broadcast_status(channel_spooler_t * self,ngx_int_t code,const ngx_str_t * line)1005 static ngx_int_t spooler_broadcast_status(channel_spooler_t *self, ngx_int_t code, const ngx_str_t *line) {
1006 return spooler_respond_generic(self, NULL, code, (void *)line, 0);
1007 }
1008
spooler_broadcast_notice(channel_spooler_t * self,ngx_int_t code,void * data)1009 static ngx_int_t spooler_broadcast_notice(channel_spooler_t *self, ngx_int_t code, void *data) {
1010 return spooler_respond_generic(self, NULL, code, data, 1);
1011 }
1012
spooler_spool_dequeue_all(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1013 static ngx_int_t spooler_spool_dequeue_all(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1014 spooled_subscriber_t *cur;
1015
1016 for(cur = spool->first; cur != NULL; cur = cur->next) {
1017 cur->sub->dequeue_after_response = 1;
1018 }
1019
1020 return NGX_OK;
1021 }
1022
spooler_prepare_to_stop(channel_spooler_t * spl)1023 static ngx_int_t spooler_prepare_to_stop(channel_spooler_t *spl) {
1024 rbtree_walk(&spl->spoolseed, (rbtree_walk_callback_pt )spooler_spool_dequeue_all, (void *)spl);
1025 spl->want_to_stop = 1;
1026 return NGX_OK;
1027 }
1028
1029
1030
spool_rbtree_node_id(void * data)1031 static void *spool_rbtree_node_id(void *data) {
1032 return &((subscriber_pool_t *)data)->id;
1033 }
1034
spool_rbtree_bucketer(void * vid)1035 static uint32_t spool_rbtree_bucketer(void *vid) {
1036 nchan_msg_id_t *id = (nchan_msg_id_t *)vid;
1037 return (uint32_t )id->time;
1038 }
1039
spool_rbtree_compare(void * v1,void * v2)1040 static ngx_int_t spool_rbtree_compare(void *v1, void *v2) {
1041 nchan_msg_id_t *id1 = (nchan_msg_id_t *)v1;
1042 nchan_msg_id_t *id2 = (nchan_msg_id_t *)v2;
1043 time_t t1 = id1->time;
1044 time_t t2 = id2->time;
1045 ngx_int_t tag1;
1046 ngx_int_t tag2;
1047
1048 if(t1 > t2) {
1049 return 1;
1050 }
1051 else if (t1 < t2) {
1052 return -1;
1053 }
1054 else {
1055 uint16_t i, max1 = id1->tagcount, max2 = id2->tagcount;
1056 uint16_t max = max1 > max2 ? max1 : max2;
1057 int16_t *tags1, *tags2;
1058
1059 tags1 = max1 <= NCHAN_FIXED_MULTITAG_MAX ? id1->tag.fixed : id1->tag.allocd;
1060 tags2 = max2 <= NCHAN_FIXED_MULTITAG_MAX ? id2->tag.fixed : id2->tag.allocd;
1061
1062 for(i=0; i < max; i++) {
1063 tag1 = i < max1 ? tags1[i] : -1;
1064 tag2 = i < max2 ? tags2[i] : -1;
1065 if(tag1 > tag2) {
1066 return 1;
1067 }
1068 else if(tag1 < tag2) {
1069 return -1;
1070 }
1071 }
1072 return 0;
1073 }
1074 }
1075
its_time_for_a_spooling_filter(void * data)1076 static int its_time_for_a_spooling_filter(void *data) {
1077 subscriber_pool_t *spool = data;
1078 if(spool->msg_status == MSG_CHANNEL_NOTREADY || spool->msg_status == MSG_INVALID) {
1079 spool_reserve(spool);
1080 return 1;
1081 }
1082 else {
1083 return 0;
1084 }
1085 }
1086
its_time_for_a_spooling(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1087 static ngx_int_t its_time_for_a_spooling(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1088 ngx_int_t rc = NGX_OK;
1089 //validate_spool(spool);
1090 if(spool->msg_status == MSG_CHANNEL_NOTREADY || spool->msg_status == MSG_INVALID) {
1091 //TODO: maybe don't fetch spool with zero subs?
1092 spool->msg_status = MSG_INVALID;
1093 rc = spool_fetch_msg(spool);
1094 assert(rc == NGX_OK || rc == NGX_DONE || NGX_DECLINED);
1095 }
1096 spool_release(spool);
1097 return rc;
1098 }
1099
spooler_channel_status_changed(channel_spooler_t * self)1100 static ngx_int_t spooler_channel_status_changed(channel_spooler_t *self) {
1101 switch(*self->channel_status) {
1102 case READY:
1103 rbtree_walk_writesafe(&self->spoolseed, its_time_for_a_spooling_filter, (rbtree_walk_callback_pt )its_time_for_a_spooling, NULL);
1104 break;
1105
1106 default:
1107 //do nothing
1108 break;
1109 };
1110 return NGX_OK;
1111 }
1112
1113
spooler_print_contents_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,channel_spooler_t * spl)1114 static ngx_int_t spooler_print_contents_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, channel_spooler_t *spl) {
1115 spooled_subscriber_t *cur;
1116
1117 ERR(" spool %p id %V", spool, msgid_to_str(&spool->id));
1118 for(cur = spool->first; cur != NULL; cur = cur->next) {
1119 ERR(" %V", cur->sub->name);
1120 }
1121
1122 return NGX_OK;
1123 }
1124
spooler_print_contents(channel_spooler_t * spl)1125 ngx_int_t spooler_print_contents(channel_spooler_t *spl) {
1126 ERR("spooler for channel %V", spl->chid);
1127 spooler_print_contents_callback(NULL, &spl->current_msg_spool, spl);
1128 rbtree_walk_incr(&spl->spoolseed, (rbtree_walk_callback_pt )spooler_print_contents_callback, spl);
1129 return NGX_OK;
1130 }
1131
1132
1133
spooler_catch_up_filter(void * data)1134 static int spooler_catch_up_filter(void *data) {
1135 nchan_msg_status_t status = ((subscriber_pool_t *)data)->msg_status;
1136 return status == MSG_EXPECTED || status == MSG_PENDING;
1137 }
1138
spooler_catch_up_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1139 static ngx_int_t spooler_catch_up_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1140 spool->msg_status = MSG_INVALID;
1141 spool_fetch_msg(spool);
1142 return NGX_OK;
1143 }
1144
spooler_catch_up(channel_spooler_t * spl)1145 ngx_int_t spooler_catch_up(channel_spooler_t *spl) {
1146 rbtree_walk_writesafe(&spl->spoolseed, spooler_catch_up_filter, (rbtree_walk_callback_pt )spooler_catch_up_callback, NULL);
1147 return NGX_OK;
1148 }
1149
1150
1151 static channel_spooler_fn_t spooler_fn = {
1152 spooler_add_subscriber,
1153 spooler_channel_status_changed,
1154 spooler_respond_message,
1155 spooler_respond_status,
1156 spooler_broadcast_status,
1157 spooler_broadcast_notice,
1158 spooler_prepare_to_stop
1159 };
1160
start_spooler(channel_spooler_t * spl,ngx_str_t * chid,chanhead_pubsub_status_t * channel_status,uint8_t * channel_buffer_complete,nchan_store_t * store,nchan_loc_conf_t * cf,spooler_fetching_strategy_t fetching_strategy,channel_spooler_handlers_t * handlers,void * handlers_privdata)1161 channel_spooler_t *start_spooler(channel_spooler_t *spl, ngx_str_t *chid, chanhead_pubsub_status_t *channel_status, uint8_t *channel_buffer_complete, nchan_store_t *store, nchan_loc_conf_t *cf, spooler_fetching_strategy_t fetching_strategy, channel_spooler_handlers_t *handlers, void *handlers_privdata) {
1162 if(!spl->running) {
1163 ngx_memzero(spl, sizeof(*spl));
1164 rbtree_init(&spl->spoolseed, "spooler msg_id tree", spool_rbtree_node_id, spool_rbtree_bucketer, spool_rbtree_compare);
1165
1166 spl->fn=&spooler_fn;
1167 //spl->prev_msg_id.time=0;
1168 //spl->prev_msg_id.tag=0;
1169
1170 DBG("start SPOOLER %p", *spl);
1171
1172 spl->chid = chid;
1173 spl->store = store;
1174
1175 spl->channel_status = channel_status;
1176 spl->channel_buffer_complete = channel_buffer_complete;
1177
1178 spl->running = 1;
1179 //spl->want_to_stop = 0;
1180 spl->publish_events = 1;
1181 spl->fetching_strategy = fetching_strategy;
1182
1183 init_spool(spl, &spl->current_msg_spool, &latest_msg_id);
1184 spl->current_msg_spool.msg_status = MSG_EXPECTED;
1185
1186 spl->handlers = handlers;
1187 spl->handlers_privdata = handlers_privdata;
1188
1189 spl->cf = cf;
1190
1191 return spl;
1192 }
1193 else {
1194 ERR("looks like spooler is already running. make sure spooler->running=0 before starting.");
1195 assert(0);
1196 return NULL;
1197 }
1198 }
1199
remove_spool(subscriber_pool_t * spool)1200 static ngx_int_t remove_spool(subscriber_pool_t *spool) {
1201 channel_spooler_t *spl = spool->spooler;
1202 ngx_rbtree_node_t *node = rbtree_node_from_data(spool);
1203
1204 assert(spool->reserved == 0);
1205 DBG("remove spool node %p", node);
1206
1207 assert(spool->spooler->running);
1208
1209 if(spool->fetchmsg_ev.timer_set) {
1210 ngx_del_timer(&spool->fetchmsg_ev);
1211 }
1212
1213 nchan_free_msg_id(&spool->id);
1214 rbtree_remove_node(&spl->spoolseed, rbtree_node_from_data(spool));
1215
1216 //assert((node = rbtree_find_node(&spl->spoolseed, &spool->id)) == NULL);
1217 //double-check that it's gone
1218
1219 return NGX_OK;
1220 }
1221
destroy_spool(subscriber_pool_t * spool)1222 static ngx_int_t destroy_spool(subscriber_pool_t *spool) {
1223 rbtree_seed_t *seed = &spool->spooler->spoolseed;
1224 spooled_subscriber_t *ssub, *ssub_next;
1225 subscriber_t *sub;
1226 ngx_rbtree_node_t *node = rbtree_node_from_data(spool);
1227
1228 remove_spool(spool);
1229
1230 DBG("destroy spool node %p", node);
1231
1232 for(ssub = spool->first; ssub!=NULL; ssub = ssub_next) {
1233 sub = ssub->sub;
1234 ssub_next = ssub->next;
1235 //DBG("dequeue sub %p in spool %p", sub, spool);
1236 sub->fn->dequeue(sub);
1237 }
1238 assert(spool->sub_count == 0);
1239 assert(spool->first == NULL);
1240
1241 //ngx_memset(spool, 0x42, sizeof(*spool)); //debug
1242
1243 rbtree_destroy_node(seed, node);
1244 return NGX_OK;
1245 }
1246
stop_spooler(channel_spooler_t * spl,uint8_t dequeue_subscribers)1247 ngx_int_t stop_spooler(channel_spooler_t *spl, uint8_t dequeue_subscribers) {
1248 ngx_rbtree_node_t *cur, *sentinel;
1249 spooler_event_ll_t *ecur, *ecur_next;
1250 subscriber_pool_t *spool;
1251 rbtree_seed_t *seed = &spl->spoolseed;
1252 ngx_rbtree_t *tree = &seed->tree;
1253 ngx_int_t n=0;
1254 sentinel = tree->sentinel;
1255
1256 fetchmsg_data_t *dcur;
1257 #if NCHAN_RBTREE_DBG
1258 ngx_int_t active_before = seed->active_nodes, allocd_before = seed->active_nodes;
1259 #endif
1260 if(spl->running) {
1261
1262 for(ecur = spl->spooler_dependent_events; ecur != NULL; ecur = ecur_next) {
1263 ecur_next = ecur->next;
1264 if(ecur->cancel) {
1265 ecur->cancel(ecur->ev.data);
1266 }
1267 ngx_event_del_timer(&ecur->ev);
1268
1269 ngx_free(ecur);
1270 }
1271
1272 for(cur = tree->root; cur != NULL && cur != sentinel; cur = tree->root) {
1273 spool = (subscriber_pool_t *)rbtree_data_from_node(cur);
1274 if(dequeue_subscribers) {
1275 destroy_spool(spool);
1276 }
1277 else {
1278 remove_spool(spool);
1279 rbtree_destroy_node(seed, cur);
1280 }
1281 n++;
1282 }
1283
1284 for(dcur = spl->fetchmsg_cb_data_list; dcur != NULL; dcur = dcur->next) {
1285 dcur->spooler = NULL;
1286 }
1287
1288 DBG("stopped %i spools in SPOOLER %p", n, *spl);
1289 }
1290 else {
1291 DBG("SPOOLER %p not running", *spl);
1292 }
1293 #if NCHAN_RBTREE_DBG
1294 assert(active_before - n == 0);
1295 assert(allocd_before - n == 0);
1296 assert(seed->active_nodes == 0);
1297 assert(seed->allocd_nodes == 0);
1298 #endif
1299 nchan_free_msg_id(&spl->prev_msg_id);
1300 spl->running = 0;
1301 return NGX_OK;
1302 }
1303