1package MR::IProto::Server::Connection;
2
3=head1 NAME
4
5=head1 DESCRIPTION
6
7=cut
8
9use Mouse;
10use AnyEvent::DNS;
11use Scalar::Util qw/weaken/;
12
13with 'MR::IProto::Role::Debuggable';
14
15has handler => (
16    is  => 'ro',
17    isa => 'CodeRef',
18    required => 1,
19);
20
21has on_accept => (
22    is  => 'ro',
23    isa => 'CodeRef',
24);
25
26has on_close => (
27    is  => 'ro',
28    isa => 'CodeRef',
29);
30
31has on_error => (
32    is  => 'ro',
33    isa => 'CodeRef',
34);
35
36has fh => (
37    is  => 'ro',
38    isa => 'FileHandle',
39    required => 1,
40);
41
42has host => (
43    is  => 'ro',
44    isa => 'Str',
45    required => 1,
46);
47
48has port => (
49    is  => 'ro',
50    isa => 'Int',
51    required => 1,
52);
53
54has hostname => (
55    is  => 'ro',
56    isa => 'Str',
57    writer => '_hostname',
58    lazy_build => 1,
59);
60
61has _handle => (
62    is  => 'ro',
63    isa => 'AnyEvent::Handle',
64    lazy_build => 1,
65);
66
67has _recv_header => (
68    is  => 'ro',
69    isa => 'CodeRef',
70    lazy_build => 1,
71);
72
73sub BUILD {
74    my ($self) = @_;
75    $self->_debug(sprintf "Connection accepted") if $self->debug >= 1;
76    weaken($self);
77    AnyEvent::DNS::reverse_verify $self->host, sub {
78        my ($hostname) = @_;
79        if ($hostname) {
80            $self->_hostname($hostname);
81            $self->_debug(sprintf "%s resolved as %s", $self->host, $hostname) if $self->debug >= 4;
82        } else {
83            $self->_hostname($self->host);
84            $self->_debug(sprintf "Can't resolve %s", $self->host);
85        }
86        $self->_handle;
87        $self->on_accept->($self) if $self->on_accept;
88        return;
89    };
90    return;
91}
92
93sub DEMOLISH {
94    my ($self) = @_;
95    $self->_debug(sprintf "Object for connection was destroyed\n") if $self->debug >= 1;
96    return;
97}
98
99sub close {
100    my ($self) = @_;
101    $self->on_close->($self) if $self->on_close;
102    return;
103}
104
105sub _build_hostname {
106    my ($self) = @_;
107    return $self->host;
108}
109
110sub _build__handle {
111    my ($self) = @_;
112    weaken($self);
113    my $peername = join ':', $self->host, $self->port;
114    return AnyEvent::Handle->new(
115        fh       => $self->fh,
116        peername => $peername,
117        on_read  => sub {
118            my ($handle) = @_;
119            $handle->unshift_read( chunk => 12, $self->_recv_header );
120            return;
121        },
122        on_eof   => sub {
123            my ($handle) = @_;
124            $self->_debug("Connection closed by foreign host\n") if $self->debug >= 1;
125            $handle->destroy();
126            $self->on_close->($self) if $self->on_close;
127            return;
128        },
129        on_error => sub {
130            my ($handle, $fatal, $message) = @_;
131            $handle->destroy();
132            if ($self->on_error) {
133                $self->_debug("error: $message\n") if $self->debug >= 1;
134                $self->on_error->($self, $message);
135            } else {
136                $self->_debug("error: $message\n");
137            }
138            $self->on_close->($self) if $self->on_close;
139            return;
140        }
141    );
142    return;
143}
144
145sub _build__recv_header {
146    my ($self) = @_;
147    weaken($self);
148    my $handler = $self->handler;
149    return sub {
150        my ($handle, $data) = @_;
151        $self->_debug_dump('recv header: ', $data) if $self->debug >= 5;
152        my ($cmd, $length, $sync) = unpack 'L3', $data;
153        $handle->unshift_read(
154            chunk => $length,
155            sub {
156                my ($handle, $data) = @_;
157                $self->_debug_dump('recv payload: ', $data) if $self->debug >= 5;
158                my $result;
159                if (eval { $result = $handler->($self, $cmd, $data); 1 }) {
160                    my $header = pack 'L3', $cmd, length $result, $sync;
161                    if ($self->debug >= 6) {
162                        $self->_debug_dump('send header: ', $header);
163                        $self->_debug_dump('send payload: ', $result);
164                    }
165                    $handle->push_write($header . $result);
166                } else {
167                    warn $@;
168                    $self->_debug("Failed to handle cmd=$cmd\n");
169                }
170                return;
171            }
172        );
173        return;
174    };
175}
176
177sub _debug {
178    my ($self, $msg) = @_;
179    $self->debug_cb->( sprintf "%s(%s:%d): %s", $self->hostname, $self->host, $self->port, $msg );
180    return;
181}
182
183no Mouse;
184__PACKAGE__->meta->make_immutable();
185
1861;
187