# $Id: /mirror/gungho/lib/Gungho/Engine/IO/Async.pm 9352 2007-11-21T02:13:31.513580Z lestrrat $
#
# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
# All rights reserved.

package Gungho::Engine::IO::Async;
use strict;
use warnings;
use base qw(Gungho::Engine);
use HTTP::Parser;
use IO::Async::Buffer;
use IO::Async::Notifier;
use IO::Socket::INET;
use Net::DNS;

__PACKAGE__->mk_classdata($_) for qw(impl_class);
__PACKAGE__->mk_accessors($_) for qw(context impl loop_delay resolver);

# probe for available impl_class
# NOTE(review): "use" inside eval {} is executed at compile time and the
# (empty) block itself returns a false value, so this constant appears to
# always be false and 'Select' is always the default. It is deliberately left
# unchanged here because the 'IO_Poll' implementation class is not defined in
# this file -- confirm it exists before switching to
# eval { require IO::Poll; 1 }.
use constant HAVE_IO_POLL => (eval { use IO::Poll } && !$@);

# setup($c)
#
# Stores the context, instantiates the I/O loop implementation, and reads
# the optional "loop_delay" and "dns.disable" settings from the engine config.
# A Net::DNS resolver is created unless DNS resolution has been disabled.
sub setup
{
    my ($self, $c) = @_;

    $self->context($c);
    $self->setup_impl_class($c);

    $self->loop_delay( $self->config->{loop_delay} ) if $self->config->{loop_delay};
    if (! $self->config->{dns}{disable}) {
        $self->resolver(Net::DNS::Resolver->new);
    }
}

# setup_impl_class($c)
#
# Picks the loop implementation named by the "loop" config key (falling back
# to IO_Poll/Select depending on HAVE_IO_POLL), loads it under the
# Engine::IO::Async::Impl namespace, and stores both the class name and a
# freshly set up instance.
sub setup_impl_class
{
    my ($self, $c) = @_;

    my $loop = $self->config->{loop};
    if (! $loop) {
        $loop = HAVE_IO_POLL ?
            'IO_Poll' :
            'Select'
        ;
    }
    my $pkg = $c->load_gungho_module($loop, 'Engine::IO::Async::Impl');
    $self->impl_class($pkg);

    my $obj = $pkg->new();
    $obj->setup($c);
    $self->impl( $obj );
}

# run($c)
#
# Hands control over to the loop implementation.
sub run
{
    my ($self, $c) = @_;
    $self->impl->run($c);
}

# send_request($c, $request)
#
# Either starts an asynchronous DNS lookup for the request (when a resolver
# is configured and the request says it needs one), or starts the HTTP
# request right away. Returns nothing if the context rejects the request,
# and 1 otherwise.
sub send_request
{
    my ($self, $c, $request) = @_;

    if ($self->resolver && $request->requires_name_lookup) {
        $self->lookup_host($c, $request);
    } else {
        # Connect to the already-resolved address, if one was recorded
        $request->uri->host( $request->notes('resolved_ip') )
            if $request->notes('resolved_ip');
        if ( ! $c->request_is_allowed($request)) {
            return;
        }
        $self->start_request($c, $request);
    }
    return 1;
}

# handle_response($c, $req, $res)
#
# Restores the original host name on the request URI (if one was noted) and
# passes the prepared response on to the context.
sub handle_response
{
    my ($self, $c, $req, $res) = @_;
    if (my $host = $req->notes('original_host')) {
        # Put it back
        $req->uri->host($host);
    }
    $c->handle_response($req, $c->prepare_response($res) );
}

# lookup_host($c, $request)
#
# Sends a background DNS query for the request's host and registers a
# notifier on the query socket. When the socket becomes readable the
# notifier is removed and the answer is given to handle_dns_response()
# (not defined in this file; presumably inherited from Gungho::Engine).
sub lookup_host
{
    my ($self, $c, $request) = @_;

    my $resolver = $self->resolver;
    my $bgsock   = $resolver->bgsend($request->uri->host);
    my $notifier = IO::Async::Notifier->new(
        handle => $bgsock,
        on_read_ready => sub {
            $self->impl->remove($_[0]);
            $self->handle_dns_response(
                $c,
                $request,
                $resolver->bgread($bgsock),
            );
        }
    );
    $self->impl->add($notifier);
}

