1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16 
17 
18 typedef struct {
19     PyObject_HEAD
20     nxt_unit_request_info_t  *req;
21     nxt_queue_link_t         link;
22     PyObject                 *receive_future;
23     PyObject                 *send_future;
24     uint64_t                 content_length;
25     uint64_t                 bytes_sent;
26     PyObject                 *send_body;
27     Py_ssize_t               send_body_off;
28     uint8_t                  complete;
29     uint8_t                  closed;
30     uint8_t                  empty_body_received;
31 } nxt_py_asgi_http_t;
32 
33 
34 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
35 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
36 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
37 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
38     PyObject *dict);
39 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
40     PyObject *dict);
41 static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http);
42 static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http,
43     PyObject *future, PyObject *msg);
44 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
45 
46 
47 static PyMethodDef nxt_py_asgi_http_methods[] = {
48     { "receive",   nxt_py_asgi_http_receive, METH_NOARGS, 0 },
49     { "send",      nxt_py_asgi_http_send,    METH_O,      0 },
50     { "_done",     nxt_py_asgi_http_done,    METH_O,      0 },
51     { NULL, NULL, 0, 0 }
52 };
53 
54 static PyAsyncMethods nxt_py_asgi_async_methods = {
55     .am_await = nxt_py_asgi_await,
56 };
57 
58 static PyTypeObject nxt_py_asgi_http_type = {
59     PyVarObject_HEAD_INIT(NULL, 0)
60 
61     .tp_name      = "unit._asgi_http",
62     .tp_basicsize = sizeof(nxt_py_asgi_http_t),
63     .tp_dealloc   = nxt_py_asgi_dealloc,
64     .tp_as_async  = &nxt_py_asgi_async_methods,
65     .tp_flags     = Py_TPFLAGS_DEFAULT,
66     .tp_doc       = "unit ASGI HTTP request object",
67     .tp_iter      = nxt_py_asgi_iter,
68     .tp_iternext  = nxt_py_asgi_next,
69     .tp_methods   = nxt_py_asgi_http_methods,
70 };
71 
72 static Py_ssize_t  nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
73 
74 
75 int
nxt_py_asgi_http_init(void)76 nxt_py_asgi_http_init(void)
77 {
78     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
79         nxt_unit_alert(NULL,
80                        "Python failed to initialize the 'http' type object");
81         return NXT_UNIT_ERROR;
82     }
83 
84     return NXT_UNIT_OK;
85 }
86 
87 
88 PyObject *
nxt_py_asgi_http_create(nxt_unit_request_info_t * req)89 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
90 {
91     nxt_py_asgi_http_t  *http;
92 
93     http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
94 
95     if (nxt_fast_path(http != NULL)) {
96         http->req = req;
97         http->receive_future = NULL;
98         http->send_future = NULL;
99         http->content_length = -1;
100         http->bytes_sent = 0;
101         http->send_body = NULL;
102         http->send_body_off = 0;
103         http->complete = 0;
104         http->closed = 0;
105         http->empty_body_received = 0;
106     }
107 
108     return (PyObject *) http;
109 }
110 
111 
112 static PyObject *
nxt_py_asgi_http_receive(PyObject * self,PyObject * none)113 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
114 {
115     PyObject                 *msg, *future;
116     nxt_py_asgi_http_t       *http;
117     nxt_py_asgi_ctx_data_t   *ctx_data;
118     nxt_unit_request_info_t  *req;
119 
120     http = (nxt_py_asgi_http_t *) self;
121     req = http->req;
122 
123     nxt_unit_req_debug(req, "asgi_http_receive");
124 
125     if (nxt_slow_path(http->closed || http->complete )) {
126         msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
127 
128     } else {
129         msg = nxt_py_asgi_http_read_msg(http);
130     }
131 
132     if (nxt_slow_path(msg == NULL)) {
133         return NULL;
134     }
135 
136     ctx_data = req->ctx->data;
137 
138     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
139     if (nxt_slow_path(future == NULL)) {
140         nxt_unit_req_alert(req, "Python failed to create Future object");
141         nxt_python_print_exception();
142 
143         Py_DECREF(msg);
144 
145         return PyErr_Format(PyExc_RuntimeError,
146                             "failed to create Future object");
147     }
148 
149     if (msg != Py_None) {
150         return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
151     }
152 
153     http->receive_future = future;
154     Py_INCREF(http->receive_future);
155 
156     Py_DECREF(msg);
157 
158     return future;
159 }
160 
161 
162 static PyObject *
nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t * http)163 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
164 {
165     char                     *body_buf;
166     ssize_t                  read_res;
167     PyObject                 *msg, *body;
168     Py_ssize_t               size;
169     nxt_unit_request_info_t  *req;
170 
171     req = http->req;
172 
173     size = req->content_length;
174 
175     if (size > nxt_py_asgi_http_body_buf_size) {
176         size = nxt_py_asgi_http_body_buf_size;
177     }
178 
179     if (size == 0) {
180         if (http->empty_body_received) {
181             Py_RETURN_NONE;
182         }
183 
184         http->empty_body_received = 1;
185     }
186 
187     if (size > 0) {
188         body = PyBytes_FromStringAndSize(NULL, size);
189         if (nxt_slow_path(body == NULL)) {
190             nxt_unit_req_alert(req, "Python failed to create body byte string");
191             nxt_python_print_exception();
192 
193             return PyErr_Format(PyExc_RuntimeError,
194                                 "failed to create Bytes object");
195         }
196 
197         body_buf = PyBytes_AS_STRING(body);
198 
199         read_res = nxt_unit_request_read(req, body_buf, size);
200 
201     } else {
202         body = NULL;
203         read_res = 0;
204     }
205 
206     if (read_res > 0 || read_res == size) {
207         msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
208         if (nxt_slow_path(msg == NULL)) {
209             Py_XDECREF(body);
210 
211             return NULL;
212         }
213 
214 #define SET_ITEM(dict, key, value) \
215     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
216                         == -1))                                                \
217     {                                                                          \
218         nxt_unit_req_alert(req,                                                \
219                            "Python failed to set '" #dict "." #key "' item");  \
220         PyErr_SetString(PyExc_RuntimeError,                                    \
221                         "Python failed to set '" #dict "." #key "' item");     \
222         goto fail;                                                             \
223     }
224 
225         if (body != NULL) {
226             SET_ITEM(msg, body, body)
227         }
228 
229         if (req->content_length > 0) {
230             SET_ITEM(msg, more_body, Py_True)
231         }
232 
233 #undef SET_ITEM
234 
235         Py_XDECREF(body);
236 
237         return msg;
238     }
239 
240     Py_XDECREF(body);
241 
242     Py_RETURN_NONE;
243 
244 fail:
245 
246     Py_DECREF(msg);
247     Py_XDECREF(body);
248 
249     return NULL;
250 }
251 
252 
253 static PyObject *
nxt_py_asgi_http_send(PyObject * self,PyObject * dict)254 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
255 {
256     PyObject            *type;
257     const char          *type_str;
258     Py_ssize_t          type_len;
259     nxt_py_asgi_http_t  *http;
260 
261     static const nxt_str_t  response_start = nxt_string("http.response.start");
262     static const nxt_str_t  response_body = nxt_string("http.response.body");
263 
264     http = (nxt_py_asgi_http_t *) self;
265 
266     type = PyDict_GetItem(dict, nxt_py_type_str);
267     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
268         nxt_unit_req_error(http->req, "asgi_http_send: "
269                                       "'type' is not a unicode string");
270         return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
271     }
272 
273     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
274 
275     nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
276                        (int) type_len, type_str);
277 
278     if (nxt_unit_response_is_init(http->req)) {
279         if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
280             return nxt_py_asgi_http_response_body(http, dict);
281         }
282 
283         return PyErr_Format(PyExc_RuntimeError,
284                             "Expected ASGI message 'http.response.body', "
285                             "but got '%U'", type);
286     }
287 
288     if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
289         return nxt_py_asgi_http_response_start(http, dict);
290     }
291 
292     return PyErr_Format(PyExc_RuntimeError,
293                         "Expected ASGI message 'http.response.start', "
294                         "but got '%U'", type);
295 }
296 
297 
298 static PyObject *
nxt_py_asgi_http_response_start(nxt_py_asgi_http_t * http,PyObject * dict)299 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
300 {
301     int                          rc;
302     PyObject                     *status, *headers, *res;
303     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
304     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
305 
306     status = PyDict_GetItem(dict, nxt_py_status_str);
307     if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
308         nxt_unit_req_error(http->req, "asgi_http_response_start: "
309                                       "'status' is not an integer");
310         return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
311     }
312 
313     calc_size_ctx.fields_size = 0;
314     calc_size_ctx.fields_count = 0;
315 
316     headers = PyDict_GetItem(dict, nxt_py_headers_str);
317     if (headers != NULL) {
318         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
319                                        &calc_size_ctx);
320         if (nxt_slow_path(res == NULL)) {
321             return NULL;
322         }
323 
324         Py_DECREF(res);
325     }
326 
327     rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
328                                 calc_size_ctx.fields_count,
329                                 calc_size_ctx.fields_size);
330     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
331         return PyErr_Format(PyExc_RuntimeError,
332                             "failed to allocate response object");
333     }
334 
335     add_field_ctx.req = http->req;
336     add_field_ctx.content_length = -1;
337 
338     if (headers != NULL) {
339         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
340                                        &add_field_ctx);
341         if (nxt_slow_path(res == NULL)) {
342             return NULL;
343         }
344 
345         Py_DECREF(res);
346     }
347 
348     http->content_length = add_field_ctx.content_length;
349 
350     Py_INCREF(http);
351     return (PyObject *) http;
352 }
353 
354 
355 static PyObject *
nxt_py_asgi_http_response_body(nxt_py_asgi_http_t * http,PyObject * dict)356 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
357 {
358     int                     rc;
359     char                    *body_str;
360     ssize_t                 sent;
361     PyObject                *body, *more_body, *future;
362     Py_ssize_t              body_len, body_off;
363     nxt_py_asgi_ctx_data_t  *ctx_data;
364 
365     body = PyDict_GetItem(dict, nxt_py_body_str);
366     if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
367         return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
368     }
369 
370     more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
371     if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
372         return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
373     }
374 
375     if (nxt_slow_path(http->complete)) {
376         return PyErr_Format(PyExc_RuntimeError,
377                             "Unexpected ASGI message 'http.response.body' "
378                             "sent, after response already completed");
379     }
380 
381     if (nxt_slow_path(http->send_future != NULL)) {
382         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
383     }
384 
385     if (body != NULL) {
386         body_str = PyBytes_AS_STRING(body);
387         body_len = PyBytes_GET_SIZE(body);
388 
389         nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
390                            (int) body_len, (more_body == Py_True) );
391 
392         if (nxt_slow_path(http->bytes_sent + body_len
393                               > http->content_length))
394         {
395             return PyErr_Format(PyExc_RuntimeError,
396                                 "Response content longer than Content-Length");
397         }
398 
399         body_off = 0;
400 
401         ctx_data = http->req->ctx->data;
402 
403         while (body_len > 0) {
404             sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
405             if (nxt_slow_path(sent < 0)) {
406                 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
407             }
408 
409             if (nxt_slow_path(sent == 0)) {
410                 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
411                                    "out of shared memory, %d",
412                                    (int) body_len);
413 
414                 future = PyObject_CallObject(ctx_data->loop_create_future,
415                                              NULL);
416                 if (nxt_slow_path(future == NULL)) {
417                     nxt_unit_req_alert(http->req,
418                                        "Python failed to create Future object");
419                     nxt_python_print_exception();
420 
421                     return PyErr_Format(PyExc_RuntimeError,
422                                         "failed to create Future object");
423                 }
424 
425                 http->send_body = body;
426                 Py_INCREF(http->send_body);
427                 http->send_body_off = body_off;
428 
429                 nxt_py_asgi_drain_wait(http->req, &http->link);
430 
431                 http->send_future = future;
432                 Py_INCREF(http->send_future);
433 
434                 return future;
435             }
436 
437             body_str += sent;
438             body_len -= sent;
439             body_off += sent;
440             http->bytes_sent += sent;
441         }
442 
443     } else {
444         nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
445                            (more_body == Py_True) );
446 
447         if (!nxt_unit_response_is_sent(http->req)) {
448             rc = nxt_unit_response_send(http->req);
449             if (nxt_slow_path(rc != NXT_UNIT_OK)) {
450                 return PyErr_Format(PyExc_RuntimeError,
451                                     "failed to send response");
452             }
453         }
454     }
455 
456     if (more_body == NULL || more_body == Py_False) {
457         http->complete = 1;
458 
459         nxt_py_asgi_http_emit_disconnect(http);
460     }
461 
462     Py_INCREF(http);
463     return (PyObject *) http;
464 }
465 
466 
467 static void
nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t * http)468 nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http)
469 {
470     PyObject  *msg, *future;
471 
472     if (http->receive_future == NULL) {
473         return;
474     }
475 
476     msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str);
477     if (nxt_slow_path(msg == NULL)) {
478         return;
479     }
480 
481     if (msg == Py_None) {
482         Py_DECREF(msg);
483         return;
484     }
485 
486     future = http->receive_future;
487     http->receive_future = NULL;
488 
489     nxt_py_asgi_http_set_result(http, future, msg);
490 
491     Py_DECREF(msg);
492 }
493 
494 
495 static void
nxt_py_asgi_http_set_result(nxt_py_asgi_http_t * http,PyObject * future,PyObject * msg)496 nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future,
497     PyObject *msg)
498 {
499     PyObject  *res;
500 
501     res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL);
502     if (nxt_slow_path(res == NULL)) {
503         nxt_unit_req_alert(http->req, "'done' call failed");
504         nxt_python_print_exception();
505     }
506 
507     if (nxt_fast_path(res == Py_False)) {
508         res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg,
509                                          NULL);
510         if (nxt_slow_path(res == NULL)) {
511             nxt_unit_req_alert(http->req, "'set_result' call failed");
512             nxt_python_print_exception();
513         }
514 
515     } else {
516         res = NULL;
517     }
518 
519     Py_XDECREF(res);
520     Py_DECREF(future);
521 }
522 
523 
524 void
nxt_py_asgi_http_data_handler(nxt_unit_request_info_t * req)525 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
526 {
527     PyObject            *msg, *future;
528     nxt_py_asgi_http_t  *http;
529 
530     http = req->data;
531 
532     nxt_unit_req_debug(req, "asgi_http_data_handler");
533 
534     if (http->receive_future == NULL) {
535         return;
536     }
537 
538     msg = nxt_py_asgi_http_read_msg(http);
539     if (nxt_slow_path(msg == NULL)) {
540         return;
541     }
542 
543     if (msg == Py_None) {
544         Py_DECREF(msg);
545         return;
546     }
547 
548     future = http->receive_future;
549     http->receive_future = NULL;
550 
551     nxt_py_asgi_http_set_result(http, future, msg);
552 
553     Py_DECREF(msg);
554 }
555 
556 
557 int
nxt_py_asgi_http_drain(nxt_queue_link_t * lnk)558 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
559 {
560     char                *body_str;
561     ssize_t             sent;
562     PyObject            *future, *exc, *res;
563     Py_ssize_t          body_len;
564     nxt_py_asgi_http_t  *http;
565 
566     http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
567 
568     body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
569     body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
570 
571     nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
572 
573     while (body_len > 0) {
574         sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
575         if (nxt_slow_path(sent < 0)) {
576             goto fail;
577         }
578 
579         if (nxt_slow_path(sent == 0)) {
580             return NXT_UNIT_AGAIN;
581         }
582 
583         body_str += sent;
584         body_len -= sent;
585 
586         http->send_body_off += sent;
587         http->bytes_sent += sent;
588     }
589 
590     Py_CLEAR(http->send_body);
591 
592     future = http->send_future;
593     http->send_future = NULL;
594 
595     nxt_py_asgi_http_set_result(http, future, Py_None);
596 
597     return NXT_UNIT_OK;
598 
599 fail:
600 
601     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
602                                        nxt_py_failed_to_send_body_str,
603                                        NULL);
604     if (nxt_slow_path(exc == NULL)) {
605         nxt_unit_req_alert(http->req, "RuntimeError create failed");
606         nxt_python_print_exception();
607 
608         exc = Py_None;
609         Py_INCREF(exc);
610     }
611 
612     future = http->send_future;
613     http->send_future = NULL;
614 
615     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
616                                      NULL);
617     if (nxt_slow_path(res == NULL)) {
618         nxt_unit_req_alert(http->req, "'set_exception' call failed");
619         nxt_python_print_exception();
620     }
621 
622     Py_XDECREF(res);
623     Py_DECREF(future);
624     Py_DECREF(exc);
625 
626     return NXT_UNIT_ERROR;
627 }
628 
629 
630 void
nxt_py_asgi_http_close_handler(nxt_unit_request_info_t * req)631 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
632 {
633     nxt_py_asgi_http_t  *http;
634 
635     http = req->data;
636 
637     nxt_unit_req_debug(req, "asgi_http_close_handler");
638 
639     if (nxt_fast_path(http != NULL)) {
640         http->closed = 1;
641 
642         nxt_py_asgi_http_emit_disconnect(http);
643     }
644 }
645 
646 
647 static PyObject *
nxt_py_asgi_http_done(PyObject * self,PyObject * future)648 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
649 {
650     int                 rc;
651     PyObject            *res;
652     nxt_py_asgi_http_t  *http;
653 
654     http = (nxt_py_asgi_http_t *) self;
655 
656     nxt_unit_req_debug(http->req, "asgi_http_done");
657 
658     /*
659      * Get Future.result() and it raises an exception, if coroutine exited
660      * with exception.
661      */
662     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
663     if (nxt_slow_path(res == NULL)) {
664         nxt_unit_req_error(http->req,
665                            "Python failed to call 'future.result()'");
666         nxt_python_print_exception();
667 
668         rc = NXT_UNIT_ERROR;
669 
670     } else {
671         Py_DECREF(res);
672 
673         rc = NXT_UNIT_OK;
674     }
675 
676     nxt_unit_request_done(http->req, rc);
677 
678     Py_RETURN_NONE;
679 }
680 
681 
682 #endif /* NXT_HAVE_ASGI */
683