1 /*
2 * Written by Leo Ponomarev 2009-2015
3 */
4
5 #include <nchan_module.h>
6 #include <util/nchan_subrequest.h>
7 #include <assert.h>
8
9 #include <subscribers/longpoll.h>
10 #include <subscribers/intervalpoll.h>
11 #include <subscribers/eventsource.h>
12 #include <subscribers/http-chunked.h>
13 #include <subscribers/http-multipart-mixed.h>
14 #include <subscribers/http-raw-stream.h>
15 #include <subscribers/websocket.h>
16 #include <subscribers/benchmark.h>
17 #include <store/memory/store.h>
18 #include <store/redis/store.h>
19
20 #include <nchan_setup.c>
21
22 #if (NGX_ZLIB)
23 #include <zlib.h>
24 #endif
25
26
27 #if FAKESHARD
28 #include <store/memory/ipc.h>
29 #include <store/memory/shmem.h>
30 //#include <store/memory/store-private.h> //for debugging
31 #endif
32 #include <util/nchan_output.h>
33 #include <nchan_websocket_publisher.h>
34
// Module-wide globals (non-static, so visible to other translation units).
// nchan_worker_processes is only declared here, never assigned in this part of the file.
// NOTE(review): presumably filled in from the nginx worker count during setup -- confirm.
ngx_int_t         nchan_worker_processes;
// Starts out as 0; nothing in this part of the file changes it.
// NOTE(review): presumably switched on when a stub-status location is configured -- confirm.
int               nchan_stub_status_enabled = 0;
37
38
// Forward declarations of the request-body callbacks that nchan_pubsub_handler()
// hands to nchan_http_publisher_handler(): the regular publisher path and the
// path taken when redis is enabled but not ready.
static void nchan_publisher_body_handler(ngx_http_request_t *r);
static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r);
41
42 //#define DEBUG_LEVEL NGX_LOG_WARN
43 //#define DEBUG_LEVEL NGX_LOG_DEBUG
44
45 //#define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "NCHAN:" fmt, ##args)
46
// Heap-allocated handle that lets an asynchronous callback discover whether the
// request it was started for still exists.
typedef struct {
  ngx_http_request_t    *r;   // the request; reset to NULL by the request's cleanup handler
  ngx_http_cleanup_t    *cln; // cleanup entry registered on the request for this handle
} safe_request_ptr_t;
// Allocates a handle for r and registers the cleanup entry; returns NULL on failure.
static safe_request_ptr_t *nchan_set_safe_request_ptr(ngx_http_request_t *r);
// Frees the handle and returns the request, or NULL if the request is already gone.
static ngx_http_request_t *nchan_get_safe_request_ptr(safe_request_ptr_t *pd);
53
nchan_maybe_send_channel_event_message(ngx_http_request_t * r,channel_event_type_t event_type)54 ngx_int_t nchan_maybe_send_channel_event_message(ngx_http_request_t *r, channel_event_type_t event_type) {
55 static nchan_loc_conf_t evcf_data;
56 static nchan_loc_conf_t *evcf = NULL;
57
58 static ngx_str_t group = ngx_string("meta");
59
60 static ngx_str_t evt_sub_enqueue = ngx_string("subscriber_enqueue");
61 static ngx_str_t evt_sub_dequeue = ngx_string("subscriber_dequeue");
62 static ngx_str_t evt_sub_recvmsg = ngx_string("subscriber_receive_message");
63 static ngx_str_t evt_sub_recvsts = ngx_string("subscriber_receive_status");
64 static ngx_str_t evt_chan_publish= ngx_string("channel_publish");
65 static ngx_str_t evt_chan_delete = ngx_string("channel_delete");
66
67 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
68 ngx_http_complex_value_t *cv = cf->channel_events_channel_id;
69 if(cv==NULL) {
70 //nothing to send
71 return NGX_OK;
72 }
73
74 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
75 ngx_str_t tmpid;
76 size_t sz;
77 ngx_str_t *id;
78 u_char *cur;
79 ngx_str_t evstr;
80 nchan_msg_t msg;
81
82 switch(event_type) {
83 case SUB_ENQUEUE:
84 ctx->channel_event_name = &evt_sub_enqueue;
85 break;
86 case SUB_DEQUEUE:
87 ctx->channel_event_name = &evt_sub_dequeue;
88 break;
89 case SUB_RECEIVE_MESSAGE:
90 ctx->channel_event_name = &evt_sub_recvmsg;
91 break;
92 case SUB_RECEIVE_STATUS:
93 ctx->channel_event_name = &evt_sub_recvsts;
94 break;
95 case CHAN_PUBLISH:
96 ctx->channel_event_name = &evt_chan_publish;
97 break;
98 case CHAN_DELETE:
99 ctx->channel_event_name = &evt_chan_delete;
100 break;
101 }
102
103 //the id
104 ngx_http_complex_value(r, cv, &tmpid);
105 sz = group.len + 1 + tmpid.len;
106 if((id = ngx_palloc(r->pool, sizeof(*id) + sz)) == NULL) {
107 nchan_log_request_error(r, "can't allocate space for legacy channel id");
108 return NGX_ERROR;
109 }
110 id->len = sz;
111 id->data = (u_char *)&id[1];
112 cur = id->data;
113 ngx_memcpy(cur, group.data, group.len);
114 cur += group.len;
115 cur[0]='/';
116 cur++;
117 ngx_memcpy(cur, tmpid.data, tmpid.len);
118
119
120 //the event message
121 ngx_http_complex_value(r, cf->channel_event_string, &evstr);
122
123 ngx_memzero(&msg, sizeof(msg));
124
125 msg.buf.temporary = 1;
126 msg.buf.memory = 1;
127 msg.buf.last_buf = 1;
128 msg.buf.pos = evstr.data;
129 msg.buf.last = evstr.data + evstr.len;
130 msg.buf.start = msg.buf.pos;
131 msg.buf.end = msg.buf.last;
132
133 msg.id.time = 0;
134 msg.id.tag.fixed[0] = 0;
135 msg.id.tagactive = 0;
136 msg.id.tagcount = 1;
137 msg.storage = NCHAN_MSG_STACK;
138
139 if(evcf == NULL) {
140 evcf = &evcf_data;
141 ngx_memzero(evcf, sizeof(*evcf));
142
143 evcf->message_timeout = NCHAN_META_CHANNEL_MESSAGE_TTL;
144 evcf->max_messages = NCHAN_META_CHANNEL_MAX_MESSAGES;
145 evcf->complex_max_messages = NULL;
146 evcf->complex_message_timeout = NULL;
147 evcf->subscriber_first_message = 0;
148 evcf->channel_timeout = NCHAN_META_CHANNEL_TIMEOUT;
149 }
150 evcf->storage_engine = cf->storage_engine;
151 evcf->redis = cf->redis;
152
153 evcf->storage_engine->publish(id, &msg, evcf, NULL, NULL);
154
155 return NGX_OK;
156 }
157
158 #if FAKESHARD
// FAKESHARD debugging aid: push a fake process before subscriber work starts.
// Uses SUB_FAKE_WORKER when that macro is defined, a random fake process otherwise.
// Declared with a (void) prototype so that calls with arguments are rejected.
static void memstore_sub_debug_start(void) {
#ifdef SUB_FAKE_WORKER
  memstore_fakeprocess_push(SUB_FAKE_WORKER);
#else
  memstore_fakeprocess_push_random();
#endif
}
// FAKESHARD debugging aid: pop the fake process pushed by memstore_sub_debug_start().
// Declared with a (void) prototype so that calls with arguments are rejected.
static void memstore_sub_debug_end(void) {
  memstore_fakeprocess_pop();
}
// FAKESHARD debugging aid: push a fake process before publisher work starts.
// Uses PUB_FAKE_WORKER when that macro is defined, a random fake process otherwise.
// Declared with a (void) prototype so that calls with arguments are rejected.
static void memstore_pub_debug_start(void) {
#ifdef PUB_FAKE_WORKER
  memstore_fakeprocess_push(PUB_FAKE_WORKER);
#else
  memstore_fakeprocess_push_random();
#endif
}
// FAKESHARD debugging aid: pop the fake process pushed by memstore_pub_debug_start().
// Declared with a (void) prototype so that calls with arguments are rejected.
static void memstore_pub_debug_end(void) {
  memstore_fakeprocess_pop();
}
179 #endif
180
/*
 * Effective message timeout for a location.
 *
 * When the location has a complex (per-request) message timeout, the value is
 * read from the location's shared configuration data; otherwise the static
 * cf->message_timeout is used. A value of 0 is replaced by 525600 * 60
 * (525600 minutes expressed in seconds).
 */
