1 #include <arpa/inet.h>
2 #include <fcntl.h>
3 #include <ev.h>
4 
5 #if defined(__FreeBSD__) || defined(__DragonFly__)
6 # include <netinet/in.h> /* for struct sockaddr_in */
7 # include <sys/types.h>
8 # include <sys/socket.h>
9 #endif
10 
11 #ifdef WANT_SIGINT_HANDLING
12 # include <sys/signal.h>
13 #endif
14 
15 #include "filewrapper.h"
16 #include "portable_sendfile.h"
17 #include "common.h"
18 #include "wsgi.h"
19 #include "server.h"
20 #include "py2py3.h"
21 
22 #ifdef WANT_STATSD
23 #include "statsd-client.h"
24 #endif
25 
/* Size in bytes of the buffer handed to each read() on a client socket.
 * Parenthesised so the expansion stays a single operand inside larger
 * expressions (e.g. `x / READ_BUFFER_SIZE`). */
#define READ_BUFFER_SIZE (64*1024)
/* NULL-tolerant "drop the reference and reset the slot to NULL".
 * Delegates to CPython's Py_CLEAR, which nulls the slot *before* releasing
 * the reference, so code triggered by the decref never sees a stale pointer. */
#define Py_XCLEAR(obj) Py_CLEAR(obj)
/* Acquire / release the GIL from a libev callback.  `n` only serves to give
 * the saved state variable a unique name within the enclosing scope; every
 * GIL_LOCK(n) must be paired with a GIL_UNLOCK(n) in the same scope. */
#define GIL_LOCK(n) PyGILState_STATE _gilstate_##n = PyGILState_Ensure()
#define GIL_UNLOCK(n) PyGILState_Release(_gilstate_##n)
30 
/* Canned, body-less HTTP error responses, indexed by the request's
 * `state.error_code` (the last slot is the one used with HTTP_SERVER_ERROR
 * when the WSGI application fails). */
static const char* const http_error_messages[5] = {
  NULL, /* Error codes start at 1 because 0 means "no error" */
  "HTTP/1.1 400 Bad Request\r\n\r\n",
  /* "Length Required" is status 411; 406 means "Not Acceptable". */
  "HTTP/1.1 411 Length Required\r\n\r\n",
  "HTTP/1.1 417 Expectation Failed\r\n\r\n",
  "HTTP/1.1 500 Internal Server Error\r\n\r\n"
};

/* Interim response written when a request carries "Expect: 100-continue". */
static const char* const CONTINUE = "HTTP/1.1 100 Continue\r\n\r\n";
40 
/* Outcome of one read or write step on a connection.  The read and write
 * callbacks share this single enumeration under two type names. */
enum _rw_state {
  not_yet_done    = 1,  /* keep the current watcher running */
  done            = 2,  /* this direction is finished */
  aborted         = 3,  /* give up and close the connection */
  expect_continue = 4   /* "100 Continue" handshake in progress */
};
typedef enum _rw_state read_state;
typedef enum _rw_state write_state;
49 
50 typedef struct {
51   ServerInfo* server_info;
52   ev_io accept_watcher;
53 } ThreadInfo;
54 
55 typedef void ev_io_callback(struct ev_loop*, ev_io*, const int);
56 
57 #ifdef WANT_SIGINT_HANDLING
58 typedef void ev_signal_callback(struct ev_loop*, ev_signal*, const int);
59 static ev_signal_callback ev_signal_on_sigint;
60 #endif
61 
62 #ifdef WANT_SIGNAL_HANDLING
63 typedef void ev_timer_callback(struct ev_loop*, ev_timer*, const int);
64 static ev_timer_callback ev_timer_ontick;
65 ev_timer timeout_watcher;
66 #endif
67 
68 static ev_io_callback ev_io_on_request;
69 static ev_io_callback ev_io_on_read;
70 static ev_io_callback ev_io_on_write;
71 static write_state on_write_sendfile(struct ev_loop*, Request*);
72 static write_state on_write_chunk(struct ev_loop*, Request*);
73 static bool do_send_chunk(Request*);
74 static bool do_sendfile(Request*);
75 static bool handle_nonzero_errno(Request*);
76 static void close_connection(struct ev_loop*, Request*);
77 
78 
/* Run the server's event loop on the calling thread.
 *
 * Creates a fresh libev loop, registers the accept watcher on
 * `server_info->sockfd` (plus the optional SIGINT watcher and signal-check
 * timer), then runs the loop with the GIL released until no active watchers
 * remain.  The loop is destroyed before returning.
 *
 * If the event loop cannot be created, a message is written to stderr and
 * the function returns without serving anything.
 */
