1 #include "EVAPI.h"
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <sys/socket.h>
6 #include <errno.h>
7 #include <ctype.h>
8 #include <netinet/ip.h>
9 #include <netinet/tcp.h>
10 #include <sys/uio.h>
11
12 #include "ppport.h"
13
14
15 ///////////////////////////////////////////////////////////////
16 // "Compile Time Options" - See Feersum.pm POD for information
17
18 #define MAX_HEADERS 64
19 #define MAX_HEADER_NAME_LEN 128
20 #define MAX_BODY_LEN 2147483647
21
22 #define READ_BUFSZ 4096
23 #define READ_INIT_FACTOR 2
24 #define READ_GROW_FACTOR 8
25
26 #define AUTOCORK_WRITES 1
27
28 #if 0
29 # define FLASH_SOCKET_POLICY_SUPPORT
30 #endif
31
32 #ifndef FLASH_SOCKET_POLICY
33 # define FLASH_SOCKET_POLICY "<?xml version=\"1.0\"?>\n<!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">\n<cross-domain-policy>\n<site-control permitted-cross-domain-policies=\"master-only\"/>\n<allow-access-from domain=\"*\" to-ports=\"*\" secure=\"false\"/>\n</cross-domain-policy>\n"
34 #endif
35
36 // may be lower for your platform (e.g. Solaris is 16). See POD.
37 #define FEERSUM_IOMATRIX_SIZE 64
38
39 // auto-detected in Makefile.PL by perl versions and ithread usage; override
40 // that here. See POD for details.
41 #if 0
42 # undef FEERSUM_STEAL
43 #endif
44
45 ///////////////////////////////////////////////////////////////
46
47
48 #ifdef __GNUC__
49 # define likely(x) __builtin_expect(!!(x), 1)
50 # define unlikely(x) __builtin_expect(!!(x), 0)
51 #else
52 # define likely(x) (x)
53 # define unlikely(x) (x)
54 #endif
55
56 #ifndef CRLF
57 #define CRLF "\015\012"
58 #endif
59 #define CRLFx2 CRLF CRLF
60
61 // make darwin, solaris and bsd happy:
62 #ifndef SOL_TCP
63 #define SOL_TCP IPPROTO_TCP
64 #endif
65
66 // Wish-list: %z formats for perl sprintf. Would make compiling a lot less
67 // noisy for systems that warn size_t and STRLEN are incompatible with
68 // %d/%u/%x.
69 #if Size_t_size == LONGSIZE
70 # define Sz_f "l"
71 # define Sz_t long
72 #elif Size_t_size == 8 && defined HAS_QUAD && QUADKIND == QUAD_IS_LONG_LONG
73 # define Sz_f "ll"
74 # define Sz_t long long
75 #else
76 // hope "int" works.
77 # define Sz_f ""
78 # define Sz_t int
79 #endif
80
81 #define Sz_uf Sz_f"u"
82 #define Sz_xf Sz_f"x"
83 #define Ssz_df Sz_f"d"
84 #define Sz unsigned Sz_t
85 #define Ssz Sz_t
86
// Prefix put in front of every warning emitted through trouble().
#define WARN_PREFIX "Feersum: "

// INLINE_UNLESS_DEBUG marks small helpers as inline in normal builds and
// leaves them as plain functions when DEBUG is defined.
#ifndef DEBUG
#ifndef __inline
#define __inline
#endif
#define INLINE_UNLESS_DEBUG __inline
#else
#define INLINE_UNLESS_DEBUG
#endif

// Emit a warning with the Feersum prefix.  Deliberately has no trailing
// semicolon: the caller supplies it, which keeps
//     if (x) trouble("..."); else ...
// well-formed.
#define trouble(f_, ...) warn(WARN_PREFIX f_, ##__VA_ARGS__)

// trace() is compiled in only when DEBUG is defined; it prefixes the message
// with the source location and the process id.
#ifdef DEBUG
#define trace(f_, ...) warn("%s:%-4d [%d] " f_, __FILE__, __LINE__, (int)getpid(), ##__VA_ARGS__)
#else
#define trace(...)
#endif

// trace2()/trace3() are progressively noisier levels enabled by DEBUG >= 2
// and DEBUG >= 3 respectively.
#if DEBUG >= 2
#define trace2(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace2(...)
#endif

#if DEBUG >= 3
#define trace3(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace3(...)
#endif
117
118 #include "picohttpparser-git/picohttpparser.c"
119 #include "rinq.c"
120
121 // Check FEERSUM_IOMATRIX_SIZE against what's actually usable on this
122 // platform. See Feersum.pm for an explanation
123 #if defined(IOV_MAX) && FEERSUM_IOMATRIX_SIZE > IOV_MAX
124 # undef FEERSUM_IOMATRIX_SIZE
125 # define FEERSUM_IOMATRIX_SIZE IOV_MAX
126 #elif defined(UIO_MAXIOV) && FEERSUM_IOMATRIX_SIZE > UIO_MAXIOV
127 # undef FEERSUM_IOMATRIX_SIZE
128 # define FEERSUM_IOMATRIX_SIZE UIO_MAXIOV
129 #endif
130
// One batch of pending output for writev(): up to FEERSUM_IOMATRIX_SIZE
// buffers, plus the SVs (if any) that own those buffers.
struct iomatrix {
    unsigned offset;    // index of the first iov entry not yet fully written
    unsigned count;     // number of iov/sv slots filled so far
    struct iovec iov[FEERSUM_IOMATRIX_SIZE];    // buffers handed to writev()
    SV *sv[FEERSUM_IOMATRIX_SIZE];  // SV backing iov[i]; NULL for constant data
};
137
// Parsed request line and headers for one HTTP request.  The pointer/length
// pairs, minor_version and headers are filled in by phr_parse_request()
// (see try_parse_http).
struct feer_req {
    SV *buf;    // NOTE(review): presumably the buffer the parsed pointers refer into -- confirm
    const char* method;     // request method (not NUL-terminated; use method_len)
    size_t method_len;
    const char* path;       // request path (not NUL-terminated; use path_len)
    size_t path_len;
    int minor_version;      // minor HTTP version as reported by the parser
    size_t num_headers;     // in: capacity of headers[]; out: headers parsed
    struct phr_header headers[MAX_HEADERS];
};
148
// Progress of the response side of a connection.
enum feer_respond_state {
    RESPOND_NOT_STARTED = 0,
    RESPOND_NORMAL = 1,
    RESPOND_STREAMING = 2,
    RESPOND_SHUTDOWN = 3
};

// Store a printable name for respond-state _n into _s (a const char *).
// The number in parentheses is the enum value; unrecognised values yield
// "UNKNOWN" so _s is always assigned.
#define RESPOND_STR(_n,_s) do { \
    switch(_n) { \
    case RESPOND_NOT_STARTED: _s = "NOT_STARTED(0)"; break; \
    case RESPOND_NORMAL: _s = "NORMAL(1)"; break; \
    case RESPOND_STREAMING: _s = "STREAMING(2)"; break; \
    case RESPOND_SHUTDOWN: _s = "SHUTDOWN(3)"; break; \
    default: _s = "UNKNOWN"; break; \
    } \
} while (0)
163
// Progress of the request (read) side of a connection.
enum feer_receive_state {
    RECEIVE_HEADERS = 0,
    RECEIVE_BODY = 1,
    RECEIVE_STREAMING = 2,
    RECEIVE_SHUTDOWN = 3
};

