1 #include "EVAPI.h"
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <sys/socket.h>
6 #include <errno.h>
7 #include <ctype.h>
8 #include <netinet/ip.h>
9 #include <netinet/tcp.h>
10 #include <sys/uio.h>
11 
12 #include "ppport.h"
13 
14 
15 ///////////////////////////////////////////////////////////////
16 // "Compile Time Options" - See Feersum.pm POD for information
17 
// Capacity of struct feer_req.headers[]; also the header-count limit handed
// to the HTTP parser on every parse attempt.
#define MAX_HEADERS 64
// NOTE(review): not referenced in this part of the file; presumably the
// longest header name accepted when building the request env -- confirm.
#define MAX_HEADER_NAME_LEN 128
// 2^31 - 1.  NOTE(review): presumably the largest accepted request body;
// confirm where Content-Length is checked.
#define MAX_BODY_LEN 2147483647

// NOTE(review): the READ_* knobs are not used in this part of the file;
// presumably they size and grow the per-connection read buffer -- confirm
// against the read path.
#define READ_BUFSZ 4096
#define READ_INIT_FACTOR 2
#define READ_GROW_FACTOR 8

// NOTE(review): not referenced in this part of the file; see the POD.
#define AUTOCORK_WRITES 1
27 
28 #if 0
29 # define FLASH_SOCKET_POLICY_SUPPORT
30 #endif
31 
32 #ifndef FLASH_SOCKET_POLICY
33 # define FLASH_SOCKET_POLICY "<?xml version=\"1.0\"?>\n<!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">\n<cross-domain-policy>\n<site-control permitted-cross-domain-policies=\"master-only\"/>\n<allow-access-from domain=\"*\" to-ports=\"*\" secure=\"false\"/>\n</cross-domain-policy>\n"
34 #endif
35 
36 // may be lower for your platform (e.g. Solaris is 16).  See POD.
37 #define FEERSUM_IOMATRIX_SIZE 64
38 
39 // auto-detected in Makefile.PL by perl versions and ithread usage; override
40 // that here. See POD for details.
41 #if 0
42 # undef FEERSUM_STEAL
43 #endif
44 
45 ///////////////////////////////////////////////////////////////
46 
47 
// Branch-prediction hints.  With GCC-compatible compilers the condition is
// normalized to 0/1 and passed to __builtin_expect; elsewhere the macros
// expand to the bare (un-normalized) expression.
#ifdef __GNUC__
# define likely(x)   __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
#else
# define likely(x)   (x)
# define unlikely(x) (x)
#endif

// CR LF spelled in octal; only defined here if nothing defined CRLF first.
#ifndef CRLF
#define CRLF "\015\012"
#endif
// Two consecutive CRLFs (adjacent string literals are concatenated).
#define CRLFx2 CRLF CRLF

// make darwin, solaris and bsd happy:
#ifndef SOL_TCP
 #define SOL_TCP IPPROTO_TCP
#endif
65 
66 // Wish-list: %z formats for perl sprintf.  Would make compiling a lot less
67 // noisy for systems that warn size_t and STRLEN are incompatible with
68 // %d/%u/%x.
// Pick a printf length modifier (Sz_f) and a matching C integer type (Sz_t)
// wide enough for size_t, based on the sizes perl's configuration reports.
#if Size_t_size == LONGSIZE
# define Sz_f "l"
# define Sz_t long
#elif Size_t_size == 8 && defined HAS_QUAD && QUADKIND == QUAD_IS_LONG_LONG
# define Sz_f "ll"
# define Sz_t long long
#else
// hope "int" works.
# define Sz_f ""
# define Sz_t int
#endif

// Format fragments to be used after a literal "%": unsigned decimal,
// hexadecimal, and signed decimal respectively.
#define Sz_uf Sz_f"u"
#define Sz_xf Sz_f"x"
#define Ssz_df Sz_f"d"
// Cast targets that match the formats above (unsigned / signed).
#define Sz unsigned Sz_t
#define Ssz Sz_t
86 
// Prefix glued onto every message emitted through trouble().
#define WARN_PREFIX "Feersum: "

// INLINE_UNLESS_DEBUG expands to __inline in non-DEBUG builds and to nothing
// in DEBUG builds.  Note that when __inline is not a preprocessor macro it is
// first defined as empty, in which case INLINE_UNLESS_DEBUG is empty too.
#ifndef DEBUG
 #ifndef __inline
  #define __inline
 #endif
 #define INLINE_UNLESS_DEBUG __inline
#else
 #define INLINE_UNLESS_DEBUG
#endif

// Always-on warning via perl's warn().  f_ must be a string literal because
// it is concatenated with WARN_PREFIX.
// NOTE(review): the expansion already ends in ';', so "trouble(...);" yields
// an extra empty statement and the macro cannot be used as the sole body of
// an if that is followed by else.
#define trouble(f_, ...) warn(WARN_PREFIX f_, ##__VA_ARGS__);

// trace(): in DEBUG builds, warn() prefixed with file, line and pid;
// otherwise expands to nothing (arguments are not evaluated).
#ifdef DEBUG
#define trace(f_, ...) warn("%s:%-4d [%d] " f_, __FILE__, __LINE__, (int)getpid(), ##__VA_ARGS__)
#else
#define trace(...)
#endif

// trace2(): same as trace() but only when DEBUG >= 2.
#if DEBUG >= 2
#define trace2(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace2(...)
#endif

// trace3(): same as trace() but only when DEBUG >= 3.
#if DEBUG >= 3
#define trace3(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace3(...)
#endif
117 
118 #include "picohttpparser-git/picohttpparser.c"
119 #include "rinq.c"
120 
// Check FEERSUM_IOMATRIX_SIZE against what's actually usable on this
// platform. See Feersum.pm for an explanation
//
// IOV_MAX is consulted first; UIO_MAXIOV is only consulted when IOV_MAX is
// undefined or already large enough.  If neither macro is visible at this
// point the configured value is left unchanged.
#if defined(IOV_MAX) && FEERSUM_IOMATRIX_SIZE > IOV_MAX
# undef FEERSUM_IOMATRIX_SIZE
# define FEERSUM_IOMATRIX_SIZE IOV_MAX
#elif defined(UIO_MAXIOV) && FEERSUM_IOMATRIX_SIZE > UIO_MAXIOV
# undef FEERSUM_IOMATRIX_SIZE
# define FEERSUM_IOMATRIX_SIZE UIO_MAXIOV
#endif
130 
// One fixed-capacity batch of output vectors.  Batches are chained on a
// connection's wbuf_rinq and handed to writev() from the front.
struct iomatrix {
    unsigned offset; // index of the first slot that is not yet fully written
    unsigned count;  // number of slots filled so far (next free slot index)
    struct iovec iov[FEERSUM_IOMATRIX_SIZE]; // data pointers/lengths for writev()
    SV *sv[FEERSUM_IOMATRIX_SIZE]; // SV owning each slot's bytes, or NULL for constant data
};
137 
// Result of parsing one HTTP request head.  Apart from buf, the fields are
// the output arguments filled in by phr_parse_request() in try_parse_http().
struct feer_req {
    SV *buf; // NOTE(review): not touched in this part of the file; presumably the buffer the pointers below refer into -- confirm
    const char* method;
    size_t method_len;
    const char* path;
    size_t path_len;
    int minor_version;
    size_t num_headers; // in: capacity of headers[]; out: number parsed
    struct phr_header headers[MAX_HEADERS];
};
148 
// Progress of the response side of a connection.
enum feer_respond_state {
    RESPOND_NOT_STARTED = 0,
    RESPOND_NORMAL = 1,
    RESPOND_STREAMING = 2,
    RESPOND_SHUTDOWN = 3
};
// Store a human-readable label for respond state _n into _s (debug tracing).
// The number in each label matches the enum value; values outside the enum
// produce "UNKNOWN" so _s is always assigned.
#define RESPOND_STR(_n,_s) do { \
    switch(_n) { \
    case RESPOND_NOT_STARTED: (_s) = "NOT_STARTED(0)"; break; \
    case RESPOND_NORMAL:      (_s) = "NORMAL(1)"; break; \
    case RESPOND_STREAMING:   (_s) = "STREAMING(2)"; break; \
    case RESPOND_SHUTDOWN:    (_s) = "SHUTDOWN(3)"; break; \
    default:                  (_s) = "UNKNOWN"; break; \
    } \
} while (0)
163 
// Progress of the request (read) side of a connection.
enum feer_receive_state {
    RECEIVE_HEADERS = 0,
    RECEIVE_BODY = 1,
    RECEIVE_STREAMING = 2,
    RECEIVE_SHUTDOWN = 3
};
// Store a human-readable label for receive state _n into _s (debug tracing).
// Values outside the enum produce "UNKNOWN" so _s is always assigned.
#define RECEIVE_STR(_n,_s) do { \
    switch(_n) { \
    case RECEIVE_HEADERS:   (_s) = "HEADERS(0)"; break; \
    case RECEIVE_BODY:      (_s) = "BODY(1)"; break; \
    case RECEIVE_STREAMING: (_s) = "STREAMING(2)"; break; \
    case RECEIVE_SHUTDOWN:  (_s) = "SHUTDOWN(3)"; break; \
    default:                (_s) = "UNKNOWN"; break; \
    } \
} while (0)
178 
179 struct feer_conn {
180     SV *self;
181     int fd;
182     struct sockaddr *sa;
183 
184     struct ev_io read_ev_io;
185     struct ev_io write_ev_io;
186     struct ev_timer read_ev_timer;
187 
188     SV *rbuf;
189     struct rinq *wbuf_rinq;
190 
191     SV *poll_write_cb;
192     SV *ext_guard;
193 
194     struct feer_req *req;
195     ssize_t expected_cl;
196     ssize_t received_cl;
197 
198     enum feer_respond_state responding;
199     enum feer_receive_state receiving;
200 
201     int in_callback;
202     int is_http11:1;
203     int poll_write_cb_is_io_handle:1;
204     int auto_cl:1;
205 };
206 
typedef struct feer_conn feer_conn_handle; // for typemap

// Declares `c` from the enclosing watcher callback's `w` argument: w->data
// is expected to hold the connection pointer stored when the watcher was set
// up.
#define dCONN struct feer_conn *c = (struct feer_conn *)w->data
// True when _x is a reference to an array / to a code value.  Both macros
// evaluate _x twice, so avoid arguments with side effects.
#define IsArrayRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVAV)
#define IsCodeRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVCV)
212 
// ---- Forward declarations -------------------------------------------------
// Functions taking pTHX_ receive the perl interpreter context explicitly.

// response / PSGI entry points
static HV* feersum_env(pTHX_ struct feer_conn *c);
static void feersum_start_response
    (pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming);
static size_t feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
static void feersum_handle_psgi_response(
    pTHX_ struct feer_conn *c, SV *ret, bool can_recurse);
static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);
static SV* feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard);

// watcher/timer start-stop helpers; each active watcher holds one reference
// on c->self
static void start_read_watcher(struct feer_conn *c);
static void stop_read_watcher(struct feer_conn *c);
static void restart_read_timer(struct feer_conn *c);
static void stop_read_timer(struct feer_conn *c);
static void start_write_watcher(struct feer_conn *c);
static void stop_write_watcher(struct feer_conn *c);

// libev callbacks and request dispatch
static void try_conn_write(EV_P_ struct ev_io *w, int revents);
static void try_conn_read(EV_P_ struct ev_io *w, int revents);
static void conn_read_timeout(EV_P_ struct ev_timer *w, int revents);
static bool process_request_headers(struct feer_conn *c, int body_offset);
static void sched_request_callback(struct feer_conn *c);
static void call_died (pTHX_ struct feer_conn *c, const char *cb_type);
static void call_request_callback(struct feer_conn *c);
static void call_poll_callback (struct feer_conn *c, bool is_write);
static void pump_io_handle (struct feer_conn *c, SV *io);

static void conn_write_ready (struct feer_conn *c);
static void respond_with_server_error(struct feer_conn *c, const char *msg, STRLEN msg_len, int code);

// write-buffer (iomatrix queue) helpers
static void update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov);
static STRLEN add_sv_to_wbuf (struct feer_conn *c, SV *sv);
static STRLEN add_const_to_wbuf (struct feer_conn *c, const char *str, size_t str_len);
#define add_crlf_to_wbuf(c) add_const_to_wbuf(c,CRLF,2)
static void finish_wbuf (struct feer_conn *c);
static void add_chunk_sv_to_wbuf (struct feer_conn *c, SV *sv);
static void add_placeholder_to_wbuf (struct feer_conn *c, SV **sv, struct iovec **iov_ref);

// string / SV utilities
static void uri_decode_sv (SV *sv);
static bool str_eq(const char *a, int a_len, const char *b, int b_len);
static bool str_case_eq(const char *a, int a_len, const char *b, int b_len);
static SV* fetch_av_normal (pTHX_ AV *av, I32 i);

static const char *http_code_to_msg (int code);
static int prep_socket (int fd, int is_tcp);
257 
// ---- File-scope state -----------------------------------------------------

// Package stashes used for blessing and for class checks on handles.
static HV *feer_stash, *feer_conn_stash;
static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
static MGVTBL psgix_io_vtbl;

static SV *request_cb_cv = NULL;
static bool request_cb_is_psgi = 0;
static SV *shutdown_cb_cv = NULL;
static bool shutting_down = 0;   // when set, prepare_cb no longer restarts accept_w
static int active_conns = 0;     // incremented for every new_feer_conn()
static double read_timeout = 5.0; // repeat interval given to each read timer

static SV *feer_server_name = NULL;
static SV *feer_server_port = NULL;

static ev_io accept_w;
static ev_prepare ep;
static ev_check   ec;
// NOTE(review): unlike ep/ec this watcher has external linkage; confirm
// whether that is intentional.
struct ev_idle    ei;

// Connections whose request is ready for the Perl callback; drained by
// process_request_ready_rinq().
static struct rinq *request_ready_rinq = NULL;

static AV *psgi_ver;
static SV *psgi_serv10, *psgi_serv11, *crlf_sv;