void server_run(ServerInfo* server_info)
{
  struct ev_loop* mainloop = ev_loop_new(0);
  if(mainloop == NULL) {
    /* ev_loop_new() reports failure by returning a false (NULL) loop. */
    fprintf(stderr, "Could not create the libev event loop\n");
    return;
  }

  /* Lives on this stack frame; valid for as long as the loop runs. */
  ThreadInfo thread_info;
  thread_info.server_info = server_info;

  ev_set_userdata(mainloop, &thread_info);

  ev_io_init(&thread_info.accept_watcher, ev_io_on_request, server_info->sockfd, EV_READ);
  ev_io_start(mainloop, &thread_info.accept_watcher);

#ifdef WANT_SIGINT_HANDLING
  ev_signal sigint_watcher;
  ev_signal_init(&sigint_watcher, ev_signal_on_sigint, SIGINT);
  ev_signal_start(mainloop, &sigint_watcher);
#endif

#ifdef WANT_SIGNAL_HANDLING
  ev_timer_init(&timeout_watcher, ev_timer_ontick, 0., SIGNAL_CHECK_INTERVAL);
  /* The priority of a watcher must not be changed while it is active, so
   * set it after initialisation but before starting the timer. */
  ev_set_priority(&timeout_watcher, EV_MINPRI);
  ev_timer_start(mainloop, &timeout_watcher);
#endif

  /* This is the program main loop */
  Py_BEGIN_ALLOW_THREADS
  ev_run(mainloop, 0);
  ev_loop_destroy(mainloop);
  Py_END_ALLOW_THREADS
}
109 
110 #ifdef WANT_SIGINT_HANDLING
/* Cleanup-watcher callback registered by ev_signal_on_sigint: raises the
 * interrupt on the Python side and releases the heap-allocated watcher
 * that triggered this call. */
