1#!/usr/bin/perl 2# $Id: tcp-server.perl 155 2007-02-15 05:09:17Z rcaputo $ 3 4# Test out the syntax for a TCP listener stage. 5 6use lib qw(./lib ../lib); 7 8{ 9 package POE::Stage::Listener; 10 11 use POE::Stage qw(:base self req); 12 13 use IO::Socket::INET; 14 use POE::Watcher::Input; 15 16 # Fire off an automatic request using the stage's constructor 17 # parameters. Check the parameters while were here since this is 18 # happening during new(). 19 # 20 # TODO - Fix up error reporting so croak() reports where new() was 21 # called. 22 # 23 # TODO - I'm not sure whether things should be stored in $self, 24 # $self->{req} or what. Very confusing. Users will also have this 25 # problem. Hell, if *I* can't figure it out, then it sucks. 26 27 sub init :Handler { 28 my $args = $_[1]; 29 my $self_init_request; 30 my ($arg_socket, $arg_listen_queue); 31 32 # TODO - This idiom happens enough that we should abstract it. 33 my $passthrough_args = delete($args->{args}) || { }; 34 35 # TODO - Common pattern: Hoist parameters out of $args and place 36 # them into a request's args. It's a butt-ugly, repetitive thing 37 # to do. Find a better way. 38 39 die "POE::Stage::Listener requires a socket" unless $arg_socket; 40 41 $arg_listen_queue ||= SOMAXCONN; 42 43 $self_init_request = POE::Request->new( 44 stage => self, 45 method => "listen", 46 %$args, 47 args => { 48 %$passthrough_args, 49 socket => $arg_socket, 50 listen_queue => $arg_listen_queue, 51 }, 52 ); 53 54 # Do object-scoped initialization here. 55 # TODO 56 } 57 58 # Set up the listener. 59 60 sub listen :Handler { 61 my ($arg_socket, $arg_listen_queue); 62 63 my $req_socket = $arg_socket; 64 my $req_listen_queue = $arg_listen_queue; 65 66 # TODO - Pass in parameters for listen. Whee. 67 listen($arg_socket, $arg_listen_queue) or die "listen: $!"; 68 69 my $req_input_watcher = POE::Watcher::Input->new( 70 handle => $arg_socket, 71 on_input => "accept_connection", 72 ); 73 } 74 75 # Ready to accept from the socket. Do it. 76 77 sub accept_connection :Handler { 78 my $new_socket = (my $req_socket)->accept(); 79 warn "accept error $!" unless $new_socket; 80 req->emit( type => "accept", socket => $new_socket ); 81 } 82} 83 84### 85 86{ 87 package POE::Stage::EchoSession; 88 89 use POE::Stage qw(:base self req); 90 91 sub init :Handler { 92 my $args = $_[1]; 93 my $self_init_request; 94 my $arg_socket; 95 96 my $passthrough_args = delete($args->{args}) || { }; 97 98 $self_init_request = POE::Request->new( 99 stage => self, 100 method => "interact", 101 %$args, 102 args => { 103 socket => $arg_socket, 104 } 105 ); 106 } 107 108 sub interact :Handler { 109 my $arg_socket; 110 111 my $req_input_watcher = POE::Watcher::Input->new( 112 handle => $arg_socket, 113 on_input => "process_input", 114 ); 115 } 116 117 sub process_input :Handler { 118 my $arg_handle; 119 120 my $ret = sysread($arg_handle, my $buf = "", 65536); 121 122 use POSIX qw(EAGAIN EWOULDBLOCK); 123 124 my $req_input_watcher; 125 unless ($ret) { 126 return if $! == EAGAIN or $! == EWOULDBLOCK; 127 if ($!) { 128 warn "read error: $!"; 129 } 130 else { 131 warn "remote closed connection"; 132 } 133 $req_input_watcher = undef; 134 return; 135 } 136 137 my ($offset, $rest) = (0, $ret); 138 while ($rest) { 139 my $wrote = syswrite($arg_handle, $buf, $rest, $offset); 140 141 # Nasty busy loop for rapid prototyping. 142 unless ($wrote) { 143 next if $! == EAGAIN or $! == EWOULDBLOCK; 144 warn "write error: $!"; 145 $req_input_watcher = undef; 146 return; 147 } 148 149 $rest -= $wrote; 150 $offset += $wrote; 151 } 152 } 153} 154 155### 156 157{ 158 package POE::Stage::EchoServer; 159 160 use Scalar::Util qw(weaken); 161 use base qw(POE::Stage::Listener); 162 163 sub on_my_accept :Handler { 164 my $arg_socket; 165 166 # Do we need to save this reference? Self-requesting stages 167 # should do something magical here. 168 my %req_sockets; 169 $req_sockets{$arg_socket} = POE::Stage::EchoSession->new( 170 socket => $arg_socket, 171 ); 172 weaken $req_sockets{$arg_socket}; 173 } 174} 175 176# The application starts an echo server based on parameters given to 177# it. 178 179{ 180 package App; 181 use POE::Stage::App qw(:base); 182 sub on_run { 183 my $req_server = POE::Stage::EchoServer->new( 184 socket => IO::Socket::INET->new( 185 LocalAddr => my $arg_bind_addr, 186 LocalPort => my $arg_bind_port, 187 ReuseAddr => "yes", 188 ), 189 ); 190 191 print "To connect to this echo server: telnet localhost 31415\n"; 192 } 193} 194 195App->new()->run( 196 bind_addr => "127.0.0.1", 197 bind_port => 31415, 198); 199exit; 200 201__END__ 202 203Do we even need an App class for self-contained subclass 204components? Nifty! Try to avoid it. 205 206# Creating the server object will also set it up. 207# init() fires the event, self-firing style. 208# We need callbacks that redirect to other stages. 209 210my $x = POE::Stage::EchoServer->new( 211 BindPort => 8675, 212); 213 214POE::Kernel->run(); 215 216Uppercase parameters are constructor arguments? Does it matter which 217are for the constructor? 218 219Socket 220 221on_accept 222on_accept_failure 223on_failure 224 225