package MR::IProto::Server::Connection;

=head1 NAME

MR::IProto::Server::Connection - one accepted client connection of an IProto server

=head1 DESCRIPTION

Wraps an already accepted socket (C<fh>) in an L<AnyEvent::Handle> and serves
requests arriving on it.

Every request starts with a 12 byte header that is unpacked with the C<L3>
template (three unsigned 32-bit integers in native byte order): command code,
payload length and sync value. The payload of the announced length is then
read and passed to C<handler> as C<< $handler->($connection, $cmd, $payload) >>.
The value returned by the handler is sent back prefixed by a header that
carries the same command code and sync value and the length of the reply.
If the handler dies, the error is reported with C<warn> and no reply is sent.

=head1 ATTRIBUTES

=over

=item handler

Required code reference which processes a request and returns the reply payload.

=item on_accept

Optional code reference, called with the connection object once the reverse
DNS lookup made during construction has finished.

=item on_close

Optional code reference, called with the connection object on EOF, after an
I/O error and from L</close>.

=item on_error

Optional code reference, called with the connection object and the error
message when the underlying handle reports an error.

=item fh, host, port

Required: the accepted socket and the address and port of the peer.

=item hostname

Name of the peer. Defaults to C<host>; replaced by the result of the reverse
DNS lookup when that lookup yields a name.

=back

=cut

use Mouse;
use AnyEvent::DNS;
use Scalar::Util qw/weaken/;

with 'MR::IProto::Role::Debuggable';

has handler => (
    is       => 'ro',
    isa      => 'CodeRef',
    required => 1,
);

has on_accept => (
    is  => 'ro',
    isa => 'CodeRef',
);

has on_close => (
    is  => 'ro',
    isa => 'CodeRef',
);

has on_error => (
    is  => 'ro',
    isa => 'CodeRef',
);

has fh => (
    is       => 'ro',
    isa      => 'FileHandle',
    required => 1,
);

has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has port => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
);

has hostname => (
    is         => 'ro',
    isa        => 'Str',
    writer     => '_hostname',
    lazy_build => 1,
);

has _handle => (
    is         => 'ro',
    isa        => 'AnyEvent::Handle',
    lazy_build => 1,
);

has _recv_header => (
    is         => 'ro',
    isa        => 'CodeRef',
    lazy_build => 1,
);

# Constructor hook: starts a reverse DNS lookup for the peer address. When the
# lookup completes the hostname is stored, the AnyEvent handle is built (which
# starts reading from the socket) and on_accept is invoked.
sub BUILD {
    my ($self) = @_;
    $self->_debug("Connection accepted") if $self->debug >= 1;

    # The callback must not keep the object alive, so it only sees a weak
    # reference and upgrades it to a strong one for the duration of the call.
    weaken(my $weak_self = $self);
    AnyEvent::DNS::reverse_verify($self->host, sub {
        my ($hostname) = @_;
        # The connection may have been released before the lookup finished.
        my $self = $weak_self or return;
        if ($hostname) {
            $self->_hostname($hostname);
            $self->_debug(sprintf "%s resolved as %s", $self->host, $hostname) if $self->debug >= 4;
        }
        else {
            $self->_hostname($self->host);
            $self->_debug(sprintf "Can't resolve %s", $self->host);
        }
        $self->_handle;
        $self->on_accept->($self) if $self->on_accept;
        return;
    });
    return;
}

# Destructor hook: only reports the destruction when debugging is enabled.
sub DEMOLISH {
    my ($self) = @_;
    $self->_debug("Object for connection was destroyed\n") if $self->debug >= 1;
    return;
}

# Notifies the owner through on_close. The handle itself is not touched here.
sub close {
    my ($self) = @_;
    $self->on_close->($self) if $self->on_close;
    return;
}

# Default for "hostname": the plain peer address.
sub _build_hostname {
    my ($self) = @_;
    return $self->host;
}

# Builds the AnyEvent::Handle around the accepted socket.
#
# All callbacks close over a weak reference only (the handle is owned by this
# object, a strong reference would create a cycle). Each callback takes a
# strong copy while it runs, so that a user callback which drops the last
# external reference cannot make the object vanish halfway through.
sub _build__handle {
    my ($self) = @_;
    weaken(my $weak_self = $self);
    my $peername = join ':', $self->host, $self->port;
    return AnyEvent::Handle->new(
        fh       => $self->fh,
        peername => $peername,
        on_read  => sub {
            my ($handle) = @_;
            my $self = $weak_self or return;
            # Each request starts with a fixed 12 byte header.
            $handle->unshift_read( chunk => 12, $self->_recv_header );
            return;
        },
        on_eof => sub {
            my ($handle) = @_;
            my $self = $weak_self;
            $self->_debug("Connection closed by foreign host\n") if $self && $self->debug >= 1;
            $handle->destroy();
            $self->on_close->($self) if $self && $self->on_close;
            return;
        },
        on_error => sub {
            my ($handle, $fatal, $message) = @_;
            my $self = $weak_self;
            $handle->destroy();
            # Nobody left to notify.
            return unless $self;
            if ($self->on_error) {
                $self->_debug("error: $message\n") if $self->debug >= 1;
                $self->on_error->($self, $message);
            }
            else {
                $self->_debug("error: $message\n");
            }
            $self->on_close->($self) if $self->on_close;
            return;
        },
    );
}

# Builds the callback which receives the 12 byte request header, schedules
# reading of the payload and finally dispatches the request to the handler.
sub _build__recv_header {
    my ($self) = @_;
    weaken(my $weak_self = $self);
    my $handler = $self->handler;
    return sub {
        my ($handle, $data) = @_;
        my $self = $weak_self or return;
        $self->_debug_dump('recv header: ', $data) if $self->debug >= 5;
        my ($cmd, $length, $sync) = unpack 'L3', $data;
        $handle->unshift_read(
            chunk => $length,
            sub {
                my ($handle, $data) = @_;
                # Re-derive the object from the weak reference: this closure
                # is stored in the handle's read queue and must not capture
                # the strong copy of the enclosing callback.
                my $self = $weak_self or return;
                $self->_debug_dump('recv payload: ', $data) if $self->debug >= 5;
                my $result;
                if (eval { $result = $handler->($self, $cmd, $data); 1 }) {
                    # A handler returning nothing yields an empty reply.
                    $result = '' unless defined $result;
                    my $header = pack 'L3', $cmd, length $result, $sync;
                    if ($self->debug >= 6) {
                        $self->_debug_dump('send header: ', $header);
                        $self->_debug_dump('send payload: ', $result);
                    }
                    $handle->push_write($header . $result);
                }
                else {
                    warn $@;
                    $self->_debug("Failed to handle cmd=$cmd\n");
                }
                return;
            }
        );
        return;
    };
}

# Passes a message, prefixed with "hostname(host:port): ", to debug_cb.
sub _debug {
    my ($self, $msg) = @_;
    $self->debug_cb->( sprintf "%s(%s:%d): %s", $self->hostname, $self->host, $self->port, $msg );
    return;
}

no Mouse;
__PACKAGE__->meta->make_immutable();

1;