1# You may distribute under the terms of either the GNU General Public License 2# or the Artistic License (the same terms as Perl itself) 3# 4# (C) Paul Evans, 2008-2015 -- leonerd@leonerd.org.uk 5 6package IO::Async::Listener; 7 8use strict; 9use warnings; 10use base qw( IO::Async::Handle ); 11 12our $VERSION = '0.800'; 13 14use IO::Async::Handle; 15use IO::Async::OS; 16 17use Future 0.33; # ->catch 18 19use Errno qw( EAGAIN EWOULDBLOCK ); 20 21use Socket qw( sockaddr_family SOL_SOCKET SO_ACCEPTCONN SO_TYPE ); 22 23use Carp; 24 25=head1 NAME 26 27C<IO::Async::Listener> - listen on network sockets for incoming connections 28 29=head1 SYNOPSIS 30 31 use IO::Async::Listener; 32 33 use IO::Async::Loop; 34 my $loop = IO::Async::Loop->new; 35 36 my $listener = IO::Async::Listener->new( 37 on_stream => sub { 38 my ( undef, $stream ) = @_; 39 40 $stream->configure( 41 on_read => sub { 42 my ( $self, $buffref, $eof ) = @_; 43 $self->write( $$buffref ); 44 $$buffref = ""; 45 return 0; 46 }, 47 ); 48 49 $loop->add( $stream ); 50 }, 51 ); 52 53 $loop->add( $listener ); 54 55 $listener->listen( 56 service => "echo", 57 socktype => 'stream', 58 )->get; 59 60 $loop->run; 61 62This object can also be used indirectly via an L<IO::Async::Loop>: 63 64 use IO::Async::Stream; 65 66 use IO::Async::Loop; 67 my $loop = IO::Async::Loop->new; 68 69 $loop->listen( 70 service => "echo", 71 socktype => 'stream', 72 73 on_stream => sub { 74 ... 75 }, 76 )->get; 77 78 $loop->run; 79 80=head1 DESCRIPTION 81 82This subclass of L<IO::Async::Handle> adds behaviour which watches a socket in 83listening mode, to accept incoming connections on them. 84 85A Listener can be constructed and given a existing socket in listening mode. 86Alternatively, the Listener can construct a socket by calling the C<listen> 87method. Either a list of addresses can be provided, or a service name can be 88looked up using the underlying loop's C<resolve> method. 89 90=cut 91 92=head1 EVENTS 93 94The following events are invoked, either using subclass methods or CODE 95references in parameters: 96 97=head2 on_accept $clientsocket | $handle 98 99Invoked whenever a new client connects to the socket. 100 101If neither C<handle_constructor> nor C<handle_class> parameters are set, this 102will be invoked with the new client socket directly. If a handle constructor 103or class are set, this will be invoked with the newly-constructed handle, 104having the new socket already configured onto it. 105 106=head2 on_stream $stream 107 108An alternative to C<on_accept>, this is passed an instance of 109L<IO::Async::Stream> when a new client connects. This is provided as a 110convenience for the common case that a Stream object is required as the 111transport for a Protocol object. 112 113This is now vaguely deprecated in favour of using C<on_accept> with a handle 114constructor or class. 115 116=head2 on_socket $socket 117 118Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>. 119This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets. 120 121This is now vaguely deprecated in favour of using C<on_accept> with a handle 122constructor or class. 123 124=head2 on_accept_error $socket, $errno 125 126Optional. Invoked if the C<accept> syscall indicates an error (other than 127C<EAGAIN> or C<EWOULDBLOCK>). If not provided, failures of C<accept> will 128be passed to the main C<on_error> handler. 129 130=cut 131 132=head1 PARAMETERS 133 134The following named parameters may be passed to C<new> or C<configure>: 135 136=head2 on_accept => CODE 137 138=head2 on_stream => CODE 139 140=head2 on_socket => CODE 141 142CODE reference for the event handlers. Because of the mutually-exclusive 143nature of their behaviour, only one of these may be set at a time. Setting one 144will remove the other two. 145 146=head2 handle => IO 147 148The IO handle containing an existing listen-mode socket. 149 150=head2 handle_constructor => CODE 151 152Optional. If defined, gives a CODE reference to be invoked every time a new 153client socket is accepted from the listening socket. It is passed the listener 154object itself, and is expected to return a new instance of 155L<IO::Async::Handle> or a subclass, used to wrap the new client socket. 156 157 $handle = $handle_constructor->( $listener ) 158 159This can also be given as a subclass method 160 161 $handle = $listener->handle_constructor() 162 163=head2 handle_class => STRING 164 165Optional. If defined and C<handle_constructor> isn't, then new wrapper handles 166are constructed by invoking the C<new> method on the given class name, passing 167in no additional parameters. 168 169 $handle = $handle_class->new() 170 171This can also be given as a subclass method 172 173 $handle = $listener->handle_class->new 174 175=head2 acceptor => STRING|CODE 176 177Optional. If defined, gives the name of a method or a CODE reference to use to 178implement the actual accept behaviour. This will be invoked as: 179 180 ( $accepted ) = $listener->acceptor( $socket )->get 181 182 ( $handle ) = $listener->acceptor( $socket, handle => $handle )->get 183 184It is invoked with the listening socket as its its argument, and optionally 185an L<IO::Async::Handle> instance as a named parameter, and is expected to 186return a C<Future> that will eventually yield the newly-accepted socket or 187handle instance, if such was provided. 188 189=cut 190 191sub _init 192{ 193 my $self = shift; 194 $self->SUPER::_init( @_ ); 195 196 $self->{acceptor} = "_accept"; 197} 198 199my @acceptor_events = qw( on_accept on_stream on_socket ); 200 201sub configure 202{ 203 my $self = shift; 204 my %params = @_; 205 206 if( grep exists $params{$_}, @acceptor_events ) { 207 grep( defined $_, @params{@acceptor_events} ) <= 1 or 208 croak "Can only set at most one of 'on_accept', 'on_stream' or 'on_socket'"; 209 210 # Don't exists-test, so we'll clear the other two 211 $self->{$_} = delete $params{$_} for @acceptor_events; 212 } 213 214 croak "Cannot set 'on_read_ready' on a Listener" if exists $params{on_read_ready}; 215 216 if( defined $params{handle} ) { 217 my $handle = delete $params{handle}; 218 # Sanity check it - it may be a bare GLOB ref, not an IO::Socket-derived handle 219 defined getsockname( $handle ) or croak "IO handle $handle does not have a sockname"; 220 221 # So now we know it's at least some kind of socket. Is it listening? 222 # SO_ACCEPTCONN would tell us, but not all OSes implement it. Since it's 223 # only a best-effort sanity check, we won't mind if the OS doesn't. 224 my $acceptconn = getsockopt( $handle, SOL_SOCKET, SO_ACCEPTCONN ); 225 !defined $acceptconn or unpack( "I", $acceptconn ) or croak "Socket is not accepting connections"; 226 227 # This is a bit naughty but hopefully nobody will mind... 228 bless $handle, "IO::Socket" if ref( $handle ) eq "GLOB"; 229 230 $self->SUPER::configure( read_handle => $handle ); 231 } 232 elsif( exists $params{handle} ) { 233 delete $params{handle}; 234 235 $self->SUPER::configure( read_handle => undef ); 236 } 237 238 unless( grep $self->can_event( $_ ), @acceptor_events ) { 239 croak "Expected to be able to 'on_accept', 'on_stream' or 'on_socket'"; 240 } 241 242 foreach (qw( acceptor handle_constructor handle_class )) { 243 $self->{$_} = delete $params{$_} if exists $params{$_}; 244 } 245 246 if( keys %params ) { 247 croak "Cannot pass though configuration keys to underlying Handle - " . join( ", ", keys %params ); 248 } 249} 250 251sub on_read_ready 252{ 253 my $self = shift; 254 255 my $socket = $self->read_handle; 256 257 my $on_done; 258 my %acceptor_params; 259 260 if( $on_done = $self->can_event( "on_stream" ) ) { 261 # TODO: It doesn't make sense to put a SOCK_DGRAM in an 262 # IO::Async::Stream but currently we don't detect this 263 require IO::Async::Stream; 264 $acceptor_params{handle} = IO::Async::Stream->new; 265 } 266 elsif( $on_done = $self->can_event( "on_socket" ) ) { 267 require IO::Async::Socket; 268 $acceptor_params{handle} = IO::Async::Socket->new; 269 } 270 # on_accept needs to be last in case of multiple layers of subclassing 271 elsif( $on_done = $self->can_event( "on_accept" ) ) { 272 my $handle; 273 274 # Test both params before moving on to either method 275 if( my $constructor = $self->{handle_constructor} ) { 276 $handle = $self->{handle_constructor}->( $self ); 277 } 278 elsif( my $class = $self->{handle_class} ) { 279 $handle = $class->new; 280 } 281 elsif( $self->can( "handle_constructor" ) ) { 282 $handle = $self->handle_constructor; 283 } 284 elsif( $self->can( "handle_class" ) ) { 285 $handle = $self->handle_class->new; 286 } 287 288 $acceptor_params{handle} = $handle if $handle; 289 } 290 else { 291 die "ARG! Missing on_accept,on_stream,on_socket!"; 292 } 293 294 my $acceptor = $self->acceptor; 295 my $f = $self->$acceptor( $socket, %acceptor_params )->on_done( sub { 296 my ( $result ) = @_ or return; # false-alarm 297 $on_done->( $self, $result ); 298 })->catch( accept => sub { 299 my ( $message, $name, @args ) = @_; 300 my ( $socket, $dollarbang ) = @args; 301 $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ) or 302 $self->invoke_error( "accept() failed - $dollarbang", accept => $socket, $dollarbang ); 303 }); 304 305 # TODO: Consider if this wants a more fine-grained place to report 306 # non-accept() failures (such as SSL) to 307 $self->adopt_future( $f ); 308} 309 310sub _accept 311{ 312 my $self = shift; 313 my ( $listen_sock, %params ) = @_; 314 315 my $accepted = $listen_sock->accept; 316 317 if( defined $accepted ) { 318 $accepted->blocking( 0 ); 319 if( my $handle = $params{handle} ) { 320 $handle->set_handle( $accepted ); 321 return Future->done( $handle ); 322 } 323 else { 324 return Future->done( $accepted ); 325 } 326 } 327 elsif( $! == EAGAIN or $! == EWOULDBLOCK ) { 328 return Future->done; 329 } 330 else { 331 return Future->fail( "Cannot accept() - $!", accept => $listen_sock, $! ); 332 } 333} 334 335=head1 METHODS 336 337The following methods documented with a trailing call to C<< ->get >> return 338L<Future> instances. 339 340=cut 341 342=head2 acceptor 343 344 $acceptor = $listener->acceptor 345 346Returns the currently-set C<acceptor> method name or code reference. This may 347be of interest to Loop C<listen> extension methods that wish to extend or wrap 348it. 349 350=cut 351 352sub acceptor 353{ 354 my $self = shift; 355 return $self->{acceptor}; 356} 357 358sub is_listening 359{ 360 my $self = shift; 361 362 return ( defined $self->sockname ); 363} 364 365=head2 sockname 366 367 $name = $listener->sockname 368 369Returns the C<sockname> of the underlying listening socket 370 371=cut 372 373sub sockname 374{ 375 my $self = shift; 376 377 my $handle = $self->read_handle or return undef; 378 return $handle->sockname; 379} 380 381=head2 family 382 383 $family = $listener->family 384 385Returns the socket address family of the underlying listening socket 386 387=cut 388 389sub family 390{ 391 my $self = shift; 392 393 my $sockname = $self->sockname or return undef; 394 return sockaddr_family( $sockname ); 395} 396 397=head2 socktype 398 399 $socktype = $listener->socktype 400 401Returns the socket type of the underlying listening socket 402 403=cut 404 405sub socktype 406{ 407 my $self = shift; 408 409 my $handle = $self->read_handle or return undef; 410 return $handle->sockopt(SO_TYPE); 411} 412 413=head2 listen 414 415 $listener->listen( %params )->get 416 417This method sets up a listening socket and arranges for the acceptor callback 418to be invoked each time a new connection is accepted on the socket. 419 420Most parameters given to this method are passed into the C<listen> method of 421the L<IO::Async::Loop> object. In addition, the following arguments are also 422recognised directly: 423 424=over 8 425 426=item on_listen => CODE 427 428Optional. A callback that is invoked when the listening socket is ready. 429Similar to that on the underlying loop method, except it is passed the 430listener object itself. 431 432 $on_listen->( $listener ) 433 434=back 435 436=cut 437 438sub listen 439{ 440 my $self = shift; 441 my ( %params ) = @_; 442 443 my $loop = $self->loop; 444 defined $loop or croak "Cannot listen when not a member of a Loop"; # TODO: defer? 445 446 if( my $on_listen = delete $params{on_listen} ) { 447 $params{on_listen} = sub { $on_listen->( $self ) }; 448 } 449 450 $loop->listen( listener => $self, %params ); 451} 452 453=head1 EXAMPLES 454 455=head2 Listening on UNIX Sockets 456 457The C<handle> argument can be passed an existing socket already in listening 458mode, making it possible to listen on other types of socket such as UNIX 459sockets. 460 461 use IO::Async::Listener; 462 use IO::Socket::UNIX; 463 464 use IO::Async::Loop; 465 my $loop = IO::Async::Loop->new; 466 467 my $listener = IO::Async::Listener->new( 468 on_stream => sub { 469 my ( undef, $stream ) = @_; 470 471 $stream->configure( 472 on_read => sub { 473 my ( $self, $buffref, $eof ) = @_; 474 $self->write( $$buffref ); 475 $$buffref = ""; 476 return 0; 477 }, 478 ); 479 480 $loop->add( $stream ); 481 }, 482 ); 483 484 $loop->add( $listener ); 485 486 my $socket = IO::Socket::UNIX->new( 487 Local => "echo.sock", 488 Listen => 1, 489 ) or die "Cannot make UNIX socket - $!\n"; 490 491 $listener->listen( 492 handle => $socket, 493 ); 494 495 $loop->run; 496 497=head2 Passing Plain Socket Addresses 498 499The C<addr> or C<addrs> parameters should contain a definition of a plain 500socket address in a form that the L<IO::Async::OS> C<extract_addrinfo> 501method can use. 502 503This example shows how to listen on TCP port 8001 on address 10.0.0.1: 504 505 $listener->listen( 506 addr => { 507 family => "inet", 508 socktype => "stream", 509 port => 8001, 510 ip => "10.0.0.1", 511 }, 512 ... 513 ); 514 515This example shows another way to listen on a UNIX socket, similar to the 516earlier example: 517 518 $listener->listen( 519 addr => { 520 family => "unix", 521 socktype => "stream", 522 path => "echo.sock", 523 }, 524 ... 525 ); 526 527=head2 Using A Kernel-Assigned Port Number 528 529Rather than picking a specific port number, is it possible to ask the kernel 530to assign one arbitrarily that is currently free. This can be done by 531requesting port number 0 (which is actually the default if no port number is 532otherwise specified). To determine which port number the kernel actually 533picked, inspect the C<sockport> accessor on the actual socket filehandle. 534 535Either use the L<Future> returned by the C<listen> method: 536 537 $listener->listen( 538 addr => { family => "inet" }, 539 )->on_done( sub { 540 my ( $listener ) = @_; 541 my $socket = $listener->read_handle; 542 543 say "Now listening on port ", $socket->sockport; 544 }); 545 546Or pass an C<on_listen> continuation: 547 548 $listener->listen( 549 addr => { family => "inet" }, 550 551 on_listen => sub { 552 my ( $listener ) = @_; 553 my $socket = $listener->read_handle; 554 555 say "Now listening on port ", $socket->sockport; 556 }, 557 ); 558 559=head1 AUTHOR 560 561Paul Evans <leonerd@leonerd.org.uk> 562 563=cut 564 5650x55AA; 566