1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2008-2015 -- leonerd@leonerd.org.uk
5
6package IO::Async::Listener;
7
8use strict;
9use warnings;
10use base qw( IO::Async::Handle );
11
12our $VERSION = '0.800';
13
14use IO::Async::Handle;
15use IO::Async::OS;
16
17use Future 0.33; # ->catch
18
19use Errno qw( EAGAIN EWOULDBLOCK );
20
21use Socket qw( sockaddr_family SOL_SOCKET SO_ACCEPTCONN SO_TYPE );
22
23use Carp;
24
25=head1 NAME
26
27C<IO::Async::Listener> - listen on network sockets for incoming connections
28
29=head1 SYNOPSIS
30
31   use IO::Async::Listener;
32
33   use IO::Async::Loop;
34   my $loop = IO::Async::Loop->new;
35
36   my $listener = IO::Async::Listener->new(
37      on_stream => sub {
38         my ( undef, $stream ) = @_;
39
40         $stream->configure(
41            on_read => sub {
42               my ( $self, $buffref, $eof ) = @_;
43               $self->write( $$buffref );
44               $$buffref = "";
45               return 0;
46            },
47         );
48
49         $loop->add( $stream );
50      },
51   );
52
53   $loop->add( $listener );
54
55   $listener->listen(
56      service  => "echo",
57      socktype => 'stream',
58   )->get;
59
60   $loop->run;
61
62This object can also be used indirectly via an L<IO::Async::Loop>:
63
64   use IO::Async::Stream;
65
66   use IO::Async::Loop;
67   my $loop = IO::Async::Loop->new;
68
69   $loop->listen(
70      service  => "echo",
71      socktype => 'stream',
72
73      on_stream => sub {
74         ...
75      },
76   )->get;
77
78   $loop->run;
79
80=head1 DESCRIPTION
81
82This subclass of L<IO::Async::Handle> adds behaviour which watches a socket in
83listening mode, to accept incoming connections on them.
84
85A Listener can be constructed and given a existing socket in listening mode.
86Alternatively, the Listener can construct a socket by calling the C<listen>
87method. Either a list of addresses can be provided, or a service name can be
88looked up using the underlying loop's C<resolve> method.
89
90=cut
91
92=head1 EVENTS
93
94The following events are invoked, either using subclass methods or CODE
95references in parameters:
96
97=head2 on_accept $clientsocket | $handle
98
99Invoked whenever a new client connects to the socket.
100
101If neither C<handle_constructor> nor C<handle_class> parameters are set, this
102will be invoked with the new client socket directly. If a handle constructor
103or class are set, this will be invoked with the newly-constructed handle,
104having the new socket already configured onto it.
105
106=head2 on_stream $stream
107
108An alternative to C<on_accept>, this is passed an instance of
109L<IO::Async::Stream> when a new client connects. This is provided as a
110convenience for the common case that a Stream object is required as the
111transport for a Protocol object.
112
113This is now vaguely deprecated in favour of using C<on_accept> with a handle
114constructor or class.
115
116=head2 on_socket $socket
117
118Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>.
119This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets.
120
121This is now vaguely deprecated in favour of using C<on_accept> with a handle
122constructor or class.
123
124=head2 on_accept_error $socket, $errno
125
126Optional. Invoked if the C<accept> syscall indicates an error (other than
127C<EAGAIN> or C<EWOULDBLOCK>). If not provided, failures of C<accept> will
128be passed to the main C<on_error> handler.
129
130=cut
131
132=head1 PARAMETERS
133
134The following named parameters may be passed to C<new> or C<configure>:
135
136=head2 on_accept => CODE
137
138=head2 on_stream => CODE
139
140=head2 on_socket => CODE
141
142CODE reference for the event handlers. Because of the mutually-exclusive
143nature of their behaviour, only one of these may be set at a time. Setting one
144will remove the other two.
145
146=head2 handle => IO
147
148The IO handle containing an existing listen-mode socket.
149
150=head2 handle_constructor => CODE
151
152Optional. If defined, gives a CODE reference to be invoked every time a new
153client socket is accepted from the listening socket. It is passed the listener
154object itself, and is expected to return a new instance of
155L<IO::Async::Handle> or a subclass, used to wrap the new client socket.
156
157   $handle = $handle_constructor->( $listener )
158
159This can also be given as a subclass method
160
161   $handle = $listener->handle_constructor()
162
163=head2 handle_class => STRING
164
165Optional. If defined and C<handle_constructor> isn't, then new wrapper handles
166are constructed by invoking the C<new> method on the given class name, passing
167in no additional parameters.
168
169   $handle = $handle_class->new()
170
171This can also be given as a subclass method
172
173   $handle = $listener->handle_class->new
174
175=head2 acceptor => STRING|CODE
176
177Optional. If defined, gives the name of a method or a CODE reference to use to
178implement the actual accept behaviour. This will be invoked as:
179
180   ( $accepted ) = $listener->acceptor( $socket )->get
181
182   ( $handle ) = $listener->acceptor( $socket, handle => $handle )->get
183
184It is invoked with the listening socket as its its argument, and optionally
185an L<IO::Async::Handle> instance as a named parameter, and is expected to
186return a C<Future> that will eventually yield the newly-accepted socket or
187handle instance, if such was provided.
188
189=cut
190
191sub _init
192{
193   my $self = shift;
194   $self->SUPER::_init( @_ );
195
196   $self->{acceptor} = "_accept";
197}
198
199my @acceptor_events  = qw( on_accept on_stream on_socket );
200
201sub configure
202{
203   my $self = shift;
204   my %params = @_;
205
206   if( grep exists $params{$_}, @acceptor_events ) {
207      grep( defined $_, @params{@acceptor_events} ) <= 1 or
208         croak "Can only set at most one of 'on_accept', 'on_stream' or 'on_socket'";
209
210      # Don't exists-test, so we'll clear the other two
211      $self->{$_} = delete $params{$_} for @acceptor_events;
212   }
213
214   croak "Cannot set 'on_read_ready' on a Listener" if exists $params{on_read_ready};
215
216   if( defined $params{handle} ) {
217      my $handle = delete $params{handle};
218      # Sanity check it - it may be a bare GLOB ref, not an IO::Socket-derived handle
219      defined getsockname( $handle ) or croak "IO handle $handle does not have a sockname";
220
221      # So now we know it's at least some kind of socket. Is it listening?
222      # SO_ACCEPTCONN would tell us, but not all OSes implement it. Since it's
223      # only a best-effort sanity check, we won't mind if the OS doesn't.
224      my $acceptconn = getsockopt( $handle, SOL_SOCKET, SO_ACCEPTCONN );
225      !defined $acceptconn or unpack( "I", $acceptconn ) or croak "Socket is not accepting connections";
226
227      # This is a bit naughty but hopefully nobody will mind...
228      bless $handle, "IO::Socket" if ref( $handle ) eq "GLOB";
229
230      $self->SUPER::configure( read_handle => $handle );
231   }
232   elsif( exists $params{handle} ) {
233      delete $params{handle};
234
235      $self->SUPER::configure( read_handle => undef );
236   }
237
238   unless( grep $self->can_event( $_ ), @acceptor_events ) {
239      croak "Expected to be able to 'on_accept', 'on_stream' or 'on_socket'";
240   }
241
242   foreach (qw( acceptor handle_constructor handle_class )) {
243      $self->{$_} = delete $params{$_} if exists $params{$_};
244   }
245
246   if( keys %params ) {
247      croak "Cannot pass though configuration keys to underlying Handle - " . join( ", ", keys %params );
248   }
249}
250
251sub on_read_ready
252{
253   my $self = shift;
254
255   my $socket = $self->read_handle;
256
257   my $on_done;
258   my %acceptor_params;
259
260   if( $on_done = $self->can_event( "on_stream" ) ) {
261      # TODO: It doesn't make sense to put a SOCK_DGRAM in an
262      # IO::Async::Stream but currently we don't detect this
263      require IO::Async::Stream;
264      $acceptor_params{handle} = IO::Async::Stream->new;
265   }
266   elsif( $on_done = $self->can_event( "on_socket" ) ) {
267      require IO::Async::Socket;
268      $acceptor_params{handle} = IO::Async::Socket->new;
269   }
270   # on_accept needs to be last in case of multiple layers of subclassing
271   elsif( $on_done = $self->can_event( "on_accept" ) ) {
272      my $handle;
273
274      # Test both params before moving on to either method
275      if( my $constructor = $self->{handle_constructor} ) {
276         $handle = $self->{handle_constructor}->( $self );
277      }
278      elsif( my $class = $self->{handle_class} ) {
279         $handle = $class->new;
280      }
281      elsif( $self->can( "handle_constructor" ) ) {
282         $handle = $self->handle_constructor;
283      }
284      elsif( $self->can( "handle_class" ) ) {
285         $handle = $self->handle_class->new;
286      }
287
288      $acceptor_params{handle} = $handle if $handle;
289   }
290   else {
291      die "ARG! Missing on_accept,on_stream,on_socket!";
292   }
293
294   my $acceptor = $self->acceptor;
295   my $f = $self->$acceptor( $socket, %acceptor_params )->on_done( sub {
296      my ( $result ) = @_ or return; # false-alarm
297      $on_done->( $self, $result );
298   })->catch( accept => sub {
299      my ( $message, $name, @args ) = @_;
300      my ( $socket, $dollarbang ) = @args;
301      $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ) or
302         $self->invoke_error( "accept() failed - $dollarbang", accept => $socket, $dollarbang );
303   });
304
305   # TODO: Consider if this wants a more fine-grained place to report
306   # non-accept() failures (such as SSL) to
307   $self->adopt_future( $f );
308}
309
310sub _accept
311{
312   my $self = shift;
313   my ( $listen_sock, %params ) = @_;
314
315   my $accepted = $listen_sock->accept;
316
317   if( defined $accepted ) {
318      $accepted->blocking( 0 );
319      if( my $handle = $params{handle} ) {
320         $handle->set_handle( $accepted );
321         return Future->done( $handle );
322      }
323      else {
324         return Future->done( $accepted );
325      }
326   }
327   elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
328      return Future->done;
329   }
330   else {
331      return Future->fail( "Cannot accept() - $!", accept => $listen_sock, $! );
332   }
333}
334
335=head1 METHODS
336
337The following methods documented with a trailing call to C<< ->get >> return
338L<Future> instances.
339
340=cut
341
342=head2 acceptor
343
344   $acceptor = $listener->acceptor
345
346Returns the currently-set C<acceptor> method name or code reference. This may
347be of interest to Loop C<listen> extension methods that wish to extend or wrap
348it.
349
350=cut
351
352sub acceptor
353{
354   my $self = shift;
355   return $self->{acceptor};
356}
357
358sub is_listening
359{
360   my $self = shift;
361
362   return ( defined $self->sockname );
363}
364
365=head2 sockname
366
367   $name = $listener->sockname
368
369Returns the C<sockname> of the underlying listening socket
370
371=cut
372
373sub sockname
374{
375   my $self = shift;
376
377   my $handle = $self->read_handle or return undef;
378   return $handle->sockname;
379}
380
381=head2 family
382
383   $family = $listener->family
384
385Returns the socket address family of the underlying listening socket
386
387=cut
388
389sub family
390{
391   my $self = shift;
392
393   my $sockname = $self->sockname or return undef;
394   return sockaddr_family( $sockname );
395}
396
397=head2 socktype
398
399   $socktype = $listener->socktype
400
401Returns the socket type of the underlying listening socket
402
403=cut
404
405sub socktype
406{
407   my $self = shift;
408
409   my $handle = $self->read_handle or return undef;
410   return $handle->sockopt(SO_TYPE);
411}
412
413=head2 listen
414
415   $listener->listen( %params )->get
416
417This method sets up a listening socket and arranges for the acceptor callback
418to be invoked each time a new connection is accepted on the socket.
419
420Most parameters given to this method are passed into the C<listen> method of
421the L<IO::Async::Loop> object. In addition, the following arguments are also
422recognised directly:
423
424=over 8
425
426=item on_listen => CODE
427
428Optional. A callback that is invoked when the listening socket is ready.
429Similar to that on the underlying loop method, except it is passed the
430listener object itself.
431
432   $on_listen->( $listener )
433
434=back
435
436=cut
437
438sub listen
439{
440   my $self = shift;
441   my ( %params ) = @_;
442
443   my $loop = $self->loop;
444   defined $loop or croak "Cannot listen when not a member of a Loop"; # TODO: defer?
445
446   if( my $on_listen = delete $params{on_listen} ) {
447      $params{on_listen} = sub { $on_listen->( $self ) };
448   }
449
450   $loop->listen( listener => $self, %params );
451}
452
453=head1 EXAMPLES
454
455=head2 Listening on UNIX Sockets
456
457The C<handle> argument can be passed an existing socket already in listening
458mode, making it possible to listen on other types of socket such as UNIX
459sockets.
460
461   use IO::Async::Listener;
462   use IO::Socket::UNIX;
463
464   use IO::Async::Loop;
465   my $loop = IO::Async::Loop->new;
466
467   my $listener = IO::Async::Listener->new(
468      on_stream => sub {
469         my ( undef, $stream ) = @_;
470
471         $stream->configure(
472            on_read => sub {
473               my ( $self, $buffref, $eof ) = @_;
474               $self->write( $$buffref );
475               $$buffref = "";
476               return 0;
477            },
478         );
479
480         $loop->add( $stream );
481      },
482   );
483
484   $loop->add( $listener );
485
486   my $socket = IO::Socket::UNIX->new(
487      Local => "echo.sock",
488      Listen => 1,
489   ) or die "Cannot make UNIX socket - $!\n";
490
491   $listener->listen(
492      handle => $socket,
493   );
494
495   $loop->run;
496
497=head2 Passing Plain Socket Addresses
498
499The C<addr> or C<addrs> parameters should contain a definition of a plain
500socket address in a form that the L<IO::Async::OS> C<extract_addrinfo>
501method can use.
502
503This example shows how to listen on TCP port 8001 on address 10.0.0.1:
504
505   $listener->listen(
506      addr => {
507         family   => "inet",
508         socktype => "stream",
509         port     => 8001,
510         ip       => "10.0.0.1",
511      },
512      ...
513   );
514
515This example shows another way to listen on a UNIX socket, similar to the
516earlier example:
517
518   $listener->listen(
519      addr => {
520         family   => "unix",
521         socktype => "stream",
522         path     => "echo.sock",
523      },
524      ...
525   );
526
527=head2 Using A Kernel-Assigned Port Number
528
529Rather than picking a specific port number, is it possible to ask the kernel
530to assign one arbitrarily that is currently free. This can be done by
531requesting port number 0 (which is actually the default if no port number is
532otherwise specified). To determine which port number the kernel actually
533picked, inspect the C<sockport> accessor on the actual socket filehandle.
534
535Either use the L<Future> returned by the C<listen> method:
536
537   $listener->listen(
538      addr => { family => "inet" },
539   )->on_done( sub {
540      my ( $listener ) = @_;
541      my $socket = $listener->read_handle;
542
543      say "Now listening on port ", $socket->sockport;
544   });
545
546Or pass an C<on_listen> continuation:
547
548   $listener->listen(
549      addr => { family => "inet" },
550
551      on_listen => sub {
552         my ( $listener ) = @_;
553         my $socket = $listener->read_handle;
554
555         say "Now listening on port ", $socket->sockport;
556      },
557   );
558
559=head1 AUTHOR
560
561Paul Evans <leonerd@leonerd.org.uk>
562
563=cut
564
5650x55AA;
566