1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 //#define DEBUG_LEVEL NGX_LOG_WARN
4 #define DEBUG_LEVEL NGX_LOG_DEBUG
5 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:LONGPOLL:" fmt, ##arg)
6 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:LONGPOLL:" fmt, ##arg)
7 #include <assert.h>
8 #include "longpoll-private.h"
9 
10 #include <store/memory/store.h>
11 
12 void memstore_fakeprocess_push(ngx_int_t slot);
13 void memstore_fakeprocess_push_random(void);
14 void memstore_fakeprocess_pop(void);
15 ngx_int_t memstore_slot(void);
16 
17 static const subscriber_t new_longpoll_sub;
18 
empty_handler()19 static void empty_handler() { }
20 
sudden_abort_handler(subscriber_t * sub)21 static void sudden_abort_handler(subscriber_t *sub) {
22   if(sub->request && sub->status != DEAD) {
23     sub->request->headers_out.status = NGX_HTTP_CLIENT_CLOSED_REQUEST;
24   }
25 #if FAKESHARD
26   full_subscriber_t  *fsub = (full_subscriber_t  *)sub;
27   memstore_fakeprocess_push(fsub->sub.owner);
28 #endif
29   sub->status = DEAD;
30   sub->fn->dequeue(sub);
31 #if FAKESHARD
32   memstore_fakeprocess_pop();
33 #endif
34 }
35 
36 //void verify_unique_response(ngx_str_t *uri, nchan_msg_id_t *msgid, nchan_msg_t *msg, subscriber_t *sub);
37 
longpoll_subscriber_create(ngx_http_request_t * r,nchan_msg_id_t * msg_id)38 subscriber_t *longpoll_subscriber_create(ngx_http_request_t *r, nchan_msg_id_t *msg_id) {
39   DBG("create for req %p", r);
40   full_subscriber_t      *fsub;
41 
42   //TODO: allocate from pool (but not the request's pool)
43   if((fsub = ngx_alloc(sizeof(*fsub), ngx_cycle->log)) == NULL) {
44     ERR("Unable to allocate");
45     assert(0);
46     return NULL;
47   }
48 
49   nchan_subscriber_init(&fsub->sub, &new_longpoll_sub, r, msg_id);
50   fsub->privdata = NULL;
51   fsub->data.cln = NULL;
52   fsub->data.finalize_request = 1;
53   fsub->data.holding = 0;
54   fsub->data.act_as_intervalpoll = 0;
55 
56   nchan_subscriber_init_timeout_timer(&fsub->sub, &fsub->data.timeout_ev);
57 
58   fsub->data.enqueue_callback = empty_handler;
59   fsub->data.enqueue_callback_data = NULL;
60 
61   fsub->data.dequeue_callback = empty_handler;
62   fsub->data.dequeue_callback_data = NULL;
63 
64   fsub->data.already_responded = 0;
65   fsub->data.awaiting_destruction = 0;
66 
67   if(fsub->sub.cf->longpoll_multimsg) {
68     nchan_request_ctx_t  *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
69     fsub->sub.dequeue_after_response = 0;
70     ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
71     nchan_bufchain_pool_init(ctx->bcp, r->pool);
72 
73   }
74 
75   fsub->data.multimsg_first = NULL;
76   fsub->data.multimsg_last = NULL;
77 
78 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
79   subscriber_debug_add(&fsub->sub);
80 #endif
81 
82   //http request sudden close cleanup
83   if((fsub->data.cln = ngx_http_cleanup_add(r, 0)) == NULL) {
84     ERR("Unable to add request cleanup for longpoll subscriber");
85     assert(0);
86     return NULL;
87   }
88   fsub->data.cln->data = fsub;
89   fsub->data.cln->handler = (ngx_http_cleanup_pt )sudden_abort_handler;
90   DBG("%p created for request %p", &fsub->sub, r);
91 
92 
93   return &fsub->sub;
94 }
95 
longpoll_subscriber_destroy(subscriber_t * sub)96 ngx_int_t longpoll_subscriber_destroy(subscriber_t *sub) {
97   full_subscriber_t   *fsub = (full_subscriber_t  *)sub;
98 
99   if(sub->reserved > 0) {
100     DBG("%p not ready to destroy (reserved for %i) for req %p", sub, sub->reserved, fsub->sub.request);
101     fsub->data.awaiting_destruction = 1;
102   }
103   else {
104     DBG("%p destroy for req %p", sub, fsub->sub.request);
105     nchan_free_msg_id(&fsub->sub.last_msgid);
106     assert(sub->status == DEAD);
107     nchan_subscriber_subrequest_cleanup(sub);
108 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
109     subscriber_debug_remove(sub);
110     ngx_memset(fsub, 0xB9, sizeof(*fsub)); //debug
111 #endif
112     ngx_free(fsub);
113   }
114   return NGX_OK;
115 }
116 
finalize_request_handler(ngx_http_request_t * r)117 static void finalize_request_handler(ngx_http_request_t *r) {
118   nchan_request_ctx_t  *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
119   if(ctx->sub == NULL) {
120     ngx_http_test_reading(r);
121     return;
122   }
123   subscriber_t *sub = ctx->sub;
124   #if FAKESHARD
125     memstore_fakeprocess_push(sub->owner);
126   #endif
127 
128     ngx_connection_t *c = r->connection;
129     ngx_event_t      *rev = c->read;
130 
131 
132     if (c->error || c->timedout || c->close || c->destroyed || rev->closed || rev->eof || rev->pending_eof) {
133       ngx_http_test_reading(r);
134       return;
135     }
136 
137     sub->dequeue_after_response = 1;
138     sub->fn->respond_status(sub, NGX_HTTP_BAD_REQUEST, &NCHAN_HTTP_STATUS_400, NULL);
139   #if FAKESHARD
140     memstore_fakeprocess_pop();
141   #endif
142 }
143 
ensure_request_hold(full_subscriber_t * fsub)144 static void ensure_request_hold(full_subscriber_t *fsub) {
145   if(fsub->data.holding == 0) {
146     DBG("hodl request %p", fsub->sub.request);
147     fsub->data.holding = 1;
148     fsub->sub.request->read_event_handler = finalize_request_handler;
149     fsub->sub.request->write_event_handler = ngx_http_request_empty_handler;
150     fsub->sub.request->main->count++; //this is the right way to hold and finalize the request... maybe
151   }
152 }
153 
longpoll_reserve(subscriber_t * self)154 static ngx_int_t longpoll_reserve(subscriber_t *self) {
155   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
156   ensure_request_hold(fsub);
157   self->reserved++;
158   DBG("%p reserve for req %p, reservations: %i", self, fsub->sub.request, self->reserved);
159   return NGX_OK;
160 }
longpoll_release(subscriber_t * self,uint8_t nodestroy)161 static ngx_int_t longpoll_release(subscriber_t *self, uint8_t nodestroy) {
162   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
163   assert(self->reserved > 0);
164   self->reserved--;
165   DBG("%p release for req %p. reservations: %i", self, fsub->sub.request, self->reserved);
166   if(nodestroy == 0 && fsub->data.awaiting_destruction == 1 && self->reserved == 0) {
167     longpoll_subscriber_destroy(self);
168     return NGX_ABORT;
169   }
170   else {
171     return NGX_OK;
172   }
173 }
174 
longpoll_enqueue(subscriber_t * self)175 ngx_int_t longpoll_enqueue(subscriber_t *self) {
176   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
177   assert(fsub->sub.enqueued == 0);
178   DBG("%p enqueue", self);
179 
180   fsub->data.finalize_request = 1;
181 
182   fsub->sub.enqueued = 1;
183   ensure_request_hold(fsub);
184   if(self->cf->subscriber_timeout > 0) {
185     //add timeout timer
186     ngx_add_timer(&fsub->data.timeout_ev, self->cf->subscriber_timeout * 1000);
187   }
188 
189   if(fsub->data.enqueue_callback) {
190     fsub->data.enqueue_callback(self, fsub->data.enqueue_callback_data);
191   }
192 
193   return NGX_OK;
194 }
195 
longpoll_dequeue(subscriber_t * self)196 ngx_int_t longpoll_dequeue(subscriber_t *self) {
197   full_subscriber_t    *fsub = (full_subscriber_t  *)self;
198   ngx_http_request_t   *r = fsub->sub.request;
199   nchan_request_ctx_t  *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
200   int                   finalize_now = fsub->data.finalize_request;
201   if(fsub->data.timeout_ev.timer_set) {
202     ngx_del_timer(&fsub->data.timeout_ev);
203   }
204   DBG("%p dequeue", self);
205   if(fsub->data.dequeue_callback) {
206     fsub->data.dequeue_callback(self, fsub->data.dequeue_callback_data);
207   }
208 
209   if(self->enqueued && self->enable_sub_unsub_callbacks && self->cf->unsubscribe_request_url) {
210     nchan_subscriber_unsubscribe_request(self);
211   }
212 
213   self->enqueued = 0;
214 
215   ctx->sub = NULL;
216 
217   if(finalize_now) {
218     DBG("finalize request %p", r);
219     nchan_http_finalize_request(r, NGX_OK);
220     self->status = DEAD;
221   }
222 
223   if(self->destroy_after_dequeue) {
224     longpoll_subscriber_destroy(self);
225   }
226   return NGX_OK;
227 }
228 
dequeue_maybe(subscriber_t * self)229 static ngx_int_t dequeue_maybe(subscriber_t *self) {
230   if(self->dequeue_after_response) {
231     self->fn->dequeue(self);
232   }
233   return NGX_OK;
234 }
235 
abort_response(subscriber_t * sub,char * errmsg)236 static ngx_int_t abort_response(subscriber_t *sub, char *errmsg) {
237   if(sub->request) {
238     nchan_log_request_warning(sub->request, "%V subscriber: %s", sub->name, errmsg ? errmsg : "weird response error");
239   }
240   else {
241     nchan_log_warning("%V subscriber: %s", sub->name, errmsg ? errmsg : "weird response error");
242   }
243   sub->fn->dequeue(sub);
244   return NGX_ERROR;
245 }
246 
longpoll_multipart_add(full_subscriber_t * fsub,nchan_msg_t * msg,char ** err)247 static ngx_int_t longpoll_multipart_add(full_subscriber_t *fsub, nchan_msg_t *msg, char **err) {
248 
249   nchan_longpoll_multimsg_t     *mmsg;
250 
251   if((mmsg = ngx_palloc(fsub->sub.request->pool, sizeof(*mmsg))) == NULL) {
252     *err = "can't allocate multipart msg link";
253     return NGX_ERROR;
254   }
255 
256   if(msg->storage != NCHAN_MSG_SHARED) {
257     if((msg = nchan_msg_derive_palloc(msg, fsub->sub.request->pool)) == NULL) {
258       *err = "can't allocate derived msg in request pool";
259       return NGX_ERROR;
260     }
261   }
262   msg_reserve(msg, "longpoll multipart");
263   assert(msg->refcount > 0);
264 
265   mmsg->msg = msg;
266   mmsg->next = NULL;
267   if(fsub->data.multimsg_first == NULL) {
268     fsub->data.multimsg_first = mmsg;
269   }
270   if(fsub->data.multimsg_last) {
271     fsub->data.multimsg_last->next = mmsg;
272   }
273   fsub->data.multimsg_last = mmsg;
274 
275   return NGX_OK;
276 }
277 
longpoll_respond_message(subscriber_t * self,nchan_msg_t * msg)278 static ngx_int_t longpoll_respond_message(subscriber_t *self, nchan_msg_t *msg) {
279   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
280   ngx_int_t                  rc;
281   char                      *err = NULL;
282   ngx_http_request_t        *r = fsub->sub.request;
283   nchan_request_ctx_t       *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
284   nchan_loc_conf_t          *cf = fsub->sub.cf;
285 
286   DBG("%p respond req %p msg %p", self, r, msg);
287 
288   ctx->prev_msg_id = self->last_msgid;
289   update_subscriber_last_msg_id(self, msg);
290   ctx->msg_id = self->last_msgid;
291 
292   //verify_unique_response(&fsub->data.request->uri, &self->last_msgid, msg, self);
293   if(fsub->data.timeout_ev.timer_set) {
294     ngx_del_timer(&fsub->data.timeout_ev);
295   }
296   if(!cf->longpoll_multimsg) {
297     //disable abort handler
298     fsub->data.cln->handler = empty_handler;
299 
300     assert(fsub->data.already_responded != 1);
301     fsub->data.already_responded = 1;
302     if((rc = nchan_respond_msg(r, msg, &self->last_msgid, 0, &err)) != NGX_OK) {
303       return abort_response(self, err);
304     }
305   }
306   else {
307     if((rc = longpoll_multipart_add(fsub, msg, &err)) != NGX_OK) {
308       return abort_response(self, err);
309     }
310   }
311   dequeue_maybe(self);
312   return rc;
313 }
314 
multipart_request_cleanup_handler(nchan_longpoll_multimsg_t * first)315 static void multipart_request_cleanup_handler(nchan_longpoll_multimsg_t *first) {
316   nchan_longpoll_multimsg_t    *cur;
317   for(cur = first; cur != NULL; cur = cur->next) {
318     msg_release(cur->msg, "longpoll multipart");
319   }
320 }
321 
longpoll_multipart_respond(full_subscriber_t * fsub)322 static ngx_int_t longpoll_multipart_respond(full_subscriber_t *fsub) {
323   ngx_http_request_t    *r = fsub->sub.request;
324   nchan_request_ctx_t   *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
325   char                  *err;
326   ngx_int_t              rc;
327   u_char                *char_boundary = NULL;
328   u_char                *char_boundary_last;
329 
330   struct {
331     ngx_buf_t            first;
332     ngx_buf_t            mid;
333     ngx_buf_t            last;
334   }                      boundary;
335 
336   ngx_buf_t              double_newline_buf;
337   ngx_str_t             *content_type;
338   nchan_loc_conf_t      *cf = fsub->sub.cf;
339   int                    use_raw_stream_separator = cf->longpoll_multimsg_use_raw_stream_separator;
340 
341   ngx_init_set_membuf_char(&double_newline_buf, "\r\n\r\n");
342 
343   nchan_longpoll_multimsg_t *first, *cur;
344 
345   //disable abort handler
346   fsub->data.cln->handler = empty_handler;
347 
348   first = fsub->data.multimsg_first;
349 
350   fsub->sub.dequeue_after_response = 1;
351 
352   //cleanup to release msgs
353   fsub->data.cln = ngx_http_cleanup_add(fsub->sub.request, 0);
354   fsub->data.cln->data = first;
355   fsub->data.cln->handler = (ngx_http_cleanup_pt )multipart_request_cleanup_handler;
356 
357   if(fsub->data.multimsg_first == fsub->data.multimsg_last) {
358     //just one message.
359     if((rc = nchan_respond_msg(r, fsub->data.multimsg_first->msg, &fsub->sub.last_msgid, 0, &err)) != NGX_OK) {
360       return abort_response(&fsub->sub, err);
361     }
362     return NGX_OK;
363   }
364 
365   //multi messages
366   if(!use_raw_stream_separator) {
367     nchan_request_set_content_type_multipart_boundary_header(r, ctx);
368     char_boundary = ngx_palloc(r->pool, 50);
369     char_boundary_last = ngx_snprintf(char_boundary, 50, ("\r\n--%V--\r\n"), nchan_request_multipart_boundary(r, ctx));
370 
371     //set up the boundaries
372     ngx_init_set_membuf(&boundary.first, &char_boundary[2], &char_boundary_last[-4]);
373     ngx_init_set_membuf(&boundary.mid, &char_boundary[0], &char_boundary_last[-4]);
374     ngx_init_set_membuf(&boundary.last, &char_boundary[0], char_boundary_last);
375   }
376 
377   for(cur = first; cur != NULL; cur = cur->next) {
378     if(!use_raw_stream_separator) {
379       // each buffer needs to be unique for the purpose of dealing with nginx output guts
380       // (something about max. 64 iovecs per write call and counting the number of bytes already sent)
381 
382       nchan_bufchain_append_buf(ctx->bcp, cur == first ? &boundary.first : &boundary.mid);
383 
384       content_type = cur->msg->content_type;
385       if (content_type) {
386         nchan_bufchain_append_cstr(ctx->bcp, "\r\nContent-Type: ");
387         nchan_bufchain_append_str(ctx->bcp, content_type);
388       }
389       nchan_bufchain_append_cstr(ctx->bcp, "\r\n\r\n");
390     }
391 
392     if(ngx_buf_size((&cur->msg->buf)) > 0) {
393       ngx_buf_t             msgbuf = cur->msg->buf;
394 
395       if(msgbuf.file) {
396         ngx_file_t  *file_copy = nchan_bufchain_pool_reserve_file(ctx->bcp);
397         nchan_msg_buf_open_fd_if_needed(&msgbuf, file_copy, NULL);
398       }
399       nchan_bufchain_append_buf(ctx->bcp, &msgbuf);
400     }
401 
402     if(use_raw_stream_separator) {
403       nchan_bufchain_append_str(ctx->bcp, &cf->subscriber_http_raw_stream_separator);
404     }
405     else if(cur->next == NULL) { //lastmsg
406       nchan_bufchain_append_buf(ctx->bcp, &boundary.last);
407     }
408   }
409 
410   r->headers_out.status = NGX_HTTP_OK;
411   r->headers_out.content_length_n = nchan_bufchain_length(ctx->bcp);
412   nchan_set_msgid_http_response_headers(r, ctx, &fsub->data.multimsg_last->msg->id);
413   nchan_include_access_control_if_needed(r, ctx);
414   if(ngx_http_send_header(r) != NGX_OK) {
415     return abort_response(&fsub->sub, "failed to send longpoll-multipart headers");
416   }
417   if(nchan_output_filter(r, nchan_bufchain_first_chain(ctx->bcp)) != NGX_OK) {
418     return abort_response(&fsub->sub, "failed to send longpoll-multipart body");
419   }
420 
421   return NGX_OK;
422 }
423 
longpoll_respond_status(subscriber_t * self,ngx_int_t status_code,const ngx_str_t * status_line,ngx_chain_t * status_body)424 static ngx_int_t longpoll_respond_status(subscriber_t *self, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body) {
425   full_subscriber_t     *fsub = (full_subscriber_t *)self;
426   ngx_http_request_t    *r = fsub->sub.request;
427   nchan_loc_conf_t      *cf = fsub->sub.cf;
428 
429   if(fsub->data.act_as_intervalpoll) {
430     if(status_code == NGX_HTTP_NO_CONTENT || status_code == NGX_HTTP_NOT_MODIFIED || status_code == NGX_HTTP_NOT_FOUND ) {
431       status_code = NGX_HTTP_NOT_MODIFIED;
432     }
433   }
434   else if(status_code == NGX_HTTP_NO_CONTENT || (status_code == NGX_HTTP_NOT_MODIFIED && !status_line)) {
435     if(cf->longpoll_multimsg) {
436       if(fsub->data.multimsg_first != NULL) {
437         if(longpoll_multipart_respond(fsub) == NGX_OK) {
438           dequeue_maybe(self);
439         }
440         else {
441           DBG("%p should have been dequeued through abort_response");
442         }
443       }
444       return NGX_OK;
445     }
446     else {
447       //don't care, ignore
448       return NGX_OK;
449     }
450   }
451 
452   DBG("%p respond req %p status %i", self, r, status_code);
453 
454   fsub->sub.dequeue_after_response = 1;
455 
456   nchan_set_msgid_http_response_headers(r, NULL, &self->last_msgid);
457 
458   //disable abort handler
459   fsub->data.cln->handler = empty_handler;
460 
461   nchan_respond_status(r, status_code, status_line, status_body, 0);
462 
463   dequeue_maybe(self);
464   return NGX_OK;
465 }
466 
subscriber_respond_unqueued_status(full_subscriber_t * fsub,ngx_int_t status_code,const ngx_str_t * status_line,ngx_chain_t * status_body)467 ngx_int_t subscriber_respond_unqueued_status(full_subscriber_t *fsub, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body) {
468   ngx_http_request_t     *r = fsub->sub.request;
469   nchan_loc_conf_t       *cf = fsub->sub.cf;
470   nchan_request_ctx_t    *ctx;
471 
472   fsub->data.cln->handler = (ngx_http_cleanup_pt )empty_handler;
473   fsub->data.finalize_request = 0;
474   fsub->sub.status = DEAD;
475   fsub->sub.fn->dequeue(&fsub->sub);
476   if(cf->unsubscribe_request_url || cf->subscribe_request_url) {
477     ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
478     ctx->sent_unsubscribe_request = 1; //lie about having sent the unsub request already to avoid sending it
479   }
480   return nchan_respond_status(r, status_code, status_line, status_body, 1);
481 }
482 
subscriber_maybe_dequeue_after_status_response(full_subscriber_t * fsub,ngx_int_t status_code)483 void subscriber_maybe_dequeue_after_status_response(full_subscriber_t *fsub, ngx_int_t status_code) {
484   if((status_code >=400 && status_code < 600) || status_code == NGX_HTTP_NOT_MODIFIED) {
485     fsub->data.cln->handler = (ngx_http_cleanup_pt )empty_handler;
486     fsub->sub.request->keepalive=0;
487     fsub->data.finalize_request=1;
488     fsub->sub.request->headers_out.status = status_code;
489     fsub->sub.fn->dequeue(&fsub->sub);
490   }
491 }
492 
request_cleanup_handler(subscriber_t * sub)493 static void request_cleanup_handler(subscriber_t *sub) {
494 
495 }
496 
497 
longpoll_set_enqueue_callback(subscriber_t * self,subscriber_callback_pt cb,void * privdata)498 static ngx_int_t longpoll_set_enqueue_callback(subscriber_t *self, subscriber_callback_pt cb, void *privdata) {
499   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
500   fsub->data.enqueue_callback = cb;
501   fsub->data.enqueue_callback_data = privdata;
502   return NGX_OK;
503 }
504 
longpoll_set_dequeue_callback(subscriber_t * self,subscriber_callback_pt cb,void * privdata)505 static ngx_int_t longpoll_set_dequeue_callback(subscriber_t *self, subscriber_callback_pt cb, void *privdata) {
506   full_subscriber_t  *fsub = (full_subscriber_t  *)self;
507   if(fsub->data.cln == NULL) {
508     fsub->data.cln = ngx_http_cleanup_add(fsub->sub.request, 0);
509     fsub->data.cln->data = self;
510     fsub->data.cln->handler = (ngx_http_cleanup_pt )request_cleanup_handler;
511   }
512   fsub->data.dequeue_callback = cb;
513   fsub->data.dequeue_callback_data = privdata;
514   return NGX_OK;
515 }
516 
517 static const subscriber_fn_t longpoll_fn = {
518   &longpoll_enqueue,
519   &longpoll_dequeue,
520   &longpoll_respond_message,
521   &longpoll_respond_status,
522   &longpoll_set_enqueue_callback,
523   &longpoll_set_dequeue_callback,
524   &longpoll_reserve,
525   &longpoll_release,
526   &nchan_subscriber_receive_notice,
527   &nchan_subscriber_authorize_subscribe_request
528 };
529 
530 static ngx_str_t  sub_name = ngx_string("longpoll");
531 
532 static const subscriber_t new_longpoll_sub = {
533   &sub_name,
534   LONGPOLL,
535   &longpoll_fn,
536   UNKNOWN,
537   NCHAN_ZERO_MSGID,
538   NULL,
539   NULL,
540   NULL,
541   0, //reservations
542   0, //enable sub/unsub callbacks
543   1, //deque after response
544   1, //destroy after dequeue
545   0, //enqueued
546 
547 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
548   NULL, NULL, NULL
549 #endif
550 };
551