1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 #include <util/nchan_bufchainpool.h>
4 #include "longpoll.h"
5 #include "longpoll-private.h"
6 
// Log level used by DBG() below. Swap the two DEBUG_LEVEL lines to emit the
// debug messages at warning level instead.
//#define DEBUG_LEVEL NGX_LOG_WARN
#define DEBUG_LEVEL NGX_LOG_DEBUG

// Logging helpers: both write to ngx_cycle->log and prefix every message with
// "SUB:MULTIPART:". DBG logs at DEBUG_LEVEL, ERR always logs at NGX_LOG_ERR.
#define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:MULTIPART:" fmt, ##arg)
#define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:MULTIPART:" fmt, ##arg)
12 #include <assert.h>
13 
// Per-subscriber multipart state; an instance is allocated from the request
// pool and stored in fsub->privdata by http_multipart_subscriber_create().
typedef struct {
  // Part delimiter text: "\r\n--" followed by the request's multipart boundary string.
  u_char                 boundary[50];
  // End of the text written into boundary[]; used as the end/last pointer of output buffers.
  u_char                *boundary_end;
} multipart_privdata_t;
18 
// Scratch buffer for the per-message part headers. Instances are managed by
// the reuse queue in ctx->output_str_queue, which locates the prev/next links
// through the offsetof() values handed to nchan_reuse_queue_init().
typedef struct {
  // Header text for one message part (Last-Modified/Etag, optionally Content-Type).
  // NOTE(review): the size presumably covers the Last-Modified + Etag lines plus a
  // trailing CRLF only; confirm there is room for a Content-Type header as well.
  u_char       charbuf[58 + 10*NCHAN_FIXED_MULTITAG_MAX];
  // Queue links; accessed by the reuse queue via their offsets.
  void        *prev;
  void        *next;
} headerbuf_t;
24 
fsub_bcp(full_subscriber_t * fsub)25 static nchan_bufchain_pool_t *fsub_bcp(full_subscriber_t *fsub) {
26   nchan_request_ctx_t            *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
27   return ctx->bcp;
28 }
29 
// Emit the response headers and the opening boundary once per subscriber.
// Does nothing when fsub->data.shook_hands is already set; otherwise turns off
// chunked transfer encoding, sets the multipart Content-Type header, outputs
// the headers, writes the first boundary and finally sets shook_hands.
static void multipart_ensure_headers_sent(full_subscriber_t *fsub) {
  ngx_http_request_t             *req = fsub->sub.request;
  ngx_http_core_loc_conf_t       *core_conf = ngx_http_get_module_loc_conf(req, ngx_http_core_module);
  nchan_request_ctx_t            *request_ctx = ngx_http_get_module_ctx(req, ngx_nchan_module);
  multipart_privdata_t           *priv = (multipart_privdata_t *)fsub->privdata;
  nchan_buf_and_chain_t          *opening;
  ngx_buf_t                      *b;

  if(fsub->data.shook_hands) {
    //headers already went out for this subscriber
    return;
  }

  core_conf->chunked_transfer_encoding = 0;
  nchan_request_set_content_type_multipart_boundary_header(req, request_ctx);

  nchan_cleverly_output_headers_only_for_later_response(req);

  //the preamble lives in the request ctx rather than in the subscriber data,
  //so that longpoll's fsub can be reused as-is

  req->header_only = 0;
  req->chunked = 0;

  opening = nchan_bufchain_pool_reserve(request_ctx->bcp, 1);
  if(opening == NULL) {
    ERR("can't reserve bufchain for multipart headers");
    nchan_respond_status(fsub->sub.request, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 1);
    return;
  }

  //first boundary: skip the 2 leading bytes (the CRLF) of the stored delimiter
  b = &opening->buf;
  ngx_memzero(b, sizeof(ngx_buf_t));
  b->start = priv->boundary + 2;
  b->pos = b->start;
  b->end = priv->boundary_end;
  b->last = b->end;
  b->memory = 1;
  b->last_buf = 0;
  b->last_in_chain = 1;
  b->flush = 1;

  nchan_output_filter(req, &opening->chain);

  fsub->data.shook_hands = 1;
}
headerbuf_alloc(void * pd)71 static void *headerbuf_alloc(void *pd) {
72   return ngx_palloc((ngx_pool_t *)pd, sizeof(headerbuf_t));
73 }
74 
multipart_respond_message(subscriber_t * sub,nchan_msg_t * msg)75 static ngx_int_t multipart_respond_message(subscriber_t *sub,  nchan_msg_t *msg) {
76 
77   full_subscriber_t      *fsub = (full_subscriber_t  *)sub;
78   ngx_buf_t              *buf, *msg_buf = &msg->buf, *msgid_buf;
79   ngx_int_t               rc;
80   nchan_loc_conf_t       *cf = ngx_http_get_module_loc_conf(fsub->sub.request, ngx_nchan_module);
81   nchan_request_ctx_t    *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
82   ngx_int_t               n;
83   nchan_buf_and_chain_t  *bc;
84   ngx_chain_t            *chain;
85   ngx_file_t             *file_copy;
86   multipart_privdata_t   *mpd = (multipart_privdata_t *)fsub->privdata;
87 
88   headerbuf_t            *headerbuf = nchan_reuse_queue_push(ctx->output_str_queue);
89   u_char                 *cur = headerbuf->charbuf;
90 
91   if(fsub->data.timeout_ev.timer_set) {
92     ngx_del_timer(&fsub->data.timeout_ev);
93     ngx_add_timer(&fsub->data.timeout_ev, sub->cf->subscriber_timeout * 1000);
94   }
95 
96   //generate the headers
97   if(!cf->msg_in_etag_only) {
98     //msgtime
99     cur = ngx_cpymem(cur, "\r\nLast-Modified: ", sizeof("\r\nLast-Modified: ") - 1);
100     cur = ngx_http_time(cur, msg->id.time);
101     *cur++ = CR; *cur++ = LF;
102     //msgtag
103     cur = ngx_cpymem(cur, "Etag: ", sizeof("Etag: ") - 1);
104     cur += msgtag_to_strptr(&msg->id, (char *)cur);
105     *cur++ = CR; *cur++ = LF;
106   }
107   else {
108     ngx_str_t   *tmp_etag = msgid_to_str(&msg->id);
109     cur = ngx_snprintf(cur, 58 + 10*NCHAN_FIXED_MULTITAG_MAX, "\r\nEtag: %V\r\n", tmp_etag);
110   }
111 
112   n=4;
113   if(!msg->content_type) {
114     //don't need content_type buf'n'chain
115     n--;
116   }
117   if(ngx_buf_size(msg_buf) == 0) {
118     //don't need msgbuf
119     n --;
120   }
121   if((bc = nchan_bufchain_pool_reserve(ctx->bcp, n)) == NULL) {
122     ERR("can't allocate buf-and-chains for multipart/mixed client output");
123     return NGX_ERROR;
124   }
125 
126   chain = &bc->chain;
127   msgid_buf = chain->buf;
128 
129   //message id
130   ngx_memzero(chain->buf, sizeof(ngx_buf_t));
131   chain->buf->memory = 1;
132   chain->buf->start = headerbuf->charbuf;
133   chain->buf->pos = headerbuf->charbuf;
134 
135   //content_type maybe
136   if(msg->content_type) {
137     chain = chain->next;
138     buf = chain->buf;
139 
140     msgid_buf->last = cur;
141     msgid_buf->end = cur;
142 
143     ngx_memzero(buf, sizeof(ngx_buf_t));
144     buf->memory = 1;
145     buf->start = cur;
146     buf->pos = cur;
147     buf->last = ngx_snprintf(cur, 255, "Content-Type: %V\r\n\r\n", msg->content_type);
148     buf->end = buf->last;
149   }
150   else {
151     *cur++ = CR; *cur++ = LF;
152     msgid_buf->last = cur;
153     msgid_buf->end = cur;
154   }
155 
156   //msgbuf
157   if(ngx_buf_size(msg_buf) > 0) {
158     chain = chain->next;
159     buf = chain->buf;
160     ngx_memcpy(buf, msg_buf, sizeof(*msg_buf));
161     if(msg_buf->file) {
162       file_copy = nchan_bufchain_pool_reserve_file(ctx->bcp);
163       nchan_msg_buf_open_fd_if_needed(buf, file_copy, NULL);
164     }
165     buf->last_buf = 0;
166     buf->last_in_chain = 0;
167     buf->flush = 0;
168   }
169 
170   chain = chain->next;
171   buf = chain->buf;
172   ngx_memzero(buf, sizeof(ngx_buf_t));
173   buf->start = &mpd->boundary[0];
174   buf->pos = buf->start;
175   buf->end = mpd->boundary_end;
176   buf->last = buf->end;
177   buf->memory = 1;
178   buf->last_buf = 0;
179   buf->last_in_chain = 1;
180   buf->flush = 1;
181 
182   ctx->prev_msg_id = fsub->sub.last_msgid;
183   update_subscriber_last_msg_id(sub, msg);
184   ctx->msg_id = fsub->sub.last_msgid;
185 
186   multipart_ensure_headers_sent(fsub);
187 
188   DBG("%p output msg to subscriber", sub);
189 
190   rc = nchan_output_msg_filter(fsub->sub.request, msg, &bc->chain);
191 
192   return rc;
193 }
194 
// Respond to a status event on a multipart subscriber.
//  - 204, and 304 without a status line, are ignored (NGX_OK).
//  - 4xx/5xx before any headers were sent are passed on to
//    subscriber_respond_unqueued_status().
//  - anything else makes sure the headers are out, then writes the closing
//    "--\r\n" as the last buffer and lets
//    subscriber_maybe_dequeue_after_status_response() handle the status code.
static ngx_int_t multipart_respond_status(subscriber_t *sub, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body){
  static u_char            *closing_dashes = (u_char *)"--\r\n";
  full_subscriber_t        *fsub = (full_subscriber_t  *)sub;
  nchan_buf_and_chain_t    *tail;
  ngx_buf_t                *b;

  if(status_code == NGX_HTTP_NO_CONTENT) {
    //ignore
    return NGX_OK;
  }
  if(status_code == NGX_HTTP_NOT_MODIFIED && !status_line) {
    //ignore
    return NGX_OK;
  }

  if(!fsub->data.shook_hands && status_code >= 400 && status_code < 600) {
    return subscriber_respond_unqueued_status(fsub, status_code, status_line, status_body);
  }

  multipart_ensure_headers_sent(fsub);

  tail = nchan_bufchain_pool_reserve(fsub_bcp(fsub), 1);
  if(tail == NULL) {
    nchan_respond_status(sub->request, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 1);
    return NGX_ERROR;
  }

  //the 4 bytes "--\r\n" go right after the last delimiter that was written
  b = &tail->buf;
  ngx_memzero(b, sizeof(ngx_buf_t));
  b->memory = 1;
  b->last_buf = 1;
  b->last_in_chain = 1;
  b->flush = 1;
  b->start = closing_dashes;
  b->pos = closing_dashes;
  b->end = closing_dashes + 4;
  b->last = b->end;

  nchan_output_filter(fsub->sub.request, &tail->chain);

  subscriber_maybe_dequeue_after_status_response(fsub, status_code);

  return NGX_OK;
}
233 
multipart_enqueue(subscriber_t * sub)234 static ngx_int_t multipart_enqueue(subscriber_t *sub) {
235   ngx_int_t           rc;
236   full_subscriber_t  *fsub = (full_subscriber_t *)sub;
237   DBG("%p output status to subscriber", sub);
238   rc = longpoll_enqueue(sub);
239   fsub->data.finalize_request = 0;
240   multipart_ensure_headers_sent(fsub);
241   sub->enqueued = 1;
242   return rc;
243 }
244 
// Function table for multipart subscribers. multipart_fn stays NULL until the
// first http_multipart_subscriber_create() call, which points it at
// multipart_fn_data, copies the created subscriber's table into it and then
// overrides enqueue, respond_message and respond_status.
static       subscriber_fn_t  multipart_fn_data;
static       subscriber_fn_t *multipart_fn = NULL;

