# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2010-2013 -- leonerd@leonerd.org.uk

package IO::Async::Protocol::Stream;

use strict;
use warnings;

our $VERSION = '0.800';

use base qw( IO::Async::Protocol );

use Carp;

=head1 NAME

C<IO::Async::Protocol::Stream> - base class for stream-based protocols

=head1 SYNOPSIS

Most likely this class will be subclassed to implement a particular network
protocol.

 package Net::Async::HelloWorld;

 use strict;
 use warnings;
 use base qw( IO::Async::Protocol::Stream );

 sub on_read
 {
    my $self = shift;
    my ( $buffref, $eof ) = @_;

    return 0 unless $$buffref =~ s/^(.*)\n//;
    my $line = $1;

    if( $line =~ m/^HELLO (.*)/ ) {
       my $name = $1;

       $self->invoke_event( on_hello => $name );
    }

    return 1;
 }

 sub send_hello
 {
    my $self = shift;
    my ( $name ) = @_;

    $self->write( "HELLO $name\n" );
 }

This small example elides such details as error handling, which a real
protocol implementation would be likely to contain.

=head1 DESCRIPTION

This subclass of L<IO::Async::Protocol> is intended to stand as a base class
for implementing stream-based protocols. It provides an interface similar to
L<IO::Async::Stream>, primarily, a C<write> method and an C<on_read> event
handler.

It contains an instance of an L<IO::Async::Stream> object which it uses for
actual communication, rather than being a subclass of it, allowing a level of
independence from the actual stream being used. For example, the stream may
actually be an L<IO::Async::SSLStream> to allow the protocol to be used over
SSL.

As with L<IO::Async::Stream>, it is required that, by the time the protocol
object is added to a Loop, it either has an C<on_read> method or has been
configured with an C<on_read> callback handler.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 $ret = on_read \$buffer, $eof

=head2 on_read_eof

=head2 on_write_eof

The event handlers are invoked identically to L<IO::Async::Stream>.

=head2 on_closed

The C<on_closed> handler is optional, but if provided, will be invoked after
the stream is closed by either side (either because the C<close()> method has
been invoked on it, or on an incoming EOF).

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 on_read => CODE

=head2 on_read_eof => CODE

=head2 on_write_eof => CODE

CODE references for the events.

=head2 handle => IO

A shortcut for the common case where the transport only needs to be a plain
L<IO::Async::Stream> object. If this argument is provided without a
C<transport> object, a new L<IO::Async::Stream> object will be built around
the given IO handle, and used as the transport.

=cut

# Applies named parameters to the protocol.
#
# The on_read, on_read_eof and on_write_eof callbacks are stored on the object
# itself. A "handle" given without a "transport" is wrapped in a new
# IO::Async::Stream, which is then passed on as the transport. Every remaining
# parameter is handed to the superclass configure.
#
# Croaks if the object is already in a Loop and has no way of handling on_read.
sub configure
{
   my $self = shift;
   my %params = @_;

   foreach my $event (qw( on_read on_read_eof on_write_eof )) {
      $self->{$event} = delete $params{$event} if exists $params{$event};
   }

   # The handle shortcut only applies when no explicit transport key was
   # given; otherwise "handle" stays in %params for the superclass to see.
   if( !exists $params{transport} and my $handle = delete $params{handle} ) {
      require IO::Async::Stream;
      $params{transport} = IO::Async::Stream->new( handle => $handle );
   }

   $self->SUPER::configure( %params );

   if( $self->loop ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }
}

# Checks, at the time the object joins a Loop, that on_read can be handled
# either by a callback or by a method; croaks otherwise.
sub _add_to_loop
{
   my $self = shift;

   $self->can_event( "on_read" ) or
      croak 'Expected either an on_read callback or to be able to ->on_read';
}

# Installs forwarding handlers on a newly attached transport so that its
# on_read, on_read_eof and on_write_eof events are re-invoked on this
# protocol object.
sub setup_transport
{
   my $self = shift;
   my ( $transport ) = @_;

   $self->SUPER::setup_transport( $transport );

   # on_read is mandatory, so it is dispatched with invoke_event and its
   # return value is handed back to the transport.
   my %handlers = (
      on_read => $self->_replace_weakself( sub {
         my $self = shift or return;
         $self->invoke_event( on_read => @_ );
      } ),
   );

   # The EOF events are optional, hence maybe_invoke_event.
   foreach my $event (qw( on_read_eof on_write_eof )) {
      $handlers{$event} = $self->_replace_weakself( sub {
         my $self = shift or return;
         $self->maybe_invoke_event( $event => @_ );
      } );
   }

   $transport->configure( %handlers );
}

# Removes from the transport every handler that setup_transport installed,
# then lets the superclass finish detaching it.
sub teardown_transport
{
   my $self = shift;
   my ( $transport ) = @_;

   # Clear all three handlers, not just on_read, so that a detached transport
   # no longer forwards EOF events to this protocol.
   $transport->configure(
      on_read      => undef,
      on_read_eof  => undef,
      on_write_eof => undef,
   );

   $self->SUPER::teardown_transport( $transport );
}

=head1 METHODS

=cut

=head2 write

   $protocol->write( $data, %args )

Writes the given data by calling the C<write> method on the contained
transport stream, passing along any further named arguments. If C<$data> is a
CODE reference, or an C<on_flush> callback is given, it is first wrapped using
C<_replace_weakself>. Croaks if the protocol currently has no transport.

=cut

sub write
{
   my ( $self, $data, %args ) = @_;

   # Callbacks handed to the transport are wrapped by _replace_weakself
   # before being passed on.
   $data = $self->_replace_weakself( $data )
      if ref $data eq "CODE";

   $args{on_flush} = $self->_replace_weakself( $args{on_flush} )
      if $args{on_flush};

   my $transport = $self->transport
      or croak "Attempted to ->write to a ".ref($self)." with no transport";

   return $transport->write( $data, %args );
}

=head2 connect

   $protocol->connect( %args )

Sets up a connection to a peer, and configures the underlying C<transport> for
the Protocol. This calls the C<connect> method of L<IO::Async::Protocol>,
passing the given arguments followed by C<socktype> set to C<"stream">.

=cut

sub connect
{
   my ( $self, @args ) = @_;

   # socktype is appended after the caller's arguments.
   return $self->SUPER::connect( @args, socktype => "stream" );
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;