1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_websocket.h>
15 #include <nxt_websocket_header.h>
16 #include <python/nxt_python_asgi.h>
17 #include <python/nxt_python_asgi_str.h>
18 
19 
20 enum {
21     NXT_WS_INIT,
22     NXT_WS_CONNECT,
23     NXT_WS_ACCEPTED,
24     NXT_WS_DISCONNECTED,
25     NXT_WS_CLOSED,
26 };
27 
28 
29 typedef struct {
30     nxt_queue_link_t            link;
31     nxt_unit_websocket_frame_t  *frame;
32 } nxt_py_asgi_penging_frame_t;
33 
34 
35 typedef struct {
36     PyObject_HEAD
37     nxt_unit_request_info_t  *req;
38     PyObject                 *receive_future;
39     PyObject                 *receive_exc_str;
40     int                      state;
41     nxt_queue_t              pending_frames;
42     uint64_t                 pending_payload_len;
43     uint64_t                 pending_frame_len;
44     int                      pending_fins;
45 } nxt_py_asgi_websocket_t;
46 
47 
48 static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
49 static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
50 static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
51     PyObject *dict);
52 static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
53     PyObject *dict);
54 static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
55     PyObject *dict);
56 static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
57     PyObject *msg);
58 static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
59     PyObject *exc);
60 static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
61 static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
62     nxt_unit_websocket_frame_t *frame);
63 static uint64_t nxt_py_asgi_websocket_pending_len(
64     nxt_py_asgi_websocket_t *ws);
65 static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
66     nxt_py_asgi_websocket_t *ws);
67 static PyObject *nxt_py_asgi_websocket_disconnect_msg(
68     nxt_py_asgi_websocket_t *ws);
69 static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
70 
71 
72 static PyMethodDef nxt_py_asgi_websocket_methods[] = {
73     { "receive",   nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
74     { "send",      nxt_py_asgi_websocket_send,    METH_O,      0 },
75     { "_done",     nxt_py_asgi_websocket_done,    METH_O,      0 },
76     { NULL, NULL, 0, 0 }
77 };
78 
79 static PyAsyncMethods nxt_py_asgi_async_methods = {
80     .am_await = nxt_py_asgi_await,
81 };
82 
83 static PyTypeObject nxt_py_asgi_websocket_type = {
84     PyVarObject_HEAD_INIT(NULL, 0)
85 
86     .tp_name      = "unit._asgi_websocket",
87     .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
88     .tp_dealloc   = nxt_py_asgi_dealloc,
89     .tp_as_async  = &nxt_py_asgi_async_methods,
90     .tp_flags     = Py_TPFLAGS_DEFAULT,
91     .tp_doc       = "unit ASGI WebSocket connection object",
92     .tp_iter      = nxt_py_asgi_iter,
93     .tp_iternext  = nxt_py_asgi_next,
94     .tp_methods   = nxt_py_asgi_websocket_methods,
95 };
96 
97 static uint64_t  nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
98 static uint64_t  nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
99 
100 
101 int
nxt_py_asgi_websocket_init(void)102 nxt_py_asgi_websocket_init(void)
103 {
104     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
105         nxt_unit_alert(NULL,
106               "Python failed to initialize the \"asgi_websocket\" type object");
107         return NXT_UNIT_ERROR;
108     }
109 
110     return NXT_UNIT_OK;
111 }
112 
113 
114 PyObject *
nxt_py_asgi_websocket_create(nxt_unit_request_info_t * req)115 nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
116 {
117     nxt_py_asgi_websocket_t  *ws;
118 
119     ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
120 
121     if (nxt_fast_path(ws != NULL)) {
122         ws->req = req;
123         ws->receive_future = NULL;
124         ws->receive_exc_str = NULL;
125         ws->state = NXT_WS_INIT;
126         nxt_queue_init(&ws->pending_frames);
127         ws->pending_payload_len = 0;
128         ws->pending_frame_len = 0;
129         ws->pending_fins = 0;
130     }
131 
132     return (PyObject *) ws;
133 }
134 
135 
136 static PyObject *
nxt_py_asgi_websocket_receive(PyObject * self,PyObject * none)137 nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
138 {
139     PyObject                 *future, *msg;
140     nxt_py_asgi_ctx_data_t   *ctx_data;
141     nxt_py_asgi_websocket_t  *ws;
142 
143     ws = (nxt_py_asgi_websocket_t *) self;
144 
145     nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
146 
147     /* If exception happened out of receive() call, raise it now. */
148     if (nxt_slow_path(ws->receive_exc_str != NULL)) {
149         PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
150 
151         ws->receive_exc_str = NULL;
152 
153         return NULL;
154     }
155 
156     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
157         nxt_unit_req_error(ws->req,
158                            "receive() called for closed WebSocket");
159 
160         return PyErr_Format(PyExc_RuntimeError,
161                             "WebSocket already closed");
162     }
163 
164     ctx_data = ws->req->ctx->data;
165 
166     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
167     if (nxt_slow_path(future == NULL)) {
168         nxt_unit_req_alert(ws->req, "Python failed to create Future object");
169         nxt_python_print_exception();
170 
171         return PyErr_Format(PyExc_RuntimeError,
172                             "failed to create Future object");
173     }
174 
175     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
176         ws->state = NXT_WS_CONNECT;
177 
178         msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
179 
180         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
181     }
182 
183     if (ws->pending_fins > 0) {
184         msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
185 
186         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
187     }
188 
189     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
190         msg = nxt_py_asgi_websocket_disconnect_msg(ws);
191 
192         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
193     }
194 
195     ws->receive_future = future;
196     Py_INCREF(ws->receive_future);
197 
198     return future;
199 }
200 
201 
202 static PyObject *
nxt_py_asgi_websocket_send(PyObject * self,PyObject * dict)203 nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
204 {
205     PyObject                 *type;
206     const char               *type_str;
207     Py_ssize_t               type_len;
208     nxt_py_asgi_websocket_t  *ws;
209 
210     static const nxt_str_t  websocket_accept = nxt_string("websocket.accept");
211     static const nxt_str_t  websocket_close = nxt_string("websocket.close");
212     static const nxt_str_t  websocket_send = nxt_string("websocket.send");
213 
214     ws = (nxt_py_asgi_websocket_t *) self;
215 
216     type = PyDict_GetItem(dict, nxt_py_type_str);
217     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
218         nxt_unit_req_error(ws->req, "asgi_websocket_send: "
219                            "'type' is not a unicode string");
220         return PyErr_Format(PyExc_TypeError,
221                             "'type' is not a unicode string");
222     }
223 
224     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
225 
226     nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
227                        (int) type_len, type_str);
228 
229     if (type_len == (Py_ssize_t) websocket_accept.length
230         && memcmp(type_str, websocket_accept.start, type_len) == 0)
231     {
232         return nxt_py_asgi_websocket_accept(ws, dict);
233     }
234 
235     if (type_len == (Py_ssize_t) websocket_close.length
236         && memcmp(type_str, websocket_close.start, type_len) == 0)
237     {
238         return nxt_py_asgi_websocket_close(ws, dict);
239     }
240 
241     if (type_len == (Py_ssize_t) websocket_send.length
242         && memcmp(type_str, websocket_send.start, type_len) == 0)
243     {
244         return nxt_py_asgi_websocket_send_frame(ws, dict);
245     }
246 
247     nxt_unit_req_error(ws->req, "asgi_websocket_send: "
248                        "unexpected 'type': '%.*s'", (int) type_len, type_str);
249     return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
250 }
251 
252 
253 static PyObject *
nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t * ws,PyObject * dict)254 nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
255 {
256     int                          rc;
257     char                         *subprotocol_str;
258     PyObject                     *res, *headers, *subprotocol;
259     Py_ssize_t                   subprotocol_len;
260     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
261     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
262 
263     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
264 
265     switch(ws->state) {
266     case NXT_WS_INIT:
267         return PyErr_Format(PyExc_RuntimeError,
268                             "WebSocket connect not received");
269     case NXT_WS_CONNECT:
270         break;
271 
272     case NXT_WS_ACCEPTED:
273         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
274 
275     case NXT_WS_DISCONNECTED:
276         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
277 
278     case NXT_WS_CLOSED:
279         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
280     }
281 
282     if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
283         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
284     }
285 
286     if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
287         return PyErr_Format(PyExc_RuntimeError, "response already sent");
288     }
289 
290     calc_size_ctx.fields_size = 0;
291     calc_size_ctx.fields_count = 0;
292 
293     headers = PyDict_GetItem(dict, nxt_py_headers_str);
294     if (headers != NULL) {
295         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
296                                        &calc_size_ctx);
297         if (nxt_slow_path(res == NULL)) {
298             return NULL;
299         }
300     }
301 
302     subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
303     if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
304         subprotocol_str = PyUnicode_DATA(subprotocol);
305         subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
306 
307         calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
308         calc_size_ctx.fields_count++;
309 
310     } else {
311         subprotocol_str = NULL;
312         subprotocol_len = 0;
313     }
314 
315     rc = nxt_unit_response_init(ws->req, 101,
316                                 calc_size_ctx.fields_count,
317                                 calc_size_ctx.fields_size);
318     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
319         return PyErr_Format(PyExc_RuntimeError,
320                             "failed to allocate response object");
321     }
322 
323     add_field_ctx.req = ws->req;
324     add_field_ctx.content_length = -1;
325 
326     if (headers != NULL) {
327         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
328                                        &add_field_ctx);
329         if (nxt_slow_path(res == NULL)) {
330             return NULL;
331         }
332     }
333 
334     if (subprotocol_len > 0) {
335         rc = nxt_unit_response_add_field(ws->req,
336                                          (const char *) ws_protocol.start,
337                                          ws_protocol.length,
338                                          subprotocol_str, subprotocol_len);
339         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
340             return PyErr_Format(PyExc_RuntimeError,
341                                 "failed to add header");
342         }
343     }
344 
345     rc = nxt_unit_response_send(ws->req);
346     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
347         return PyErr_Format(PyExc_RuntimeError, "failed to send response");
348     }
349 
350     ws->state = NXT_WS_ACCEPTED;
351 
352     Py_INCREF(ws);
353 
354     return (PyObject *) ws;
355 }
356 
357 
358 static PyObject *
nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t * ws,PyObject * dict)359 nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
360 {
361     int       rc;
362     uint16_t  status_code;
363     PyObject  *code;
364 
365     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
366         return PyErr_Format(PyExc_RuntimeError,
367                             "WebSocket connect not received");
368     }
369 
370     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
371         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
372     }
373 
374     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
375         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
376     }
377 
378     if (nxt_unit_response_is_websocket(ws->req)) {
379         code = PyDict_GetItem(dict, nxt_py_code_str);
380         if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
381             return PyErr_Format(PyExc_TypeError, "'code' is not integer");
382         }
383 
384         status_code = (code != NULL) ? htons(PyLong_AsLong(code))
385                                      : htons(NXT_WEBSOCKET_CR_NORMAL);
386 
387         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
388                                      1, &status_code, 2);
389         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
390             return PyErr_Format(PyExc_RuntimeError,
391                                 "failed to send close frame");
392         }
393 
394     } else {
395         rc = nxt_unit_response_init(ws->req, 403, 0, 0);
396         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
397             return PyErr_Format(PyExc_RuntimeError,
398                                 "failed to allocate response object");
399         }
400 
401         rc = nxt_unit_response_send(ws->req);
402         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
403             return PyErr_Format(PyExc_RuntimeError,
404                                 "failed to send response");
405         }
406     }
407 
408     ws->state = NXT_WS_CLOSED;
409 
410     Py_INCREF(ws);
411 
412     return (PyObject *) ws;
413 }
414 
415 
416 static PyObject *
nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t * ws,PyObject * dict)417 nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
418 {
419     int         rc;
420     uint8_t     opcode;
421     PyObject    *bytes, *text;
422     const void  *buf;
423     Py_ssize_t  buf_size;
424 
425     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
426         return PyErr_Format(PyExc_RuntimeError,
427                             "WebSocket connect not received");
428     }
429 
430     if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
431         return PyErr_Format(PyExc_RuntimeError,
432                             "WebSocket not accepted yet");
433     }
434 
435     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
436         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
437     }
438 
439     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
440         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
441     }
442 
443     bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
444     if (bytes == Py_None) {
445         bytes = NULL;
446     }
447 
448     if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
449         return PyErr_Format(PyExc_TypeError,
450                             "'bytes' is not a byte string");
451     }
452 
453     text = PyDict_GetItem(dict, nxt_py_text_str);
454     if (text == Py_None) {
455         text = NULL;
456     }
457 
458     if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
459         return PyErr_Format(PyExc_TypeError,
460                             "'text' is not a unicode string");
461     }
462 
463     if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
464         return PyErr_Format(PyExc_ValueError,
465                        "Exactly one of 'bytes' or 'text' must be non-None");
466     }
467 
468     if (bytes != NULL) {
469         buf = PyBytes_AS_STRING(bytes);
470         buf_size = PyBytes_GET_SIZE(bytes);
471         opcode = NXT_WEBSOCKET_OP_BINARY;
472 
473     } else {
474         buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
475         opcode = NXT_WEBSOCKET_OP_TEXT;
476     }
477 
478     rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
479     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
480         return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
481     }
482 
483     Py_INCREF(ws);
484     return (PyObject *) ws;
485 }
486 
487 
488 void
nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t * frame)489 nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
490 {
491     uint8_t                  opcode;
492     uint16_t                 status_code;
493     uint64_t                 rest;
494     PyObject                 *msg, *exc;
495     nxt_py_asgi_websocket_t  *ws;
496 
497     ws = frame->req->data;
498 
499     nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
500 
501     opcode = frame->header->opcode;
502     if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
503                       && opcode != NXT_WEBSOCKET_OP_TEXT
504                       && opcode != NXT_WEBSOCKET_OP_BINARY
505                       && opcode != NXT_WEBSOCKET_OP_CLOSE))
506     {
507         nxt_unit_websocket_done(frame);
508 
509         nxt_unit_req_debug(ws->req,
510                           "asgi_websocket_handler: ignore frame with opcode %d",
511                            opcode);
512 
513         return;
514     }
515 
516     if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
517         nxt_unit_websocket_done(frame);
518 
519         goto bad_state;
520     }
521 
522     rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
523 
524     if (nxt_slow_path(frame->payload_len > rest)) {
525         nxt_unit_websocket_done(frame);
526 
527         goto too_big;
528     }
529 
530     rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
531 
532     if (nxt_slow_path(frame->payload_len > rest)) {
533         nxt_unit_websocket_done(frame);
534 
535         goto too_big;
536     }
537 
538     if (ws->receive_future == NULL || frame->header->fin == 0) {
539         nxt_py_asgi_websocket_suspend_frame(frame);
540 
541         return;
542     }
543 
544     if (!nxt_queue_is_empty(&ws->pending_frames)) {
545         if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
546                           || opcode == NXT_WEBSOCKET_OP_BINARY))
547         {
548             nxt_unit_req_alert(ws->req,
549                          "Invalid state: pending frames with active receiver. "
550                          "CONT frame expected. (%d)", opcode);
551 
552             PyErr_SetString(PyExc_AssertionError,
553                          "Invalid state: pending frames with active receiver. "
554                          "CONT frame expected.");
555 
556             nxt_unit_websocket_done(frame);
557 
558             return;
559         }
560     }
561 
562     msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
563     if (nxt_slow_path(msg == NULL)) {
564         exc = PyErr_Occurred();
565         Py_INCREF(exc);
566 
567         goto raise;
568     }
569 
570     nxt_py_asgi_websocket_receive_done(ws, msg);
571 
572     return;
573 
574 bad_state:
575 
576     if (ws->receive_future == NULL) {
577         ws->receive_exc_str = nxt_py_bad_state_str;
578 
579         return;
580     }
581 
582     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
583                                        nxt_py_bad_state_str,
584                                        NULL);
585     if (nxt_slow_path(exc == NULL)) {
586         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
587         nxt_python_print_exception();
588 
589         exc = Py_None;
590         Py_INCREF(exc);
591     }
592 
593     goto raise;
594 
595 too_big:
596 
597     status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
598 
599     (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
600                                    1, &status_code, 2);
601 
602     ws->state = NXT_WS_CLOSED;
603 
604     if (ws->receive_future == NULL) {
605         ws->receive_exc_str = nxt_py_message_too_big_str;
606 
607         return;
608     }
609 
610     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
611                                        nxt_py_message_too_big_str,
612                                        NULL);
613     if (nxt_slow_path(exc == NULL)) {
614         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
615         nxt_python_print_exception();
616 
617         exc = Py_None;
618         Py_INCREF(exc);
619     }
620 
621 raise:
622 
623     nxt_py_asgi_websocket_receive_fail(ws, exc);
624 }
625 
626 
627 static void
nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t * ws,PyObject * msg)628 nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
629 {
630     PyObject  *future, *res;
631 
632     future = ws->receive_future;
633     ws->receive_future = NULL;
634 
635     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
636     if (nxt_slow_path(res == NULL)) {
637         nxt_unit_req_alert(ws->req, "'set_result' call failed");
638         nxt_python_print_exception();
639     }
640 
641     Py_XDECREF(res);
642     Py_DECREF(future);
643 
644     Py_DECREF(msg);
645 }
646 
647 
648 static void
nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t * ws,PyObject * exc)649 nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
650 {
651     PyObject  *future, *res;
652 
653     future = ws->receive_future;
654     ws->receive_future = NULL;
655 
656     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
657                                      NULL);
658     if (nxt_slow_path(res == NULL)) {
659         nxt_unit_req_alert(ws->req, "'set_exception' call failed");
660         nxt_python_print_exception();
661     }
662 
663     Py_XDECREF(res);
664     Py_DECREF(future);
665 
666     Py_DECREF(exc);
667 }
668 
669 
670 static void
nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t * frame)671 nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
672 {
673     int                          rc;
674     nxt_py_asgi_websocket_t      *ws;
675     nxt_py_asgi_penging_frame_t  *p;
676 
677     nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
678                        "%d, %"PRIu64", %d",
679                        frame->header->opcode, frame->payload_len,
680                        frame->header->fin);
681 
682     ws = frame->req->data;
683 
684     rc = nxt_unit_websocket_retain(frame);
685     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
686         nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
687 
688         nxt_unit_websocket_done(frame);
689 
690         PyErr_SetString(PyExc_RuntimeError,
691                         "Failed to retain frame for suspension.");
692 
693         return;
694     }
695 
696     p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
697     if (nxt_slow_path(p == NULL)) {
698         nxt_unit_req_alert(ws->req,
699                            "Failed to allocate buffer to suspend frame.");
700 
701         nxt_unit_websocket_done(frame);
702 
703         PyErr_SetString(PyExc_RuntimeError,
704                         "Failed to allocate buffer to suspend frame.");
705 
706         return;
707     }
708 
709     p->frame = frame;
710     nxt_queue_insert_tail(&ws->pending_frames, &p->link);
711 
712     ws->pending_payload_len += frame->payload_len;
713     ws->pending_fins += frame->header->fin;
714 
715     if (frame->header->fin) {
716         ws->pending_frame_len = 0;
717 
718     } else {
719         if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
720             ws->pending_frame_len += frame->payload_len;
721 
722         } else {
723             ws->pending_frame_len = frame->payload_len;
724         }
725     }
726 }
727 
728 
729 static PyObject *
nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t * ws,nxt_unit_websocket_frame_t * frame)730 nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
731     nxt_unit_websocket_frame_t *frame)
732 {
733     int                         fin;
734     char                        *buf;
735     uint8_t                     code_buf[2], opcode;
736     uint16_t                    code;
737     PyObject                    *msg, *data, *type, *data_key;
738     uint64_t                    payload_len;
739     nxt_unit_websocket_frame_t  *fin_frame;
740 
741     nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
742 
743     fin_frame = NULL;
744 
745     if (nxt_queue_is_empty(&ws->pending_frames)
746         || (frame != NULL
747             && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
748     {
749         payload_len = frame->payload_len;
750 
751     } else {
752         if (frame != NULL) {
753             payload_len = ws->pending_payload_len + frame->payload_len;
754             fin_frame = frame;
755 
756         } else {
757             payload_len = nxt_py_asgi_websocket_pending_len(ws);
758         }
759 
760         frame = nxt_py_asgi_websocket_pop_frame(ws);
761     }
762 
763     opcode = frame->header->opcode;
764 
765     if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
766         nxt_unit_req_alert(ws->req,
767                            "Invalid state: attempt to process CONT frame.");
768 
769         nxt_unit_websocket_done(frame);
770 
771         return PyErr_Format(PyExc_AssertionError,
772                             "Invalid state: attempt to process CONT frame.");
773     }
774 
775     type = nxt_py_websocket_receive_str;
776 
777     switch (opcode) {
778     case NXT_WEBSOCKET_OP_TEXT:
779         buf = nxt_unit_malloc(frame->req->ctx, payload_len);
780         if (nxt_slow_path(buf == NULL)) {
781             nxt_unit_req_alert(ws->req,
782                                "Failed to allocate buffer for payload (%d).",
783                                (int) payload_len);
784 
785             nxt_unit_websocket_done(frame);
786 
787             return PyErr_Format(PyExc_RuntimeError,
788                                 "Failed to allocate buffer for payload (%d).",
789                                 (int) payload_len);
790         }
791 
792         data = NULL;
793         data_key = nxt_py_text_str;
794 
795         break;
796 
797     case NXT_WEBSOCKET_OP_BINARY:
798         data = PyBytes_FromStringAndSize(NULL, payload_len);
799         if (nxt_slow_path(data == NULL)) {
800             nxt_unit_req_alert(ws->req,
801                                "Failed to create Bytes for payload (%d).",
802                                (int) payload_len);
803             nxt_python_print_exception();
804 
805             nxt_unit_websocket_done(frame);
806 
807             return PyErr_Format(PyExc_RuntimeError,
808                                 "Failed to create Bytes for payload.");
809         }
810 
811         buf = (char *) PyBytes_AS_STRING(data);
812         data_key = nxt_py_bytes_str;
813 
814         break;
815 
816     case NXT_WEBSOCKET_OP_CLOSE:
817         if (frame->payload_len >= 2) {
818             nxt_unit_websocket_read(frame, code_buf, 2);
819             code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
820 
821         } else {
822             code = NXT_WEBSOCKET_CR_NORMAL;
823         }
824 
825         nxt_unit_websocket_done(frame);
826 
827         data = PyLong_FromLong(code);
828         if (nxt_slow_path(data == NULL)) {
829             nxt_unit_req_alert(ws->req,
830                                "Failed to create Long from code %d.",
831                                (int) code);
832             nxt_python_print_exception();
833 
834             return PyErr_Format(PyExc_RuntimeError,
835                                 "Failed to create Long from code %d.",
836                                 (int) code);
837         }
838 
839         buf = NULL;
840         type = nxt_py_websocket_disconnect_str;
841         data_key = nxt_py_code_str;
842 
843         break;
844 
845     default:
846         nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
847 
848         nxt_unit_websocket_done(frame);
849 
850         return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
851                             opcode);
852     }
853 
854     if (buf != NULL) {
855         fin = frame->header->fin;
856         buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
857 
858         nxt_unit_websocket_done(frame);
859 
860         if (!fin) {
861             while (!nxt_queue_is_empty(&ws->pending_frames)) {
862                 frame = nxt_py_asgi_websocket_pop_frame(ws);
863                 fin = frame->header->fin;
864 
865                 buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
866 
867                 nxt_unit_websocket_done(frame);
868 
869                 if (fin) {
870                     break;
871                 }
872             }
873 
874             if (fin_frame != NULL) {
875                 buf += nxt_unit_websocket_read(fin_frame, buf,
876                                                fin_frame->payload_len);
877                 nxt_unit_websocket_done(fin_frame);
878             }
879         }
880 
881         if (opcode == NXT_WEBSOCKET_OP_TEXT) {
882             buf -= payload_len;
883 
884             data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
885 
886             nxt_unit_free(ws->req->ctx, buf);
887 
888             if (nxt_slow_path(data == NULL)) {
889                 nxt_unit_req_alert(ws->req,
890                                    "Failed to create Unicode for payload (%d).",
891                                    (int) payload_len);
892                 nxt_python_print_exception();
893 
894                 return PyErr_Format(PyExc_RuntimeError,
895                                     "Failed to create Unicode.");
896             }
897         }
898     }
899 
900     msg = nxt_py_asgi_new_msg(ws->req, type);
901     if (nxt_slow_path(msg == NULL)) {
902         Py_DECREF(data);
903         return NULL;
904     }
905 
906     if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
907         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
908 
909         Py_DECREF(msg);
910         Py_DECREF(data);
911 
912         return PyErr_Format(PyExc_RuntimeError,
913                             "Python failed to set 'msg.data' item");
914     }
915 
916     Py_DECREF(data);
917 
918     return msg;
919 }
920 
921 
922 static uint64_t
nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t * ws)923 nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
924 {
925     uint64_t                     res;
926     nxt_py_asgi_penging_frame_t  *p;
927 
928     res = 0;
929 
930     nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
931         res += p->frame->payload_len;
932 
933         if (p->frame->header->fin) {
934             nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
935                                (int) res);
936             return res;
937         }
938     } nxt_queue_loop;
939 
940     nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
941                        (int) res);
942     return res;
943 }
944 
945 
946 static nxt_unit_websocket_frame_t *
nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t * ws)947 nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
948 {
949     nxt_queue_link_t             *lnk;
950     nxt_unit_websocket_frame_t   *frame;
951     nxt_py_asgi_penging_frame_t  *p;
952 
953     lnk = nxt_queue_first(&ws->pending_frames);
954     nxt_queue_remove(lnk);
955 
956     p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
957 
958     frame = p->frame;
959     ws->pending_payload_len -= frame->payload_len;
960     ws->pending_fins -= frame->header->fin;
961 
962     nxt_unit_free(frame->req->ctx, p);
963 
964     nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
965                        "%d, %"PRIu64", %d",
966                        frame->header->opcode, frame->payload_len,
967                        frame->header->fin);
968 
969     return frame;
970 }
971 
972 
973 void
nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t * req)974 nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
975 {
976     PyObject                 *msg, *exc;
977     nxt_py_asgi_websocket_t  *ws;
978 
979     ws = req->data;
980 
981     nxt_unit_req_debug(req, "asgi_websocket_close_handler");
982 
983     if (nxt_slow_path(ws == NULL)) {
984         return;
985     }
986 
987     if (ws->receive_future == NULL) {
988         ws->state = NXT_WS_DISCONNECTED;
989 
990         return;
991     }
992 
993     msg = nxt_py_asgi_websocket_disconnect_msg(ws);
994     if (nxt_slow_path(msg == NULL)) {
995         exc = PyErr_Occurred();
996         Py_INCREF(exc);
997 
998         nxt_py_asgi_websocket_receive_fail(ws, exc);
999 
1000     } else {
1001         nxt_py_asgi_websocket_receive_done(ws, msg);
1002     }
1003 }
1004 
1005 
1006 static PyObject *
nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t * ws)1007 nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
1008 {
1009     PyObject  *msg, *code;
1010 
1011     msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
1012     if (nxt_slow_path(msg == NULL)) {
1013         return NULL;
1014     }
1015 
1016     code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
1017     if (nxt_slow_path(code == NULL)) {
1018         nxt_unit_req_alert(ws->req, "Python failed to create long");
1019         nxt_python_print_exception();
1020 
1021         Py_DECREF(msg);
1022 
1023         return PyErr_Format(PyExc_RuntimeError, "failed to create long");
1024     }
1025 
1026     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
1027         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
1028 
1029         Py_DECREF(msg);
1030         Py_DECREF(code);
1031 
1032         return PyErr_Format(PyExc_RuntimeError,
1033                             "Python failed to set 'msg.code' item");
1034     }
1035 
1036     Py_DECREF(code);
1037 
1038     return msg;
1039 }
1040 
1041 
1042 static PyObject *
nxt_py_asgi_websocket_done(PyObject * self,PyObject * future)1043 nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
1044 {
1045     int                      rc;
1046     uint16_t                 status_code;
1047     PyObject                 *res;
1048     nxt_py_asgi_websocket_t  *ws;
1049 
1050     ws = (nxt_py_asgi_websocket_t *) self;
1051 
1052     nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
1053 
1054     /*
1055      * Get Future.result() and it raises an exception, if coroutine exited
1056      * with exception.
1057      */
1058     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
1059     if (nxt_slow_path(res == NULL)) {
1060         nxt_unit_req_error(ws->req,
1061                            "Python failed to call 'future.result()'");
1062         nxt_python_print_exception();
1063 
1064         rc = NXT_UNIT_ERROR;
1065 
1066     } else {
1067         Py_DECREF(res);
1068 
1069         rc = NXT_UNIT_OK;
1070     }
1071 
1072     if (ws->state == NXT_WS_ACCEPTED) {
1073         status_code = (rc == NXT_UNIT_OK)
1074                       ? htons(NXT_WEBSOCKET_CR_NORMAL)
1075                       : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
1076 
1077         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
1078                                      1, &status_code, 2);
1079     }
1080 
1081     while (!nxt_queue_is_empty(&ws->pending_frames)) {
1082         nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
1083     }
1084 
1085     nxt_unit_request_done(ws->req, rc);
1086 
1087     Py_RETURN_NONE;
1088 }
1089 
1090 
1091 #endif /* NXT_HAVE_ASGI */
1092