1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 //#define DEBUG_LEVEL NGX_LOG_WARN
4 #define DEBUG_LEVEL NGX_LOG_DEBUG
5 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:LONGPOLL:" fmt, ##arg)
6 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:LONGPOLL:" fmt, ##arg)
7 #include <assert.h>
8 #include "longpoll-private.h"
9
10 #include <store/memory/store.h>
11
12 void memstore_fakeprocess_push(ngx_int_t slot);
13 void memstore_fakeprocess_push_random(void);
14 void memstore_fakeprocess_pop(void);
15 ngx_int_t memstore_slot(void);
16
17 static const subscriber_t new_longpoll_sub;
18
empty_handler()19 static void empty_handler() { }
20
sudden_abort_handler(subscriber_t * sub)21 static void sudden_abort_handler(subscriber_t *sub) {
22 if(sub->request && sub->status != DEAD) {
23 sub->request->headers_out.status = NGX_HTTP_CLIENT_CLOSED_REQUEST;
24 }
25 #if FAKESHARD
26 full_subscriber_t *fsub = (full_subscriber_t *)sub;
27 memstore_fakeprocess_push(fsub->sub.owner);
28 #endif
29 sub->status = DEAD;
30 sub->fn->dequeue(sub);
31 #if FAKESHARD
32 memstore_fakeprocess_pop();
33 #endif
34 }
35
36 //void verify_unique_response(ngx_str_t *uri, nchan_msg_id_t *msgid, nchan_msg_t *msg, subscriber_t *sub);
37
longpoll_subscriber_create(ngx_http_request_t * r,nchan_msg_id_t * msg_id)38 subscriber_t *longpoll_subscriber_create(ngx_http_request_t *r, nchan_msg_id_t *msg_id) {
39 DBG("create for req %p", r);
40 full_subscriber_t *fsub;
41
42 //TODO: allocate from pool (but not the request's pool)
43 if((fsub = ngx_alloc(sizeof(*fsub), ngx_cycle->log)) == NULL) {
44 ERR("Unable to allocate");
45 assert(0);
46 return NULL;
47 }
48
49 nchan_subscriber_init(&fsub->sub, &new_longpoll_sub, r, msg_id);
50 fsub->privdata = NULL;
51 fsub->data.cln = NULL;
52 fsub->data.finalize_request = 1;
53 fsub->data.holding = 0;
54 fsub->data.act_as_intervalpoll = 0;
55
56 nchan_subscriber_init_timeout_timer(&fsub->sub, &fsub->data.timeout_ev);
57
58 fsub->data.enqueue_callback = empty_handler;
59 fsub->data.enqueue_callback_data = NULL;
60
61 fsub->data.dequeue_callback = empty_handler;
62 fsub->data.dequeue_callback_data = NULL;
63
64 fsub->data.already_responded = 0;
65 fsub->data.awaiting_destruction = 0;
66
67 if(fsub->sub.cf->longpoll_multimsg) {
68 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
69 fsub->sub.dequeue_after_response = 0;
70 ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
71 nchan_bufchain_pool_init(ctx->bcp, r->pool);
72
73 }
74
75 fsub->data.multimsg_first = NULL;
76 fsub->data.multimsg_last = NULL;
77
78 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
79 subscriber_debug_add(&fsub->sub);
80 #endif
81
82 //http request sudden close cleanup
83 if((fsub->data.cln = ngx_http_cleanup_add(r, 0)) == NULL) {
84 ERR("Unable to add request cleanup for longpoll subscriber");
85 assert(0);
86 return NULL;
87 }
88 fsub->data.cln->data = fsub;
89 fsub->data.cln->handler = (ngx_http_cleanup_pt )sudden_abort_handler;
90 DBG("%p created for request %p", &fsub->sub, r);
91
92
93 return &fsub->sub;
94 }
95
longpoll_subscriber_destroy(subscriber_t * sub)96 ngx_int_t longpoll_subscriber_destroy(subscriber_t *sub) {
97 full_subscriber_t *fsub = (full_subscriber_t *)sub;
98
99 if(sub->reserved > 0) {
100 DBG("%p not ready to destroy (reserved for %i) for req %p", sub, sub->reserved, fsub->sub.request);
101 fsub->data.awaiting_destruction = 1;
102 }
103 else {
104 DBG("%p destroy for req %p", sub, fsub->sub.request);
105 nchan_free_msg_id(&fsub->sub.last_msgid);
106 assert(sub->status == DEAD);
107 nchan_subscriber_subrequest_cleanup(sub);
108 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
109 subscriber_debug_remove(sub);
110 ngx_memset(fsub, 0xB9, sizeof(*fsub)); //debug
111 #endif
112 ngx_free(fsub);
113 }
114 return NGX_OK;
115 }
116
finalize_request_handler(ngx_http_request_t * r)117 static void finalize_request_handler(ngx_http_request_t *r) {
118 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
119 if(ctx->sub == NULL) {
120 ngx_http_test_reading(r);
121 return;
122 }
123 subscriber_t *sub = ctx->sub;
124 #if FAKESHARD
125 memstore_fakeprocess_push(sub->owner);
126 #endif
127
128 ngx_connection_t *c = r->connection;
129 ngx_event_t *rev = c->read;
130
131
132 if (c->error || c->timedout || c->close || c->destroyed || rev->closed || rev->eof || rev->pending_eof) {
133 ngx_http_test_reading(r);
134 return;
135 }
136
137 sub->dequeue_after_response = 1;
138 sub->fn->respond_status(sub, NGX_HTTP_BAD_REQUEST, &NCHAN_HTTP_STATUS_400, NULL);
139 #if FAKESHARD
140 memstore_fakeprocess_pop();
141 #endif
142 }
143
ensure_request_hold(full_subscriber_t * fsub)144 static void ensure_request_hold(full_subscriber_t *fsub) {
145 if(fsub->data.holding == 0) {
146 DBG("hodl request %p", fsub->sub.request);
147 fsub->data.holding = 1;
148 fsub->sub.request->read_event_handler = finalize_request_handler;
149 fsub->sub.request->write_event_handler = ngx_http_request_empty_handler;
150 fsub->sub.request->main->count++; //this is the right way to hold and finalize the request... maybe
151 }
152 }
153
longpoll_reserve(subscriber_t * self)154 static ngx_int_t longpoll_reserve(subscriber_t *self) {
155 full_subscriber_t *fsub = (full_subscriber_t *)self;
156 ensure_request_hold(fsub);
157 self->reserved++;
158 DBG("%p reserve for req %p, reservations: %i", self, fsub->sub.request, self->reserved);
159 return NGX_OK;
160 }
longpoll_release(subscriber_t * self,uint8_t nodestroy)161 static ngx_int_t longpoll_release(subscriber_t *self, uint8_t nodestroy) {
162 full_subscriber_t *fsub = (full_subscriber_t *)self;
163 assert(self->reserved > 0);
164 self->reserved--;
165 DBG("%p release for req %p. reservations: %i", self, fsub->sub.request, self->reserved);
166 if(nodestroy == 0 && fsub->data.awaiting_destruction == 1 && self->reserved == 0) {
167 longpoll_subscriber_destroy(self);
168 return NGX_ABORT;
169 }
170 else {
171 return NGX_OK;
172 }
173 }
174
longpoll_enqueue(subscriber_t * self)175 ngx_int_t longpoll_enqueue(subscriber_t *self) {
176 full_subscriber_t *fsub = (full_subscriber_t *)self;
177 assert(fsub->sub.enqueued == 0);
178 DBG("%p enqueue", self);
179
180 fsub->data.finalize_request = 1;
181
182 fsub->sub.enqueued = 1;
183 ensure_request_hold(fsub);
184 if(self->cf->subscriber_timeout > 0) {
185 //add timeout timer
186 ngx_add_timer(&fsub->data.timeout_ev, self->cf->subscriber_timeout * 1000);
187 }
188
189 if(fsub->data.enqueue_callback) {
190 fsub->data.enqueue_callback(self, fsub->data.enqueue_callback_data);
191 }
192
193 return NGX_OK;
194 }
195
longpoll_dequeue(subscriber_t * self)196 ngx_int_t longpoll_dequeue(subscriber_t *self) {
197 full_subscriber_t *fsub = (full_subscriber_t *)self;
198 ngx_http_request_t *r = fsub->sub.request;
199 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
200 int finalize_now = fsub->data.finalize_request;
201 if(fsub->data.timeout_ev.timer_set) {
202 ngx_del_timer(&fsub->data.timeout_ev);
203 }
204 DBG("%p dequeue", self);
205 if(fsub->data.dequeue_callback) {
206 fsub->data.dequeue_callback(self, fsub->data.dequeue_callback_data);
207 }
208
209 if(self->enqueued && self->enable_sub_unsub_callbacks && self->cf->unsubscribe_request_url) {
210 nchan_subscriber_unsubscribe_request(self);
211 }
212
213 self->enqueued = 0;
214
215 ctx->sub = NULL;
216
217 if(finalize_now) {
218 DBG("finalize request %p", r);
219 nchan_http_finalize_request(r, NGX_OK);
220 self->status = DEAD;
221 }
222
223 if(self->destroy_after_dequeue) {
224 longpoll_subscriber_destroy(self);
225 }
226 return NGX_OK;
227 }
228
dequeue_maybe(subscriber_t * self)229 static ngx_int_t dequeue_maybe(subscriber_t *self) {
230 if(self->dequeue_after_response) {
231 self->fn->dequeue(self);
232 }
233 return NGX_OK;
234 }
235
abort_response(subscriber_t * sub,char * errmsg)236 static ngx_int_t abort_response(subscriber_t *sub, char *errmsg) {
237 if(sub->request) {
238 nchan_log_request_warning(sub->request, "%V subscriber: %s", sub->name, errmsg ? errmsg : "weird response error");
239 }
240 else {
241 nchan_log_warning("%V subscriber: %s", sub->name, errmsg ? errmsg : "weird response error");
242 }
243 sub->fn->dequeue(sub);
244 return NGX_ERROR;
245 }
246
longpoll_multipart_add(full_subscriber_t * fsub,nchan_msg_t * msg,char ** err)247 static ngx_int_t longpoll_multipart_add(full_subscriber_t *fsub, nchan_msg_t *msg, char **err) {
248
249 nchan_longpoll_multimsg_t *mmsg;
250
251 if((mmsg = ngx_palloc(fsub->sub.request->pool, sizeof(*mmsg))) == NULL) {
252 *err = "can't allocate multipart msg link";
253 return NGX_ERROR;
254 }
255
256 if(msg->storage != NCHAN_MSG_SHARED) {
257 if((msg = nchan_msg_derive_palloc(msg, fsub->sub.request->pool)) == NULL) {
258 *err = "can't allocate derived msg in request pool";
259 return NGX_ERROR;
260 }
261 }
262 msg_reserve(msg, "longpoll multipart");
263 assert(msg->refcount > 0);
264
265 mmsg->msg = msg;
266 mmsg->next = NULL;
267 if(fsub->data.multimsg_first == NULL) {
268 fsub->data.multimsg_first = mmsg;
269 }
270 if(fsub->data.multimsg_last) {
271 fsub->data.multimsg_last->next = mmsg;
272 }
273 fsub->data.multimsg_last = mmsg;
274
275 return NGX_OK;
276 }
277
longpoll_respond_message(subscriber_t * self,nchan_msg_t * msg)278 static ngx_int_t longpoll_respond_message(subscriber_t *self, nchan_msg_t *msg) {
279 full_subscriber_t *fsub = (full_subscriber_t *)self;
280 ngx_int_t rc;
281 char *err = NULL;
282 ngx_http_request_t *r = fsub->sub.request;
283 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
284 nchan_loc_conf_t *cf = fsub->sub.cf;
285
286 DBG("%p respond req %p msg %p", self, r, msg);
287
288 ctx->prev_msg_id = self->last_msgid;
289 update_subscriber_last_msg_id(self, msg);
290 ctx->msg_id = self->last_msgid;
291
292 //verify_unique_response(&fsub->data.request->uri, &self->last_msgid, msg, self);
293 if(fsub->data.timeout_ev.timer_set) {
294 ngx_del_timer(&fsub->data.timeout_ev);
295 }
296 if(!cf->longpoll_multimsg) {
297 //disable abort handler
298 fsub->data.cln->handler = empty_handler;
299
300 assert(fsub->data.already_responded != 1);
301 fsub->data.already_responded = 1;
302 if((rc = nchan_respond_msg(r, msg, &self->last_msgid, 0, &err)) != NGX_OK) {
303 return abort_response(self, err);
304 }
305 }
306 else {
307 if((rc = longpoll_multipart_add(fsub, msg, &err)) != NGX_OK) {
308 return abort_response(self, err);
309 }
310 }
311 dequeue_maybe(self);
312 return rc;
313 }
314
multipart_request_cleanup_handler(nchan_longpoll_multimsg_t * first)315 static void multipart_request_cleanup_handler(nchan_longpoll_multimsg_t *first) {
316 nchan_longpoll_multimsg_t *cur;
317 for(cur = first; cur != NULL; cur = cur->next) {
318 msg_release(cur->msg, "longpoll multipart");
319 }
320 }
321
longpoll_multipart_respond(full_subscriber_t * fsub)322 static ngx_int_t longpoll_multipart_respond(full_subscriber_t *fsub) {
323 ngx_http_request_t *r = fsub->sub.request;
324 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
325 char *err;
326 ngx_int_t rc;
327 u_char *char_boundary = NULL;
328 u_char *char_boundary_last;
329
330 struct {
331 ngx_buf_t first;
332 ngx_buf_t mid;
333 ngx_buf_t last;
334 } boundary;
335
336 ngx_buf_t double_newline_buf;
337 ngx_str_t *content_type;
338 nchan_loc_conf_t *cf = fsub->sub.cf;
339 int use_raw_stream_separator = cf->longpoll_multimsg_use_raw_stream_separator;
340
341 ngx_init_set_membuf_char(&double_newline_buf, "\r\n\r\n");
342
343 nchan_longpoll_multimsg_t *first, *cur;
344
345 //disable abort handler
346 fsub->data.cln->handler = empty_handler;
347
348 first = fsub->data.multimsg_first;
349
350 fsub->sub.dequeue_after_response = 1;
351
352 //cleanup to release msgs
353 fsub->data.cln = ngx_http_cleanup_add(fsub->sub.request, 0);
354 fsub->data.cln->data = first;
355 fsub->data.cln->handler = (ngx_http_cleanup_pt )multipart_request_cleanup_handler;
356
357 if(fsub->data.multimsg_first == fsub->data.multimsg_last) {
358 //just one message.
359 if((rc = nchan_respond_msg(r, fsub->data.multimsg_first->msg, &fsub->sub.last_msgid, 0, &err)) != NGX_OK) {
360 return abort_response(&fsub->sub, err);
361 }
362 return NGX_OK;
363 }
364
365 //multi messages
366 if(!use_raw_stream_separator) {
367 nchan_request_set_content_type_multipart_boundary_header(r, ctx);
368 char_boundary = ngx_palloc(r->pool, 50);
369 char_boundary_last = ngx_snprintf(char_boundary, 50, ("\r\n--%V--\r\n"), nchan_request_multipart_boundary(r, ctx));
370
371 //set up the boundaries
372 ngx_init_set_membuf(&boundary.first, &char_boundary[2], &char_boundary_last[-4]);
373 ngx_init_set_membuf(&boundary.mid, &char_boundary[0], &char_boundary_last[-4]);
374 ngx_init_set_membuf(&boundary.last, &char_boundary[0], char_boundary_last);
375 }
376
377 for(cur = first; cur != NULL; cur = cur->next) {
378 if(!use_raw_stream_separator) {
379 // each buffer needs to be unique for the purpose of dealing with nginx output guts
380 // (something about max. 64 iovecs per write call and counting the number of bytes already sent)
381
382 nchan_bufchain_append_buf(ctx->bcp, cur == first ? &boundary.first : &boundary.mid);
383
384 content_type = cur->msg->content_type;
385 if (content_type) {
386 nchan_bufchain_append_cstr(ctx->bcp, "\r\nContent-Type: ");
387 nchan_bufchain_append_str(ctx->bcp, content_type);
388 }
389 nchan_bufchain_append_cstr(ctx->bcp, "\r\n\r\n");
390 }
391
392 if(ngx_buf_size((&cur->msg->buf)) > 0) {
393 ngx_buf_t msgbuf = cur->msg->buf;
394
395 if(msgbuf.file) {
396 ngx_file_t *file_copy = nchan_bufchain_pool_reserve_file(ctx->bcp);
397 nchan_msg_buf_open_fd_if_needed(&msgbuf, file_copy, NULL);
398 }
399 nchan_bufchain_append_buf(ctx->bcp, &msgbuf);
400 }
401
402 if(use_raw_stream_separator) {
403 nchan_bufchain_append_str(ctx->bcp, &cf->subscriber_http_raw_stream_separator);
404 }
405 else if(cur->next == NULL) { //lastmsg
406 nchan_bufchain_append_buf(ctx->bcp, &boundary.last);
407 }
408 }
409
410 r->headers_out.status = NGX_HTTP_OK;
411 r->headers_out.content_length_n = nchan_bufchain_length(ctx->bcp);
412 nchan_set_msgid_http_response_headers(r, ctx, &fsub->data.multimsg_last->msg->id);
413 nchan_include_access_control_if_needed(r, ctx);
414 if(ngx_http_send_header(r) != NGX_OK) {
415 return abort_response(&fsub->sub, "failed to send longpoll-multipart headers");
416 }
417 if(nchan_output_filter(r, nchan_bufchain_first_chain(ctx->bcp)) != NGX_OK) {
418 return abort_response(&fsub->sub, "failed to send longpoll-multipart body");
419 }
420
421 return NGX_OK;
422 }
423
longpoll_respond_status(subscriber_t * self,ngx_int_t status_code,const ngx_str_t * status_line,ngx_chain_t * status_body)424 static ngx_int_t longpoll_respond_status(subscriber_t *self, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body) {
425 full_subscriber_t *fsub = (full_subscriber_t *)self;
426 ngx_http_request_t *r = fsub->sub.request;
427 nchan_loc_conf_t *cf = fsub->sub.cf;
428
429 if(fsub->data.act_as_intervalpoll) {
430 if(status_code == NGX_HTTP_NO_CONTENT || status_code == NGX_HTTP_NOT_MODIFIED || status_code == NGX_HTTP_NOT_FOUND ) {
431 status_code = NGX_HTTP_NOT_MODIFIED;
432 }
433 }
434 else if(status_code == NGX_HTTP_NO_CONTENT || (status_code == NGX_HTTP_NOT_MODIFIED && !status_line)) {
435 if(cf->longpoll_multimsg) {
436 if(fsub->data.multimsg_first != NULL) {
437 if(longpoll_multipart_respond(fsub) == NGX_OK) {
438 dequeue_maybe(self);
439 }
440 else {
441 DBG("%p should have been dequeued through abort_response");
442 }
443 }
444 return NGX_OK;
445 }
446 else {
447 //don't care, ignore
448 return NGX_OK;
449 }
450 }
451
452 DBG("%p respond req %p status %i", self, r, status_code);
453
454 fsub->sub.dequeue_after_response = 1;
455
456 nchan_set_msgid_http_response_headers(r, NULL, &self->last_msgid);
457
458 //disable abort handler
459 fsub->data.cln->handler = empty_handler;
460
461 nchan_respond_status(r, status_code, status_line, status_body, 0);
462
463 dequeue_maybe(self);
464 return NGX_OK;
465 }
466
subscriber_respond_unqueued_status(full_subscriber_t * fsub,ngx_int_t status_code,const ngx_str_t * status_line,ngx_chain_t * status_body)467 ngx_int_t subscriber_respond_unqueued_status(full_subscriber_t *fsub, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body) {
468 ngx_http_request_t *r = fsub->sub.request;
469 nchan_loc_conf_t *cf = fsub->sub.cf;
470 nchan_request_ctx_t *ctx;
471
472 fsub->data.cln->handler = (ngx_http_cleanup_pt )empty_handler;
473 fsub->data.finalize_request = 0;
474 fsub->sub.status = DEAD;
475 fsub->sub.fn->dequeue(&fsub->sub);
476 if(cf->unsubscribe_request_url || cf->subscribe_request_url) {
477 ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
478 ctx->sent_unsubscribe_request = 1; //lie about having sent the unsub request already to avoid sending it
479 }
480 return nchan_respond_status(r, status_code, status_line, status_body, 1);
481 }
482
subscriber_maybe_dequeue_after_status_response(full_subscriber_t * fsub,ngx_int_t status_code)483 void subscriber_maybe_dequeue_after_status_response(full_subscriber_t *fsub, ngx_int_t status_code) {
484 if((status_code >=400 && status_code < 600) || status_code == NGX_HTTP_NOT_MODIFIED) {
485 fsub->data.cln->handler = (ngx_http_cleanup_pt )empty_handler;
486 fsub->sub.request->keepalive=0;
487 fsub->data.finalize_request=1;
488 fsub->sub.request->headers_out.status = status_code;
489 fsub->sub.fn->dequeue(&fsub->sub);
490 }
491 }
492
request_cleanup_handler(subscriber_t * sub)493 static void request_cleanup_handler(subscriber_t *sub) {
494
495 }
496
497
longpoll_set_enqueue_callback(subscriber_t * self,subscriber_callback_pt cb,void * privdata)498 static ngx_int_t longpoll_set_enqueue_callback(subscriber_t *self, subscriber_callback_pt cb, void *privdata) {
499 full_subscriber_t *fsub = (full_subscriber_t *)self;
500 fsub->data.enqueue_callback = cb;
501 fsub->data.enqueue_callback_data = privdata;
502 return NGX_OK;
503 }
504
longpoll_set_dequeue_callback(subscriber_t * self,subscriber_callback_pt cb,void * privdata)505 static ngx_int_t longpoll_set_dequeue_callback(subscriber_t *self, subscriber_callback_pt cb, void *privdata) {
506 full_subscriber_t *fsub = (full_subscriber_t *)self;
507 if(fsub->data.cln == NULL) {
508 fsub->data.cln = ngx_http_cleanup_add(fsub->sub.request, 0);
509 fsub->data.cln->data = self;
510 fsub->data.cln->handler = (ngx_http_cleanup_pt )request_cleanup_handler;
511 }
512 fsub->data.dequeue_callback = cb;
513 fsub->data.dequeue_callback_data = privdata;
514 return NGX_OK;
515 }
516
517 static const subscriber_fn_t longpoll_fn = {
518 &longpoll_enqueue,
519 &longpoll_dequeue,
520 &longpoll_respond_message,
521 &longpoll_respond_status,
522 &longpoll_set_enqueue_callback,
523 &longpoll_set_dequeue_callback,
524 &longpoll_reserve,
525 &longpoll_release,
526 &nchan_subscriber_receive_notice,
527 &nchan_subscriber_authorize_subscribe_request
528 };
529
530 static ngx_str_t sub_name = ngx_string("longpoll");
531
532 static const subscriber_t new_longpoll_sub = {
533 &sub_name,
534 LONGPOLL,
535 &longpoll_fn,
536 UNKNOWN,
537 NCHAN_ZERO_MSGID,
538 NULL,
539 NULL,
540 NULL,
541 0, //reservations
542 0, //enable sub/unsub callbacks
543 1, //deque after response
544 1, //destroy after dequeue
545 0, //enqueued
546
547 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
548 NULL, NULL, NULL
549 #endif
550 };
551