// TODO: make this thread-local if and when there are multiple C threads:
struct ev_loop *feersum_ev_loop = NULL;
static HV *feersum_tmpl_env = NULL;
285 
286 INLINE_UNLESS_DEBUG
287 static SV*
fetch_av_normal(pTHX_ AV * av,I32 i)288 fetch_av_normal (pTHX_ AV *av, I32 i)
289 {
290     SV **elt = av_fetch(av, i, 0);
291     if (elt == NULL) return NULL;
292     SV *sv = *elt;
293     // copy to remove magic
294     if (unlikely(SvMAGICAL(sv))) sv = sv_2mortal(newSVsv(sv));
295     if (unlikely(!SvOK(sv))) return NULL;
296     // usually array ref elems aren't RVs (for PSGI anyway)
297     if (unlikely(SvROK(sv))) sv = SvRV(sv);
298     return sv;
299 }
300 
301 INLINE_UNLESS_DEBUG
302 static struct iomatrix *
next_iomatrix(struct feer_conn * c)303 next_iomatrix (struct feer_conn *c)
304 {
305     bool add_iomatrix = 0;
306     struct iomatrix *m;
307 
308     if (!c->wbuf_rinq) {
309         trace3("next_iomatrix(%d): head\n", c->fd);
310         add_iomatrix = 1;
311     }
312     else {
313         // get the tail-end struct
314         m = (struct iomatrix *)c->wbuf_rinq->prev->ref;
315         trace3("next_iomatrix(%d): tail, count=%d, offset=%d\n",
316             c->fd, m->count, m->offset);
317         if (m->count >= FEERSUM_IOMATRIX_SIZE) {
318             add_iomatrix = 1;
319         }
320     }
321 
322     if (add_iomatrix) {
323         trace3("next_iomatrix(%d): malloc\n", c->fd);
324         Newx(m,1,struct iomatrix);
325         Poison(m,1,struct iomatrix);
326         m->offset = m->count = 0;
327         rinq_push(&c->wbuf_rinq, m);
328     }
329 
330     trace3("next_iomatrix(%d): end, count=%d, offset=%d\n",
331         c->fd, m->count, m->offset);
332     return m;
333 }
334 
335 INLINE_UNLESS_DEBUG
336 static STRLEN
add_sv_to_wbuf(struct feer_conn * c,SV * sv)337 add_sv_to_wbuf(struct feer_conn *c, SV *sv)
338 {
339     struct iomatrix *m = next_iomatrix(c);
340     int idx = m->count++;
341     STRLEN cur;
342     if (unlikely(SvMAGICAL(sv))) {
343         sv = newSVsv(sv); // copy to force it to be normal.
344     }
345     else if (unlikely(SvPADTMP(sv))) {
346         // PADTMPs have their PVs re-used, so we can't simply keep a
347         // reference.  TEMPs maybe behave in a similar way and are potentially
348         // stealable.  If not stealing, we must make a copy.
349 #ifdef FEERSUM_STEAL
350         if (SvFLAGS(sv) == (SVs_PADTMP|SVf_POK|SVp_POK)) {
351             trace3("STEALING\n");
352             SV *theif = newSV(0);
353             sv_upgrade(theif, SVt_PV);
354 
355             SvPV_set(theif, SvPVX(sv));
356             SvLEN_set(theif, SvLEN(sv));
357             SvCUR_set(theif, SvCUR(sv));
358 
359             // make the temp null
360             (void)SvOK_off(sv);
361             SvPV_set(sv, NULL);
362             SvLEN_set(sv, 0);
363             SvCUR_set(sv, 0);
364 
365             SvFLAGS(theif) |= SVf_READONLY|SVf_POK|SVp_POK;
366 
367             sv = theif;
368         }
369         else {
370             sv = newSVsv(sv);
371         }
372 #else
373         sv = newSVsv(sv);
374 #endif
375     }
376     else {
377         sv = SvREFCNT_inc(sv);
378     }
379 
380     m->iov[idx].iov_base = SvPV(sv, cur);
381     m->iov[idx].iov_len = cur;
382     m->sv[idx] = sv;
383 
384     return cur;
385 }
386 
387 INLINE_UNLESS_DEBUG
388 static STRLEN
add_const_to_wbuf(struct feer_conn * c,const char * str,size_t str_len)389 add_const_to_wbuf(struct feer_conn *c, const char *str, size_t str_len)
390 {
391     struct iomatrix *m = next_iomatrix(c);
392     int idx = m->count++;
393     m->iov[idx].iov_base = (void*)str;
394     m->iov[idx].iov_len = str_len;
395     m->sv[idx] = NULL;
396     return str_len;
397 }
398 
399 INLINE_UNLESS_DEBUG
400 static void
add_placeholder_to_wbuf(struct feer_conn * c,SV ** sv,struct iovec ** iov_ref)401 add_placeholder_to_wbuf(struct feer_conn *c, SV **sv, struct iovec **iov_ref)
402 {
403     struct iomatrix *m = next_iomatrix(c);
404     int idx = m->count++;
405     *sv = newSV(31);
406     SvPOK_on(*sv);
407     m->sv[idx] = *sv;
408     *iov_ref = &m->iov[idx];
409 }
410 
411 INLINE_UNLESS_DEBUG
412 static void
finish_wbuf(struct feer_conn * c)413 finish_wbuf(struct feer_conn *c)
414 {
415     if (!c->is_http11) return; // nothing required
416     add_const_to_wbuf(c, "0\r\n\r\n", 5); // terminating chunk
417 }
418 
419 INLINE_UNLESS_DEBUG
420 static void
update_wbuf_placeholder(struct feer_conn * c,SV * sv,struct iovec * iov)421 update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov)
422 {
423     STRLEN cur;
424     // can't pass iov_len for cur; incompatible pointer type on some systems:
425     iov->iov_base = SvPV(sv,cur);
426     iov->iov_len = cur;
427 }
428 
429 static void
add_chunk_sv_to_wbuf(struct feer_conn * c,SV * sv)430 add_chunk_sv_to_wbuf(struct feer_conn *c, SV *sv)
431 {
432     SV *chunk;
433     struct iovec *chunk_iov;
434     add_placeholder_to_wbuf(c, &chunk, &chunk_iov);
435     STRLEN cur = add_sv_to_wbuf(c, sv);
436     add_crlf_to_wbuf(c);
437     sv_setpvf(chunk, "%"Sz_xf CRLF, (Sz)cur);
438     update_wbuf_placeholder(c, chunk, chunk_iov);
439 }
440 
/*
 * Map an HTTP status code to its reason phrase.
 *
 * Known codes get their specific phrase; any other code gets the generic
 * name of its Nxx class (RFC 2616), and anything outside 100..499 that is
 * not listed gets "Error".  The returned string is a constant.
 */
static const char *
http_code_to_msg (int code) {
    // http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
    static const struct {
        int code;
        const char *msg;
    } known[] = {
        { 100, "Continue" },
        { 101, "Switching Protocols" },
        { 102, "Processing" }, // RFC 2518
        { 200, "OK" },
        { 201, "Created" },
        { 202, "Accepted" },
        { 203, "Non Authoritative Information" },
        { 204, "No Content" },
        { 205, "Reset Content" },
        { 206, "Partial Content" },
        { 207, "Multi-Status" }, // RFC 4918 (WebDav)
        { 300, "Multiple Choices" },
        { 301, "Moved Permanently" },
        { 302, "Found" },
        { 303, "See Other" },
        { 304, "Not Modified" },
        { 305, "Use Proxy" },
        { 307, "Temporary Redirect" },
        { 400, "Bad Request" },
        { 401, "Unauthorized" },
        { 402, "Payment Required" },
        { 403, "Forbidden" },
        { 404, "Not Found" },
        { 405, "Method Not Allowed" },
        { 406, "Not Acceptable" },
        { 407, "Proxy Authentication Required" },
        { 408, "Request Timeout" },
        { 409, "Conflict" },
        { 410, "Gone" },
        { 411, "Length Required" },
        { 412, "Precondition Failed" },
        { 413, "Request Entity Too Large" },
        { 414, "Request URI Too Long" },
        { 415, "Unsupported Media Type" },
        { 416, "Requested Range Not Satisfiable" },
        { 417, "Expectation Failed" },
        { 418, "I'm a teapot" },
        { 421, "Too Many Connections" }, // Microsoft?
        { 422, "Unprocessable Entity" }, // RFC 4918
        { 423, "Locked" }, // RFC 4918
        { 424, "Failed Dependency" }, // RFC 4918
        { 425, "Unordered Collection" }, // RFC 3648
        { 426, "Upgrade Required" }, // RFC 2817
        { 449, "Retry With" }, // Microsoft
        { 450, "Blocked by Parental Controls" }, // Microsoft
        { 500, "Internal Server Error" },
        { 501, "Not Implemented" },
        { 502, "Bad Gateway" },
        { 503, "Service Unavailable" },
        { 504, "Gateway Timeout" },
        { 505, "HTTP Version Not Supported" },
        { 506, "Variant Also Negotiates" }, // RFC 2295
        { 507, "Insufficient Storage" }, // RFC 4918
        { 509, "Bandwidth Limit Exceeded" }, // Apache mod
        { 510, "Not Extended" }, // RFC 2774
        { 530, "User access denied" }, // ??
    };
    size_t k;

    for (k = 0; k < sizeof known / sizeof known[0]; k++) {
        if (known[k].code == code)
            return known[k].msg;
    }

    // default to the Nxx group names in RFC 2616; integer division maps
    // 100..199 to 1, 200..299 to 2, and so on
    switch (code / 100) {
        case 1:  return "Informational";
        case 2:  return "Success";
        case 3:  return "Redirection";
        case 4:  return "Client Error";
        default: return "Error";
    }
}
521 
/*
 * Configure a freshly obtained socket for use by the server.
 *
 *  - switches it to non-blocking mode, keeping whatever other file status
 *    flags are already set (the flags are read with F_GETFL first instead of
 *    being overwritten wholesale);
 *  - for TCP sockets (is_tcp != 0) enables TCP_NODELAY;
 *  - delivers out-of-band data inline (SO_OOBINLINE);
 *  - turns SO_LINGER off.
 *
 * Returns 0 on success, or -1 as soon as any step fails (errno is left as
 * set by the failing call; earlier steps are not rolled back).
 */