time_t nchan_loc_conf_message_timeout(nchan_loc_conf_t *cf) {
  nchan_loc_conf_shared_data_t  *shared;
  time_t                         ttl;

  if (cf->complex_message_timeout) {
    shared = memstore_get_conf_shared_data(cf);
    ttl = shared->message_timeout;
  }
  else {
    ttl = cf->message_timeout;
  }

  if (ttl != 0) {
    return ttl;
  }
  return 525600 * 60;
}
195
nchan_loc_conf_max_messages(nchan_loc_conf_t * cf)196 ngx_int_t nchan_loc_conf_max_messages(nchan_loc_conf_t *cf) {
197 ngx_int_t num;
198 nchan_loc_conf_shared_data_t *shcf;
199
200 if(!cf->complex_max_messages) {
201 num = cf->max_messages;
202 }
203 else {
204 shcf = memstore_get_conf_shared_data(cf);
205 num = shcf->max_messages;
206 }
207
208 return num;
209 }
210
nchan_http_publisher_handler(ngx_http_request_t * r,void (* body_handler)(ngx_http_request_t * r))211 static ngx_int_t nchan_http_publisher_handler(ngx_http_request_t * r, void (*body_handler)(ngx_http_request_t *r)) {
212 ngx_int_t rc;
213 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
214
215 static ngx_str_t publisher_name = ngx_string("http");
216
217 if(ctx) ctx->publisher_type = &publisher_name;
218
219 /* Instruct ngx_http_read_client_request_body to store the request
220 body entirely in a memory buffer or in a file */
221 r->request_body_in_single_buf = 1;
222 r->request_body_in_persistent_file = 1;
223 r->request_body_in_clean_file = 0;
224 r->request_body_file_log_level = 0;
225
226 //don't buffer the request body --send it right on through
227 //r->request_body_no_buffering = 1;
228
229 rc = ngx_http_read_client_request_body(r, body_handler);
230 if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
231 return rc;
232 }
233 return NGX_DONE;
234 }
235
nchan_stub_status_handler(ngx_http_request_t * r)236 ngx_int_t nchan_stub_status_handler(ngx_http_request_t *r) {
237 ngx_buf_t *b;
238 ngx_chain_t out;
239 nchan_stub_status_t *stats;
240
241 nchan_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_nchan_module);
242
243 float shmem_used, shmem_max;
244
245 char *buf_fmt = "total published messages: %ui\n"
246 "stored messages: %ui\n"
247 "shared memory used: %fK\n"
248 "shared memory limit: %fK\n"
249 "channels: %ui\n"
250 "subscribers: %ui\n"
251 "redis pending commands: %ui\n"
252 "redis connected servers: %ui\n"
253 "total interprocess alerts received: %ui\n"
254 "interprocess alerts in transit: %ui\n"
255 "interprocess queued alerts: %ui\n"
256 "total interprocess send delay: %ui\n"
257 "total interprocess receive delay: %ui\n"
258 "nchan version: %s\n";
259
260 if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) {
261 nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status.");
262 return NGX_HTTP_INTERNAL_SERVER_ERROR;
263 }
264
265 shmem_used = (float )((float )nchan_get_used_shmem() / 1024.0);
266 shmem_max = (float )((float )mcf->shm_size / 1024.0);
267
268 stats = nchan_get_stub_status_stats();
269
270 b->start = (u_char *)&b[1];
271 b->pos = b->start;
272
273 b->end = ngx_snprintf(b->start, 800, buf_fmt, stats->total_published_messages, stats->messages, shmem_used, shmem_max, stats->channels, stats->subscribers, stats->redis_pending_commands, stats->redis_connected_servers, stats->ipc_total_alerts_received, stats->ipc_total_alerts_sent - stats->ipc_total_alerts_received, stats->ipc_queue_size, stats->ipc_total_send_delay, stats->ipc_total_receive_delay, NCHAN_VERSION);
274 b->last = b->end;
275
276 b->memory = 1;
277 b->last_buf = 1;
278
279 r->headers_out.status = NGX_HTTP_OK;
280 r->headers_out.content_type.len = sizeof("text/plain") - 1;
281 r->headers_out.content_type.data = (u_char *) "text/plain";
282
283 r->headers_out.content_length_n = b->end - b->start;
284 ngx_http_send_header(r);
285
286 out.buf = b;
287 out.next = NULL;
288
289 return ngx_http_output_filter(r, &out);
290 }
291
/*
 * Evaluate the per-request message buffer settings of a location and store
 * the results in the location's shared configuration data.
 *
 * The message timeout (when configured as a complex value) is handled first,
 * then the message buffer length; a timeout that parsed successfully stays
 * stored even if the buffer length fails afterwards.
 *
 * Returns 1 on success, including when neither setting is a complex value.
 * Returns 0 on failure; *err is then either a static description of the
 * problem, or NULL when the value could not be evaluated at all. *err is not
 * written on success.
 */