# start_request($c, $req)
#
# Opens a non-blocking socket to the request's host/port, wraps it in an
# IO::Async::Buffer that feeds incoming data to an HTTP::Parser, writes the
# formatted request, and registers the buffer with the loop implementation.
# Connection, read and write failures are reported to handle_response() as
# synthesized error responses.
sub start_request
{
    my ($self, $c, $req) = @_;
    my $uri = $req->uri;
    my $socket = IO::Socket::INET->new(
        PeerAddr => $uri->host,
        PeerPort => $uri->port || $uri->default_port,
        Blocking => 0,
    );
    # Test the constructor's return value rather than $@: $@ may still hold
    # an error from an unrelated, earlier eval.
    if (! $socket) {
        my $error = $@;
        $self->handle_response(
            $c,
            $req,
            $c->_http_error(500, "Failed to connect to " . $uri->host . ": $error\n", $req)
        );
        return;
    }

    my $buffer = IO::Async::Buffer->new(
        handle => $socket,
        on_incoming_data => sub {
            my ($notifier, $buffref, $closed) = @_;

            # Feed everything received so far to the parser and drain the buffer
            my $parser = $notifier->{parser};
            my $st = $parser->add($$buffref);
            $$buffref = '';

            # The code treats a status of 0 as "response fully parsed"
            if ($st == 0) {
                my $res = $parser->object;
                $c->notify('engine.handle_response', { request => $req, response => $res });
                $self->handle_response($c, $notifier->{request}, $res);
                $notifier->handle_closed();
                $self->impl->remove($notifier);
            }
        },
        on_read_error => sub {
            my $notifier = shift;
            my $res = $c->_http_error(400, "incomplete response", $notifier->{request});
            # Go through the engine's handle_response(), like the other
            # callbacks; the context's handle_response() is called with
            # ($req, $res) and must not receive $c as its first argument.
            $c->notify('engine.handle_response', { request => $req, response => $res });
            $self->handle_response($c, $notifier->{request}, $res);
        },
        on_write_error => sub {
            my $notifier = shift;
            my $res = $c->_http_error(500, "Could not write to socket ", $notifier->{request});
            $c->notify('engine.handle_response', { request => $req, response => $res });
            $self->handle_response($c, $notifier->{request}, $res);
        }
    );

    # Not a good thing, I know...
    # (state is stashed directly in the buffer object's hash)
    $buffer->{parser}  = HTTP::Parser->new(response => 1);
    $buffer->{request} = $req;

    $c->notify('engine.send_request', { request => $req });
    $buffer->send($req->format);
    $self->impl->add($buffer);
}

package Gungho::Engine::IO::Async::Impl::Select;
use strict;
use warnings;
use base qw(IO::Async::Set::Select Class::Accessor::Fast);

__PACKAGE__->mk_accessors($_) for qw(context);

# setup($c)
#
# Nothing to prepare for the select()-based implementation.
sub setup
{
}

# run($c)
#
# Main loop: keeps dispatching requests and servicing notifiers through
# select() for as long as the context is running or notifiers remain
# registered. The select timeout is the engine's loop_delay, or 5 when
# loop_delay is undefined or not positive.
sub run
{
    my ($self, $c) = @_;
    $self->context($c);

    my $engine = $c->engine;
    my ($rvec, $wvec, $evec);
    my $timeout;
    while ($c->is_running || keys %{$self->{notifiers}}) {
        $c->dispatch_requests();

        $timeout = $engine->loop_delay;
        if (! defined $timeout || $timeout <= 0) {
            $timeout = 5;
        }
        # List repetition needs the parenthesized left operand; ('' x 3)
        # would be a single empty string and leave $wvec/$evec undefined.
        ($rvec, $wvec, $evec) = ('') x 3;

        $self->pre_select(\$rvec, \$wvec, \$evec, \$timeout);
        select($rvec, $wvec, $evec, $timeout);
        $self->post_select($rvec, $wvec, $evec);
    }
}

1;

__END__

=head1 NAME

Gungho::Engine::IO::Async - IO::Async Engine

=head1 SYNOPSIS

  engine:
    module: IO::Async
    config:
      loop_delay: 0.01
      dns:
        disable: 1 # Only if you don't want Gungho to resolve DNS


=head1 DESCRIPTION

This class uses IO::Async to dispatch requests.

WARNING: This engine is still experimental. Patches welcome!
In particular, this class definitely should cache connections.

=head1 METHODS

=head2 run

=head2 setup

=head2 setup_impl_class

=head2 send_request

=head2 handle_response

=head2 start_request

=head2 lookup_host

=cut