// Store a printable name for receive-state _n into _s (a const char *).
// The number in parentheses is the enum value; unrecognised values yield
// "UNKNOWN" so _s is always assigned.
#define RECEIVE_STR(_n,_s) do { \
    switch(_n) { \
    case RECEIVE_HEADERS: _s = "HEADERS(0)"; break; \
    case RECEIVE_BODY: _s = "BODY(1)"; break; \
    case RECEIVE_STREAMING: _s = "STREAMING(2)"; break; \
    case RECEIVE_SHUTDOWN: _s = "SHUTDOWN(3)"; break; \
    default: _s = "UNKNOWN"; break; \
    } \
} while (0)
178
179 struct feer_conn {
180 SV *self;
181 int fd;
182 struct sockaddr *sa;
183
184 struct ev_io read_ev_io;
185 struct ev_io write_ev_io;
186 struct ev_timer read_ev_timer;
187
188 SV *rbuf;
189 struct rinq *wbuf_rinq;
190
191 SV *poll_write_cb;
192 SV *ext_guard;
193
194 struct feer_req *req;
195 ssize_t expected_cl;
196 ssize_t received_cl;
197
198 enum feer_respond_state responding;
199 enum feer_receive_state receiving;
200
201 int in_callback;
202 int is_http11:1;
203 int poll_write_cb_is_io_handle:1;
204 int auto_cl:1;
205 };
206
207 typedef struct feer_conn feer_conn_handle; // for typemap
208
209 #define dCONN struct feer_conn *c = (struct feer_conn *)w->data
210 #define IsArrayRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVAV)
211 #define IsCodeRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVCV)
212
213 static HV* feersum_env(pTHX_ struct feer_conn *c);
214 static void feersum_start_response
215 (pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming);
216 static size_t feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
217 static void feersum_handle_psgi_response(
218 pTHX_ struct feer_conn *c, SV *ret, bool can_recurse);
219 static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);
220 static SV* feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard);
221
222 static void start_read_watcher(struct feer_conn *c);
223 static void stop_read_watcher(struct feer_conn *c);
224 static void restart_read_timer(struct feer_conn *c);
225 static void stop_read_timer(struct feer_conn *c);
226 static void start_write_watcher(struct feer_conn *c);
227 static void stop_write_watcher(struct feer_conn *c);
228
229 static void try_conn_write(EV_P_ struct ev_io *w, int revents);
230 static void try_conn_read(EV_P_ struct ev_io *w, int revents);
231 static void conn_read_timeout(EV_P_ struct ev_timer *w, int revents);
232 static bool process_request_headers(struct feer_conn *c, int body_offset);
233 static void sched_request_callback(struct feer_conn *c);
234 static void call_died (pTHX_ struct feer_conn *c, const char *cb_type);
235 static void call_request_callback(struct feer_conn *c);
236 static void call_poll_callback (struct feer_conn *c, bool is_write);
237 static void pump_io_handle (struct feer_conn *c, SV *io);
238
239 static void conn_write_ready (struct feer_conn *c);
240 static void respond_with_server_error(struct feer_conn *c, const char *msg, STRLEN msg_len, int code);
241
242 static void update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov);
243 static STRLEN add_sv_to_wbuf (struct feer_conn *c, SV *sv);
244 static STRLEN add_const_to_wbuf (struct feer_conn *c, const char *str, size_t str_len);
245 #define add_crlf_to_wbuf(c) add_const_to_wbuf(c,CRLF,2)
246 static void finish_wbuf (struct feer_conn *c);
247 static void add_chunk_sv_to_wbuf (struct feer_conn *c, SV *sv);
248 static void add_placeholder_to_wbuf (struct feer_conn *c, SV **sv, struct iovec **iov_ref);
249
250 static void uri_decode_sv (SV *sv);
251 static bool str_eq(const char *a, int a_len, const char *b, int b_len);
252 static bool str_case_eq(const char *a, int a_len, const char *b, int b_len);
253 static SV* fetch_av_normal (pTHX_ AV *av, I32 i);
254
255 static const char *http_code_to_msg (int code);
256 static int prep_socket (int fd, int is_tcp);
257
258 static HV *feer_stash, *feer_conn_stash;
259 static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
260 static MGVTBL psgix_io_vtbl;
261
262 static SV *request_cb_cv = NULL;
263 static bool request_cb_is_psgi = 0;
264 static SV *shutdown_cb_cv = NULL;
265 static bool shutting_down = 0;
266 static int active_conns = 0;
267 static double read_timeout = 5.0;
268
269 static SV *feer_server_name = NULL;
270 static SV *feer_server_port = NULL;
271
272 static ev_io accept_w;
273 static ev_prepare ep;
274 static ev_check ec;
275 struct ev_idle ei;
276
277 static struct rinq *request_ready_rinq = NULL;
278
279 static AV *psgi_ver;
280 static SV *psgi_serv10, *psgi_serv11, *crlf_sv;
281
282 // TODO: make this thread-local if and when there are multiple C threads:
283 struct ev_loop *feersum_ev_loop = NULL;
284 static HV *feersum_tmpl_env = NULL;
285
286 INLINE_UNLESS_DEBUG
287 static SV*
fetch_av_normal(pTHX_ AV * av,I32 i)288 fetch_av_normal (pTHX_ AV *av, I32 i)
289 {
290 SV **elt = av_fetch(av, i, 0);
291 if (elt == NULL) return NULL;
292 SV *sv = *elt;
293 // copy to remove magic
294 if (unlikely(SvMAGICAL(sv))) sv = sv_2mortal(newSVsv(sv));
295 if (unlikely(!SvOK(sv))) return NULL;
296 // usually array ref elems aren't RVs (for PSGI anyway)
297 if (unlikely(SvROK(sv))) sv = SvRV(sv);
298 return sv;
299 }
300
301 INLINE_UNLESS_DEBUG
302 static struct iomatrix *
next_iomatrix(struct feer_conn * c)303 next_iomatrix (struct feer_conn *c)
304 {
305 bool add_iomatrix = 0;
306 struct iomatrix *m;
307
308 if (!c->wbuf_rinq) {
309 trace3("next_iomatrix(%d): head\n", c->fd);
310 add_iomatrix = 1;
311 }
312 else {
313 // get the tail-end struct
314 m = (struct iomatrix *)c->wbuf_rinq->prev->ref;
315 trace3("next_iomatrix(%d): tail, count=%d, offset=%d\n",
316 c->fd, m->count, m->offset);
317 if (m->count >= FEERSUM_IOMATRIX_SIZE) {
318 add_iomatrix = 1;
319 }
320 }
321
322 if (add_iomatrix) {
323 trace3("next_iomatrix(%d): malloc\n", c->fd);
324 Newx(m,1,struct iomatrix);
325 Poison(m,1,struct iomatrix);
326 m->offset = m->count = 0;
327 rinq_push(&c->wbuf_rinq, m);
328 }
329
330 trace3("next_iomatrix(%d): end, count=%d, offset=%d\n",
331 c->fd, m->count, m->offset);
332 return m;
333 }
334
335 INLINE_UNLESS_DEBUG
336 static STRLEN
add_sv_to_wbuf(struct feer_conn * c,SV * sv)337 add_sv_to_wbuf(struct feer_conn *c, SV *sv)
338 {
339 struct iomatrix *m = next_iomatrix(c);
340 int idx = m->count++;
341 STRLEN cur;
342 if (unlikely(SvMAGICAL(sv))) {
343 sv = newSVsv(sv); // copy to force it to be normal.
344 }
345 else if (unlikely(SvPADTMP(sv))) {
346 // PADTMPs have their PVs re-used, so we can't simply keep a
347 // reference. TEMPs maybe behave in a similar way and are potentially
348 // stealable. If not stealing, we must make a copy.
349 #ifdef FEERSUM_STEAL
350 if (SvFLAGS(sv) == (SVs_PADTMP|SVf_POK|SVp_POK)) {
351 trace3("STEALING\n");
352 SV *theif = newSV(0);
353 sv_upgrade(theif, SVt_PV);
354
355 SvPV_set(theif, SvPVX(sv));
356 SvLEN_set(theif, SvLEN(sv));
357 SvCUR_set(theif, SvCUR(sv));
358
359 // make the temp null
360 (void)SvOK_off(sv);
361 SvPV_set(sv, NULL);
362 SvLEN_set(sv, 0);
363 SvCUR_set(sv, 0);
364
365 SvFLAGS(theif) |= SVf_READONLY|SVf_POK|SVp_POK;
366
367 sv = theif;
368 }
369 else {
370 sv = newSVsv(sv);
371 }
372 #else
373 sv = newSVsv(sv);
374 #endif
375 }
376 else {
377 sv = SvREFCNT_inc(sv);
378 }
379
380 m->iov[idx].iov_base = SvPV(sv, cur);
381 m->iov[idx].iov_len = cur;
382 m->sv[idx] = sv;
383
384 return cur;
385 }
386
387 INLINE_UNLESS_DEBUG
388 static STRLEN
add_const_to_wbuf(struct feer_conn * c,const char * str,size_t str_len)389 add_const_to_wbuf(struct feer_conn *c, const char *str, size_t str_len)
390 {
391 struct iomatrix *m = next_iomatrix(c);
392 int idx = m->count++;
393 m->iov[idx].iov_base = (void*)str;
394 m->iov[idx].iov_len = str_len;
395 m->sv[idx] = NULL;
396 return str_len;
397 }
398
399 INLINE_UNLESS_DEBUG
400 static void
add_placeholder_to_wbuf(struct feer_conn * c,SV ** sv,struct iovec ** iov_ref)401 add_placeholder_to_wbuf(struct feer_conn *c, SV **sv, struct iovec **iov_ref)
402 {
403 struct iomatrix *m = next_iomatrix(c);
404 int idx = m->count++;
405 *sv = newSV(31);
406 SvPOK_on(*sv);
407 m->sv[idx] = *sv;
408 *iov_ref = &m->iov[idx];
409 }
410
411 INLINE_UNLESS_DEBUG
412 static void
finish_wbuf(struct feer_conn * c)413 finish_wbuf(struct feer_conn *c)
414 {
415 if (!c->is_http11) return; // nothing required
416 add_const_to_wbuf(c, "0\r\n\r\n", 5); // terminating chunk
417 }
418
419 INLINE_UNLESS_DEBUG
420 static void
update_wbuf_placeholder(struct feer_conn * c,SV * sv,struct iovec * iov)421 update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov)
422 {
423 STRLEN cur;
424 // can't pass iov_len for cur; incompatible pointer type on some systems:
425 iov->iov_base = SvPV(sv,cur);
426 iov->iov_len = cur;
427 }
428
429 static void
add_chunk_sv_to_wbuf(struct feer_conn * c,SV * sv)430 add_chunk_sv_to_wbuf(struct feer_conn *c, SV *sv)
431 {
432 SV *chunk;
433 struct iovec *chunk_iov;
434 add_placeholder_to_wbuf(c, &chunk, &chunk_iov);
435 STRLEN cur = add_sv_to_wbuf(c, sv);
436 add_crlf_to_wbuf(c);
437 sv_setpvf(chunk, "%"Sz_xf CRLF, (Sz)cur);
438 update_wbuf_placeholder(c, chunk, chunk_iov);
439 }
440
// Map an HTTP status code to its reason phrase.  Codes without a specific
// phrase fall back to the name of their Nxx group (RFC 2616); anything
// outside 100..499 that is not listed yields "Error".
// See http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
static const char *
http_code_to_msg (int code) {
    static const struct {
        int code;
        const char *msg;
    } known[] = {
        { 100, "Continue" },
        { 101, "Switching Protocols" },
        { 102, "Processing" }, // RFC 2518
        { 200, "OK" },
        { 201, "Created" },
        { 202, "Accepted" },
        { 203, "Non Authoritative Information" },
        { 204, "No Content" },
        { 205, "Reset Content" },
        { 206, "Partial Content" },
        { 207, "Multi-Status" }, // RFC 4918 (WebDav)
        { 300, "Multiple Choices" },
        { 301, "Moved Permanently" },
        { 302, "Found" },
        { 303, "See Other" },
        { 304, "Not Modified" },
        { 305, "Use Proxy" },
        { 307, "Temporary Redirect" },
        { 400, "Bad Request" },
        { 401, "Unauthorized" },
        { 402, "Payment Required" },
        { 403, "Forbidden" },
        { 404, "Not Found" },
        { 405, "Method Not Allowed" },
        { 406, "Not Acceptable" },
        { 407, "Proxy Authentication Required" },
        { 408, "Request Timeout" },
        { 409, "Conflict" },
        { 410, "Gone" },
        { 411, "Length Required" },
        { 412, "Precondition Failed" },
        { 413, "Request Entity Too Large" },
        { 414, "Request URI Too Long" },
        { 415, "Unsupported Media Type" },
        { 416, "Requested Range Not Satisfiable" },
        { 417, "Expectation Failed" },
        { 418, "I'm a teapot" },
        { 421, "Too Many Connections" }, // Microsoft?
        { 422, "Unprocessable Entity" }, // RFC 4918
        { 423, "Locked" }, // RFC 4918
        { 424, "Failed Dependency" }, // RFC 4918
        { 425, "Unordered Collection" }, // RFC 3648
        { 426, "Upgrade Required" }, // RFC 2817
        { 449, "Retry With" }, // Microsoft
        { 450, "Blocked by Parental Controls" }, // Microsoft
        { 500, "Internal Server Error" },
        { 501, "Not Implemented" },
        { 502, "Bad Gateway" },
        { 503, "Service Unavailable" },
        { 504, "Gateway Timeout" },
        { 505, "HTTP Version Not Supported" },
        { 506, "Variant Also Negotiates" }, // RFC 2295
        { 507, "Insufficient Storage" }, // RFC 4918
        { 509, "Bandwidth Limit Exceeded" }, // Apache mod
        { 510, "Not Extended" }, // RFC 2774
        { 530, "User access denied" }, // ??
    };
    size_t i;

    for (i = 0; i < sizeof known / sizeof known[0]; i++) {
        if (known[i].code == code)
            return known[i].msg;
    }

    // no specific phrase: use the Nxx group names in RFC 2616
    if (code < 100 || code > 499)
        return "Error";
    if (code <= 199)
        return "Informational";
    if (code <= 299)
        return "Success";
    if (code <= 399)
        return "Redirection";
    return "Client Error";
}
521
// Configure a freshly accepted socket.
//
// Puts fd into non-blocking mode (keeping whatever other file status flags
// are already set), enables TCP_NODELAY when is_tcp is true, requests that
// urgent data be delivered inline, and disables lingering on close.
//
// Returns 0 on success, or -1 as soon as any step fails (errno is left as
// set by the failing call).
static int
prep_socket(int fd, int is_tcp)
{
    int flags;
    int on = 1;
    struct linger linger = { .l_onoff = 0, .l_linger = 0 };

    // make it non-blocking; read the current flags first so F_SETFL does
    // not clear unrelated ones
    flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        return -1;

    if (is_tcp) {
        // flush writes immediately
        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)))
            return -1;
    }

    // handle URG data inline
    if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &on, sizeof(on)))
        return -1;

    // disable lingering
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)))
        return -1;

    return 0;
}
551
552 INLINE_UNLESS_DEBUG static void
safe_close_conn(struct feer_conn * c,const char * where)553 safe_close_conn(struct feer_conn *c, const char *where)
554 {
555 if (unlikely(c->fd < 0))
556 return;
557
558 // make it blocking
559 fcntl(c->fd, F_SETFL, 0);
560
561 if (unlikely(close(c->fd)))
562 perror(where);
563
564 c->fd = -1;
565 }
566
567 static struct feer_conn *
new_feer_conn(EV_P_ int conn_fd,struct sockaddr * sa)568 new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
569 {
570 SV *self = newSV(0);
571 SvUPGRADE(self, SVt_PVMG); // ensures sv_bless doesn't reallocate
572 SvGROW(self, sizeof(struct feer_conn));
573 SvPOK_only(self);
574 SvIOK_on(self);
575 SvIV_set(self,conn_fd);
576
577 struct feer_conn *c = (struct feer_conn *)SvPVX(self);
578 Zero(c, 1, struct feer_conn);
579
580 c->self = self;
581 c->fd = conn_fd;
582 c->sa = sa;
583 c->responding = RESPOND_NOT_STARTED;
584 c->receiving = RECEIVE_HEADERS;
585
586 ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
587 c->read_ev_io.data = (void *)c;
588
589 ev_init(&c->read_ev_timer, conn_read_timeout);
590 c->read_ev_timer.data = (void *)c;
591
592 trace3("made conn fd=%d self=%p, c=%p, cur=%"Sz_uf", len=%"Sz_uf"\n",
593 c->fd, self, c, (Sz)SvCUR(self), (Sz)SvLEN(self));
594
595 SV *rv = newRV_inc(c->self);
596 sv_bless(rv, feer_conn_stash); // so DESTROY can get called on read errors
597 SvREFCNT_dec(rv);
598
599 SvREADONLY_on(self); // turn off later for blessing
600 active_conns++;
601 return c;
602 }
603
604 // for use in the typemap:
605 INLINE_UNLESS_DEBUG
606 static struct feer_conn *
sv_2feer_conn(SV * rv)607 sv_2feer_conn (SV *rv)
608 {
609 if (unlikely(!sv_isa(rv,"Feersum::Connection")))
610 croak("object is not of type Feersum::Connection");
611 return (struct feer_conn *)SvPVX(SvRV(rv));
612 }
613
614 INLINE_UNLESS_DEBUG
615 static SV*
feer_conn_2sv(struct feer_conn * c)616 feer_conn_2sv (struct feer_conn *c)
617 {
618 return newRV_inc(c->self);
619 }
620
621 static feer_conn_handle *
sv_2feer_conn_handle(SV * rv,bool can_croak)622 sv_2feer_conn_handle (SV *rv, bool can_croak)
623 {
624 trace3("sv 2 conn_handle\n");
625 if (unlikely(!SvROK(rv))) croak("Expected a reference");
626 // do not allow subclassing
627 SV *sv = SvRV(rv);
628 if (likely(
629 sv_isobject(rv) &&
630 (SvSTASH(sv) == feer_conn_writer_stash ||
631 SvSTASH(sv) == feer_conn_reader_stash)
632 )) {
633 UV uv = SvUV(sv);
634 if (uv == 0) {
635 if (can_croak) croak("Operation not allowed: Handle is closed.");
636 return NULL;
637 }
638 return INT2PTR(feer_conn_handle*,uv);
639 }
640
641 if (can_croak)
642 croak("Expected a Feersum::Connection::Writer or ::Reader object");
643 return NULL;
644 }
645
646 static SV *
new_feer_conn_handle(pTHX_ struct feer_conn * c,bool is_writer)647 new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
648 {
649 SV *sv;
650 SvREFCNT_inc_void_NN(c->self);
651 sv = newRV_noinc(newSVuv(PTR2UV(c)));
652 sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
653 return sv;
654 }
655
// change_responding_state(c, to) / change_receiving_state(c, to):
// set the connection's responding / receiving state.  With DEBUG enabled a
// transition to a different state is also logged via trace2().  The macro
// arguments are parenthesized so any pointer expression can be passed as c.
#if DEBUG
# define change_responding_state(c, _to) do { \
    enum feer_respond_state _to_state = (_to); \
    enum feer_respond_state _from_state = (c)->responding; \
    const char *_from_str = "?", *_to_str = "?"; \
    if (likely(_from_state != _to_state)) { \
        RESPOND_STR(_from_state, _from_str); \
        RESPOND_STR(_to_state, _to_str); \
        trace2("==> responding state %d: %s to %s\n", \
            (c)->fd,_from_str,_to_str); \
        (c)->responding = _to_state; \
    } \
} while (0)
# define change_receiving_state(c, _to) do { \
    enum feer_receive_state _to_state = (_to); \
    enum feer_receive_state _from_state = (c)->receiving; \
    const char *_from_str = "?", *_to_str = "?"; \
    if (likely(_from_state != _to_state)) { \
        RECEIVE_STR(_from_state, _from_str); \
        RECEIVE_STR(_to_state, _to_str); \
        trace2("==> receiving state %d: %s to %s\n", \
            (c)->fd,_from_str,_to_str); \
        (c)->receiving = _to_state; \
    } \
} while (0)
#else
# define change_responding_state(c, _to) ((c)->responding = (_to))
# define change_receiving_state(c, _to) ((c)->receiving = (_to))
#endif
685
686 INLINE_UNLESS_DEBUG static void
start_read_watcher(struct feer_conn * c)687 start_read_watcher(struct feer_conn *c) {
688 if (unlikely(ev_is_active(&c->read_ev_io)))
689 return;
690 trace("start read watcher %d\n",c->fd);
691 ev_io_start(feersum_ev_loop, &c->read_ev_io);
692 SvREFCNT_inc_void_NN(c->self);
693 }
694
695 INLINE_UNLESS_DEBUG static void
stop_read_watcher(struct feer_conn * c)696 stop_read_watcher(struct feer_conn *c) {
697 if (unlikely(!ev_is_active(&c->read_ev_io)))
698 return;
699 trace("stop read watcher %d\n",c->fd);
700 ev_io_stop(feersum_ev_loop, &c->read_ev_io);
701 SvREFCNT_dec(c->self);
702 }
703
704 INLINE_UNLESS_DEBUG static void
restart_read_timer(struct feer_conn * c)705 restart_read_timer(struct feer_conn *c) {
706 if (likely(!ev_is_active(&c->read_ev_timer))) {
707 trace("restart read timer %d\n",c->fd);
708 c->read_ev_timer.repeat = read_timeout;
709 SvREFCNT_inc_void_NN(c->self);
710 }
711 ev_timer_again(feersum_ev_loop, &c->read_ev_timer);
712 }
713
714 INLINE_UNLESS_DEBUG static void
stop_read_timer(struct feer_conn * c)715 stop_read_timer(struct feer_conn *c) {
716 if (unlikely(!ev_is_active(&c->read_ev_timer)))
717 return;
718 trace("stop read timer %d\n",c->fd);
719 ev_timer_stop(feersum_ev_loop, &c->read_ev_timer);
720 SvREFCNT_dec(c->self);
721 }
722
723 INLINE_UNLESS_DEBUG static void
start_write_watcher(struct feer_conn * c)724 start_write_watcher(struct feer_conn *c) {
725 if (unlikely(ev_is_active(&c->write_ev_io)))
726 return;
727 trace("start write watcher %d\n",c->fd);
728 ev_io_start(feersum_ev_loop, &c->write_ev_io);
729 SvREFCNT_inc_void_NN(c->self);
730 }
731
732 INLINE_UNLESS_DEBUG static void
stop_write_watcher(struct feer_conn * c)733 stop_write_watcher(struct feer_conn *c) {
734 if (unlikely(!ev_is_active(&c->write_ev_io)))
735 return;
736 trace("stop write watcher %d\n",c->fd);
737 ev_io_stop(feersum_ev_loop, &c->write_ev_io);
738 SvREFCNT_dec(c->self);
739 }
740
741
742 static void
process_request_ready_rinq(void)743 process_request_ready_rinq (void)
744 {
745 while (request_ready_rinq) {
746 struct feer_conn *c =
747 (struct feer_conn *)rinq_shift(&request_ready_rinq);
748 //trace("rinq shifted c=%p, head=%p\n", c, request_ready_rinq);
749
750 call_request_callback(c);
751
752 if (likely(c->wbuf_rinq)) {
753 // this was deferred until after the perl callback
754 conn_write_ready(c);
755 }
756 SvREFCNT_dec(c->self); // for the rinq
757 }
758 }
759
760 static void
prepare_cb(EV_P_ ev_prepare * w,int revents)761 prepare_cb (EV_P_ ev_prepare *w, int revents)
762 {
763 if (unlikely(revents & EV_ERROR)) {
764 trouble("EV error in prepare, revents=0x%08x\n", revents);
765 ev_break(EV_A, EVBREAK_ALL);
766 return;
767 }
768
769 if (!ev_is_active(&accept_w) && !shutting_down) {
770 ev_io_start(EV_A, &accept_w);
771 }
772 ev_prepare_stop(EV_A, w);
773 }
774
775 static void
check_cb(EV_P_ ev_check * w,int revents)776 check_cb (EV_P_ ev_check *w, int revents)
777 {
778 if (unlikely(revents & EV_ERROR)) {
779 trouble("EV error in check, revents=0x%08x\n", revents);
780 ev_break(EV_A, EVBREAK_ALL);
781 return;
782 }
783 trace3("check! head=%p\n", request_ready_rinq);
784 if (request_ready_rinq)
785 process_request_ready_rinq();
786 }
787
788 static void
idle_cb(EV_P_ ev_idle * w,int revents)789 idle_cb (EV_P_ ev_idle *w, int revents)
790 {
791 if (unlikely(revents & EV_ERROR)) {
792 trouble("EV error in idle, revents=0x%08x\n", revents);
793 ev_break(EV_A, EVBREAK_ALL);
794 return;
795 }
796 trace3("idle! head=%p\n", request_ready_rinq);
797 if (request_ready_rinq)
798 process_request_ready_rinq();
799 ev_idle_stop(EV_A, w);
800 }
801
802 static void
try_conn_write(EV_P_ struct ev_io * w,int revents)803 try_conn_write(EV_P_ struct ev_io *w, int revents)
804 {
805 dCONN;
806 int i;
807 struct iomatrix *m;
808
809 SvREFCNT_inc_void_NN(c->self);
810
811 // if it's marked writeable EV suggests we simply try write to it.
812 // Otherwise it is stopped and we should ditch this connection.
813 if (unlikely(revents & EV_ERROR && !(revents & EV_WRITE))) {
814 trace("EV error on write, fd=%d revents=0x%08x\n", w->fd, revents);
815 change_responding_state(c, RESPOND_SHUTDOWN);
816 goto try_write_finished;
817 }
818
819 if (unlikely(!c->wbuf_rinq)) {
820 if (unlikely(c->responding >= RESPOND_SHUTDOWN))
821 goto try_write_finished;
822
823 if (!c->poll_write_cb) {
824 // no callback and no data: wait for app to push to us.
825 if (c->responding == RESPOND_STREAMING)
826 goto try_write_paused;
827
828 trace("tried to write with an empty buffer %d resp=%d\n",w->fd,c->responding);
829 change_responding_state(c, RESPOND_SHUTDOWN);
830 goto try_write_finished;
831 }
832
833 if (c->poll_write_cb_is_io_handle)
834 pump_io_handle(c, c->poll_write_cb);
835 else
836 call_poll_callback(c, 1);
837
838 // callback didn't write anything:
839 if (unlikely(!c->wbuf_rinq)) goto try_write_again;
840 }
841
842 try_write_again_immediately:
843 m = (struct iomatrix *)c->wbuf_rinq->ref;
844 #if DEBUG >= 2
845 warn("going to write to %d:\n",c->fd);
846 for (i=0; i < m->count; i++) {
847 fprintf(stderr,"%.*s",
848 (int)m->iov[i].iov_len, (char*)m->iov[i].iov_base);
849 }
850 #endif
851
852 trace("going to write %d off=%d count=%d\n", w->fd, m->offset, m->count);
853 errno = 0;
854 ssize_t wrote = writev(w->fd, &m->iov[m->offset], m->count - m->offset);
855 trace("wrote %"Ssz_df" bytes to %d, errno=%d\n", (Ssz)wrote, w->fd, errno);
856
857 if (unlikely(wrote <= 0)) {
858 if (unlikely(wrote == 0))
859 goto try_write_again;
860 if (likely(errno == EAGAIN || errno == EINTR))
861 goto try_write_again;
862 perror("Feersum try_conn_write");
863 change_responding_state(c, RESPOND_SHUTDOWN);
864 goto try_write_finished;
865 }
866
867 for (i = m->offset; i < m->count && wrote > 0; i++) {
868 struct iovec *v = &m->iov[i];
869 if (unlikely(v->iov_len > wrote)) {
870 trace3("offset vector %d base=%p len=%"Sz_uf"\n",
871 w->fd, v->iov_base, (Sz)v->iov_len);
872 v->iov_base += wrote;
873 v->iov_len -= wrote;
874 // don't consume any more:
875 wrote = 0;
876 }
877 else {
878 trace3("consume vector %d base=%p len=%"Sz_uf" sv=%p\n",
879 w->fd, v->iov_base, (Sz)v->iov_len, m->sv[i]);
880 wrote -= v->iov_len;
881 m->offset++;
882 if (m->sv[i]) {
883 SvREFCNT_dec(m->sv[i]);
884 m->sv[i] = NULL;
885 }
886 }
887 }
888
889 if (likely(m->offset >= m->count)) {
890 trace2("all done with iomatrix %d state=%d\n",w->fd,c->responding);
891 rinq_shift(&c->wbuf_rinq);
892 Safefree(m);
893 if (!c->wbuf_rinq)
894 goto try_write_finished;
895 trace2("write again immediately %d state=%d\n",w->fd,c->responding);
896 goto try_write_again_immediately;
897 }
898 // else, fallthrough:
899 trace2("write fallthrough %d state=%d\n",w->fd,c->responding);
900
901 try_write_again:
902 trace("write again %d state=%d\n",w->fd,c->responding);
903 start_write_watcher(c);
904 goto try_write_cleanup;
905
906 try_write_finished:
907 // should always be responding, but just in case
908 switch(c->responding) {
909 case RESPOND_NOT_STARTED:
910 // the write watcher shouldn't ever get called before starting to
911 // respond. Shut it down if it does.
912 trace("unexpected try_write when response not started %d\n",c->fd);
913 goto try_write_shutdown;
914 case RESPOND_NORMAL:
915 goto try_write_shutdown;
916 case RESPOND_STREAMING:
917 if (c->poll_write_cb) goto try_write_again;
918 else goto try_write_paused;
919 case RESPOND_SHUTDOWN:
920 goto try_write_shutdown;
921 default:
922 goto try_write_cleanup;
923 }
924
925 try_write_paused:
926 trace3("write PAUSED %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
927 stop_write_watcher(c);
928 goto try_write_cleanup;
929
930 try_write_shutdown:
931 trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
932 change_responding_state(c, RESPOND_SHUTDOWN);
933 stop_write_watcher(c);
934 safe_close_conn(c, "close at write shutdown");
935
936 try_write_cleanup:
937 SvREFCNT_dec(c->self);
938 return;
939 }
940
941 static int
try_parse_http(struct feer_conn * c,size_t last_read)942 try_parse_http(struct feer_conn *c, size_t last_read)
943 {
944 struct feer_req *req = c->req;
945 if (likely(!req)) {
946 Newxz(req,1,struct feer_req);
947 c->req = req;
948 }
949
950 // GH#12 - incremental parsing sets num_headers to 0 each time; force it
951 // back on every invocation
952 req->num_headers = MAX_HEADERS;
953
954 return phr_parse_request(SvPVX(c->rbuf), SvCUR(c->rbuf),
955 &req->method, &req->method_len,
956 &req->path, &req->path_len, &req->minor_version,
957 req->headers, &req->num_headers,
958 (SvCUR(c->rbuf)-last_read));
959 }
960
961 static void
try_conn_read(EV_P_ ev_io * w,int revents)962 try_conn_read(EV_P_ ev_io *w, int revents)
963 {
964 dCONN;
965 SvREFCNT_inc_void_NN(c->self);
966
967 // if it's marked readable EV suggests we simply try read it. Otherwise it
968 // is stopped and we should ditch this connection.
969 if (unlikely(revents & EV_ERROR && !(revents & EV_READ))) {
970 trace("EV error on read, fd=%d revents=0x%08x\n", w->fd, revents);
971 goto try_read_error;
972 }
973
974 if (unlikely(c->receiving == RECEIVE_SHUTDOWN))
975 goto dont_read_again;
976
977 trace("try read %d\n",w->fd);
978
979 if (likely(!c->rbuf)) { // likely = optimize for small requests
980 trace("init rbuf for %d\n",w->fd);
981 c->rbuf = newSV(READ_INIT_FACTOR*READ_BUFSZ + 1);
982 SvPOK_on(c->rbuf);
983 }
984
985 ssize_t space_free = SvLEN(c->rbuf) - SvCUR(c->rbuf);
986 if (unlikely(space_free < READ_BUFSZ)) { // unlikely = optimize for small
987 size_t new_len = SvLEN(c->rbuf) + READ_GROW_FACTOR*READ_BUFSZ;
988 trace("moar memory %d: %"Sz_uf" to %"Sz_uf"\n",
989 w->fd, (Sz)SvLEN(c->rbuf), (Sz)new_len);
990 SvGROW(c->rbuf, new_len);
991 space_free += READ_GROW_FACTOR*READ_BUFSZ;
992 }
993
994 char *cur = SvPVX(c->rbuf) + SvCUR(c->rbuf);
995 ssize_t got_n = read(w->fd, cur, space_free);
996
997 if (unlikely(got_n <= 0)) {
998 if (unlikely(got_n == 0)) {
999 trace("EOF before complete request: %d\n",w->fd,SvCUR(c->rbuf));
1000 goto try_read_error;
1001 }
1002 if (likely(errno == EAGAIN || errno == EINTR))
1003 goto try_read_again;
1004 perror("try_conn_read error");
1005 goto try_read_error;
1006 }
1007
1008 trace("read %d %"Ssz_df"\n", w->fd, (Ssz)got_n);
1009 SvCUR(c->rbuf) += got_n;
1010 // likely = optimize for small requests
1011 if (likely(c->receiving == RECEIVE_HEADERS)) {
1012
1013 #ifdef FLASH_SOCKET_POLICY_SUPPORT
1014 if (unlikely(*SvPVX(c->rbuf) == '<')) {
1015 if (likely(SvCUR(c->rbuf) >= 22)) { // length of vvv
1016 if (str_eq(SvPVX(c->rbuf), 22, "<policy-file-request/>", 22)) {
1017 add_const_to_wbuf(c, STR_WITH_LEN(FLASH_SOCKET_POLICY));
1018 conn_write_ready(c);
1019 stop_read_watcher(c);
1020 stop_read_timer(c);
1021 // TODO: keep-alives: be sure to remove the 22 bytes
1022 // out of the rbuf
1023 change_receiving_state(c, RECEIVE_SHUTDOWN);
1024 change_responding_state(c, RESPOND_SHUTDOWN);
1025 goto dont_read_again;
1026 }
1027 }
1028 // "if prefixed with"
1029 else if (likely(str_eq(SvPVX(c->rbuf), SvCUR(c->rbuf),
1030 "<policy-file-request/>", SvCUR(c->rbuf))))
1031 {
1032 goto try_read_again;
1033 }
1034 }
1035 #endif
1036
1037 int ret = try_parse_http(c, (size_t)got_n);
1038 if (ret == -1) goto try_read_bad;
1039 if (ret == -2) goto try_read_again;
1040
1041 if (process_request_headers(c, ret))
1042 goto try_read_again_reset_timer;
1043 else
1044 goto dont_read_again;
1045 }
1046 else if (likely(c->receiving == RECEIVE_BODY)) {
1047 c->received_cl += got_n;
1048 if (c->received_cl < c->expected_cl)
1049 goto try_read_again_reset_timer;
1050 // body is complete
1051 sched_request_callback(c);
1052 goto dont_read_again;
1053 }
1054 else {
1055 trouble("unknown read state %d %d", w->fd, c->receiving);
1056 }
1057
1058 // fallthrough:
1059 try_read_error:
1060 trace("READ ERROR %d, refcnt=%d\n", w->fd, SvREFCNT(c->self));
1061 change_receiving_state(c, RECEIVE_SHUTDOWN);
1062 change_responding_state(c, RESPOND_SHUTDOWN);
1063 stop_read_watcher(c);
1064 stop_read_timer(c);
1065 stop_write_watcher(c);
1066 goto try_read_cleanup;
1067
1068 try_read_bad:
1069 trace("bad request %d\n", w->fd);
1070 respond_with_server_error(c, "Malformed request.\n", 0, 400);
1071 // TODO: when keep-alive, close conn instead of fallthrough here.
1072 // fallthrough:
1073 dont_read_again:
1074 trace("done reading %d\n", w->fd);
1075 change_receiving_state(c, RECEIVE_SHUTDOWN);
1076 stop_read_watcher(c);
1077 stop_read_timer(c);
1078 goto try_read_cleanup;
1079
1080 try_read_again_reset_timer:
1081 trace("(reset read timer) %d\n", w->fd);
1082 restart_read_timer(c);
1083 // fallthrough:
1084 try_read_again:
1085 trace("read again %d\n", w->fd);
1086 start_read_watcher(c);
1087
1088 try_read_cleanup:
1089 SvREFCNT_dec(c->self);
1090 }
1091
1092 static void
conn_read_timeout(EV_P_ ev_timer * w,int revents)1093 conn_read_timeout (EV_P_ ev_timer *w, int revents)
1094 {
1095 dCONN;
1096 SvREFCNT_inc_void_NN(c->self);
1097
1098 if (unlikely(!(revents & EV_TIMER) || c->receiving == RECEIVE_SHUTDOWN)) {
1099 // if there's no EV_TIMER then EV has stopped it on an error
1100 if (revents & EV_ERROR)
1101 trouble("EV error on read timer, fd=%d revents=0x%08x\n",
1102 c->fd,revents);
1103 goto read_timeout_cleanup;
1104 }
1105
1106 trace("read timeout %d\n", c->fd);
1107
1108 if (likely(c->responding == RESPOND_NOT_STARTED)) {
1109 const char *msg;
1110 if (c->receiving == RECEIVE_HEADERS) {
1111 msg = "Headers took too long.";
1112 }
1113 else {
1114 msg = "Timeout reading body.";
1115 }
1116 respond_with_server_error(c, msg, 0, 408);
1117 }
1118 else {
1119 // XXX as of 0.984 this appears to be dead code
1120 trace("read timeout while writing %d\n",c->fd);
1121 stop_write_watcher(c);
1122 stop_read_watcher(c);
1123 stop_read_timer(c);
1124 safe_close_conn(c, "close at read timeout");
1125 change_responding_state(c, RESPOND_SHUTDOWN);
1126 }
1127
1128 read_timeout_cleanup:
1129 stop_read_watcher(c);
1130 stop_read_timer(c);
1131 SvREFCNT_dec(c->self);
1132 }
1133
1134 static void
accept_cb(EV_P_ ev_io * w,int revents)1135 accept_cb (EV_P_ ev_io *w, int revents)
1136 {
1137 struct sockaddr_storage sa_buf;
1138 socklen_t sa_len;
1139
1140 if (unlikely(shutting_down)) {
1141 // shouldn't get called, but be defensive
1142 ev_io_stop(EV_A, w);
1143 close(w->fd);
1144 return;
1145 }
1146
1147 if (unlikely(revents & EV_ERROR)) {
1148 trouble("EV error in accept_cb, fd=%d, revents=0x%08x\n",w->fd,revents);
1149 ev_break(EV_A, EVBREAK_ALL);
1150 return;
1151 }
1152
1153 trace2("accept! revents=0x%08x\n", revents);
1154
1155 while (1) {
1156 sa_len = sizeof(struct sockaddr_storage);
1157 errno = 0;
1158
1159 int fd = accept(w->fd, (struct sockaddr *)&sa_buf, &sa_len);
1160 trace("accepted fd=%d, errno=%d\n", fd, errno);
1161 if (fd == -1) break;
1162
1163 int is_tcp = 1;
1164 #ifdef AF_UNIX
1165 if (unlikely(sa_buf.ss_family == AF_UNIX)) is_tcp = 0;
1166 #endif
1167
1168 assert(sa_len <= sizeof(struct sockaddr_storage));
1169 if (unlikely(prep_socket(fd, is_tcp))) {
1170 perror("prep_socket");
1171 trouble("prep_socket failed for %d\n", fd);
1172 close(fd);
1173 continue;
1174 }
1175
1176 struct sockaddr *sa = (struct sockaddr *)malloc(sa_len);
1177 memcpy(sa,&sa_buf,(size_t)sa_len);
1178 struct feer_conn *c = new_feer_conn(EV_A,fd,sa);
1179 start_read_watcher(c);
1180 restart_read_timer(c);
1181 assert(SvREFCNT(c->self) == 3);
1182 SvREFCNT_dec(c->self);
1183 }
1184 }
1185
1186 static void
sched_request_callback(struct feer_conn * c)1187 sched_request_callback (struct feer_conn *c)
1188 {
1189 trace("sched req callback: %d c=%p, head=%p\n", c->fd, c, request_ready_rinq);
1190 rinq_push(&request_ready_rinq, c);
1191 SvREFCNT_inc_void_NN(c->self); // for the rinq
1192 if (!ev_is_active(&ei)) {
1193 ev_idle_start(feersum_ev_loop, &ei);
1194 }
1195 }
1196
1197 // the unlikely/likely annotations here are trying to optimize for GET first
1198 // and POST second. Other entity-body requests are third in line.
1199 static bool
process_request_headers(struct feer_conn * c,int body_offset)1200 process_request_headers (struct feer_conn *c, int body_offset)
1201 {
1202 int err_code;
1203 const char *err;
1204 struct feer_req *req = c->req;
1205
1206 trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
1207 bool body_is_required;
1208 bool next_req_follows = 0;
1209
1210 c->is_http11 = (req->minor_version == 1);
1211
1212 change_receiving_state(c, RECEIVE_BODY);
1213
1214 if (likely(str_eq("GET", 3, req->method, req->method_len))) {
1215 // Not supposed to have a body. Additional bytes are either a
1216 // mistake, a websocket negotiation or pipelined requests under
1217 // HTTP/1.1
1218 next_req_follows = 1;
1219 }
1220 else if (likely(str_eq("OPTIONS", 7, req->method, req->method_len))) {
1221 body_is_required = 1;
1222 next_req_follows = 1;
1223 }
1224 else if (likely(str_eq("POST", 4, req->method, req->method_len))) {
1225 body_is_required = 1;
1226 }
1227 else if (str_eq("PUT", 3, req->method, req->method_len)) {
1228 body_is_required = 1;
1229 }
1230 else if (str_eq("HEAD", 4, req->method, req->method_len) ||
1231 str_eq("DELETE", 6, req->method, req->method_len))
1232 {
1233 next_req_follows = 1;
1234 }
1235 else {
1236 err = "Feersum doesn't support that method yet\n";
1237 err_code = 405;
1238 goto got_bad_request;
1239 }
1240
1241 #if DEBUG >= 2
1242 if (next_req_follows)
1243 trace2("next req follows fd=%d, boff=%d\n",c->fd,body_offset);
1244 if (body_is_required)
1245 trace2("body is required fd=%d, boff=%d\n",c->fd,body_offset);
1246 #endif
1247
1248 // a body or follow-on data potentially follows the headers. Let feer_req
1249 // retain its pointers into rbuf and make a new scalar for more body data.
1250 STRLEN from_len;
1251 char *from = SvPV(c->rbuf,from_len);
1252 from += body_offset;
1253 int need = from_len - body_offset;
1254 int new_alloc = (need > READ_INIT_FACTOR*READ_BUFSZ)
1255 ? need : READ_INIT_FACTOR*READ_BUFSZ-1;
1256 trace("new rbuf for body %d need=%d alloc=%d\n",c->fd, need, new_alloc);
1257 SV *new_rbuf = newSVpvn(need ? from : "", need);
1258
1259 req->buf = c->rbuf;
1260 c->rbuf = new_rbuf;
1261 SvCUR_set(req->buf, body_offset);
1262
1263 if (likely(next_req_follows)) // optimize for GET
1264 goto got_it_all;
1265
1266 // determine how much we need to read
1267 int i;
1268 UV expected = 0;
1269 for (i=0; i < req->num_headers; i++) {
1270 struct phr_header *hdr = &req->headers[i];
1271 if (!hdr->name) continue;
1272 // XXX: ignore multiple C-L headers?
1273 if (unlikely(
1274 str_case_eq("content-length", 14, hdr->name, hdr->name_len)))
1275 {
1276 int g = grok_number(hdr->value, hdr->value_len, &expected);
1277 if (likely(g == IS_NUMBER_IN_UV)) {
1278 if (unlikely(expected > MAX_BODY_LEN)) {
1279 err_code = 413;
1280 err = "Content length exceeds maximum\n";
1281 goto got_bad_request;
1282 }
1283 else
1284 goto got_cl;
1285 }
1286 else {
1287 err_code = 400;
1288 err = "invalid content-length\n";
1289 goto got_bad_request;
1290 }
1291 }
1292 // TODO: support "Connection: close" bodies
1293 // TODO: support "Transfer-Encoding: chunked" bodies
1294 }
1295
1296 if (body_is_required) {
1297 // Go the nginx route...
1298 err_code = 411;
1299 err = "Content-Length required\n";
1300 }
1301 else {
1302 // XXX TODO support requests that don't require a body
1303 err_code = 418;
1304 err = "Feersum doesn't know how to handle optional-body requests yet\n";
1305 }
1306
1307 got_bad_request:
1308 respond_with_server_error(c, err, 0, err_code);
1309 return 0;
1310
1311 got_cl:
1312 c->expected_cl = (ssize_t)expected;
1313 c->received_cl = SvCUR(c->rbuf);
1314 trace("expecting body %d size=%"Ssz_df" have=%"Ssz_df"\n",
1315 c->fd, (Ssz)c->expected_cl, (Ssz)c->received_cl);
1316 SvGROW(c->rbuf, c->expected_cl + 1);
1317
1318 // don't have enough bytes to schedule immediately?
1319 // unlikely = optimize for short requests
1320 if (unlikely(c->expected_cl && c->received_cl < c->expected_cl)) {
1321 // TODO: schedule the callback immediately and support a non-blocking
1322 // ->read method.
1323 // sched_request_callback(c);
1324 // change_receiving_state(c, RECEIVE_STREAM);
1325 return 1;
1326 }
1327 // fallthrough: have enough bytes
1328 got_it_all:
1329 sched_request_callback(c);
1330 return 0;
1331 }
1332
1333 static void
conn_write_ready(struct feer_conn * c)1334 conn_write_ready (struct feer_conn *c)
1335 {
1336 if (c->in_callback) return; // defer until out of callback
1337
1338 if (c->write_ev_io.data == NULL) {
1339 ev_io_init(&c->write_ev_io, try_conn_write, c->fd, EV_WRITE);
1340 c->write_ev_io.data = (void *)c;
1341 }
1342
1343 #if AUTOCORK_WRITES
1344 start_write_watcher(c);
1345 #else
1346 // attempt a non-blocking write immediately if we're not already
1347 // waiting for writability
1348 try_conn_write(feersum_ev_loop, &c->write_ev_io, EV_WRITE);
1349 #endif
1350 }
1351
1352 static void
respond_with_server_error(struct feer_conn * c,const char * msg,STRLEN msg_len,int err_code)1353 respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len, int err_code)
1354 {
1355 SV *tmp;
1356
1357 if (unlikely(c->responding != RESPOND_NOT_STARTED)) {
1358 trouble("Tried to send server error but already responding!");
1359 return;
1360 }
1361
1362 if (!msg_len) msg_len = strlen(msg);
1363 assert(msg_len < INT_MAX);
1364
1365 tmp = newSVpvf("HTTP/1.%d %d %s" CRLF
1366 "Content-Type: text/plain" CRLF
1367 "Connection: close" CRLF
1368 "Cache-Control: no-cache, no-store" CRLF
1369 "Content-Length: %"Ssz_df"" CRLFx2
1370 "%.*s",
1371 c->is_http11 ? 1 : 0,
1372 err_code, http_code_to_msg(err_code),
1373 (Ssz)msg_len,
1374 (int)msg_len, msg);
1375 add_sv_to_wbuf(c, sv_2mortal(tmp));
1376
1377 stop_read_watcher(c);
1378 stop_read_timer(c);
1379 change_responding_state(c, RESPOND_SHUTDOWN);
1380 change_receiving_state(c, RECEIVE_SHUTDOWN);
1381 conn_write_ready(c);
1382 }
1383
/*
 * Byte-wise equality of two length-delimited strings.
 * Returns 1 when both lengths match and every byte is equal, 0 otherwise.
 */