int nchan_parse_message_buffer_config(ngx_http_request_t *r, nchan_loc_conf_t *cf, char **err) {
  nchan_loc_conf_shared_data_t  *shared;
  ngx_str_t                      raw;

  if (cf->complex_message_timeout) {
    time_t  ttl;

    if (ngx_http_complex_value(r, cf->complex_message_timeout, &raw) != NGX_OK) {
      nchan_log_request_error(r, "cannot evaluate nchan_message_timeout value");
      *err = NULL;
      return 0;
    }

    if (raw.len == 0) {
      *err = "missing nchan_message_timeout value";
      nchan_log_request_error(r, "%s", *err);
      return 0;
    }

    ttl = ngx_parse_time(&raw, 1);
    if (ttl == (time_t )NGX_ERROR) {
      *err = "invalid nchan_message_timeout value";
      nchan_log_request_error(r, "%s '%V'", *err, &raw);
      return 0;
    }

    shared = memstore_get_conf_shared_data(cf);
    shared->message_timeout = ttl;
  }

  if (cf->complex_max_messages) {
    ngx_int_t  count;

    if (ngx_http_complex_value(r, cf->complex_max_messages, &raw) != NGX_OK) {
      nchan_log_request_error(r, "cannot evaluate nchan_message_buffer_length value");
      *err = NULL;
      return 0;
    }

    if (raw.len == 0) {
      *err = "missing nchan_message_buffer_length value";
      nchan_log_request_error(r, "%s", *err);
      return 0;
    }

    count = ngx_atoi(raw.data, raw.len);
    if (count == NGX_ERROR || count < 0) {
      *err = "invalid nchan_message_buffer_length value";
      nchan_log_request_error(r, "%s %V", *err, &raw);
      return 0;
    }

    shared = memstore_get_conf_shared_data(cf);
    shared->max_messages = count;
  }

  return 1;
}
348
/*
 * Storage-engine callback for the group GET/POST/DELETE operations started by
 * nchan_group_handler(). status is not used.
 *
 * When the storage engine supplies no group, a zeroed one from the request
 * pool is reported instead.
 *
 * If the content handler has not finished yet (request_ran_content_handler is
 * still 0), the callback is running inside the handler: the reference the
 * handler took on the request is dropped and the group info is written
 * directly. Otherwise the request is finalized with the result of writing the
 * group info.
 *
 * Returns NGX_OK, or NGX_ERROR when the placeholder group cannot be allocated
 * (an internal server error is reported in that case).
 */
static ngx_int_t group_handler_callback(ngx_int_t status, nchan_group_t *group, ngx_http_request_t *r) {
  nchan_request_ctx_t    *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
  int                     inside_handler = !ctx->request_ran_content_handler;

  if(!group) {
    group = ngx_pcalloc(r->pool, sizeof(*group));
    if(!group) {
      //never hand a NULL group to nchan_group_info(); report the failure
      //through the same route the regular response would have taken
      if(inside_handler) {
        r->main->count--;
        nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
      }
      else {
        nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
      }
      return NGX_ERROR;
    }
  }

  if(inside_handler) {
    //undo the handler's r->main->count++ : nothing is left pending
    r->main->count--;
    nchan_group_info(r, group);
  }
  else {
    nchan_http_finalize_request(r, nchan_group_info(r, group));
  }

  return NGX_OK;
}
368
369
parse_size_limit(u_char * data,size_t len)370 static ngx_int_t parse_size_limit(u_char *data, size_t len) {
371 ngx_str_t str;
372 str.data = data;
373 str.len = len;
374 return nchan_parse_size(&str);
375 }
376
/*
 * Evaluate one group limit setting and store it in *dst.
 *
 *   cv        complex value of the setting, or NULL when it is not configured
 *   dst       receives the parsed limit, or -1 when the setting is missing or
 *             evaluates to an empty string
 *   parsefunc converts the evaluated bytes to a number
 *   errstr    response body used when the value is rejected
 *
 * Returns 1 on success. Returns 0 after answering the request with
 * 403 Forbidden and errstr when the value cannot be evaluated, cannot be
 * parsed, or is negative; *dst is left untouched in that case.
 */
static ngx_int_t set_group_num_limit(ngx_http_request_t *r, ngx_http_complex_value_t *cv, ngx_atomic_int_t *dst, ngx_int_t (*parsefunc)(u_char *, size_t), char *errstr) {
  ngx_str_t           tmp;
  ngx_int_t           num;

  if(cv == NULL) {
    *dst = -1;
    return 1;
  }

  if(ngx_http_complex_value(r, cv, &tmp) != NGX_OK) {
    //tmp is not usable when evaluation fails. Reject the request the same way
    //an invalid value is rejected, so callers that look for the 403 notice it.
    nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, errstr, 0);
    return 0;
  }

  if(tmp.len == 0) {
    *dst = -1;
    return 1;
  }

  num = parsefunc(tmp.data, tmp.len);
  if(num == NGX_ERROR || num < 0) {
    nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, errstr, 0);
    return 0;
  }

  *dst = num;
  return 1;
}
397
parse_group_limits(ngx_http_request_t * r,nchan_loc_conf_t * cf,nchan_group_limits_t * limits)398 static ngx_int_t parse_group_limits(ngx_http_request_t *r, nchan_loc_conf_t *cf, nchan_group_limits_t *limits) {
399 set_group_num_limit(r, cf->group.max_channels, &limits->channels, ngx_atoi, "invalid nchan_group_max_channels value");
400 set_group_num_limit(r, cf->group.max_subscribers, &limits->subscribers, ngx_atoi, "invalid nchan_group_max_subscribers value");
401 set_group_num_limit(r, cf->group.max_messages, &limits->messages, ngx_atoi, "invalid nchan_group_max_messages value");
402 set_group_num_limit(r, cf->group.max_messages_shm_bytes, &limits->messages_shmem_bytes, parse_size_limit, "invalid nchan_group_max_messages_memory value");
403 set_group_num_limit(r, cf->group.max_messages_file_bytes, &limits->messages_file_bytes, parse_size_limit, "invalid nchan_group_max_messages_disk value");
404
405 return r->headers_out.status != NGX_HTTP_FORBIDDEN ? NGX_OK : NGX_ERROR;
406 }
407
nchan_group_handler(ngx_http_request_t * r)408 ngx_int_t nchan_group_handler(ngx_http_request_t *r) {
409 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
410 nchan_request_ctx_t *ctx;
411 ngx_int_t rc = NGX_DONE;
412 ngx_str_t *group;
413
414 if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
415 return NGX_HTTP_INTERNAL_SERVER_ERROR;
416 }
417 ngx_http_set_ctx(r, ctx, ngx_nchan_module);
418
419 if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
420 ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
421 return NGX_ERROR;
422 }
423
424 if(!cf->group.enable_accounting) {
425 nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Channel group accounting is disabled.", 0);
426 return NGX_OK;
427 }
428
429 group = nchan_get_group_name(r, cf, ctx);
430 if(group == NULL) {
431 nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "No group specified", 0);
432 return NGX_OK;
433 }
434
435 switch(r->method) {
436 case NGX_HTTP_GET:
437 if(!cf->group.get) {
438 rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
439 }
440 r->main->count++;
441 cf->storage_engine->get_group(group, cf, (callback_pt )group_handler_callback, r);
442
443 break;
444
445 case NGX_HTTP_POST:
446 if(!cf->group.set) {
447 rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
448 }
449
450 nchan_group_limits_t limits;
451
452 if(parse_group_limits(r, cf, &limits) != NGX_OK) {
453 return NGX_OK;
454 }
455
456 r->main->count++;
457 cf->storage_engine->set_group_limits(group, cf, &limits, (callback_pt )group_handler_callback, r);
458 break;
459
460 case NGX_HTTP_DELETE:
461 if(!cf->group.delete) {
462 rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
463 }
464 r->main->count++;
465 cf->storage_engine->delete_group(group, cf, (callback_pt )group_handler_callback, r);
466 break;
467
468 case NGX_HTTP_OPTIONS:
469 rc= nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_GROUP_HEADERS, &NCHAN_ALLOW_GET_POST_DELETE);
470 break;
471 }
472 ctx->request_ran_content_handler = 1;
473 return rc;
474 }
475
// Forward declarations for the later stages of a subscriber-info request:
// the callback given to the storage engine's get_subscriber_info_id(), and the
// subscriber enqueue callback that schedules publishing the info request.
static ngx_int_t nchan_subscriber_info_handler_continued(ngx_int_t, void *, void *);
static void nchan_subscriber_info_publish_info_request_after_subscribing(subscriber_t *, void *);
478
nchan_subscriber_info_handler(ngx_http_request_t * r)479 ngx_int_t nchan_subscriber_info_handler(ngx_http_request_t *r) {
480 if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
481 ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
482 return NGX_ERROR;
483 }
484
485 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
486
487 nchan_request_ctx_t *ctx;
488 if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
489 return NGX_HTTP_INTERNAL_SERVER_ERROR;
490 }
491 ngx_http_set_ctx(r, ctx, ngx_nchan_module);
492
493
494 if(r->upstream && r->upstream->headers_in.x_accel_redirect) {
495 nchan_recover_x_accel_redirected_request_method(r);
496 }
497
498 if(!nchan_match_origin_header(r, cf, ctx)) {
499 nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
500 ctx->request_ran_content_handler = 1;
501 return NGX_OK;
502 }
503
504 if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
505 nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
506 return NGX_OK;
507 }
508
509 ngx_int_t rc = cf->storage_engine->get_subscriber_info_id(cf, nchan_subscriber_info_handler_continued, r);
510 //get_unique_request is expected to never complete in the current event loop cycle.
511 if(rc == NGX_ERROR) {
512 return NGX_HTTP_INTERNAL_SERVER_ERROR;
513 }
514
515 r->main->count++; //hold that request!
516 ctx->request_ran_content_handler = 1;
517 return NGX_DONE;
518 }
519
/*
 * Second stage of a subscriber-info request; runs as the callback of the
 * storage engine's get_subscriber_info_id().
 *
 *   rc  result of the id lookup
 *   d   the response id, carried as an integer inside the pointer
 *   pd  the request
 *
 * Stores the response id in the request context, then creates a subscriber
 * for the response channel. The subscriber type follows the request:
 * websocket when the request is a websocket handshake, otherwise (GET only)
 * the first enabled match among eventsource, chunked, multipart, interval
 * poll, raw stream and longpoll. The info request itself is published from
 * the subscriber's enqueue callback.
 *
 * Returns NGX_OK once the subscriber is subscribed. Every other path answers
 * the request (500, 403, 400 or the OPTIONS response) and returns NGX_ERROR.
 */