// Subscriber name passed to nchan_subscriber_common_setup().
static       ngx_str_t   sub_name = ngx_string("http-multipart");
249 
250 
http_multipart_subscriber_create(ngx_http_request_t * r,nchan_msg_id_t * msg_id)251 subscriber_t *http_multipart_subscriber_create(ngx_http_request_t *r, nchan_msg_id_t *msg_id) {
252   subscriber_t         *sub = longpoll_subscriber_create(r, msg_id);
253   full_subscriber_t    *fsub = (full_subscriber_t *)sub;
254   multipart_privdata_t *multipart_data;
255   nchan_request_ctx_t  *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
256 
257   if(multipart_fn == NULL) {
258     multipart_fn = &multipart_fn_data;
259     *multipart_fn = *sub->fn;
260     multipart_fn->enqueue = multipart_enqueue;
261     multipart_fn->respond_message = multipart_respond_message;
262     multipart_fn->respond_status = multipart_respond_status;
263   }
264 
265   fsub->data.shook_hands = 0;
266 
267   fsub->privdata = ngx_palloc(sub->request->pool, sizeof(multipart_privdata_t));
268   multipart_data = (multipart_privdata_t *)fsub->privdata;
269   multipart_data->boundary_end = ngx_snprintf(multipart_data->boundary, 50, "\r\n--%V", nchan_request_multipart_boundary(fsub->sub.request, ctx));
270 
271   //header bufs -- unique per response
272   ctx->output_str_queue = ngx_palloc(r->pool, sizeof(*ctx->output_str_queue));
273   nchan_reuse_queue_init(ctx->output_str_queue, offsetof(headerbuf_t, prev), offsetof(headerbuf_t, next), headerbuf_alloc, NULL, sub->request->pool);
274 
275   ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
276   nchan_bufchain_pool_init(ctx->bcp, r->pool);
277 
278   nchan_subscriber_common_setup(sub, HTTP_MULTIPART, &sub_name, multipart_fn, 1, 0);
279   return sub;
280 }
281 
nchan_detect_multipart_subscriber_request(ngx_http_request_t * r)282 ngx_int_t nchan_detect_multipart_subscriber_request(ngx_http_request_t *r) {
283   ngx_str_t       *accept_header = nchan_get_accept_header_value(r);
284 
285   if(accept_header && ngx_strnstr(accept_header->data, "multipart/mixed", accept_header->len)) {
286     return 1;
287   }
288 
289   return 0;
290 }
291