INLINE_UNLESS_DEBUG bool
str_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len)
        return 0;
    if (a == b)
        return 1;

    // the lengths are known to be equal, so one counter bounds both strings
    int remaining = a_len;
    while (remaining-- > 0) {
        if (*a++ != *b++)
            return 0;
    }
    return 1;
}
1395
/*
 * Compares two strings, assumes that the first string is already lower-cased.
 *
 * Returns 1 when the lengths match and each byte of `a` equals the lower-cased
 * byte of `b`, 0 otherwise. Bytes are compared as unsigned char because
 * passing a negative char to tolower() is undefined behaviour.
 */
INLINE_UNLESS_DEBUG bool
str_case_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len) return 0;
    if (a == b) return 1;
    int i;
    for (i=0; i<a_len; i++) {
        if ((unsigned char)a[i] != tolower((unsigned char)b[i])) return 0;
    }
    return 1;
}
1410
/*
 * Converts one hexadecimal digit character to its value (0-15).
 * Returns -1 when ch is not a hex digit.
 */
INLINE_UNLESS_DEBUG int
hex_decode(const char ch)
{
    if (likely(ch >= '0' && ch <= '9'))
        return ch - '0';
    if (ch >= 'A' && ch <= 'F')
        return 10 + (ch - 'A');
    if (ch >= 'a' && ch <= 'f')
        return 10 + (ch - 'a');
    return -1;
}
1422
1423 static void
uri_decode_sv(SV * sv)1424 uri_decode_sv (SV *sv)
1425 {
1426 STRLEN len;
1427 char *ptr, *end, *decoded;
1428
1429 ptr = SvPV(sv, len);
1430 end = SvEND(sv);
1431
1432 // quickly scan for % so we can ignore decoding that portion of the string
1433 while (ptr < end) {
1434 if (unlikely(*ptr == '%')) goto needs_decode;
1435 ptr++;
1436 }
1437 return;
1438
1439 needs_decode:
1440
1441 // Up until ptr have been "decoded" already by virtue of those chars not
1442 // being encoded.
1443 decoded = ptr;
1444
1445 for (; ptr < end; ptr++) {
1446 if (unlikely(*ptr == '%') && likely(end - ptr >= 2)) {
1447 int c1 = hex_decode(ptr[1]);
1448 int c2 = hex_decode(ptr[2]);
1449 if (likely(c1 != -1 && c2 != -1)) {
1450 *decoded++ = (c1 << 4) + c2;
1451 ptr += 2;
1452 continue;
1453 }
1454 }
1455 *decoded++ = *ptr;
1456 }
1457
1458 *decoded = '\0'; // play nice with C
1459
1460 ptr = SvPV_nolen(sv);
1461 SvCUR_set(sv, decoded-ptr);
1462 }
1463
1464 static void
feersum_init_tmpl_env(pTHX)1465 feersum_init_tmpl_env(pTHX)
1466 {
1467 HV *e;
1468 e = newHV();
1469
1470 // constants
1471 hv_stores(e, "psgi.version", newRV((SV*)psgi_ver));
1472 hv_stores(e, "psgi.url_scheme", newSVpvs("http"));
1473 hv_stores(e, "psgi.run_once", &PL_sv_no);
1474 hv_stores(e, "psgi.nonblocking", &PL_sv_yes);
1475 hv_stores(e, "psgi.multithread", &PL_sv_no);
1476 hv_stores(e, "psgi.multiprocess", &PL_sv_no);
1477 hv_stores(e, "psgi.streaming", &PL_sv_yes);
1478 hv_stores(e, "psgi.errors", newRV((SV*)PL_stderrgv));
1479 hv_stores(e, "psgix.input.buffered", &PL_sv_yes);
1480 hv_stores(e, "psgix.output.buffered", &PL_sv_yes);
1481 hv_stores(e, "psgix.body.scalar_refs", &PL_sv_yes);
1482 hv_stores(e, "psgix.output.guard", &PL_sv_yes);
1483 hv_stores(e, "SCRIPT_NAME", newSVpvs(""));
1484
1485 // placeholders that get defined for every request
1486 hv_stores(e, "SERVER_PROTOCOL", &PL_sv_undef);
1487 hv_stores(e, "SERVER_NAME", &PL_sv_undef);
1488 hv_stores(e, "SERVER_PORT", &PL_sv_undef);
1489 hv_stores(e, "REQUEST_URI", &PL_sv_undef);
1490 hv_stores(e, "REQUEST_METHOD", &PL_sv_undef);
1491 hv_stores(e, "PATH_INFO", &PL_sv_undef);
1492 hv_stores(e, "REMOTE_ADDR", &PL_sv_placeholder);
1493 hv_stores(e, "REMOTE_PORT", &PL_sv_placeholder);
1494
1495 // defaults that get changed for some requests
1496 hv_stores(e, "psgi.input", &PL_sv_undef);
1497 hv_stores(e, "CONTENT_LENGTH", newSViv(0));
1498 hv_stores(e, "QUERY_STRING", newSVpvs(""));
1499
1500 // anticipated headers
1501 hv_stores(e, "CONTENT_TYPE", &PL_sv_placeholder);
1502 hv_stores(e, "HTTP_HOST", &PL_sv_placeholder);
1503 hv_stores(e, "HTTP_USER_AGENT", &PL_sv_placeholder);
1504 hv_stores(e, "HTTP_ACCEPT", &PL_sv_placeholder);
1505 hv_stores(e, "HTTP_ACCEPT_LANGUAGE", &PL_sv_placeholder);
1506 hv_stores(e, "HTTP_ACCEPT_CHARSET", &PL_sv_placeholder);
1507 hv_stores(e, "HTTP_KEEP_ALIVE", &PL_sv_placeholder);
1508 hv_stores(e, "HTTP_CONNECTION", &PL_sv_placeholder);
1509 hv_stores(e, "HTTP_REFERER", &PL_sv_placeholder);
1510 hv_stores(e, "HTTP_COOKIE", &PL_sv_placeholder);
1511 hv_stores(e, "HTTP_IF_MODIFIED_SINCE", &PL_sv_placeholder);
1512 hv_stores(e, "HTTP_IF_NONE_MATCH", &PL_sv_placeholder);
1513 hv_stores(e, "HTTP_CACHE_CONTROL", &PL_sv_placeholder);
1514
1515 hv_stores(e, "psgix.io", &PL_sv_placeholder);
1516
1517 feersum_tmpl_env = e;
1518 }
1519
1520 static HV*
feersum_env(pTHX_ struct feer_conn * c)1521 feersum_env(pTHX_ struct feer_conn *c)
1522 {
1523 HV *e;
1524 SV **hsv;
1525 int i,j;
1526 struct feer_req *r = c->req;
1527
1528 if (unlikely(!feersum_tmpl_env))
1529 feersum_init_tmpl_env(aTHX);
1530 e = newHVhv(feersum_tmpl_env);
1531
1532 trace("generating header (fd %d) %.*s\n",
1533 c->fd, (int)r->path_len, r->path);
1534
1535 SV *path = newSVpvn(r->path, r->path_len);
1536 hv_stores(e, "SERVER_NAME", newSVsv(feer_server_name));
1537 hv_stores(e, "SERVER_PORT", newSVsv(feer_server_port));
1538 hv_stores(e, "REQUEST_URI", path);
1539 hv_stores(e, "REQUEST_METHOD", newSVpvn(r->method,r->method_len));
1540 hv_stores(e, "SERVER_PROTOCOL", (r->minor_version == 1) ?
1541 newSVsv(psgi_serv11) : newSVsv(psgi_serv10));
1542
1543 SV *addr = &PL_sv_undef;
1544 SV *port = &PL_sv_undef;
1545 const char *str_addr;
1546 unsigned short s_port;
1547
1548 if (c->sa->sa_family == AF_INET) {
1549 struct sockaddr_in *in = (struct sockaddr_in *)c->sa;
1550 addr = newSV(INET_ADDRSTRLEN);
1551 str_addr = inet_ntop(AF_INET,&in->sin_addr,SvPVX(addr),INET_ADDRSTRLEN);
1552 s_port = ntohs(in->sin_port);
1553 }
1554 #ifdef AF_INET6
1555 else if (c->sa->sa_family == AF_INET6) {
1556 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)c->sa;
1557 addr = newSV(INET6_ADDRSTRLEN);
1558 str_addr = inet_ntop(AF_INET6,&in6->sin6_addr,SvPVX(addr),INET6_ADDRSTRLEN);
1559 s_port = ntohs(in6->sin6_port);
1560 }
1561 #endif
1562 #ifdef AF_UNIX
1563 else if (c->sa->sa_family == AF_UNIX) {
1564 str_addr = "unix";
1565 addr = newSV(sizeof(str_addr));
1566 memcpy(SvPVX(addr), str_addr, sizeof(str_addr));
1567 s_port = 0;
1568 }
1569 #endif
1570
1571 if (likely(str_addr)) {
1572 SvCUR(addr) = strlen(SvPVX(addr));
1573 SvPOK_on(addr);
1574 port = newSViv(s_port);
1575 }
1576 hv_stores(e, "REMOTE_ADDR", addr);
1577 hv_stores(e, "REMOTE_PORT", port);
1578
1579 if (unlikely(c->expected_cl > 0)) {
1580 hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
1581 hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c,0));
1582 }
1583 else if (request_cb_is_psgi) {
1584 // TODO: make psgi.input a valid, but always empty stream for PSGI mode?
1585 }
1586
1587 if (request_cb_is_psgi) {
1588 SV *fake_fh = newSViv(c->fd); // just some random dummy value
1589 SV *selfref = sv_2mortal(feer_conn_2sv(c));
1590 sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
1591 hv_stores(e, "psgix.io", fake_fh);
1592 }
1593
1594 {
1595 const char *qpos = r->path;
1596 SV *pinfo, *qstr;
1597
1598 // rather than memchr, for speed:
1599 while (*qpos != '?' && qpos < r->path + r->path_len)
1600 qpos++;
1601
1602 if (*qpos == '?') {
1603 pinfo = newSVpvn(r->path, (qpos - r->path));
1604 qpos++;
1605 qstr = newSVpvn(qpos, r->path_len - (qpos - r->path));
1606 }
1607 else {
1608 pinfo = newSVsv(path);
1609 qstr = NULL; // use template default
1610 }
1611 uri_decode_sv(pinfo);
1612 hv_stores(e, "PATH_INFO", pinfo);
1613 if (qstr != NULL) // hv template defaults QUERY_STRING to empty
1614 hv_stores(e, "QUERY_STRING", qstr);
1615 }
1616
1617 SV *val = NULL;
1618 char *kbuf;
1619 size_t kbuflen = 64;
1620 Newx(kbuf, kbuflen, char);
1621 kbuf[0]='H'; kbuf[1]='T'; kbuf[2]='T'; kbuf[3]='P'; kbuf[4]='_';
1622
1623 for (i=0; i<r->num_headers; i++) {
1624 struct phr_header *hdr = &(r->headers[i]);
1625 if (unlikely(hdr->name == NULL && val != NULL)) {
1626 trace("... multiline %.*s\n", (int)hdr->value_len, hdr->value);
1627 sv_catpvn(val, hdr->value, hdr->value_len);
1628 continue;
1629 }
1630 else if (unlikely(str_case_eq(
1631 STR_WITH_LEN("content-length"), hdr->name, hdr->name_len)))
1632 {
1633 // content length shouldn't show up as HTTP_CONTENT_LENGTH but
1634 // as CONTENT_LENGTH in the env-hash.
1635 continue;
1636 }
1637 else if (unlikely(str_case_eq(
1638 STR_WITH_LEN("content-type"), hdr->name, hdr->name_len)))
1639 {
1640 hv_stores(e, "CONTENT_TYPE",newSVpvn(hdr->value, hdr->value_len));
1641 continue;
1642 }
1643
1644 size_t klen = 5+hdr->name_len;
1645 if (kbuflen < klen) {
1646 kbuflen = klen;
1647 kbuf = Renew(kbuf, kbuflen, char);
1648 }
1649 char *key = kbuf + 5;
1650 for (j=0; j<hdr->name_len; j++) {
1651 char n = hdr->name[j];
1652 *key++ = (n == '-') ? '_' : toupper(n);
1653 }
1654
1655 SV **val = hv_fetch(e, kbuf, klen, 1);
1656 trace("adding header to env (fd %d) %.*s: %.*s\n",
1657 c->fd, (int)klen, kbuf, (int)hdr->value_len, hdr->value);
1658
1659 assert(val != NULL); // "fetch is store" flag should ensure this
1660 if (unlikely(SvPOK(*val))) {
1661 trace("... is multivalue\n");
1662 // extend header with comma
1663 sv_catpvn(*val, ", ", 2);
1664 sv_catpvn(*val, hdr->value, hdr->value_len);
1665 }
1666 else {
1667 // change from undef to a real value
1668 sv_setpvn(*val, hdr->value, hdr->value_len);
1669 }
1670 }
1671 Safefree(kbuf);
1672
1673 return e;
1674 }
1675
1676 static void
feersum_start_response(pTHX_ struct feer_conn * c,SV * message,AV * headers,int streaming)1677 feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
1678 int streaming)
1679 {
1680 const char *ptr;
1681 I32 i;
1682
1683 trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
1684
1685 if (unlikely(c->responding != RESPOND_NOT_STARTED))
1686 croak("already responding?!");
1687 change_responding_state(c, streaming ? RESPOND_STREAMING : RESPOND_NORMAL);
1688
1689 if (unlikely(!SvOK(message) || !(SvIOK(message) || SvPOK(message)))) {
1690 croak("Must define an HTTP status code or message");
1691 }
1692
1693 I32 avl = av_len(headers);
1694 if (unlikely(avl+1 % 2 == 1)) {
1695 croak("expected even-length array, got %d", avl+1);
1696 }
1697
1698 // int or 3 chars? use a stock message
1699 UV code = 0;
1700 if (SvIOK(message))
1701 code = SvIV(message);
1702 else if (SvUOK(message))
1703 code = SvUV(message);
1704 else {
1705 const int numtype = grok_number(SvPVX_const(message),3,&code);
1706 if (unlikely(numtype != IS_NUMBER_IN_UV))
1707 code = 0;
1708 }
1709 trace2("starting response fd=%d code=%"UVuf"\n",c->fd,code);
1710
1711 if (unlikely(!code))
1712 croak("first parameter is not a number or doesn't start with digits");
1713
1714 // for PSGI it's always just an IV so optimize for that
1715 if (likely(!SvPOK(message) || SvCUR(message) == 3)) {
1716 ptr = http_code_to_msg(code);
1717 message = sv_2mortal(newSVpvf("%"UVuf" %s",code,ptr));
1718 }
1719
1720 // don't generate or strip Content-Length headers for 304 or 1xx
1721 c->auto_cl = (code == 304 || (100 <= code && code <= 199)) ? 0 : 1;
1722
1723 add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
1724 add_sv_to_wbuf(c, message);
1725 add_crlf_to_wbuf(c);
1726
1727 for (i=0; i<avl; i+= 2) {
1728 SV **hdr = av_fetch(headers, i, 0);
1729 if (unlikely(!hdr || !SvOK(*hdr))) {
1730 trace("skipping undef header key");
1731 continue;
1732 }
1733
1734 SV **val = av_fetch(headers, i+1, 0);
1735 if (unlikely(!val || !SvOK(*val))) {
1736 trace("skipping undef header value");
1737 continue;
1738 }
1739
1740 STRLEN hlen;
1741 const char *hp = SvPV(*hdr, hlen);
1742 if (likely(c->auto_cl) &&
1743 unlikely(str_case_eq("content-length",14,hp,hlen)))
1744 {
1745 trace("ignoring content-length header in the response\n");
1746 continue;
1747 }
1748
1749 add_sv_to_wbuf(c, *hdr);
1750 add_const_to_wbuf(c, ": ", 2);
1751 add_sv_to_wbuf(c, *val);
1752 add_crlf_to_wbuf(c);
1753 }
1754
1755 if (streaming) {
1756 if (c->is_http11)
1757 add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
1758 else
1759 add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
1760 }
1761
1762 conn_write_ready(c);
1763 }
1764
1765 static size_t
feersum_write_whole_body(pTHX_ struct feer_conn * c,SV * body)1766 feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
1767 {
1768 size_t RETVAL;
1769 int i;
1770 bool body_is_string = 0;
1771 STRLEN cur;
1772
1773 if (c->responding != RESPOND_NORMAL)
1774 croak("can't use write_whole_body when in streaming mode");
1775
1776 if (!SvOK(body)) {
1777 body = sv_2mortal(newSVpvs(""));
1778 body_is_string = 1;
1779 }
1780 else if (SvROK(body)) {
1781 SV *refd = SvRV(body);
1782 if (SvOK(refd) && !SvROK(refd)) {
1783 body = refd;
1784 body_is_string = 1;
1785 }
1786 else if (SvTYPE(refd) != SVt_PVAV) {
1787 croak("body must be a scalar, scalar reference or array reference");
1788 }
1789 }
1790 else {
1791 body_is_string = 1;
1792 }
1793
1794 SV *cl_sv; // content-length future
1795 struct iovec *cl_iov;
1796 if (likely(c->auto_cl))
1797 add_placeholder_to_wbuf(c, &cl_sv, &cl_iov);
1798 else
1799 add_crlf_to_wbuf(c);
1800
1801 if (body_is_string) {
1802 cur = add_sv_to_wbuf(c,body);
1803 RETVAL = cur;
1804 }
1805 else {
1806 AV *abody = (AV*)SvRV(body);
1807 I32 amax = av_len(abody);
1808 RETVAL = 0;
1809 for (i=0; i<=amax; i++) {
1810 SV *sv = fetch_av_normal(aTHX_ abody, i);
1811 if (unlikely(!sv)) continue;
1812 cur = add_sv_to_wbuf(c,sv);
1813 trace("body part i=%d sv=%p cur=%"Sz_uf"\n", i, sv, (Sz)cur);
1814 RETVAL += cur;
1815 }
1816 }
1817
1818 if (likely(c->auto_cl)) {
1819 sv_setpvf(cl_sv, "Content-Length: %"Sz_uf"" CRLFx2, (Sz)RETVAL);
1820 update_wbuf_placeholder(c, cl_sv, cl_iov);
1821 }
1822
1823 change_responding_state(c, RESPOND_SHUTDOWN);
1824 conn_write_ready(c);
1825 return RETVAL;
1826 }
1827
1828 static void
feersum_start_psgi_streaming(pTHX_ struct feer_conn * c,SV * streamer)1829 feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
1830 {
1831 dSP;
1832 ENTER;
1833 SAVETMPS;
1834 PUSHMARK(SP);
1835 mXPUSHs(feer_conn_2sv(c));
1836 XPUSHs(streamer);
1837 PUTBACK;
1838 call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
1839 SPAGAIN;
1840 if (unlikely(SvTRUE(ERRSV))) {
1841 call_died(aTHX_ c, "PSGI stream initiator");
1842 }
1843 PUTBACK;
1844 FREETMPS;
1845 LEAVE;
1846 }
1847
1848 static void
feersum_handle_psgi_response(pTHX_ struct feer_conn * c,SV * ret,bool can_recurse)1849 feersum_handle_psgi_response(
1850 pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
1851 {
1852 if (unlikely(!SvOK(ret) || !SvROK(ret))) {
1853 sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
1854 call_died(aTHX_ c, "PSGI request");
1855 return;
1856 }
1857
1858 if (SvOK(ret) && unlikely(!IsArrayRef(ret))) {
1859 if (likely(can_recurse)) {
1860 trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
1861 feersum_start_psgi_streaming(aTHX_ c, ret);
1862 }
1863 else {
1864 sv_setpvs(ERRSV, "PSGI attempt to recurse in a streaming callback");
1865 call_died(aTHX_ c, "PSGI request");
1866 }
1867 return;
1868 }
1869
1870 AV *psgi_triplet = (AV*)SvRV(ret);
1871 if (unlikely(av_len(psgi_triplet)+1 != 3)) {
1872 sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
1873 call_died(aTHX_ c, "PSGI request");
1874 return;
1875 }
1876
1877 trace("PSGI response triplet, c=%p av=%p\n", c, psgi_triplet);
1878 // we know there's three elems so *should* be safe to de-ref
1879 SV *msg = *(av_fetch(psgi_triplet,0,0));
1880 SV *hdrs = *(av_fetch(psgi_triplet,1,0));
1881 SV *body = *(av_fetch(psgi_triplet,2,0));
1882
1883 AV *headers;
1884 if (IsArrayRef(hdrs))
1885 headers = (AV*)SvRV(hdrs);
1886 else {
1887 sv_setpvs(ERRSV, "PSGI Headers must be an array-ref");
1888 call_died(aTHX_ c, "PSGI request");
1889 return;
1890 }
1891
1892 if (likely(IsArrayRef(body))) {
1893 feersum_start_response(aTHX_ c, msg, headers, 0);
1894 feersum_write_whole_body(aTHX_ c, body);
1895 }
1896 else if (likely(SvROK(body))) { // probaby an IO::Handle-like object
1897 feersum_start_response(aTHX_ c, msg, headers, 1);
1898 c->poll_write_cb = newSVsv(body);
1899 c->poll_write_cb_is_io_handle = 1;
1900 conn_write_ready(c);
1901 }
1902 else {
1903 sv_setpvs(ERRSV, "Expected PSGI array-ref or IO::Handle-like body");
1904 call_died(aTHX_ c, "PSGI request");
1905 return;
1906 }
1907 }
1908
1909 static int
feersum_close_handle(pTHX_ struct feer_conn * c,bool is_writer)1910 feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
1911 {
1912 int RETVAL;
1913 if (is_writer) {
1914 trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
1915 if (c->poll_write_cb) {
1916 SvREFCNT_dec(c->poll_write_cb);
1917 c->poll_write_cb = NULL;
1918 }
1919 if (c->responding < RESPOND_SHUTDOWN) {
1920 finish_wbuf(c);
1921 conn_write_ready(c);
1922 change_responding_state(c, RESPOND_SHUTDOWN);
1923 }
1924 RETVAL = 1;
1925 }
1926 else {
1927 trace("close reader fd=%d, c=%p\n", c->fd, c);
1928 // TODO: ref-dec poll_read_cb
1929 if (c->rbuf) {
1930 SvREFCNT_dec(c->rbuf);
1931 c->rbuf = NULL;
1932 }
1933 RETVAL = shutdown(c->fd, SHUT_RD);
1934 change_receiving_state(c, RECEIVE_SHUTDOWN);
1935 }
1936
1937 // disassociate the handle from the conn
1938 SvREFCNT_dec(c->self);
1939 return RETVAL;
1940 }
1941
1942 static SV*
feersum_conn_guard(pTHX_ struct feer_conn * c,SV * guard)1943 feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard)
1944 {
1945 if (guard) {
1946 if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
1947 c->ext_guard = SvOK(guard) ? newSVsv(guard) : NULL;
1948 }
1949 return c->ext_guard ? newSVsv(c->ext_guard) : &PL_sv_undef;
1950 }
1951
1952 static void
call_died(pTHX_ struct feer_conn * c,const char * cb_type)1953 call_died (pTHX_ struct feer_conn *c, const char *cb_type)
1954 {
1955 dSP;
1956 #if DEBUG >= 1
1957 trace("An error was thrown in the %s callback: %-p\n",cb_type,ERRSV);
1958 #endif
1959 PUSHMARK(SP);
1960 mXPUSHs(newSVsv(ERRSV));
1961 PUTBACK;
1962 call_pv("Feersum::DIED", G_DISCARD|G_EVAL|G_VOID|G_KEEPERR);
1963 SPAGAIN;
1964
1965 respond_with_server_error(c,"Request handler exception.\n",0,500);
1966 sv_setsv(ERRSV, &PL_sv_undef);
1967 }
1968
1969 static void
call_request_callback(struct feer_conn * c)1970 call_request_callback (struct feer_conn *c)
1971 {
1972 dTHX;
1973 dSP;
1974 int flags;
1975 c->in_callback++;
1976 SvREFCNT_inc_void_NN(c->self);
1977
1978 trace("request callback c=%p\n", c);
1979
1980 ENTER;
1981 SAVETMPS;
1982 PUSHMARK(SP);
1983
1984 if (request_cb_is_psgi) {
1985 HV *env = feersum_env(aTHX_ c);
1986 mXPUSHs(newRV_noinc((SV*)env));
1987 flags = G_EVAL|G_SCALAR;
1988 }
1989 else {
1990 mXPUSHs(feer_conn_2sv(c));
1991 flags = G_DISCARD|G_EVAL|G_VOID;
1992 }
1993
1994 PUTBACK;
1995 int returned = call_sv(request_cb_cv, flags);
1996 SPAGAIN;
1997
1998 trace("called request callback, errsv? %d\n", SvTRUE(ERRSV) ? 1 : 0);
1999
2000 if (unlikely(SvTRUE(ERRSV))) {
2001 call_died(aTHX_ c, "request");
2002 returned = 0; // pretend nothing got returned
2003 }
2004
2005 SV *psgi_response;
2006 if (request_cb_is_psgi && likely(returned >= 1)) {
2007 psgi_response = POPs;
2008 SvREFCNT_inc_void_NN(psgi_response);
2009 }
2010
2011 trace("leaving request callback\n");
2012 PUTBACK;
2013
2014 if (request_cb_is_psgi && likely(returned >= 1)) {
2015 feersum_handle_psgi_response(aTHX_ c, psgi_response, 1); // can_recurse
2016 SvREFCNT_dec(psgi_response);
2017 }
2018
2019 //fangyousong
2020 if (request_cb_is_psgi && c->expected_cl > 0) {
2021 SvREFCNT_dec(c->self);
2022 }
2023
2024
2025 c->in_callback--;
2026 SvREFCNT_dec(c->self);
2027
2028 FREETMPS;
2029 LEAVE;
2030 }
2031
2032 static void
call_poll_callback(struct feer_conn * c,bool is_write)2033 call_poll_callback (struct feer_conn *c, bool is_write)
2034 {
2035 dTHX;
2036 dSP;
2037
2038 SV *cb = (is_write) ? c->poll_write_cb : NULL;
2039
2040 if (unlikely(cb == NULL)) return;
2041
2042 c->in_callback++;
2043
2044 trace("%s poll callback c=%p cbrv=%p\n",
2045 is_write ? "write" : "read", c, cb);
2046
2047 ENTER;
2048 SAVETMPS;
2049 PUSHMARK(SP);
2050 mXPUSHs(new_feer_conn_handle(aTHX_ c, is_write));
2051 PUTBACK;
2052 call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
2053 SPAGAIN;
2054
2055 trace("called %s poll callback, errsv? %d\n",
2056 is_write ? "write" : "read", SvTRUE(ERRSV) ? 1 : 0);
2057
2058 if (unlikely(SvTRUE(ERRSV))) {
2059 call_died(aTHX_ c, is_write ? "write poll" : "read poll");
2060 }
2061
2062 trace("leaving %s poll callback\n", is_write ? "write" : "read");
2063 PUTBACK;
2064 FREETMPS;
2065 LEAVE;
2066
2067 c->in_callback--;
2068 }
2069
2070 static void
pump_io_handle(struct feer_conn * c,SV * io)2071 pump_io_handle (struct feer_conn *c, SV *io)
2072 {
2073 dTHX;
2074 dSP;
2075
2076 if (unlikely(io == NULL)) return;
2077
2078 c->in_callback++;
2079
2080 trace("pump io handle %d\n", c->fd);
2081
2082 ENTER;
2083 SAVETMPS;
2084
2085 // Emulate `local $/ = \4096;`
2086 SV *old_rs = PL_rs;
2087 PL_rs = sv_2mortal(newRV_noinc(newSViv(4096)));
2088 sv_setsv(get_sv("/", GV_ADD), PL_rs);
2089
2090 PUSHMARK(SP);
2091 XPUSHs(c->poll_write_cb);
2092 PUTBACK;
2093 int returned = call_method("getline", G_SCALAR|G_EVAL);
2094 SPAGAIN;
2095
2096 trace("called getline on io handle fd=%d errsv=%d returned=%d\n",
2097 c->fd, SvTRUE(ERRSV) ? 1 : 0, returned);
2098
2099 if (unlikely(SvTRUE(ERRSV))) {
2100 call_died(aTHX_ c, "getline on io handle");
2101 goto done_pump_io;
2102 }
2103
2104 SV *ret = NULL;
2105 if (returned > 0)
2106 ret = POPs;
2107 if (ret && SvMAGICAL(ret))
2108 ret = sv_2mortal(newSVsv(ret));
2109
2110 if (unlikely(!ret || !SvOK(ret))) {
2111 // returned undef, so call the close method out of niceity
2112 PUSHMARK(SP);
2113 XPUSHs(c->poll_write_cb);
2114 PUTBACK;
2115 call_method("close", G_VOID|G_DISCARD|G_EVAL);
2116 SPAGAIN;
2117
2118 if (unlikely(SvTRUE(ERRSV))) {
2119 trouble("Couldn't close body IO handle: %-p",ERRSV);
2120 }
2121
2122 SvREFCNT_dec(c->poll_write_cb);
2123 c->poll_write_cb = NULL;
2124 finish_wbuf(c);
2125 change_responding_state(c, RESPOND_SHUTDOWN);
2126
2127 goto done_pump_io;
2128 }
2129
2130 if (c->is_http11)
2131 add_chunk_sv_to_wbuf(c, ret);
2132 else
2133 add_sv_to_wbuf(c, ret);
2134
2135 done_pump_io:
2136 trace("leaving pump io handle %d\n", c->fd);
2137
2138 PUTBACK;
2139 FREETMPS;
2140 LEAVE;
2141
2142 PL_rs = old_rs;
2143 sv_setsv(get_sv("/", GV_ADD), old_rs);
2144
2145 c->in_callback--;
2146 }
2147
2148 static int
psgix_io_svt_get(pTHX_ SV * sv,MAGIC * mg)2149 psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
2150 {
2151 dSP;
2152
2153 struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
2154 trace("invoking psgix.io magic for fd=%d\n", c->fd);
2155
2156 sv_unmagic(sv, PERL_MAGIC_ext);
2157
2158 ENTER;
2159 SAVETMPS;
2160
2161 PUSHMARK(SP);
2162 XPUSHs(sv);
2163 mXPUSHs(newSViv(c->fd));
2164 PUTBACK;
2165
2166 call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
2167 SPAGAIN;
2168
2169 if (unlikely(SvTRUE(ERRSV))) {
2170 call_died(aTHX_ c, "psgix.io magic");
2171 }
2172 else {
2173 SV *io_glob = SvRV(sv);
2174 GvSV(io_glob) = newRV_inc(c->self);
2175
2176 // Put whatever remainder data into the socket buffer.
2177 // Optimizes for the websocket case.
2178 //
2179 // TODO: For keepalive support the opposite operation is required;
2180 // pull the data out of the socket buffer and back into feersum.
2181 if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
2182 STRLEN rbuf_len;
2183 const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
2184 IO *io = GvIOp(io_glob);
2185 assert(io != NULL);
2186 PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
2187 sv_setpvs(c->rbuf, "");
2188 }
2189
2190 stop_read_watcher(c);
2191 stop_read_timer(c);
2192 // don't stop write watcher in case there's outstanding data.
2193 }
2194
2195 PUTBACK;
2196 FREETMPS;
2197 LEAVE;
2198 return 0;
2199 }
2200
2201 MODULE = Feersum PACKAGE = Feersum
2202
2203 PROTOTYPES: ENABLE
2204
2205 void
set_server_name_and_port(SV * self,SV * name,SV * port)2206 set_server_name_and_port(SV *self, SV *name, SV *port)
2207 PPCODE:
2208 {
2209 if (feer_server_name)
2210 SvREFCNT_dec(feer_server_name);
2211 feer_server_name = newSVsv(name);
2212 SvREADONLY_on(feer_server_name);
2213
2214 if (feer_server_port)
2215 SvREFCNT_dec(feer_server_port);
2216 feer_server_port = newSVsv(port);
2217 SvREADONLY_on(feer_server_port);
2218 }
2219
2220 void
accept_on_fd(SV * self,int fd)2221 accept_on_fd(SV *self, int fd)
2222 PPCODE:
2223 {
2224 trace("going to accept on %d\n",fd);
2225 feersum_ev_loop = EV_DEFAULT;
2226
2227 signal(SIGPIPE, SIG_IGN);
2228
2229 ev_prepare_init(&ep, prepare_cb);
2230 ev_prepare_start(feersum_ev_loop, &ep);
2231
2232 ev_check_init(&ec, check_cb);
2233 ev_check_start(feersum_ev_loop, &ec);
2234
2235 ev_idle_init(&ei, idle_cb);
2236
2237 ev_io_init(&accept_w, accept_cb, fd, EV_READ);
2238 }
2239
2240 void
unlisten(SV * self)2241 unlisten (SV *self)
2242 PPCODE:
2243 {
2244 trace("stopping accept\n");
2245 ev_prepare_stop(feersum_ev_loop, &ep);
2246 ev_check_stop(feersum_ev_loop, &ec);
2247 ev_idle_stop(feersum_ev_loop, &ei);
2248 ev_io_stop(feersum_ev_loop, &accept_w);
2249 }
2250
2251 void
request_handler(SV * self,SV * cb)2252 request_handler(SV *self, SV *cb)
2253 PROTOTYPE: $&
2254 ALIAS:
2255 psgi_request_handler = 1
2256 PPCODE:
2257 {
2258 if (unlikely(!SvOK(cb) || !SvROK(cb)))
2259 croak("can't supply an undef handler");
2260 if (request_cb_cv)
2261 SvREFCNT_dec(request_cb_cv);
2262 request_cb_cv = newSVsv(cb); // copy so 5.8.7 overload magic sticks.
2263 request_cb_is_psgi = ix;
2264 trace("assigned %s request handler %p\n",
2265 request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
2266 }
2267
2268 void
graceful_shutdown(SV * self,SV * cb)2269 graceful_shutdown (SV *self, SV *cb)
2270 PROTOTYPE: $&
2271 PPCODE:
2272 {
2273 if (!IsCodeRef(cb))
2274 croak("must supply a code reference");
2275 if (unlikely(shutting_down))
2276 croak("already shutting down");
2277 shutdown_cb_cv = newSVsv(cb);
2278 trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);
2279
2280 shutting_down = 1;
2281 ev_io_stop(feersum_ev_loop, &accept_w);
2282 close(accept_w.fd);
2283
2284 if (active_conns <= 0) {
2285 trace("shutdown is immediate\n");
2286 dSP;
2287 ENTER;
2288 SAVETMPS;
2289 PUSHMARK(SP);
2290 call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
2291 PUTBACK;
2292 trace3("called shutdown handler\n");
2293 SvREFCNT_dec(shutdown_cb_cv);
2294 shutdown_cb_cv = NULL;
2295 FREETMPS;
2296 LEAVE;
2297 }
2298 }
2299
2300 double
2301 read_timeout (SV *self, ...)
2302 PROTOTYPE: $;$
2303 CODE:
2304 {
2305 if (items <= 1) {
2306 RETVAL = read_timeout;
2307 }
2308 else if (items == 2) {
2309 SV *duration = ST(1);
2310 NV new_read_timeout = SvNV(duration);
2311 if (!(new_read_timeout > 0.0)) {
2312 croak("must set a positive (non-zero) value for the timeout");
2313 }
2314 read_timeout = (double) new_read_timeout;
2315 }
2316 }
2317 OUTPUT:
2318 RETVAL
2319
2320 void
DESTROY(SV * self)2321 DESTROY (SV *self)
2322 PPCODE:
2323 {
2324 trace3("DESTROY server\n");
2325 if (request_cb_cv)
2326 SvREFCNT_dec(request_cb_cv);
2327 }
2328
2329 MODULE = Feersum PACKAGE = Feersum::Connection::Handle
2330
2331 PROTOTYPES: ENABLE
2332
2333 int
2334 fileno (feer_conn_handle *hdl)
2335 CODE:
2336 RETVAL = c->fd;
2337 OUTPUT:
2338 RETVAL
2339
2340 void
DESTROY(SV * self)2341 DESTROY (SV *self)
2342 ALIAS:
2343 Feersum::Connection::Reader::DESTROY = 1
2344 Feersum::Connection::Writer::DESTROY = 2
2345 PPCODE:
2346 {
2347 feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);
2348
2349 if (hdl == NULL) {
2350 trace3("DESTROY handle (closed) class=%s\n",
2351 HvNAME(SvSTASH(SvRV(self))));
2352 }
2353 else {
2354 struct feer_conn *c = (struct feer_conn *)hdl;
2355 trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
2356 HvNAME(SvSTASH(SvRV(self))));
2357 if (ix == 2) // only close the writer on destruction
2358 feersum_close_handle(aTHX_ c, 1);
2359 }
2360 }
2361
2362 SV*
2363 read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
2364 PROTOTYPE: $$$;$
2365 PPCODE:
2366 {
2367 STRLEN buf_len = 0, src_len = 0;
2368 ssize_t offset;
2369 char *buf_ptr, *src_ptr;
2370
2371 // optimizes for the "read everything" case.
2372
2373 if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
2374 offset = SvIV(ST(3));
2375 else
2376 offset = 0;
2377
2378 trace("read fd=%d : request len=%"Sz_uf" off=%"Ssz_df"\n",
2379 c->fd, (Sz)len, (Ssz)offset);
2380
2381 if (unlikely(c->receiving <= RECEIVE_HEADERS))
2382 // XXX as of 0.984 this is dead code
2383 croak("can't call read() until the body begins to arrive");
2384
2385 if (!SvOK(buf) || !SvPOK(buf)) {
2386 // force to a PV and ensure buffer space
2387 sv_setpvn(buf,"",0);
2388 SvGROW(buf, len+1);
2389 }
2390
2391 if (unlikely(SvREADONLY(buf)))
2392 croak("buffer must not be read-only");
2393
2394 if (unlikely(len == 0))
2395 XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string
2396
2397 buf_ptr = SvPV(buf, buf_len);
2398 if (likely(c->rbuf))
2399 src_ptr = SvPV(c->rbuf, src_len);
2400
2401 if (unlikely(len < 0))
2402 len = src_len;
2403
2404 if (unlikely(offset < 0))
2405 offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;
2406
2407 if (unlikely(len + offset > src_len))
2408 len = src_len - offset;
2409
2410 trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
2411 c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);
2412
2413 if (unlikely(!c->rbuf || src_len == 0 || offset >= c->received_cl)) {
2414 trace2("rbuf empty during read %d\n", c->fd);
2415 if (c->receiving == RECEIVE_SHUTDOWN) {
2416 XSRETURN_IV(0);
2417 }
2418 else {
2419 errno = EAGAIN;
2420 XSRETURN_UNDEF;
2421 }
2422 }
2423
2424 if (likely(len == src_len && offset == 0)) {
2425 trace2("appending entire rbuf fd=%d\n", c->fd);
2426 sv_2mortal(c->rbuf); // allow pv to be stolen
2427 if (likely(buf_len == 0)) {
2428 sv_setsv(buf, c->rbuf);
2429 }
2430 else {
2431 sv_catsv(buf, c->rbuf);
2432 }
2433 c->rbuf = NULL;
2434 }
2435 else {
2436 src_ptr += offset;
2437 trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
2438 c->fd, len, offset, src_ptr);
2439 SvGROW(buf, SvCUR(buf) + len);
2440 sv_catpvn(buf, src_ptr, len);
2441 if (likely(items == 3)) {
2442 // there wasn't an offset param, throw away beginning
2443 sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
2444 }
2445 }
2446
2447 XSRETURN_IV(len);
2448 }
2449
2450 STRLEN
2451 write (feer_conn_handle *hdl, ...)
2452 PROTOTYPE: $;$
2453 CODE:
2454 {
2455 if (unlikely(c->responding != RESPOND_STREAMING))
2456 croak("can only call write in streaming mode");
2457
2458 SV *body = (items == 2) ? ST(1) : &PL_sv_undef;
2459 if (unlikely(!body || !SvOK(body)))
2460 XSRETURN_IV(0);
2461
2462 trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
2463 if (SvROK(body)) {
2464 SV *refd = SvRV(body);
2465 if (SvOK(refd) && SvPOK(refd)) {
2466 body = refd;
2467 }
2468 else {
2469 croak("body must be a scalar, scalar ref or undef");
2470 }
2471 }
2472 (void)SvPV(body, RETVAL);
2473
2474 if (c->is_http11)
2475 add_chunk_sv_to_wbuf(c, body);
2476 else
2477 add_sv_to_wbuf(c, body);
2478
2479 conn_write_ready(c);
2480 }
2481 OUTPUT:
2482 RETVAL
2483
2484 void
write_array(feer_conn_handle * hdl,AV * abody)2485 write_array (feer_conn_handle *hdl, AV *abody)
2486 PROTOTYPE: $$
2487 PPCODE:
2488 {
2489 if (unlikely(c->responding != RESPOND_STREAMING))
2490 croak("can only call write in streaming mode");
2491
2492 trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);
2493
2494 I32 amax = av_len(abody);
2495 int i;
2496 if (c->is_http11) {
2497 for (i=0; i<=amax; i++) {
2498 SV *sv = fetch_av_normal(aTHX_ abody, i);
2499 if (likely(sv)) add_chunk_sv_to_wbuf(c, sv);
2500 }
2501 }
2502 else {
2503 for (i=0; i<=amax; i++) {
2504 SV *sv = fetch_av_normal(aTHX_ abody, i);
2505 if (likely(sv)) add_sv_to_wbuf(c, sv);
2506 }
2507 }
2508
2509 conn_write_ready(c);
2510 }
2511
2512 int
2513 seek (feer_conn_handle *hdl, ssize_t offset, ...)
2514 PROTOTYPE: $$;$
2515 CODE:
2516 {
2517 int whence = SEEK_CUR;
2518 if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
2519 whence = SvIV(ST(2));
2520
2521 trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);
2522
2523 if (unlikely(!c->rbuf)) {
2524 // handle is effectively "closed"
2525 RETVAL = 0;
2526 }
2527 else if (offset == 0) {
2528 RETVAL = 1; // stay put for any whence
2529 }
2530 else if (offset > 0 && (whence == SEEK_CUR || whence == SEEK_SET)) {
2531 STRLEN len;
2532 const char *str = SvPV_const(c->rbuf, len);
2533 if (offset > len)
2534 offset = len;
2535 sv_chop(c->rbuf, str + offset);
2536 RETVAL = 1;
2537 }
2538 else if (offset < 0 && whence == SEEK_END) {
2539 STRLEN len;
2540 const char *str = SvPV_const(c->rbuf, len);
2541 offset += len; // can't be > len since block is offset<0
2542 if (offset == 0) {
2543 RETVAL = 1; // no-op, but OK
2544 }
2545 else if (offset > 0) {
2546 sv_chop(c->rbuf, str + offset);
2547 RETVAL = 1;
2548 }
2549 else {
2550 // past beginning of string
2551 RETVAL = 0;
2552 }
2553 }
2554 else {
2555 // invalid seek
2556 RETVAL = 0;
2557 }
2558 }
2559 OUTPUT:
2560 RETVAL
2561
2562 int
close(feer_conn_handle * hdl)2563 close (feer_conn_handle *hdl)
2564 PROTOTYPE: $
2565 ALIAS:
2566 Feersum::Connection::Reader::close = 1
2567 Feersum::Connection::Writer::close = 2
2568 CODE:
2569 {
2570 assert(ix);
2571 RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
2572 SvUVX(hdl_sv) = 0;
2573 }
2574 OUTPUT:
2575 RETVAL
2576
2577 void
_poll_cb(feer_conn_handle * hdl,SV * cb)2578 _poll_cb (feer_conn_handle *hdl, SV *cb)
2579 PROTOTYPE: $$
2580 ALIAS:
2581 Feersum::Connection::Reader::poll_cb = 1
2582 Feersum::Connection::Writer::poll_cb = 2
2583 PPCODE:
2584 {
2585 if (unlikely(ix < 1 || ix > 2))
2586 croak("can't call _poll_cb directly");
2587 else if (unlikely(ix == 1))
2588 croak("poll_cb for reading not yet supported"); // TODO poll_read_cb
2589
2590 if (c->poll_write_cb != NULL) {
2591 SvREFCNT_dec(c->poll_write_cb);
2592 c->poll_write_cb = NULL;
2593 }
2594
2595 if (!SvOK(cb)) {
2596 trace("unset poll_cb ix=%d\n", ix);
2597 return;
2598 }
2599 else if (unlikely(!IsCodeRef(cb)))
2600 croak("must supply a code reference to poll_cb");
2601
2602 c->poll_write_cb = newSVsv(cb);
2603 conn_write_ready(c);
2604 }
2605
2606 SV*
2607 response_guard (feer_conn_handle *hdl, ...)
2608 PROTOTYPE: $;$
2609 CODE:
2610 RETVAL = feersum_conn_guard(aTHX_ c, (items==2) ? ST(1) : NULL);
2611 OUTPUT:
2612 RETVAL
2613
2614 MODULE = Feersum PACKAGE = Feersum::Connection
2615
2616 PROTOTYPES: ENABLE
2617
2618 SV *
2619 start_streaming (struct feer_conn *c, SV *message, AV *headers)
2620 PROTOTYPE: $$\@
2621 CODE:
2622 feersum_start_response(aTHX_ c, message, headers, 1);
2623 RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
2624 OUTPUT:
2625 RETVAL
2626
2627 size_t
2628 send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
2629 PROTOTYPE: $$\@$
2630 CODE:
2631 feersum_start_response(aTHX_ c, message, headers, 0);
2632 if (unlikely(!SvOK(body)))
2633 croak("can't send_response with an undef body");
2634 RETVAL = feersum_write_whole_body(aTHX_ c, body);
2635 OUTPUT:
2636 RETVAL
2637
2638 SV*
_continue_streaming_psgi(struct feer_conn * c,SV * psgi_response)2639 _continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
2640 PROTOTYPE: $\@
2641 CODE:
2642 {
2643 AV *av;
2644 int len = 0;
2645
2646 if (IsArrayRef(psgi_response)) {
2647 av = (AV*)SvRV(psgi_response);
2648 len = av_len(av) + 1;
2649 }
2650
2651 if (len == 3) {
2652 // 0 is "don't recurse" (i.e. don't allow another code-ref)
2653 feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
2654 RETVAL = &PL_sv_undef;
2655 }
2656 else if (len == 2) {
2657 SV *message = *(av_fetch(av,0,0));
2658 SV *headers = *(av_fetch(av,1,0));
2659 if (unlikely(!IsArrayRef(headers)))
2660 croak("PSGI headers must be an array ref");
2661 feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
2662 RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
2663 }
2664 else {
2665 croak("PSGI response starter expects a 2 or 3 element array-ref");
2666 }
2667 }
2668 OUTPUT:
2669 RETVAL
2670
2671 void
2672 force_http10 (struct feer_conn *c)
2673 PROTOTYPE: $
2674 ALIAS:
2675 force_http11 = 1
2676 PPCODE:
2677 c->is_http11 = ix;
2678
2679 SV *
2680 env (struct feer_conn *c)
2681 PROTOTYPE: $
2682 CODE:
2683 RETVAL = newRV_noinc((SV*)feersum_env(aTHX_ c));
2684 OUTPUT:
2685 RETVAL
2686
2687 int
2688 fileno (struct feer_conn *c)
2689 CODE:
2690 RETVAL = c->fd;
2691 OUTPUT:
2692 RETVAL
2693
2694 SV*
2695 response_guard (struct feer_conn *c, ...)
2696 PROTOTYPE: $;$
2697 CODE:
2698 RETVAL = feersum_conn_guard(aTHX_ c, (items == 2) ? ST(1) : NULL);
2699 OUTPUT:
2700 RETVAL
2701
2702 void
DESTROY(struct feer_conn * c)2703 DESTROY (struct feer_conn *c)
2704 PPCODE:
2705 {
2706 int i;
2707 trace("DESTROY connection fd=%d c=%p\n", c->fd, c);
2708
2709 if (likely(c->rbuf)) SvREFCNT_dec(c->rbuf);
2710
2711 if (c->wbuf_rinq) {
2712 struct iomatrix *m;
2713 while ((m = (struct iomatrix *)rinq_shift(&c->wbuf_rinq)) != NULL) {
2714 for (i=0; i < m->count; i++) {
2715 if (m->sv[i]) SvREFCNT_dec(m->sv[i]);
2716 }
2717 Safefree(m);
2718 }
2719 }
2720
2721 if (likely(c->req)) {
2722 if (c->req->buf) SvREFCNT_dec(c->req->buf);
2723 Safefree(c->req);
2724 }
2725
2726 if (likely(c->sa)) free(c->sa);
2727
2728 safe_close_conn(c, "close at destruction");
2729
2730 if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);
2731
2732 if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
2733
2734 active_conns--;
2735
2736 if (unlikely(shutting_down && active_conns <= 0)) {
2737 ev_idle_stop(feersum_ev_loop, &ei);
2738 ev_prepare_stop(feersum_ev_loop, &ep);
2739 ev_check_stop(feersum_ev_loop, &ec);
2740
2741 trace3("... was last conn, going to try shutdown\n");
2742 if (shutdown_cb_cv) {
2743 PUSHMARK(SP);
2744 call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
2745 PUTBACK;
2746 trace3("... ok, called that handler\n");
2747 SvREFCNT_dec(shutdown_cb_cv);
2748 shutdown_cb_cv = NULL;
2749 }
2750 }
2751 }
2752
2753 MODULE = Feersum PACKAGE = Feersum
2754
2755 BOOT:
2756 {
2757 feer_stash = gv_stashpv("Feersum", 1);
2758 feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
2759 feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
2760 feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);
2761 I_EV_API("Feersum");
2762
2763 psgi_ver = newAV();
2764 av_extend(psgi_ver, 2);
2765 av_push(psgi_ver, newSViv(1));
2766 av_push(psgi_ver, newSViv(1));
2767 SvREADONLY_on((SV*)psgi_ver);
2768
2769 psgi_serv10 = newSVpvs("HTTP/1.0");
2770 SvREADONLY_on(psgi_serv10);
2771 psgi_serv11 = newSVpvs("HTTP/1.1");
2772 SvREADONLY_on(psgi_serv11);
2773
2774 Zero(&psgix_io_vtbl, 1, MGVTBL);
2775 psgix_io_vtbl.svt_get = psgix_io_svt_get;
2776
2777 trace3("Feersum booted, iomatrix %lu "
2778 "(IOV_MAX=%u, FEERSUM_IOMATRIX_SIZE=%u), "
2779 "feer_req %lu, "
2780 "feer_conn %lu\n",
2781 (long unsigned int)sizeof(struct iomatrix),
2782 (unsigned int)IOV_MAX,
2783 (unsigned int)FEERSUM_IOMATRIX_SIZE,
2784 (long unsigned int)sizeof(struct feer_req),
2785 (long unsigned int)sizeof(struct feer_conn)
2786 );
2787 }
2788