static ngx_int_t nchan_subscriber_info_handler_continued(ngx_int_t rc, void *d, void *pd) {
  ngx_http_request_t    *r = pd;
  nchan_loc_conf_t      *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
  nchan_request_ctx_t   *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
  uintptr_t              response_id = (uintptr_t )d;
  ngx_str_t             *response_channel;
  nchan_msg_id_t        *start_id;
  subscriber_t          *sub;
  subscriber_t        *(*make_subscriber)(ngx_http_request_t *r, nchan_msg_id_t *msg_id) = NULL;

  ctx->subscriber_info_response_id = response_id;

  if (rc == NGX_ERROR) {
    goto internal_error;
  }

  if (r->method == NGX_HTTP_OPTIONS) {
    nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_SUBSCRIBER_HEADERS, &NCHAN_ALLOW_GET);
    return NGX_ERROR;
  }

  response_channel = nchan_get_subscriber_info_response_channel_id(r, response_id);
  if (response_channel == NULL) {
    goto internal_error;
  }

  // pick the subscriber constructor
  if (nchan_detect_websocket_request(r)) {
    if (!cf->sub.websocket) {
      goto forbidden;
    }
    make_subscriber = websocket_subscriber_create;
  }
  else if (r->method != NGX_HTTP_GET) {
    goto forbidden;
  }
  else if (cf->sub.eventsource && nchan_detect_eventsource_request(r)) {
    make_subscriber = eventsource_subscriber_create;
  }
  else if (cf->sub.http_chunked && nchan_detect_chunked_subscriber_request(r)) {
    make_subscriber = http_chunked_subscriber_create;
  }
  else if (cf->sub.http_multipart && nchan_detect_multipart_subscriber_request(r)) {
    make_subscriber = http_multipart_subscriber_create;
  }
  else if (cf->sub.poll) {
    make_subscriber = intervalpoll_subscriber_create;
  }
  else if (cf->sub.http_raw_stream) {
    make_subscriber = http_raw_stream_subscriber_create;
  }
  else if (cf->sub.longpoll) {
    make_subscriber = longpoll_subscriber_create;
  }

  if (make_subscriber == NULL) {
    // no subscriber type is enabled for this request
    goto internal_error;
  }

  start_id = nchan_subscriber_get_msg_id(r);
  if (start_id == NULL) {
    nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Message ID invalid", 0);
    return NGX_ERROR;
  }

  sub = make_subscriber(r, start_id);
  if (sub == NULL) {
    goto internal_error;
  }

  // the info request goes out only after this subscriber has been enqueued
  if (sub->fn->set_enqueue_callback(sub, nchan_subscriber_info_publish_info_request_after_subscribing, r) != NGX_OK) {
    goto internal_error;
  }

  if (sub->fn->subscribe(sub, response_channel) != NGX_OK) {
    goto internal_error;
  }

  return NGX_OK;

forbidden:
  nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
  return NGX_ERROR;

internal_error:
  nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
  return NGX_ERROR;
}
607
/*
 * Completion callback passed to the storage engine's request_subscriber_info().
 * The outcome is deliberately ignored; always returns NGX_OK.
 */
static ngx_int_t info_request_publish_callback(ngx_int_t status, void *d, void *pd) {
  (void) status;
  (void) d;
  (void) pd;

  return NGX_OK;
}
612
really_publish_info_request(void * pd)613 static void really_publish_info_request(void *pd) {
614 subscriber_t *sub = pd;
615 ngx_http_request_t *r = sub->request;
616 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
617 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
618 ngx_str_t *channel_id;
619 if((channel_id = nchan_get_channel_id(r, PUB, 1))==NULL) {
620 sub->fn->respond_status(sub, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL);
621 return;
622 }
623
624 cf->storage_engine->request_subscriber_info(channel_id, ctx->subscriber_info_response_id, cf, (callback_pt) &info_request_publish_callback, r);
625 nchan_update_stub_status(total_published_messages, 1);
626 }
627
/*
 * Subscriber enqueue callback for subscriber-info requests.
 *
 * Publishing must happen AFTER the subscriber has enqueued, not while it is
 * enqueuing, so the actual work is deferred to a one-shot timer with a delay
 * of 0. pd is not used.
 */
