1 #include <arpa/inet.h>
2 #include <fcntl.h>
3 #include <ev.h>
4
5 #if defined(__FreeBSD__) || defined(__DragonFly__)
6 # include <netinet/in.h> /* for struct sockaddr_in */
7 # include <sys/types.h>
8 # include <sys/socket.h>
9 #endif
10
11 #ifdef WANT_SIGINT_HANDLING
12 # include <sys/signal.h>
13 #endif
14
15 #include "filewrapper.h"
16 #include "portable_sendfile.h"
17 #include "common.h"
18 #include "wsgi.h"
19 #include "server.h"
20 #include "py2py3.h"
21
22 #ifdef WANT_STATSD
23 #include "statsd-client.h"
24 #endif
25
/* Size in bytes of the buffer used for a single read() from a client socket.
 * Parenthesized so that the macro expands correctly inside larger
 * expressions (e.g. `x / READ_BUFFER_SIZE`). */
#define READ_BUFFER_SIZE (64*1024)
/* Drop the reference held in `obj` (if any) and reset `obj` to NULL.
 * Delegates to CPython's Py_CLEAR, which accepts NULL and resets the
 * pointer before releasing the reference. */
#define Py_XCLEAR(obj) Py_CLEAR(obj)
/* Acquire the GIL. `n` only serves to give the saved-state variable a
 * unique name; pair with GIL_UNLOCK(n) using the same `n` in the same scope. */
#define GIL_LOCK(n) PyGILState_STATE _gilstate_##n = PyGILState_Ensure()
/* Release the GIL state saved by the matching GIL_LOCK(n). */
#define GIL_UNLOCK(n) PyGILState_Release(_gilstate_##n)
30
/* Canned error responses, indexed by HTTP error code
 * (request->state.error_code for parse errors, HTTP_SERVER_ERROR when the
 * WSGI application fails). */
static const char* http_error_messages[5] = {
  NULL, /* Error codes start at 1 because 0 means "no error" */
  "HTTP/1.1 400 Bad Request\r\n\r\n",
  /* "Length Required" is status 411; 406 means "Not Acceptable". */
  "HTTP/1.1 411 Length Required\r\n\r\n",
  "HTTP/1.1 417 Expectation Failed\r\n\r\n",
  "HTTP/1.1 500 Internal Server Error\r\n\r\n"
};
38
/* Interim response written to the client when the request state has
 * `expect_continue` set (see ev_io_on_read). */
static const char* CONTINUE =
  "HTTP/1.1 100 Continue\r\n"
  "\r\n";
40
/* Outcome of one read or write step, as reported to the ev_io callbacks.
 * The same enumeration is used for both directions; values start at 1. */
typedef enum _rw_state {
  not_yet_done    = 1, /* more data to read/write; keep the watcher running */
  done            = 2, /* the current read/write phase is complete */
  aborted         = 3, /* unrecoverable error; the connection gets closed */
  expect_continue = 4  /* switch direction for "Expect: 100-continue" handling */
} read_state;
typedef read_state write_state;
49
50 typedef struct {
51 ServerInfo* server_info;
52 ev_io accept_watcher;
53 } ThreadInfo;
54
55 typedef void ev_io_callback(struct ev_loop*, ev_io*, const int);
56
57 #ifdef WANT_SIGINT_HANDLING
58 typedef void ev_signal_callback(struct ev_loop*, ev_signal*, const int);
59 static ev_signal_callback ev_signal_on_sigint;
60 #endif
61
62 #ifdef WANT_SIGNAL_HANDLING
63 typedef void ev_timer_callback(struct ev_loop*, ev_timer*, const int);
64 static ev_timer_callback ev_timer_ontick;
65 ev_timer timeout_watcher;
66 #endif
67
68 static ev_io_callback ev_io_on_request;
69 static ev_io_callback ev_io_on_read;
70 static ev_io_callback ev_io_on_write;
71 static write_state on_write_sendfile(struct ev_loop*, Request*);
72 static write_state on_write_chunk(struct ev_loop*, Request*);
73 static bool do_send_chunk(Request*);
74 static bool do_sendfile(Request*);
75 static bool handle_nonzero_errno(Request*);
76 static void close_connection(struct ev_loop*, Request*);
77
78
/*
 * Create an event loop, register the accept watcher for
 * `server_info->sockfd` (plus the optional SIGINT watcher and signal-check
 * timer) and run the loop until ev_run() returns. The GIL is released
 * while the loop runs; callbacks re-acquire it as needed.
 *
 * If the event loop cannot be created, an error is printed to stderr and
 * the function returns without serving anything.
 */
