1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2010-2013 -- leonerd@leonerd.org.uk
5
6package IO::Async::Protocol::Stream;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.800';
12
13use base qw( IO::Async::Protocol );
14
15use Carp;
16
17=head1 NAME
18
19C<IO::Async::Protocol::Stream> - base class for stream-based protocols
20
21=head1 SYNOPSIS
22
23Most likely this class will be subclassed to implement a particular network
24protocol.
25
26   package Net::Async::HelloWorld;
27
28   use strict;
29   use warnings;
30   use base qw( IO::Async::Protocol::Stream );
31
32   sub on_read
33   {
34      my $self = shift;
35      my ( $buffref, $eof ) = @_;
36
37      return 0 unless $$buffref =~ s/^(.*)\n//;
38      my $line = $1;
39
40      if( $line =~ m/^HELLO (.*)/ ) {
41         my $name = $1;
42
43         $self->invoke_event( on_hello => $name );
44      }
45
46      return 1;
47   }
48
49   sub send_hello
50   {
51      my $self = shift;
52      my ( $name ) = @_;
53
54      $self->write( "HELLO $name\n" );
55   }
56
57This small example elides such details as error handling, which a real
58protocol implementation would be likely to contain.
59
60=head1 DESCRIPTION
61
62This subclass of L<IO::Async::Protocol> is intended to stand as a base class
63for implementing stream-based protocols. It provides an interface similar to
64L<IO::Async::Stream>, primarily, a C<write> method and an C<on_read> event
65handler.
66
67It contains an instance of an L<IO::Async::Stream> object which it uses for
68actual communication, rather than being a subclass of it, allowing a level of
69independence from the actual stream being used. For example, the stream may
70actually be an L<IO::Async::SSLStream> to allow the protocol to be used over
71SSL.
72
As with L<IO::Async::Stream>, by the time the protocol object is added to a
Loop it must either have an C<on_read> method, or have been configured with an
C<on_read> callback handler.
76
77=cut
78
79=head1 EVENTS
80
81The following events are invoked, either using subclass methods or CODE
82references in parameters:
83
84=head2 $ret = on_read \$buffer, $eof
85
86=head2 on_read_eof
87
88=head2 on_write_eof
89
90The event handlers are invoked identically to L<IO::Async::Stream>.
91
92=head2 on_closed
93
94The C<on_closed> handler is optional, but if provided, will be invoked after
95the stream is closed by either side (either because the C<close()> method has
96been invoked on it, or on an incoming EOF).
97
98=cut
99
100=head1 PARAMETERS
101
102The following named parameters may be passed to C<new> or C<configure>:
103
104=head2 on_read => CODE
105
106=head2 on_read_eof => CODE
107
108=head2 on_write_eof => CODE
109
110CODE references for the events.
111
112=head2 handle => IO
113
114A shortcut for the common case where the transport only needs to be a plain
115L<IO::Async::Stream> object. If this argument is provided without a
116C<transport> object, a new L<IO::Async::Stream> object will be built around
117the given IO handle, and used as the transport.
118
119=cut
120
# Applies named parameters to the protocol object.
#
# The on_read, on_read_eof and on_write_eof handlers are stored directly on
# the object. If a 'handle' is given and no 'transport' key is present, the
# handle is wrapped in a new IO::Async::Stream which is then passed on as the
# transport. Everything left over is forwarded to the superclass configure.
#
# Croaks if the object already has a loop and no on_read event is available.
sub configure
{
   my ( $self, %params ) = @_;

   foreach my $event (qw( on_read on_read_eof on_write_eof )) {
      next unless exists $params{$event};
      $self->{$event} = delete $params{$event};
   }

   # The 'handle' shortcut only applies when no 'transport' key was supplied;
   # otherwise 'handle' stays in %params and is forwarded untouched.
   unless( exists $params{transport} ) {
      if( my $handle = delete $params{handle} ) {
         require IO::Async::Stream;
         $params{transport} = IO::Async::Stream->new( handle => $handle );
      }
   }

   $self->SUPER::configure( %params );

   if( $self->loop ) {
      $self->can_event( "on_read" )
         or croak 'Expected either an on_read callback or to be able to ->on_read';
   }
}
142
# Checks that an on_read event is available, either as a method or as a
# configured callback, and croaks otherwise.
# NOTE(review): presumably invoked by the base class when the object is added
# to a Loop - the caller is not visible in this file.
sub _add_to_loop
{
   my ( $self ) = @_;

   my $on_read = $self->can_event( "on_read" );
   $on_read or croak 'Expected either an on_read callback or to be able to ->on_read';
}
150
# Attaches this protocol's event handling to the given transport.
#
# After letting the superclass do its own setup, configures the transport with
# on_read, on_read_eof and on_write_eof callbacks that forward to the matching
# events on this protocol. on_read is mandatory (invoke_event); the two EOF
# events are optional (maybe_invoke_event).
sub setup_transport
{
   my ( $self, $transport ) = @_;

   $self->SUPER::setup_transport( $transport );

   # Each callback is passed through _replace_weakself; inside it the first
   # argument is taken as the protocol object, and the callback does nothing
   # if that argument is false.
   my @callbacks = (
      on_read => $self->_replace_weakself( sub {
         my $protocol = shift;
         return unless $protocol;
         $protocol->invoke_event( on_read => @_ );
      } ),
      on_read_eof => $self->_replace_weakself( sub {
         my $protocol = shift;
         return unless $protocol;
         $protocol->maybe_invoke_event( on_read_eof => @_ );
      } ),
      on_write_eof => $self->_replace_weakself( sub {
         my $protocol = shift;
         return unless $protocol;
         $protocol->maybe_invoke_event( on_write_eof => @_ );
      } ),
   );

   $transport->configure( @callbacks );
}
173
# Detaches this protocol's event handling from the given transport.
#
# Clears every callback that setup_transport installed on the transport
# (on_read, on_read_eof and on_write_eof), then lets the superclass perform
# its own teardown. Clearing all three, rather than only on_read, ensures a
# transport that has been removed from this protocol can no longer fire EOF
# events on it.
sub teardown_transport
{
   my $self = shift;
   my ( $transport ) = @_;

   $transport->configure(
      on_read      => undef,
      on_read_eof  => undef,
      on_write_eof => undef,
   );

   $self->SUPER::teardown_transport( $transport );
}
185
186=head1 METHODS
187
188=cut
189
190=head2 write
191
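   $protocol->write( $data, %args )

Writes the given data by calling the C<write> method on the contained
transport stream. Any additional named arguments, such as C<on_flush>, are
passed on to the transport's C<write> method. Croaks if the protocol has no
transport.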
196
197=cut
198
# Writes $data to the contained transport, forwarding any extra named
# arguments. A CODE-reference $data and a true on_flush argument are each
# passed through _replace_weakself before being handed to the transport.
# Croaks if there is no transport.
sub write
{
   my ( $self, $data, %args ) = @_;

   $data = $self->_replace_weakself( $data ) if ref $data eq "CODE";

   $args{on_flush} = $self->_replace_weakself( $args{on_flush} ) if $args{on_flush};

   my $transport = $self->transport;
   croak "Attempted to ->write to a ".ref($self)." with no transport" unless $transport;

   return $transport->write( $data, %args );
}
215
216=head2 connect
217
218   $protocol->connect( %args )
219
220Sets up a connection to a peer, and configures the underlying C<transport> for
221the Protocol. Calls L<IO::Async::Protocol> C<connect> with C<socktype> set to
222C<"stream">.
223
224=cut
225
# Delegates to the superclass connect, appending socktype => "stream" after
# the caller's own arguments.
sub connect
{
   my $self = shift;

   # NOTE(review): socktype is appended last, so it presumably overrides any
   # caller-supplied socktype if the superclass reads these as a hash - confirm.
   return $self->SUPER::connect( @_, socktype => "stream" );
}
234
235=head1 AUTHOR
236
237Paul Evans <leonerd@leonerd.org.uk>
238
239=cut
240
2410x55AA;
242