1 /*
2  *  Written by Leo Ponomarev 2009-2015
3  */
4 
5 #include <nchan_module.h>
6 #include <util/nchan_subrequest.h>
7 #include <assert.h>
8 
9 #include <subscribers/longpoll.h>
10 #include <subscribers/intervalpoll.h>
11 #include <subscribers/eventsource.h>
12 #include <subscribers/http-chunked.h>
13 #include <subscribers/http-multipart-mixed.h>
14 #include <subscribers/http-raw-stream.h>
15 #include <subscribers/websocket.h>
16 #include <subscribers/benchmark.h>
17 #include <store/memory/store.h>
18 #include <store/redis/store.h>
19 
20 #include <nchan_setup.c>
21 
22 #if (NGX_ZLIB)
23 #include <zlib.h>
24 #endif
25 
26 
27 #if FAKESHARD
28 #include <store/memory/ipc.h>
29 #include <store/memory/shmem.h>
30 //#include <store/memory/store-private.h> //for debugging
31 #endif
32 #include <util/nchan_output.h>
33 #include <nchan_websocket_publisher.h>
34 
35 ngx_int_t           nchan_worker_processes;
36 int                 nchan_stub_status_enabled = 0;
37 
38 
39 static void nchan_publisher_body_handler(ngx_http_request_t *r);
40 static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r);
41 
42 //#define DEBUG_LEVEL NGX_LOG_WARN
43 //#define DEBUG_LEVEL NGX_LOG_DEBUG
44 
45 //#define DBG(fmt, args...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "NCHAN:" fmt, ##args)
46 
47 typedef struct {
48   ngx_http_request_t    *r;
49   ngx_http_cleanup_t    *cln;
50 } safe_request_ptr_t;
51 static safe_request_ptr_t *nchan_set_safe_request_ptr(ngx_http_request_t *r);
52 static ngx_http_request_t *nchan_get_safe_request_ptr(safe_request_ptr_t *pd);
53 
nchan_maybe_send_channel_event_message(ngx_http_request_t * r,channel_event_type_t event_type)54 ngx_int_t nchan_maybe_send_channel_event_message(ngx_http_request_t *r, channel_event_type_t event_type) {
55   static nchan_loc_conf_t            evcf_data;
56   static nchan_loc_conf_t           *evcf = NULL;
57 
58   static ngx_str_t group =           ngx_string("meta");
59 
60   static ngx_str_t evt_sub_enqueue = ngx_string("subscriber_enqueue");
61   static ngx_str_t evt_sub_dequeue = ngx_string("subscriber_dequeue");
62   static ngx_str_t evt_sub_recvmsg = ngx_string("subscriber_receive_message");
63   static ngx_str_t evt_sub_recvsts = ngx_string("subscriber_receive_status");
64   static ngx_str_t evt_chan_publish= ngx_string("channel_publish");
65   static ngx_str_t evt_chan_delete = ngx_string("channel_delete");
66 
67   nchan_loc_conf_t          *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
68   ngx_http_complex_value_t  *cv = cf->channel_events_channel_id;
69   if(cv==NULL) {
70     //nothing to send
71     return NGX_OK;
72   }
73 
74   nchan_request_ctx_t       *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
75   ngx_str_t                  tmpid;
76   size_t                     sz;
77   ngx_str_t                 *id;
78   u_char                    *cur;
79   ngx_str_t                  evstr;
80   nchan_msg_t                msg;
81 
82   switch(event_type) {
83     case SUB_ENQUEUE:
84       ctx->channel_event_name = &evt_sub_enqueue;
85       break;
86     case SUB_DEQUEUE:
87       ctx->channel_event_name = &evt_sub_dequeue;
88       break;
89     case SUB_RECEIVE_MESSAGE:
90       ctx->channel_event_name = &evt_sub_recvmsg;
91       break;
92     case SUB_RECEIVE_STATUS:
93       ctx->channel_event_name = &evt_sub_recvsts;
94       break;
95     case CHAN_PUBLISH:
96       ctx->channel_event_name = &evt_chan_publish;
97       break;
98     case CHAN_DELETE:
99       ctx->channel_event_name = &evt_chan_delete;
100       break;
101   }
102 
103   //the id
104   ngx_http_complex_value(r, cv, &tmpid);
105   sz = group.len + 1 + tmpid.len;
106   if((id = ngx_palloc(r->pool, sizeof(*id) + sz)) == NULL) {
107     nchan_log_request_error(r, "can't allocate space for legacy channel id");
108     return NGX_ERROR;
109   }
110   id->len = sz;
111   id->data = (u_char *)&id[1];
112   cur = id->data;
113   ngx_memcpy(cur, group.data, group.len);
114   cur += group.len;
115   cur[0]='/';
116   cur++;
117   ngx_memcpy(cur, tmpid.data, tmpid.len);
118 
119 
120   //the event message
121   ngx_http_complex_value(r, cf->channel_event_string, &evstr);
122 
123   ngx_memzero(&msg, sizeof(msg));
124 
125   msg.buf.temporary = 1;
126   msg.buf.memory = 1;
127   msg.buf.last_buf = 1;
128   msg.buf.pos = evstr.data;
129   msg.buf.last = evstr.data + evstr.len;
130   msg.buf.start = msg.buf.pos;
131   msg.buf.end = msg.buf.last;
132 
133   msg.id.time = 0;
134   msg.id.tag.fixed[0] = 0;
135   msg.id.tagactive = 0;
136   msg.id.tagcount = 1;
137   msg.storage = NCHAN_MSG_STACK;
138 
139   if(evcf == NULL) {
140     evcf = &evcf_data;
141     ngx_memzero(evcf, sizeof(*evcf));
142 
143     evcf->message_timeout = NCHAN_META_CHANNEL_MESSAGE_TTL;
144     evcf->max_messages = NCHAN_META_CHANNEL_MAX_MESSAGES;
145     evcf->complex_max_messages = NULL;
146     evcf->complex_message_timeout = NULL;
147     evcf->subscriber_first_message = 0;
148     evcf->channel_timeout = NCHAN_META_CHANNEL_TIMEOUT;
149   }
150   evcf->storage_engine = cf->storage_engine;
151   evcf->redis = cf->redis;
152 
153   evcf->storage_engine->publish(id, &msg, evcf, NULL, NULL);
154 
155   return NGX_OK;
156 }
157 
158 #if FAKESHARD
memstore_sub_debug_start()159 static void memstore_sub_debug_start() {
160   #ifdef SUB_FAKE_WORKER
161   memstore_fakeprocess_push(SUB_FAKE_WORKER);
162   #else
163   memstore_fakeprocess_push_random();
164   #endif
165 }
memstore_sub_debug_end()166 static void memstore_sub_debug_end() {
167   memstore_fakeprocess_pop();
168 }
memstore_pub_debug_start()169 static void memstore_pub_debug_start() {
170   #ifdef PUB_FAKE_WORKER
171   memstore_fakeprocess_push(PUB_FAKE_WORKER);
172   #else
173   memstore_fakeprocess_push_random();
174   #endif
175 }
memstore_pub_debug_end()176 static void memstore_pub_debug_end() {
177   memstore_fakeprocess_pop();
178 }
179 #endif
180 
nchan_loc_conf_message_timeout(nchan_loc_conf_t * cf)181 time_t nchan_loc_conf_message_timeout(nchan_loc_conf_t *cf) {
182   time_t                        timeout;
183   nchan_loc_conf_shared_data_t *shcf;
184 
185   if(!cf->complex_message_timeout) {
186     timeout = cf->message_timeout;
187   }
188   else {
189     shcf = memstore_get_conf_shared_data(cf);
190     timeout = shcf->message_timeout;
191   }
192 
193   return timeout != 0 ? timeout : 525600 * 60;
194 }
195 
nchan_loc_conf_max_messages(nchan_loc_conf_t * cf)196 ngx_int_t nchan_loc_conf_max_messages(nchan_loc_conf_t *cf) {
197   ngx_int_t                     num;
198   nchan_loc_conf_shared_data_t *shcf;
199 
200   if(!cf->complex_max_messages) {
201     num = cf->max_messages;
202   }
203   else {
204     shcf = memstore_get_conf_shared_data(cf);
205     num = shcf->max_messages;
206   }
207 
208   return num;
209 }
210 
nchan_http_publisher_handler(ngx_http_request_t * r,void (* body_handler)(ngx_http_request_t * r))211 static ngx_int_t nchan_http_publisher_handler(ngx_http_request_t * r, void (*body_handler)(ngx_http_request_t *r)) {
212   ngx_int_t                       rc;
213   nchan_request_ctx_t            *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
214 
215   static ngx_str_t                publisher_name = ngx_string("http");
216 
217   if(ctx) ctx->publisher_type = &publisher_name;
218 
219   /* Instruct ngx_http_read_client_request_body to store the request
220      body entirely in a memory buffer or in a file */
221   r->request_body_in_single_buf = 1;
222   r->request_body_in_persistent_file = 1;
223   r->request_body_in_clean_file = 0;
224   r->request_body_file_log_level = 0;
225 
226   //don't buffer the request body --send it right on through
227   //r->request_body_no_buffering = 1;
228 
229   rc = ngx_http_read_client_request_body(r, body_handler);
230   if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
231     return rc;
232   }
233   return NGX_DONE;
234 }
235 
nchan_stub_status_handler(ngx_http_request_t * r)236 ngx_int_t nchan_stub_status_handler(ngx_http_request_t *r) {
237   ngx_buf_t           *b;
238   ngx_chain_t          out;
239   nchan_stub_status_t *stats;
240 
241   nchan_main_conf_t   *mcf = ngx_http_get_module_main_conf(r, ngx_nchan_module);
242 
243   float                shmem_used, shmem_max;
244 
245   char     *buf_fmt = "total published messages: %ui\n"
246                       "stored messages: %ui\n"
247                       "shared memory used: %fK\n"
248                       "shared memory limit: %fK\n"
249                       "channels: %ui\n"
250                       "subscribers: %ui\n"
251                       "redis pending commands: %ui\n"
252                       "redis connected servers: %ui\n"
253                       "total interprocess alerts received: %ui\n"
254                       "interprocess alerts in transit: %ui\n"
255                       "interprocess queued alerts: %ui\n"
256                       "total interprocess send delay: %ui\n"
257                       "total interprocess receive delay: %ui\n"
258                       "nchan version: %s\n";
259 
260   if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) {
261     nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status.");
262     return NGX_HTTP_INTERNAL_SERVER_ERROR;
263   }
264 
265   shmem_used = (float )((float )nchan_get_used_shmem() / 1024.0);
266   shmem_max = (float )((float )mcf->shm_size / 1024.0);
267 
268   stats = nchan_get_stub_status_stats();
269 
270   b->start = (u_char *)&b[1];
271   b->pos = b->start;
272 
273   b->end = ngx_snprintf(b->start, 800, buf_fmt, stats->total_published_messages, stats->messages, shmem_used, shmem_max, stats->channels, stats->subscribers, stats->redis_pending_commands, stats->redis_connected_servers, stats->ipc_total_alerts_received, stats->ipc_total_alerts_sent - stats->ipc_total_alerts_received, stats->ipc_queue_size, stats->ipc_total_send_delay, stats->ipc_total_receive_delay, NCHAN_VERSION);
274   b->last = b->end;
275 
276   b->memory = 1;
277   b->last_buf = 1;
278 
279   r->headers_out.status = NGX_HTTP_OK;
280   r->headers_out.content_type.len = sizeof("text/plain") - 1;
281   r->headers_out.content_type.data = (u_char *) "text/plain";
282 
283   r->headers_out.content_length_n = b->end - b->start;
284   ngx_http_send_header(r);
285 
286   out.buf = b;
287   out.next = NULL;
288 
289   return ngx_http_output_filter(r, &out);
290 }
291 
nchan_parse_message_buffer_config(ngx_http_request_t * r,nchan_loc_conf_t * cf,char ** err)292 int nchan_parse_message_buffer_config(ngx_http_request_t *r, nchan_loc_conf_t *cf, char **err) {
293   ngx_str_t                      val;
294   nchan_loc_conf_shared_data_t  *shcf;
295 
296   if(!cf->complex_message_timeout && !cf->complex_max_messages) {
297     return 1;
298   }
299 
300   if(cf->complex_message_timeout) {
301     time_t    timeout;
302     if(ngx_http_complex_value(r, cf->complex_message_timeout, &val) != NGX_OK) {
303       nchan_log_request_error(r, "cannot evaluate nchan_message_timeout value");
304       *err = NULL;
305       return 0;
306     }
307     if(val.len == 0) {
308       *err = "missing nchan_message_timeout value";
309       nchan_log_request_error(r, "%s", *err);
310       return 0;
311     }
312 
313     if((timeout = ngx_parse_time(&val, 1)) == (time_t )NGX_ERROR) {
314       *err = "invalid nchan_message_timeout value";
315       nchan_log_request_error(r, "%s '%V'", *err, &val);
316       return 0;
317     }
318 
319     shcf = memstore_get_conf_shared_data(cf);
320     shcf->message_timeout = timeout;
321   }
322   if(cf->complex_max_messages) {
323     ngx_int_t                      num;
324     if(ngx_http_complex_value(r, cf->complex_max_messages, &val) != NGX_OK) {
325       nchan_log_request_error(r, "cannot evaluate nchan_message_buffer_length value");
326       *err = NULL;
327       return 0;
328     }
329 
330     if(val.len == 0) {
331       *err = "missing nchan_message_buffer_length value";
332       nchan_log_request_error(r, "%s", *err);
333       return 0;
334     }
335 
336     num = ngx_atoi(val.data, val.len);
337     if(num == NGX_ERROR || num < 0) {
338       *err = "invalid nchan_message_buffer_length value";
339       nchan_log_request_error(r, "%s %V", *err, &val);
340       return 0;
341     }
342 
343     shcf = memstore_get_conf_shared_data(cf);
344     shcf->max_messages = num;
345   }
346   return 1;
347 }
348 
group_handler_callback(ngx_int_t status,nchan_group_t * group,ngx_http_request_t * r)349 static ngx_int_t group_handler_callback(ngx_int_t status, nchan_group_t *group, ngx_http_request_t *r) {
350   nchan_request_ctx_t    *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
351 
352   if(!group) {
353     group = ngx_pcalloc(r->pool, sizeof(*group));
354   }
355 
356   if(!ctx->request_ran_content_handler) {
357     r->main->count--;
358     nchan_group_info(r, group);
359   }
360   else {
361     nchan_http_finalize_request(r, nchan_group_info(r, group));
362   }
363 
364 
365 
366   return NGX_OK;
367 }
368 
369 
parse_size_limit(u_char * data,size_t len)370 static ngx_int_t parse_size_limit(u_char *data, size_t len) {
371   ngx_str_t  str;
372   str.data = data;
373   str.len = len;
374   return nchan_parse_size(&str);
375 }
376 
set_group_num_limit(ngx_http_request_t * r,ngx_http_complex_value_t * cv,ngx_atomic_int_t * dst,ngx_int_t (* parsefunc)(u_char *,size_t),char * errstr)377 static ngx_int_t set_group_num_limit(ngx_http_request_t *r, ngx_http_complex_value_t *cv, ngx_atomic_int_t *dst, ngx_int_t (*parsefunc)(u_char *, size_t), char *errstr) {
378   ngx_str_t               tmp;
379   ngx_int_t               num;
380   if(cv) {
381     ngx_http_complex_value(r, cv, &tmp);
382     if(tmp.len == 0) {
383       *dst = -1;
384       return 1;
385     }
386     else if((num = parsefunc(tmp.data, tmp.len)) == NGX_ERROR || num < 0) {
387       nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, errstr, 0);
388       return 0;
389     }
390     *dst = num;
391   }
392   else {
393     *dst = -1;
394   }
395   return 1;
396 }
397 
parse_group_limits(ngx_http_request_t * r,nchan_loc_conf_t * cf,nchan_group_limits_t * limits)398 static ngx_int_t parse_group_limits(ngx_http_request_t *r, nchan_loc_conf_t *cf, nchan_group_limits_t *limits) {
399   set_group_num_limit(r, cf->group.max_channels, &limits->channels, ngx_atoi, "invalid nchan_group_max_channels value");
400   set_group_num_limit(r, cf->group.max_subscribers, &limits->subscribers, ngx_atoi, "invalid nchan_group_max_subscribers value");
401   set_group_num_limit(r, cf->group.max_messages, &limits->messages, ngx_atoi, "invalid nchan_group_max_messages value");
402   set_group_num_limit(r, cf->group.max_messages_shm_bytes, &limits->messages_shmem_bytes, parse_size_limit, "invalid nchan_group_max_messages_memory value");
403   set_group_num_limit(r, cf->group.max_messages_file_bytes, &limits->messages_file_bytes, parse_size_limit, "invalid nchan_group_max_messages_disk value");
404 
405   return r->headers_out.status != NGX_HTTP_FORBIDDEN ? NGX_OK : NGX_ERROR;
406 }
407 
nchan_group_handler(ngx_http_request_t * r)408 ngx_int_t nchan_group_handler(ngx_http_request_t *r) {
409   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
410   nchan_request_ctx_t    *ctx;
411   ngx_int_t               rc = NGX_DONE;
412   ngx_str_t              *group;
413 
414   if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
415     return NGX_HTTP_INTERNAL_SERVER_ERROR;
416   }
417   ngx_http_set_ctx(r, ctx, ngx_nchan_module);
418 
419   if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
420     ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
421     return NGX_ERROR;
422   }
423 
424   if(!cf->group.enable_accounting) {
425     nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Channel group accounting is disabled.", 0);
426     return NGX_OK;
427   }
428 
429   group = nchan_get_group_name(r, cf, ctx);
430   if(group == NULL) {
431     nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "No group specified", 0);
432     return NGX_OK;
433   }
434 
435   switch(r->method) {
436     case NGX_HTTP_GET:
437       if(!cf->group.get) {
438         rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
439       }
440       r->main->count++;
441       cf->storage_engine->get_group(group, cf, (callback_pt )group_handler_callback, r);
442 
443       break;
444 
445     case NGX_HTTP_POST:
446       if(!cf->group.set) {
447         rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
448       }
449 
450       nchan_group_limits_t     limits;
451 
452       if(parse_group_limits(r, cf, &limits) != NGX_OK) {
453         return NGX_OK;
454       }
455 
456       r->main->count++;
457       cf->storage_engine->set_group_limits(group, cf, &limits, (callback_pt )group_handler_callback, r);
458       break;
459 
460     case NGX_HTTP_DELETE:
461       if(!cf->group.delete) {
462         rc = nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
463       }
464       r->main->count++;
465       cf->storage_engine->delete_group(group, cf, (callback_pt )group_handler_callback, r);
466       break;
467 
468     case NGX_HTTP_OPTIONS:
469         rc= nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_GROUP_HEADERS, &NCHAN_ALLOW_GET_POST_DELETE);
470       break;
471   }
472   ctx->request_ran_content_handler = 1;
473   return rc;
474 }
475 
476 static ngx_int_t nchan_subscriber_info_handler_continued(ngx_int_t, void *, void *);
477 static void nchan_subscriber_info_publish_info_request_after_subscribing(subscriber_t *, void *);
478 
nchan_subscriber_info_handler(ngx_http_request_t * r)479 ngx_int_t nchan_subscriber_info_handler(ngx_http_request_t *r) {
480   if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
481     ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
482     return NGX_ERROR;
483   }
484 
485   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
486 
487   nchan_request_ctx_t    *ctx;
488   if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
489     return NGX_HTTP_INTERNAL_SERVER_ERROR;
490   }
491   ngx_http_set_ctx(r, ctx, ngx_nchan_module);
492 
493 
494   if(r->upstream && r->upstream->headers_in.x_accel_redirect) {
495     nchan_recover_x_accel_redirected_request_method(r);
496   }
497 
498   if(!nchan_match_origin_header(r, cf, ctx)) {
499     nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
500     ctx->request_ran_content_handler = 1;
501     return NGX_OK;
502   }
503 
504   if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
505     nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
506     return NGX_OK;
507   }
508 
509   ngx_int_t rc = cf->storage_engine->get_subscriber_info_id(cf, nchan_subscriber_info_handler_continued, r);
510   //get_unique_request is expected to never complete in the current event loop cycle.
511   if(rc == NGX_ERROR) {
512     return NGX_HTTP_INTERNAL_SERVER_ERROR;
513   }
514 
515   r->main->count++; //hold that request!
516   ctx->request_ran_content_handler = 1;
517   return NGX_DONE;
518 }
519 
nchan_subscriber_info_handler_continued(ngx_int_t rc,void * d,void * pd)520 static ngx_int_t nchan_subscriber_info_handler_continued(ngx_int_t rc, void *d, void *pd) {
521   ngx_http_request_t     *r = pd;
522   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
523 
524   uintptr_t               response_id = (uintptr_t )d;
525   nchan_request_ctx_t    *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
526   ctx->subscriber_info_response_id = response_id;
527 
528   if(rc == NGX_ERROR) {
529     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
530     return NGX_ERROR;
531   }
532 
533   if(r->method == NGX_HTTP_OPTIONS) {
534     nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_SUBSCRIBER_HEADERS, &NCHAN_ALLOW_GET);
535     return NGX_ERROR;
536   }
537 
538   ngx_str_t              *channel_id = nchan_get_subscriber_info_response_channel_id(r, response_id);
539   if(channel_id == NULL) {
540     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
541     return NGX_ERROR;
542   }
543 
544   subscriber_t *(*sub_create)(ngx_http_request_t *r, nchan_msg_id_t *msg_id) = NULL;
545 
546   if(nchan_detect_websocket_request(r)) {
547     if(!cf->sub.websocket) {
548       nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
549       return NGX_ERROR;
550     }
551     sub_create = websocket_subscriber_create;
552   }
553   else if(r->method != NGX_HTTP_GET) {
554     nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
555     return NGX_ERROR;
556   }
557   else {
558     if(cf->sub.eventsource && nchan_detect_eventsource_request(r)) {
559       sub_create = eventsource_subscriber_create;
560     }
561     else if(cf->sub.http_chunked && nchan_detect_chunked_subscriber_request(r)) {
562       sub_create = http_chunked_subscriber_create;
563     }
564     else if(cf->sub.http_multipart && nchan_detect_multipart_subscriber_request(r)) {
565       sub_create = http_multipart_subscriber_create;
566     }
567     else if(cf->sub.poll) {
568       sub_create = intervalpoll_subscriber_create;
569     }
570     else if(cf->sub.http_raw_stream) {
571       sub_create = http_raw_stream_subscriber_create;
572     }
573     else if(cf->sub.longpoll) {
574       sub_create = longpoll_subscriber_create;
575     }
576   }
577 
578   if(!sub_create) {
579     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
580     return NGX_ERROR;
581   }
582 
583   nchan_msg_id_t *msg_id;
584   if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
585     nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Message ID invalid", 0);
586     return NGX_ERROR;
587   }
588 
589   subscriber_t *sub;
590   if((sub = sub_create(r, msg_id)) == NULL) {
591     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
592     return NGX_ERROR;
593   }
594 
595   if(sub->fn->set_enqueue_callback(sub, nchan_subscriber_info_publish_info_request_after_subscribing, r) != NGX_OK) {
596     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
597     return NGX_ERROR;
598   }
599 
600   if(sub->fn->subscribe(sub, channel_id) != NGX_OK) {
601     nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
602     return NGX_ERROR;
603   }
604 
605   return NGX_OK;
606 }
607 
info_request_publish_callback(ngx_int_t status,void * d,void * pd)608 static ngx_int_t info_request_publish_callback(ngx_int_t status, void *d, void *pd) {
609   //whatever
610   return NGX_OK;
611 }
612 
really_publish_info_request(void * pd)613 static void really_publish_info_request(void *pd) {
614   subscriber_t           *sub = pd;
615   ngx_http_request_t     *r = sub->request;
616   nchan_request_ctx_t    *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
617   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
618   ngx_str_t              *channel_id;
619   if((channel_id = nchan_get_channel_id(r, PUB, 1))==NULL) {
620     sub->fn->respond_status(sub, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL);
621     return;
622   }
623 
624   cf->storage_engine->request_subscriber_info(channel_id, ctx->subscriber_info_response_id, cf, (callback_pt) &info_request_publish_callback, r);
625   nchan_update_stub_status(total_published_messages, 1);
626 }
627 
nchan_subscriber_info_publish_info_request_after_subscribing(subscriber_t * sub,void * pd)628 static void nchan_subscriber_info_publish_info_request_after_subscribing(subscriber_t *sub, void *pd) {
629   //publishing needs to happen AFTER this subscriber has enqueued, not during.
630   nchan_add_oneshot_timer(really_publish_info_request, sub, 0);
631 }
632 
633 
nchan_pubsub_handler(ngx_http_request_t * r)634 ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
635   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
636   ngx_str_t              *channel_id;
637   subscriber_t           *sub;
638   nchan_msg_id_t         *msg_id;
639   ngx_int_t               rc = NGX_DONE;
640   nchan_request_ctx_t    *ctx;
641   nchan_group_limits_t    group_limits;
642 
643   if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
644     ngx_http_finalize_request(r, NGX_HTTP_CLIENT_CLOSED_REQUEST);
645     return NGX_ERROR;
646   }
647 
648   if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
649     return NGX_HTTP_INTERNAL_SERVER_ERROR;
650   }
651   ngx_http_set_ctx(r, ctx, ngx_nchan_module);
652 
653   //X-Accel-Redirected requests get their method mangled to GET. De-mangle it if necessary
654   if(r->upstream && r->upstream->headers_in.x_accel_redirect) {
655     //yep, we got x-accel-redirected. what was the original method?...
656     nchan_recover_x_accel_redirected_request_method(r);
657   }
658 
659   if(!nchan_match_origin_header(r, cf, ctx)) {
660     goto forbidden;
661   }
662 
663   if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
664     goto bad_msgid;
665   }
666 
667   if(parse_group_limits(r, cf, &group_limits) == NGX_OK) {
668     // unless the group already exists, these limits may only be set after this incoming request.
669     // TODO: fix this, although that will lead to even gnarlier control flow.
670     cf->storage_engine->set_group_limits(nchan_get_group_name(r, cf, ctx), cf, &group_limits, NULL, NULL);
671   }
672   else {
673     // there waas an error parsing group limit strings, and it has already been sent in the response.
674     // just quit.
675     return NGX_OK;
676   }
677 
678   if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
679     //using redis, and it's not ready yet
680     if(r->method == NGX_HTTP_POST || r->method == NGX_HTTP_PUT) {
681       //discard request body before responding
682       nchan_http_publisher_handler(r, nchan_publisher_unavailable_body_handler);
683     }
684     else {
685       nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
686     }
687     return NGX_OK;
688   }
689 
690   if(cf->pub.websocket || cf->pub.http) {
691     char *err;
692     if(!nchan_parse_message_buffer_config(r, cf, &err)) {
693       if(err) {
694         nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, err, 0);
695         return NGX_OK;
696       }
697       else {
698         nchan_respond_status(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 0);
699         return NGX_OK;
700       }
701     }
702   }
703 
704   if(nchan_detect_websocket_request(r)) {
705     //want websocket?
706     if(cf->sub.websocket) {
707       //we prefer to subscribe
708       if((channel_id = nchan_get_channel_id(r, SUB, 1)) == NULL) {
709         return r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR;
710       }
711 
712 #if FAKESHARD
713       memstore_sub_debug_start();
714 #endif
715       if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
716         goto bad_msgid;
717       }
718       if((sub = websocket_subscriber_create(r, msg_id)) == NULL) {
719         nchan_log_request_error(r, "unable to create websocket subscriber");
720         return NGX_HTTP_INTERNAL_SERVER_ERROR;
721       }
722       if(sub->fn->subscribe(sub, channel_id) != NGX_OK) {
723         return NGX_HTTP_INTERNAL_SERVER_ERROR;
724       }
725 #if FAKESHARD
726       memstore_sub_debug_end();
727 #endif
728     }
729     else if(cf->pub.websocket) {
730       //no need to subscribe, but keep a connection open for publishing
731       nchan_create_websocket_publisher(r);
732     }
733     else goto forbidden;
734     return NGX_DONE;
735   }
736   else {
737     subscriber_t *(*sub_create)(ngx_http_request_t *r, nchan_msg_id_t *msg_id) = NULL;
738 
739     switch(r->method) {
740       case NGX_HTTP_GET:
741         if(cf->sub.eventsource && nchan_detect_eventsource_request(r)) {
742           sub_create = eventsource_subscriber_create;
743         }
744         else if(cf->sub.http_chunked && nchan_detect_chunked_subscriber_request(r)) {
745           sub_create = http_chunked_subscriber_create;
746         }
747         else if(cf->sub.http_multipart && nchan_detect_multipart_subscriber_request(r)) {
748           sub_create = http_multipart_subscriber_create;
749         }
750         else if(cf->sub.poll) {
751           sub_create = intervalpoll_subscriber_create;
752         }
753         else if(cf->sub.http_raw_stream) {
754           sub_create = http_raw_stream_subscriber_create;
755         }
756         else if(cf->sub.longpoll) {
757           sub_create = longpoll_subscriber_create;
758         }
759         else if(cf->pub.http) {
760           nchan_http_publisher_handler(r, nchan_publisher_body_handler);
761         }
762         else {
763           goto forbidden;
764         }
765 
766         if(sub_create) {
767           if((channel_id = nchan_get_channel_id(r, SUB, 1)) == NULL) {
768             return r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR;
769           }
770 #if FAKESHARD
771           memstore_sub_debug_start();
772 #endif
773           if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
774             goto bad_msgid;
775           }
776 
777           if((sub = sub_create(r, msg_id)) == NULL) {
778             nchan_log_request_error(r, "unable to create subscriber");
779             return NGX_HTTP_INTERNAL_SERVER_ERROR;
780           }
781 
782           if(sub->fn->subscribe(sub, channel_id) != NGX_OK) {
783             return NGX_HTTP_INTERNAL_SERVER_ERROR;
784           }
785 #if FAKESHARD
786           memstore_sub_debug_end();
787 #endif
788         }
789 
790         break;
791 
792       case NGX_HTTP_POST:
793       case NGX_HTTP_PUT:
794         if(cf->pub.http) {
795           nchan_http_publisher_handler(r, nchan_publisher_body_handler);
796         }
797         else goto forbidden;
798         break;
799 
800       case NGX_HTTP_DELETE:
801         if(cf->pub.http) {
802           nchan_http_publisher_handler(r, nchan_publisher_body_handler);
803         }
804         else goto forbidden;
805         break;
806 
807       case NGX_HTTP_OPTIONS:
808         if(cf->pub.http && (cf->sub.poll || cf->sub.longpoll || cf->sub.eventsource || cf->sub.websocket)) {
809           nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_PUBSUB_HEADERS, &NCHAN_ALLOW_GET_POST_PUT_DELETE);
810         }
811         else if(cf->pub.http) {
812           nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_PUBLISHER_HEADERS, &NCHAN_ALLOW_GET_POST_PUT_DELETE);
813         }
814         else if(cf->sub.poll || cf->sub.longpoll || cf->sub.eventsource || cf->sub.websocket) {
815           nchan_OPTIONS_respond(r, &NCHAN_ACCESS_CONTROL_ALLOWED_SUBSCRIBER_HEADERS, &NCHAN_ALLOW_GET);
816         }
817         else goto forbidden;
818         break;
819     }
820   }
821   ctx->request_ran_content_handler = 1;
822   return rc;
823 
824 forbidden:
825   nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
826   ctx->request_ran_content_handler = 1;
827   return NGX_OK;
828 
829 bad_msgid:
830   nchan_respond_cstring(r, NGX_HTTP_BAD_REQUEST, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, "Message ID invalid", 0);
831   ctx->request_ran_content_handler = 1;
832   return NGX_OK;
833 
834 }
835 
channel_info_callback(ngx_int_t status,void * rptr,void * pd)836 static ngx_int_t channel_info_callback(ngx_int_t status, void *rptr, void *pd) {
837   ngx_http_request_t *r = nchan_get_safe_request_ptr(pd);
838   if(r == NULL) {
839     return NGX_ERROR;
840   }
841   if(status>=500 && status <= 599) {
842     nchan_http_finalize_request(r, status);
843   }
844   else {
845     nchan_http_finalize_request(r, nchan_response_channel_ptr_info( (nchan_channel_t *)rptr, r, 0));
846   }
847   return NGX_OK;
848 }
849 
clear_request_pointer(safe_request_ptr_t * pdata)850 static void clear_request_pointer(safe_request_ptr_t *pdata) {
851   if(pdata) {
852     pdata->r = NULL;
853   }
854 }
855 
nchan_set_safe_request_ptr(ngx_http_request_t * r)856 static safe_request_ptr_t *nchan_set_safe_request_ptr(ngx_http_request_t *r) {
857   safe_request_ptr_t           *data = ngx_alloc(sizeof(*data), ngx_cycle->log);
858   ngx_http_cleanup_t           *cln = ngx_http_cleanup_add(r, 0);
859 
860   if(!data || !cln) {
861     nchan_log_request_error(r, "couldn't allocate request cleanup stuff.");
862     if(cln) {
863       cln->data = NULL;
864       cln->handler = (ngx_http_cleanup_pt )clear_request_pointer;
865     }
866     nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
867     return NULL;
868   }
869 
870   data->cln = cln;
871   data->r = r;
872 
873   cln->data = data;
874   cln->handler = (ngx_http_cleanup_pt )clear_request_pointer;
875 
876   return data;
877 }
878 
nchan_get_safe_request_ptr(safe_request_ptr_t * d)879 static ngx_http_request_t *nchan_get_safe_request_ptr(safe_request_ptr_t *d) {
880   ngx_http_request_t    *r = d->r;
881   ngx_http_cleanup_t    *cln = d->cln;
882 
883   ngx_free(d);
884 
885   if(r) {
886     cln->data = NULL;
887   }
888 
889   return r;
890 }
891 
892 
publish_callback(ngx_int_t status,void * data,safe_request_ptr_t * pd)893 static ngx_int_t publish_callback(ngx_int_t status, void *data, safe_request_ptr_t *pd) {
894   nchan_request_ctx_t   *ctx;
895   static nchan_msg_id_t  empty_msgid = NCHAN_ZERO_MSGID;
896   nchan_channel_t       *ch = data;
897 
898   ngx_http_request_t    *r = nchan_get_safe_request_ptr(pd);
899 
900   if(r == NULL) { // the request has since disappered
901     return NGX_ERROR;
902   }
903   ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
904 
905   //DBG("publish_callback %V owner %i status %i", ch_id, memstore_channel_owner(ch_id), status);
906   switch(status) {
907     case NCHAN_MESSAGE_QUEUED:
908       //message was queued successfully, but there were no subscribers to receive it.
909       ctx->prev_msg_id = ctx->msg_id;
910       ctx->msg_id = ch != NULL ? ch->last_published_msg_id : empty_msgid;
911 
912       nchan_maybe_send_channel_event_message(r, CHAN_PUBLISH);
913       nchan_http_finalize_request(r, nchan_response_channel_ptr_info(ch, r, NGX_HTTP_ACCEPTED));
914       return NGX_OK;
915 
916     case NCHAN_MESSAGE_RECEIVED:
917       //message was queued successfully, and it was already sent to at least one subscriber
918       ctx->prev_msg_id = ctx->msg_id;
919       ctx->msg_id = ch != NULL ? ch->last_published_msg_id : empty_msgid;
920 
921       nchan_maybe_send_channel_event_message(r, CHAN_PUBLISH);
922       nchan_http_finalize_request(r, nchan_response_channel_ptr_info(ch, r, NGX_HTTP_CREATED));
923       return NGX_OK;
924 
925     case NGX_ERROR:
926       status = NGX_HTTP_INTERNAL_SERVER_ERROR;
927       /*fallthrough*/
928     case NGX_HTTP_INSUFFICIENT_STORAGE:
929     case NGX_HTTP_INTERNAL_SERVER_ERROR:
930     case NGX_HTTP_SERVICE_UNAVAILABLE:
931       //WTF?
932       nchan_log_request_error(r, "error publishing message (HTTP status code %i)", status);
933       ctx->prev_msg_id = empty_msgid;
934       ctx->msg_id = empty_msgid;
935       nchan_http_finalize_request(r, status);
936       return NGX_ERROR;
937 
938     case NGX_HTTP_FORBIDDEN:
939       ctx->prev_msg_id = empty_msgid;
940       ctx->msg_id = empty_msgid;
941       if(data) {
942         nchan_respond_cstring(r, NGX_HTTP_FORBIDDEN, &NCHAN_CONTENT_TYPE_TEXT_PLAIN, (char *)data, 1);
943       }
944       else {
945         nchan_http_finalize_request(r, NGX_HTTP_FORBIDDEN);
946       }
947       return NGX_OK;
948 
949     default:
950       //for debugging, mostly. I don't expect this branch to behit during regular operation
951       ctx->prev_msg_id = empty_msgid;;
952       ctx->msg_id = empty_msgid;
953       nchan_log_request_error(r, "TOTALLY UNEXPECTED error publishing message (HTTP status code %i)", status);
954       nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
955       return NGX_ERROR;
956   }
957 }
958 
nchan_publisher_post_request(ngx_http_request_t * r,ngx_str_t * content_type,size_t content_length,ngx_chain_t * request_body_chain,ngx_str_t * channel_id,nchan_loc_conf_t * cf)959 static void nchan_publisher_post_request(ngx_http_request_t *r, ngx_str_t *content_type, size_t content_length, ngx_chain_t *request_body_chain, ngx_str_t *channel_id, nchan_loc_conf_t *cf) {
960   ngx_buf_t                      *buf;
961   nchan_msg_t                    *msg;
962   ngx_str_t                      *eventsource_event;
963 
964   safe_request_ptr_t             *pd;
965 
966 #if FAKESHARD
967   memstore_pub_debug_start();
968 #endif
969   if((msg = ngx_pcalloc(r->pool, sizeof(*msg))) == NULL) {
970     nchan_log_request_error(r, "can't allocate msg in request pool");
971     nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
972     return;
973   }
974   msg->storage = NCHAN_MSG_POOL;
975 
976 
977   if(cf->eventsource_event.len > 0) {
978     msg->eventsource_event = &cf->eventsource_event;
979   }
980   else if((eventsource_event = nchan_get_header_value(r, NCHAN_HEADER_EVENTSOURCE_EVENT)) != NULL) {
981     msg->eventsource_event = eventsource_event;
982   }
983 
984   //content type
985   if(content_type) {
986     msg->content_type = content_type;
987   }
988 
989   if(content_length == 0) {
990     buf = ngx_create_temp_buf(r->pool, 0);
991   }
992   else if(request_body_chain!=NULL) {
993     buf = nchan_chain_to_single_buffer(r->pool, request_body_chain, content_length);
994   }
995   else {
996     nchan_log_request_error(r, "unexpected publisher message request body buffer location. please report this to the nchan developers.");
997     nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
998     return;
999   }
1000 
1001   msg->id.time = 0;
1002   msg->id.tag.fixed[0] = 0;
1003   msg->id.tagactive = 0;
1004   msg->id.tagcount = 1;
1005 
1006   msg->buf = *buf;
1007 #if NCHAN_MSG_LEAK_DEBUG
1008   msg->lbl = r->uri;
1009 #endif
1010   nchan_deflate_message_if_needed(msg, cf, r, r->pool);
1011   if((pd = nchan_set_safe_request_ptr(r)) == NULL) {
1012     return;
1013   }
1014 
1015   cf->storage_engine->publish(channel_id, msg, cf, (callback_pt) &publish_callback, pd);
1016   nchan_update_stub_status(total_published_messages, 1);
1017 #if FAKESHARD
1018   memstore_pub_debug_end();
1019 #endif
1020 }
1021 
1022 typedef struct {
1023   ngx_str_t       *ch_id;
1024 } nchan_pub_upstream_data_t;
1025 
1026 typedef struct {
1027   ngx_http_post_subrequest_t    psr;
1028   nchan_pub_upstream_data_t   psr_data;
1029 } nchan_pub_upstream_stuff_t;
1030 
nchan_publisher_upstream_handler(ngx_http_request_t * sr,void * data,ngx_int_t rc)1031 static ngx_int_t nchan_publisher_upstream_handler(ngx_http_request_t *sr, void *data, ngx_int_t rc) {
1032   ngx_http_request_t         *r = sr->parent;
1033   nchan_pub_upstream_data_t  *d = (nchan_pub_upstream_data_t *)data;
1034 
1035   //switch(r->headers_out
1036   if(rc == NGX_OK) {
1037     nchan_loc_conf_t          *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
1038     ngx_int_t                 code = sr->headers_out.status;
1039     ngx_str_t                *content_type;
1040     ngx_int_t                 content_length;
1041     ngx_chain_t              *request_chain;
1042 
1043     switch(code) {
1044       case NGX_HTTP_OK:
1045       case NGX_HTTP_CREATED:
1046       case NGX_HTTP_ACCEPTED:
1047         if(sr->upstream) {
1048           content_type = (sr->upstream->headers_in.content_type ? &sr->upstream->headers_in.content_type->value : NULL);
1049           content_length = nchan_subrequest_content_length(sr);
1050 #if nginx_version >= 1013010
1051           request_chain = sr->out;
1052 #else
1053           request_chain = sr->upstream->out_bufs;
1054 #endif
1055         }
1056         else {
1057           content_type = NULL;
1058           content_length = 0;
1059           request_chain = NULL;
1060         }
1061         nchan_publisher_post_request(r, content_type, content_length, request_chain, d->ch_id, cf);
1062         break;
1063 
1064       case NGX_HTTP_NOT_MODIFIED:
1065         content_type = (r->headers_in.content_type ? &r->headers_in.content_type->value : NULL);
1066         content_length = r->headers_in.content_length_n > 0 ? r->headers_in.content_length_n : 0;
1067         nchan_publisher_post_request(r, content_type, content_length, r->request_body->bufs, d->ch_id, cf);
1068         break;
1069 
1070       case NGX_HTTP_NO_CONTENT:
1071         //cancel publication
1072         nchan_http_finalize_request(r, NGX_HTTP_NO_CONTENT);
1073         break;
1074 
1075       default:
1076         nchan_http_finalize_request(r, NGX_HTTP_FORBIDDEN);
1077     }
1078   }
1079   else {
1080     nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
1081   }
1082 
1083   return NGX_OK;
1084 }
1085 
nchan_publisher_body_handler_continued(ngx_http_request_t * r,ngx_str_t * channel_id,nchan_loc_conf_t * cf)1086 static void nchan_publisher_body_handler_continued(ngx_http_request_t *r, ngx_str_t *channel_id, nchan_loc_conf_t *cf) {
1087   ngx_http_complex_value_t       *publisher_upstream_request_url_ccv;
1088   static ngx_str_t                POST_REQUEST_STRING = {4, (u_char *)"POST "};
1089   safe_request_ptr_t             *pd;
1090 
1091   switch(r->method) {
1092     case NGX_HTTP_GET:
1093       if((pd = nchan_set_safe_request_ptr(r)) == NULL){
1094         return;
1095       }
1096       cf->storage_engine->find_channel(channel_id, cf, (callback_pt) &channel_info_callback, pd);
1097       break;
1098 
1099     case NGX_HTTP_PUT:
1100     case NGX_HTTP_POST:
1101       publisher_upstream_request_url_ccv = cf->publisher_upstream_request_url;
1102       if(publisher_upstream_request_url_ccv == NULL) {
1103         ngx_str_t    *content_type = (r->headers_in.content_type ? &r->headers_in.content_type->value : NULL);
1104         ngx_int_t     content_length = r->headers_in.content_length_n > 0 ? r->headers_in.content_length_n : 0;
1105         // no need to check for chunked transfer-encoding, nginx automatically sets the
1106         // content-length either way.
1107 
1108         nchan_publisher_post_request(r, content_type, content_length, r->request_body->bufs, channel_id, cf);
1109       }
1110       else {
1111         nchan_pub_upstream_stuff_t    *psr_stuff;
1112 
1113         if((psr_stuff = ngx_palloc(r->pool, sizeof(*psr_stuff))) == NULL) {
1114           nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest");
1115           nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
1116           return;
1117         }
1118 
1119         ngx_http_post_subrequest_t    *psr = &psr_stuff->psr;
1120         nchan_pub_upstream_data_t     *psrd = &psr_stuff->psr_data;
1121         ngx_http_request_t            *sr;
1122         ngx_str_t                      publisher_upstream_request_url;
1123 
1124         ngx_http_complex_value(r, publisher_upstream_request_url_ccv, &publisher_upstream_request_url);
1125 
1126         psr->handler = nchan_publisher_upstream_handler;
1127         psr->data = psrd;
1128 
1129         psrd->ch_id = channel_id;
1130 
1131         ngx_http_subrequest(r, &publisher_upstream_request_url, NULL, &sr, psr, NGX_HTTP_SUBREQUEST_IN_MEMORY);
1132         nchan_adjust_subrequest(sr, NGX_HTTP_POST, &POST_REQUEST_STRING, r->request_body, r->headers_in.content_length_n);
1133         sr->args = r->args;
1134       }
1135       break;
1136 
1137     case NGX_HTTP_DELETE:
1138       if((pd = nchan_set_safe_request_ptr(r)) == NULL){
1139         return;
1140       }
1141       cf->storage_engine->delete_channel(channel_id, cf, (callback_pt) &channel_info_callback, pd);
1142       nchan_maybe_send_channel_event_message(r, CHAN_DELETE);
1143       break;
1144 
1145     default:
1146       nchan_respond_status(r, NGX_HTTP_FORBIDDEN, NULL, NULL, 0);
1147   }
1148 
1149 }
1150 
1151 typedef struct {
1152   ngx_str_t       *ch_id;
1153 } nchan_pub_subrequest_data_t;
1154 
1155 typedef struct {
1156   ngx_http_post_subrequest_t    psr;
1157   nchan_pub_subrequest_data_t   psr_data;
1158 } nchan_pub_subrequest_stuff_t;
1159 
1160 
nchan_publisher_body_authorize_handler(ngx_http_request_t * r,void * data,ngx_int_t rc)1161 static ngx_int_t nchan_publisher_body_authorize_handler(ngx_http_request_t *r, void *data, ngx_int_t rc) {
1162   nchan_pub_subrequest_data_t  *d = data;
1163 
1164   if(rc == NGX_OK) {
1165     nchan_loc_conf_t    *cf = ngx_http_get_module_loc_conf(r->parent, ngx_nchan_module);
1166     ngx_int_t            code = r->headers_out.status;
1167     if(code >= 200 && code <299) {
1168       //authorized. proceed as planned
1169       nchan_publisher_body_handler_continued(r->parent, d->ch_id, cf);
1170     }
1171     else { //anything else means forbidden
1172       nchan_http_finalize_request(r->parent, NGX_HTTP_FORBIDDEN);
1173     }
1174   }
1175   else {
1176     nchan_http_finalize_request(r->parent, rc);
1177   }
1178   return NGX_OK;
1179 }
1180 
nchan_publisher_unavailable_body_handler(ngx_http_request_t * r)1181 static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r) {
1182   nchan_http_finalize_request(r, NGX_HTTP_SERVICE_UNAVAILABLE);
1183   return;
1184 }
1185 
nchan_publisher_body_handler(ngx_http_request_t * r)1186 static void nchan_publisher_body_handler(ngx_http_request_t *r) {
1187   ngx_str_t                      *channel_id;
1188   nchan_loc_conf_t               *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
1189   ngx_table_elt_t                *content_length_elt;
1190   ngx_http_complex_value_t       *authorize_request_url_ccv = cf->authorize_request_url;
1191 
1192   if((channel_id = nchan_get_channel_id(r, PUB, 1))==NULL) {
1193     nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
1194     return;
1195   }
1196 
1197   if(!authorize_request_url_ccv) {
1198     nchan_publisher_body_handler_continued(r, channel_id, cf);
1199   }
1200   else {
1201     nchan_pub_subrequest_stuff_t   *psr_stuff;
1202 
1203     if((psr_stuff = ngx_palloc(r->pool, sizeof(*psr_stuff))) == NULL) {
1204       nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest");
1205       nchan_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
1206       return;
1207     }
1208 
1209     ngx_http_post_subrequest_t    *psr = &psr_stuff->psr;
1210     nchan_pub_subrequest_data_t   *psrd = &psr_stuff->psr_data;
1211     ngx_http_request_t            *sr;
1212     ngx_str_t                      auth_request_url;
1213 
1214     ngx_http_complex_value(r, authorize_request_url_ccv, &auth_request_url);
1215 
1216     psr->handler = nchan_publisher_body_authorize_handler;
1217     psr->data = psrd;
1218 
1219     psrd->ch_id = channel_id;
1220 
1221     ngx_http_subrequest(r, &auth_request_url, NULL, &sr, psr, 0);
1222 
1223     if((sr->request_body = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t))) == NULL) {
1224       nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest body");
1225       nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
1226       return;
1227     }
1228     if((content_length_elt = ngx_palloc(r->pool, sizeof(*content_length_elt))) == NULL) {
1229       nchan_log_request_error(r, "can't allocate memory for publisher auth subrequest content-length header");
1230       nchan_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
1231       return;
1232     }
1233 
1234     if(sr->headers_in.content_length) {
1235       *content_length_elt = *sr->headers_in.content_length;
1236       content_length_elt->value.len=1;
1237       content_length_elt->value.data=(u_char *)"0";
1238       sr->headers_in.content_length = content_length_elt;
1239     }
1240 
1241     sr->headers_in.content_length_n = 0;
1242     sr->args = r->args;
1243     sr->header_only = 1;
1244   }
1245 }
1246 
nchan_benchmark_handler(ngx_http_request_t * r)1247 ngx_int_t nchan_benchmark_handler(ngx_http_request_t *r) {
1248   nchan_request_ctx_t    *ctx;
1249 
1250   if(r->connection && (r->connection->read->eof || r->connection->read->pending_eof)) {
1251     return NGX_HTTP_INTERNAL_SERVER_ERROR;
1252   }
1253   if((ctx = ngx_pcalloc(r->pool, sizeof(nchan_request_ctx_t))) == NULL) {
1254     return NGX_HTTP_INTERNAL_SERVER_ERROR;
1255   }
1256   ngx_http_set_ctx(r, ctx, ngx_nchan_module);
1257 
1258   return nchan_benchmark_ws_initialize(r);
1259 
1260   /*
1261   ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
1262   nchan_bufchain_pool_init(ctx->bcp, r->pool);
1263   return nchan_benchmark_initialize(r);
1264   */
1265 }
1266