static int
prep_socket(int fd, int is_tcp)
{
    int on = 1;
    struct linger no_linger = { .l_onoff = 0, .l_linger = 0 };
    int flags;

    // make it non-blocking without clobbering the other status flags
    flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        return -1;

    if (is_tcp) {
        // flush writes immediately
        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)))
            return -1;
    }

    // handle URG data inline
    if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &on, sizeof(on)))
        return -1;

    // disable lingering
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &no_linger, sizeof(no_linger)))
        return -1;

    return 0;
}
551 
552 INLINE_UNLESS_DEBUG static void
safe_close_conn(struct feer_conn * c,const char * where)553 safe_close_conn(struct feer_conn *c, const char *where)
554 {
555     if (unlikely(c->fd < 0))
556         return;
557 
558     // make it blocking
559     fcntl(c->fd, F_SETFL, 0);
560 
561     if (unlikely(close(c->fd)))
562         perror(where);
563 
564     c->fd = -1;
565 }
566 
567 static struct feer_conn *
new_feer_conn(EV_P_ int conn_fd,struct sockaddr * sa)568 new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
569 {
570     SV *self = newSV(0);
571     SvUPGRADE(self, SVt_PVMG); // ensures sv_bless doesn't reallocate
572     SvGROW(self, sizeof(struct feer_conn));
573     SvPOK_only(self);
574     SvIOK_on(self);
575     SvIV_set(self,conn_fd);
576 
577     struct feer_conn *c = (struct feer_conn *)SvPVX(self);
578     Zero(c, 1, struct feer_conn);
579 
580     c->self = self;
581     c->fd = conn_fd;
582     c->sa = sa;
583     c->responding = RESPOND_NOT_STARTED;
584     c->receiving = RECEIVE_HEADERS;
585 
586     ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
587     c->read_ev_io.data = (void *)c;
588 
589     ev_init(&c->read_ev_timer, conn_read_timeout);
590     c->read_ev_timer.data = (void *)c;
591 
592     trace3("made conn fd=%d self=%p, c=%p, cur=%"Sz_uf", len=%"Sz_uf"\n",
593         c->fd, self, c, (Sz)SvCUR(self), (Sz)SvLEN(self));
594 
595     SV *rv = newRV_inc(c->self);
596     sv_bless(rv, feer_conn_stash); // so DESTROY can get called on read errors
597     SvREFCNT_dec(rv);
598 
599     SvREADONLY_on(self); // turn off later for blessing
600     active_conns++;
601     return c;
602 }
603 
604 // for use in the typemap:
605 INLINE_UNLESS_DEBUG
606 static struct feer_conn *
sv_2feer_conn(SV * rv)607 sv_2feer_conn (SV *rv)
608 {
609     if (unlikely(!sv_isa(rv,"Feersum::Connection")))
610        croak("object is not of type Feersum::Connection");
611     return (struct feer_conn *)SvPVX(SvRV(rv));
612 }
613 
614 INLINE_UNLESS_DEBUG
615 static SV*
feer_conn_2sv(struct feer_conn * c)616 feer_conn_2sv (struct feer_conn *c)
617 {
618     return newRV_inc(c->self);
619 }
620 
621 static feer_conn_handle *
sv_2feer_conn_handle(SV * rv,bool can_croak)622 sv_2feer_conn_handle (SV *rv, bool can_croak)
623 {
624     trace3("sv 2 conn_handle\n");
625     if (unlikely(!SvROK(rv))) croak("Expected a reference");
626     // do not allow subclassing
627     SV *sv = SvRV(rv);
628     if (likely(
629         sv_isobject(rv) &&
630         (SvSTASH(sv) == feer_conn_writer_stash ||
631          SvSTASH(sv) == feer_conn_reader_stash)
632     )) {
633         UV uv = SvUV(sv);
634         if (uv == 0) {
635             if (can_croak) croak("Operation not allowed: Handle is closed.");
636             return NULL;
637         }
638         return INT2PTR(feer_conn_handle*,uv);
639     }
640 
641     if (can_croak)
642         croak("Expected a Feersum::Connection::Writer or ::Reader object");
643     return NULL;
644 }
645 
646 static SV *
new_feer_conn_handle(pTHX_ struct feer_conn * c,bool is_writer)647 new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
648 {
649     SV *sv;
650     SvREFCNT_inc_void_NN(c->self);
651     sv = newRV_noinc(newSVuv(PTR2UV(c)));
652     sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
653     return sv;
654 }
655 
// change_responding_state(c, to) / change_receiving_state(c, to):
// set the connection's respond / receive state.
//
// In DEBUG builds a transition to a different state is also traced (at
// trace2 level) with the old and new state names; setting the state it
// already has does nothing.  In normal builds the macros are a plain,
// fully parenthesized assignment expression, so any pointer expression may
// be passed for c.
#if DEBUG
# define change_responding_state(c, _to) do { \
    enum feer_respond_state __to = (_to); \
    enum feer_respond_state __from = (c)->responding; \
    const char *_from_str = "?", *_to_str = "?"; \
    if (likely(__from != __to)) { \
        RESPOND_STR(__from, _from_str); \
        RESPOND_STR(__to, _to_str); \
        trace2("==> responding state %d: %s to %s\n", \
            (c)->fd,_from_str,_to_str); \
        (c)->responding = __to; \
    } \
} while (0)
# define change_receiving_state(c, _to) do { \
    enum feer_receive_state __to = (_to); \
    enum feer_receive_state __from = (c)->receiving; \
    const char *_from_str = "?", *_to_str = "?"; \
    if (likely(__from != __to)) { \
        RECEIVE_STR(__from, _from_str); \
        RECEIVE_STR(__to, _to_str); \
        trace2("==> receiving state %d: %s to %s\n", \
            (c)->fd,_from_str,_to_str); \
        (c)->receiving = __to; \
    } \
} while (0)
#else
# define change_responding_state(c, _to) ((c)->responding = (_to))
# define change_receiving_state(c, _to) ((c)->receiving = (_to))
#endif
685 
686 INLINE_UNLESS_DEBUG static void
start_read_watcher(struct feer_conn * c)687 start_read_watcher(struct feer_conn *c) {
688     if (unlikely(ev_is_active(&c->read_ev_io)))
689         return;
690     trace("start read watcher %d\n",c->fd);
691     ev_io_start(feersum_ev_loop, &c->read_ev_io);
692     SvREFCNT_inc_void_NN(c->self);
693 }
694 
695 INLINE_UNLESS_DEBUG static void
stop_read_watcher(struct feer_conn * c)696 stop_read_watcher(struct feer_conn *c) {
697     if (unlikely(!ev_is_active(&c->read_ev_io)))
698         return;
699     trace("stop read watcher %d\n",c->fd);
700     ev_io_stop(feersum_ev_loop, &c->read_ev_io);
701     SvREFCNT_dec(c->self);
702 }
703 
704 INLINE_UNLESS_DEBUG static void
restart_read_timer(struct feer_conn * c)705 restart_read_timer(struct feer_conn *c) {
706     if (likely(!ev_is_active(&c->read_ev_timer))) {
707         trace("restart read timer %d\n",c->fd);
708         c->read_ev_timer.repeat = read_timeout;
709         SvREFCNT_inc_void_NN(c->self);
710     }
711     ev_timer_again(feersum_ev_loop, &c->read_ev_timer);
712 }
713 
714 INLINE_UNLESS_DEBUG static void
stop_read_timer(struct feer_conn * c)715 stop_read_timer(struct feer_conn *c) {
716     if (unlikely(!ev_is_active(&c->read_ev_timer)))
717         return;
718     trace("stop read timer %d\n",c->fd);
719     ev_timer_stop(feersum_ev_loop, &c->read_ev_timer);
720     SvREFCNT_dec(c->self);
721 }
722 
723 INLINE_UNLESS_DEBUG static void
start_write_watcher(struct feer_conn * c)724 start_write_watcher(struct feer_conn *c) {
725     if (unlikely(ev_is_active(&c->write_ev_io)))
726         return;
727     trace("start write watcher %d\n",c->fd);
728     ev_io_start(feersum_ev_loop, &c->write_ev_io);
729     SvREFCNT_inc_void_NN(c->self);
730 }
731 
732 INLINE_UNLESS_DEBUG static void
stop_write_watcher(struct feer_conn * c)733 stop_write_watcher(struct feer_conn *c) {
734     if (unlikely(!ev_is_active(&c->write_ev_io)))
735         return;
736     trace("stop write watcher %d\n",c->fd);
737     ev_io_stop(feersum_ev_loop, &c->write_ev_io);
738     SvREFCNT_dec(c->self);
739 }
740 
741 
742 static void
process_request_ready_rinq(void)743 process_request_ready_rinq (void)
744 {
745     while (request_ready_rinq) {
746         struct feer_conn *c =
747             (struct feer_conn *)rinq_shift(&request_ready_rinq);
748         //trace("rinq shifted c=%p, head=%p\n", c, request_ready_rinq);
749 
750         call_request_callback(c);
751 
752         if (likely(c->wbuf_rinq)) {
753             // this was deferred until after the perl callback
754             conn_write_ready(c);
755         }
756         SvREFCNT_dec(c->self); // for the rinq
757     }
758 }
759 
760 static void
prepare_cb(EV_P_ ev_prepare * w,int revents)761 prepare_cb (EV_P_ ev_prepare *w, int revents)
762 {
763     if (unlikely(revents & EV_ERROR)) {
764         trouble("EV error in prepare, revents=0x%08x\n", revents);
765         ev_break(EV_A, EVBREAK_ALL);
766         return;
767     }
768 
769     if (!ev_is_active(&accept_w) && !shutting_down) {
770         ev_io_start(EV_A, &accept_w);
771     }
772     ev_prepare_stop(EV_A, w);
773 }
774 
775 static void
check_cb(EV_P_ ev_check * w,int revents)776 check_cb (EV_P_ ev_check *w, int revents)
777 {
778     if (unlikely(revents & EV_ERROR)) {
779         trouble("EV error in check, revents=0x%08x\n", revents);
780         ev_break(EV_A, EVBREAK_ALL);
781         return;
782     }
783     trace3("check! head=%p\n", request_ready_rinq);
784     if (request_ready_rinq)
785         process_request_ready_rinq();
786 }
787 
788 static void
idle_cb(EV_P_ ev_idle * w,int revents)789 idle_cb (EV_P_ ev_idle *w, int revents)
790 {
791     if (unlikely(revents & EV_ERROR)) {
792         trouble("EV error in idle, revents=0x%08x\n", revents);
793         ev_break(EV_A, EVBREAK_ALL);
794         return;
795     }
796     trace3("idle! head=%p\n", request_ready_rinq);
797     if (request_ready_rinq)
798         process_request_ready_rinq();
799     ev_idle_stop(EV_A, w);
800 }
801 
802 static void
try_conn_write(EV_P_ struct ev_io * w,int revents)803 try_conn_write(EV_P_ struct ev_io *w, int revents)
804 {
805     dCONN;
806     int i;
807     struct iomatrix *m;
808 
809     SvREFCNT_inc_void_NN(c->self);
810 
811     // if it's marked writeable EV suggests we simply try write to it.
812     // Otherwise it is stopped and we should ditch this connection.
813     if (unlikely(revents & EV_ERROR && !(revents & EV_WRITE))) {
814         trace("EV error on write, fd=%d revents=0x%08x\n", w->fd, revents);
815         change_responding_state(c, RESPOND_SHUTDOWN);
816         goto try_write_finished;
817     }
818 
819     if (unlikely(!c->wbuf_rinq)) {
820         if (unlikely(c->responding >= RESPOND_SHUTDOWN))
821             goto try_write_finished;
822 
823         if (!c->poll_write_cb) {
824             // no callback and no data: wait for app to push to us.
825             if (c->responding == RESPOND_STREAMING)
826                 goto try_write_paused;
827 
828             trace("tried to write with an empty buffer %d resp=%d\n",w->fd,c->responding);
829             change_responding_state(c, RESPOND_SHUTDOWN);
830             goto try_write_finished;
831         }
832 
833         if (c->poll_write_cb_is_io_handle)
834             pump_io_handle(c, c->poll_write_cb);
835         else
836             call_poll_callback(c, 1);
837 
838         // callback didn't write anything:
839         if (unlikely(!c->wbuf_rinq)) goto try_write_again;
840     }
841 
842 try_write_again_immediately:
843     m = (struct iomatrix *)c->wbuf_rinq->ref;
844 #if DEBUG >= 2
845     warn("going to write to %d:\n",c->fd);
846     for (i=0; i < m->count; i++) {
847         fprintf(stderr,"%.*s",
848             (int)m->iov[i].iov_len, (char*)m->iov[i].iov_base);
849     }
850 #endif
851 
852     trace("going to write %d off=%d count=%d\n", w->fd, m->offset, m->count);
853     errno = 0;
854     ssize_t wrote = writev(w->fd, &m->iov[m->offset], m->count - m->offset);
855     trace("wrote %"Ssz_df" bytes to %d, errno=%d\n", (Ssz)wrote, w->fd, errno);
856 
857     if (unlikely(wrote <= 0)) {
858         if (unlikely(wrote == 0))
859             goto try_write_again;
860         if (likely(errno == EAGAIN || errno == EINTR))
861             goto try_write_again;
862         perror("Feersum try_conn_write");
863         change_responding_state(c, RESPOND_SHUTDOWN);
864         goto try_write_finished;
865     }
866 
867     for (i = m->offset; i < m->count && wrote > 0; i++) {
868         struct iovec *v = &m->iov[i];
869         if (unlikely(v->iov_len > wrote)) {
870             trace3("offset vector %d  base=%p len=%"Sz_uf"\n",
871                 w->fd, v->iov_base, (Sz)v->iov_len);
872             v->iov_base += wrote;
873             v->iov_len  -= wrote;
874             // don't consume any more:
875             wrote = 0;
876         }
877         else {
878             trace3("consume vector %d base=%p len=%"Sz_uf" sv=%p\n",
879                 w->fd, v->iov_base, (Sz)v->iov_len, m->sv[i]);
880             wrote -= v->iov_len;
881             m->offset++;
882             if (m->sv[i]) {
883                 SvREFCNT_dec(m->sv[i]);
884                 m->sv[i] = NULL;
885             }
886         }
887     }
888 
889     if (likely(m->offset >= m->count)) {
890         trace2("all done with iomatrix %d state=%d\n",w->fd,c->responding);
891         rinq_shift(&c->wbuf_rinq);
892         Safefree(m);
893         if (!c->wbuf_rinq)
894             goto try_write_finished;
895         trace2("write again immediately %d state=%d\n",w->fd,c->responding);
896         goto try_write_again_immediately;
897     }
898     // else, fallthrough:
899     trace2("write fallthrough %d state=%d\n",w->fd,c->responding);
900 
901 try_write_again:
902     trace("write again %d state=%d\n",w->fd,c->responding);
903     start_write_watcher(c);
904     goto try_write_cleanup;
905 
906 try_write_finished:
907     // should always be responding, but just in case
908     switch(c->responding) {
909     case RESPOND_NOT_STARTED:
910         // the write watcher shouldn't ever get called before starting to
911         // respond. Shut it down if it does.
912         trace("unexpected try_write when response not started %d\n",c->fd);
913         goto try_write_shutdown;
914     case RESPOND_NORMAL:
915         goto try_write_shutdown;
916     case RESPOND_STREAMING:
917         if (c->poll_write_cb) goto try_write_again;
918         else goto try_write_paused;
919     case RESPOND_SHUTDOWN:
920         goto try_write_shutdown;
921     default:
922         goto try_write_cleanup;
923     }
924 
925 try_write_paused:
926     trace3("write PAUSED %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
927     stop_write_watcher(c);
928     goto try_write_cleanup;
929 
930 try_write_shutdown:
931     trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
932     change_responding_state(c, RESPOND_SHUTDOWN);
933     stop_write_watcher(c);
934     safe_close_conn(c, "close at write shutdown");
935 
936 try_write_cleanup:
937     SvREFCNT_dec(c->self);
938     return;
939 }
940 
941 static int
try_parse_http(struct feer_conn * c,size_t last_read)942 try_parse_http(struct feer_conn *c, size_t last_read)
943 {
944     struct feer_req *req = c->req;
945     if (likely(!req)) {
946         Newxz(req,1,struct feer_req);
947         c->req = req;
948     }
949 
950     // GH#12 - incremental parsing sets num_headers to 0 each time; force it
951     // back on every invocation
952     req->num_headers = MAX_HEADERS;
953 
954     return phr_parse_request(SvPVX(c->rbuf), SvCUR(c->rbuf),
955         &req->method, &req->method_len,
956         &req->path, &req->path_len, &req->minor_version,
957         req->headers, &req->num_headers,
958         (SvCUR(c->rbuf)-last_read));
959 }
960 
961 static void
try_conn_read(EV_P_ ev_io * w,int revents)962 try_conn_read(EV_P_ ev_io *w, int revents)
963 {
964     dCONN;
965     SvREFCNT_inc_void_NN(c->self);
966 
967     // if it's marked readable EV suggests we simply try read it. Otherwise it
968     // is stopped and we should ditch this connection.
969     if (unlikely(revents & EV_ERROR && !(revents & EV_READ))) {
970         trace("EV error on read, fd=%d revents=0x%08x\n", w->fd, revents);
971         goto try_read_error;
972     }
973 
974     if (unlikely(c->receiving == RECEIVE_SHUTDOWN))
975         goto dont_read_again;
976 
977     trace("try read %d\n",w->fd);
978 
979     if (likely(!c->rbuf)) { // likely = optimize for small requests
980         trace("init rbuf for %d\n",w->fd);
981         c->rbuf = newSV(READ_INIT_FACTOR*READ_BUFSZ + 1);
982         SvPOK_on(c->rbuf);
983     }
984 
985     ssize_t space_free = SvLEN(c->rbuf) - SvCUR(c->rbuf);
986     if (unlikely(space_free < READ_BUFSZ)) { // unlikely = optimize for small
987         size_t new_len = SvLEN(c->rbuf) + READ_GROW_FACTOR*READ_BUFSZ;
988         trace("moar memory %d: %"Sz_uf" to %"Sz_uf"\n",
989             w->fd, (Sz)SvLEN(c->rbuf), (Sz)new_len);
990         SvGROW(c->rbuf, new_len);
991         space_free += READ_GROW_FACTOR*READ_BUFSZ;
992     }
993 
994     char *cur = SvPVX(c->rbuf) + SvCUR(c->rbuf);
995     ssize_t got_n = read(w->fd, cur, space_free);
996 
997     if (unlikely(got_n <= 0)) {
998         if (unlikely(got_n == 0)) {
999             trace("EOF before complete request: %d\n",w->fd,SvCUR(c->rbuf));
1000             goto try_read_error;
1001         }
1002         if (likely(errno == EAGAIN || errno == EINTR))
1003             goto try_read_again;
1004         perror("try_conn_read error");
1005         goto try_read_error;
1006     }
1007 
1008     trace("read %d %"Ssz_df"\n", w->fd, (Ssz)got_n);
1009     SvCUR(c->rbuf) += got_n;
1010     // likely = optimize for small requests
1011     if (likely(c->receiving == RECEIVE_HEADERS)) {
1012 
1013 #ifdef FLASH_SOCKET_POLICY_SUPPORT
1014         if (unlikely(*SvPVX(c->rbuf) == '<')) {
1015             if (likely(SvCUR(c->rbuf) >= 22)) { // length of vvv
1016                 if (str_eq(SvPVX(c->rbuf), 22, "<policy-file-request/>", 22)) {
1017                     add_const_to_wbuf(c, STR_WITH_LEN(FLASH_SOCKET_POLICY));
1018                     conn_write_ready(c);
1019                     stop_read_watcher(c);
1020                     stop_read_timer(c);
1021                     // TODO: keep-alives: be sure to remove the 22 bytes
1022                     // out of the rbuf
1023                     change_receiving_state(c, RECEIVE_SHUTDOWN);
1024                     change_responding_state(c, RESPOND_SHUTDOWN);
1025                     goto dont_read_again;
1026                 }
1027             }
1028             // "if prefixed with"
1029             else if (likely(str_eq(SvPVX(c->rbuf), SvCUR(c->rbuf),
1030                                    "<policy-file-request/>", SvCUR(c->rbuf))))
1031             {
1032                 goto try_read_again;
1033             }
1034         }
1035 #endif
1036 
1037         int ret = try_parse_http(c, (size_t)got_n);
1038         if (ret == -1) goto try_read_bad;
1039         if (ret == -2) goto try_read_again;
1040 
1041         if (process_request_headers(c, ret))
1042             goto try_read_again_reset_timer;
1043         else
1044             goto dont_read_again;
1045     }
1046     else if (likely(c->receiving == RECEIVE_BODY)) {
1047         c->received_cl += got_n;
1048         if (c->received_cl < c->expected_cl)
1049             goto try_read_again_reset_timer;
1050         // body is complete
1051         sched_request_callback(c);
1052         goto dont_read_again;
1053     }
1054     else {
1055         trouble("unknown read state %d %d", w->fd, c->receiving);
1056     }
1057 
1058     // fallthrough:
1059 try_read_error:
1060     trace("READ ERROR %d, refcnt=%d\n", w->fd, SvREFCNT(c->self));
1061     change_receiving_state(c, RECEIVE_SHUTDOWN);
1062     change_responding_state(c, RESPOND_SHUTDOWN);
1063     stop_read_watcher(c);
1064     stop_read_timer(c);
1065     stop_write_watcher(c);
1066     goto try_read_cleanup;
1067 
1068 try_read_bad:
1069     trace("bad request %d\n", w->fd);
1070     respond_with_server_error(c, "Malformed request.\n", 0, 400);
1071     // TODO: when keep-alive, close conn instead of fallthrough here.
1072     // fallthrough:
1073 dont_read_again:
1074     trace("done reading %d\n", w->fd);
1075     change_receiving_state(c, RECEIVE_SHUTDOWN);
1076     stop_read_watcher(c);
1077     stop_read_timer(c);
1078     goto try_read_cleanup;
1079 
1080 try_read_again_reset_timer:
1081     trace("(reset read timer) %d\n", w->fd);
1082     restart_read_timer(c);
1083     // fallthrough:
1084 try_read_again:
1085     trace("read again %d\n", w->fd);
1086     start_read_watcher(c);
1087 
1088 try_read_cleanup:
1089     SvREFCNT_dec(c->self);
1090 }
1091 
1092 static void
conn_read_timeout(EV_P_ ev_timer * w,int revents)1093 conn_read_timeout (EV_P_ ev_timer *w, int revents)
1094 {
1095     dCONN;
1096     SvREFCNT_inc_void_NN(c->self);
1097 
1098     if (unlikely(!(revents & EV_TIMER) || c->receiving == RECEIVE_SHUTDOWN)) {
1099         // if there's no EV_TIMER then EV has stopped it on an error
1100         if (revents & EV_ERROR)
1101             trouble("EV error on read timer, fd=%d revents=0x%08x\n",
1102                 c->fd,revents);
1103         goto read_timeout_cleanup;
1104     }
1105 
1106     trace("read timeout %d\n", c->fd);
1107 
1108     if (likely(c->responding == RESPOND_NOT_STARTED)) {
1109         const char *msg;
1110         if (c->receiving == RECEIVE_HEADERS) {
1111             msg = "Headers took too long.";
1112         }
1113         else {
1114             msg = "Timeout reading body.";
1115         }
1116         respond_with_server_error(c, msg, 0, 408);
1117     }
1118     else {
1119         // XXX as of 0.984 this appears to be dead code
1120         trace("read timeout while writing %d\n",c->fd);
1121         stop_write_watcher(c);
1122         stop_read_watcher(c);
1123         stop_read_timer(c);
1124         safe_close_conn(c, "close at read timeout");
1125         change_responding_state(c, RESPOND_SHUTDOWN);
1126     }
1127 
1128 read_timeout_cleanup:
1129     stop_read_watcher(c);
1130     stop_read_timer(c);
1131     SvREFCNT_dec(c->self);
1132 }
1133 
1134 static void
accept_cb(EV_P_ ev_io * w,int revents)1135 accept_cb (EV_P_ ev_io *w, int revents)
1136 {
1137     struct sockaddr_storage sa_buf;
1138     socklen_t sa_len;
1139 
1140     if (unlikely(shutting_down)) {
1141         // shouldn't get called, but be defensive
1142         ev_io_stop(EV_A, w);
1143         close(w->fd);
1144         return;
1145     }
1146 
1147     if (unlikely(revents & EV_ERROR)) {
1148         trouble("EV error in accept_cb, fd=%d, revents=0x%08x\n",w->fd,revents);
1149         ev_break(EV_A, EVBREAK_ALL);
1150         return;
1151     }
1152 
1153     trace2("accept! revents=0x%08x\n", revents);
1154 
1155     while (1) {
1156         sa_len = sizeof(struct sockaddr_storage);
1157         errno = 0;
1158 
1159         int fd = accept(w->fd, (struct sockaddr *)&sa_buf, &sa_len);
1160         trace("accepted fd=%d, errno=%d\n", fd, errno);
1161         if (fd == -1) break;
1162 
1163         int is_tcp = 1;
1164 #ifdef AF_UNIX
1165         if (unlikely(sa_buf.ss_family == AF_UNIX)) is_tcp = 0;
1166 #endif
1167 
1168         assert(sa_len <= sizeof(struct sockaddr_storage));
1169         if (unlikely(prep_socket(fd, is_tcp))) {
1170             perror("prep_socket");
1171             trouble("prep_socket failed for %d\n", fd);
1172             close(fd);
1173             continue;
1174         }
1175 
1176         struct sockaddr *sa = (struct sockaddr *)malloc(sa_len);
1177         memcpy(sa,&sa_buf,(size_t)sa_len);
1178         struct feer_conn *c = new_feer_conn(EV_A,fd,sa);
1179         start_read_watcher(c);
1180         restart_read_timer(c);
1181         assert(SvREFCNT(c->self) == 3);
1182         SvREFCNT_dec(c->self);
1183     }
1184 }
1185 
1186 static void
sched_request_callback(struct feer_conn * c)1187 sched_request_callback (struct feer_conn *c)
1188 {
1189     trace("sched req callback: %d c=%p, head=%p\n", c->fd, c, request_ready_rinq);
1190     rinq_push(&request_ready_rinq, c);
1191     SvREFCNT_inc_void_NN(c->self); // for the rinq
1192     if (!ev_is_active(&ei)) {
1193         ev_idle_start(feersum_ev_loop, &ei);
1194     }
1195 }
1196 
1197 // the unlikely/likely annotations here are trying to optimize for GET first
1198 // and POST second.  Other entity-body requests are third in line.
1199 static bool
process_request_headers(struct feer_conn * c,int body_offset)1200 process_request_headers (struct feer_conn *c, int body_offset)
1201 {
1202     int err_code;
1203     const char *err;
1204     struct feer_req *req = c->req;
1205 
1206     trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
1207     bool body_is_required;
1208     bool next_req_follows = 0;
1209 
1210     c->is_http11 = (req->minor_version == 1);
1211 
1212     change_receiving_state(c, RECEIVE_BODY);
1213 
1214     if (likely(str_eq("GET", 3, req->method, req->method_len))) {
1215         // Not supposed to have a body.  Additional bytes are either a
1216         // mistake, a websocket negotiation or pipelined requests under
1217         // HTTP/1.1
1218         next_req_follows = 1;
1219     }
1220     else if (likely(str_eq("OPTIONS", 7, req->method, req->method_len))) {
1221         body_is_required = 1;
1222         next_req_follows = 1;
1223     }
1224     else if (likely(str_eq("POST", 4, req->method, req->method_len))) {
1225         body_is_required = 1;
1226     }
1227     else if (str_eq("PUT", 3, req->method, req->method_len)) {
1228         body_is_required = 1;
1229     }
1230     else if (str_eq("HEAD", 4, req->method, req->method_len) ||
1231              str_eq("DELETE", 6, req->method, req->method_len))
1232     {
1233         next_req_follows = 1;
1234     }
1235     else {
1236         err = "Feersum doesn't support that method yet\n";
1237         err_code = 405;
1238         goto got_bad_request;
1239     }
1240 
1241 #if DEBUG >= 2
1242     if (next_req_follows)
1243         trace2("next req follows fd=%d, boff=%d\n",c->fd,body_offset);
1244     if (body_is_required)
1245         trace2("body is required fd=%d, boff=%d\n",c->fd,body_offset);
1246 #endif
1247 
1248     // a body or follow-on data potentially follows the headers. Let feer_req
1249     // retain its pointers into rbuf and make a new scalar for more body data.
1250     STRLEN from_len;
1251     char *from = SvPV(c->rbuf,from_len);
1252     from += body_offset;
1253     int need = from_len - body_offset;
1254     int new_alloc = (need > READ_INIT_FACTOR*READ_BUFSZ)
1255         ? need : READ_INIT_FACTOR*READ_BUFSZ-1;
1256     trace("new rbuf for body %d need=%d alloc=%d\n",c->fd, need, new_alloc);
1257     SV *new_rbuf = newSVpvn(need ? from : "", need);
1258 
1259     req->buf = c->rbuf;
1260     c->rbuf = new_rbuf;
1261     SvCUR_set(req->buf, body_offset);
1262 
1263     if (likely(next_req_follows)) // optimize for GET
1264         goto got_it_all;
1265 
1266     // determine how much we need to read
1267     int i;
1268     UV expected = 0;
1269     for (i=0; i < req->num_headers; i++) {
1270         struct phr_header *hdr = &req->headers[i];
1271         if (!hdr->name) continue;
1272         // XXX: ignore multiple C-L headers?
1273         if (unlikely(
1274              str_case_eq("content-length", 14, hdr->name, hdr->name_len)))
1275         {
1276             int g = grok_number(hdr->value, hdr->value_len, &expected);
1277             if (likely(g == IS_NUMBER_IN_UV)) {
1278                 if (unlikely(expected > MAX_BODY_LEN)) {
1279                     err_code = 413;
1280                     err = "Content length exceeds maximum\n";
1281                     goto got_bad_request;
1282                 }
1283                 else
1284                     goto got_cl;
1285             }
1286             else {
1287                 err_code = 400;
1288                 err = "invalid content-length\n";
1289                 goto got_bad_request;
1290             }
1291         }
1292         // TODO: support "Connection: close" bodies
1293         // TODO: support "Transfer-Encoding: chunked" bodies
1294     }
1295 
1296     if (body_is_required) {
1297         // Go the nginx route...
1298         err_code = 411;
1299         err = "Content-Length required\n";
1300     }
1301     else {
1302         // XXX TODO support requests that don't require a body
1303         err_code = 418;
1304         err = "Feersum doesn't know how to handle optional-body requests yet\n";
1305     }
1306 
1307 got_bad_request:
1308     respond_with_server_error(c, err, 0, err_code);
1309     return 0;
1310 
1311 got_cl:
1312     c->expected_cl = (ssize_t)expected;
1313     c->received_cl = SvCUR(c->rbuf);
1314     trace("expecting body %d size=%"Ssz_df" have=%"Ssz_df"\n",
1315         c->fd, (Ssz)c->expected_cl, (Ssz)c->received_cl);
1316     SvGROW(c->rbuf, c->expected_cl + 1);
1317 
1318     // don't have enough bytes to schedule immediately?
1319     // unlikely = optimize for short requests
1320     if (unlikely(c->expected_cl && c->received_cl < c->expected_cl)) {
1321         // TODO: schedule the callback immediately and support a non-blocking
1322         // ->read method.
1323         // sched_request_callback(c);
1324         // change_receiving_state(c, RECEIVE_STREAM);
1325         return 1;
1326     }
1327     // fallthrough: have enough bytes
1328 got_it_all:
1329     sched_request_callback(c);
1330     return 0;
1331 }
1332 
1333 static void
conn_write_ready(struct feer_conn * c)1334 conn_write_ready (struct feer_conn *c)
1335 {
1336     if (c->in_callback) return; // defer until out of callback
1337 
1338     if (c->write_ev_io.data == NULL) {
1339         ev_io_init(&c->write_ev_io, try_conn_write, c->fd, EV_WRITE);
1340         c->write_ev_io.data = (void *)c;
1341     }
1342 
1343 #if AUTOCORK_WRITES
1344     start_write_watcher(c);
1345 #else
1346     // attempt a non-blocking write immediately if we're not already
1347     // waiting for writability
1348     try_conn_write(feersum_ev_loop, &c->write_ev_io, EV_WRITE);
1349 #endif
1350 }
1351 
1352 static void
respond_with_server_error(struct feer_conn * c,const char * msg,STRLEN msg_len,int err_code)1353 respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len, int err_code)
1354 {
1355     SV *tmp;
1356 
1357     if (unlikely(c->responding != RESPOND_NOT_STARTED)) {
1358         trouble("Tried to send server error but already responding!");
1359         return;
1360     }
1361 
1362     if (!msg_len) msg_len = strlen(msg);
1363     assert(msg_len < INT_MAX);
1364 
1365     tmp = newSVpvf("HTTP/1.%d %d %s" CRLF
1366                    "Content-Type: text/plain" CRLF
1367                    "Connection: close" CRLF
1368                    "Cache-Control: no-cache, no-store" CRLF
1369                    "Content-Length: %"Ssz_df"" CRLFx2
1370                    "%.*s",
1371               c->is_http11 ? 1 : 0,
1372               err_code, http_code_to_msg(err_code),
1373               (Ssz)msg_len,
1374               (int)msg_len, msg);
1375     add_sv_to_wbuf(c, sv_2mortal(tmp));
1376 
1377     stop_read_watcher(c);
1378     stop_read_timer(c);
1379     change_responding_state(c, RESPOND_SHUTDOWN);
1380     change_receiving_state(c, RECEIVE_SHUTDOWN);
1381     conn_write_ready(c);
1382 }
1383 
// Byte-wise equality of two length-delimited strings (embedded NULs are
// compared like any other byte).
//
// Returns 0 when the lengths differ, 1 when both pointers are the same or the
// (equal) length is not positive, and otherwise the result of comparing the
// first a_len bytes.
INLINE_UNLESS_DEBUG bool
str_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len) return 0;
    // nothing to compare; also keeps a non-positive length away from memcmp
    if (a == b || a_len <= 0) return 1;
    return memcmp(a, b, (size_t)a_len) == 0;
}
1395 
/*
 * Compares two length-delimited strings ignoring the case of the second one;
 * assumes that the first string is already lower-cased.
 *
 * Returns 0 when the lengths differ and 1 when both pointers are identical.
 * Bytes are converted to unsigned char before being handed to tolower(),
 * because passing a negative plain char to the <ctype.h> functions is
 * undefined behaviour.
 */
