1# $Id: /mirror/gungho/lib/Gungho/Engine/Danga/Socket.pm 9352 2007-11-21T02:13:31.513580Z lestrrat $ 2# 3# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp> 4# All rights reserved. 5 6package Gungho::Engine::Danga::Socket; 7use strict; 8use warnings; 9use base qw(Gungho::Engine); 10use Danga::Socket::Callback; 11use HTTP::Parser; 12use IO::Socket::INET; 13use Net::DNS; 14 15# Danga::Socket uses the field pragma, which breaks things 16# if we try to subclass from both Gungho::Engine and Danga::Socket. 17 18__PACKAGE__->mk_accessors($_) for qw(active_requests context loop_delay resolver); 19 20sub setup 21{ 22 my $self = shift; 23 $self->active_requests({}); 24 $self->loop_delay( $self->config->{loop_delay} ) if $self->config->{loop_delay}; 25 if (! $self->config->{dns}{disable}) { 26 $self->resolver(Net::DNS::Resolver->new); 27 } 28 $self->next::method(@_); 29} 30 31sub run 32{ 33 my ($self, $c) = @_; 34 35 $self->context($c); 36 Danga::Socket->SetPostLoopCallback( 37 sub { 38 $c->dispatch_requests(); 39 40 my $delay = $self->loop_delay; 41 if (! defined $delay || $delay <= 0) { 42 $delay = 2; 43 } 44 select(undef, undef, undef, $delay); 45 46 my $continue = $c->is_running || Danga::Socket->WatchedSockets(); 47 48 if (! $continue) { 49 $c->log->info("no more requests, stopping..."); 50 } 51 return $continue; 52 } 53 ); 54 Danga::Socket->EventLoop(); 55} 56 57sub send_request 58{ 59 my $self = shift; 60 my $c = shift; 61 my $req = shift; 62 63 if ($self->resolver && $req->requires_name_lookup) { 64 $self->lookup_name($c, $req); 65 } else { 66 $req->uri->host( $req->notes('resolved_ip') ) 67 if $req->notes('resolved_ip'); 68 if (! $c->request_is_allowed($req)) { 69 return; 70 } 71 $self->start_request($c, $req); 72 } 73 return 1; 74} 75 76sub lookup_name 77{ 78 my ($self, $c, $req) = @_; 79 my $resolver = $self->resolver; 80 my $bgsock = $resolver->bgsend($req->uri->host); 81 82 my $danga = Danga::Socket::Callback->new( 83 handle => $bgsock, 84 on_read_ready => sub { 85 my $ds = shift; 86 delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) }; 87 $self->handle_dns_response( 88 $c, 89 $req, 90 $resolver->bgread($ds->sock) 91 ); 92 }, 93 on_error => sub { 94 my $ds = shift; 95 delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) }; 96 $self->handle_response( 97 $c, 98 $req, 99 $c->_http_error(500, "Failed to resolve host " . $req->uri->host, $req) 100 ); 101 } 102 ); 103} 104 105sub start_request 106{ 107 my ($self, $c, $req) = @_; 108 my $uri = $req->uri; 109 110 my $socket = IO::Socket::INET->new( 111 PeerAddr => $uri->host, 112 PeerPort => $uri->port || $uri->default_port, 113 Blocking => 0, 114 ); 115 if ($@) { 116 $self->handle_response( 117 $req, 118 $c->_http_error(500, "Failed to connect to " . $uri->host . ": $@", $req) 119 ); 120 return; 121 } 122 123 $req->headers->push_header(user_agent => $c->user_agent); 124 my $danga = Danga::Socket::Callback->new( 125 handle => $socket, 126 context => { write_done => 0, context => $c }, 127 on_write_ready => sub { 128 my $ds = shift; 129 if ($ds->{context}{write_done}) { 130 if ($ds->write(undef)) { 131 $ds->watch_write(0); 132 } 133 } 134 my $c = $ds->{context}{context}; 135 136 $c->notify('engine.send_request', { request => $req }); 137 my $req_str = $req->format(); 138 if ($ds->write($req_str)) { 139 $ds->watch_write(0); 140 } 141 $ds->{context}{write_done} = 1; 142 }, 143 on_read_ready => sub { 144 my $ds = shift; 145 my $parser = $req->notes('parser'); 146 if (! $parser) { 147 $parser = HTTP::Parser->new(response => 1); 148 $req->notes('parser', $parser); 149 } 150 151 my ($buf, $success); 152 while(1) { 153 my $bytes = sysread($ds->sock(), $buf, 8192); 154 last if ($bytes || 0) <= 0; 155 156 my $parser_status = $parser->add($buf); 157 158 if ($parser_status == 0 ) { 159 $success = 1; 160 last; 161 } 162 } 163 164 if (! $success) { 165 $self->handle_response( 166 $req, 167 $c->_http_error(400, "incomplete response", $req) 168 ); 169 return; 170 } 171 172 my $response = $parser->object; 173 $response->request($req); 174 $ds->watch_read(0); 175 delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) }; 176 $self->handle_response($req, $response); 177 } 178 ); 179 180 $req->notes(danga => $danga); 181} 182 183sub handle_response 184{ 185 my $self = shift; 186 my $request = shift; 187 my $response = shift; 188 delete $self->active_requests->{$request->id}; 189 my $danga = $request->notes('danga'); 190 $request->notes('danga', undef); 191 undef $danga; 192 193 if (my $host = $request->notes('original_host')) { 194 $request->uri->host($host); 195 } 196 197 my $c = $self->context; 198 $c->handle_response($request, $c->prepare_response($response) ); 199} 200 2011; 202 203__END__ 204 205=head1 NAME 206 207Gungho::Engine::Danga::Socket - Gungho Engine Using Danga::Socket 208 209=head1 DESCRIPTION 210 211This class uses Danga::Socket to dispatch requests. 212 213WARNING: This engine is still experimental. Patches welcome! 214In particular, this class definitely should cache connections. 215 216=head1 METHODS 217 218=head2 setup 219 220=head2 run 221 222=head2 lookup_name 223 224=head2 send_request 225 226=head2 start_request 227 228=head2 handle_response 229 230=cut 231