void server_run(ServerInfo* server_info)
{
  struct ev_loop* mainloop = ev_loop_new(0);
  if(mainloop == NULL) {
    fprintf(stderr, "Could not initialize the libev event loop\n");
    return;
  }

  /* Lives on this stack frame; valid for as long as the loop runs below. */
  ThreadInfo thread_info;
  thread_info.server_info = server_info;

  ev_set_userdata(mainloop, &thread_info);

  ev_io_init(&thread_info.accept_watcher, ev_io_on_request, server_info->sockfd, EV_READ);
  ev_io_start(mainloop, &thread_info.accept_watcher);

#ifdef WANT_SIGINT_HANDLING
  ev_signal sigint_watcher;
  ev_signal_init(&sigint_watcher, ev_signal_on_sigint, SIGINT);
  ev_signal_start(mainloop, &sigint_watcher);
#endif

#ifdef WANT_SIGNAL_HANDLING
  ev_timer_init(&timeout_watcher, ev_timer_ontick, 0., SIGNAL_CHECK_INTERVAL);
  /* libev does not allow changing the priority of an active or pending
   * watcher, so the priority has to be set before the timer is started. */
  ev_set_priority(&timeout_watcher, EV_MINPRI);
  ev_timer_start(mainloop, &timeout_watcher);
#endif

  /* This is the program main loop */
  Py_BEGIN_ALLOW_THREADS
  ev_run(mainloop, 0);
  ev_loop_destroy(mainloop);
  Py_END_ALLOW_THREADS
}
109
110 #ifdef WANT_SIGINT_HANDLING
/*
 * ev_cleanup callback registered by ev_signal_on_sigint: raises the
 * interrupt in Python via PyErr_SetInterrupt() and releases the
 * heap-allocated watcher that triggered this call.
 */