static void nchan_subscriber_info_publish_info_request_after_subscribing(subscriber_t *sub, void *pd) {
  (void) pd;

  nchan_add_oneshot_timer(really_publish_info_request, sub, 0);
}
632
633
nchan_pubsub_handler(ngx_http_request_t * r)634 ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
635 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
636 ngx_str_t *channel_id;
637 subscriber_t *sub;
638 nchan_msg_id_t *msg_id;
639 ngx_int_t rc = NGX_DONE;
640 nchan_request_ctx_t *ctx;
641 nchan_group_limits_t group_limits;
642
643 if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
644 ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
645 return NGX_ERROR;
646 }
647
648 if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
649 return NGX_HTTP_INTERNAL_SERVER_ERROR;
650 }
651 ngx_http_set_ctx(r, ctx, ngx_nchan_module);
652
653 //X-Accel-Redirected requests get their method mangled to GET. De-mangle it if necessary
654 if(r->upstream && r->upstream->headers_in.x_accel_redirect) {
655 //yep, we got x-accel-redirected. what was the original method?...
656 nchan_recover_x_accel_redirected_request_method(r);
657 }
658
659 if(!nchan_match_origin_header(r, cf, ctx)) {
660 goto forbidden;
661 }
662
663 if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
664 goto bad_msgid;
665 }
666
667 if(parse_group_limits(r, cf, &group_limits) == NGX_OK) {
668 // unless the group already exists, these limits may only be set after this incoming request.
669 // TODO: fix this, although that will lead to even gnarlier control flow.
670 cf->storage_engine->set_group_limits(nchan_get_group_name(r, cf, ctx), cf, &group_limits, NULL, NULL);
671 }
672 else {
673 // there waas an error parsing group limit strings, and it has already been sent in the response.
674 // just quit.
675 return NGX_OK;
676 }
677
678 if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
679 //using redis, and it's not ready yet
680 if(r->method == NGX_HTTP_POST || r->method == NGX_HTTP_PUT) {
681 //discard request body before responding
682 nchan_http_publisher_handler(r, nchan_publisher_unavailable_body_handler);
683 }
684 else {
685 nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
686 }
687 return NGX_OK;
688 }
689
690 if(cf->pub.websocket || cf->pub.http) {
691 char *err;
692 if(!nchan_parse_message_buffer_config(r, cf, &err)) {
693 if(err) {
694 nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, err, 0);
695 return NGX_OK;
696 }
697 else {
698 nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
699 return NGX_OK;
700 }
701 }
702 }
703
704 if(nchan_detect_websocket_request(r)) {
705 //want websocket?
706 if(cf->sub.websocket) {
707 //we prefer to subscribe
708 if((channel_id = nchan_get_channel_id(r, SUB, 1)) == NULL) {
709 return r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR;
710 }
711
712 #if FAKESHARD
713 memstore_sub_debug_start();
714 #endif
715 if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
716 goto bad_msgid;
717 }
718 if((sub = websocket_subscriber_create(r, msg_id)) == NULL) {
719 nchan_log_request_error(r, "unable to create websocket subscriber");
720 return NGX_HTTP_INTERNAL_SERVER_ERROR;
721 }
722 if(sub->fn->subscribe(sub, channel_id) != NGX_OK) {
723 return NGX_HTTP_INTERNAL_SERVER_ERROR;
724 }
725 #if FAKESHARD
726 memstore_sub_debug_end();
727 #endif
728 }
729 else if(cf->pub.websocket) {
730 //no need to subscribe, but keep a connection open for publishing
731 nchan_create_websocket_publisher(r);
732 }
733 else goto forbidden;
734 return NGX_DONE;
735 }
736 else {
737 subscriber_t *(*sub_create)(ngx_http_request_t *r, nchan_msg_id_t *msg_id) = NULL;
738
739 switch(r->method) {
740 case NGX_HTTP_GET:
741 if(cf->sub.eventsource && nchan_detect_eventsource_request(r)) {
742 sub_create = eventsource_subscriber_create;
743 }
744 else if(cf->sub.http_chunked && nchan_detect_chunked_subscriber_request(r)) {
745 sub_create = http_chunked_subscriber_create;
746 }
747 else if(cf->sub.http_multipart && nchan_detect_multipart_subscriber_request(r)) {
748 sub_create = http_multipart_subscriber_create;
749 }
750 else if(cf->sub.poll) {
751 sub_create = intervalpoll_subscriber_create;
752 }
753 else if(cf->sub.http_raw_stream) {
754 sub_create = http_raw_stream_subscriber_create;
755 }
756 else if(cf->sub.longpoll) {
757 sub_create = longpoll_subscriber_create;
758 }
759 else if(cf->pub.http) {
760 nchan_http_publisher_handler(r, nchan_publisher_body_handler);
761 }
762 else {
763 goto forbidden;
764 }
765
766 if(sub_create) {
767 if((channel_id = nchan_get_channel_id(r, SUB, 1)) == NULL) {
768 return r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR;
769 }
770 #if FAKESHARD
771 memstore_sub_debug_start();
772 #endif
773 if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
774 goto bad_msgid;
775 }
776
777 if((sub = sub_create(r, msg_id)) == NULL) {
778 nchan_log_request_error(r, "unable to create subscriber");
779 return NGX_HTTP_INTERNAL_SERVER_ERROR;
780 }
781
782 if(sub->fn->subscribe(sub, channel_id) != NGX_OK) {
783 return NGX_HTTP_INTERNAL_SERVER_ERROR;
784 }
785 #if FAKESHARD
786 memstore_sub_debug_end();
787 #endif
788 }
789
790 break;
791
792 case NGX_HTTP_POST:
793 case NGX_HTTP_PUT:
794 if(cf->pub.http) {
795 nchan_http_publisher_handler(r, nchan_publisher_body_handler);
796 }
797 else goto forbidden;
798 break;
799
800 case NGX_HTTP_DELETE:
801 if(cf->pub.http) {
802 nchan_http_publisher_handler(r, nchan_publisher_body_handler);
803 }
804 else goto forbidden;
805 break;
806
807 case NGX_HTTP_OPTIONS:
808 if(cf->pub.http && (cf->sub.poll || cf->sub.longpoll || cf->sub.eventsource || cf->sub.websocket)) {
809 nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_PUBSUB_HEADERS, &NCHAN_ALLOW_GET_POST_PUT_DELETE);
810 }
811 else if(cf->pub.http) {
812 nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_PUBLISHER_HEADERS, &NCHAN_ALLOW_GET_POST_PUT_DELETE);
813 }
814 else if(cf->sub.poll || cf->sub.longpoll || cf->sub.eventsource || cf->sub.websocket) {
815 nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_SUBSCRIBER_HEADERS, &NCHAN_ALLOW_GET);
816 }
817 else goto forbidden;
818 break;
819 }
820 }
821 ctx->request_ran_content_handler = 1;
822 return rc;
823
824 forbidden:
825 nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
826 ctx->request_ran_content_handler = 1;
827 return NGX_OK;
828
829 bad_msgid:
830 nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Message ID invalid", 0);
831 ctx->request_ran_content_handler = 1;
832 return NGX_OK;
833
834 }
835
/*
 * Callback that answers a request with channel information.
 *
 *   status  result code; values 500..599 are passed through as the response
 *   rptr    the channel to describe
 *   pd      safe request handle; it is consumed (freed) by this call
 *
 * Returns NGX_ERROR when the request no longer exists, NGX_OK otherwise.
 */
