1package Starman::Server; 2use strict; 3use base 'Net::Server::PreFork'; 4 5use Data::Dump qw(dump); 6use Socket qw(IPPROTO_TCP TCP_NODELAY); 7use IO::Socket qw(:crlf); 8use HTTP::Parser::XS qw(parse_http_request); 9use HTTP::Status qw(status_message); 10use HTTP::Date qw(time2str); 11use POSIX qw(EINTR EPIPE ECONNRESET); 12use Symbol; 13 14use Plack::Util; 15use Plack::TempBuffer; 16 17use constant DEBUG => $ENV{STARMAN_DEBUG} || 0; 18use constant CHUNKSIZE => 64 * 1024; 19 20my $null_io = do { open my $io, "<", \""; $io }; 21 22use Net::Server::SIG qw(register_sig); 23 24# Override Net::Server's HUP handling - just restart all the workers and that's about it 25sub sig_hup { 26 my $self = shift; 27 $self->hup_children; 28} 29 30sub run { 31 my($self, $app, $options) = @_; 32 33 $self->{app} = $app; 34 $self->{options} = $options; 35 36 my %extra = (); 37 38 if ($options->{net_server_args}) { 39 %extra = %{ $options->{net_server_args} }; 40 } 41 42 if ( $options->{pid} ) { 43 $extra{pid_file} = $options->{pid}; 44 } 45 if ( $options->{daemonize} ) { 46 $extra{setsid} = $extra{background} = 1; 47 } 48 if ( $options->{error_log} ) { 49 $extra{log_file} = $options->{error_log}; 50 } 51 if ( DEBUG ) { 52 $extra{log_level} = 4; 53 } 54 if ( $options->{ssl_cert} ) { 55 $extra{SSL_cert_file} = $options->{ssl_cert}; 56 } 57 if ( $options->{ssl_key} ) { 58 $extra{SSL_key_file} = $options->{ssl_key}; 59 } 60 if (! exists $options->{keepalive}) { 61 $options->{keepalive} = 1; 62 } 63 if (! exists $options->{keepalive_timeout}) { 64 $options->{keepalive_timeout} = 1; 65 } 66 if (! exists $options->{read_timeout}) { 67 $options->{read_timeout} = 5; 68 } 69 if (! exists $options->{proctitle}) { 70 $options->{proctitle} = 1; 71 } 72 73 my @port; 74 for my $listen (@{$options->{listen} || [ "$options->{host}:$options->{port}" ]}) { 75 my %listen; 76 if ($listen =~ /:/) { 77 my($h, $p, $opt) = split /:/, $listen, 3; 78 $listen{host} = $h if $h; 79 $listen{port} = $p; 80 $listen{proto} = 'ssl' if 'ssl' eq lc $opt; 81 } else { 82 %listen = ( 83 host => 'localhost', 84 port => $listen, 85 proto => 'unix', 86 ); 87 } 88 push @port, \%listen; 89 } 90 91 my $workers = $options->{workers} || 5; 92 local @ARGV = (); 93 94 $self->SUPER::run( 95 port => \@port, 96 host => '*', # default host 97 proto => $options->{ssl} ? 'ssl' : 'tcp', # default proto 98 serialize => ( $^O =~ m!(linux|darwin|bsd|cygwin)$! ) ? 'none' : 'flock', 99 min_servers => $options->{min_servers} || $workers, 100 min_spare_servers => $options->{min_spare_servers} || $workers - 1, 101 max_spare_servers => $options->{max_spare_servers} || $workers - 1, 102 max_servers => $options->{max_servers} || $workers, 103 max_requests => $options->{max_requests} || 1000, 104 user => $options->{user} || $>, 105 group => $options->{group} || $), 106 listen => $options->{backlog} || 1024, 107 check_for_waiting => 1, 108 no_client_stdout => 1, 109 %extra 110 ); 111} 112 113sub pre_loop_hook { 114 my $self = shift; 115 116 my $port = $self->{server}->{port}->[0]; 117 my $proto = $port->{proto} eq 'ssl' ? 'https' : 118 $port->{proto} eq 'unix' ? 'unix' : 119 'http'; 120 121 $self->{options}{server_ready}->({ 122 host => $port->{host}, 123 port => $port->{port}, 124 proto => $proto, 125 server_software => 'Starman', 126 }) if $self->{options}{server_ready}; 127 128 register_sig( 129 TTIN => sub { $self->{server}->{$_}++ for qw( min_servers max_servers ) }, 130 TTOU => sub { $self->{server}->{$_}-- for qw( min_servers max_servers ) }, 131 QUIT => sub { $self->server_close(1) }, 132 ); 133} 134 135sub server_close { 136 my($self, $quit) = @_; 137 138 if ($quit) { 139 $self->log(2, $self->log_time . " Received QUIT. Running a graceful shutdown\n"); 140 $self->{server}->{$_} = 0 for qw( min_servers max_servers ); 141 $self->hup_children; 142 while (1) { 143 Net::Server::SIG::check_sigs(); 144 $self->coordinate_children; 145 last if !keys %{$self->{server}{children}}; 146 sleep 1; 147 } 148 $self->log(2, $self->log_time . " Worker processes cleaned up\n"); 149 } 150 151 $self->SUPER::server_close(); 152} 153 154sub run_parent { 155 my $self = shift; 156 $0 = "starman master " . join(" ", @{$self->{options}{argv} || []}) 157 if $self->{options}{proctitle}; 158 no warnings 'redefine'; 159 local *Net::Server::PreFork::register_sig = sub { 160 my %args = @_; 161 delete $args{QUIT}; 162 Net::Server::SIG::register_sig(%args); 163 }; 164 $self->SUPER::run_parent(@_); 165} 166 167# The below methods run in the child process 168 169sub child_init_hook { 170 my $self = shift; 171 srand(); 172 if ($self->{options}->{psgi_app_builder}) { 173 DEBUG && warn "[$$] Initializing the PSGI app\n"; 174 $self->{app} = $self->{options}->{psgi_app_builder}->(); 175 } 176 $0 = "starman worker " . join(" ", @{$self->{options}{argv} || []}) 177 if $self->{options}{proctitle}; 178 179} 180 181sub post_accept_hook { 182 my $self = shift; 183 184 $self->{client} = { 185 headerbuf => '', 186 inputbuf => '', 187 keepalive => 1, 188 }; 189} 190 191sub dispatch_request { 192 my ($self, $env) = @_; 193 194 # Run PSGI apps 195 my $res = Plack::Util::run_app($self->{app}, $env); 196 197 if (ref $res eq 'CODE') { 198 $res->(sub { $self->_finalize_response($env, $_[0]) }); 199 } else { 200 $self->_finalize_response($env, $res); 201 } 202} 203 204sub process_request { 205 my $self = shift; 206 my $conn = $self->{server}->{client}; 207 208 if ($conn->NS_proto eq 'TCP') { 209 setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, 1) 210 or die $!; 211 } 212 213 while ( $self->{client}->{keepalive} ) { 214 last if !$conn->connected; 215 216 # Read until we see all headers 217 last if !$self->_read_headers; 218 219 my $env = { 220 REMOTE_ADDR => $self->{server}->{peeraddr}, 221 REMOTE_HOST => $self->{server}->{peerhost} || $self->{server}->{peeraddr}, 222 REMOTE_PORT => $self->{server}->{peerport} || 0, 223 SERVER_NAME => $self->{server}->{sockaddr} || 0, # XXX: needs to be resolved? 224 SERVER_PORT => $self->{server}->{sockport} || 0, 225 SCRIPT_NAME => '', 226 'psgi.version' => [ 1, 1 ], 227 'psgi.errors' => *STDERR, 228 'psgi.url_scheme' => ($conn->NS_proto eq 'SSL' ? 'https' : 'http'), 229 'psgi.nonblocking' => Plack::Util::FALSE, 230 'psgi.streaming' => Plack::Util::TRUE, 231 'psgi.run_once' => Plack::Util::FALSE, 232 'psgi.multithread' => Plack::Util::FALSE, 233 'psgi.multiprocess' => Plack::Util::TRUE, 234 'psgix.io' => $conn, 235 'psgix.input.buffered' => Plack::Util::TRUE, 236 'psgix.harakiri' => Plack::Util::TRUE, 237 }; 238 239 # Parse headers 240 my $reqlen = parse_http_request(delete $self->{client}->{headerbuf}, $env); 241 if ( $reqlen == -1 ) { 242 # Bad request 243 DEBUG && warn "[$$] Bad request\n"; 244 $self->_http_error(400, { SERVER_PROTOCOL => "HTTP/1.0" }); 245 last; 246 } 247 248 # Initialize PSGI environment 249 # Determine whether we will keep the connection open after the request 250 my $connection = delete $env->{HTTP_CONNECTION}; 251 my $proto = $env->{SERVER_PROTOCOL}; 252 if ( $proto && $proto eq 'HTTP/1.0' ) { 253 if ( $connection && $connection =~ /^keep-alive$/i ) { 254 # Keep-alive only with explicit header in HTTP/1.0 255 $self->{client}->{keepalive} = 1; 256 } 257 else { 258 $self->{client}->{keepalive} = 0; 259 } 260 } 261 elsif ( $proto && $proto eq 'HTTP/1.1' ) { 262 if ( $connection && $connection =~ /^close$/i ) { 263 $self->{client}->{keepalive} = 0; 264 } 265 else { 266 # Keep-alive assumed in HTTP/1.1 267 $self->{client}->{keepalive} = 1; 268 } 269 270 # Do we need to send 100 Continue? 271 if ( $env->{HTTP_EXPECT} ) { 272 if ( lc $env->{HTTP_EXPECT} eq '100-continue' ) { 273 _syswrite($conn, \('HTTP/1.1 100 Continue' . $CRLF . $CRLF)); 274 DEBUG && warn "[$$] Sent 100 Continue response\n"; 275 } 276 else { 277 DEBUG && warn "[$$] Invalid Expect header, returning 417\n"; 278 $self->_http_error( 417, $env ); 279 last; 280 } 281 } 282 283 unless ($env->{HTTP_HOST}) { 284 # No host, bad request 285 DEBUG && warn "[$$] Bad request, HTTP/1.1 without Host header\n"; 286 $self->_http_error( 400, $env ); 287 last; 288 } 289 } 290 291 unless ($self->{options}->{keepalive}) { 292 DEBUG && warn "[$$] keep-alive is disabled. Closing the connection after this request\n"; 293 $self->{client}->{keepalive} = 0; 294 } 295 296 $self->_prepare_env($env); 297 298 $self->dispatch_request($env); 299 300 DEBUG && warn "[$$] Request done\n"; 301 302 if ( $self->{client}->{keepalive} ) { 303 # If we still have data in the input buffer it may be a pipelined request 304 if ( $self->{client}->{inputbuf} ) { 305 if ( $self->{client}->{inputbuf} =~ /^(?:GET|HEAD)/ ) { 306 if ( DEBUG ) { 307 warn "Pipelined GET/HEAD request in input buffer: " 308 . dump( $self->{client}->{inputbuf} ) . "\n"; 309 } 310 311 # Continue processing the input buffer 312 next; 313 } 314 else { 315 # Input buffer just has junk, clear it 316 if ( DEBUG ) { 317 warn "Clearing junk from input buffer: " 318 . dump( $self->{client}->{inputbuf} ) . "\n"; 319 } 320 321 $self->{client}->{inputbuf} = ''; 322 } 323 } 324 325 DEBUG && warn "[$$] Waiting on previous connection for keep-alive request...\n"; 326 327 my $sel = IO::Select->new($conn); 328 last unless $sel->can_read($self->{options}->{keepalive_timeout}); 329 } 330 } 331 332 DEBUG && warn "[$$] Closing connection\n"; 333} 334 335sub _read_headers { 336 my $self = shift; 337 338 eval { 339 local $SIG{ALRM} = sub { die "Timed out\n"; }; 340 341 alarm( $self->{options}->{read_timeout} ); 342 343 while (1) { 344 # Do we have a full header in the buffer? 345 # This is before sysread so we don't read if we have a pipelined request 346 # waiting in the buffer 347 last if defined $self->{client}->{inputbuf} && $self->{client}->{inputbuf} =~ /$CR?$LF$CR?$LF/s; 348 349 # If not, read some data 350 my $read = sysread $self->{server}->{client}, my $buf, CHUNKSIZE; 351 352 if ( !defined $read || $read == 0 ) { 353 die "Read error: $!\n"; 354 } 355 356 if ( DEBUG ) { 357 warn "[$$] Read $read bytes: " . dump($buf) . "\n"; 358 } 359 360 $self->{client}->{inputbuf} .= $buf; 361 } 362 }; 363 364 alarm(0); 365 366 if ( $@ ) { 367 if ( $@ =~ /Timed out/ ) { 368 DEBUG && warn "[$$] Client connection timed out\n"; 369 return; 370 } 371 372 if ( $@ =~ /Read error/ ) { 373 DEBUG && warn "[$$] Read error: $!\n"; 374 return; 375 } 376 } 377 378 # Pull out the complete header into a new buffer 379 $self->{client}->{headerbuf} = $self->{client}->{inputbuf}; 380 381 # Save any left-over data, possibly body data or pipelined requests 382 $self->{client}->{inputbuf} =~ s/.*?$CR?$LF$CR?$LF//s; 383 384 return 1; 385} 386 387sub _http_error { 388 my ( $self, $code, $env ) = @_; 389 390 my $status = $code || 500; 391 my $msg = status_message($status); 392 393 my $res = [ 394 $status, 395 [ 'Content-Type' => 'text/plain', 'Content-Length' => length($msg) ], 396 [ $msg ], 397 ]; 398 399 $self->{client}->{keepalive} = 0; 400 $self->_finalize_response($env, $res); 401} 402 403sub _prepare_env { 404 my($self, $env) = @_; 405 406 my $get_chunk = sub { 407 if ($self->{client}->{inputbuf}) { 408 my $chunk = delete $self->{client}->{inputbuf}; 409 return ($chunk, length $chunk); 410 } 411 my $read = sysread $self->{server}->{client}, my($chunk), CHUNKSIZE; 412 return ($chunk, $read); 413 }; 414 415 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' }; 416 417 if (my $cl = $env->{CONTENT_LENGTH}) { 418 my $buf = Plack::TempBuffer->new($cl); 419 while ($cl > 0) { 420 my($chunk, $read) = $get_chunk->(); 421 422 if ( !defined $read || $read == 0 ) { 423 die "Read error: $!\n"; 424 } 425 426 $cl -= $read; 427 $buf->print($chunk); 428 } 429 $env->{'psgi.input'} = $buf->rewind; 430 } elsif ($chunked) { 431 my $buf = Plack::TempBuffer->new; 432 my $chunk_buffer = ''; 433 my $length; 434 435 DECHUNK: 436 while (1) { 437 my($chunk, $read) = $get_chunk->(); 438 $chunk_buffer .= $chunk; 439 440 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) { 441 my $trailer = $1; 442 my $chunk_len = hex $2; 443 444 if ($chunk_len == 0) { 445 last DECHUNK; 446 } elsif (length $chunk_buffer < $chunk_len + 2) { 447 $chunk_buffer = $trailer . $chunk_buffer; 448 last; 449 } 450 451 $buf->print(substr $chunk_buffer, 0, $chunk_len, ''); 452 $chunk_buffer =~ s/^\015\012//; 453 454 $length += $chunk_len; 455 } 456 457 last unless $read && $read > 0; 458 } 459 460 $env->{CONTENT_LENGTH} = $length; 461 $env->{'psgi.input'} = $buf->rewind; 462 } else { 463 $env->{'psgi.input'} = $null_io; 464 } 465} 466 467sub _finalize_response { 468 my($self, $env, $res) = @_; 469 470 if ($env->{'psgix.harakiri.commit'}) { 471 $self->{client}->{keepalive} = 0; 472 $self->{client}->{harakiri} = 1; 473 } 474 475 my $protocol = $env->{SERVER_PROTOCOL}; 476 my $status = $res->[0]; 477 my $message = status_message($status); 478 479 my(@headers, %headers); 480 push @headers, "$protocol $status $message"; 481 482 # Switch on Transfer-Encoding: chunked if we don't know Content-Length. 483 my $chunked; 484 my $headers = $res->[1]; 485 for (my $i = 0; $i < @$headers; $i += 2) { 486 my $k = $headers->[$i]; 487 my $v = $headers->[$i + 1]; 488 next if $k eq 'Connection'; 489 push @headers, "$k: $v"; 490 $headers{lc $k} = $v; 491 } 492 493 if ( $protocol eq 'HTTP/1.1' ) { 494 if ( !exists $headers{'content-length'} ) { 495 if ( $status !~ /^1\d\d|[23]04$/ && $env->{REQUEST_METHOD} ne 'HEAD' ) { 496 DEBUG && warn "[$$] Using chunked transfer-encoding to send unknown length body\n"; 497 push @headers, 'Transfer-Encoding: chunked'; 498 $chunked = 1; 499 } 500 } 501 elsif ( my $te = $headers{'transfer-encoding'} ) { 502 if ( $te eq 'chunked' ) { 503 DEBUG && warn "[$$] Chunked transfer-encoding set for response\n"; 504 $chunked = 1; 505 } 506 } 507 } else { 508 if ( !exists $headers{'content-length'} ) { 509 DEBUG && warn "[$$] Disabling keep-alive after sending unknown length body on $protocol\n"; 510 $self->{client}->{keepalive} = 0; 511 } 512 } 513 514 if ( ! $headers{date} ) { 515 push @headers, "Date: " . time2str( time() ); 516 } 517 518 # Should we keep the connection open? 519 if ( $self->{client}->{keepalive} ) { 520 push @headers, 'Connection: keep-alive'; 521 } else { 522 push @headers, 'Connection: close'; 523 } 524 525 my $conn = $self->{server}->{client}; 526 527 # Buffer the headers so they are sent with the first write() call 528 # This reduces the number of TCP packets we are sending 529 _syswrite($conn, \(join( $CRLF, @headers, '' ) . $CRLF)); 530 531 if (defined $res->[2]) { 532 Plack::Util::foreach($res->[2], sub { 533 my $buffer = $_[0]; 534 if ($chunked) { 535 my $len = length $buffer; 536 return unless $len; 537 $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF; 538 } 539 _syswrite($conn, \$buffer); 540 }); 541 _syswrite($conn, \"0$CRLF$CRLF") if $chunked; 542 } else { 543 return Plack::Util::inline_object 544 write => sub { 545 my $buffer = $_[0]; 546 if ($chunked) { 547 my $len = length $buffer; 548 return unless $len; 549 $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF; 550 } 551 _syswrite($conn, \$buffer); 552 }, 553 close => sub { 554 _syswrite($conn, \"0$CRLF$CRLF") if $chunked; 555 }; 556 } 557} 558 559sub _syswrite { 560 my ($conn, $buffer_ref) = @_; 561 562 my $amount = length $$buffer_ref; 563 my $offset = 0; 564 565 while ($amount > 0) { 566 my $len = syswrite($conn, $$buffer_ref, $amount, $offset); 567 568 if (not defined $len) { 569 return if $! == EPIPE; 570 return if $! == ECONNRESET; 571 redo if $! == EINTR; 572 die "write error: $!"; 573 } 574 575 $amount -= $len; 576 $offset += $len; 577 578 DEBUG && warn "[$$] Wrote $len byte", ($len == 1 ? '' : 's'), "\n"; 579 } 580} 581 582sub post_client_connection_hook { 583 my $self = shift; 584 if ($self->{client}->{harakiri}) { 585 exit; 586 } 587} 588 5891; 590