static void
pyerr_set_interrupt(struct ev_loop* mainloop, struct ev_cleanup* watcher, const int events)
{
  (void)mainloop;
  (void)events;

  PyErr_SetInterrupt();
  free(watcher);
}
117 
118 static void
ev_signal_on_sigint(struct ev_loop * mainloop,ev_signal * watcher,const int events)119 ev_signal_on_sigint(struct ev_loop* mainloop, ev_signal* watcher, const int events)
120 {
121   /* Clean up and shut down this thread.
122    * (Shuts down the Python interpreter if this is the main thread) */
123   ev_cleanup* cleanup_watcher = malloc(sizeof(ev_cleanup));
124   ev_cleanup_init(cleanup_watcher, pyerr_set_interrupt);
125   ev_cleanup_start(mainloop, cleanup_watcher);
126 
127   ev_io_stop(mainloop, &((ThreadInfo*)ev_userdata(mainloop))->accept_watcher);
128   ev_signal_stop(mainloop, watcher);
129 #ifdef WANT_SIGNAL_HANDLING
130   ev_timer_stop(mainloop, &timeout_watcher);
131 #endif
132 }
133 #endif
134 
135 #ifdef WANT_SIGNAL_HANDLING
136 static void
ev_timer_ontick(struct ev_loop * mainloop,ev_timer * watcher,const int events)137 ev_timer_ontick(struct ev_loop* mainloop, ev_timer* watcher, const int events)
138 {
139   GIL_LOCK(0);
140   PyErr_CheckSignals();
141   GIL_UNLOCK(0);
142 }
143 #endif
144 
145 static void
ev_io_on_request(struct ev_loop * mainloop,ev_io * watcher,const int events)146 ev_io_on_request(struct ev_loop* mainloop, ev_io* watcher, const int events)
147 {
148   int client_fd;
149   struct sockaddr_in sockaddr;
150   socklen_t addrlen;
151 
152   addrlen = sizeof(struct sockaddr_in);
153   client_fd = accept(watcher->fd, (struct sockaddr*)&sockaddr, &addrlen);
154   if(client_fd < 0) {
155     DBG("Could not accept() client: errno %d", errno);
156     STATSD_INCREMENT("conn.accept.error");
157     return;
158   }
159 
160   int flags = fcntl(client_fd, F_GETFL, 0);
161   if(fcntl(client_fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK) == -1) {
162     STATSD_INCREMENT("conn.accept.error");
163     DBG("Could not set_nonblocking() client %d: errno %d", client_fd, errno);
164     return;
165   }
166 
167   GIL_LOCK(0);
168 
169   Request* request = Request_new(
170     ((ThreadInfo*)ev_userdata(mainloop))->server_info,
171     client_fd,
172     inet_ntoa(sockaddr.sin_addr)
173   );
174 
175   GIL_UNLOCK(0);
176 
177   STATSD_INCREMENT("conn.accept.success");
178 
179   DBG_REQ(request, "Accepted client %s:%d on fd %d",
180           inet_ntoa(sockaddr.sin_addr), ntohs(sockaddr.sin_port), client_fd);
181 
182   ev_io_init(&request->ev_watcher, &ev_io_on_read,
183              client_fd, EV_READ);
184   ev_io_start(mainloop, &request->ev_watcher);
185 }
186 
187 
188 static void
start_reading(struct ev_loop * mainloop,Request * request)189 start_reading(struct ev_loop *mainloop, Request *request)
190 {
191   ev_io_init(&request->ev_watcher, &ev_io_on_read,
192              request->client_fd, EV_READ);
193   ev_io_start(mainloop, &request->ev_watcher);
194 }
195 
196 static void
start_writing(struct ev_loop * mainloop,Request * request)197 start_writing(struct ev_loop *mainloop, Request *request)
198 {
199   ev_io_init(&request->ev_watcher, &ev_io_on_write,
200              request->client_fd, EV_WRITE);
201   ev_io_start(mainloop, &request->ev_watcher);
202 }
203 
204 static void
ev_io_on_read(struct ev_loop * mainloop,ev_io * watcher,const int events)205 ev_io_on_read(struct ev_loop* mainloop, ev_io* watcher, const int events)
206 {
207   static char read_buf[READ_BUFFER_SIZE];
208 
209   Request* request = REQUEST_FROM_WATCHER(watcher);
210   read_state read_state;
211 
212   ssize_t read_bytes = read(
213     request->client_fd,
214     read_buf,
215     READ_BUFFER_SIZE
216   );
217 
218   GIL_LOCK(0);
219 
220   if (read_bytes == 0) {
221     /* Client disconnected */
222     read_state = aborted;
223     DBG_REQ(request, "Client disconnected");
224     STATSD_INCREMENT("req.error.client_disconnected");
225   } else if (read_bytes < 0) {
226     /* Would block or error */
227     if(errno == EAGAIN || errno == EWOULDBLOCK) {
228       read_state = not_yet_done;
229     } else {
230       read_state = aborted;
231       DBG_REQ(request, "Hit errno %d while read()ing", errno);
232       STATSD_INCREMENT("req.error.read");
233     }
234   } else {
235     /* OK, either expect more data or done reading */
236     Request_parse(request, read_buf, (size_t)read_bytes);
237     if(request->state.error_code) {
238       /* HTTP parse error */
239       read_state = done;
240       DBG_REQ(request, "Parse error");
241       STATSD_INCREMENT("req.error.parse");
242       request->current_chunk = _PEP3333_Bytes_FromString(
243         http_error_messages[request->state.error_code]);
244       assert(request->iterator == NULL);
245     } else if(request->state.parse_finished) {
246       /* HTTP parse successful, meaning we have the entire
247        * request (the header _and_ the body). */
248       read_state = done;
249 
250       STATSD_INCREMENT("req.success.read");
251 
252       if (!wsgi_call_application(request)) {
253         /* Response is "HTTP 500 Internal Server Error" */
254         DBG_REQ(request, "WSGI app error");
255         assert(PyErr_Occurred());
256         PyErr_Print();
257         assert(!request->state.chunked_response);
258         Py_XCLEAR(request->iterator);
259         request->current_chunk = _PEP3333_Bytes_FromString(
260           http_error_messages[HTTP_SERVER_ERROR]);
261         STATSD_INCREMENT("req.error.internal");
262       }
263     } else if (request->state.expect_continue) {
264       /*
265       ** Handle "Expect: 100-continue" header.
266       ** See https://tools.ietf.org/html/rfc2616#page-48 and `on_header_value`
267       ** in request.c for more details.
268       */
269       read_state = expect_continue;
270     } else {
271       /* Wait for more data */
272       read_state = not_yet_done;
273     }
274   }
275 
276   switch (read_state) {
277   case not_yet_done:
278     STATSD_INCREMENT("req.active");
279     break;
280   case expect_continue:
281     DBG_REQ(request, "pause read, write 100-continue");
282     ev_io_stop(mainloop, &request->ev_watcher);
283     request->current_chunk = _PEP3333_Bytes_FromString(CONTINUE);
284     start_writing(mainloop, request);
285     break;
286   case done:
287     DBG_REQ(request, "Stop read watcher, start write watcher");
288     ev_io_stop(mainloop, &request->ev_watcher);
289     start_writing(mainloop, request);
290     STATSD_INCREMENT("req.done");
291     break;
292   case aborted:
293     close_connection(mainloop, request);
294     STATSD_INCREMENT("req.aborted");
295     break;
296   }
297 
298   GIL_UNLOCK(0);
299 }
300 
301 static void
ev_io_on_write(struct ev_loop * mainloop,ev_io * watcher,const int events)302 ev_io_on_write(struct ev_loop* mainloop, ev_io* watcher, const int events)
303 {
304   /* Since the response writing code is fairly complex, I'll try to give a short
305    * overview of the different control flow paths etc.:
306    *
307    * On the very top level, there are two types of responses to distinguish:
308    * A) sendfile responses
309    * B) iterator/other responses
310    *
311    * These cases are handled by the 'on_write_sendfile' and 'on_write_chunk'
312    * routines, respectively.  They use the 'do_sendfile' and 'do_send_chunk'
313    * routines to do the actual write()-ing. The 'do_*' routines return true if
314    * there's some data left to send in the current chunk (or, in the case of
315    * sendfile, the end of the file has not been reached yet).
316    *
317    * When the 'do_*' routines return false, the 'on_write_*' routines have to
318    * figure out if there's a next chunk to send (e.g. in the case of a response iterator).
319    */
320   Request* request = REQUEST_FROM_WATCHER(watcher);
321 
322   GIL_LOCK(0);
323 
324   write_state write_state;
325   if(request->iterable && FileWrapper_CheckExact(request->iterable) && FileWrapper_GetFd(request->iterable) != -1) {
326     write_state = on_write_sendfile(mainloop, request);
327   } else {
328     write_state = on_write_chunk(mainloop, request);
329   }
330 
331   write_state = request->state.expect_continue
332     ? expect_continue
333     : write_state;
334 
335   switch(write_state) {
336   case not_yet_done:
337     STATSD_INCREMENT("resp.active");
338     break;
339   case done:
340     STATSD_INCREMENT("resp.done");
341     if(request->state.keep_alive) {
342       DBG_REQ(request, "done, keep-alive");
343       STATSD_INCREMENT("resp.done.keepalive");
344       ev_io_stop(mainloop, &request->ev_watcher);
345 
346       Request_clean(request);
347       Request_reset(request);
348 
349       start_reading(mainloop, request);
350     } else {
351       DBG_REQ(request, "done, close");
352       STATSD_INCREMENT("resp.conn.close");
353       close_connection(mainloop, request);
354     }
355     break;
356   case expect_continue:
357     DBG_REQ(request, "expect continue");
358     ev_io_stop(mainloop, &request->ev_watcher);
359 
360     request->state.expect_continue = false;
361     start_reading(mainloop, request);
362     break;
363   case aborted:
364     /* Response was aborted due to an error. We can't do anything graceful here
365      * because at least one chunk is already sent... just close the connection. */
366     close_connection(mainloop, request);
367     STATSD_INCREMENT("resp.aborted");
368     break;
369   }
370 
371   GIL_UNLOCK(0);
372 }
373 
374 static write_state
on_write_sendfile(struct ev_loop * mainloop,Request * request)375 on_write_sendfile(struct ev_loop* mainloop, Request* request)
376 {
377   /* A sendfile response is split into two phases:
378    * Phase A) sending HTTP headers
379    * Phase B) sending the actual file contents
380    */
381   if(request->current_chunk) {
382     /* Phase A) -- current_chunk contains the HTTP headers */
383     do_send_chunk(request);
384     // Either we have headers left to send, or current_chunk has been set to
385     // NULL and we'll fall into Phase B) on the next invocation.
386     return not_yet_done;
387   } else {
388     /* Phase B) */
389     if (do_sendfile(request)) {
390       // Haven't reached the end of file yet
391       return not_yet_done;
392     } else {
393       // Done with the file
394       return done;
395     }
396   }
397 }
398 
399 
400 static write_state
on_write_chunk(struct ev_loop * mainloop,Request * request)401 on_write_chunk(struct ev_loop* mainloop, Request* request)
402 {
403   if (do_send_chunk(request))
404     // data left to send in the current chunk
405     return not_yet_done;
406 
407   if(request->iterator) {
408     /* Reached the end of a chunk in the response iterator. Get next chunk. */
409     PyObject* next_chunk = wsgi_iterable_get_next_chunk(request);
410     if(next_chunk) {
411       /* We found another chunk to send. */
412       if(request->state.chunked_response) {
413         request->current_chunk = wrap_http_chunk_cruft_around(next_chunk);
414         Py_DECREF(next_chunk);
415       } else {
416         request->current_chunk = next_chunk;
417       }
418       assert(request->current_chunk_p == 0);
419       return not_yet_done;
420 
421     } else {
422       if(PyErr_Occurred()) {
423         /* Trying to get the next chunk raised an exception. */
424         PyErr_Print();
425         DBG_REQ(request, "Exception in iterator, can not recover");
426         return aborted;
427       } else {
428         /* This was the last chunk; cleanup. */
429         Py_CLEAR(request->iterator);
430         goto send_terminator_chunk;
431       }
432     }
433   } else {
434     /* We have no iterator to get more chunks from, so we're done.
435      * Reasons we might end up in this place:
436      * A) A parse or server error occurred
437      * C) We just finished a chunked response with the call to 'do_send_chunk'
438      *    above and now maybe have to send the terminating empty chunk.
439      * B) We used chunked responses earlier in the response and
440      *    are now sending the terminating empty chunk.
441      */
442     goto send_terminator_chunk;
443   }
444 
445   assert(0); // unreachable
446 
447 send_terminator_chunk:
448   if(request->state.chunked_response) {
449     /* We have to send a terminating empty chunk + \r\n */
450     request->current_chunk = _PEP3333_Bytes_FromString("0\r\n\r\n");
451     assert(request->current_chunk_p == 0);
452     // Next time we get here, don't send the terminating empty chunk again.
453     // XXX This is kind of a hack and should be refactored for easier understanding.
454     request->state.chunked_response = false;
455     return not_yet_done;
456   } else {
457     return done;
458   }
459 }
460 
461 /* Return true if there's data left to send, false if we reached the end of the chunk. */
462 static bool
do_send_chunk(Request * request)463 do_send_chunk(Request* request)
464 {
465   Py_ssize_t bytes_sent;
466 
467   assert(request->current_chunk != NULL);
468   assert(!(request->current_chunk_p == _PEP3333_Bytes_GET_SIZE(request->current_chunk)
469            && _PEP3333_Bytes_GET_SIZE(request->current_chunk) != 0));
470 
471   bytes_sent = write(
472     request->client_fd,
473     _PEP3333_Bytes_AS_DATA(request->current_chunk) + request->current_chunk_p,
474     _PEP3333_Bytes_GET_SIZE(request->current_chunk) - request->current_chunk_p
475   );
476 
477   if(bytes_sent == -1)
478     return handle_nonzero_errno(request);
479 
480   request->current_chunk_p += bytes_sent;
481   if(request->current_chunk_p == _PEP3333_Bytes_GET_SIZE(request->current_chunk)) {
482     Py_CLEAR(request->current_chunk);
483     request->current_chunk_p = 0;
484     return false;
485   }
486   return true;
487 }
488 
489 /* Return true if there's data left to send, false if we reached the end of the file. */
490 static bool
do_sendfile(Request * request)491 do_sendfile(Request* request)
492 {
493   Py_ssize_t bytes_sent = portable_sendfile(
494       request->client_fd,
495       FileWrapper_GetFd(request->iterable),
496       request->current_chunk_p
497   );
498   switch(bytes_sent) {
499   case -1:
500     if (handle_nonzero_errno(request)) {
501       return true;
502     } else {
503       FileWrapper_Done(request->iterable);
504       return false;
505     }
506   case 0:
507     FileWrapper_Done(request->iterable);
508     return false;
509   default:
510     request->current_chunk_p += bytes_sent;
511     return true;
512   }
513 }
514 
515 static bool
handle_nonzero_errno(Request * request)516 handle_nonzero_errno(Request* request)
517 {
518   if(errno == EAGAIN || errno == EWOULDBLOCK) {
519     /* Try again later */
520     return true;
521   } else {
522     /* Serious transmission failure. Hang up. */
523     fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
524     Py_XDECREF(request->current_chunk);
525     Py_XCLEAR(request->iterator);
526     request->state.keep_alive = false;
527     return false;
528   }
529 }
530 
531 static void
close_connection(struct ev_loop * mainloop,Request * request)532 close_connection(struct ev_loop *mainloop, Request* request)
533 {
534   DBG_REQ(request, "Closing socket");
535   ev_io_stop(mainloop, &request->ev_watcher);
536   close(request->client_fd);
537   Request_free(request);
538 }
539