static ngx_int_t channel_info_callback(ngx_int_t status, void *rptr, void *pd) {
  ngx_http_request_t   *r = nchan_get_safe_request_ptr(pd);
  ngx_int_t             response_code;

  if (r == NULL) {
    return NGX_ERROR;
  }

  if (status < 500 || status > 599) {
    response_code = nchan_response_channel_ptr_info( (nchan_channel_t *)rptr, r, 0);
  }
  else {
    response_code = status;
  }

  nchan_http_finalize_request(r, response_code);
  return NGX_OK;
}
849
/*
 * Request cleanup handler: marks the safe request handle as no longer having
 * a request. A NULL handle is ignored.
 */
static void clear_request_pointer(safe_request_ptr_t *pdata) {
  if (pdata == NULL) {
    return;
  }

  pdata->r = NULL;
}
855
nchan_set_safe_request_ptr(ngx_http_request_t * r)856 static safe_request_ptr_t *nchan_set_safe_request_ptr(ngx_http_request_t *r) {
857 safe_request_ptr_t *data = ngx_alloc(sizeof(*data), ngx_cycle->log);
858 ngx_http_cleanup_t *cln = ngx_http_cleanup_add(r, 0);
859
860 if(!data || !cln) {
861 nchan_log_request_error(r, "couldn't allocate request cleanup stuff.");
862 if(cln) {
863 cln->data = NULL;
864 cln->handler = (ngx_http_cleanup_pt )clear_request_pointer;
865 }
866 nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
867 return NULL;
868 }
869
870 data->cln = cln;
871 data->r = r;
872
873 cln->data = data;
874 cln->handler = (ngx_http_cleanup_pt )clear_request_pointer;
875
876 return data;
877 }
878
nchan_get_safe_request_ptr(safe_request_ptr_t * d)879 static ngx_http_request_t *nchan_get_safe_request_ptr(safe_request_ptr_t *d) {
880 ngx_http_request_t *r = d->r;
881 ngx_http_cleanup_t *cln = d->cln;
882
883 ngx_free(d);
884
885 if(r) {
886 cln->data = NULL;
887 }
888
889 return r;
890 }
891
892
/*
 * Storage-engine callback that turns the outcome of a publish into the HTTP
 * response of the publisher request.
 *
 *   status  NCHAN_MESSAGE_QUEUED, NCHAN_MESSAGE_RECEIVED, NGX_ERROR, or an
 *           HTTP status code
 *   data    the channel on success; with NGX_HTTP_FORBIDDEN, an optional
 *           C string used as the response body
 *   pd      safe request handle; it is consumed (freed) by this call
 *
 * Successful publishes update the message ids in the request context, send a
 * channel_publish event, and respond with the channel info (202 when the
 * message was only queued, 201 when a subscriber already received it).
 * Every other outcome resets the message ids in the request context.
 *
 * Returns NGX_OK for successful and forbidden publishes, NGX_ERROR when the
 * request has disappeared or the publish failed.
 */
static ngx_int_t publish_callback(ngx_int_t status, void *data, safe_request_ptr_t *pd) {
  static nchan_msg_id_t   zero_msgid = NCHAN_ZERO_MSGID;
  nchan_channel_t        *chan = data;
  nchan_request_ctx_t    *ctx;
  ngx_http_request_t     *r;

  r = nchan_get_safe_request_ptr(pd);
  if (r == NULL) {
    // the request has since disappeared
    return NGX_ERROR;
  }
  ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);

  if (status == NCHAN_MESSAGE_QUEUED || status == NCHAN_MESSAGE_RECEIVED) {
    // queued with no subscriber around yet -> 202; already delivered to at least one -> 201
    ngx_int_t  success_code = (status == NCHAN_MESSAGE_QUEUED) ? NGX_HTTP_ACCEPTED : NGX_HTTP_CREATED;

    ctx->prev_msg_id = ctx->msg_id;
    ctx->msg_id = chan != NULL ? chan->last_published_msg_id : zero_msgid;

    nchan_maybe_send_channel_event_message(r, CHAN_PUBLISH);
    nchan_http_finalize_request(r, nchan_response_channel_ptr_info(chan, r, success_code));
    return NGX_OK;
  }

  if (status == NGX_ERROR) {
    // a generic failure is reported as an internal server error
    status = NGX_HTTP_INTERNAL_SERVER_ERROR;
  }

  if (status == NGX_HTTP_INSUFFICIENT_STORAGE
   || status == NGX_HTTP_INTERNAL_SERVER_ERROR
   || status == NGX_HTTP_SERVICE_UNAVAILABLE)
  {
    nchan_log_request_error(r, "error publishing message (HTTP status code %i)", status);
    ctx->prev_msg_id = zero_msgid;
    ctx->msg_id = zero_msgid;
    nchan_http_finalize_request(r, status);
    return NGX_ERROR;
  }

  if (status == NGX_HTTP_FORBIDDEN) {
    ctx->prev_msg_id = zero_msgid;
    ctx->msg_id = zero_msgid;
    if (data != NULL) {
      nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, (char *)data, 1);
    }
    else {
      nchan_http_finalize_request(r, NGX_HTTP_FORBIDDEN);
    }
    return NGX_OK;
  }

  // mostly for debugging; not expected to be hit during regular operation
  ctx->prev_msg_id = zero_msgid;
  ctx->msg_id = zero_msgid;
  nchan_log_request_error(r, "TOTALLY UNEXPECTED error publishing message (HTTP status code %i)", status);
  nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
  return NGX_ERROR;
}
958
/*
 * Build a message from the given body and hand it to the storage engine for
 * publication on `channel_id`.
 *
 * r                   publisher request; the message and its buffer are
 *                     allocated from r->pool.
 * content_type        optional content type to attach to the message.
 * content_length      body length in bytes; 0 publishes an empty message.
 * request_body_chain  chain holding the body; must be non-NULL when
 *                     content_length is non-zero.
 * channel_id          channel to publish to.
 * cf                  location configuration (supplies the storage engine and
 *                     an optional fixed EventSource event name).
 *
 * On an allocation failure or a missing body chain the request is finalized
 * with a 500 here. Otherwise the response is produced by publish_callback.
 */
