1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7 #include <python/nxt_python.h>
8
9 #if (NXT_HAVE_ASGI)
10
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_websocket.h>
15 #include <nxt_websocket_header.h>
16 #include <python/nxt_python_asgi.h>
17 #include <python/nxt_python_asgi_str.h>
18
19
20 enum {
21 NXT_WS_INIT,
22 NXT_WS_CONNECT,
23 NXT_WS_ACCEPTED,
24 NXT_WS_DISCONNECTED,
25 NXT_WS_CLOSED,
26 };
27
28
29 typedef struct {
30 nxt_queue_link_t link;
31 nxt_unit_websocket_frame_t *frame;
32 } nxt_py_asgi_penging_frame_t;
33
34
35 typedef struct {
36 PyObject_HEAD
37 nxt_unit_request_info_t *req;
38 PyObject *receive_future;
39 PyObject *receive_exc_str;
40 int state;
41 nxt_queue_t pending_frames;
42 uint64_t pending_payload_len;
43 uint64_t pending_frame_len;
44 int pending_fins;
45 } nxt_py_asgi_websocket_t;
46
47
48 static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
49 static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
50 static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
51 PyObject *dict);
52 static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
53 PyObject *dict);
54 static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
55 PyObject *dict);
56 static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
57 PyObject *msg);
58 static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
59 PyObject *exc);
60 static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
61 static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
62 nxt_unit_websocket_frame_t *frame);
63 static uint64_t nxt_py_asgi_websocket_pending_len(
64 nxt_py_asgi_websocket_t *ws);
65 static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
66 nxt_py_asgi_websocket_t *ws);
67 static PyObject *nxt_py_asgi_websocket_disconnect_msg(
68 nxt_py_asgi_websocket_t *ws);
69 static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
70
71
72 static PyMethodDef nxt_py_asgi_websocket_methods[] = {
73 { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
74 { "send", nxt_py_asgi_websocket_send, METH_O, 0 },
75 { "_done", nxt_py_asgi_websocket_done, METH_O, 0 },
76 { NULL, NULL, 0, 0 }
77 };
78
79 static PyAsyncMethods nxt_py_asgi_async_methods = {
80 .am_await = nxt_py_asgi_await,
81 };
82
83 static PyTypeObject nxt_py_asgi_websocket_type = {
84 PyVarObject_HEAD_INIT(NULL, 0)
85
86 .tp_name = "unit._asgi_websocket",
87 .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
88 .tp_dealloc = nxt_py_asgi_dealloc,
89 .tp_as_async = &nxt_py_asgi_async_methods,
90 .tp_flags = Py_TPFLAGS_DEFAULT,
91 .tp_doc = "unit ASGI WebSocket connection object",
92 .tp_iter = nxt_py_asgi_iter,
93 .tp_iternext = nxt_py_asgi_next,
94 .tp_methods = nxt_py_asgi_websocket_methods,
95 };
96
97 static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
98 static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
99
100
101 int
nxt_py_asgi_websocket_init(void)102 nxt_py_asgi_websocket_init(void)
103 {
104 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
105 nxt_unit_alert(NULL,
106 "Python failed to initialize the \"asgi_websocket\" type object");
107 return NXT_UNIT_ERROR;
108 }
109
110 return NXT_UNIT_OK;
111 }
112
113
114 PyObject *
nxt_py_asgi_websocket_create(nxt_unit_request_info_t * req)115 nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
116 {
117 nxt_py_asgi_websocket_t *ws;
118
119 ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
120
121 if (nxt_fast_path(ws != NULL)) {
122 ws->req = req;
123 ws->receive_future = NULL;
124 ws->receive_exc_str = NULL;
125 ws->state = NXT_WS_INIT;
126 nxt_queue_init(&ws->pending_frames);
127 ws->pending_payload_len = 0;
128 ws->pending_frame_len = 0;
129 ws->pending_fins = 0;
130 }
131
132 return (PyObject *) ws;
133 }
134
135
136 static PyObject *
nxt_py_asgi_websocket_receive(PyObject * self,PyObject * none)137 nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
138 {
139 PyObject *future, *msg;
140 nxt_py_asgi_ctx_data_t *ctx_data;
141 nxt_py_asgi_websocket_t *ws;
142
143 ws = (nxt_py_asgi_websocket_t *) self;
144
145 nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
146
147 /* If exception happened out of receive() call, raise it now. */
148 if (nxt_slow_path(ws->receive_exc_str != NULL)) {
149 PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
150
151 ws->receive_exc_str = NULL;
152
153 return NULL;
154 }
155
156 if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
157 nxt_unit_req_error(ws->req,
158 "receive() called for closed WebSocket");
159
160 return PyErr_Format(PyExc_RuntimeError,
161 "WebSocket already closed");
162 }
163
164 ctx_data = ws->req->ctx->data;
165
166 future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
167 if (nxt_slow_path(future == NULL)) {
168 nxt_unit_req_alert(ws->req, "Python failed to create Future object");
169 nxt_python_print_exception();
170
171 return PyErr_Format(PyExc_RuntimeError,
172 "failed to create Future object");
173 }
174
175 if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
176 ws->state = NXT_WS_CONNECT;
177
178 msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
179
180 return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
181 }
182
183 if (ws->pending_fins > 0) {
184 msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
185
186 return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
187 }
188
189 if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
190 msg = nxt_py_asgi_websocket_disconnect_msg(ws);
191
192 return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
193 }
194
195 ws->receive_future = future;
196 Py_INCREF(ws->receive_future);
197
198 return future;
199 }
200
201
202 static PyObject *
nxt_py_asgi_websocket_send(PyObject * self,PyObject * dict)203 nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
204 {
205 PyObject *type;
206 const char *type_str;
207 Py_ssize_t type_len;
208 nxt_py_asgi_websocket_t *ws;
209
210 static const nxt_str_t websocket_accept = nxt_string("websocket.accept");
211 static const nxt_str_t websocket_close = nxt_string("websocket.close");
212 static const nxt_str_t websocket_send = nxt_string("websocket.send");
213
214 ws = (nxt_py_asgi_websocket_t *) self;
215
216 type = PyDict_GetItem(dict, nxt_py_type_str);
217 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
218 nxt_unit_req_error(ws->req, "asgi_websocket_send: "
219 "'type' is not a unicode string");
220 return PyErr_Format(PyExc_TypeError,
221 "'type' is not a unicode string");
222 }
223
224 type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
225
226 nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
227 (int) type_len, type_str);
228
229 if (type_len == (Py_ssize_t) websocket_accept.length
230 && memcmp(type_str, websocket_accept.start, type_len) == 0)
231 {
232 return nxt_py_asgi_websocket_accept(ws, dict);
233 }
234
235 if (type_len == (Py_ssize_t) websocket_close.length
236 && memcmp(type_str, websocket_close.start, type_len) == 0)
237 {
238 return nxt_py_asgi_websocket_close(ws, dict);
239 }
240
241 if (type_len == (Py_ssize_t) websocket_send.length
242 && memcmp(type_str, websocket_send.start, type_len) == 0)
243 {
244 return nxt_py_asgi_websocket_send_frame(ws, dict);
245 }
246
247 nxt_unit_req_error(ws->req, "asgi_websocket_send: "
248 "unexpected 'type': '%.*s'", (int) type_len, type_str);
249 return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
250 }
251
252
253 static PyObject *
nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t * ws,PyObject * dict)254 nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
255 {
256 int rc;
257 char *subprotocol_str;
258 PyObject *res, *headers, *subprotocol;
259 Py_ssize_t subprotocol_len;
260 nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
261 nxt_py_asgi_add_field_ctx_t add_field_ctx;
262
263 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
264
265 switch(ws->state) {
266 case NXT_WS_INIT:
267 return PyErr_Format(PyExc_RuntimeError,
268 "WebSocket connect not received");
269 case NXT_WS_CONNECT:
270 break;
271
272 case NXT_WS_ACCEPTED:
273 return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
274
275 case NXT_WS_DISCONNECTED:
276 return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
277
278 case NXT_WS_CLOSED:
279 return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
280 }
281
282 if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
283 return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
284 }
285
286 if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
287 return PyErr_Format(PyExc_RuntimeError, "response already sent");
288 }
289
290 calc_size_ctx.fields_size = 0;
291 calc_size_ctx.fields_count = 0;
292
293 headers = PyDict_GetItem(dict, nxt_py_headers_str);
294 if (headers != NULL) {
295 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
296 &calc_size_ctx);
297 if (nxt_slow_path(res == NULL)) {
298 return NULL;
299 }
300 }
301
302 subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
303 if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
304 subprotocol_str = PyUnicode_DATA(subprotocol);
305 subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
306
307 calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
308 calc_size_ctx.fields_count++;
309
310 } else {
311 subprotocol_str = NULL;
312 subprotocol_len = 0;
313 }
314
315 rc = nxt_unit_response_init(ws->req, 101,
316 calc_size_ctx.fields_count,
317 calc_size_ctx.fields_size);
318 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
319 return PyErr_Format(PyExc_RuntimeError,
320 "failed to allocate response object");
321 }
322
323 add_field_ctx.req = ws->req;
324 add_field_ctx.content_length = -1;
325
326 if (headers != NULL) {
327 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
328 &add_field_ctx);
329 if (nxt_slow_path(res == NULL)) {
330 return NULL;
331 }
332 }
333
334 if (subprotocol_len > 0) {
335 rc = nxt_unit_response_add_field(ws->req,
336 (const char *) ws_protocol.start,
337 ws_protocol.length,
338 subprotocol_str, subprotocol_len);
339 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
340 return PyErr_Format(PyExc_RuntimeError,
341 "failed to add header");
342 }
343 }
344
345 rc = nxt_unit_response_send(ws->req);
346 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
347 return PyErr_Format(PyExc_RuntimeError, "failed to send response");
348 }
349
350 ws->state = NXT_WS_ACCEPTED;
351
352 Py_INCREF(ws);
353
354 return (PyObject *) ws;
355 }
356
357
358 static PyObject *
nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t * ws,PyObject * dict)359 nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
360 {
361 int rc;
362 uint16_t status_code;
363 PyObject *code;
364
365 if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
366 return PyErr_Format(PyExc_RuntimeError,
367 "WebSocket connect not received");
368 }
369
370 if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
371 return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
372 }
373
374 if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
375 return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
376 }
377
378 if (nxt_unit_response_is_websocket(ws->req)) {
379 code = PyDict_GetItem(dict, nxt_py_code_str);
380 if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
381 return PyErr_Format(PyExc_TypeError, "'code' is not integer");
382 }
383
384 status_code = (code != NULL) ? htons(PyLong_AsLong(code))
385 : htons(NXT_WEBSOCKET_CR_NORMAL);
386
387 rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
388 1, &status_code, 2);
389 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
390 return PyErr_Format(PyExc_RuntimeError,
391 "failed to send close frame");
392 }
393
394 } else {
395 rc = nxt_unit_response_init(ws->req, 403, 0, 0);
396 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
397 return PyErr_Format(PyExc_RuntimeError,
398 "failed to allocate response object");
399 }
400
401 rc = nxt_unit_response_send(ws->req);
402 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
403 return PyErr_Format(PyExc_RuntimeError,
404 "failed to send response");
405 }
406 }
407
408 ws->state = NXT_WS_CLOSED;
409
410 Py_INCREF(ws);
411
412 return (PyObject *) ws;
413 }
414
415
416 static PyObject *
nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t * ws,PyObject * dict)417 nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
418 {
419 int rc;
420 uint8_t opcode;
421 PyObject *bytes, *text;
422 const void *buf;
423 Py_ssize_t buf_size;
424
425 if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
426 return PyErr_Format(PyExc_RuntimeError,
427 "WebSocket connect not received");
428 }
429
430 if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
431 return PyErr_Format(PyExc_RuntimeError,
432 "WebSocket not accepted yet");
433 }
434
435 if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
436 return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
437 }
438
439 if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
440 return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
441 }
442
443 bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
444 if (bytes == Py_None) {
445 bytes = NULL;
446 }
447
448 if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
449 return PyErr_Format(PyExc_TypeError,
450 "'bytes' is not a byte string");
451 }
452
453 text = PyDict_GetItem(dict, nxt_py_text_str);
454 if (text == Py_None) {
455 text = NULL;
456 }
457
458 if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
459 return PyErr_Format(PyExc_TypeError,
460 "'text' is not a unicode string");
461 }
462
463 if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
464 return PyErr_Format(PyExc_ValueError,
465 "Exactly one of 'bytes' or 'text' must be non-None");
466 }
467
468 if (bytes != NULL) {
469 buf = PyBytes_AS_STRING(bytes);
470 buf_size = PyBytes_GET_SIZE(bytes);
471 opcode = NXT_WEBSOCKET_OP_BINARY;
472
473 } else {
474 buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
475 opcode = NXT_WEBSOCKET_OP_TEXT;
476 }
477
478 rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
479 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
480 return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
481 }
482
483 Py_INCREF(ws);
484 return (PyObject *) ws;
485 }
486
487
488 void
nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t * frame)489 nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
490 {
491 uint8_t opcode;
492 uint16_t status_code;
493 uint64_t rest;
494 PyObject *msg, *exc;
495 nxt_py_asgi_websocket_t *ws;
496
497 ws = frame->req->data;
498
499 nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
500
501 opcode = frame->header->opcode;
502 if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
503 && opcode != NXT_WEBSOCKET_OP_TEXT
504 && opcode != NXT_WEBSOCKET_OP_BINARY
505 && opcode != NXT_WEBSOCKET_OP_CLOSE))
506 {
507 nxt_unit_websocket_done(frame);
508
509 nxt_unit_req_debug(ws->req,
510 "asgi_websocket_handler: ignore frame with opcode %d",
511 opcode);
512
513 return;
514 }
515
516 if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
517 nxt_unit_websocket_done(frame);
518
519 goto bad_state;
520 }
521
522 rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
523
524 if (nxt_slow_path(frame->payload_len > rest)) {
525 nxt_unit_websocket_done(frame);
526
527 goto too_big;
528 }
529
530 rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
531
532 if (nxt_slow_path(frame->payload_len > rest)) {
533 nxt_unit_websocket_done(frame);
534
535 goto too_big;
536 }
537
538 if (ws->receive_future == NULL || frame->header->fin == 0) {
539 nxt_py_asgi_websocket_suspend_frame(frame);
540
541 return;
542 }
543
544 if (!nxt_queue_is_empty(&ws->pending_frames)) {
545 if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
546 || opcode == NXT_WEBSOCKET_OP_BINARY))
547 {
548 nxt_unit_req_alert(ws->req,
549 "Invalid state: pending frames with active receiver. "
550 "CONT frame expected. (%d)", opcode);
551
552 PyErr_SetString(PyExc_AssertionError,
553 "Invalid state: pending frames with active receiver. "
554 "CONT frame expected.");
555
556 nxt_unit_websocket_done(frame);
557
558 return;
559 }
560 }
561
562 msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
563 if (nxt_slow_path(msg == NULL)) {
564 exc = PyErr_Occurred();
565 Py_INCREF(exc);
566
567 goto raise;
568 }
569
570 nxt_py_asgi_websocket_receive_done(ws, msg);
571
572 return;
573
574 bad_state:
575
576 if (ws->receive_future == NULL) {
577 ws->receive_exc_str = nxt_py_bad_state_str;
578
579 return;
580 }
581
582 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
583 nxt_py_bad_state_str,
584 NULL);
585 if (nxt_slow_path(exc == NULL)) {
586 nxt_unit_req_alert(ws->req, "RuntimeError create failed");
587 nxt_python_print_exception();
588
589 exc = Py_None;
590 Py_INCREF(exc);
591 }
592
593 goto raise;
594
595 too_big:
596
597 status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
598
599 (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
600 1, &status_code, 2);
601
602 ws->state = NXT_WS_CLOSED;
603
604 if (ws->receive_future == NULL) {
605 ws->receive_exc_str = nxt_py_message_too_big_str;
606
607 return;
608 }
609
610 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
611 nxt_py_message_too_big_str,
612 NULL);
613 if (nxt_slow_path(exc == NULL)) {
614 nxt_unit_req_alert(ws->req, "RuntimeError create failed");
615 nxt_python_print_exception();
616
617 exc = Py_None;
618 Py_INCREF(exc);
619 }
620
621 raise:
622
623 nxt_py_asgi_websocket_receive_fail(ws, exc);
624 }
625
626
627 static void
nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t * ws,PyObject * msg)628 nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
629 {
630 PyObject *future, *res;
631
632 future = ws->receive_future;
633 ws->receive_future = NULL;
634
635 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
636 if (nxt_slow_path(res == NULL)) {
637 nxt_unit_req_alert(ws->req, "'set_result' call failed");
638 nxt_python_print_exception();
639 }
640
641 Py_XDECREF(res);
642 Py_DECREF(future);
643
644 Py_DECREF(msg);
645 }
646
647
648 static void
nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t * ws,PyObject * exc)649 nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
650 {
651 PyObject *future, *res;
652
653 future = ws->receive_future;
654 ws->receive_future = NULL;
655
656 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
657 NULL);
658 if (nxt_slow_path(res == NULL)) {
659 nxt_unit_req_alert(ws->req, "'set_exception' call failed");
660 nxt_python_print_exception();
661 }
662
663 Py_XDECREF(res);
664 Py_DECREF(future);
665
666 Py_DECREF(exc);
667 }
668
669
670 static void
nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t * frame)671 nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
672 {
673 int rc;
674 nxt_py_asgi_websocket_t *ws;
675 nxt_py_asgi_penging_frame_t *p;
676
677 nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
678 "%d, %"PRIu64", %d",
679 frame->header->opcode, frame->payload_len,
680 frame->header->fin);
681
682 ws = frame->req->data;
683
684 rc = nxt_unit_websocket_retain(frame);
685 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
686 nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
687
688 nxt_unit_websocket_done(frame);
689
690 PyErr_SetString(PyExc_RuntimeError,
691 "Failed to retain frame for suspension.");
692
693 return;
694 }
695
696 p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
697 if (nxt_slow_path(p == NULL)) {
698 nxt_unit_req_alert(ws->req,
699 "Failed to allocate buffer to suspend frame.");
700
701 nxt_unit_websocket_done(frame);
702
703 PyErr_SetString(PyExc_RuntimeError,
704 "Failed to allocate buffer to suspend frame.");
705
706 return;
707 }
708
709 p->frame = frame;
710 nxt_queue_insert_tail(&ws->pending_frames, &p->link);
711
712 ws->pending_payload_len += frame->payload_len;
713 ws->pending_fins += frame->header->fin;
714
715 if (frame->header->fin) {
716 ws->pending_frame_len = 0;
717
718 } else {
719 if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
720 ws->pending_frame_len += frame->payload_len;
721
722 } else {
723 ws->pending_frame_len = frame->payload_len;
724 }
725 }
726 }
727
728
729 static PyObject *
nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t * ws,nxt_unit_websocket_frame_t * frame)730 nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
731 nxt_unit_websocket_frame_t *frame)
732 {
733 int fin;
734 char *buf;
735 uint8_t code_buf[2], opcode;
736 uint16_t code;
737 PyObject *msg, *data, *type, *data_key;
738 uint64_t payload_len;
739 nxt_unit_websocket_frame_t *fin_frame;
740
741 nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
742
743 fin_frame = NULL;
744
745 if (nxt_queue_is_empty(&ws->pending_frames)
746 || (frame != NULL
747 && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
748 {
749 payload_len = frame->payload_len;
750
751 } else {
752 if (frame != NULL) {
753 payload_len = ws->pending_payload_len + frame->payload_len;
754 fin_frame = frame;
755
756 } else {
757 payload_len = nxt_py_asgi_websocket_pending_len(ws);
758 }
759
760 frame = nxt_py_asgi_websocket_pop_frame(ws);
761 }
762
763 opcode = frame->header->opcode;
764
765 if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
766 nxt_unit_req_alert(ws->req,
767 "Invalid state: attempt to process CONT frame.");
768
769 nxt_unit_websocket_done(frame);
770
771 return PyErr_Format(PyExc_AssertionError,
772 "Invalid state: attempt to process CONT frame.");
773 }
774
775 type = nxt_py_websocket_receive_str;
776
777 switch (opcode) {
778 case NXT_WEBSOCKET_OP_TEXT:
779 buf = nxt_unit_malloc(frame->req->ctx, payload_len);
780 if (nxt_slow_path(buf == NULL)) {
781 nxt_unit_req_alert(ws->req,
782 "Failed to allocate buffer for payload (%d).",
783 (int) payload_len);
784
785 nxt_unit_websocket_done(frame);
786
787 return PyErr_Format(PyExc_RuntimeError,
788 "Failed to allocate buffer for payload (%d).",
789 (int) payload_len);
790 }
791
792 data = NULL;
793 data_key = nxt_py_text_str;
794
795 break;
796
797 case NXT_WEBSOCKET_OP_BINARY:
798 data = PyBytes_FromStringAndSize(NULL, payload_len);
799 if (nxt_slow_path(data == NULL)) {
800 nxt_unit_req_alert(ws->req,
801 "Failed to create Bytes for payload (%d).",
802 (int) payload_len);
803 nxt_python_print_exception();
804
805 nxt_unit_websocket_done(frame);
806
807 return PyErr_Format(PyExc_RuntimeError,
808 "Failed to create Bytes for payload.");
809 }
810
811 buf = (char *) PyBytes_AS_STRING(data);
812 data_key = nxt_py_bytes_str;
813
814 break;
815
816 case NXT_WEBSOCKET_OP_CLOSE:
817 if (frame->payload_len >= 2) {
818 nxt_unit_websocket_read(frame, code_buf, 2);
819 code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
820
821 } else {
822 code = NXT_WEBSOCKET_CR_NORMAL;
823 }
824
825 nxt_unit_websocket_done(frame);
826
827 data = PyLong_FromLong(code);
828 if (nxt_slow_path(data == NULL)) {
829 nxt_unit_req_alert(ws->req,
830 "Failed to create Long from code %d.",
831 (int) code);
832 nxt_python_print_exception();
833
834 return PyErr_Format(PyExc_RuntimeError,
835 "Failed to create Long from code %d.",
836 (int) code);
837 }
838
839 buf = NULL;
840 type = nxt_py_websocket_disconnect_str;
841 data_key = nxt_py_code_str;
842
843 break;
844
845 default:
846 nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
847
848 nxt_unit_websocket_done(frame);
849
850 return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
851 opcode);
852 }
853
854 if (buf != NULL) {
855 fin = frame->header->fin;
856 buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
857
858 nxt_unit_websocket_done(frame);
859
860 if (!fin) {
861 while (!nxt_queue_is_empty(&ws->pending_frames)) {
862 frame = nxt_py_asgi_websocket_pop_frame(ws);
863 fin = frame->header->fin;
864
865 buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
866
867 nxt_unit_websocket_done(frame);
868
869 if (fin) {
870 break;
871 }
872 }
873
874 if (fin_frame != NULL) {
875 buf += nxt_unit_websocket_read(fin_frame, buf,
876 fin_frame->payload_len);
877 nxt_unit_websocket_done(fin_frame);
878 }
879 }
880
881 if (opcode == NXT_WEBSOCKET_OP_TEXT) {
882 buf -= payload_len;
883
884 data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
885
886 nxt_unit_free(ws->req->ctx, buf);
887
888 if (nxt_slow_path(data == NULL)) {
889 nxt_unit_req_alert(ws->req,
890 "Failed to create Unicode for payload (%d).",
891 (int) payload_len);
892 nxt_python_print_exception();
893
894 return PyErr_Format(PyExc_RuntimeError,
895 "Failed to create Unicode.");
896 }
897 }
898 }
899
900 msg = nxt_py_asgi_new_msg(ws->req, type);
901 if (nxt_slow_path(msg == NULL)) {
902 Py_DECREF(data);
903 return NULL;
904 }
905
906 if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
907 nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
908
909 Py_DECREF(msg);
910 Py_DECREF(data);
911
912 return PyErr_Format(PyExc_RuntimeError,
913 "Python failed to set 'msg.data' item");
914 }
915
916 Py_DECREF(data);
917
918 return msg;
919 }
920
921
922 static uint64_t
nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t * ws)923 nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
924 {
925 uint64_t res;
926 nxt_py_asgi_penging_frame_t *p;
927
928 res = 0;
929
930 nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
931 res += p->frame->payload_len;
932
933 if (p->frame->header->fin) {
934 nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
935 (int) res);
936 return res;
937 }
938 } nxt_queue_loop;
939
940 nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
941 (int) res);
942 return res;
943 }
944
945
946 static nxt_unit_websocket_frame_t *
nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t * ws)947 nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
948 {
949 nxt_queue_link_t *lnk;
950 nxt_unit_websocket_frame_t *frame;
951 nxt_py_asgi_penging_frame_t *p;
952
953 lnk = nxt_queue_first(&ws->pending_frames);
954 nxt_queue_remove(lnk);
955
956 p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
957
958 frame = p->frame;
959 ws->pending_payload_len -= frame->payload_len;
960 ws->pending_fins -= frame->header->fin;
961
962 nxt_unit_free(frame->req->ctx, p);
963
964 nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
965 "%d, %"PRIu64", %d",
966 frame->header->opcode, frame->payload_len,
967 frame->header->fin);
968
969 return frame;
970 }
971
972
973 void
nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t * req)974 nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
975 {
976 PyObject *msg, *exc;
977 nxt_py_asgi_websocket_t *ws;
978
979 ws = req->data;
980
981 nxt_unit_req_debug(req, "asgi_websocket_close_handler");
982
983 if (nxt_slow_path(ws == NULL)) {
984 return;
985 }
986
987 if (ws->receive_future == NULL) {
988 ws->state = NXT_WS_DISCONNECTED;
989
990 return;
991 }
992
993 msg = nxt_py_asgi_websocket_disconnect_msg(ws);
994 if (nxt_slow_path(msg == NULL)) {
995 exc = PyErr_Occurred();
996 Py_INCREF(exc);
997
998 nxt_py_asgi_websocket_receive_fail(ws, exc);
999
1000 } else {
1001 nxt_py_asgi_websocket_receive_done(ws, msg);
1002 }
1003 }
1004
1005
1006 static PyObject *
nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t * ws)1007 nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
1008 {
1009 PyObject *msg, *code;
1010
1011 msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
1012 if (nxt_slow_path(msg == NULL)) {
1013 return NULL;
1014 }
1015
1016 code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
1017 if (nxt_slow_path(code == NULL)) {
1018 nxt_unit_req_alert(ws->req, "Python failed to create long");
1019 nxt_python_print_exception();
1020
1021 Py_DECREF(msg);
1022
1023 return PyErr_Format(PyExc_RuntimeError, "failed to create long");
1024 }
1025
1026 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
1027 nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
1028
1029 Py_DECREF(msg);
1030 Py_DECREF(code);
1031
1032 return PyErr_Format(PyExc_RuntimeError,
1033 "Python failed to set 'msg.code' item");
1034 }
1035
1036 Py_DECREF(code);
1037
1038 return msg;
1039 }
1040
1041
1042 static PyObject *
nxt_py_asgi_websocket_done(PyObject * self,PyObject * future)1043 nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
1044 {
1045 int rc;
1046 uint16_t status_code;
1047 PyObject *res;
1048 nxt_py_asgi_websocket_t *ws;
1049
1050 ws = (nxt_py_asgi_websocket_t *) self;
1051
1052 nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
1053
1054 /*
1055 * Get Future.result() and it raises an exception, if coroutine exited
1056 * with exception.
1057 */
1058 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
1059 if (nxt_slow_path(res == NULL)) {
1060 nxt_unit_req_error(ws->req,
1061 "Python failed to call 'future.result()'");
1062 nxt_python_print_exception();
1063
1064 rc = NXT_UNIT_ERROR;
1065
1066 } else {
1067 Py_DECREF(res);
1068
1069 rc = NXT_UNIT_OK;
1070 }
1071
1072 if (ws->state == NXT_WS_ACCEPTED) {
1073 status_code = (rc == NXT_UNIT_OK)
1074 ? htons(NXT_WEBSOCKET_CR_NORMAL)
1075 : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
1076
1077 rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
1078 1, &status_code, 2);
1079 }
1080
1081 while (!nxt_queue_is_empty(&ws->pending_frames)) {
1082 nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
1083 }
1084
1085 nxt_unit_request_done(ws->req, rc);
1086
1087 Py_RETURN_NONE;
1088 }
1089
1090
1091 #endif /* NXT_HAVE_ASGI */
1092