INLINE_UNLESS_DEBUG bool
str_case_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len) return 0;
    if (a == b) return 1;
    int i;
    for (i=0; i<a_len; i++) {
        if ((unsigned char)a[i] != tolower((unsigned char)b[i])) return 0;
    }
    return 1;
}
1410 
// Converts one hexadecimal digit character (either case) to its value 0..15.
// Returns -1 for any other character.
INLINE_UNLESS_DEBUG int
hex_decode(const char ch)
{
    // decimal digits are expected to be the common case
    if (likely(ch >= '0' && ch <= '9'))
        return ch - '0';

    if (ch >= 'A' && ch <= 'F')
        return 10 + (ch - 'A');

    if (ch >= 'a' && ch <= 'f')
        return 10 + (ch - 'a');

    return -1;
}
1422 
1423 static void
uri_decode_sv(SV * sv)1424 uri_decode_sv (SV *sv)
1425 {
1426     STRLEN len;
1427     char *ptr, *end, *decoded;
1428 
1429     ptr = SvPV(sv, len);
1430     end = SvEND(sv);
1431 
1432     // quickly scan for % so we can ignore decoding that portion of the string
1433     while (ptr < end) {
1434         if (unlikely(*ptr == '%')) goto needs_decode;
1435         ptr++;
1436     }
1437     return;
1438 
1439 needs_decode:
1440 
1441     // Up until ptr have been "decoded" already by virtue of those chars not
1442     // being encoded.
1443     decoded = ptr;
1444 
1445     for (; ptr < end; ptr++) {
1446         if (unlikely(*ptr == '%') && likely(end - ptr >= 2)) {
1447             int c1 = hex_decode(ptr[1]);
1448             int c2 = hex_decode(ptr[2]);
1449             if (likely(c1 != -1 && c2 != -1)) {
1450                 *decoded++ = (c1 << 4) + c2;
1451                 ptr += 2;
1452                 continue;
1453             }
1454         }
1455         *decoded++ = *ptr;
1456     }
1457 
1458     *decoded = '\0'; // play nice with C
1459 
1460     ptr = SvPV_nolen(sv);
1461     SvCUR_set(sv, decoded-ptr);
1462 }
1463 
1464 static void
feersum_init_tmpl_env(pTHX)1465 feersum_init_tmpl_env(pTHX)
1466 {
1467     HV *e;
1468     e = newHV();
1469 
1470     // constants
1471     hv_stores(e, "psgi.version", newRV((SV*)psgi_ver));
1472     hv_stores(e, "psgi.url_scheme", newSVpvs("http"));
1473     hv_stores(e, "psgi.run_once", &PL_sv_no);
1474     hv_stores(e, "psgi.nonblocking", &PL_sv_yes);
1475     hv_stores(e, "psgi.multithread", &PL_sv_no);
1476     hv_stores(e, "psgi.multiprocess", &PL_sv_no);
1477     hv_stores(e, "psgi.streaming", &PL_sv_yes);
1478     hv_stores(e, "psgi.errors", newRV((SV*)PL_stderrgv));
1479     hv_stores(e, "psgix.input.buffered", &PL_sv_yes);
1480     hv_stores(e, "psgix.output.buffered", &PL_sv_yes);
1481     hv_stores(e, "psgix.body.scalar_refs", &PL_sv_yes);
1482     hv_stores(e, "psgix.output.guard", &PL_sv_yes);
1483     hv_stores(e, "SCRIPT_NAME", newSVpvs(""));
1484 
1485     // placeholders that get defined for every request
1486     hv_stores(e, "SERVER_PROTOCOL", &PL_sv_undef);
1487     hv_stores(e, "SERVER_NAME", &PL_sv_undef);
1488     hv_stores(e, "SERVER_PORT", &PL_sv_undef);
1489     hv_stores(e, "REQUEST_URI", &PL_sv_undef);
1490     hv_stores(e, "REQUEST_METHOD", &PL_sv_undef);
1491     hv_stores(e, "PATH_INFO", &PL_sv_undef);
1492     hv_stores(e, "REMOTE_ADDR", &PL_sv_placeholder);
1493     hv_stores(e, "REMOTE_PORT", &PL_sv_placeholder);
1494 
1495     // defaults that get changed for some requests
1496     hv_stores(e, "psgi.input", &PL_sv_undef);
1497     hv_stores(e, "CONTENT_LENGTH", newSViv(0));
1498     hv_stores(e, "QUERY_STRING", newSVpvs(""));
1499 
1500     // anticipated headers
1501     hv_stores(e, "CONTENT_TYPE", &PL_sv_placeholder);
1502     hv_stores(e, "HTTP_HOST", &PL_sv_placeholder);
1503     hv_stores(e, "HTTP_USER_AGENT", &PL_sv_placeholder);
1504     hv_stores(e, "HTTP_ACCEPT", &PL_sv_placeholder);
1505     hv_stores(e, "HTTP_ACCEPT_LANGUAGE", &PL_sv_placeholder);
1506     hv_stores(e, "HTTP_ACCEPT_CHARSET", &PL_sv_placeholder);
1507     hv_stores(e, "HTTP_KEEP_ALIVE", &PL_sv_placeholder);
1508     hv_stores(e, "HTTP_CONNECTION", &PL_sv_placeholder);
1509     hv_stores(e, "HTTP_REFERER", &PL_sv_placeholder);
1510     hv_stores(e, "HTTP_COOKIE", &PL_sv_placeholder);
1511     hv_stores(e, "HTTP_IF_MODIFIED_SINCE", &PL_sv_placeholder);
1512     hv_stores(e, "HTTP_IF_NONE_MATCH", &PL_sv_placeholder);
1513     hv_stores(e, "HTTP_CACHE_CONTROL", &PL_sv_placeholder);
1514 
1515     hv_stores(e, "psgix.io", &PL_sv_placeholder);
1516 
1517     feersum_tmpl_env = e;
1518 }
1519 
1520 static HV*
feersum_env(pTHX_ struct feer_conn * c)1521 feersum_env(pTHX_ struct feer_conn *c)
1522 {
1523     HV *e;
1524     SV **hsv;
1525     int i,j;
1526     struct feer_req *r = c->req;
1527 
1528     if (unlikely(!feersum_tmpl_env))
1529         feersum_init_tmpl_env(aTHX);
1530     e = newHVhv(feersum_tmpl_env);
1531 
1532     trace("generating header (fd %d) %.*s\n",
1533         c->fd, (int)r->path_len, r->path);
1534 
1535     SV *path = newSVpvn(r->path, r->path_len);
1536     hv_stores(e, "SERVER_NAME", newSVsv(feer_server_name));
1537     hv_stores(e, "SERVER_PORT", newSVsv(feer_server_port));
1538     hv_stores(e, "REQUEST_URI", path);
1539     hv_stores(e, "REQUEST_METHOD", newSVpvn(r->method,r->method_len));
1540     hv_stores(e, "SERVER_PROTOCOL", (r->minor_version == 1) ?
1541         newSVsv(psgi_serv11) : newSVsv(psgi_serv10));
1542 
1543     SV *addr = &PL_sv_undef;
1544     SV *port = &PL_sv_undef;
1545     const char *str_addr;
1546     unsigned short s_port;
1547 
1548     if (c->sa->sa_family == AF_INET) {
1549         struct sockaddr_in *in = (struct sockaddr_in *)c->sa;
1550         addr = newSV(INET_ADDRSTRLEN);
1551         str_addr = inet_ntop(AF_INET,&in->sin_addr,SvPVX(addr),INET_ADDRSTRLEN);
1552         s_port = ntohs(in->sin_port);
1553     }
1554 #ifdef AF_INET6
1555     else if (c->sa->sa_family == AF_INET6) {
1556         struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)c->sa;
1557         addr = newSV(INET6_ADDRSTRLEN);
1558         str_addr = inet_ntop(AF_INET6,&in6->sin6_addr,SvPVX(addr),INET6_ADDRSTRLEN);
1559         s_port = ntohs(in6->sin6_port);
1560     }
1561 #endif
1562 #ifdef AF_UNIX
1563     else if (c->sa->sa_family == AF_UNIX) {
1564         str_addr = "unix";
1565         addr = newSV(sizeof(str_addr));
1566         memcpy(SvPVX(addr), str_addr, sizeof(str_addr));
1567         s_port = 0;
1568     }
1569 #endif
1570 
1571     if (likely(str_addr)) {
1572         SvCUR(addr) = strlen(SvPVX(addr));
1573         SvPOK_on(addr);
1574         port = newSViv(s_port);
1575     }
1576     hv_stores(e, "REMOTE_ADDR", addr);
1577     hv_stores(e, "REMOTE_PORT", port);
1578 
1579     if (unlikely(c->expected_cl > 0)) {
1580         hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
1581         hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c,0));
1582     }
1583     else if (request_cb_is_psgi) {
1584         // TODO: make psgi.input a valid, but always empty stream for PSGI mode?
1585     }
1586 
1587     if (request_cb_is_psgi) {
1588         SV *fake_fh = newSViv(c->fd); // just some random dummy value
1589         SV *selfref = sv_2mortal(feer_conn_2sv(c));
1590         sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
1591         hv_stores(e, "psgix.io", fake_fh);
1592     }
1593 
1594     {
1595         const char *qpos = r->path;
1596         SV *pinfo, *qstr;
1597 
1598         // rather than memchr, for speed:
1599         while (*qpos != '?' && qpos < r->path + r->path_len)
1600             qpos++;
1601 
1602         if (*qpos == '?') {
1603             pinfo = newSVpvn(r->path, (qpos - r->path));
1604             qpos++;
1605             qstr = newSVpvn(qpos, r->path_len - (qpos - r->path));
1606         }
1607         else {
1608             pinfo = newSVsv(path);
1609             qstr = NULL; // use template default
1610         }
1611         uri_decode_sv(pinfo);
1612         hv_stores(e, "PATH_INFO", pinfo);
1613         if (qstr != NULL) // hv template defaults QUERY_STRING to empty
1614             hv_stores(e, "QUERY_STRING", qstr);
1615     }
1616 
1617     SV *val = NULL;
1618     char *kbuf;
1619     size_t kbuflen = 64;
1620     Newx(kbuf, kbuflen, char);
1621     kbuf[0]='H'; kbuf[1]='T'; kbuf[2]='T'; kbuf[3]='P'; kbuf[4]='_';
1622 
1623     for (i=0; i<r->num_headers; i++) {
1624         struct phr_header *hdr = &(r->headers[i]);
1625         if (unlikely(hdr->name == NULL && val != NULL)) {
1626             trace("... multiline %.*s\n", (int)hdr->value_len, hdr->value);
1627             sv_catpvn(val, hdr->value, hdr->value_len);
1628             continue;
1629         }
1630         else if (unlikely(str_case_eq(
1631             STR_WITH_LEN("content-length"), hdr->name, hdr->name_len)))
1632         {
1633             // content length shouldn't show up as HTTP_CONTENT_LENGTH but
1634             // as CONTENT_LENGTH in the env-hash.
1635             continue;
1636         }
1637         else if (unlikely(str_case_eq(
1638             STR_WITH_LEN("content-type"), hdr->name, hdr->name_len)))
1639         {
1640             hv_stores(e, "CONTENT_TYPE",newSVpvn(hdr->value, hdr->value_len));
1641             continue;
1642         }
1643 
1644         size_t klen = 5+hdr->name_len;
1645         if (kbuflen < klen) {
1646             kbuflen = klen;
1647             kbuf = Renew(kbuf, kbuflen, char);
1648         }
1649         char *key = kbuf + 5;
1650         for (j=0; j<hdr->name_len; j++) {
1651             char n = hdr->name[j];
1652             *key++ = (n == '-') ? '_' : toupper(n);
1653         }
1654 
1655         SV **val = hv_fetch(e, kbuf, klen, 1);
1656         trace("adding header to env (fd %d) %.*s: %.*s\n",
1657             c->fd, (int)klen, kbuf, (int)hdr->value_len, hdr->value);
1658 
1659         assert(val != NULL); // "fetch is store" flag should ensure this
1660         if (unlikely(SvPOK(*val))) {
1661             trace("... is multivalue\n");
1662             // extend header with comma
1663             sv_catpvn(*val, ", ", 2);
1664             sv_catpvn(*val, hdr->value, hdr->value_len);
1665         }
1666         else {
1667             // change from undef to a real value
1668             sv_setpvn(*val, hdr->value, hdr->value_len);
1669         }
1670     }
1671     Safefree(kbuf);
1672 
1673     return e;
1674 }
1675 
1676 static void
feersum_start_response(pTHX_ struct feer_conn * c,SV * message,AV * headers,int streaming)1677 feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
1678                         int streaming)
1679 {
1680     const char *ptr;
1681     I32 i;
1682 
1683     trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
1684 
1685     if (unlikely(c->responding != RESPOND_NOT_STARTED))
1686         croak("already responding?!");
1687     change_responding_state(c, streaming ? RESPOND_STREAMING : RESPOND_NORMAL);
1688 
1689     if (unlikely(!SvOK(message) || !(SvIOK(message) || SvPOK(message)))) {
1690         croak("Must define an HTTP status code or message");
1691     }
1692 
1693     I32 avl = av_len(headers);
1694     if (unlikely(avl+1 % 2 == 1)) {
1695         croak("expected even-length array, got %d", avl+1);
1696     }
1697 
1698     // int or 3 chars? use a stock message
1699     UV code = 0;
1700     if (SvIOK(message))
1701         code = SvIV(message);
1702     else if (SvUOK(message))
1703         code = SvUV(message);
1704     else {
1705         const int numtype = grok_number(SvPVX_const(message),3,&code);
1706         if (unlikely(numtype != IS_NUMBER_IN_UV))
1707             code = 0;
1708     }
1709     trace2("starting response fd=%d code=%"UVuf"\n",c->fd,code);
1710 
1711     if (unlikely(!code))
1712         croak("first parameter is not a number or doesn't start with digits");
1713 
1714     // for PSGI it's always just an IV so optimize for that
1715     if (likely(!SvPOK(message) || SvCUR(message) == 3)) {
1716         ptr = http_code_to_msg(code);
1717         message = sv_2mortal(newSVpvf("%"UVuf" %s",code,ptr));
1718     }
1719 
1720     // don't generate or strip Content-Length headers for 304 or 1xx
1721     c->auto_cl = (code == 304 || (100 <= code && code <= 199)) ? 0 : 1;
1722 
1723     add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
1724     add_sv_to_wbuf(c, message);
1725     add_crlf_to_wbuf(c);
1726 
1727     for (i=0; i<avl; i+= 2) {
1728         SV **hdr = av_fetch(headers, i, 0);
1729         if (unlikely(!hdr || !SvOK(*hdr))) {
1730             trace("skipping undef header key");
1731             continue;
1732         }
1733 
1734         SV **val = av_fetch(headers, i+1, 0);
1735         if (unlikely(!val || !SvOK(*val))) {
1736             trace("skipping undef header value");
1737             continue;
1738         }
1739 
1740         STRLEN hlen;
1741         const char *hp = SvPV(*hdr, hlen);
1742         if (likely(c->auto_cl) &&
1743             unlikely(str_case_eq("content-length",14,hp,hlen)))
1744         {
1745             trace("ignoring content-length header in the response\n");
1746             continue;
1747         }
1748 
1749         add_sv_to_wbuf(c, *hdr);
1750         add_const_to_wbuf(c, ": ", 2);
1751         add_sv_to_wbuf(c, *val);
1752         add_crlf_to_wbuf(c);
1753     }
1754 
1755     if (streaming) {
1756         if (c->is_http11)
1757             add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
1758         else
1759             add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
1760     }
1761 
1762     conn_write_ready(c);
1763 }
1764 
1765 static size_t
feersum_write_whole_body(pTHX_ struct feer_conn * c,SV * body)1766 feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
1767 {
1768     size_t RETVAL;
1769     int i;
1770     bool body_is_string = 0;
1771     STRLEN cur;
1772 
1773     if (c->responding != RESPOND_NORMAL)
1774         croak("can't use write_whole_body when in streaming mode");
1775 
1776     if (!SvOK(body)) {
1777         body = sv_2mortal(newSVpvs(""));
1778         body_is_string = 1;
1779     }
1780     else if (SvROK(body)) {
1781         SV *refd = SvRV(body);
1782         if (SvOK(refd) && !SvROK(refd)) {
1783             body = refd;
1784             body_is_string = 1;
1785         }
1786         else if (SvTYPE(refd) != SVt_PVAV) {
1787             croak("body must be a scalar, scalar reference or array reference");
1788         }
1789     }
1790     else {
1791         body_is_string = 1;
1792     }
1793 
1794     SV *cl_sv; // content-length future
1795     struct iovec *cl_iov;
1796     if (likely(c->auto_cl))
1797         add_placeholder_to_wbuf(c, &cl_sv, &cl_iov);
1798     else
1799         add_crlf_to_wbuf(c);
1800 
1801     if (body_is_string) {
1802         cur = add_sv_to_wbuf(c,body);
1803         RETVAL = cur;
1804     }
1805     else {
1806         AV *abody = (AV*)SvRV(body);
1807         I32 amax = av_len(abody);
1808         RETVAL = 0;
1809         for (i=0; i<=amax; i++) {
1810             SV *sv = fetch_av_normal(aTHX_ abody, i);
1811             if (unlikely(!sv)) continue;
1812             cur = add_sv_to_wbuf(c,sv);
1813             trace("body part i=%d sv=%p cur=%"Sz_uf"\n", i, sv, (Sz)cur);
1814             RETVAL += cur;
1815         }
1816     }
1817 
1818     if (likely(c->auto_cl)) {
1819         sv_setpvf(cl_sv, "Content-Length: %"Sz_uf"" CRLFx2, (Sz)RETVAL);
1820         update_wbuf_placeholder(c, cl_sv, cl_iov);
1821     }
1822 
1823     change_responding_state(c, RESPOND_SHUTDOWN);
1824     conn_write_ready(c);
1825     return RETVAL;
1826 }
1827 
1828 static void
feersum_start_psgi_streaming(pTHX_ struct feer_conn * c,SV * streamer)1829 feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
1830 {
1831     dSP;
1832     ENTER;
1833     SAVETMPS;
1834     PUSHMARK(SP);
1835     mXPUSHs(feer_conn_2sv(c));
1836     XPUSHs(streamer);
1837     PUTBACK;
1838     call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
1839     SPAGAIN;
1840     if (unlikely(SvTRUE(ERRSV))) {
1841         call_died(aTHX_ c, "PSGI stream initiator");
1842     }
1843     PUTBACK;
1844     FREETMPS;
1845     LEAVE;
1846 }
1847 
1848 static void
feersum_handle_psgi_response(pTHX_ struct feer_conn * c,SV * ret,bool can_recurse)1849 feersum_handle_psgi_response(
1850     pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
1851 {
1852     if (unlikely(!SvOK(ret) || !SvROK(ret))) {
1853         sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
1854         call_died(aTHX_ c, "PSGI request");
1855         return;
1856     }
1857 
1858     if (SvOK(ret) && unlikely(!IsArrayRef(ret))) {
1859         if (likely(can_recurse)) {
1860             trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
1861             feersum_start_psgi_streaming(aTHX_ c, ret);
1862         }
1863         else {
1864             sv_setpvs(ERRSV, "PSGI attempt to recurse in a streaming callback");
1865             call_died(aTHX_ c, "PSGI request");
1866         }
1867         return;
1868     }
1869 
1870     AV *psgi_triplet = (AV*)SvRV(ret);
1871     if (unlikely(av_len(psgi_triplet)+1 != 3)) {
1872         sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
1873         call_died(aTHX_ c, "PSGI request");
1874         return;
1875     }
1876 
1877     trace("PSGI response triplet, c=%p av=%p\n", c, psgi_triplet);
1878     // we know there's three elems so *should* be safe to de-ref
1879     SV *msg =  *(av_fetch(psgi_triplet,0,0));
1880     SV *hdrs = *(av_fetch(psgi_triplet,1,0));
1881     SV *body = *(av_fetch(psgi_triplet,2,0));
1882 
1883     AV *headers;
1884     if (IsArrayRef(hdrs))
1885         headers = (AV*)SvRV(hdrs);
1886     else {
1887         sv_setpvs(ERRSV, "PSGI Headers must be an array-ref");
1888         call_died(aTHX_ c, "PSGI request");
1889         return;
1890     }
1891 
1892     if (likely(IsArrayRef(body))) {
1893         feersum_start_response(aTHX_ c, msg, headers, 0);
1894         feersum_write_whole_body(aTHX_ c, body);
1895     }
1896     else if (likely(SvROK(body))) { // probaby an IO::Handle-like object
1897         feersum_start_response(aTHX_ c, msg, headers, 1);
1898         c->poll_write_cb = newSVsv(body);
1899         c->poll_write_cb_is_io_handle = 1;
1900         conn_write_ready(c);
1901     }
1902     else {
1903         sv_setpvs(ERRSV, "Expected PSGI array-ref or IO::Handle-like body");
1904         call_died(aTHX_ c, "PSGI request");
1905         return;
1906     }
1907 }
1908 
1909 static int
feersum_close_handle(pTHX_ struct feer_conn * c,bool is_writer)1910 feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
1911 {
1912     int RETVAL;
1913     if (is_writer) {
1914         trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
1915         if (c->poll_write_cb) {
1916             SvREFCNT_dec(c->poll_write_cb);
1917             c->poll_write_cb = NULL;
1918         }
1919         if (c->responding < RESPOND_SHUTDOWN) {
1920             finish_wbuf(c);
1921             conn_write_ready(c);
1922             change_responding_state(c, RESPOND_SHUTDOWN);
1923         }
1924         RETVAL = 1;
1925     }
1926     else {
1927         trace("close reader fd=%d, c=%p\n", c->fd, c);
1928         // TODO: ref-dec poll_read_cb
1929         if (c->rbuf) {
1930             SvREFCNT_dec(c->rbuf);
1931             c->rbuf = NULL;
1932         }
1933         RETVAL = shutdown(c->fd, SHUT_RD);
1934         change_receiving_state(c, RECEIVE_SHUTDOWN);
1935     }
1936 
1937     // disassociate the handle from the conn
1938     SvREFCNT_dec(c->self);
1939     return RETVAL;
1940 }
1941 
1942 static SV*
feersum_conn_guard(pTHX_ struct feer_conn * c,SV * guard)1943 feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard)
1944 {
1945     if (guard) {
1946         if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
1947         c->ext_guard = SvOK(guard) ? newSVsv(guard) : NULL;
1948     }
1949     return c->ext_guard ? newSVsv(c->ext_guard) : &PL_sv_undef;
1950 }
1951 
1952 static void
call_died(pTHX_ struct feer_conn * c,const char * cb_type)1953 call_died (pTHX_ struct feer_conn *c, const char *cb_type)
1954 {
1955     dSP;
1956 #if DEBUG >= 1
1957     trace("An error was thrown in the %s callback: %-p\n",cb_type,ERRSV);
1958 #endif
1959     PUSHMARK(SP);
1960     mXPUSHs(newSVsv(ERRSV));
1961     PUTBACK;
1962     call_pv("Feersum::DIED", G_DISCARD|G_EVAL|G_VOID|G_KEEPERR);
1963     SPAGAIN;
1964 
1965     respond_with_server_error(c,"Request handler exception.\n",0,500);
1966     sv_setsv(ERRSV, &PL_sv_undef);
1967 }
1968 
1969 static void
call_request_callback(struct feer_conn * c)1970 call_request_callback (struct feer_conn *c)
1971 {
1972     dTHX;
1973     dSP;
1974     int flags;
1975     c->in_callback++;
1976     SvREFCNT_inc_void_NN(c->self);
1977 
1978     trace("request callback c=%p\n", c);
1979 
1980     ENTER;
1981     SAVETMPS;
1982     PUSHMARK(SP);
1983 
1984     if (request_cb_is_psgi) {
1985         HV *env = feersum_env(aTHX_ c);
1986         mXPUSHs(newRV_noinc((SV*)env));
1987         flags = G_EVAL|G_SCALAR;
1988     }
1989     else {
1990         mXPUSHs(feer_conn_2sv(c));
1991         flags = G_DISCARD|G_EVAL|G_VOID;
1992     }
1993 
1994     PUTBACK;
1995     int returned = call_sv(request_cb_cv, flags);
1996     SPAGAIN;
1997 
1998     trace("called request callback, errsv? %d\n", SvTRUE(ERRSV) ? 1 : 0);
1999 
2000     if (unlikely(SvTRUE(ERRSV))) {
2001         call_died(aTHX_ c, "request");
2002         returned = 0; // pretend nothing got returned
2003     }
2004 
2005     SV *psgi_response;
2006     if (request_cb_is_psgi && likely(returned >= 1)) {
2007         psgi_response = POPs;
2008         SvREFCNT_inc_void_NN(psgi_response);
2009     }
2010 
2011     trace("leaving request callback\n");
2012     PUTBACK;
2013 
2014     if (request_cb_is_psgi && likely(returned >= 1)) {
2015         feersum_handle_psgi_response(aTHX_ c, psgi_response, 1); // can_recurse
2016         SvREFCNT_dec(psgi_response);
2017     }
2018 
2019     //fangyousong
2020     if (request_cb_is_psgi && c->expected_cl > 0) {
2021         SvREFCNT_dec(c->self);
2022     }
2023 
2024 
2025     c->in_callback--;
2026     SvREFCNT_dec(c->self);
2027 
2028     FREETMPS;
2029     LEAVE;
2030 }
2031 
2032 static void
call_poll_callback(struct feer_conn * c,bool is_write)2033 call_poll_callback (struct feer_conn *c, bool is_write)
2034 {
2035     dTHX;
2036     dSP;
2037 
2038     SV *cb = (is_write) ? c->poll_write_cb : NULL;
2039 
2040     if (unlikely(cb == NULL)) return;
2041 
2042     c->in_callback++;
2043 
2044     trace("%s poll callback c=%p cbrv=%p\n",
2045         is_write ? "write" : "read", c, cb);
2046 
2047     ENTER;
2048     SAVETMPS;
2049     PUSHMARK(SP);
2050     mXPUSHs(new_feer_conn_handle(aTHX_ c, is_write));
2051     PUTBACK;
2052     call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
2053     SPAGAIN;
2054 
2055     trace("called %s poll callback, errsv? %d\n",
2056         is_write ? "write" : "read", SvTRUE(ERRSV) ? 1 : 0);
2057 
2058     if (unlikely(SvTRUE(ERRSV))) {
2059         call_died(aTHX_ c, is_write ? "write poll" : "read poll");
2060     }
2061 
2062     trace("leaving %s poll callback\n", is_write ? "write" : "read");
2063     PUTBACK;
2064     FREETMPS;
2065     LEAVE;
2066 
2067     c->in_callback--;
2068 }
2069 
2070 static void
pump_io_handle(struct feer_conn * c,SV * io)2071 pump_io_handle (struct feer_conn *c, SV *io)
2072 {
2073     dTHX;
2074     dSP;
2075 
2076     if (unlikely(io == NULL)) return;
2077 
2078     c->in_callback++;
2079 
2080     trace("pump io handle %d\n", c->fd);
2081 
2082     ENTER;
2083     SAVETMPS;
2084 
2085     // Emulate `local $/ = \4096;`
2086     SV *old_rs = PL_rs;
2087     PL_rs = sv_2mortal(newRV_noinc(newSViv(4096)));
2088     sv_setsv(get_sv("/", GV_ADD), PL_rs);
2089 
2090     PUSHMARK(SP);
2091     XPUSHs(c->poll_write_cb);
2092     PUTBACK;
2093     int returned = call_method("getline", G_SCALAR|G_EVAL);
2094     SPAGAIN;
2095 
2096     trace("called getline on io handle fd=%d errsv=%d returned=%d\n",
2097         c->fd, SvTRUE(ERRSV) ? 1 : 0, returned);
2098 
2099     if (unlikely(SvTRUE(ERRSV))) {
2100         call_died(aTHX_ c, "getline on io handle");
2101         goto done_pump_io;
2102     }
2103 
2104     SV *ret = NULL;
2105     if (returned > 0)
2106         ret = POPs;
2107     if (ret && SvMAGICAL(ret))
2108         ret = sv_2mortal(newSVsv(ret));
2109 
2110     if (unlikely(!ret || !SvOK(ret))) {
2111         // returned undef, so call the close method out of niceity
2112         PUSHMARK(SP);
2113         XPUSHs(c->poll_write_cb);
2114         PUTBACK;
2115         call_method("close", G_VOID|G_DISCARD|G_EVAL);
2116         SPAGAIN;
2117 
2118         if (unlikely(SvTRUE(ERRSV))) {
2119             trouble("Couldn't close body IO handle: %-p",ERRSV);
2120         }
2121 
2122         SvREFCNT_dec(c->poll_write_cb);
2123         c->poll_write_cb = NULL;
2124         finish_wbuf(c);
2125         change_responding_state(c, RESPOND_SHUTDOWN);
2126 
2127         goto done_pump_io;
2128     }
2129 
2130     if (c->is_http11)
2131         add_chunk_sv_to_wbuf(c, ret);
2132     else
2133         add_sv_to_wbuf(c, ret);
2134 
2135 done_pump_io:
2136     trace("leaving pump io handle %d\n", c->fd);
2137 
2138     PUTBACK;
2139     FREETMPS;
2140     LEAVE;
2141 
2142     PL_rs = old_rs;
2143     sv_setsv(get_sv("/", GV_ADD), old_rs);
2144 
2145     c->in_callback--;
2146 }
2147 
2148 static int
psgix_io_svt_get(pTHX_ SV * sv,MAGIC * mg)2149 psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
2150 {
2151     dSP;
2152 
2153     struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
2154     trace("invoking psgix.io magic for fd=%d\n", c->fd);
2155 
2156     sv_unmagic(sv, PERL_MAGIC_ext);
2157 
2158     ENTER;
2159     SAVETMPS;
2160 
2161     PUSHMARK(SP);
2162     XPUSHs(sv);
2163     mXPUSHs(newSViv(c->fd));
2164     PUTBACK;
2165 
2166     call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
2167     SPAGAIN;
2168 
2169     if (unlikely(SvTRUE(ERRSV))) {
2170         call_died(aTHX_ c, "psgix.io magic");
2171     }
2172     else {
2173         SV *io_glob   = SvRV(sv);
2174         GvSV(io_glob) = newRV_inc(c->self);
2175 
2176         // Put whatever remainder data into the socket buffer.
2177         // Optimizes for the websocket case.
2178         //
2179         // TODO: For keepalive support the opposite operation is required;
2180         // pull the data out of the socket buffer and back into feersum.
2181         if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
2182             STRLEN rbuf_len;
2183             const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
2184             IO *io = GvIOp(io_glob);
2185             assert(io != NULL);
2186             PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
2187             sv_setpvs(c->rbuf, "");
2188         }
2189 
2190         stop_read_watcher(c);
2191         stop_read_timer(c);
2192         // don't stop write watcher in case there's outstanding data.
2193     }
2194 
2195     PUTBACK;
2196     FREETMPS;
2197     LEAVE;
2198     return 0;
2199 }
2200 
2201 MODULE = Feersum		PACKAGE = Feersum
2202 
2203 PROTOTYPES: ENABLE
2204 
void
set_server_name_and_port(SV *self, SV *name, SV *port)
    PPCODE:
{
    // Replace the stored server name and port with read-only copies of the
    // given values, releasing whatever was stored before.

    if (feer_server_name) {
        SvREFCNT_dec(feer_server_name);
    }
    feer_server_name = newSVsv(name);
    SvREADONLY_on(feer_server_name);

    if (feer_server_port) {
        SvREFCNT_dec(feer_server_port);
    }
    feer_server_port = newSVsv(port);
    SvREADONLY_on(feer_server_port);
}
2219 
void
accept_on_fd(SV *self, int fd)
    PPCODE:
{
    // Bind Feersum to the default EV loop and set up its watchers for the
    // listening descriptor fd.  The prepare and check watchers are started
    // here; the idle and accept watchers are only initialised.
    trace("going to accept on %d\n",fd);
    feersum_ev_loop = EV_DEFAULT;

    // ignore SIGPIPE for the whole process
    signal(SIGPIPE, SIG_IGN);

    ev_prepare_init(&ep, prepare_cb);
    ev_prepare_start(feersum_ev_loop, &ep);

    ev_check_init(&ec, check_cb);
    ev_check_start(feersum_ev_loop, &ec);

    ev_idle_init(&ei, idle_cb);

    ev_io_init(&accept_w, accept_cb, fd, EV_READ);
}
2239 
void
unlisten (SV *self)
    PPCODE:
{
    // Stop all four server-level watchers (prepare, check, idle, accept).
    trace("stopping accept\n");

    ev_prepare_stop(feersum_ev_loop, &ep);
    ev_check_stop(feersum_ev_loop, &ec);
    ev_idle_stop(feersum_ev_loop, &ei);
    ev_io_stop(feersum_ev_loop, &accept_w);
}
2250 
void
request_handler(SV *self, SV *cb)
    PROTOTYPE: $&
    ALIAS:
        psgi_request_handler = 1
    PPCODE:
{
    // Install cb as the request handler, replacing any previous one.
    // ix is 0 for request_handler and 1 for the psgi_request_handler alias,
    // and is stored as the "handler is PSGI" flag.
    if (unlikely(!SvOK(cb) || !SvROK(cb))) {
        croak("can't supply an undef handler");
    }

    if (request_cb_cv) {
        SvREFCNT_dec(request_cb_cv);
    }
    request_cb_cv = newSVsv(cb); // copy so 5.8.7 overload magic sticks.
    request_cb_is_psgi = ix;

    trace("assigned %s request handler %p\n",
        request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
}
2267 
void
graceful_shutdown (SV *self, SV *cb)
    PROTOTYPE: $&
    PPCODE:
{
    // Begin a graceful shutdown: remember cb, stop accepting and close the
    // listening descriptor.  When no connections are active cb is invoked
    // (and released) right away; otherwise it stays stored in
    // shutdown_cb_cv.
    if (!IsCodeRef(cb)) {
        croak("must supply a code reference");
    }
    if (unlikely(shutting_down)) {
        croak("already shutting down");
    }

    shutdown_cb_cv = newSVsv(cb);
    trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);

    shutting_down = 1;
    ev_io_stop(feersum_ev_loop, &accept_w);
    close(accept_w.fd);

    if (active_conns <= 0) {
        trace("shutdown is immediate\n");
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
        PUTBACK;
        trace3("called shutdown handler\n");

        SvREFCNT_dec(shutdown_cb_cv);
        shutdown_cb_cv = NULL;

        FREETMPS;
        LEAVE;
    }
}
2299 
double
read_timeout (SV *self, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Getter/setter for the global read timeout.
    //
    // With one extra argument the timeout is updated; the value must
    // numify to something strictly greater than zero or this croaks.
    // The timeout in effect after the call is always returned, so RETVAL
    // is defined on every path (including the setter form).
    if (items == 2) {
        SV *duration = ST(1);
        NV new_read_timeout = SvNV(duration);
        // written as !(x > 0) so that NaN is rejected as well
        if (!(new_read_timeout > 0.0)) {
            croak("must set a positive (non-zero) value for the timeout");
        }
        read_timeout = (double) new_read_timeout;
    }

    RETVAL = read_timeout;
}
    OUTPUT:
        RETVAL
