1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 #include <util/nchan_bufchainpool.h>
4 #include "longpoll.h"
5 #include "longpoll-private.h"
6
7 //#define DEBUG_LEVEL NGX_LOG_WARN
8 #define DEBUG_LEVEL NGX_LOG_DEBUG
9
10 #define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:MULTIPART:" fmt, ##arg)
11 #define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:MULTIPART:" fmt, ##arg)
12 #include <assert.h>
13
14 typedef struct {
15 u_char boundary[50];
16 u_char *boundary_end;
17 } multipart_privdata_t;
18
19 typedef struct {
20 u_char charbuf[58 + 10*NCHAN_FIXED_MULTITAG_MAX];
21 void *prev;
22 void *next;
23 } headerbuf_t;
24
fsub_bcp(full_subscriber_t * fsub)25 static nchan_bufchain_pool_t *fsub_bcp(full_subscriber_t *fsub) {
26 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
27 return ctx->bcp;
28 }
29
multipart_ensure_headers_sent(full_subscriber_t * fsub)30 static void multipart_ensure_headers_sent(full_subscriber_t *fsub) {
31 nchan_buf_and_chain_t *bc;
32
33 ngx_http_request_t *r = fsub->sub.request;
34 ngx_http_core_loc_conf_t *clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
35 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
36 multipart_privdata_t *mpd = (multipart_privdata_t *)fsub->privdata;
37
38 if(!fsub->data.shook_hands) {
39 clcf->chunked_transfer_encoding = 0;
40 nchan_request_set_content_type_multipart_boundary_header(r, ctx);
41
42 nchan_cleverly_output_headers_only_for_later_response(r);
43
44 //set preamble in the request ctx. it would be nicer to store in in the subscriber data,
45 //but that would mean not reusing longpoll's fsub directly
46
47 r->header_only = 0;
48 r->chunked = 0;
49
50 if((bc = nchan_bufchain_pool_reserve(ctx->bcp, 1)) == NULL) {
51 ERR("can't reserve bufchain for multipart headers");
52 nchan_respond_status(fsub->sub.request, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 1);
53 return;
54 }
55
56 ngx_memzero(&bc->buf, sizeof(ngx_buf_t));
57 bc->buf.start = mpd->boundary + 2;
58 bc->buf.pos = bc->buf.start;
59 bc->buf.end = mpd->boundary_end;
60 bc->buf.last = bc->buf.end;
61 bc->buf.memory = 1;
62 bc->buf.last_buf = 0;
63 bc->buf.last_in_chain = 1;
64 bc->buf.flush = 1;
65
66 nchan_output_filter(r, &bc->chain);
67
68 fsub->data.shook_hands = 1;
69 }
70 }
headerbuf_alloc(void * pd)71 static void *headerbuf_alloc(void *pd) {
72 return ngx_palloc((ngx_pool_t *)pd, sizeof(headerbuf_t));
73 }
74
multipart_respond_message(subscriber_t * sub,nchan_msg_t * msg)75 static ngx_int_t multipart_respond_message(subscriber_t *sub, nchan_msg_t *msg) {
76
77 full_subscriber_t *fsub = (full_subscriber_t *)sub;
78 ngx_buf_t *buf, *msg_buf = &msg->buf, *msgid_buf;
79 ngx_int_t rc;
80 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(fsub->sub.request, ngx_nchan_module);
81 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
82 ngx_int_t n;
83 nchan_buf_and_chain_t *bc;
84 ngx_chain_t *chain;
85 ngx_file_t *file_copy;
86 multipart_privdata_t *mpd = (multipart_privdata_t *)fsub->privdata;
87
88 headerbuf_t *headerbuf = nchan_reuse_queue_push(ctx->output_str_queue);
89 u_char *cur = headerbuf->charbuf;
90
91 if(fsub->data.timeout_ev.timer_set) {
92 ngx_del_timer(&fsub->data.timeout_ev);
93 ngx_add_timer(&fsub->data.timeout_ev, sub->cf->subscriber_timeout * 1000);
94 }
95
96 //generate the headers
97 if(!cf->msg_in_etag_only) {
98 //msgtime
99 cur = ngx_cpymem(cur, "\r\nLast-Modified: ", sizeof("\r\nLast-Modified: ") - 1);
100 cur = ngx_http_time(cur, msg->id.time);
101 *cur++ = CR; *cur++ = LF;
102 //msgtag
103 cur = ngx_cpymem(cur, "Etag: ", sizeof("Etag: ") - 1);
104 cur += msgtag_to_strptr(&msg->id, (char *)cur);
105 *cur++ = CR; *cur++ = LF;
106 }
107 else {
108 ngx_str_t *tmp_etag = msgid_to_str(&msg->id);
109 cur = ngx_snprintf(cur, 58 + 10*NCHAN_FIXED_MULTITAG_MAX, "\r\nEtag: %V\r\n", tmp_etag);
110 }
111
112 n=4;
113 if(!msg->content_type) {
114 //don't need content_type buf'n'chain
115 n--;
116 }
117 if(ngx_buf_size(msg_buf) == 0) {
118 //don't need msgbuf
119 n --;
120 }
121 if((bc = nchan_bufchain_pool_reserve(ctx->bcp, n)) == NULL) {
122 ERR("can't allocate buf-and-chains for multipart/mixed client output");
123 return NGX_ERROR;
124 }
125
126 chain = &bc->chain;
127 msgid_buf = chain->buf;
128
129 //message id
130 ngx_memzero(chain->buf, sizeof(ngx_buf_t));
131 chain->buf->memory = 1;
132 chain->buf->start = headerbuf->charbuf;
133 chain->buf->pos = headerbuf->charbuf;
134
135 //content_type maybe
136 if(msg->content_type) {
137 chain = chain->next;
138 buf = chain->buf;
139
140 msgid_buf->last = cur;
141 msgid_buf->end = cur;
142
143 ngx_memzero(buf, sizeof(ngx_buf_t));
144 buf->memory = 1;
145 buf->start = cur;
146 buf->pos = cur;
147 buf->last = ngx_snprintf(cur, 255, "Content-Type: %V\r\n\r\n", msg->content_type);
148 buf->end = buf->last;
149 }
150 else {
151 *cur++ = CR; *cur++ = LF;
152 msgid_buf->last = cur;
153 msgid_buf->end = cur;
154 }
155
156 //msgbuf
157 if(ngx_buf_size(msg_buf) > 0) {
158 chain = chain->next;
159 buf = chain->buf;
160 ngx_memcpy(buf, msg_buf, sizeof(*msg_buf));
161 if(msg_buf->file) {
162 file_copy = nchan_bufchain_pool_reserve_file(ctx->bcp);
163 nchan_msg_buf_open_fd_if_needed(buf, file_copy, NULL);
164 }
165 buf->last_buf = 0;
166 buf->last_in_chain = 0;
167 buf->flush = 0;
168 }
169
170 chain = chain->next;
171 buf = chain->buf;
172 ngx_memzero(buf, sizeof(ngx_buf_t));
173 buf->start = &mpd->boundary[0];
174 buf->pos = buf->start;
175 buf->end = mpd->boundary_end;
176 buf->last = buf->end;
177 buf->memory = 1;
178 buf->last_buf = 0;
179 buf->last_in_chain = 1;
180 buf->flush = 1;
181
182 ctx->prev_msg_id = fsub->sub.last_msgid;
183 update_subscriber_last_msg_id(sub, msg);
184 ctx->msg_id = fsub->sub.last_msgid;
185
186 multipart_ensure_headers_sent(fsub);
187
188 DBG("%p output msg to subscriber", sub);
189
190 rc = nchan_output_msg_filter(fsub->sub.request, msg, &bc->chain);
191
192 return rc;
193 }
194
multipart_respond_status(subscriber_t * sub,ngx_int_t status_code,const ngx_str_t * status_line,ngx_chain_t * status_body)195 static ngx_int_t multipart_respond_status(subscriber_t *sub, ngx_int_t status_code, const ngx_str_t *status_line, ngx_chain_t *status_body){
196 nchan_buf_and_chain_t *bc;
197 static u_char *end_boundary=(u_char *)"--\r\n";
198 full_subscriber_t *fsub = (full_subscriber_t *)sub;
199 //nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
200
201 if(status_code == NGX_HTTP_NO_CONTENT || (status_code == NGX_HTTP_NOT_MODIFIED && !status_line)) {
202 //ignore
203 return NGX_OK;
204 }
205
206 if(fsub->data.shook_hands == 0 && status_code >= 400 && status_code <600) {
207 return subscriber_respond_unqueued_status(fsub, status_code, status_line, status_body);
208 }
209
210 multipart_ensure_headers_sent(fsub);
211
212 if((bc = nchan_bufchain_pool_reserve(fsub_bcp(fsub), 1)) == NULL) {
213 nchan_respond_status(sub->request, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL, NULL, 1);
214 return NGX_ERROR;
215 }
216
217 ngx_memzero(&bc->buf, sizeof(ngx_buf_t));
218 bc->buf.memory = 1;
219 bc->buf.last_buf = 1;
220 bc->buf.last_in_chain = 1;
221 bc->buf.flush = 1;
222 bc->buf.start = end_boundary;
223 bc->buf.pos = end_boundary;
224 bc->buf.end = end_boundary + 4;
225 bc->buf.last = bc->buf.end;
226
227 nchan_output_filter(fsub->sub.request, &bc->chain);
228
229 subscriber_maybe_dequeue_after_status_response(fsub, status_code);
230
231 return NGX_OK;
232 }
233
multipart_enqueue(subscriber_t * sub)234 static ngx_int_t multipart_enqueue(subscriber_t *sub) {
235 ngx_int_t rc;
236 full_subscriber_t *fsub = (full_subscriber_t *)sub;
237 DBG("%p output status to subscriber", sub);
238 rc = longpoll_enqueue(sub);
239 fsub->data.finalize_request = 0;
240 multipart_ensure_headers_sent(fsub);
241 sub->enqueued = 1;
242 return rc;
243 }
244
245 static subscriber_fn_t multipart_fn_data;
246 static subscriber_fn_t *multipart_fn = NULL;
247
248 static ngx_str_t sub_name = ngx_string("http-multipart");
249
250
http_multipart_subscriber_create(ngx_http_request_t * r,nchan_msg_id_t * msg_id)251 subscriber_t *http_multipart_subscriber_create(ngx_http_request_t *r, nchan_msg_id_t *msg_id) {
252 subscriber_t *sub = longpoll_subscriber_create(r, msg_id);
253 full_subscriber_t *fsub = (full_subscriber_t *)sub;
254 multipart_privdata_t *multipart_data;
255 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(fsub->sub.request, ngx_nchan_module);
256
257 if(multipart_fn == NULL) {
258 multipart_fn = &multipart_fn_data;
259 *multipart_fn = *sub->fn;
260 multipart_fn->enqueue = multipart_enqueue;
261 multipart_fn->respond_message = multipart_respond_message;
262 multipart_fn->respond_status = multipart_respond_status;
263 }
264
265 fsub->data.shook_hands = 0;
266
267 fsub->privdata = ngx_palloc(sub->request->pool, sizeof(multipart_privdata_t));
268 multipart_data = (multipart_privdata_t *)fsub->privdata;
269 multipart_data->boundary_end = ngx_snprintf(multipart_data->boundary, 50, "\r\n--%V", nchan_request_multipart_boundary(fsub->sub.request, ctx));
270
271 //header bufs -- unique per response
272 ctx->output_str_queue = ngx_palloc(r->pool, sizeof(*ctx->output_str_queue));
273 nchan_reuse_queue_init(ctx->output_str_queue, offsetof(headerbuf_t, prev), offsetof(headerbuf_t, next), headerbuf_alloc, NULL, sub->request->pool);
274
275 ctx->bcp = ngx_palloc(r->pool, sizeof(nchan_bufchain_pool_t));
276 nchan_bufchain_pool_init(ctx->bcp, r->pool);
277
278 nchan_subscriber_common_setup(sub, HTTP_MULTIPART, &sub_name, multipart_fn, 1, 0);
279 return sub;
280 }
281
nchan_detect_multipart_subscriber_request(ngx_http_request_t * r)282 ngx_int_t nchan_detect_multipart_subscriber_request(ngx_http_request_t *r) {
283 ngx_str_t *accept_header = nchan_get_accept_header_value(r);
284
285 if(accept_header && ngx_strnstr(accept_header->data, "multipart/mixed", accept_header->len)) {
286 return 1;
287 }
288
289 return 0;
290 }
291