1package Starman::Server;
2use strict;
3use base 'Net::Server::PreFork';
4
5use Data::Dump qw(dump);
6use Socket qw(IPPROTO_TCP TCP_NODELAY);
7use IO::Socket qw(:crlf);
8use HTTP::Parser::XS qw(parse_http_request);
9use HTTP::Status qw(status_message);
10use HTTP::Date qw(time2str);
11use POSIX qw(EINTR EPIPE ECONNRESET);
12use Symbol;
13
14use Plack::Util;
15use Plack::TempBuffer;
16
# True when the STARMAN_DEBUG environment variable is set to a true value;
# gates the diagnostic warn() calls throughout this file.
use constant DEBUG        => $ENV{STARMAN_DEBUG} || 0;
# Number of bytes requested from the client socket per sysread() call.
use constant CHUNKSIZE    => 64 * 1024;

# Shared read handle over an empty in-memory string. It is installed as
# psgi.input for requests that carry no body. The open is checked so that a
# failure surfaces at load time instead of as an undefined handle later.
my $null_io = do {
    open my $io, "<", \"" or die "Cannot open empty in-memory input handle: $!";
    $io;
};
21
22use Net::Server::SIG qw(register_sig);
23
# Replaces Net::Server's HUP handling: the only action taken on SIGHUP is to
# restart the workers by delegating to hup_children.
sub sig_hup {
    my ($server) = @_;
    return $server->hup_children;
}
29
# Entry point: stores the PSGI app and the option hash on the server object,
# translates Starman options into Net::Server arguments and hands control to
# the parent class's run().
#
#   $app     - PSGI application; kept in $self->{app}
#   $options - hashref of Starman options; kept in $self->{options}. Missing
#              keepalive, keepalive_timeout, read_timeout and proctitle
#              entries are filled in (in place) with their defaults.
sub run {
    my ($self, $app, $options) = @_;

    @{$self}{qw(app options)} = ($app, $options);

    # Raw Net::Server arguments supplied by the caller form the base; the
    # option mappings below overwrite matching keys.
    my %extra = $options->{net_server_args} ? %{ $options->{net_server_args} } : ();

    # Starman option name => Net::Server argument name (copied only when true).
    my @option_to_arg = (
        [ pid       => 'pid_file'      ],
        [ error_log => 'log_file'      ],
        [ ssl_cert  => 'SSL_cert_file' ],
        [ ssl_key   => 'SSL_key_file'  ],
    );
    for my $pair (@option_to_arg) {
        my ($opt, $arg) = @$pair;
        $extra{$arg} = $options->{$opt} if $options->{$opt};
    }
    @extra{qw(setsid background)} = (1, 1) if $options->{daemonize};
    $extra{log_level} = 4 if DEBUG;

    # Defaults apply only when the key is absent; an explicit false value set
    # by the caller is respected.
    my %default_for = (
        keepalive         => 1,
        keepalive_timeout => 1,
        read_timeout      => 5,
        proctitle         => 1,
    );
    for my $name (keys %default_for) {
        $options->{$name} = $default_for{$name} unless exists $options->{$name};
    }

    # Each listen spec is either "host:port[:ssl]" or, without a colon, a
    # path that is registered with proto 'unix'.
    my $specs = $options->{listen} || [ "$options->{host}:$options->{port}" ];
    my @port;
    for my $spec (@$specs) {
        my %entry;
        if ($spec =~ /:/) {
            my ($host, $number, $flag) = split /:/, $spec, 3;
            $entry{host}  = $host if $host;
            $entry{port}  = $number;
            $entry{proto} = 'ssl' if lc($flag) eq 'ssl';
        }
        else {
            %entry = (
                host  => 'localhost',
                port  => $spec,
                proto => 'unix',
            );
        }
        push @port, \%entry;
    }

    my $workers = $options->{workers} || 5;

    # Kept as an ordered list (not a hash) so that %extra is appended after
    # the base arguments exactly as a flat argument list.
    my @base_args = (
        port                => \@port,
        host                => '*',   # default host
        proto               => $options->{ssl} ? 'ssl' : 'tcp', # default proto
        serialize           => ( $^O =~ m!(linux|darwin|bsd|cygwin)$! ) ? 'none' : 'flock',
        min_servers         => $options->{min_servers}       || $workers,
        min_spare_servers   => $options->{min_spare_servers} || $workers - 1,
        max_spare_servers   => $options->{max_spare_servers} || $workers - 1,
        max_servers         => $options->{max_servers}       || $workers,
        max_requests        => $options->{max_requests}      || 1000,
        user                => $options->{user}              || $>,
        group               => $options->{group}             || $),
        listen              => $options->{backlog}           || 1024,
        check_for_waiting   => 1,
        no_client_stdout    => 1,
    );

    # Empty @ARGV for the duration of the parent's run().
    local @ARGV = ();

    $self->SUPER::run(@base_args, %extra);
}
112
# Hook invoked before the accept loop starts. Reports the first configured
# port to the optional server_ready callback and registers the TTIN, TTOU
# and QUIT signal handlers.
sub pre_loop_hook {
    my ($self) = @_;

    my $first = $self->{server}->{port}->[0];

    # Map the Net::Server proto of the first port to a scheme name; anything
    # other than ssl/unix is reported as plain http.
    my %scheme_for = ( ssl => 'https', unix => 'unix' );
    my $proto = $scheme_for{ $first->{proto} } || 'http';

    if ( my $ready = $self->{options}{server_ready} ) {
        $ready->({
            host            => $first->{host},
            port            => $first->{port},
            proto           => $proto,
            server_software => 'Starman',
        });
    }

    # TTIN / TTOU raise / lower both worker-pool limits by one;
    # QUIT requests a graceful shutdown via server_close(1).
    my @pool_limits = qw( min_servers max_servers );
    register_sig(
        TTIN => sub { for my $limit (@pool_limits) { $self->{server}->{$limit}++ } },
        TTOU => sub { for my $limit (@pool_limits) { $self->{server}->{$limit}-- } },
        QUIT => sub { $self->server_close(1) },
    );
}
134
# Shuts the server down. With a true $quit, a graceful shutdown runs first:
# both worker-pool limits are set to zero, the children are HUPed, and the
# method polls once per second until no child is left. In every case the
# parent class's server_close() is called last.
sub server_close {
    my ($self, $quit) = @_;

    if ($quit) {
        $self->log(2, $self->log_time . " Received QUIT. Running a graceful shutdown\n");

        @{ $self->{server} }{qw( min_servers max_servers )} = (0, 0);
        $self->hup_children;

        for (;;) {
            Net::Server::SIG::check_sigs();
            $self->coordinate_children;

            my $remaining = keys %{ $self->{server}{children} };
            last unless $remaining;

            sleep 1;
        }

        $self->log(2, $self->log_time . " Worker processes cleaned up\n");
    }

    $self->SUPER::server_close();
}
153
# Runs the master process loop via the parent class. Before delegating it
# optionally sets the process title, and it temporarily replaces
# Net::Server::PreFork::register_sig with a wrapper that drops any QUIT
# handler from the arguments before forwarding to Net::Server::SIG.
# NOTE(review): presumably this keeps the QUIT handler registered in
# pre_loop_hook from being replaced by the parent class - confirm.
sub run_parent {
    my ($self, @args) = @_;

    if ( $self->{options}{proctitle} ) {
        my @argv = @{ $self->{options}{argv} || [] };
        $0 = "starman master " . join(" ", @argv);
    }

    no warnings 'redefine';
    local *Net::Server::PreFork::register_sig = sub {
        my %handlers = @_;
        delete $handlers{QUIT};
        Net::Server::SIG::register_sig(%handlers);
    };

    $self->SUPER::run_parent(@args);
}
166
# The methods below run in the child process
168
# Per-worker initialisation: reseeds the random number generator, builds the
# PSGI app when a psgi_app_builder callback was supplied, and optionally
# sets the worker process title.
sub child_init_hook {
    my ($self) = @_;
    my $opts = $self->{options};

    # Reseed so that forked workers do not share the parent's random sequence.
    srand();

    if ( my $builder = $opts->{psgi_app_builder} ) {
        DEBUG && warn "[$$] Initializing the PSGI app\n";
        $self->{app} = $builder->();
    }

    if ( $opts->{proctitle} ) {
        my @argv = @{ $opts->{argv} || [] };
        $0 = "starman worker " . join(" ", @argv);
    }
}
180
# Resets the per-connection state after a client has been accepted: empty
# header and input buffers, and keep-alive switched on.
sub post_accept_hook {
    my ($self) = @_;

    my %fresh_state = (
        headerbuf => '',
        inputbuf  => '',
        keepalive => 1,
    );

    $self->{client} = \%fresh_state;
}
190
# Runs the PSGI app for one request and sends its response. An array-style
# response is finalized directly; a CODE response (delayed/streaming form)
# is invoked with a responder callback that finalizes whatever it is given.
sub dispatch_request {
    my ($self, $env) = @_;

    my $response = Plack::Util::run_app($self->{app}, $env);

    return $self->_finalize_response($env, $response)
        unless ref $response eq 'CODE';

    $response->(sub { $self->_finalize_response($env, $_[0]) });
}
203
# Serves one accepted client connection: reads, parses and dispatches HTTP
# requests in a loop for as long as keep-alive stays enabled.
#
# Reads $self->{server}->{client} (the socket), $self->{client} (the
# per-connection state) and $self->{options}.
#
# The loop ends when the peer is no longer connected, headers cannot be read,
# a request is rejected (400 / 417), keep-alive is off for the request, or no
# further data arrives within keepalive_timeout seconds.
#
# Dies if TCP_NODELAY cannot be set on a TCP connection.
sub process_request {
    my $self = shift;
    my $conn = $self->{server}->{client};

    if ($conn->NS_proto eq 'TCP') {
        setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, 1)
            or die $!;
    }

    while ( $self->{client}->{keepalive} ) {
        last if !$conn->connected;

        # Read until we see all headers
        last if !$self->_read_headers;

        my $env = {
            REMOTE_ADDR     => $self->{server}->{peeraddr},
            REMOTE_HOST     => $self->{server}->{peerhost} || $self->{server}->{peeraddr},
            REMOTE_PORT     => $self->{server}->{peerport} || 0,
            SERVER_NAME     => $self->{server}->{sockaddr} || 0, # XXX: needs to be resolved?
            SERVER_PORT     => $self->{server}->{sockport} || 0,
            SCRIPT_NAME     => '',
            'psgi.version'      => [ 1, 1 ],
            'psgi.errors'       => *STDERR,
            'psgi.url_scheme'   => ($conn->NS_proto eq 'SSL' ? 'https' : 'http'),
            'psgi.nonblocking'  => Plack::Util::FALSE,
            'psgi.streaming'    => Plack::Util::TRUE,
            'psgi.run_once'     => Plack::Util::FALSE,
            'psgi.multithread'  => Plack::Util::FALSE,
            'psgi.multiprocess' => Plack::Util::TRUE,
            'psgix.io'          => $conn,
            'psgix.input.buffered' => Plack::Util::TRUE,
            'psgix.harakiri' => Plack::Util::TRUE,
        };

        # Parse headers
        my $reqlen = parse_http_request(delete $self->{client}->{headerbuf}, $env);
        if ( $reqlen < 0 ) {
            # Any negative result means there is no usable request: -1 is a
            # broken request, -2 an incomplete one. Neither may reach the app,
            # because $env would lack SERVER_PROTOCOL, REQUEST_METHOD, etc.
            DEBUG && warn "[$$] Bad request\n";
            $self->_http_error(400, { SERVER_PROTOCOL => "HTTP/1.0" });
            last;
        }

        # Initialize PSGI environment
        # Determine whether we will keep the connection open after the request
        my $connection = delete $env->{HTTP_CONNECTION};
        my $proto = $env->{SERVER_PROTOCOL};
        if ( $proto && $proto eq 'HTTP/1.0' ) {
            if ( $connection && $connection =~ /^keep-alive$/i ) {
                # Keep-alive only with explicit header in HTTP/1.0
                $self->{client}->{keepalive} = 1;
            }
            else {
                $self->{client}->{keepalive} = 0;
            }
        }
        elsif ( $proto && $proto eq 'HTTP/1.1' ) {
            if ( $connection && $connection =~ /^close$/i ) {
                $self->{client}->{keepalive} = 0;
            }
            else {
                # Keep-alive assumed in HTTP/1.1
                $self->{client}->{keepalive} = 1;
            }

            # Do we need to send 100 Continue?
            if ( $env->{HTTP_EXPECT} ) {
                if ( lc $env->{HTTP_EXPECT} eq '100-continue' ) {
                    _syswrite($conn, \('HTTP/1.1 100 Continue' . $CRLF . $CRLF));
                    DEBUG && warn "[$$] Sent 100 Continue response\n";
                }
                else {
                    DEBUG && warn "[$$] Invalid Expect header, returning 417\n";
                    $self->_http_error( 417, $env );
                    last;
                }
            }

            unless ($env->{HTTP_HOST}) {
                # No host, bad request
                DEBUG && warn "[$$] Bad request, HTTP/1.1 without Host header\n";
                $self->_http_error( 400, $env );
                last;
            }
        }

        # The server-wide keepalive option overrides whatever the request asked for.
        unless ($self->{options}->{keepalive}) {
            DEBUG && warn "[$$] keep-alive is disabled. Closing the connection after this request\n";
            $self->{client}->{keepalive} = 0;
        }

        # Read the request body (if any) into psgi.input, then run the app.
        $self->_prepare_env($env);

        $self->dispatch_request($env);

        DEBUG && warn "[$$] Request done\n";

        if ( $self->{client}->{keepalive} ) {
            # If we still have data in the input buffer it may be a pipelined request
            if ( $self->{client}->{inputbuf} ) {
                if ( $self->{client}->{inputbuf} =~ /^(?:GET|HEAD)/ ) {
                    if ( DEBUG ) {
                        warn "Pipelined GET/HEAD request in input buffer: "
                            . dump( $self->{client}->{inputbuf} ) . "\n";
                    }

                    # Continue processing the input buffer
                    next;
                }
                else {
                    # Input buffer just has junk, clear it
                    if ( DEBUG ) {
                        warn "Clearing junk from input buffer: "
                            . dump( $self->{client}->{inputbuf} ) . "\n";
                    }

                    $self->{client}->{inputbuf} = '';
                }
            }

            DEBUG && warn "[$$] Waiting on previous connection for keep-alive request...\n";

            # Wait up to keepalive_timeout seconds for the next request.
            # NOTE(review): IO::Select is not loaded by the use lines visible
            # at the top of this file; presumably another module loads it - confirm.
            my $sel = IO::Select->new($conn);
            last unless $sel->can_read($self->{options}->{keepalive_timeout});
        }
    }

    DEBUG && warn "[$$] Closing connection\n";
}
334
# Reads from the client socket until the input buffer holds a complete header
# block (terminated by an empty line), giving up after read_timeout seconds.
#
# On success the whole buffer is copied to $self->{client}->{headerbuf}, the
# header block is stripped from $self->{client}->{inputbuf} (leaving body or
# pipelined data), and 1 is returned. On timeout, read error, EOF or any other
# exception an empty list / undef is returned.
sub _read_headers {
    my $self = shift;

    my $ok = eval {
        local $SIG{ALRM} = sub { die "Timed out\n"; };

        alarm( $self->{options}->{read_timeout} );

        while (1) {
            # Do we have a full header in the buffer?
            # This is before sysread so we don't read if we have a pipelined request
            # waiting in the buffer
            last if defined $self->{client}->{inputbuf} && $self->{client}->{inputbuf} =~ /$CR?$LF$CR?$LF/s;

            # If not, read some data
            my $read = sysread $self->{server}->{client}, my $buf, CHUNKSIZE;

            if ( !defined $read ) {
                # A read interrupted by a signal is retried, as _syswrite does
                # for writes; the alarm still bounds the total wait.
                next if $! == EINTR;
                die "Read error: $!\n";
            }

            if ( $read == 0 ) {
                # EOF: the peer closed the connection before sending full headers.
                die "Read error: $!\n";
            }

            if ( DEBUG ) {
                warn "[$$] Read $read bytes: " . dump($buf) . "\n";
            }

            $self->{client}->{inputbuf} .= $buf;
        }

        # Disarm the timer while our ALRM handler is still installed, so a
        # late alarm can never be delivered to the restored outer handler.
        alarm(0);
        1;
    };

    # Capture the error immediately, then disarm the timer for the die paths.
    my $error = $@;
    alarm(0);

    if ( !$ok ) {
        if ( $error =~ /Timed out/ ) {
            DEBUG && warn "[$$] Client connection timed out\n";
        }
        elsif ( $error =~ /Read error/ ) {
            DEBUG && warn "[$$] Read error: $!\n";
        }
        else {
            # The header block cannot be assumed complete after an unexpected
            # exception, so do not report success.
            DEBUG && warn "[$$] Unexpected error while reading headers: $error\n";
        }
        return;
    }

    # Pull out the complete header into a new buffer
    $self->{client}->{headerbuf} = $self->{client}->{inputbuf};

    # Save any left-over data, possibly body data or pipelined requests
    $self->{client}->{inputbuf} =~ s/.*?$CR?$LF$CR?$LF//s;

    return 1;
}
386
# Sends a minimal text/plain error response whose body is the standard
# reason phrase for the status, and disables keep-alive for the connection.
#
#   $code - HTTP status code; a false value is treated as 500
#   $env  - environment hash passed through to _finalize_response
sub _http_error {
    my ($self, $code, $env) = @_;

    $code ||= 500;
    my $reason = status_message($code);

    my @headers = (
        'Content-Type'   => 'text/plain',
        'Content-Length' => length($reason),
    );

    # An error response always closes the connection.
    $self->{client}->{keepalive} = 0;

    $self->_finalize_response($env, [ $code, \@headers, [ $reason ] ]);
}
402
# Reads the request body (if any) and installs it as $env->{'psgi.input'}.
#
#   * CONTENT_LENGTH true: exactly that many bytes are buffered; dies with
#     "Read error" if the connection ends first.
#   * Transfer-Encoding: chunked: the body is de-chunked into a buffer and
#     CONTENT_LENGTH is set to the decoded length. HTTP_TRANSFER_ENCODING is
#     removed from $env in every case.
#   * otherwise: psgi.input is the shared empty handle.
sub _prepare_env {
    my($self, $env) = @_;

    # Returns ($data, $byte_count): buffered input first, otherwise a fresh
    # sysread from the socket (count is undef on error, 0 on EOF).
    my $get_chunk = sub {
        if ($self->{client}->{inputbuf}) {
            my $chunk = delete $self->{client}->{inputbuf};
            return ($chunk, length $chunk);
        }
        my $read = sysread $self->{server}->{client}, my($chunk), CHUNKSIZE;
        return ($chunk, $read);
    };

    my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };

    if (my $cl = $env->{CONTENT_LENGTH}) {
        my $buf = Plack::TempBuffer->new($cl);
        while ($cl > 0) {
            my($chunk, $read) = $get_chunk->();

            if ( !defined $read || $read == 0 ) {
                die "Read error: $!\n";
            }

            if ( $read > $cl ) {
                # More data arrived than the body needs (e.g. a pipelined
                # request). Hand the surplus back to the input buffer instead
                # of appending it to this request's body.
                my $surplus = substr $chunk, $cl;
                my $pending = $self->{client}->{inputbuf};
                $self->{client}->{inputbuf}
                    = $surplus . ( defined $pending ? $pending : '' );
                $chunk = substr $chunk, 0, $cl;
                $read  = $cl;
            }

            $cl -= $read;
            $buf->print($chunk);
        }
        $env->{'psgi.input'} = $buf->rewind;
    } elsif ($chunked) {
        my $buf = Plack::TempBuffer->new;
        my $chunk_buffer = '';
        # Start at 0 so an empty chunked body yields CONTENT_LENGTH 0, not undef.
        my $length = 0;

    DECHUNK:
        while (1) {
            my($chunk, $read) = $get_chunk->();
            $chunk_buffer .= $chunk;

            # Consume "<hex-size>[;ext]CRLF" chunk headers one at a time.
            while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
                my $trailer   = $1;
                my $chunk_len = hex $2;

                if ($chunk_len == 0) {
                    # Zero-length chunk terminates the body.
                    last DECHUNK;
                } elsif (length $chunk_buffer < $chunk_len + 2) {
                    # Chunk data not complete yet: put the header back and
                    # wait for more input.
                    $chunk_buffer = $trailer . $chunk_buffer;
                    last;
                }

                $buf->print(substr $chunk_buffer, 0, $chunk_len, '');
                $chunk_buffer =~ s/^\015\012//;

                $length += $chunk_len;
            }

            last unless $read && $read > 0;
        }

        $env->{CONTENT_LENGTH} = $length;
        $env->{'psgi.input'}   = $buf->rewind;
    } else {
        $env->{'psgi.input'} = $null_io;
    }
}
466
# Writes a PSGI response to the client socket.
#
#   $env - request environment (SERVER_PROTOCOL, REQUEST_METHOD and
#          psgix.harakiri.commit are consulted)
#   $res - PSGI response array: [ status, [ headers ], body ], or
#          [ status, [ headers ] ] for the streaming form
#
# Any Connection header from the app is dropped and replaced by the server's
# own, a Date header is added when missing, and on HTTP/1.1 the body is sent
# with chunked transfer-encoding when no Content-Length is given.
#
# Returns a writer object (write/close) when $res has no body element;
# otherwise the body is written out in full before returning.
sub _finalize_response {
    my($self, $env, $res) = @_;

    # The app asked for this worker to exit after the request.
    if ($env->{'psgix.harakiri.commit'}) {
        $self->{client}->{keepalive} = 0;
        $self->{client}->{harakiri} = 1;
    }

    my $protocol = $env->{SERVER_PROTOCOL};
    my $status   = $res->[0];
    my $message  = status_message($status);

    my(@headers, %headers);
    push @headers, "$protocol $status $message";

    # Switch on Transfer-Encoding: chunked if we don't know Content-Length.
    my $chunked;
    my $headers = $res->[1];
    for (my $i = 0; $i < @$headers; $i += 2) {
        my $k = $headers->[$i];
        my $v = $headers->[$i + 1];
        # Header names are case-insensitive; the server emits its own
        # Connection header below, so drop the app's in any spelling.
        next if lc $k eq 'connection';
        push @headers, "$k: $v";
        $headers{lc $k} = $v;
    }

    if ( $protocol eq 'HTTP/1.1' ) {
        if ( !exists $headers{'content-length'} ) {
            # 1xx, 204 and 304 responses and replies to HEAD carry no body.
            if ( $status !~ /^(?:1\d\d|[23]04)$/ && $env->{REQUEST_METHOD} ne 'HEAD' ) {
                DEBUG && warn "[$$] Using chunked transfer-encoding to send unknown length body\n";
                # Do not emit the header a second time when the app has
                # already declared chunked encoding itself.
                my $declared = $headers{'transfer-encoding'};
                push @headers, 'Transfer-Encoding: chunked'
                    unless defined $declared && lc($declared) eq 'chunked';
                $chunked = 1;
            }
        }
        elsif ( my $te = $headers{'transfer-encoding'} ) {
            if ( $te eq 'chunked' ) {
                DEBUG && warn "[$$] Chunked transfer-encoding set for response\n";
                $chunked = 1;
            }
        }
    } else {
        if ( !exists $headers{'content-length'} ) {
            # Without a length the end of the body can only be signalled by
            # closing the connection.
            DEBUG && warn "[$$] Disabling keep-alive after sending unknown length body on $protocol\n";
            $self->{client}->{keepalive} = 0;
        }
    }

    if ( ! $headers{date} ) {
        push @headers, "Date: " . time2str( time() );
    }

    # Should we keep the connection open?
    if ( $self->{client}->{keepalive} ) {
        push @headers, 'Connection: keep-alive';
    } else {
        push @headers, 'Connection: close';
    }

    my $conn = $self->{server}->{client};

    # Buffer the headers so they are sent with the first write() call
    # This reduces the number of TCP packets we are sending
    _syswrite($conn, \(join( $CRLF, @headers, '' ) . $CRLF));

    if (defined $res->[2]) {
        Plack::Util::foreach($res->[2], sub {
            my $buffer = $_[0];
            if ($chunked) {
                my $len = length $buffer;
                # An empty chunk would be read as the end-of-body marker.
                return unless $len;
                $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
            }
            _syswrite($conn, \$buffer);
        });
        _syswrite($conn, \"0$CRLF$CRLF") if $chunked;
    } else {
        # Streaming form: hand back a writer; the terminating chunk is sent
        # when the app calls close.
        return Plack::Util::inline_object
            write => sub {
                my $buffer = $_[0];
                if ($chunked) {
                    my $len = length $buffer;
                    return unless $len;
                    $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
                }
                _syswrite($conn, \$buffer);
            },
            close => sub {
                _syswrite($conn, \"0$CRLF$CRLF") if $chunked;
            };
    }
}
558
# Writes the whole buffer to the connection, looping over partial writes.
#
#   $conn       - handle to write to
#   $buffer_ref - reference to the string to send
#
# Returns silently if the peer has gone away (EPIPE / ECONNRESET), retries a
# write interrupted by a signal (EINTR), and dies on any other write error.
sub _syswrite {
    my ($conn, $buffer_ref) = @_;

    my $remaining = length $$buffer_ref;
    my $sent      = 0;

    while ($remaining > 0) {
        my $written = syswrite($conn, $$buffer_ref, $remaining, $sent);

        unless (defined $written) {
            # Nothing was written, so retrying leaves the counters untouched.
            next   if $! == EINTR;
            return if $! == EPIPE || $! == ECONNRESET;
            die "write error: $!";
        }

        $remaining -= $written;
        $sent      += $written;

        DEBUG && warn "[$$] Wrote $written byte" . ($written == 1 ? '' : 's') . "\n";
    }
}
581
# Hook run after a client connection has been handled: terminates this
# process when the harakiri flag was set on the connection state.
sub post_client_connection_hook {
    my ($self) = @_;
    exit if $self->{client}->{harakiri};
}
588
5891;
590