1# $Id: /mirror/gungho/lib/Gungho/Engine/Danga/Socket.pm 9352 2007-11-21T02:13:31.513580Z lestrrat  $
2#
3# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
4# All rights reserved.
5
6package Gungho::Engine::Danga::Socket;
7use strict;
8use warnings;
9use base qw(Gungho::Engine);
10use Danga::Socket::Callback;
11use HTTP::Parser;
12use IO::Socket::INET;
13use Net::DNS;
14
15# Danga::Socket uses the field pragma, which breaks things
16# if we try to subclass from both Gungho::Engine and Danga::Socket.
17
18__PACKAGE__->mk_accessors($_) for qw(active_requests context loop_delay resolver);
19
20sub setup
21{
22    my $self = shift;
23    $self->active_requests({});
24    $self->loop_delay( $self->config->{loop_delay} ) if $self->config->{loop_delay};
25    if (! $self->config->{dns}{disable}) {
26        $self->resolver(Net::DNS::Resolver->new);
27    }
28    $self->next::method(@_);
29}
30
31sub run
32{
33    my ($self, $c) = @_;
34
35    $self->context($c);
36    Danga::Socket->SetPostLoopCallback(
37        sub {
38            $c->dispatch_requests();
39
40            my $delay = $self->loop_delay;
41            if (! defined $delay || $delay <= 0) {
42                $delay = 2;
43            }
44            select(undef, undef, undef, $delay);
45
46            my $continue =  $c->is_running || Danga::Socket->WatchedSockets();
47
48            if (! $continue) {
49                $c->log->info("no more requests, stopping...");
50            }
51            return $continue;
52        }
53    );
54    Danga::Socket->EventLoop();
55}
56
57sub send_request
58{
59    my $self = shift;
60    my $c    = shift;
61    my $req  = shift;
62
63    if ($self->resolver && $req->requires_name_lookup) {
64        $self->lookup_name($c, $req);
65    } else {
66        $req->uri->host( $req->notes('resolved_ip') )
67            if $req->notes('resolved_ip');
68        if (! $c->request_is_allowed($req)) {
69            return;
70        }
71        $self->start_request($c, $req);
72    }
73    return 1;
74}
75
76sub lookup_name
77{
78    my ($self, $c, $req) = @_;
79    my $resolver = $self->resolver;
80    my $bgsock   = $resolver->bgsend($req->uri->host);
81
82    my $danga = Danga::Socket::Callback->new(
83        handle => $bgsock,
84        on_read_ready => sub {
85            my $ds = shift;
86            delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) };
87            $self->handle_dns_response(
88                $c,
89                $req,
90                $resolver->bgread($ds->sock)
91            );
92        },
93        on_error => sub {
94            my $ds = shift;
95            delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) };
96            $self->handle_response(
97                $c,
98                $req,
99                $c->_http_error(500, "Failed to resolve host " . $req->uri->host, $req)
100            );
101        }
102    );
103}
104
105sub start_request
106{
107    my ($self, $c, $req) = @_;
108    my $uri  = $req->uri;
109
110    my $socket = IO::Socket::INET->new(
111        PeerAddr => $uri->host,
112        PeerPort => $uri->port || $uri->default_port,
113        Blocking => 0,
114    );
115    if ($@) {
116        $self->handle_response(
117            $req,
118            $c->_http_error(500, "Failed to connect to " . $uri->host . ": $@", $req)
119        );
120        return;
121    }
122
123    $req->headers->push_header(user_agent => $c->user_agent);
124    my $danga = Danga::Socket::Callback->new(
125        handle         => $socket,
126        context        => { write_done => 0, context => $c },
127        on_write_ready => sub {
128            my $ds = shift;
129            if ($ds->{context}{write_done}) {
130                if ($ds->write(undef)) {
131                    $ds->watch_write(0);
132                }
133            }
134            my $c = $ds->{context}{context};
135
136            $c->notify('engine.send_request', { request => $req });
137            my $req_str = $req->format();
138            if ($ds->write($req_str)) {
139                $ds->watch_write(0);
140            }
141            $ds->{context}{write_done} = 1;
142        },
143        on_read_ready => sub {
144            my $ds = shift;
145            my $parser = $req->notes('parser');
146            if (! $parser) {
147                $parser = HTTP::Parser->new(response => 1);
148                $req->notes('parser', $parser);
149            }
150
151            my ($buf, $success);
152            while(1) {
153                my $bytes = sysread($ds->sock(), $buf, 8192);
154                last if ($bytes || 0) <= 0;
155
156                my $parser_status = $parser->add($buf);
157
158                if ($parser_status == 0 ) {
159                    $success = 1;
160                    last;
161                }
162            }
163
164            if (! $success) {
165                $self->handle_response(
166                    $req,
167                    $c->_http_error(400, "incomplete response", $req)
168                );
169                return;
170            }
171
172            my $response = $parser->object;
173            $response->request($req);
174            $ds->watch_read(0);
175            delete Danga::Socket->DescriptorMap->{ fileno($ds->sock) };
176            $self->handle_response($req, $response);
177        }
178    );
179
180    $req->notes(danga => $danga);
181}
182
183sub handle_response
184{
185    my $self = shift;
186    my $request = shift;
187    my $response = shift;
188    delete $self->active_requests->{$request->id};
189    my $danga = $request->notes('danga');
190    $request->notes('danga', undef);
191    undef $danga;
192
193    if (my $host = $request->notes('original_host')) {
194        $request->uri->host($host);
195    }
196
197    my $c = $self->context;
198    $c->handle_response($request, $c->prepare_response($response) );
199}
200
2011;
202
203__END__
204
205=head1 NAME
206
207Gungho::Engine::Danga::Socket - Gungho Engine Using Danga::Socket
208
209=head1 DESCRIPTION
210
211This class uses Danga::Socket to dispatch requests.
212
213WARNING: This engine is still experimental. Patches welcome!
214In particular, this class definitely should cache connections.
215
216=head1 METHODS
217
218=head2 setup
219
220=head2 run
221
222=head2 lookup_name
223
224=head2 send_request
225
226=head2 start_request
227
228=head2 handle_response
229
230=cut
231