1# $Id: /mirror/gungho/lib/Gungho/Engine/IO/Async.pm 9352 2007-11-21T02:13:31.513580Z lestrrat  $
2#
3# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
4# All rights reserved.
5
6package Gungho::Engine::IO::Async;
7use strict;
8use warnings;
9use base qw(Gungho::Engine);
10use HTTP::Parser;
11use IO::Async::Buffer;
12use IO::Async::Notifier;
13use IO::Socket::INET;
14use Net::DNS;
15
16__PACKAGE__->mk_classdata($_) for qw(impl_class);
17__PACKAGE__->mk_accessors($_) for qw(context impl loop_delay resolver);
18
19# probe for available impl_class
20use constant HAVE_IO_POLL => (eval { use IO::Poll } && !$@);
21
22sub setup
23{
24    my ($self, $c) = @_;
25
26    $self->context($c);
27    $self->setup_impl_class($c);
28
29    $self->loop_delay( $self->config->{loop_delay} ) if $self->config->{loop_delay};
30    if (! $self->config->{dns}{disable}) {
31        $self->resolver(Net::DNS::Resolver->new);
32    }
33}
34
35sub setup_impl_class
36{
37    my ($self, $c) = @_;
38
39    my $loop = $self->config->{loop};
40    if (! $loop) {
41        $loop = HAVE_IO_POLL ?
42            'IO_Poll' :
43            'Select'
44        ;
45    }
46    my $pkg = $c->load_gungho_module($loop, 'Engine::IO::Async::Impl');
47    $self->impl_class($pkg);
48
49    my $obj = $pkg->new();
50    $obj->setup($c);
51    $self->impl( $obj );
52}
53
54sub run
55{
56    my ($self, $c) = @_;
57    $self->impl->run($c);
58}
59
60sub send_request
61{
62    my ($self, $c, $request) = @_;
63
64    if ($self->resolver && $request->requires_name_lookup) {
65        $self->lookup_host($c, $request);
66    } else {
67        $request->uri->host( $request->notes('resolved_ip') )
68            if $request->notes('resolved_ip');
69        if ( ! $c->request_is_allowed($request)) {
70            return;
71        }
72        $self->start_request($c, $request);
73    }
74    return 1;
75}
76
77sub handle_response
78{
79    my ($self, $c, $req, $res) = @_;
80    if (my $host = $req->notes('original_host')) {
81        # Put it back
82        $req->uri->host($host);
83    }
84    $c->handle_response($req, $c->prepare_response($res) );
85}
86
87sub lookup_host
88{
89    my ($self, $c, $request) = @_;
90
91    my $resolver = $self->resolver;
92    my $bgsock   = $resolver->bgsend($request->uri->host);
93    my $notifier = IO::Async::Notifier->new(
94        handle => $bgsock,
95        on_read_ready => sub {
96            $self->impl->remove($_[0]);
97            $self->handle_dns_response(
98                $c,
99                $request,
100                $resolver->bgread($bgsock),
101            );
102        }
103    );
104    $self->impl->add($notifier);
105}
106
107sub start_request
108{
109    my ($self, $c, $req) = @_;
110    my $uri  = $req->uri;
111    my $socket = IO::Socket::INET->new(
112        PeerAddr => $uri->host,
113        PeerPort => $uri->port || $uri->default_port,
114        Blocking => 0,
115    );
116    if ($@) {
117        $self->handle_response(
118            $c,
119            $req,
120            $c->_http_error(500, "Failed to connect to " . $uri->host . ": $@
121", $req)
122        );
123        return;
124    }
125
126    my $buffer = IO::Async::Buffer->new(
127        handle => $socket,
128        on_incoming_data => sub {
129            my ($notifier, $buffref, $closed) = @_;
130
131            my $parser = $notifier->{parser};
132            my $st = $parser->add($$buffref);
133            $$buffref = '';
134
135            if ($st == 0) {
136                my $res = $parser->object;
137                $c->notify('engine.handle_response', { request => $req, response => $res });
138                $self->handle_response($c, $notifier->{request}, $res);
139                $notifier->handle_closed();
140                $self->impl->remove($notifier);
141            }
142        },
143        on_read_error => sub {
144            my $notifier = shift;
145            my $res = $c->_http_error(400, "incomplete response", $notifier->{request});
146            $c->handle_response($c, $notifier->{request}, $res);
147        },
148        on_write_error => sub {
149            my $notifier = shift;
150            my $res = $c->_http_error(500, "Could not write to socket ", $notifier->{request});
151            $c->notify('engine.handle_response', { request => $req, response => $res });
152            $self->handle_response($c, $notifier->{request}, $res);
153        }
154    );
155
156    # Not a good thing, I know...
157    $buffer->{parser}  = HTTP::Parser->new(response => 1);
158    $buffer->{request} = $req;
159
160    $c->notify('engine.send_request', { request => $req });
161    $buffer->send($req->format);
162    $self->impl->add($buffer);
163}
164
165package Gungho::Engine::IO::Async::Impl::Select;
166use strict;
167use warnings;
168use base qw(IO::Async::Set::Select Class::Accessor::Fast);
169
170__PACKAGE__->mk_accessors($_) for qw(context);
171
172sub setup
173{
174}
175
176sub run
177{
178    my ($self, $c) = @_;
179    $self->context($c);
180
181    my $engine = $c->engine;
182    my ($rvec, $wvec, $evec);
183    my $timeout;
184    while ($c->is_running || keys %{$self->{notifiers}}) {
185        $c->dispatch_requests();
186
187        $timeout = $engine->loop_delay;
188        if (! defined $timeout || $timeout <= 0) {
189            $timeout = 5;
190        }
191        ($rvec, $wvec, $evec) = ('' x 3);
192
193        $self->pre_select(\$rvec, \$wvec, \$evec, \$timeout);
194        select($rvec, $wvec, $evec, $timeout);
195        $self->post_select($rvec, $wvec, $evec);
196    }
197}
198
1991;
200
201__END__
202
203=head1 NAME
204
205Gungho::Engine::IO::Async - IO::Async Engine
206
207=head1 SYNOPSIS
208
209  engine:
210    module: IO::Async
211    config:
212        loop_delay: 0.01
213        dns:
214            disable: 1 # Only if you don't want Gungho to resolve DNS
215
216
217=head1 DESCRIPTION
218
219This class uses IO::Async to dispatch requests.
220
221WARNING: This engine is still experimental. Patches welcome!
222In particular, this class definitely should cache connections.
223
224=head1 METHODS
225
226=head2 run
227
228=head2 setup
229
230=head2 setup_impl_class
231
232=head2 send_request
233
234=head2 handle_response
235
236=head2 start_request
237
238=head2 lookup_host
239
240=cut
241