2319 
void
DESTROY (SV *self)
    PPCODE:
{
    // Release the request handler when the server object is destroyed.
    // The global is cleared afterwards so that a later request_handler()
    // call does not decrement the same SV a second time.
    trace3("DESTROY server\n");
    if (request_cb_cv) {
        SvREFCNT_dec(request_cb_cv);
        request_cb_cv = NULL;
    }
}
2328 
2329 MODULE = Feersum	PACKAGE = Feersum::Connection::Handle
2330 
2331 PROTOTYPES: ENABLE
2332 
int
fileno (feer_conn_handle *hdl)
    CODE:
{
    // Report the descriptor of the connection behind this handle.
    RETVAL = c->fd;
}
    OUTPUT:
        RETVAL
2339 
void
DESTROY (SV *self)
    ALIAS:
        Feersum::Connection::Reader::DESTROY = 1
        Feersum::Connection::Writer::DESTROY = 2
    PPCODE:
{
    // Handle destructor shared by the Reader (ix == 1) and Writer (ix == 2)
    // classes.  Only a writer handle that still resolves to a connection is
    // closed here; in every other case this just traces.
    feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);

    if (hdl != NULL) {
        struct feer_conn *c = (struct feer_conn *)hdl;
        trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
            HvNAME(SvSTASH(SvRV(self))));
        if (ix == 2) { // only close the writer on destruction
            feersum_close_handle(aTHX_ c, 1);
        }
    }
    else {
        trace3("DESTROY handle (closed) class=%s\n",
            HvNAME(SvSTASH(SvRV(self))));
    }
}
2361 
SV*
read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
    PROTOTYPE: $$$;$
    PPCODE:
{
    // Append up to len bytes of buffered request body to buf.
    //
    // The optional fourth argument is an offset into the buffered data; a
    // negative offset is taken relative to c->received_cl.  Returns the
    // number of bytes appended.  When nothing is available the result is 0
    // once the receiving side is shut down, otherwise undef with errno set
    // to EAGAIN.  Croaks if the body has not started arriving or if buf is
    // read-only.
    STRLEN buf_len = 0, src_len = 0;
    ssize_t offset;
    char *src_ptr = NULL;

    // optimizes for the "read everything" case.

    if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
        offset = SvIV(ST(3));
    else
        offset = 0;

    trace("read fd=%d : request    len=%"Sz_uf" off=%"Ssz_df"\n",
        c->fd, (Sz)len, (Ssz)offset);

    if (unlikely(c->receiving <= RECEIVE_HEADERS))
        // XXX as of 0.984 this is dead code
        croak("can't call read() until the body begins to arrive");

    // Check before touching buf so the caller gets this message rather
    // than perl's generic read-only modification error.
    if (unlikely(SvREADONLY(buf)))
        croak("buffer must not be read-only");

    if (!SvOK(buf) || !SvPOK(buf)) {
        // force to a PV; the append below grows it as required
        sv_setpvn(buf,"",0);
    }

    if (unlikely(len == 0))
        XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string

    (void)SvPV(buf, buf_len);
    if (likely(c->rbuf))
        src_ptr = SvPV(c->rbuf, src_len);

    if (unlikely(offset < 0))
        offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;

    // Clamp len to what is available past offset.  Done with a subtraction
    // on the known-smaller side so that neither a huge len nor an offset
    // beyond the buffer can wrap around.
    if ((STRLEN)offset >= src_len)
        len = 0;
    else if (len > src_len - (STRLEN)offset)
        len = src_len - (STRLEN)offset;

    trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
        c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);

    if (unlikely(!c->rbuf || src_len == 0 || offset >= c->received_cl ||
                 (STRLEN)offset >= src_len))
    {
        trace2("rbuf empty during read %d\n", c->fd);
        if (c->receiving == RECEIVE_SHUTDOWN) {
            XSRETURN_IV(0);
        }
        else {
            errno = EAGAIN;
            XSRETURN_UNDEF;
        }
    }

    if (likely(len == src_len && offset == 0)) {
        trace2("appending entire rbuf fd=%d\n", c->fd);
        sv_2mortal(c->rbuf); // allow pv to be stolen
        if (likely(buf_len == 0)) {
            sv_setsv(buf, c->rbuf);
        }
        else {
            sv_catsv(buf, c->rbuf);
        }
        c->rbuf = NULL;
    }
    else {
        src_ptr += offset;
        trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
            c->fd, (Sz)len, (Ssz)offset, src_ptr);
        SvGROW(buf, SvCUR(buf) + len);
        sv_catpvn(buf, src_ptr, len);
        if (likely(items == 3)) {
            // there wasn't an offset param, throw away beginning
            sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
        }
    }

    XSRETURN_IV(len);
}
2449 
STRLEN
write (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Queue one piece of a streaming response body.  Accepts a scalar or a
    // reference to a string scalar; a missing or undef argument is a no-op
    // returning 0.  The data is chunk-encoded when c->is_http11 is set.
    // Returns the byte length of the scalar that was queued.
    SV *payload;

    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    payload = (items == 2) ? ST(1) : &PL_sv_undef;
    if (unlikely(!payload || !SvOK(payload)))
        XSRETURN_IV(0);

    trace("write fd=%d c=%p, body=%p\n", c->fd, c, payload);

    if (SvROK(payload)) {
        SV *target = SvRV(payload);
        if (!(SvOK(target) && SvPOK(target)))
            croak("body must be a scalar, scalar ref or undef");
        payload = target;
    }
    (void)SvPV(payload, RETVAL);

    if (c->is_http11) {
        add_chunk_sv_to_wbuf(c, payload);
    }
    else {
        add_sv_to_wbuf(c, payload);
    }

    conn_write_ready(c);
}
    OUTPUT:
        RETVAL
