1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_h1proto.h>
10 #include <nxt_websocket.h>
11 #include <nxt_websocket_header.h>
12
13 typedef struct {
14 uint16_t code;
15 uint8_t args;
16 nxt_str_t desc;
17 } nxt_ws_error_t;
18
19 static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
20 static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
21 void *data);
22 static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
23 nxt_h1proto_t *h1p);
24 static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
25 nxt_h1proto_t *h1p);
26 static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
27 nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
28 static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
29 static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
30 static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
31 static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
32 void *data);
33 static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
34 const nxt_ws_error_t *err, ...);
35 static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
36 static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
37
38 static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state;
39 static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state;
40
41 static const nxt_ws_error_t nxt_ws_err_out_of_memory = {
42 NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
43 0, nxt_string("Out of memory") };
44 static const nxt_ws_error_t nxt_ws_err_too_big = {
45 NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
46 1, nxt_string("Message too big: %uL bytes") };
47 static const nxt_ws_error_t nxt_ws_err_invalid_close_code = {
48 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
49 1, nxt_string("Close code %ud is not valid") };
50 static const nxt_ws_error_t nxt_ws_err_going_away = {
51 NXT_WEBSOCKET_CR_GOING_AWAY,
52 0, nxt_string("Remote peer is going away") };
53 static const nxt_ws_error_t nxt_ws_err_not_masked = {
54 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
55 0, nxt_string("Not masked client frame") };
56 static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = {
57 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
58 0, nxt_string("Fragmented control frame") };
59 static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = {
60 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
61 1, nxt_string("Control frame too big: %uL bytes") };
62 static const nxt_ws_error_t nxt_ws_err_invalid_close_len = {
63 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
64 0, nxt_string("Close frame payload length cannot be 1") };
65 static const nxt_ws_error_t nxt_ws_err_invalid_opcode = {
66 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
67 1, nxt_string("Unrecognized opcode %ud") };
68 static const nxt_ws_error_t nxt_ws_err_cont_expected = {
69 NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
70 1, nxt_string("Continuation expected, but %ud opcode received") };
71
72 void
nxt_h1p_websocket_first_frame_start(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * ws_frame)73 nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
74 nxt_buf_t *ws_frame)
75 {
76 nxt_conn_t *c;
77 nxt_timer_t *timer;
78 nxt_h1proto_t *h1p;
79 nxt_websocket_conf_t *websocket_conf;
80
81 nxt_debug(task, "h1p ws first frame start");
82
83 h1p = r->proto.h1;
84 c = h1p->conn;
85
86 if (!c->tcp_nodelay) {
87 nxt_conn_tcp_nodelay_on(task, c);
88 }
89
90 websocket_conf = &r->conf->socket_conf->websocket_conf;
91
92 if (nxt_slow_path(websocket_conf->keepalive_interval != 0)) {
93 h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
94 sizeof(nxt_h1p_websocket_timer_t));
95 if (nxt_slow_path(h1p->websocket_timer == NULL)) {
96 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
97 return;
98 }
99
100 h1p->websocket_timer->keepalive_interval =
101 websocket_conf->keepalive_interval;
102 h1p->websocket_timer->h1p = h1p;
103
104 timer = &h1p->websocket_timer->timer;
105 timer->task = &c->task;
106 timer->work_queue = &task->thread->engine->fast_work_queue;
107 timer->log = &c->log;
108 timer->bias = NXT_TIMER_DEFAULT_BIAS;
109 timer->handler = nxt_h1p_conn_ws_keepalive;
110 }
111
112 nxt_h1p_websocket_frame_start(task, r, ws_frame);
113 }
114
115
116 void
nxt_h1p_websocket_frame_start(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * ws_frame)117 nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
118 nxt_buf_t *ws_frame)
119 {
120 size_t size;
121 nxt_buf_t *in;
122 nxt_conn_t *c;
123 nxt_h1proto_t *h1p;
124
125 nxt_debug(task, "h1p ws frame start");
126
127 h1p = r->proto.h1;
128
129 if (nxt_slow_path(h1p->websocket_closed)) {
130 return;
131 }
132
133 c = h1p->conn;
134 c->read = ws_frame;
135
136 nxt_h1p_complete_buffers(task, h1p, 0);
137
138 in = c->read;
139 c->read_state = &nxt_h1p_read_ws_frame_header_state;
140
141 if (in == NULL) {
142 nxt_conn_read(task->thread->engine, c);
143 nxt_h1p_conn_ws_keepalive_enable(task, h1p);
144
145 } else {
146 size = nxt_buf_mem_used_size(&in->mem);
147
148 nxt_debug(task, "h1p read client ws frame");
149
150 nxt_memmove(in->mem.start, in->mem.pos, size);
151
152 in->mem.pos = in->mem.start;
153 in->mem.free = in->mem.start + size;
154
155 nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
156 }
157 }
158
159
160 static void
nxt_h1p_conn_ws_keepalive(nxt_task_t * task,void * obj,void * data)161 nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
162 {
163 nxt_buf_t *out;
164 nxt_timer_t *timer;
165 nxt_h1proto_t *h1p;
166 nxt_http_request_t *r;
167 nxt_websocket_header_t *wsh;
168 nxt_h1p_websocket_timer_t *ws_timer;
169
170 nxt_debug(task, "h1p conn ws keepalive");
171
172 timer = obj;
173 ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
174 h1p = ws_timer->h1p;
175
176 r = h1p->request;
177 if (nxt_slow_path(r == NULL)) {
178 return;
179 }
180
181 out = nxt_http_buf_mem(task, r, 2);
182 if (nxt_slow_path(out == NULL)) {
183 nxt_http_request_error_handler(task, r, r->proto.any);
184 return;
185 }
186
187 out->mem.start[0] = 0;
188 out->mem.start[1] = 0;
189
190 wsh = (nxt_websocket_header_t *) out->mem.start;
191 out->mem.free = nxt_websocket_frame_init(wsh, 0);
192
193 wsh->fin = 1;
194 wsh->opcode = NXT_WEBSOCKET_OP_PING;
195
196 nxt_http_request_send(task, r, out);
197 }
198
199
200 static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state
201 nxt_aligned(64) =
202 {
203 .ready_handler = nxt_h1p_conn_ws_frame_header_read,
204 .close_handler = nxt_h1p_conn_ws_error,
205 .error_handler = nxt_h1p_conn_ws_error,
206
207 .io_read_handler = nxt_h1p_ws_io_read_handler,
208
209 .timer_handler = nxt_h1p_conn_ws_timeout,
210 .timer_value = nxt_h1p_conn_request_timer_value,
211 .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
212 .timer_autoreset = 1,
213 };
214
215
216 static void
nxt_h1p_conn_ws_frame_header_read(nxt_task_t * task,void * obj,void * data)217 nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
218 {
219 size_t size, hsize, frame_size, max_frame_size;
220 uint64_t payload_len;
221 nxt_conn_t *c;
222 nxt_h1proto_t *h1p;
223 nxt_http_request_t *r;
224 nxt_event_engine_t *engine;
225 nxt_websocket_header_t *wsh;
226
227 c = obj;
228 h1p = data;
229
230 nxt_h1p_conn_ws_keepalive_disable(task, h1p);
231
232 size = nxt_buf_mem_used_size(&c->read->mem);
233
234 engine = task->thread->engine;
235
236 if (size < 2) {
237 nxt_debug(task, "h1p conn ws frame header read %z", size);
238
239 nxt_conn_read(engine, c);
240 nxt_h1p_conn_ws_keepalive_enable(task, h1p);
241
242 return;
243 }
244
245 wsh = (nxt_websocket_header_t *) c->read->mem.pos;
246
247 hsize = nxt_websocket_frame_header_size(wsh);
248
249 if (size < hsize) {
250 nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
251
252 nxt_conn_read(engine, c);
253 nxt_h1p_conn_ws_keepalive_enable(task, h1p);
254
255 return;
256 }
257
258 r = h1p->request;
259 if (nxt_slow_path(r == NULL)) {
260 return;
261 }
262
263 r->ws_frame = c->read;
264
265 if (nxt_slow_path(wsh->mask == 0)) {
266 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
267 return;
268 }
269
270 if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
271 if (nxt_slow_path(wsh->fin == 0)) {
272 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
273 return;
274 }
275
276 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
277 && wsh->opcode != NXT_WEBSOCKET_OP_PONG
278 && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
279 {
280 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
281 wsh->opcode);
282 return;
283 }
284
285 if (nxt_slow_path(wsh->payload_len > 125)) {
286 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
287 nxt_websocket_frame_payload_len(wsh));
288 return;
289 }
290
291 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
292 && wsh->payload_len == 1))
293 {
294 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
295 return;
296 }
297
298 } else {
299 if (h1p->websocket_cont_expected) {
300 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
301 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
302 wsh->opcode);
303 return;
304 }
305
306 } else {
307 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
308 && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
309 {
310 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
311 wsh->opcode);
312 return;
313 }
314 }
315
316 h1p->websocket_cont_expected = !wsh->fin;
317 }
318
319 max_frame_size = r->conf->socket_conf->websocket_conf.max_frame_size;
320
321 payload_len = nxt_websocket_frame_payload_len(wsh);
322
323 if (nxt_slow_path(hsize > max_frame_size
324 || payload_len > (max_frame_size - hsize)))
325 {
326 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
327 return;
328 }
329
330 c->read_state = &nxt_h1p_read_ws_frame_payload_state;
331
332 frame_size = payload_len + hsize;
333
334 nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
335
336 if (frame_size <= size) {
337 nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
338
339 return;
340 }
341
342 if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
343 c->read->mem.end = c->read->mem.start + frame_size;
344
345 } else {
346 nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
347
348 c->read->next = b;
349 c->read = b;
350 }
351
352 nxt_conn_read(engine, c);
353 nxt_h1p_conn_ws_keepalive_enable(task, h1p);
354 }
355
356
357 static void
nxt_h1p_conn_ws_keepalive_disable(nxt_task_t * task,nxt_h1proto_t * h1p)358 nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
359 {
360 nxt_timer_t *timer;
361
362 if (h1p->websocket_timer == NULL) {
363 return;
364 }
365
366 timer = &h1p->websocket_timer->timer;
367
368 if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
369 nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
370 return;
371 }
372
373 nxt_timer_disable(task->thread->engine, timer);
374 }
375
376
377 static void
nxt_h1p_conn_ws_keepalive_enable(nxt_task_t * task,nxt_h1proto_t * h1p)378 nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
379 {
380 nxt_timer_t *timer;
381
382 if (h1p->websocket_timer == NULL) {
383 return;
384 }
385
386 timer = &h1p->websocket_timer->timer;
387
388 if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
389 nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
390 return;
391 }
392
393 nxt_timer_add(task->thread->engine, timer,
394 h1p->websocket_timer->keepalive_interval);
395 }
396
397
398 static void
nxt_h1p_conn_ws_frame_process(nxt_task_t * task,nxt_conn_t * c,nxt_h1proto_t * h1p,nxt_websocket_header_t * wsh)399 nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
400 nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
401 {
402 size_t hsize;
403 uint8_t *p, *mask;
404 uint16_t code;
405 nxt_http_request_t *r;
406
407 r = h1p->request;
408
409 c->read = NULL;
410
411 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
412 nxt_h1p_conn_ws_pong(task, r, NULL);
413 return;
414 }
415
416 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
417 if (wsh->payload_len >= 2) {
418 hsize = nxt_websocket_frame_header_size(wsh);
419 mask = nxt_pointer_to(wsh, hsize - 4);
420 p = nxt_pointer_to(wsh, hsize);
421
422 code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
423
424 if (nxt_slow_path(code < 1000 || code >= 5000
425 || (code > 1003 && code < 1007)
426 || (code > 1014 && code < 3000)))
427 {
428 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
429 code);
430 return;
431 }
432 }
433
434 h1p->websocket_closed = 1;
435 }
436
437 r->state->ready_handler(task, r, NULL);
438 }
439
440
441 static void
nxt_h1p_conn_ws_error(nxt_task_t * task,void * obj,void * data)442 nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
443 {
444 nxt_h1proto_t *h1p;
445 nxt_http_request_t *r;
446
447 h1p = data;
448
449 nxt_debug(task, "h1p conn ws error");
450
451 r = h1p->request;
452
453 h1p->keepalive = 0;
454
455 if (nxt_fast_path(r != NULL)) {
456 r->state->error_handler(task, r, h1p);
457 }
458 }
459
460
461 static ssize_t
nxt_h1p_ws_io_read_handler(nxt_task_t * task,nxt_conn_t * c)462 nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
463 {
464 size_t size;
465 ssize_t n;
466 nxt_buf_t *b;
467
468 b = c->read;
469
470 if (b == NULL) {
471 /* Enough for control frame. */
472 size = 10 + 125;
473
474 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
475 if (nxt_slow_path(b == NULL)) {
476 c->socket.error = NXT_ENOMEM;
477 return NXT_ERROR;
478 }
479 }
480
481 n = c->io->recvbuf(c, b);
482
483 if (n > 0) {
484 c->read = b;
485
486 } else {
487 c->read = NULL;
488 nxt_mp_free(c->mem_pool, b);
489 }
490
491 return n;
492 }
493
494
495 static void
nxt_h1p_conn_ws_timeout(nxt_task_t * task,void * obj,void * data)496 nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
497 {
498 nxt_conn_t *c;
499 nxt_timer_t *timer;
500 nxt_h1proto_t *h1p;
501 nxt_http_request_t *r;
502
503 timer = obj;
504
505 nxt_debug(task, "h1p conn ws timeout");
506
507 c = nxt_read_timer_conn(timer);
508 c->block_read = 1;
509 /*
510 * Disable SO_LINGER off during socket closing
511 * to send "408 Request Timeout" error response.
512 */
513 c->socket.timedout = 0;
514
515 h1p = c->socket.data;
516 h1p->keepalive = 0;
517
518 r = h1p->request;
519 if (nxt_slow_path(r == NULL)) {
520 return;
521 }
522
523 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
524 }
525
526
527 static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state
528 nxt_aligned(64) =
529 {
530 .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
531 .close_handler = nxt_h1p_conn_ws_error,
532 .error_handler = nxt_h1p_conn_ws_error,
533
534 .timer_handler = nxt_h1p_conn_ws_timeout,
535 .timer_value = nxt_h1p_conn_request_timer_value,
536 .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
537 .timer_autoreset = 1,
538 };
539
540
541 static void
nxt_h1p_conn_ws_frame_payload_read(nxt_task_t * task,void * obj,void * data)542 nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
543 {
544 nxt_conn_t *c;
545 nxt_h1proto_t *h1p;
546 nxt_http_request_t *r;
547 nxt_event_engine_t *engine;
548 nxt_websocket_header_t *wsh;
549
550 c = obj;
551 h1p = data;
552
553 nxt_h1p_conn_ws_keepalive_disable(task, h1p);
554
555 nxt_debug(task, "h1p conn ws frame read");
556
557 if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
558 r = h1p->request;
559 if (nxt_slow_path(r == NULL)) {
560 return;
561 }
562
563 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
564
565 nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
566
567 return;
568 }
569
570 engine = task->thread->engine;
571
572 nxt_conn_read(engine, c);
573 nxt_h1p_conn_ws_keepalive_enable(task, h1p);
574 }
575
576
577 static void
hxt_h1p_send_ws_error(nxt_task_t * task,nxt_http_request_t * r,const nxt_ws_error_t * err,...)578 hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
579 const nxt_ws_error_t *err, ...)
580 {
581 u_char *p;
582 va_list args;
583 nxt_buf_t *out;
584 nxt_str_t desc;
585 nxt_websocket_header_t *wsh;
586 u_char buf[125];
587
588 if (nxt_slow_path(err->args)) {
589 va_start(args, err);
590 p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
591 args);
592 va_end(args);
593
594 desc.start = buf;
595 desc.length = p - buf;
596
597 } else {
598 desc = err->desc;
599 }
600
601 nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
602
603 out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
604 if (nxt_slow_path(out == NULL)) {
605 nxt_http_request_error_handler(task, r, r->proto.any);
606 return;
607 }
608
609 out->mem.start[0] = 0;
610 out->mem.start[1] = 0;
611
612 wsh = (nxt_websocket_header_t *) out->mem.start;
613 p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
614
615 wsh->fin = 1;
616 wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
617
618 *p++ = (err->code >> 8) & 0xFF;
619 *p++ = err->code & 0xFF;
620
621 out->mem.free = nxt_cpymem(p, desc.start, desc.length);
622 out->next = nxt_http_buf_last(r);
623
624 if (out->next != NULL) {
625 out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
626 }
627
628 nxt_http_request_send(task, r, out);
629 }
630
631
632 static void
nxt_h1p_conn_ws_error_sent(nxt_task_t * task,void * obj,void * data)633 nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
634 {
635 nxt_http_request_t *r;
636
637 r = data;
638
639 nxt_debug(task, "h1p conn ws error sent");
640
641 r->state->error_handler(task, r, r->proto.any);
642 }
643
644
645 static void
nxt_h1p_conn_ws_pong(nxt_task_t * task,void * obj,void * data)646 nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
647 {
648 uint8_t payload_len, i;
649 nxt_buf_t *b, *out, *next;
650 nxt_http_request_t *r;
651 nxt_websocket_header_t *wsh;
652 uint8_t mask[4];
653
654 nxt_debug(task, "h1p conn ws pong");
655
656 r = obj;
657 b = r->ws_frame;
658
659 wsh = (nxt_websocket_header_t *) b->mem.pos;
660 payload_len = wsh->payload_len;
661
662 b->mem.pos += 2;
663
664 nxt_memcpy(mask, b->mem.pos, 4);
665
666 b->mem.pos += 4;
667
668 out = nxt_http_buf_mem(task, r, 2 + payload_len);
669 if (nxt_slow_path(out == NULL)) {
670 nxt_http_request_error_handler(task, r, r->proto.any);
671 return;
672 }
673
674 out->mem.start[0] = 0;
675 out->mem.start[1] = 0;
676
677 wsh = (nxt_websocket_header_t *) out->mem.start;
678 out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
679
680 wsh->fin = 1;
681 wsh->opcode = NXT_WEBSOCKET_OP_PONG;
682
683 for (i = 0; i < payload_len; i++) {
684 while (nxt_buf_mem_used_size(&b->mem) == 0) {
685 next = b->next;
686 b->next = NULL;
687
688 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
689 b->completion_handler, task, b, b->parent);
690
691 b = next;
692 }
693
694 *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
695 }
696
697 r->ws_frame = b;
698
699 nxt_http_request_send(task, r, out);
700
701 nxt_http_request_ws_frame_start(task, r, r->ws_frame);
702 }
703