static void
pyerr_set_interrupt(struct ev_loop* mainloop, struct ev_cleanup* watcher, const int events)
{
  /* Not needed here; present only to match the libev callback signature. */
  (void)mainloop;
  (void)events;

  PyErr_SetInterrupt();
  free(watcher);
}
117
118 static void
ev_signal_on_sigint(struct ev_loop * mainloop,ev_signal * watcher,const int events)119 ev_signal_on_sigint(struct ev_loop* mainloop, ev_signal* watcher, const int events)
120 {
121 /* Clean up and shut down this thread.
122 * (Shuts down the Python interpreter if this is the main thread) */
123 ev_cleanup* cleanup_watcher = malloc(sizeof(ev_cleanup));
124 ev_cleanup_init(cleanup_watcher, pyerr_set_interrupt);
125 ev_cleanup_start(mainloop, cleanup_watcher);
126
127 ev_io_stop(mainloop, &((ThreadInfo*)ev_userdata(mainloop))->accept_watcher);
128 ev_signal_stop(mainloop, watcher);
129 #ifdef WANT_SIGNAL_HANDLING
130 ev_timer_stop(mainloop, &timeout_watcher);
131 #endif
132 }
133 #endif
134
135 #ifdef WANT_SIGNAL_HANDLING
136 static void
ev_timer_ontick(struct ev_loop * mainloop,ev_timer * watcher,const int events)137 ev_timer_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
138 {
139 GIL_LOCK(0);
140 PyErr_CheckSignals();
141 GIL_UNLOCK(0);
142 }
143 #endif
144
145 static void
ev_io_on_request(struct ev_loop * mainloop,ev_io * watcher,const int events)146 ev_io_on_request(struct ev_loop* mainloop, ev_io* watcher, const int events)
147 {
148 int client_fd;
149 struct sockaddr_in sockaddr;
150 socklen_t addrlen;
151
152 addrlen = sizeof(struct sockaddr_in);
153 client_fd = accept(watcher->fd, (struct sockaddr*)&sockaddr, &addrlen);
154 if(client_fd < 0) {
155 DBG("Could not accept() client: errno %d", errno);
156 STATSD_INCREMENT("conn.accept.error");
157 return;
158 }
159
160 int flags = fcntl(client_fd, F_GETFL, 0);
161 if(fcntl(client_fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK) == -1) {
162 STATSD_INCREMENT("conn.accept.error");
163 DBG("Could not set_nonblocking() client %d: errno %d", client_fd, errno);
164 return;
165 }
166
167 GIL_LOCK(0);
168
169 Request* request = Request_new(
170 ((ThreadInfo*)ev_userdata(mainloop))->server_info,
171 client_fd,
172 inet_ntoa(sockaddr.sin_addr)
173 );
174
175 GIL_UNLOCK(0);
176
177 STATSD_INCREMENT("conn.accept.success");
178
179 DBG_REQ(request, "Accepted client %s:%d on fd %d",
180 inet_ntoa(sockaddr.sin_addr), ntohs(sockaddr.sin_port), client_fd);
181
182 ev_io_init(&request->ev_watcher, &ev_io_on_read,
183 client_fd, EV_READ);
184 ev_io_start(mainloop, &request->ev_watcher);
185 }
186
187
188 static void
start_reading(struct ev_loop * mainloop,Request * request)189 start_reading(struct ev_loop *mainloop, Request *request)
190 {
191 ev_io_init(&request->ev_watcher, &ev_io_on_read,
192 request->client_fd, EV_READ);
193 ev_io_start(mainloop, &request->ev_watcher);
194 }
195
196 static void
start_writing(struct ev_loop * mainloop,Request * request)197 start_writing(struct ev_loop *mainloop, Request *request)
198 {
199 ev_io_init(&request->ev_watcher, &ev_io_on_write,
200 request->client_fd, EV_WRITE);
201 ev_io_start(mainloop, &request->ev_watcher);
202 }
203
204 static void
ev_io_on_read(struct ev_loop * mainloop,ev_io * watcher,const int events)205 ev_io_on_read(struct ev_loop* mainloop, ev_io* watcher, const int events)
206 {
207 static char read_buf[READ_BUFFER_SIZE];
208
209 Request* request = REQUEST_FROM_WATCHER(watcher);
210 read_state read_state;
211
212 ssize_t read_bytes = read(
213 request->client_fd,
214 read_buf,
215 READ_BUFFER_SIZE
216 );
217
218 GIL_LOCK(0);
219
220 if (read_bytes == 0) {
221 /* Client disconnected */
222 read_state = aborted;
223 DBG_REQ(request, "Client disconnected");
224 STATSD_INCREMENT("req.error.client_disconnected");
225 } else if (read_bytes < 0) {
226 /* Would block or error */
227 if(errno == EAGAIN || errno == EWOULDBLOCK) {
228 read_state = not_yet_done;
229 } else {
230 read_state = aborted;
231 DBG_REQ(request, "Hit errno %d while read()ing", errno);
232 STATSD_INCREMENT("req.error.read");
233 }
234 } else {
235 /* OK, either expect more data or done reading */
236 Request_parse(request, read_buf, (size_t)read_bytes);
237 if(request->state.error_code) {
238 /* HTTP parse error */
239 read_state = done;
240 DBG_REQ(request, "Parse error");
241 STATSD_INCREMENT("req.error.parse");
242 request->current_chunk = _PEP3333_Bytes_FromString(
243 http_error_messages[request->state.error_code]);
244 assert(request->iterator == NULL);
245 } else if(request->state.parse_finished) {
246 /* HTTP parse successful, meaning we have the entire
247 * request (the header _and_ the body). */
248 read_state = done;
249
250 STATSD_INCREMENT("req.success.read");
251
252 if (!wsgi_call_application(request)) {
253 /* Response is "HTTP 500 Internal Server Error" */
254 DBG_REQ(request, "WSGI app error");
255 assert(PyErr_Occurred());
256 PyErr_Print();
257 assert(!request->state.chunked_response);
258 Py_XCLEAR(request->iterator);
259 request->current_chunk = _PEP3333_Bytes_FromString(
260 http_error_messages[HTTP_SERVER_ERROR]);
261 STATSD_INCREMENT("req.error.internal");
262 }
263 } else if (request->state.expect_continue) {
264 /*
265 ** Handle "Expect: 100-continue" header.
266 ** See https://tools.ietf.org/html/rfc2616#page-48 and `on_header_value`
267 ** in request.c for more details.
268 */
269 read_state = expect_continue;
270 } else {
271 /* Wait for more data */
272 read_state = not_yet_done;
273 }
274 }
275
276 switch (read_state) {
277 case not_yet_done:
278 STATSD_INCREMENT("req.active");
279 break;
280 case expect_continue:
281 DBG_REQ(request, "pause read, write 100-continue");
282 ev_io_stop(mainloop, &request->ev_watcher);
283 request->current_chunk = _PEP3333_Bytes_FromString(CONTINUE);
284 start_writing(mainloop, request);
285 break;
286 case done:
287 DBG_REQ(request, "Stop read watcher, start write watcher");
288 ev_io_stop(mainloop, &request->ev_watcher);
289 start_writing(mainloop, request);
290 STATSD_INCREMENT("req.done");
291 break;
292 case aborted:
293 close_connection(mainloop, request);
294 STATSD_INCREMENT("req.aborted");
295 break;
296 }
297
298 GIL_UNLOCK(0);
299 }
300
301 static void
ev_io_on_write(struct ev_loop * mainloop,ev_io * watcher,const int events)302 ev_io_on_write(struct ev_loop* mainloop, ev_io* watcher, const int events)
303 {
304 /* Since the response writing code is fairly complex, I'll try to give a short
305 * overview of the different control flow paths etc.:
306 *
307 * On the very top level, there are two types of responses to distinguish:
308 * A) sendfile responses
309 * B) iterator/other responses
310 *
311 * These cases are handled by the 'on_write_sendfile' and 'on_write_chunk'
312 * routines, respectively. They use the 'do_sendfile' and 'do_send_chunk'
313 * routines to do the actual write()-ing. The 'do_*' routines return true if
314 * there's some data left to send in the current chunk (or, in the case of
315 * sendfile, the end of the file has not been reached yet).
316 *
317 * When the 'do_*' routines return false, the 'on_write_*' routines have to
318 * figure out if there's a next chunk to send (e.g. in the case of a response iterator).
319 */
320 Request* request = REQUEST_FROM_WATCHER(watcher);
321
322 GIL_LOCK(0);
323
324 write_state write_state;
325 if(request->iterable && FileWrapper_CheckExact(request->iterable) && FileWrapper_GetFd(request->iterable) != -1) {
326 write_state = on_write_sendfile(mainloop, request);
327 } else {
328 write_state = on_write_chunk(mainloop, request);
329 }
330
331 write_state = request->state.expect_continue
332 ? expect_continue
333 : write_state;
334
335 switch(write_state) {
336 case not_yet_done:
337 STATSD_INCREMENT("resp.active");
338 break;
339 case done:
340 STATSD_INCREMENT("resp.done");
341 if(request->state.keep_alive) {
342 DBG_REQ(request, "done, keep-alive");
343 STATSD_INCREMENT("resp.done.keepalive");
344 ev_io_stop(mainloop, &request->ev_watcher);
345
346 Request_clean(request);
347 Request_reset(request);
348
349 start_reading(mainloop, request);
350 } else {
351 DBG_REQ(request, "done, close");
352 STATSD_INCREMENT("resp.conn.close");
353 close_connection(mainloop, request);
354 }
355 break;
356 case expect_continue:
357 DBG_REQ(request, "expect continue");
358 ev_io_stop(mainloop, &request->ev_watcher);
359
360 request->state.expect_continue = false;
361 start_reading(mainloop, request);
362 break;
363 case aborted:
364 /* Response was aborted due to an error. We can't do anything graceful here
365 * because at least one chunk is already sent... just close the connection. */
366 close_connection(mainloop, request);
367 STATSD_INCREMENT("resp.aborted");
368 break;
369 }
370
371 GIL_UNLOCK(0);
372 }
373
374 static write_state
on_write_sendfile(struct ev_loop * mainloop,Request * request)375 on_write_sendfile(struct ev_loop* mainloop, Request* request)
376 {
377 /* A sendfile response is split into two phases:
378 * Phase A) sending HTTP headers
379 * Phase B) sending the actual file contents
380 */
381 if(request->current_chunk) {
382 /* Phase A) -- current_chunk contains the HTTP headers */
383 do_send_chunk(request);
384 // Either we have headers left to send, or current_chunk has been set to
385 // NULL and we'll fall into Phase B) on the next invocation.
386 return not_yet_done;
387 } else {
388 /* Phase B) */
389 if (do_sendfile(request)) {
390 // Haven't reached the end of file yet
391 return not_yet_done;
392 } else {
393 // Done with the file
394 return done;
395 }
396 }
397 }
398
399
400 static write_state
on_write_chunk(struct ev_loop * mainloop,Request * request)401 on_write_chunk(struct ev_loop* mainloop, Request* request)
402 {
403 if (do_send_chunk(request))
404 // data left to send in the current chunk
405 return not_yet_done;
406
407 if(request->iterator) {
408 /* Reached the end of a chunk in the response iterator. Get next chunk. */
409 PyObject* next_chunk = wsgi_iterable_get_next_chunk(request);
410 if(next_chunk) {
411 /* We found another chunk to send. */
412 if(request->state.chunked_response) {
413 request->current_chunk = wrap_http_chunk_cruft_around(next_chunk);
414 Py_DECREF(next_chunk);
415 } else {
416 request->current_chunk = next_chunk;
417 }
418 assert(request->current_chunk_p == 0);
419 return not_yet_done;
420
421 } else {
422 if(PyErr_Occurred()) {
423 /* Trying to get the next chunk raised an exception. */
424 PyErr_Print();
425 DBG_REQ(request, "Exception in iterator, can not recover");
426 return aborted;
427 } else {
428 /* This was the last chunk; cleanup. */
429 Py_CLEAR(request->iterator);
430 goto send_terminator_chunk;
431 }
432 }
433 } else {
434 /* We have no iterator to get more chunks from, so we're done.
435 * Reasons we might end up in this place:
436 * A) A parse or server error occurred
437 * C) We just finished a chunked response with the call to 'do_send_chunk'
438 * above and now maybe have to send the terminating empty chunk.
439 * B) We used chunked responses earlier in the response and
440 * are now sending the terminating empty chunk.
441 */
442 goto send_terminator_chunk;
443 }
444
445 assert(0); // unreachable
446
447 send_terminator_chunk:
448 if(request->state.chunked_response) {
449 /* We have to send a terminating empty chunk + \r\n */
450 request->current_chunk = _PEP3333_Bytes_FromString("0\r\n\r\n");
451 assert(request->current_chunk_p == 0);
452 // Next time we get here, don't send the terminating empty chunk again.
453 // XXX This is kind of a hack and should be refactored for easier understanding.
454 request->state.chunked_response = false;
455 return not_yet_done;
456 } else {
457 return done;
458 }
459 }
460
461 /* Return true if there's data left to send, false if we reached the end of the chunk. */
462 static bool
do_send_chunk(Request * request)463 do_send_chunk(Request* request)
464 {
465 Py_ssize_t bytes_sent;
466
467 assert(request->current_chunk != NULL);
468 assert(!(request->current_chunk_p == _PEP3333_Bytes_GET_SIZE(request->current_chunk)
469 && _PEP3333_Bytes_GET_SIZE(request->current_chunk) != 0));
470
471 bytes_sent = write(
472 request->client_fd,
473 _PEP3333_Bytes_AS_DATA(request->current_chunk) + request->current_chunk_p,
474 _PEP3333_Bytes_GET_SIZE(request->current_chunk) - request->current_chunk_p
475 );
476
477 if(bytes_sent == -1)
478 return handle_nonzero_errno(request);
479
480 request->current_chunk_p += bytes_sent;
481 if(request->current_chunk_p == _PEP3333_Bytes_GET_SIZE(request->current_chunk)) {
482 Py_CLEAR(request->current_chunk);
483 request->current_chunk_p = 0;
484 return false;
485 }
486 return true;
487 }
488
489 /* Return true if there's data left to send, false if we reached the end of the file. */
490 static bool
do_sendfile(Request * request)491 do_sendfile(Request* request)
492 {
493 Py_ssize_t bytes_sent = portable_sendfile(
494 request->client_fd,
495 FileWrapper_GetFd(request->iterable),
496 request->current_chunk_p
497 );
498 switch(bytes_sent) {
499 case -1:
500 if (handle_nonzero_errno(request)) {
501 return true;
502 } else {
503 FileWrapper_Done(request->iterable);
504 return false;
505 }
506 case 0:
507 FileWrapper_Done(request->iterable);
508 return false;
509 default:
510 request->current_chunk_p += bytes_sent;
511 return true;
512 }
513 }
514
515 static bool
handle_nonzero_errno(Request * request)516 handle_nonzero_errno(Request* request)
517 {
518 if(errno == EAGAIN || errno == EWOULDBLOCK) {
519 /* Try again later */
520 return true;
521 } else {
522 /* Serious transmission failure. Hang up. */
523 fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
524 Py_XDECREF(request->current_chunk);
525 Py_XCLEAR(request->iterator);
526 request->state.keep_alive = false;
527 return false;
528 }
529 }
530
531 static void
close_connection(struct ev_loop * mainloop,Request * request)532 close_connection(struct ev_loop *mainloop, Request* request)
533 {
534 DBG_REQ(request, "Closing socket");
535 ev_io_stop(mainloop, &request->ev_watcher);
536 close(request->client_fd);
537 Request_free(request);
538 }
539