2483 
void
write_array (feer_conn_handle *hdl, AV *abody)
    PROTOTYPE: $$
    PPCODE:
{
    // Queue every element of abody as streaming body data, skipping
    // elements for which fetch_av_normal yields nothing.  Whether elements
    // are chunk-encoded is decided once, from c->is_http11, before the loop.
    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);

    I32 last = av_len(abody);
    int chunked = c->is_http11 ? 1 : 0;
    int idx;

    for (idx = 0; idx <= last; idx++) {
        SV *item = fetch_av_normal(aTHX_ abody, idx);
        if (unlikely(!item))
            continue;
        if (chunked)
            add_chunk_sv_to_wbuf(c, item);
        else
            add_sv_to_wbuf(c, item);
    }

    conn_write_ready(c);
}
2511 
int
seek (feer_conn_handle *hdl, ssize_t offset, ...)
    PROTOTYPE: $$;$
    CODE:
{
    // Discard buffered input by chopping the front of c->rbuf.
    //
    // whence defaults to SEEK_CUR.  A positive offset with SEEK_CUR or
    // SEEK_SET drops that many bytes (capped at the buffer length); a
    // negative offset with SEEK_END keeps only the last -offset bytes.
    // offset == 0 always succeeds.  Returns 1 on success and 0 when there
    // is no buffer or the request is not supported.
    int whence = SEEK_CUR;
    if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
        whence = SvIV(ST(2));

    trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);

    RETVAL = 0; // failure unless a branch below says otherwise

    if (unlikely(!c->rbuf)) {
        // handle is effectively "closed"
    }
    else if (offset == 0) {
        RETVAL = 1; // stay put for any whence
    }
    else if (offset > 0 && (whence == SEEK_CUR || whence == SEEK_SET)) {
        STRLEN avail;
        const char *base = SvPV_const(c->rbuf, avail);
        if (offset > avail)
            offset = avail;
        sv_chop(c->rbuf, base + offset);
        RETVAL = 1;
    }
    else if (offset < 0 && whence == SEEK_END) {
        STRLEN avail;
        const char *base = SvPV_const(c->rbuf, avail);
        offset += avail; // can't be > avail since block is offset<0
        if (offset > 0) {
            sv_chop(c->rbuf, base + offset);
            RETVAL = 1;
        }
        else if (offset == 0) {
            RETVAL = 1; // no-op, but OK
        }
        // otherwise: past beginning of string, leave RETVAL at 0
    }
    // anything else is an invalid seek
}
    OUTPUT:
        RETVAL