static void nchan_publisher_post_request(ngx_http_request_t *r, ngx_str_t *content_type, size_t content_length, ngx_chain_t *request_body_chain, ngx_str_t *channel_id, nchan_loc_conf_t *cf) {
  ngx_buf_t                      *buf;
  nchan_msg_t                    *msg;
  ngx_str_t                      *eventsource_event;

  safe_request_ptr_t             *pd;

#if FAKESHARD
  memstore_pub_debug_start();
#endif
  if((msg = ngx_pcalloc(r->pool, sizeof(*msg))) == NULL) {
    nchan_log_request_error(r, "can't allocate msg in request pool");
    nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
    return;
  }
  msg->storage = NCHAN_MSG_POOL;

  //a configured event name takes precedence over the request header
  if(cf->eventsource_event.len > 0) {
    msg->eventsource_event = &cf->eventsource_event;
  }
  else if((eventsource_event = nchan_get_header_value(r, NCHAN_HEADER_EVENTSOURCE_EVENT)) != NULL) {
    msg->eventsource_event = eventsource_event;
  }

  //content type
  if(content_type) {
    msg->content_type = content_type;
  }

  if(content_length == 0) {
    buf = ngx_create_temp_buf(r->pool, 0);
  }
  else if(request_body_chain != NULL) {
    buf = nchan_chain_to_single_buffer(r->pool, request_body_chain, content_length);
  }
  else {
    nchan_log_request_error(r, "unexpected publisher message request body buffer location. please report this to the nchan developers.");
    nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
    return;
  }

  //the buffer is copied by value below, so a failed allocation must be caught
  //here rather than dereferenced
  if(buf == NULL) {
    nchan_log_request_error(r, "can't allocate publisher message buffer in request pool");
    nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
    return;
  }

  msg->id.time = 0;
  msg->id.tag.fixed[0] = 0;
  msg->id.tagactive = 0;
  msg->id.tagcount = 1;

  msg->buf = *buf;
#if NCHAN_MSG_LEAK_DEBUG
  msg->lbl = r->uri;
#endif
  nchan_deflate_message_if_needed(msg, cf, r, r->pool);
  if((pd = nchan_set_safe_request_ptr(r)) == NULL) {
    return;
  }

  cf->storage_engine->publish(channel_id, msg, cf, (callback_pt) &publish_callback, pd);
  nchan_update_stub_status(total_published_messages, 1);
#if FAKESHARD
  memstore_pub_debug_end();
#endif
}
1021
/* Data handed to nchan_publisher_upstream_handler through the post-subrequest hook. */
typedef struct {
  ngx_str_t  *ch_id; //channel id the message is eventually published to
} nchan_pub_upstream_data_t;
1025
/* Post-subrequest descriptor and its callback data, bundled so both come from a single allocation. */
typedef struct {
  ngx_http_post_subrequest_t     psr;      //handler + data pointer given to ngx_http_subrequest
  nchan_pub_upstream_data_t      psr_data; //storage that psr.data points at
} nchan_pub_upstream_stuff_t;
1030
/*
 * Post-subrequest handler for the publisher upstream request.
 *
 * sr    the finished subrequest; its parent is the publisher request.
 * data  nchan_pub_upstream_data_t carrying the channel id.
 * rc    subrequest completion code.
 *
 * Dispatch on the subrequest's response status:
 *   200 / 201 / 202  publish the upstream response body (empty if there is no upstream)
 *   304              publish the original request body unchanged
 *   204              cancel the publication, respond 204
 *   anything else    respond 403
 * A non-NGX_OK rc responds 500. Always returns NGX_OK.
 */
static ngx_int_t nchan_publisher_upstream_handler(ngx_http_request_t *sr, void *data, ngx_int_t rc) {
  ngx_http_request_t         *r = sr->parent;
  nchan_pub_upstream_data_t  *d = (nchan_pub_upstream_data_t *)data;
  nchan_loc_conf_t           *cf;
  ngx_str_t                  *content_type = NULL;
  ngx_int_t                   content_length = 0;
  ngx_chain_t                *request_chain = NULL;

  if(rc != NGX_OK) {
    nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
    return NGX_OK;
  }

  cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);

  switch(sr->headers_out.status) {
    case NGX_HTTP_OK:
    case NGX_HTTP_CREATED:
    case NGX_HTTP_ACCEPTED:
      //message body comes from the upstream response, if there was an upstream
      if(sr->upstream) {
        if(sr->upstream->headers_in.content_type) {
          content_type = &sr->upstream->headers_in.content_type->value;
        }
        content_length = nchan_subrequest_content_length(sr);
#if nginx_version >= 1013010
        request_chain = sr->out;
#else
        request_chain = sr->upstream->out_bufs;
#endif
      }
      nchan_publisher_post_request(r, content_type, content_length, request_chain, d->ch_id, cf);
      break;

    case NGX_HTTP_NOT_MODIFIED:
      //publish the original request body as-is
      if(r->headers_in.content_type) {
        content_type = &r->headers_in.content_type->value;
      }
      if(r->headers_in.content_length_n > 0) {
        content_length = r->headers_in.content_length_n;
      }
      nchan_publisher_post_request(r, content_type, content_length, r->request_body->bufs, d->ch_id, cf);
      break;

    case NGX_HTTP_NO_CONTENT:
      //cancel publication
      nchan_http_finalize_request(r, NGX_HTTP_NO_CONTENT);
      break;

    default:
      nchan_http_finalize_request(r, NGX_HTTP_FORBIDDEN);
      break;
  }

  return NGX_OK;
}
1085
/*
 * Second stage of publisher request handling, dispatching on the HTTP method:
 *   GET       look up the channel; channel_info_callback produces the response
 *   PUT/POST  publish the request body, either directly or after passing it
 *             through the configured publisher upstream subrequest
 *   DELETE    delete the channel and emit a CHAN_DELETE channel event
 *   other     respond 403
 *
 * r           publisher request (its body has already been read).
 * channel_id  target channel.
 * cf          location configuration.
 *
 * Failures while setting up the upstream subrequest finalize the request with
 * a 500.
 */
