1 #include <nchan_module.h>
2 #include "spool.h"
3 #include <assert.h>
4 
5 #define DEBUG_LEVEL NGX_LOG_DEBUG
6 //#define DEBUG_LEVEL NGX_LOG_WARN
7 
8 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SPOOL:" fmt, ##arg)
9 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SPOOL:" fmt, ##arg)
10 
11 #define NCHAN_SPOOL_FETCHMSG_MAX_TIMES 20
12 
13 //////// SPOOLs -- Subscriber Pools  /////////
14 
15 static ngx_int_t spool_remove_subscriber(subscriber_pool_t *, spooled_subscriber_t *);
16 static void spool_bubbleup_dequeue_handler(subscriber_pool_t *spool, subscriber_t *sub, channel_spooler_t *spl);
17 //static void spool_bubbleup_bulk_dequeue_handler(subscriber_pool_t *spool, subscriber_type_t type, ngx_int_t count, channel_spooler_t *spl);
18 static ngx_int_t spool_respond_general(subscriber_pool_t *self, nchan_msg_t *msg, ngx_int_t status_code, void *code_data, unsigned notice);
19 static ngx_int_t spool_transfer_subscribers(subscriber_pool_t *spool, subscriber_pool_t *newspool, uint8_t update_subscriber_last_msgid);
20 static ngx_int_t destroy_spool(subscriber_pool_t *spool);
21 static ngx_int_t remove_spool(subscriber_pool_t *spool);
22 static ngx_int_t spool_fetch_msg(subscriber_pool_t *spool);
23 
24 static nchan_msg_id_t     latest_msg_id = NCHAN_NEWEST_MSGID;
25 static nchan_msg_id_t     oldest_msg_id = NCHAN_OLDEST_MSGID;
26 
find_spool(channel_spooler_t * spl,nchan_msg_id_t * id)27 static subscriber_pool_t *find_spool(channel_spooler_t *spl, nchan_msg_id_t *id) {
28   rbtree_seed_t      *seed = &spl->spoolseed;
29   ngx_rbtree_node_t  *node;
30   subscriber_pool_t  *spool = NULL;
31 
32   if(id->time == NCHAN_NEWEST_MSGID_TIME || spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
33     spool = &spl->current_msg_spool;
34     spool->msg_status = MSG_EXPECTED;
35   }
36   else if((node = rbtree_find_node(seed, id)) != NULL) {
37     spool = rbtree_data_from_node(node);
38   }
39 
40   return spool;
41 }
42 /*
43 typedef struct {
44   int                  n;
45   subscriber_pool_t   *msg_expected_spool;
46 
47   int                  err;
48 } spool_verify_data_t;
49 
50 
51 static void log_spool_err(spool_verify_data_t *d, subscriber_pool_t *spool, char *str) {
52   ERR("%p %s [%s]", spool, msg_status_to_chr(spool->msg_status), str);
53   d->err = 1;
54 }
55 
56 static int validate_spooler_walker(rbtree_seed_t *seed, subscriber_pool_t *spool, spool_verify_data_t *d) {
57   if(spool->msg_status == MSG_EXPECTED) {
58     if(d->msg_expected_spool) {
59       log_spool_err(d, spool, "Found more than 1 spool with MSG_EXPECTED status");
60     }
61     else {
62       d->msg_expected_spool = spool;
63     }
64   }
65   else {
66     if(spool->sub_count == 0) {
67       log_spool_err(d, spool, "empty spool (not MSG_EXPECTED)");
68     }
69   }
70   d->n++;
71 
72   return NGX_OK;
73 }
74 
75 static int validate_spooler(channel_spooler_t *spl, char *str) {
76   spool_verify_data_t  d;
77   ngx_memzero(&d, sizeof(d));
78   rbtree_walk(&spl->spoolseed, (rbtree_walk_callback_pt )validate_spooler_walker, &d);
79   if(d.err == 0) {
80     ERR("%s: validated %i spools in channel spooler %V", str, d.n, spl->chid);
81   }
82   else {
83     ERR("%s: validating %i spools  FAILED in channel spooler %V; %i error(s).", str, d.n, spl->chid, d.err);
84   }
85   return d.err == 0;
86 }
87 */
88 
spool_reserve(subscriber_pool_t * spool)89 static void spool_reserve(subscriber_pool_t *spool) {
90   spool->reserved++;
91 }
spool_release(subscriber_pool_t * spool)92 static void spool_release(subscriber_pool_t *spool) {
93   spool->reserved--;
94   ngx_int_t immortal_spool = spool->id.time == NCHAN_NEWEST_MSGID_TIME;
95   if(!immortal_spool && spool->reserved == 0 && spool->sub_count == 0) {
96     destroy_spool(spool);
97   }
98 }
99 
msg_ids_equal(nchan_msg_id_t * id1,nchan_msg_id_t * id2)100 static int msg_ids_equal(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
101   int           i, max;
102   int16_t      *tags1, *tags2;
103 
104   if(id1->time != id2->time || id1->tagcount != id2->tagcount) return 0;
105   max = id1->tagcount;
106   if(max <= NCHAN_FIXED_MULTITAG_MAX) {
107     tags1 = id1->tag.fixed;
108     tags2 = id2->tag.fixed;
109   }
110   else {
111     tags1 = id1->tag.allocd;
112     tags2 = id2->tag.allocd;
113   }
114 
115 
116   for(i=0; i < max; i++) {
117     if(tags1[i] != tags2[i]) return 0;
118   }
119   return 1;
120 }
121 
spooler_timer_handler(ngx_event_t * ev)122 static void spooler_timer_handler(ngx_event_t *ev) {
123   spooler_event_ll_t *spl_ev = container_of(ev, spooler_event_ll_t, ev);
124   spl_ev->callback(ev->data);
125   if(spl_ev->prev) {
126     spl_ev->prev->next = spl_ev->next;
127   }
128   if(spl_ev->next) {
129     spl_ev->next->prev = spl_ev->prev;
130   }
131   if(spl_ev->spooler->spooler_dependent_events == spl_ev) {
132     spl_ev->spooler->spooler_dependent_events = spl_ev->next;
133   }
134   ngx_free(spl_ev);
135 }
136 
spooler_add_timer(channel_spooler_t * spl,ngx_msec_t timeout,void (* cb)(void *),void (* cancel)(void *),void * pd)137 ngx_event_t *spooler_add_timer(channel_spooler_t *spl, ngx_msec_t timeout, void (*cb)(void *), void (*cancel)(void *), void *pd) {
138   spooler_event_ll_t  *spl_ev = ngx_alloc(sizeof(*spl_ev), ngx_cycle->log);
139   ngx_memzero(&spl_ev->ev, sizeof(spl_ev->ev));
140   nchan_init_timer(&spl_ev->ev, spooler_timer_handler, pd);
141 
142   spl_ev->callback = cb;
143   spl_ev->cancel = cancel;
144 
145   spl_ev->spooler = spl;
146   spl_ev->next = spl->spooler_dependent_events;
147   spl_ev->prev = NULL;
148   if(spl->spooler_dependent_events) {
149     spl->spooler_dependent_events->prev = spl_ev;
150   }
151   spl->spooler_dependent_events = spl_ev;
152 
153   ngx_add_timer(&spl_ev->ev, timeout);
154   return &spl_ev->ev;
155 }
156 
fetchmsg_ev_handler(ngx_event_t * ev)157 static void fetchmsg_ev_handler(ngx_event_t *ev) {
158   subscriber_pool_t *spool = (subscriber_pool_t *)ev->data;
159   DBG("stack-overflow-buster fetchmsg event for spool %p", spool);
160   if(spool->msg_status == MSG_INVALID) {
161     spool_fetch_msg(spool);
162   }
163 }
164 
init_spool(channel_spooler_t * spl,subscriber_pool_t * spool,nchan_msg_id_t * id)165 static ngx_inline void init_spool(channel_spooler_t *spl, subscriber_pool_t *spool, nchan_msg_id_t *id) {
166   nchan_copy_new_msg_id(&spool->id, id);
167   spool->msg = NULL;
168   spool->msg_status = MSG_INVALID;
169 
170   spool->first = NULL;
171   spool->sub_count = 0;
172   spool->non_internal_sub_count = 0;
173   //spool->generation = 0;
174   //spool->responded_count = 0;
175   spool->reserved = 0;
176   ngx_memzero(&spool->fetchmsg_ev, sizeof(spool->fetchmsg_ev));
177   nchan_init_timer(&spool->fetchmsg_ev, fetchmsg_ev_handler, spool);
178   spool->fetchmsg_current_count=0;
179   spool->fetchmsg_prev_msec=0;
180 
181   spool->spooler = spl;
182 }
183 
get_spool(channel_spooler_t * spl,nchan_msg_id_t * id)184 static subscriber_pool_t *get_spool(channel_spooler_t *spl, nchan_msg_id_t *id) {
185   rbtree_seed_t      *seed = &spl->spoolseed;
186   ngx_rbtree_node_t  *node;
187   subscriber_pool_t *spool;
188 
189 
190   if(id->time == NCHAN_NEWEST_MSGID_TIME || spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
191     spool = &spl->current_msg_spool;
192     spool->msg_status = MSG_EXPECTED;
193     return &spl->current_msg_spool;
194   }
195 
196   if((node = rbtree_find_node(seed, id)) == NULL) {
197 
198     if((node = rbtree_create_node(seed, sizeof(*spool))) == NULL) {
199       ERR("can't create rbtree node for spool");
200       return NULL;
201     }
202 
203    // DBG("CREATED spool node %p for msgid %V", node, msgid_to_str(id));
204     spool = (subscriber_pool_t *)rbtree_data_from_node(node);
205 
206     init_spool(spl, spool, id);
207 
208     if(rbtree_insert_node(seed, node) != NGX_OK) {
209       ERR("couldn't insert spool node");
210       rbtree_destroy_node(seed, node);
211       return NULL;
212     }
213   }
214   else {
215     spool = (subscriber_pool_t *)rbtree_data_from_node(node);
216     DBG("found spool node %p with msgid %V", node, msgid_to_str(id));
217     assert(spool->id.time == id->time);
218   }
219   return spool;
220 }
221 
spool_nextmsg(subscriber_pool_t * spool,nchan_msg_id_t * new_last_id)222 static ngx_int_t spool_nextmsg(subscriber_pool_t *spool, nchan_msg_id_t *new_last_id) {
223   subscriber_pool_t      *newspool;
224   channel_spooler_t      *spl = spool->spooler;
225 
226   ngx_int_t               immortal_spool = spool->id.time == NCHAN_NEWEST_MSGID_TIME;
227   int16_t                 largetags[NCHAN_MULTITAG_MAX];
228   nchan_msg_id_t          new_id = NCHAN_ZERO_MSGID;
229 
230   if(spl->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
231     if(immortal_spool) {
232       return NGX_OK; //nothing to do, already on the newest spool
233     }
234     else {
235       new_last_id = &latest_msg_id;
236     }
237   }
238 
239   nchan_copy_msg_id(&new_id, &spool->id, largetags);
240   nchan_update_multi_msgid(&new_id, new_last_id, largetags);
241 
242   //ERR("spool %p nextmsg (%V) --", spool, msgid_to_str(&spool->id));
243   //ERR(" --  update with               (%V) --", msgid_to_str(new_last_id));
244   //ERR(" -- newid                       %V", msgid_to_str(&new_id));
245 
246   if(msg_ids_equal(&spool->id, &new_id)) {
247     ERR("nextmsg id same as curmsg (%V)", msgid_to_str(&spool->id));
248     assert(0);
249   }
250   else {
251     newspool = !immortal_spool ? find_spool(spl, &new_id) : get_spool(spl, &new_id);
252 
253     if(newspool != NULL) {
254       //move subs to next, already existing spool
255       assert(spool != newspool);
256       spool_transfer_subscribers(spool, newspool, 0);
257       if(!immortal_spool && spool->reserved == 0) destroy_spool(spool);
258     }
259     else {
260       //next spool doesn't exist. reuse this one as the next
261       ngx_rbtree_node_t       *node;
262       assert(!immortal_spool);
263       node = rbtree_node_from_data(spool);
264       rbtree_remove_node(&spl->spoolseed, node);
265       nchan_copy_msg_id(&spool->id, &new_id, NULL);
266       rbtree_insert_node(&spl->spoolseed, node);
267       spool->msg_status = MSG_INVALID;
268       spool->msg = NULL;
269       newspool = spool;
270 
271       /*
272       newspool = get_spool(spl, &new_id);
273       assert(spool != newspool);
274       spool_transfer_subscribers(spool, newspool, 0);
275       destroy_spool(spool);
276       */
277     }
278 
279 
280     if(newspool->non_internal_sub_count > 0 && spl->handlers->use != NULL) {
281       spl->handlers->use(spl, spl->handlers_privdata);
282     }
283 
284     if(newspool->sub_count > 0) {
285       switch(newspool->msg_status) {
286         case MSG_CHANNEL_NOTREADY:
287           newspool->msg_status = MSG_INVALID;
288           /*fallthrough*/
289         case MSG_INVALID:
290           spool_fetch_msg(newspool);
291           break;
292         case MSG_EXPECTED:
293           spool_respond_general(newspool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
294           break;
295         default:
296           break;
297       }
298     }
299   }
300 
301   return NGX_OK;
302 }
303 
spool_fetch_msg_callback(ngx_int_t code,nchan_msg_t * msg,fetchmsg_data_t * data)304 static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetchmsg_data_t *data) {
305   nchan_msg_status_t    findmsg_status = code;
306   nchan_msg_status_t    prev_status;
307   subscriber_pool_t    *spool, *nuspool;
308   channel_spooler_t    *spl = data->spooler;
309   int                   free_msg_id = 1;
310 
311   assert(spl->fetching_strategy != NCHAN_SPOOL_PASSTHROUGH);
312 
313   if(spl && data == spl->fetchmsg_cb_data_list) {
314     spl->fetchmsg_cb_data_list = data->next;
315   }
316   if(data->next) {
317     data->next->prev = data->prev;
318   }
319   if(data->prev) {
320     data->prev->next = data->next;
321   }
322 
323   if(spl == NULL) { //channel already deleted
324     nchan_free_msg_id(&data->msgid);
325     ngx_free(data);
326     return NGX_OK;
327   }
328 
329   if(spl->handlers->get_message_finish) {
330     spl->handlers->get_message_finish(spl, spl->handlers_privdata);
331   }
332 
333   if((spool = find_spool(spl, &data->msgid)) == NULL) {
334     DBG("spool for msgid %V not found. discarding getmsg callback response.", msgid_to_str(&data->msgid));
335     nchan_free_msg_id(&data->msgid);
336     ngx_free(data);
337     return NGX_ERROR;
338   }
339 
340   prev_status = spool->msg_status;
341 
342   switch(findmsg_status) {
343     case MSG_FOUND:
344       spool->msg_status = findmsg_status;
345       DBG("fetchmsg callback for spool %p msg FOUND %p %V", spool, msg, msgid_to_str(&msg->id));
346       assert(msg != NULL);
347       spool->msg = msg;
348       spool_respond_general(spool, spool->msg, 0, NULL, 0);
349 
350       spool_nextmsg(spool, &msg->id);
351       break;
352 
353     case MSG_EXPECTED:
354       // ♫ It's gonna be the future soon ♫
355       if(spool->id.time == NCHAN_NTH_MSGID_TIME) {
356         //wait for message in the NEWEST_ID spool
357         spool_nextmsg(spool, &latest_msg_id);
358       }
359       else {
360         spool->msg_status = findmsg_status;
361         DBG("fetchmsg callback for spool %p msg EXPECTED", spool);
362         spool_respond_general(spool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
363         assert(msg == NULL);
364         spool->msg = NULL;
365       }
366       break;
367 
368     case MSG_CHANNEL_NOTREADY:
369       //just wait it out
370       spool->msg = NULL;
371       spool->msg_status = findmsg_status;
372       break;
373 
374     case MSG_NOTFOUND:
375       if(spl->fetching_strategy == NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND) {
376         spool->msg_status = prev_status;
377         break;
378       }
379       /*fallthrough*/
380     case MSG_EXPIRED:
381       //is this right?
382       //TODO: maybe message-expired notification
383       spool->msg_status = findmsg_status;
384       spool_respond_general(spool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0);
385       nuspool = get_spool(spool->spooler, &oldest_msg_id);
386       if(spool != nuspool) {
387         spool_transfer_subscribers(spool, nuspool, 1);
388         if(spool->reserved == 0) {
389           destroy_spool(spool);
390         }
391         spool_fetch_msg(nuspool);
392       }
393       else if(spool->id.tagcount == 1 && nchan_compare_msgids(&spool->id, &oldest_msg_id) == 0) {
394         // oldest msgid not found or expired. that means there are no messages in this channel,
395         // so move these subscribers over to the current_msg_spool
396         nuspool = get_spool(spool->spooler, &latest_msg_id);
397         assert(spool != nuspool);
398         spool_transfer_subscribers(spool, nuspool, 1);
399         if(spool->reserved == 0) {
400           destroy_spool(spool);
401         }
402       }
403       else if(spool == &spool->spooler->current_msg_spool) {
404         //sit there and wait, i guess
405         spool->msg_status = MSG_EXPECTED;
406       }
407       else {
408         ERR("Unexpected spool == nuspool during spool fetch_msg_callback. This is weird, please report this to the developers. findmsg_status: %i", findmsg_status);
409         assert(0);
410       }
411       break;
412 
413     case MSG_PENDING:
414       ERR("spool %p set status to MSG_PENDING", spool);
415       break;
416 
417     case MSG_ERROR:
418       spool_respond_general(spool, NULL, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, 0);
419       break;
420 
421     case MSG_INVALID:
422       assert(0); //should never happen
423       break;
424   }
425 
426   if(free_msg_id) {
427     nchan_free_msg_id(&data->msgid);
428   }
429   ngx_free(data);
430   return NGX_OK;
431 }
432 
spool_fetch_msg(subscriber_pool_t * spool)433 static ngx_int_t spool_fetch_msg(subscriber_pool_t *spool) {
434   fetchmsg_data_t        *data;
435   channel_spooler_t      *spl = spool->spooler;
436 
437 
438   //stack overflow protector
439   //ERR("spool->fetchmsg_prev_msec %i (ngx_timeofday())->msec %i spool->fetchmsg_current_count %i", spool->fetchmsg_prev_msec, (ngx_timeofday())->msec, spool->fetchmsg_current_count);
440   if(spool->fetchmsg_prev_msec == (ngx_timeofday())->msec) {
441     if(spool->fetchmsg_current_count > NCHAN_SPOOL_FETCHMSG_MAX_TIMES) {
442       ngx_add_timer(&spool->fetchmsg_ev, 0);
443       spool->fetchmsg_current_count = 0;
444       return NGX_DONE;
445     }
446     else {
447       spool->fetchmsg_current_count++;
448     }
449   }
450   else {
451     spool->fetchmsg_current_count = 0;
452     spool->fetchmsg_prev_msec = (ngx_timeofday())->msec;
453   }
454 
455   if(*spl->channel_status != READY || !*spl->channel_buffer_complete) {
456     //DBG("%p wanted to fetch msg %V, but channel %V not ready or buffer not complete", spool, msgid_to_str(&spool->id), spl->chid);
457     spool->msg_status = MSG_CHANNEL_NOTREADY;
458     //these will be fetch when channel is ready or when buffer is complete
459     return NGX_DECLINED;
460   }
461 
462   DBG("%p fetch msg %V for channel %V", spool, msgid_to_str(&spool->id), spl->chid);
463   data = ngx_alloc(sizeof(*data), ngx_cycle->log); //correctness over efficiency (at first).
464   //TODO: optimize this alloc away
465 
466   assert(data);
467 
468   data->next = spl->fetchmsg_cb_data_list;
469   if(data->next) {
470     data->next->prev = data;
471   }
472   spl->fetchmsg_cb_data_list = data;
473   data->prev = NULL;
474 
475   nchan_copy_new_msg_id(&data->msgid, &spool->id);
476   data->spooler = spool->spooler;
477 
478   assert(spool->msg == NULL);
479   assert(spool->msg_status == MSG_INVALID);
480   spool->msg_status = MSG_PENDING;
481   if(spl->handlers->get_message_start) {
482     spl->handlers->get_message_start(spl, spl->handlers_privdata);
483   }
484   switch(spl->fetching_strategy) {
485     case NCHAN_SPOOL_FETCH:
486     case NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND:
487       spool->spooler->store->get_message(spool->spooler->chid, &spool->id, spool->spooler->cf, (callback_pt )spool_fetch_msg_callback, data);
488       break;
489     case NCHAN_SPOOL_PASSTHROUGH:
490       spool_fetch_msg_callback(MSG_EXPECTED, NULL, data);
491       break;
492   }
493   return NGX_OK;
494 }
495 
spool_sub_dequeue_callback(subscriber_t * sub,void * data)496 static void spool_sub_dequeue_callback(subscriber_t *sub, void *data) {
497   spooled_subscriber_cleanup_t  *d = (spooled_subscriber_cleanup_t *)data;
498   subscriber_pool_t             *spool = d->spool;
499 
500   DBG("sub %p dequeue callback", sub);
501 
502   assert(sub == d->ssub->sub);
503   spool_remove_subscriber(spool, d->ssub);
504   spool_bubbleup_dequeue_handler(spool, sub, spool->spooler);
505 
506   if(sub->type != INTERNAL && spool->spooler->publish_events) {
507     nchan_maybe_send_channel_event_message(sub->request, SUB_DEQUEUE);
508   }
509 }
510 
spool_add_subscriber(subscriber_pool_t * self,subscriber_t * sub,uint8_t enqueue)511 static ngx_int_t spool_add_subscriber(subscriber_pool_t *self, subscriber_t *sub, uint8_t enqueue) {
512   spooled_subscriber_t       *ssub;
513   ngx_int_t                   rc;
514   ngx_int_t                   internal_sub = sub->type == INTERNAL;
515 
516   ssub = ngx_calloc(sizeof(*ssub), ngx_cycle->log);
517   //DBG("add sub %p to spool %p", sub, self);
518 
519   if(ssub == NULL) {
520     ERR("failed to allocate new sub for spool");
521     return NGX_ERROR;
522   }
523 
524   ssub->next = self->first;
525   ssub->prev = NULL;
526   if(self->first != NULL) {
527     self->first->prev = ssub;
528   }
529   self->first = ssub;
530   self->sub_count++;
531   if(!internal_sub) {
532     self->non_internal_sub_count++;
533   }
534   ssub->dequeue_callback_data.ssub = ssub;
535   ssub->dequeue_callback_data.spool = self;
536 
537   if(enqueue) {
538     if((rc = sub->fn->enqueue(sub)) != NGX_OK) {
539       //enqueue failed. undo everything and get out!
540       self->sub_count --;
541       self->first = ssub->next;
542       if(self->first) {
543         assert(self->first->prev == ssub);
544         self->first->prev = NULL;
545       }
546       if(!internal_sub) {
547         self->non_internal_sub_count--;
548       }
549       ngx_free(ssub);
550       return rc;
551     }
552     else if(sub->type != INTERNAL && self->spooler->publish_events) {
553       nchan_maybe_send_channel_event_message(sub->request, SUB_ENQUEUE);
554     }
555   }
556 
557   sub->fn->set_dequeue_callback(sub, spool_sub_dequeue_callback, &ssub->dequeue_callback_data);
558   ssub->sub = sub;
559 
560   return NGX_OK;
561 }
562 
spool_remove_subscriber(subscriber_pool_t * self,spooled_subscriber_t * ssub)563 static ngx_int_t spool_remove_subscriber(subscriber_pool_t *self, spooled_subscriber_t *ssub) {
564   assert(ssub->next != ssub);
565   assert(ssub->prev != ssub);
566   spooled_subscriber_t   *prev, *next;
567   prev = ssub->prev;
568   next = ssub->next;
569   if(next) {
570     next->prev = prev;
571   }
572   if(prev) {
573     prev->next = next;
574   }
575   if(self->first == ssub) {
576     self->first = next;
577   }
578 
579   if(ssub->sub->type != INTERNAL) {
580     self->non_internal_sub_count--;
581   }
582 
583   ngx_free(ssub);
584 
585   assert(self->sub_count > 0);
586   self->sub_count--;
587   return NGX_OK;
588 }
589 
spool_respond_general(subscriber_pool_t * self,nchan_msg_t * msg,ngx_int_t code,void * code_data,unsigned notice)590 static ngx_int_t spool_respond_general(subscriber_pool_t *self, nchan_msg_t *msg, ngx_int_t code, void *code_data, unsigned notice) {
591   ngx_uint_t                  numsubs[SUBSCRIBER_TYPES];
592   spooled_subscriber_t       *nsub, *nnext;
593   subscriber_t               *sub;
594 
595   //channel_spooler_t          *spl = self->spooler;
596   //validate_spooler(spl, "before respond_general");
597   //nchan_msg_id_t             unid;
598   //nchan_msg_id_t             unprevid;
599   //int8_t                     i, max;
600 
601   ngx_memzero(numsubs, sizeof(numsubs));
602   //self->generation++;
603 
604   DBG("spool %p (%V) (subs: %i) respond with msg %p or code %i", self, msgid_to_str(&self->id), self->sub_count, msg, code);
605   if(msg) {
606     DBG("msgid: %V", msgid_to_str(&msg->id));
607     DBG("prev: %V", msgid_to_str(&msg->prev_id));
608   }
609 
610   /*
611   if(msg && msg->prev_id.time > 0 && msg->id.tagcount > 1) {
612     assert(msg->shared == 0);
613     max = msg->id.tagcount;
614     for(i=0; i< max; i++) {
615       unid.tag[i] =     msg->id.tag[i];
616       unprevid.tag[i] = msg->prev_id.tag[i];
617       if(unid.tag[i] == -1)     msg->id.tag[i]   =    self->id.tag[i];
618       if(unprevid.tag[i] == -1) msg->prev_id.tag[i] = self->id.tag[i];
619     }
620   }
621   */
622 
623   //uint8_t publish_events = self->spooler->publish_events;
624 
625   for(nsub = self->first; nsub != NULL; nsub = nnext) {
626     sub = nsub->sub;
627     nnext = nsub->next;
628 
629     if(msg) {
630       //self->responded_count++;
631       sub->fn->respond_message(sub, msg);
632     }
633     else if(!notice) {
634       //self->responded_count++;
635       sub->fn->respond_status(sub, code, code_data, NULL);
636     }
637     else {
638       sub->fn->notify(sub, code, code_data);
639     }
640   }
641 
642   //if(!notice && code != NGX_HTTP_NO_CONTENT) self->responded_count++;
643   //assert(validate_spooler(spl, "after respond_general"));
644   return NGX_OK;
645 }
646 
647 /////////// SPOOLER - container of several spools //////////
648 
create_spooler()649 channel_spooler_t *create_spooler() {
650   channel_spooler_t  *spooler;
651   if((spooler = ngx_alloc(sizeof(*spooler), ngx_cycle->log))==NULL) {
652     ERR("Can't allocate spooler");
653     return NULL;
654   }
655   return spooler;
656 }
657 
spool_bubbleup_dequeue_handler(subscriber_pool_t * spool,subscriber_t * sub,channel_spooler_t * spl)658 static void spool_bubbleup_dequeue_handler(subscriber_pool_t *spool, subscriber_t *sub, channel_spooler_t *spl) {
659   //bubble on up, yeah
660   channel_spooler_handlers_t *h = spl->handlers;
661   if(h->dequeue) {
662     h->dequeue(spl, sub, spl->handlers_privdata);
663   }
664   else if (h->bulk_dequeue){
665     h->bulk_dequeue(spl, sub->type, 1, spl->handlers_privdata);
666   }
667   else {
668     ERR("Neither dequeue_handler not bulk_dequeue_handler present in spooler for spool sub dequeue");
669   }
670 }
671 
672 /*
673 static void spool_bubbleup_bulk_dequeue_handler(subscriber_pool_t *spool, subscriber_type_t type, ngx_int_t count, channel_spooler_t *spl) {
674   //bubble on up, yeah
675   if(spl->handlers->bulk_dequeue) {
676     spl->handlers->bulk_dequeue(spl, type, count, spl->handlers_privdata);
677   }
678 }
679 */
680 
spooler_add_subscriber(channel_spooler_t * self,subscriber_t * sub)681 static ngx_int_t spooler_add_subscriber(channel_spooler_t *self, subscriber_t *sub) {
682   nchan_msg_id_t          *msgid = &sub->last_msgid;
683   subscriber_pool_t       *spool;
684   subscriber_type_t        subtype;
685   ngx_int_t                rc;
686 
687   if(self->want_to_stop) {
688     ERR("Not accepting new subscribers right now. want to stop.");
689     return NGX_ERROR;
690   }
691 
692   //validate_spooler(self, "before add_subscriber");
693 
694   spool = get_spool(self, msgid);
695 
696   if(self->fetching_strategy == NCHAN_SPOOL_PASSTHROUGH) {
697     assert(spool->id.time == NCHAN_NEWEST_MSGID_TIME);
698   }
699 
700   if(spool == NULL) {
701     return NGX_ERROR;
702   }
703 
704   subtype = sub->type;
705 
706   if((rc = spool_add_subscriber(spool, sub, 1)) != NGX_OK) {
707     DBG("couldn't add subscriber to spool %p", spool);
708     return rc;
709   }
710   self->handlers->add(self, sub, self->handlers_privdata);
711 
712   switch(spool->msg_status) {
713     case MSG_FOUND:
714       assert(spool->msg);
715       spool_respond_general(spool, spool->msg, 0, NULL, 0);
716       break;
717 
718     case MSG_INVALID:
719       assert(spool->msg == NULL);
720       spool_fetch_msg(spool);
721       break;
722 
723     case MSG_CHANNEL_NOTREADY:
724     case MSG_PENDING:
725       //nothing to do
726       break;
727 
728     case MSG_EXPECTED:
729       //notify subscriber
730       sub->fn->respond_status(sub, NGX_HTTP_NO_CONTENT, NULL, NULL);
731       break;
732 
733     case MSG_EXPIRED:
734     case MSG_NOTFOUND:
735     case MSG_ERROR:
736       //shouldn't happen
737 
738       assert(0);
739   }
740 
741   if(self->handlers->use != NULL && subtype != INTERNAL) {
742     self->handlers->use(self, self->handlers_privdata);
743   }
744 
745   //validate_spooler(self, "after add_subscriber");
746 
747   return NGX_OK;
748 }
749 
750 
spool_transfer_subscribers(subscriber_pool_t * spool,subscriber_pool_t * newspool,uint8_t update_subscriber_last_msgid)751 static ngx_int_t spool_transfer_subscribers(subscriber_pool_t *spool, subscriber_pool_t *newspool, uint8_t update_subscriber_last_msgid) {
752   ngx_int_t               count = 0;
753   subscriber_t           *sub;
754   spooled_subscriber_t   *cur;
755   channel_spooler_t      *spl = spool->spooler;
756 
757   assert(spl == newspool->spooler);
758 
759   if(spool == NULL || newspool == NULL) {
760     ERR("failed to transfer spool subscribers");
761     return 0;
762   }
763   for(cur = spool->first; cur != NULL; cur = spool->first) {
764     sub = cur->sub;
765     spool_remove_subscriber(spool, cur);
766     if(update_subscriber_last_msgid) {
767       sub->last_msgid=newspool->id;
768     }
769     if(spool_add_subscriber(newspool, sub, 0) == NGX_OK) {
770       count++;
771     }
772   }
773 
774   return count;
775 }
776 
777 typedef struct spool_collect_overflow_s spool_collect_overflow_t;
778 struct spool_collect_overflow_s {
779   subscriber_pool_t                *spool;
780   struct spool_collect_overflow_s  *next;
781 };// spool_collect_overflow_t;
782 
783 #define SPOOLER_RESPOND_SPOOLARRAY_SIZE 32
784 
785 typedef struct {
786   nchan_msg_id_t             min;
787   nchan_msg_id_t             max;
788   uint8_t                    multi;
789   ngx_int_t                  n;
790   nchan_msg_t               *msg;
791   subscriber_pool_t         *spools[SPOOLER_RESPOND_SPOOLARRAY_SIZE];
792   spool_collect_overflow_t  *overflow;
793 } spooler_respond_data_t;
794 
795 
compare_msgid_onetag_range(nchan_msg_id_t * min,nchan_msg_id_t * max,nchan_msg_id_t * id)796 static rbtree_walk_direction_t compare_msgid_onetag_range(nchan_msg_id_t *min, nchan_msg_id_t *max, nchan_msg_id_t *id) {
797 
798   assert(min->tagcount == max->tagcount);
799   assert(max->tagcount == id->tagcount);
800   assert(id->tagcount == 1);
801 
802   if(min->time < id->time || (min->time == id->time && min->tag.fixed[0] <= id->tag.fixed[0])) {
803     if(max->time > id->time || (max->time == id->time && max->tag.fixed[0] > id->tag.fixed[0])) {
804       //inrange
805       return RBTREE_WALK_LEFT_RIGHT;
806     }
807     else {
808       //too large
809       return RBTREE_WALK_LEFT;
810     }
811   }
812   else {
813     //too small
814     return RBTREE_WALK_RIGHT;
815   }
816 }
817 
compare_msgid_time(nchan_msg_id_t * min,nchan_msg_id_t * max,nchan_msg_id_t * cur)818 static int8_t compare_msgid_time(nchan_msg_id_t *min, nchan_msg_id_t *max, nchan_msg_id_t *cur) {
819   if(min->time <= cur->time) {
820     if(max->time >= cur->time) {
821       return 0;
822     }
823     else {
824       return 1;
825     }
826   }
827   else {
828     return -1;
829   }
830 }
831 
832 
spoolcollector_addspool(spooler_respond_data_t * data,subscriber_pool_t * spool)833 static void spoolcollector_addspool(spooler_respond_data_t *data, subscriber_pool_t *spool) {
834   spool_collect_overflow_t  *overflow;
835   if(data->n < SPOOLER_RESPOND_SPOOLARRAY_SIZE) {
836     data->spools[data->n] = spool;
837   }
838   else {
839     if((overflow = ngx_alloc(sizeof(*overflow), ngx_cycle->log)) == NULL) {
840       ERR("can't allocate spoolcollector overflow");
841       return;
842     }
843     overflow->next = data->overflow;
844     overflow->spool = spool;
845     data->overflow = overflow;
846   }
847   data->n++;
848 }
849 
spoolcollector_unwind_nextspool(spooler_respond_data_t * data)850 static subscriber_pool_t *spoolcollector_unwind_nextspool(spooler_respond_data_t *data) {
851   spool_collect_overflow_t  *overflow;
852   subscriber_pool_t         *spool;
853   if(data->n > SPOOLER_RESPOND_SPOOLARRAY_SIZE) {
854     overflow = data->overflow;
855     spool = overflow->spool;
856     data->overflow = overflow->next;
857     ngx_free(overflow);
858     data->n--;
859     return spool;
860   }
861   else if(data->n > 0) {
862     return data->spools[--data->n];
863   }
864   else {
865     return NULL;
866   }
867 }
868 
869 
collect_spool_range_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,spooler_respond_data_t * data)870 static rbtree_walk_direction_t collect_spool_range_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, spooler_respond_data_t *data) {
871   rbtree_walk_direction_t  dir;
872   uint8_t multi_count = data->multi;
873 
874   if(multi_count <= 1) {
875     dir = compare_msgid_onetag_range(&data->min, &data->max, &spool->id);
876     if(dir == RBTREE_WALK_LEFT_RIGHT) {
877       spoolcollector_addspool(data, spool);
878     }
879     return dir;
880   }
881   else {
882     int tc = compare_msgid_time(&data->min, &data->max, &spool->id);
883     if(tc < 0) {
884       return RBTREE_WALK_RIGHT;
885     }
886     else if(tc > 0) {
887       return RBTREE_WALK_LEFT;
888     }
889     else {
890       time_t      timmin = data->min.time, timmax = data->max.time, timcur = spool->id.time;
891 
892       int         max_cmp = -1, min_cmp = -1;
893 
894       if( timcur > timmin && timcur < timmax) {
895         spoolcollector_addspool(data, spool);
896       }
897       else if(timcur == timmax && timcur == timmin) {
898         if( nchan_compare_msgid_tags(&spool->id, &data->max) < 0
899          && nchan_compare_msgid_tags(&spool->id, &data->min) >= 0 )
900         {
901           spoolcollector_addspool(data, spool);
902         }
903       }
904       else if((timcur == timmax && (max_cmp = nchan_compare_msgid_tags(&spool->id, &data->max)) < 0)
905            || (timcur == timmin && (min_cmp = nchan_compare_msgid_tags(&spool->id, &data->min)) >= 0))
906       {
907         spoolcollector_addspool(data, spool);
908       }
909       else if( timcur > timmin && timcur < timmax) {
910         spoolcollector_addspool(data, spool);
911       }
912       /*
913       else {
914         ERR("time_min: %i, time_cur: %i, time_max: %i", timmin, timcur, timmax);
915       }
916       */
917 
918       return RBTREE_WALK_LEFT_RIGHT;
919     }
920   }
921 }
922 
spooler_respond_status(channel_spooler_t * self,nchan_msg_id_t * id,ngx_int_t status_code,ngx_str_t * status_line)923 static ngx_int_t spooler_respond_status(channel_spooler_t *self, nchan_msg_id_t *id, ngx_int_t status_code, ngx_str_t *status_line) {
924   subscriber_pool_t         *spool = find_spool(self, id);
925   //validate_spooler(self, "before respond_status");
926   if(spool) {
927     if(status_code == NGX_HTTP_NO_CONTENT) {
928       spool->msg_status = MSG_EXPECTED;
929     }
930     spool_respond_general(spool, NULL, status_code, status_line, 0);
931     destroy_spool(spool);
932   }
933   //validate_spooler(self, "after respond_status");
934   return NGX_OK;
935 }
936 
spooler_respond_message(channel_spooler_t * self,nchan_msg_t * msg)937 static ngx_int_t spooler_respond_message(channel_spooler_t *self, nchan_msg_t *msg) {
938   spooler_respond_data_t     srdata;
939   subscriber_pool_t         *spool;
940   ngx_int_t                  responded_subs = 0;
941 
942 
943   if(self->fetching_strategy != NCHAN_SPOOL_PASSTHROUGH) {
944     srdata.min = msg->prev_id;
945     srdata.max = msg->id;
946     srdata.multi = msg->id.tagcount;
947     srdata.overflow = NULL;
948     srdata.msg = msg;
949     srdata.n = 0;
950 
951     //spooler_print_contents(self);
952 
953     //find all spools between msg->prev_id and msg->id
954     rbtree_conditional_walk(&self->spoolseed, (rbtree_walk_conditional_callback_pt )collect_spool_range_callback, &srdata);
955     /*
956     if(srdata.n == 0) {
957       DBG("no spools in range %V -- ", msgid_to_str(&msg->prev_id));
958       DBG(" -- %V", msgid_to_str(&msg->id));
959     }
960     */
961     while((spool = spoolcollector_unwind_nextspool(&srdata)) != NULL) {
962       responded_subs += spool->sub_count;
963       if(msg->id.tagcount > NCHAN_FIXED_MULTITAG_MAX) {
964         assert(spool->id.tag.allocd != msg->id.tag.allocd);
965       }
966       spool_respond_general(spool, msg, 0, NULL, 0);
967       if(msg->id.tagcount > NCHAN_FIXED_MULTITAG_MAX) {
968         assert(spool->id.tag.allocd != msg->id.tag.allocd);
969       }
970       spool_nextmsg(spool, &msg->id);
971     }
972   }
973   spool = get_spool(self, &latest_msg_id);
974   if(spool->sub_count > 0 && *self->channel_buffer_complete) {
975     spool_respond_general(spool, msg, 0, NULL, 0);
976     spool_nextmsg(spool, &msg->id);
977   }
978 
979   nchan_copy_msg_id(&self->prev_msg_id, &msg->id, NULL);
980 
981   return NGX_OK;
982 }
983 
984 typedef struct {
985   channel_spooler_t *spl;
986   nchan_msg_t       *msg;
987   ngx_int_t          code;
988   void              *code_data;
989   unsigned           notice:1;
990 } spooler_respond_generic_data_t;
991 
spooler_respond_rbtree_node_spool(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)992 static ngx_int_t spooler_respond_rbtree_node_spool(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
993   spooler_respond_generic_data_t  *d = data;
994 
995   return spool_respond_general(spool, d->msg, d->code, d->code_data, d->notice);
996 }
997 
spooler_respond_generic(channel_spooler_t * self,nchan_msg_t * msg,ngx_int_t code,void * code_data,unsigned notice)998 static ngx_int_t spooler_respond_generic(channel_spooler_t *self, nchan_msg_t *msg, ngx_int_t code, void *code_data, unsigned notice) {
999   spooler_respond_generic_data_t  data = {self, msg, code, code_data, notice};
1000   rbtree_walk(&self->spoolseed, (rbtree_walk_callback_pt )spooler_respond_rbtree_node_spool, &data);
1001   spool_respond_general(&self->current_msg_spool, data.msg, data.code, data.code_data, notice);
1002   return NGX_OK;
1003 }
1004 
spooler_broadcast_status(channel_spooler_t * self,ngx_int_t code,const ngx_str_t * line)1005 static ngx_int_t spooler_broadcast_status(channel_spooler_t *self, ngx_int_t code, const ngx_str_t *line) {
1006   return spooler_respond_generic(self, NULL, code, (void *)line, 0);
1007 }
1008 
spooler_broadcast_notice(channel_spooler_t * self,ngx_int_t code,void * data)1009 static ngx_int_t spooler_broadcast_notice(channel_spooler_t *self, ngx_int_t code, void *data) {
1010   return spooler_respond_generic(self, NULL, code, data, 1);
1011 }
1012 
spooler_spool_dequeue_all(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1013 static ngx_int_t spooler_spool_dequeue_all(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1014   spooled_subscriber_t *cur;
1015 
1016   for(cur = spool->first; cur != NULL; cur = cur->next) {
1017     cur->sub->dequeue_after_response = 1;
1018   }
1019 
1020   return NGX_OK;
1021 }
1022 
spooler_prepare_to_stop(channel_spooler_t * spl)1023 static ngx_int_t spooler_prepare_to_stop(channel_spooler_t *spl) {
1024   rbtree_walk(&spl->spoolseed, (rbtree_walk_callback_pt )spooler_spool_dequeue_all, (void *)spl);
1025   spl->want_to_stop = 1;
1026   return NGX_OK;
1027 }
1028 
1029 
1030 
spool_rbtree_node_id(void * data)1031 static void *spool_rbtree_node_id(void *data) {
1032   return &((subscriber_pool_t *)data)->id;
1033 }
1034 
spool_rbtree_bucketer(void * vid)1035 static uint32_t spool_rbtree_bucketer(void *vid) {
1036   nchan_msg_id_t   *id = (nchan_msg_id_t *)vid;
1037   return (uint32_t )id->time;
1038 }
1039 
spool_rbtree_compare(void * v1,void * v2)1040 static ngx_int_t spool_rbtree_compare(void *v1, void *v2) {
1041   nchan_msg_id_t   *id1 = (nchan_msg_id_t *)v1;
1042   nchan_msg_id_t   *id2 = (nchan_msg_id_t *)v2;
1043   time_t            t1 = id1->time;
1044   time_t            t2 = id2->time;
1045   ngx_int_t         tag1;
1046   ngx_int_t         tag2;
1047 
1048   if(t1 > t2) {
1049     return 1;
1050   }
1051   else if (t1 < t2) {
1052     return -1;
1053   }
1054   else {
1055     uint16_t   i, max1 = id1->tagcount, max2 = id2->tagcount;
1056     uint16_t   max = max1 > max2 ? max1 : max2;
1057     int16_t   *tags1, *tags2;
1058 
1059     tags1 = max1 <= NCHAN_FIXED_MULTITAG_MAX ? id1->tag.fixed : id1->tag.allocd;
1060     tags2 = max2 <= NCHAN_FIXED_MULTITAG_MAX ? id2->tag.fixed : id2->tag.allocd;
1061 
1062     for(i=0; i < max; i++) {
1063       tag1 = i < max1 ? tags1[i] : -1;
1064       tag2 = i < max2 ? tags2[i] : -1;
1065       if(tag1 > tag2) {
1066         return 1;
1067       }
1068       else if(tag1 < tag2) {
1069         return -1;
1070       }
1071     }
1072     return 0;
1073   }
1074 }
1075 
its_time_for_a_spooling_filter(void * data)1076 static int its_time_for_a_spooling_filter(void *data) {
1077   subscriber_pool_t *spool = data;
1078   if(spool->msg_status == MSG_CHANNEL_NOTREADY || spool->msg_status == MSG_INVALID) {
1079     spool_reserve(spool);
1080     return 1;
1081   }
1082   else {
1083     return 0;
1084   }
1085 }
1086 
its_time_for_a_spooling(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1087 static ngx_int_t its_time_for_a_spooling(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1088   ngx_int_t       rc = NGX_OK;
1089   //validate_spool(spool);
1090   if(spool->msg_status == MSG_CHANNEL_NOTREADY || spool->msg_status == MSG_INVALID) {
1091     //TODO: maybe don't fetch spool with zero subs?
1092     spool->msg_status = MSG_INVALID;
1093     rc = spool_fetch_msg(spool);
1094     assert(rc == NGX_OK || rc == NGX_DONE || NGX_DECLINED);
1095   }
1096   spool_release(spool);
1097   return rc;
1098 }
1099 
spooler_channel_status_changed(channel_spooler_t * self)1100 static ngx_int_t spooler_channel_status_changed(channel_spooler_t *self) {
1101   switch(*self->channel_status) {
1102     case READY:
1103       rbtree_walk_writesafe(&self->spoolseed, its_time_for_a_spooling_filter, (rbtree_walk_callback_pt )its_time_for_a_spooling, NULL);
1104       break;
1105 
1106     default:
1107       //do nothing
1108       break;
1109   };
1110   return NGX_OK;
1111 }
1112 
1113 
spooler_print_contents_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,channel_spooler_t * spl)1114 static ngx_int_t spooler_print_contents_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, channel_spooler_t *spl) {
1115   spooled_subscriber_t       *cur;
1116 
1117   ERR("  spool %p id %V", spool, msgid_to_str(&spool->id));
1118   for(cur = spool->first; cur != NULL; cur = cur->next) {
1119     ERR("    %V", cur->sub->name);
1120   }
1121 
1122   return NGX_OK;
1123 }
1124 
spooler_print_contents(channel_spooler_t * spl)1125 ngx_int_t spooler_print_contents(channel_spooler_t *spl) {
1126   ERR("spooler for channel %V", spl->chid);
1127   spooler_print_contents_callback(NULL, &spl->current_msg_spool, spl);
1128   rbtree_walk_incr(&spl->spoolseed, (rbtree_walk_callback_pt )spooler_print_contents_callback, spl);
1129   return NGX_OK;
1130 }
1131 
1132 
1133 
spooler_catch_up_filter(void * data)1134 static int spooler_catch_up_filter(void *data) {
1135   nchan_msg_status_t  status =  ((subscriber_pool_t *)data)->msg_status;
1136   return status == MSG_EXPECTED || status == MSG_PENDING;
1137 }
1138 
spooler_catch_up_callback(rbtree_seed_t * seed,subscriber_pool_t * spool,void * data)1139 static ngx_int_t spooler_catch_up_callback(rbtree_seed_t *seed, subscriber_pool_t *spool, void *data) {
1140   spool->msg_status = MSG_INVALID;
1141   spool_fetch_msg(spool);
1142   return NGX_OK;
1143 }
1144 
spooler_catch_up(channel_spooler_t * spl)1145 ngx_int_t spooler_catch_up(channel_spooler_t *spl) {
1146   rbtree_walk_writesafe(&spl->spoolseed, spooler_catch_up_filter, (rbtree_walk_callback_pt )spooler_catch_up_callback, NULL);
1147   return NGX_OK;
1148 }
1149 
1150 
1151 static channel_spooler_fn_t  spooler_fn = {
1152   spooler_add_subscriber,
1153   spooler_channel_status_changed,
1154   spooler_respond_message,
1155   spooler_respond_status,
1156   spooler_broadcast_status,
1157   spooler_broadcast_notice,
1158   spooler_prepare_to_stop
1159 };
1160 
start_spooler(channel_spooler_t * spl,ngx_str_t * chid,chanhead_pubsub_status_t * channel_status,uint8_t * channel_buffer_complete,nchan_store_t * store,nchan_loc_conf_t * cf,spooler_fetching_strategy_t fetching_strategy,channel_spooler_handlers_t * handlers,void * handlers_privdata)1161 channel_spooler_t *start_spooler(channel_spooler_t *spl, ngx_str_t *chid, chanhead_pubsub_status_t *channel_status, uint8_t *channel_buffer_complete, nchan_store_t *store, nchan_loc_conf_t *cf, spooler_fetching_strategy_t fetching_strategy, channel_spooler_handlers_t *handlers, void *handlers_privdata) {
1162   if(!spl->running) {
1163     ngx_memzero(spl, sizeof(*spl));
1164     rbtree_init(&spl->spoolseed, "spooler msg_id tree", spool_rbtree_node_id, spool_rbtree_bucketer, spool_rbtree_compare);
1165 
1166     spl->fn=&spooler_fn;
1167     //spl->prev_msg_id.time=0;
1168     //spl->prev_msg_id.tag=0;
1169 
1170     DBG("start SPOOLER %p", *spl);
1171 
1172     spl->chid = chid;
1173     spl->store = store;
1174 
1175     spl->channel_status = channel_status;
1176     spl->channel_buffer_complete = channel_buffer_complete;
1177 
1178     spl->running = 1;
1179     //spl->want_to_stop = 0;
1180     spl->publish_events = 1;
1181     spl->fetching_strategy = fetching_strategy;
1182 
1183     init_spool(spl, &spl->current_msg_spool, &latest_msg_id);
1184     spl->current_msg_spool.msg_status = MSG_EXPECTED;
1185 
1186     spl->handlers = handlers;
1187     spl->handlers_privdata = handlers_privdata;
1188 
1189     spl->cf = cf;
1190 
1191     return spl;
1192   }
1193   else {
1194     ERR("looks like spooler is already running. make sure spooler->running=0 before starting.");
1195     assert(0);
1196     return NULL;
1197   }
1198 }
1199 
remove_spool(subscriber_pool_t * spool)1200 static ngx_int_t remove_spool(subscriber_pool_t *spool) {
1201   channel_spooler_t    *spl = spool->spooler;
1202   ngx_rbtree_node_t    *node = rbtree_node_from_data(spool);
1203 
1204   assert(spool->reserved == 0);
1205   DBG("remove spool node %p", node);
1206 
1207   assert(spool->spooler->running);
1208 
1209   if(spool->fetchmsg_ev.timer_set) {
1210     ngx_del_timer(&spool->fetchmsg_ev);
1211   }
1212 
1213   nchan_free_msg_id(&spool->id);
1214   rbtree_remove_node(&spl->spoolseed, rbtree_node_from_data(spool));
1215 
1216   //assert((node = rbtree_find_node(&spl->spoolseed, &spool->id)) == NULL);
1217   //double-check that it's gone
1218 
1219   return NGX_OK;
1220 }
1221 
destroy_spool(subscriber_pool_t * spool)1222 static ngx_int_t destroy_spool(subscriber_pool_t *spool) {
1223   rbtree_seed_t         *seed = &spool->spooler->spoolseed;
1224   spooled_subscriber_t  *ssub, *ssub_next;
1225   subscriber_t          *sub;
1226   ngx_rbtree_node_t     *node = rbtree_node_from_data(spool);
1227 
1228   remove_spool(spool);
1229 
1230   DBG("destroy spool node %p", node);
1231 
1232   for(ssub = spool->first; ssub!=NULL; ssub = ssub_next) {
1233     sub = ssub->sub;
1234     ssub_next = ssub->next;
1235     //DBG("dequeue sub %p in spool %p", sub, spool);
1236     sub->fn->dequeue(sub);
1237   }
1238   assert(spool->sub_count == 0);
1239   assert(spool->first == NULL);
1240 
1241   //ngx_memset(spool, 0x42, sizeof(*spool)); //debug
1242 
1243   rbtree_destroy_node(seed, node);
1244   return NGX_OK;
1245 }
1246 
stop_spooler(channel_spooler_t * spl,uint8_t dequeue_subscribers)1247 ngx_int_t stop_spooler(channel_spooler_t *spl, uint8_t dequeue_subscribers) {
1248   ngx_rbtree_node_t    *cur, *sentinel;
1249   spooler_event_ll_t   *ecur, *ecur_next;
1250   subscriber_pool_t    *spool;
1251   rbtree_seed_t        *seed = &spl->spoolseed;
1252   ngx_rbtree_t         *tree = &seed->tree;
1253   ngx_int_t             n=0;
1254   sentinel = tree->sentinel;
1255 
1256   fetchmsg_data_t      *dcur;
1257 #if NCHAN_RBTREE_DBG
1258   ngx_int_t active_before = seed->active_nodes, allocd_before = seed->active_nodes;
1259 #endif
1260   if(spl->running) {
1261 
1262     for(ecur = spl->spooler_dependent_events; ecur != NULL; ecur = ecur_next) {
1263       ecur_next = ecur->next;
1264       if(ecur->cancel) {
1265         ecur->cancel(ecur->ev.data);
1266       }
1267       ngx_event_del_timer(&ecur->ev);
1268 
1269       ngx_free(ecur);
1270     }
1271 
1272     for(cur = tree->root; cur != NULL && cur != sentinel; cur = tree->root) {
1273       spool = (subscriber_pool_t *)rbtree_data_from_node(cur);
1274       if(dequeue_subscribers) {
1275         destroy_spool(spool);
1276       }
1277       else {
1278         remove_spool(spool);
1279         rbtree_destroy_node(seed, cur);
1280       }
1281       n++;
1282     }
1283 
1284     for(dcur = spl->fetchmsg_cb_data_list; dcur != NULL; dcur = dcur->next) {
1285       dcur->spooler = NULL;
1286     }
1287 
1288     DBG("stopped %i spools in SPOOLER %p", n, *spl);
1289   }
1290   else {
1291     DBG("SPOOLER %p not running", *spl);
1292   }
1293 #if NCHAN_RBTREE_DBG
1294   assert(active_before - n == 0);
1295   assert(allocd_before - n == 0);
1296   assert(seed->active_nodes == 0);
1297   assert(seed->allocd_nodes == 0);
1298 #endif
1299   nchan_free_msg_id(&spl->prev_msg_id);
1300   spl->running = 0;
1301   return NGX_OK;
1302 }
1303