2561 
int
close (feer_conn_handle *hdl)
    PROTOTYPE: $
    ALIAS:
        Feersum::Connection::Reader::close = 1
        Feersum::Connection::Writer::close = 2
    CODE:
{
    // Close the reader (ix == 1) or writer (ix == 2) side through
    // feersum_close_handle, then zero the handle SV's stored value.
    assert(ix);
    bool closing_writer = (ix == 2);
    RETVAL = feersum_close_handle(aTHX_ c, closing_writer);
    SvUVX(hdl_sv) = 0;
}
    OUTPUT:
        RETVAL
2576 
void
_poll_cb (feer_conn_handle *hdl, SV *cb)
    PROTOTYPE: $$
    ALIAS:
        Feersum::Connection::Reader::poll_cb = 1
        Feersum::Connection::Writer::poll_cb = 2
    PPCODE:
{
    // Set or clear the write-side poll callback.
    //
    // Only reachable through the Writer alias (ix == 2): calling _poll_cb
    // directly or through the Reader alias croaks.  Any existing callback
    // is released first.  An undef cb just clears it; otherwise cb must be
    // a code reference, which is copied and the connection is marked
    // write-ready.
    if (unlikely(ix < 1 || ix > 2))
        croak("can't call _poll_cb directly");
    else if (unlikely(ix == 1))
        croak("poll_cb for reading not yet supported"); // TODO poll_read_cb

    if (c->poll_write_cb != NULL) {
        SvREFCNT_dec(c->poll_write_cb);
        c->poll_write_cb = NULL;
    }

    if (!SvOK(cb)) {
        trace("unset poll_cb ix=%d\n", ix);
        // Leave through XSRETURN_EMPTY rather than a bare return so the
        // Perl stack pointer is reset and no arguments leak back to the
        // caller as return values.
        XSRETURN_EMPTY;
    }
    else if (unlikely(!IsCodeRef(cb)))
        croak("must supply a code reference to poll_cb");

    c->poll_write_cb = newSVsv(cb);
    conn_write_ready(c);
}
2605 
SV*
response_guard (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Getter when called without an argument, setter otherwise; the work
    // is done by feersum_conn_guard on the underlying connection.
    SV *new_guard = (items==2) ? ST(1) : NULL;
    RETVAL = feersum_conn_guard(aTHX_ c, new_guard);
}
    OUTPUT:
        RETVAL