static void nchan_publisher_body_handler_continued(ngx_http_request_t *r, ngx_str_t *channel_id, nchan_loc_conf_t *cf) {
  ngx_http_complex_value_t       *publisher_upstream_request_url_ccv;
  static ngx_str_t                POST_REQUEST_STRING = {4, (u_char *)"POST "};
  safe_request_ptr_t             *pd;

  switch(r->method) {
    case NGX_HTTP_GET:
      if((pd = nchan_set_safe_request_ptr(r)) == NULL){
        return;
      }
      cf->storage_engine->find_channel(channel_id, cf, (callback_pt) &channel_info_callback, pd);
      break;

    case NGX_HTTP_PUT:
    case NGX_HTTP_POST:
      publisher_upstream_request_url_ccv = cf->publisher_upstream_request_url;
      if(publisher_upstream_request_url_ccv == NULL) {
        ngx_str_t    *content_type = (r->headers_in.content_type ? &r->headers_in.content_type->value : NULL);
        ngx_int_t     content_length = r->headers_in.content_length_n > 0 ? r->headers_in.content_length_n : 0;
        // no need to check for chunked transfer-encoding, nginx automatically sets the
        // content-length either way.

        nchan_publisher_post_request(r, content_type, content_length, r->request_body->bufs, channel_id, cf);
      }
      else {
        nchan_pub_upstream_stuff_t    *psr_stuff;
        ngx_http_post_subrequest_t    *psr;
        nchan_pub_upstream_data_t     *psrd;
        ngx_http_request_t            *sr = NULL;
        ngx_str_t                      publisher_upstream_request_url;

        if((psr_stuff = ngx_palloc(r->pool, sizeof(*psr_stuff))) == NULL) {
          nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest");
          nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
          return;
        }
        psr = &psr_stuff->psr;
        psrd = &psr_stuff->psr_data;

        if(ngx_http_complex_value(r, publisher_upstream_request_url_ccv, &publisher_upstream_request_url) != NGX_OK) {
          nchan_log_request_error(r, "can't evaluate publisher upstream request url");
          nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
          return;
        }

        psr->handler = nchan_publisher_upstream_handler;
        psr->data = psrd;

        psrd->ch_id = channel_id;

        //sr is only valid when the subrequest was actually created; never touch it otherwise
        if(ngx_http_subrequest(r, &publisher_upstream_request_url, NULL, &sr, psr, NGX_HTTP_SUBREQUEST_IN_MEMORY) != NGX_OK || sr == NULL) {
          nchan_log_request_error(r, "can't create publisher upstream subrequest");
          nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
          return;
        }
        //forward the original body to the upstream as a POST
        nchan_adjust_subrequest(sr, NGX_HTTP_POST, &POST_REQUEST_STRING, r->request_body, r->headers_in.content_length_n);
        sr->args = r->args;
      }
      break;

    case NGX_HTTP_DELETE:
      if((pd = nchan_set_safe_request_ptr(r)) == NULL){
        return;
      }
      cf->storage_engine->delete_channel(channel_id, cf, (callback_pt) &channel_info_callback, pd);
      nchan_maybe_send_channel_event_message(r, CHAN_DELETE);
      break;

    default:
      nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
  }

}
1150
/* Data handed to nchan_publisher_body_authorize_handler through the post-subrequest hook. */
typedef struct {
  ngx_str_t  *ch_id; //channel id of the publisher request awaiting authorization
} nchan_pub_subrequest_data_t;
1154
/* Post-subrequest descriptor and its callback data, bundled so both come from a single allocation. */
typedef struct {
  ngx_http_post_subrequest_t     psr;      //handler + data pointer given to ngx_http_subrequest
  nchan_pub_subrequest_data_t    psr_data; //storage that psr.data points at
} nchan_pub_subrequest_stuff_t;
1159
1160
/*
 * Post-subrequest handler for the publisher authorization subrequest.
 *
 * r     the finished authorization subrequest; r->parent is the publisher request.
 * data  nchan_pub_subrequest_data_t carrying the channel id.
 * rc    subrequest completion code.
 *
 * Any 2xx response status (200..299 inclusive) authorizes the publisher and
 * resumes normal handling; any other status responds 403. A non-NGX_OK rc
 * finalizes the parent request with that rc. Always returns NGX_OK.
 */
static ngx_int_t nchan_publisher_body_authorize_handler(ngx_http_request_t *r, void *data, ngx_int_t rc) {
  nchan_pub_subrequest_data_t  *d = data;

  if(rc == NGX_OK) {
    nchan_loc_conf_t   *cf = ngx_http_get_module_loc_conf(r->parent, ngx_nchan_module);
    ngx_int_t           code = r->headers_out.status;
    if(code >= 200 && code < 300) {
      //authorized. proceed as planned
      nchan_publisher_body_handler_continued(r->parent, d->ch_id, cf);
    }
    else { //anything else means forbidden
      nchan_http_finalize_request(r->parent, NGX_HTTP_FORBIDDEN);
    }
  }
  else {
    nchan_http_finalize_request(r->parent, rc);
  }
  return NGX_OK;
}
1180
/* Request-body handler that simply answers the request with 503 Service Unavailable. */
static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r) {
  nchan_http_finalize_request(r, NGX_HTTP_SERVICE_UNAVAILABLE);
}
1185
/*
 * Request-body handler for publisher locations.
 *
 * Resolves the channel id, then either continues straight to
 * nchan_publisher_body_handler_continued (no authorize_request_url configured)
 * or first issues a body-less, header-only authorization subrequest whose
 * result is processed by nchan_publisher_body_authorize_handler.
 *
 * Any failure while preparing the subrequest finalizes the request here.
 */
static void nchan_publisher_body_handler(ngx_http_request_t *r) {
  ngx_str_t                      *channel_id;
  nchan_loc_conf_t               *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
  ngx_table_elt_t                *content_length_elt;
  ngx_http_complex_value_t       *authorize_request_url_ccv = cf->authorize_request_url;

  if((channel_id = nchan_get_channel_id(r, PUB, 1))==NULL) {
    nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
    return;
  }

  if(!authorize_request_url_ccv) {
    nchan_publisher_body_handler_continued(r, channel_id, cf);
  }
  else {
    nchan_pub_subrequest_stuff_t   *psr_stuff;
    ngx_http_post_subrequest_t     *psr;
    nchan_pub_subrequest_data_t    *psrd;
    ngx_http_request_t             *sr = NULL;
    ngx_str_t                       auth_request_url;

    if((psr_stuff = ngx_palloc(r->pool, sizeof(*psr_stuff))) == NULL) {
      nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest");
      nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
      return;
    }
    psr = &psr_stuff->psr;
    psrd = &psr_stuff->psr_data;

    if(ngx_http_complex_value(r, authorize_request_url_ccv, &auth_request_url) != NGX_OK) {
      nchan_log_request_error(r, "can't evaluate publisher auth request url");
      nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
      return;
    }

    psr->handler = nchan_publisher_body_authorize_handler;
    psr->data = psrd;

    psrd->ch_id = channel_id;

    //sr is only valid when the subrequest was actually created; never touch it otherwise
    if(ngx_http_subrequest(r, &auth_request_url, NULL, &sr, psr, 0) != NGX_OK || sr == NULL) {
      nchan_log_request_error(r, "can't create publisher auth subrequest");
      nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
      return;
    }

    //give the subrequest its own, empty request body
    if((sr->request_body = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t))) == NULL) {
      nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest body");
      nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
      return;
    }
    if((content_length_elt = ngx_palloc(r->pool, sizeof(*content_length_elt))) == NULL) {
      nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest content-length header");
      nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
      return;
    }

    //work on a private copy of the Content-Length header, rewritten to "0",
    //instead of modifying the header element the subrequest currently points at
    if(sr->headers_in.content_length) {
      *content_length_elt = *sr->headers_in.content_length;
      content_length_elt->value.len=1;
      content_length_elt->value.data=(u_char *)"0";
      sr->headers_in.content_length = content_length_elt;
    }

    sr->headers_in.content_length_n = 0;
    sr->args = r->args;
    sr->header_only = 1;
  }
}
1246
nchan_benchmark_handler(ngx_http_request_t * r)1247 ngx_int_t nchan_benchmark_handler(ngx_http_request_t *r) {
1248 nchan_request_ctx_t *ctx;
1249
1250 if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
1251 return NGX_HTTP_INTERNAL_SERVER_ERROR;
1252 }
1253 if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
1254 return NGX_HTTP_INTERNAL_SERVER_ERROR;
1255 }
1256 ngx_http_set_ctx(r, ctx, ngx_nchan_module);
1257
1258 return nchan_benchmark_ws_initialize(r);
1259
1260 /*
1261 ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
1262 nchan_bufchain_pool_init(ctx->bcp, r->pool);
1263 return nchan_benchmark_initialize(r);
1264 */
1265 }
1266