2613 
2614 MODULE = Feersum	PACKAGE = Feersum::Connection
2615 
2616 PROTOTYPES: ENABLE
2617 
SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
    PROTOTYPE: $$\@
    CODE:
{
    // Start a streaming response (last argument 1 selects streaming mode)
    // and hand back a new writer handle for the body.
    feersum_start_response(aTHX_ c, message, headers, 1);
    RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
}
    OUTPUT:
        RETVAL
2626 
size_t
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
    PROTOTYPE: $$\@$
    CODE:
{
    // Send a complete response in one call and return the value reported
    // by feersum_write_whole_body.
    //
    // The body is validated before feersum_start_response is called, so a
    // rejected call has not already started the response when it croaks.
    if (unlikely(!SvOK(body)))
        croak("can't send_response with an undef body");
    feersum_start_response(aTHX_ c, message, headers, 0);
    RETVAL = feersum_write_whole_body(aTHX_ c, body);
}
    OUTPUT:
        RETVAL
2637 
SV*
_continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
    PROTOTYPE: $\@
    CODE:
{
    // Second stage of a PSGI delayed response: handles what the application
    // passed to its responder callback.
    //
    //   3 elements: a complete response, forwarded to
    //               feersum_handle_psgi_response with recursion disabled;
    //               returns undef.
    //   2 elements: (message, headers) -- starts a streaming response and
    //               returns a new writer handle.
    //   otherwise:  croaks.
    AV *av = NULL;
    int len = 0;

    if (IsArrayRef(psgi_response)) {
        av = (AV*)SvRV(psgi_response);
        len = av_len(av) + 1;
    }

    if (len == 3) {
        // 0 is "don't recurse" (i.e. don't allow another code-ref)
        feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
        RETVAL = &PL_sv_undef;
    }
    else if (len == 2) {
        // av_fetch returns NULL for a slot that was never assigned, so a
        // two-slot array is not guaranteed to have two usable elements.
        SV **message_p = av_fetch(av,0,0);
        SV **headers_p = av_fetch(av,1,0);
        if (unlikely(!message_p || !headers_p))
            croak("PSGI response starter expects a 2 or 3 element array-ref");
        SV *message = *message_p;
        SV *headers = *headers_p;
        if (unlikely(!IsArrayRef(headers)))
            croak("PSGI headers must be an array ref");
        feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    }
    else {
        croak("PSGI response starter expects a 2 or 3 element array-ref");
    }
}
    OUTPUT:
        RETVAL
2670 
void
force_http10 (struct feer_conn *c)
    PROTOTYPE: $
    ALIAS:
        force_http11 = 1
    PPCODE:
{
    // ix is 0 for force_http10 and 1 for the force_http11 alias.
    c->is_http11 = ix;
}
2678 
SV *
env (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Build the environment hash for this connection and return a
    // reference that takes over the hash's initial reference count.
    HV *env_hv = feersum_env(aTHX_ c);
    RETVAL = newRV_noinc((SV*)env_hv);
}
    OUTPUT:
        RETVAL
2686 
int
fileno (struct feer_conn *c)
    CODE:
{
    // Report the connection's descriptor.
    RETVAL = c->fd;
}
    OUTPUT:
        RETVAL
2693 
SV*
response_guard (struct feer_conn *c, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Getter when called without an argument, setter otherwise; see
    // feersum_conn_guard.
    SV *new_guard = (items == 2) ? ST(1) : NULL;
    RETVAL = feersum_conn_guard(aTHX_ c, new_guard);
}
    OUTPUT:
        RETVAL
2701 
void
DESTROY (struct feer_conn *c)
    PPCODE:
{
    // Connection destructor: releases the read buffer, every queued write
    // matrix and the SVs it holds, the request struct, the socket address,
    // the poll callback and the external guard, and closes the connection.
    // When this was the last active connection during a graceful shutdown,
    // the remaining loop watchers are stopped and the shutdown callback is
    // invoked and released.
    trace("DESTROY connection fd=%d c=%p\n", c->fd, c);

    if (likely(c->rbuf)) {
        SvREFCNT_dec(c->rbuf);
    }

    if (c->wbuf_rinq) {
        struct iomatrix *matrix;
        for (matrix = (struct iomatrix *)rinq_shift(&c->wbuf_rinq);
             matrix != NULL;
             matrix = (struct iomatrix *)rinq_shift(&c->wbuf_rinq))
        {
            int slot;
            for (slot = 0; slot < matrix->count; slot++) {
                if (matrix->sv[slot]) SvREFCNT_dec(matrix->sv[slot]);
            }
            Safefree(matrix);
        }
    }

    if (likely(c->req)) {
        if (c->req->buf) {
            SvREFCNT_dec(c->req->buf);
        }
        Safefree(c->req);
    }

    if (likely(c->sa)) {
        free(c->sa);
    }

    safe_close_conn(c, "close at destruction");

    if (c->poll_write_cb) {
        SvREFCNT_dec(c->poll_write_cb);
    }

    if (c->ext_guard) {
        SvREFCNT_dec(c->ext_guard);
    }

    active_conns--;

    if (unlikely(shutting_down && active_conns <= 0)) {
        ev_idle_stop(feersum_ev_loop, &ei);
        ev_prepare_stop(feersum_ev_loop, &ep);
        ev_check_stop(feersum_ev_loop, &ec);

        trace3("... was last conn, going to try shutdown\n");
        if (shutdown_cb_cv) {
            PUSHMARK(SP);
            call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
            PUTBACK;
            trace3("... ok, called that handler\n");
            SvREFCNT_dec(shutdown_cb_cv);
            shutdown_cb_cv = NULL;
        }
    }
}
2752 
2753 MODULE = Feersum	PACKAGE = Feersum
2754 
BOOT:
    {
        // One-time module setup: look up the package stashes, load the EV
        // C API, build the read-only constants used when constructing the
        // environment, and wire up the psgix.io magic vtable.

        // The two base stashes are created if missing (flag 1); the handle
        // class stashes are only looked up (flag 0).
        feer_stash = gv_stashpv("Feersum", 1);
        feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
        feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
        feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);

        I_EV_API("Feersum");

        // psgi_ver is the read-only array (1, 1)
        psgi_ver = newAV();
        av_extend(psgi_ver, 2);
        av_push(psgi_ver, newSViv(1));
        av_push(psgi_ver, newSViv(1));
        SvREADONLY_on((SV*)psgi_ver);

        // read-only protocol strings
        psgi_serv10 = newSVpvs("HTTP/1.0");
        SvREADONLY_on(psgi_serv10);

        psgi_serv11 = newSVpvs("HTTP/1.1");
        SvREADONLY_on(psgi_serv11);

        // only the "get" hook of the magic vtable is used
        Zero(&psgix_io_vtbl, 1, MGVTBL);
        psgix_io_vtbl.svt_get = psgix_io_svt_get;

        trace3("Feersum booted, iomatrix %lu "
                "(IOV_MAX=%u, FEERSUM_IOMATRIX_SIZE=%u), "
            "feer_req %lu, "
            "feer_conn %lu\n",
            (long unsigned int)sizeof(struct iomatrix),
                (unsigned int)IOV_MAX,
                (unsigned int)FEERSUM_IOMATRIX_SIZE,
            (long unsigned int)sizeof(struct feer_req),
            (long unsigned int)sizeof(struct feer_conn)
        );
    }
2788