1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2006-2020 -- leonerd@leonerd.org.uk
5
6package IO::Async::Stream;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.800';
12
13use base qw( IO::Async::Handle );
14
15use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
16
17use Carp;
18
19use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
20use Scalar::Util qw( blessed );
21
22use IO::Async::Debug;
23use IO::Async::Metrics '$METRICS';
24
# Default sizes used to initialise each stream's read_len and write_len.
# Tuneable from outside
# Not yet documented
our ( $READLEN, $WRITELEN ) = ( 8192, 8192 );

use Struct::Dumb;

# Element of the writequeue
#   data      - what to write: a plain string, or a CODE reference or Future
#               that will eventually yield one
#   writelen  - length passed to the writer function for each write call
#   on_write  - optional callback invoked after each successful write
#   on_flush  - optional callback invoked once this element is fully written
#   on_error  - optional callback invoked if writing this element fails
#   watching  - set once an on_ready callback has been attached to a Future
#               held in 'data', so that it is only attached once
struct Writer => [
   qw( data writelen ),
   qw( on_write on_flush on_error ),
   qw( watching ),
];

# Element of the readqueue
#   on_read   - temporary read handler that takes precedence over the
#               stream's own on_read event
#   future    - presumably the Future tied to this reader; its use is not
#               visible in this part of the file
struct Reader => [
   qw( on_read future ),
];
37
# Bitfields in the want flags. Each bit records one reason for wanting a
# readiness notification: <kind of readiness>_FOR_<operation waiting on it>.
use constant {
   WANT_READ_FOR_READ   => 0x01,
   WANT_READ_FOR_WRITE  => 0x02,
   WANT_WRITE_FOR_READ  => 0x04,
   WANT_WRITE_FOR_WRITE => 0x08,
};

# Masks covering every reason to want read-readiness or write-readiness.
# These are declared in a second statement because they are built from the
# constants above, which must already exist when this one is compiled.
use constant {
   WANT_ANY_READ  => WANT_READ_FOR_READ  | WANT_READ_FOR_WRITE,
   WANT_ANY_WRITE => WANT_WRITE_FOR_READ | WANT_WRITE_FOR_WRITE,
};
45
46=head1 NAME
47
C<IO::Async::Stream> - event callbacks and write buffering for a stream
49filehandle
50
51=head1 SYNOPSIS
52
53   use IO::Async::Stream;
54
55   use IO::Async::Loop;
56   my $loop = IO::Async::Loop->new;
57
58   my $stream = IO::Async::Stream->new(
59      read_handle  => \*STDIN,
60      write_handle => \*STDOUT,
61
62      on_read => sub {
63         my ( $self, $buffref, $eof ) = @_;
64
65         while( $$buffref =~ s/^(.*\n)// ) {
66            print "Received a line $1";
67         }
68
69         if( $eof ) {
70            print "EOF; last partial line is $$buffref\n";
71         }
72
73         return 0;
74      }
75   );
76
77   $loop->add( $stream );
78
79   $stream->write( "An initial line here\n" );
80
81=head1 DESCRIPTION
82
83This subclass of L<IO::Async::Handle> contains a filehandle that represents
84a byte-stream. It provides buffering for both incoming and outgoing data. It
85invokes the C<on_read> handler when new data is read from the filehandle. Data
86may be written to the filehandle by calling the C<write> method.
87
88This class is suitable for any kind of filehandle that provides a
89possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
90C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
91datagram or raw message-based sockets (such as UDP) see instead
92L<IO::Async::Socket>.
93
94=cut
95
96=head1 EVENTS
97
98The following events are invoked, either using subclass methods or CODE
99references in parameters:
100
101=head2 $ret = on_read \$buffer, $eof
102
103Invoked when more data is available in the internal receiving buffer.
104
105The first argument is a reference to a plain perl string. The code should
106inspect and remove any data it likes, but is not required to remove all, or
107indeed any of the data. Any data remaining in the buffer will be preserved for
108the next call, the next time more data is received from the handle.
109
110In this way, it is easy to implement code that reads records of some form when
111completed, but ignores partially-received records, until all the data is
present. If the handler wishes to be immediately invoked a second time, to have
113another attempt at consuming more content, it should return C<1>. Otherwise,
114it should return C<0>, and the handler will next be invoked when more data has
115arrived from the underlying read handle and appended to the buffer. This makes
116it easy to implement code that handles multiple incoming records at the same
117time. Alternatively, if the handler function already attempts to consume as
118much as possible from the buffer, it will have no need to return C<1> at all.
119See the examples at the end of this documentation for more detail.
120
121The second argument is a scalar indicating whether the stream has reported an
122end-of-file (EOF) condition. A reference to the buffer is passed to the
123handler in the usual way, so it may inspect data contained in it. Once the
124handler returns a false value, it will not be called again, as the handle is
125now at EOF and no more data can arrive.
126
127The C<on_read> code may also dynamically replace itself with a new callback
128by returning a CODE reference instead of C<0> or C<1>. The original callback
129or method that the object first started with may be restored by returning
130C<undef>. Whenever the callback is changed in this way, the new code is called
131again; even if the read buffer is currently empty. See the examples at the end
132of this documentation for more detail.
133
134The C<push_on_read> method can be used to insert new, temporary handlers that
135take precedence over the global C<on_read> handler. This event is only used if
136there are no further pending handlers created by C<push_on_read>.
137
138=head2 on_read_eof
139
140Optional. Invoked when the read handle indicates an end-of-file (EOF)
141condition. If there is any data in the buffer still to be processed, the
142C<on_read> event will be invoked first, before this one.
143
144=head2 on_write_eof
145
146Optional. Invoked when the write handle indicates an end-of-file (EOF)
147condition. Note that this condition can only be detected after a C<write>
148syscall returns the C<EPIPE> error. If there is no data pending to be written
149then it will not be detected yet.
150
151=head2 on_read_error $errno
152
153Optional. Invoked when the C<sysread> method on the read handle fails.
154
155=head2 on_write_error $errno
156
157Optional. Invoked when the C<syswrite> method on the write handle fails.
158
159The C<on_read_error> and C<on_write_error> handlers are passed the value of
160C<$!> at the time the error occurred. (The C<$!> variable itself, by its
161nature, may have changed from the original error by the time this handler
162runs so it should always use the value passed in).
163
164If an error occurs when the corresponding error callback is not supplied, and
165there is not a handler for it, then the C<close> method is called instead.
166
167=head2 on_read_high_watermark $length
168
169=head2 on_read_low_watermark $length
170
171Optional. Invoked when the read buffer grows larger than the high watermark
172or smaller than the low watermark respectively. These are edge-triggered
173events; they will only be triggered once per crossing, not continuously while
174the buffer remains above or below the given limit.
175
176If these event handlers are not defined, the default behaviour is to disable
177read-ready notifications if the read buffer grows larger than the high
178watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
179and re-enable notifications again once something has read enough to cause it to
180drop. If these events are overridden, the overriding code will have to perform
181this behaviour if required, by using
182
183   $self->want_readready_for_read(...)
184
185=head2 on_outgoing_empty
186
187Optional. Invoked when the writing data buffer becomes empty.
188
189=head2 on_writeable_start
190
191=head2 on_writeable_stop
192
193Optional. These two events inform when the filehandle becomes writeable, and
194when it stops being writeable. C<on_writeable_start> is invoked by the
195C<on_write_ready> event if previously it was known to be not writeable.
196C<on_writeable_stop> is invoked after a C<syswrite> operation fails with
197C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state,
and ensure that only state changes cause events to be invoked. A stream starts
199off being presumed writeable, so the first of these events to be observed will
200be C<on_writeable_stop>.
201
202=cut
203
# Gives a new stream its default state: empty queues and read buffer, the
# built-in reader/writer method names, the package-default chunk lengths, and
# the initial want flags.
sub _init
{
   my ( $self ) = @_;

   # Queue of Writers, queue of Readers, and the incoming data buffer
   @$self{qw( writequeue readqueue readbuff )} = ( [], [], "" );

   # "innocent until proven guilty" (by means of EAGAIN)
   $self->{writeable} = 1;

   # Names of the methods that perform the actual reading and writing
   @$self{qw( reader writer )} = qw( _sysread _syswrite );

   @$self{qw( read_len write_len )} = ( $READLEN, $WRITELEN );

   # To begin with the only interest is read-readiness in order to read
   $self->{want} = WANT_READ_FOR_READ;

   $self->{close_on_read_eof} = 1;
}
223
224=head1 PARAMETERS
225
226The following named parameters may be passed to C<new> or C<configure>:
227
228=head2 read_handle => IO
229
230The IO handle to read from. Must implement C<fileno> and C<sysread> methods.
231
232=head2 write_handle => IO
233
234The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.
235
236=head2 handle => IO
237
238Shortcut to specifying the same IO handle for both of the above.
239
240=head2 on_read => CODE
241
242=head2 on_read_error => CODE
243
244=head2 on_outgoing_empty => CODE
245
246=head2 on_write_error => CODE
247
248=head2 on_writeable_start => CODE
249
250=head2 on_writeable_stop => CODE
251
252CODE references for event handlers.
253
254=head2 autoflush => BOOL
255
256Optional. If true, the C<write> method will attempt to write data to the
257operating system immediately, without waiting for the loop to indicate the
258filehandle is write-ready. This is useful, for example, on streams that should
259contain up-to-date logging or console information.
260
261It currently defaults to false for any file handle, but future versions of
262L<IO::Async> may enable this by default on STDOUT and STDERR.
263
264=head2 read_len => INT
265
266Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.
267
268=head2 read_all => BOOL
269
270Optional. If true, attempt to read as much data from the kernel as possible
271when the handle becomes readable. By default this is turned off, meaning at
272most one fixed-size buffer is read. If there is still more data in the
273kernel's buffer, the handle will still be readable, and will be read from
274again.
275
276This behaviour allows multiple streams and sockets to be multiplexed
277simultaneously, meaning that a large bulk transfer on one cannot starve other
278filehandles of processing time. Turning this option on may improve bulk data
279transfer rate, at the risk of delaying or stalling processing on other
280filehandles.
281
282=head2 write_len => INT
283
284Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.
285
286=head2 write_all => BOOL
287
288Optional. Analogous to the C<read_all> option, but for writing. When
289C<autoflush> is enabled, this option only affects deferred writing if the
290initial attempt failed due to buffer space.
291
292=head2 read_high_watermark => INT
293
294=head2 read_low_watermark => INT
295
296Optional. If defined, gives a way to implement flow control or other
297behaviours that depend on the size of Stream's read buffer.
298
299If after more data is read from the underlying filehandle the read buffer is
300now larger than the high watermark, the C<on_read_high_watermark> event is
301triggered (which, by default, will disable read-ready notifications and pause
302reading from the filehandle).
303
304If after data is consumed by an C<on_read> handler the read buffer is now
305smaller than the low watermark, the C<on_read_low_watermark> event is
306triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For this to be possible, the read handler
308would have to be one added by the C<push_on_read> method or one of the
309Future-returning C<read_*> methods.
310
311By default these options are not defined, so this behaviour will not happen.
312C<read_low_watermark> may not be set to a larger value than
313C<read_high_watermark>, but it may be set to a smaller value, creating a
314hysteresis region. If either option is defined then both must be.
315
316If these options are used with the default event handlers, be careful not to
317cause deadlocks by having a high watermark sufficiently low that a single
318C<on_read> invocation might not consider it finished yet.
319
320=head2 reader => STRING|CODE
321
322=head2 writer => STRING|CODE
323
324Optional. If defined, gives the name of a method or a CODE reference to use
325to implement the actual reading from or writing to the filehandle. These will
326be invoked as
327
328   $stream->reader( $read_handle, $buffer, $len )
329   $stream->writer( $write_handle, $buffer, $len )
330
331Each is expected to modify the passed buffer; C<reader> by appending to it,
332C<writer> by removing a prefix from it. Each is expected to return a true
333value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implementations using C<sysread> and
335C<syswrite> on the underlying handle, respectively.
336
337=head2 close_on_read_eof => BOOL
338
339Optional. Usually true, but if set to a false value then the stream will not
340be C<close>d when an EOF condition occurs on read. This is normally not useful
341as at that point the underlying stream filehandle is no longer useable, but it
342may be useful for reading regular files, or interacting with TTY devices.
343
344=head2 encoding => STRING
345
346If supplied, sets the name of encoding of the underlying stream. If an
347encoding is set, then the C<write> method will expect to receive Unicode
348strings and encodes them into bytes, and incoming bytes will be decoded into
349Unicode strings for the C<on_read> event.
350
351If an encoding is not supplied then C<write> and C<on_read> will work in byte
352strings.
353
354I<IMPORTANT NOTE:> in order to handle reads of UTF-8 content or other
355multibyte encodings, the code implementing the C<on_read> event uses a feature
356of L<Encode>; the C<STOP_AT_PARTIAL> flag. While this flag has existed for a
357while and is used by the C<:encoding> PerlIO layer itself for similar
358purposes, the flag is not officially documented by the C<Encode> module. In
359principle this undocumented feature could be subject to change, in practice I
360believe it to be reasonably stable.
361
362This note applies only to the C<on_read> event; data written using the
363C<write> method does not rely on any undocumented features of C<Encode>.
364
365If a read handle is given, it is required that either an C<on_read> callback
366reference is configured, or that the object provides an C<on_read> method. It
367is optional whether either is true for C<on_outgoing_empty>; if neither is
368supplied then no action will be taken when the writing buffer becomes empty.
369
370An C<on_read> handler may be supplied even if no read handle is yet given, to
371be used when a read handle is eventually provided by the C<set_handles>
372method.
373
374This condition is checked at the time the object is added to a Loop; it is
375allowed to create a C<IO::Async::Stream> object with a read handle but without
376a C<on_read> handler, provided that one is later given using C<configure>
377before the stream is added to its containing Loop, either directly or by being
378a child of another Notifier already in a Loop, or added to one.
379
380=cut
381
# Applies the named parameters documented above to the stream. Parameters not
# handled here are passed on to the superclass configure method.
#
# Croaks if only one of the read watermarks ends up defined, if the low
# watermark exceeds the high one, if the encoding name is not recognised, or
# if the stream is in a Loop with a read handle but has no on_read handler.
# Warns if autoflush is set on a write handle that is in blocking mode.
sub configure
{
   my ( $self, %params ) = @_;

   # Parameters that are simply stored under a key of the same name
   my @stored_as_is = qw(
      on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
      on_write_error on_writeable_start on_writeable_stop autoflush
      read_len read_all write_len write_all on_read_high_watermark
      on_read_low_watermark reader writer close_on_read_eof
   );
   foreach my $key ( grep { exists $params{$_} } @stored_as_is ) {
      $self->{$key} = delete $params{$key};
   }

   if( grep { exists $params{$_} } qw( read_high_watermark read_low_watermark ) ) {
      # A watermark that is omitted or undef keeps its currently stored value
      my $high = delete $params{read_high_watermark};
      $high = $self->{read_high_watermark} unless defined $high;

      my $low = delete $params{read_low_watermark};
      $low = $self->{read_low_watermark} unless defined $low;

      my $has_high = defined $high;
      my $has_low  = defined $low;

      croak "Cannot set read_low_watermark without read_high_watermark"
         if $has_low and not $has_high;
      croak "Cannot set read_high_watermark without read_low_watermark"
         if $has_high and not $has_low;

      croak "Cannot set read_low_watermark higher than read_high_watermark"
         if $has_high and $has_low and $low > $high;

      @$self{qw( read_high_watermark read_low_watermark )} = ( $high, $low );

      # TODO: reassert levels if we've moved them
   }

   if( exists $params{encoding} ) {
      # The encoding object is stored, not its name
      my $name  = delete $params{encoding};
      my $codec = find_encoding( $name );
      croak "Cannot handle an encoding of '$name'" unless defined $codec;
      $self->{encoding} = $codec;
   }

   $self->SUPER::configure( %params );

   # The on_read requirement is only enforced here once the stream is in a Loop
   if( $self->loop and $self->read_handle and not $self->can_event( "on_read" ) ) {
      croak 'Expected either an on_read callback or to be able to ->on_read';
   }

   if( $self->{autoflush} and my $wh = $self->write_handle ) {
      carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle"
         if $wh->blocking;
   }
}
431
# Called when the stream is being added to a Loop. Croaks if there is a read
# handle but no on_read handler, delegates to the superclass, then requests
# write-readiness if data is already waiting in the write queue.
sub _add_to_loop
{
   my ( $self, @args ) = @_;

   croak 'Expected either an on_read callback or to be able to ->on_read'
      if defined $self->read_handle and not $self->can_event( "on_read" );

   $self->SUPER::_add_to_loop( @args );

   # Data queued by ->write before now still needs flushing
   if( not $self->_is_empty ) {
      $self->want_writeready_for_write( 1 );
   }
}
447
448=head1 METHODS
449
450The following methods documented with a trailing call to C<< ->get >> return
451L<Future> instances.
452
453=cut
454
455=head2 want_readready_for_read
456
457=head2 want_readready_for_write
458
459   $stream->want_readready_for_read( $set )
460
461   $stream->want_readready_for_write( $set )
462
463Mutators for the C<want_readready> property on L<IO::Async::Handle>, which
464control whether the C<read> or C<write> behaviour should be continued once the
465filehandle becomes ready for read.
466
467Normally, C<want_readready_for_read> is always true (though the read watermark
468behaviour can modify it), and C<want_readready_for_write> is not used.
469However, if a custom C<writer> function is provided, it may find this useful
470for being invoked again if it cannot proceed with a write operation until the
471filehandle becomes readable (such as during transport negotiation or SSL key
472management, for example).
473
474=cut
475
# Sets or clears the WANT_READ_FOR_READ bit according to $set, then, if there
# is a read handle, updates want_readready to reflect whether any read-ready
# interest remains.
sub want_readready_for_read
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_READ_FOR_READ;
   }
   else {
      $self->{want} &= ~WANT_READ_FOR_READ;
   }

   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}
484
# Sets or clears the WANT_READ_FOR_WRITE bit according to $set, then, if there
# is a read handle, updates want_readready to reflect whether any read-ready
# interest remains.
sub want_readready_for_write
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_READ_FOR_WRITE;
   }
   else {
      $self->{want} &= ~WANT_READ_FOR_WRITE;
   }

   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}
493
494=head2 want_writeready_for_read
495
496=head2 want_writeready_for_write
497
498   $stream->want_writeready_for_write( $set )
499
500   $stream->want_writeready_for_read( $set )
501
502Mutators for the C<want_writeready> property on L<IO::Async::Handle>, which
503control whether the C<write> or C<read> behaviour should be continued once the
504filehandle becomes ready for write.
505
506Normally, C<want_writeready_for_write> is managed by the C<write> method and
507associated flushing, and C<want_writeready_for_read> is not used. However, if
508a custom C<reader> function is provided, it may find this useful for being
509invoked again if it cannot proceed with a read operation until the filehandle
510becomes writable (such as during transport negotiation or SSL key management,
511for example).
512
513=cut
514
# Sets or clears the WANT_WRITE_FOR_WRITE bit according to $set, then, if
# there is a write handle, updates want_writeready to reflect whether any
# write-ready interest remains.
sub want_writeready_for_write
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_WRITE_FOR_WRITE;
   }
   else {
      $self->{want} &= ~WANT_WRITE_FOR_WRITE;
   }

   $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
}
523
# Sets or clears the WANT_WRITE_FOR_READ bit according to $set, then, if there
# is a write handle, updates want_writeready to reflect whether any
# write-ready interest remains.
sub want_writeready_for_read
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_WRITE_FOR_READ;
   }
   else {
      $self->{want} &= ~WANT_WRITE_FOR_READ;
   }

   $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
}
532
# FUNCTION not method
# Returns true if the given errno value is one of EAGAIN, EWOULDBLOCK or
# EINTR; false otherwise.
sub _nonfatal_error
{
   my $errno = shift;

   return 1 if $errno == EAGAIN;
   return 1 if $errno == EWOULDBLOCK;
   return $errno == EINTR;
}
542
# Returns true if the write queue holds no elements.
sub _is_empty
{
   my ( $self ) = @_;

   my $queue = $self->{writequeue};
   return @$queue == 0;
}
548
549=head2 close
550
551   $stream->close
552
553A synonym for C<close_when_empty>. This should not be used when the deferred
554wait behaviour is required, as the behaviour of C<close> may change in a
555future version of L<IO::Async>. Instead, call C<close_when_empty> directly.
556
557=cut
558
# Currently a plain alias: hands straight over to close_when_empty.
sub close
{
   my ( $self ) = @_;
   return $self->close_when_empty;
}
564
565=head2 close_when_empty
566
567   $stream->close_when_empty
568
569If the write buffer is empty, this method calls C<close> on the underlying IO
570handles, and removes the stream from its containing loop. If the write buffer
571still contains data, then this is deferred until the buffer is empty. This is
572intended for "write-then-close" one-shot streams.
573
574   $stream->write( "Here is my final data\n" );
575   $stream->close_when_empty;
576
577Because of this deferred nature, it may not be suitable for error handling.
578See instead the C<close_now> method.
579
580=cut
581
# Closes immediately via the superclass if nothing is waiting to be written;
# otherwise only marks the stream as closing, leaving the actual close to
# happen once the write queue has drained.
sub close_when_empty
{
   my ( $self ) = @_;

   if( $self->_is_empty ) {
      return $self->SUPER::close;
   }

   # Still data to write; _do_write acts on this flag once the queue is empty
   $self->{stream_closing} = 1;
}
590
591=head2 close_now
592
593   $stream->close_now
594
595This method immediately closes the underlying IO handles and removes the
596stream from the containing loop. It will not wait to flush the remaining data
597in the write buffer.
598
599=cut
600
# Abandons everything in the write queue and closes straight away. Each queued
# element that has an on_error callback is told "stream closing" first.
sub close_now
{
   my ( $self ) = @_;

   foreach my $pending ( @{ $self->{writequeue} } ) {
      my $on_error = $pending->on_error or next;
      $on_error->( $self, "stream closing" );
   }

   # Discard the unwritten data and cancel any deferred close request
   undef @{ $self->{writequeue} };
   undef $self->{stream_closing};

   $self->SUPER::close;
}
614
615=head2 is_read_eof
616
617=head2 is_write_eof
618
619   $eof = $stream->is_read_eof
620
621   $eof = $stream->is_write_eof
622
623Returns true after an EOF condition is reported on either the read or the
624write handle, respectively.
625
626=cut
627
# Accessor for the read_eof flag.
sub is_read_eof
{
   my ( $self ) = @_;
   return $self->{read_eof};
}
633
# Accessor for the write_eof flag, which is set when a write fails with EPIPE.
sub is_write_eof
{
   my ( $self ) = @_;
   return $self->{write_eof};
}
639
640=head2 write
641
642   $stream->write( $data, %params )
643
644This method adds data to the outgoing data queue, or writes it immediately,
645according to the C<autoflush> parameter.
646
647If the C<autoflush> option is set, this method will try immediately to write
648the data to the underlying filehandle. If this completes successfully then it
649will have been written by the time this method returns. If it fails to write
650completely, then the data is queued as if C<autoflush> were not set, and will
651be flushed as normal.
652
653C<$data> can either be a plain string, a L<Future>, or a CODE reference. If it
654is a plain string it is written immediately. If it is not, its value will be
655used to generate more C<$data> values, eventually leading to strings to be
656written.
657
658If C<$data> is a C<Future>, the Stream will wait until it is ready, and take
659the single value it yields.
660
661If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
662values. Each time the filehandle is ready to write more data to it, the
663function is invoked. Once the function has finished generating data it should
664return undef. The function is passed the Stream object as its first argument.
665
666It is allowed that C<Future>s yield CODE references, or CODE references return
667C<Future>s, as well as plain strings.
668
669For example, to stream the contents of an existing opened filehandle:
670
671   open my $fileh, "<", $path or die "Cannot open $path - $!";
672
673   $stream->write( sub {
674      my ( $stream ) = @_;
675
676      sysread $fileh, my $buffer, 8192 or return;
677      return $buffer;
678   } );
679
680Takes the following optional named parameters in C<%params>:
681
682=over 8
683
684=item write_len => INT
685
686Overrides the C<write_len> parameter for the data written by this call.
687
688=item on_write => CODE
689
690A CODE reference which will be invoked after every successful C<syswrite>
691operation on the underlying filehandle. It will be passed the number of bytes
692that were written by this call, which may not be the entire length of the
buffer - if it takes more than one C<syswrite> operation to empty the buffer
694then this callback will be invoked multiple times.
695
696   $on_write->( $stream, $len )
697
698=item on_flush => CODE
699
700A CODE reference which will be invoked once the data queued by this C<write>
701call has been flushed. This will be invoked even if the buffer itself is not
702yet empty; if more data has been queued since the call.
703
704   $on_flush->( $stream )
705
706=item on_error => CODE
707
708A CODE reference which will be invoked if a C<syswrite> error happens while
709performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.
710
711   $on_error->( $stream, $errno )
712
713=back
714
715If the object is not yet a member of a loop and doesn't yet have a
716C<write_handle>, then calls to the C<write> method will simply queue the data
717and return. It will be flushed when the object is added to the loop.
718
719If C<$data> is a defined but empty string, the write is still queued, and the
720C<on_flush> continuation will be invoked, if supplied. This can be used to
721obtain a marker, to invoke some code once the output queue has been flushed up
722to this point.
723
724=head2 write (scalar)
725
726   $stream->write( ... )->get
727
728If called in non-void context, this method returns a L<Future> which will
729complete (with no value) when the write operation has been flushed. This may
730be used as an alternative to, or combined with, the C<on_flush> callback.
731
732=cut
733
# Default writer implementation. Invoked as
#   $self->_syswrite( $handle, $buffer, $len )
# It calls syswrite on $handle with the buffer and $len, and removes whatever
# was written from the front of the caller's buffer. Returns the syswrite
# result unchanged: the byte count, or zero/undef if nothing was written.
sub _syswrite
{
   my $self = shift;

   # The buffer is deliberately used as $_[1] throughout, so that the
   # caller's own variable is modified rather than a copy
   my $handle = $_[0];
   my $len    = $_[2];

   my $count = $handle->syswrite( $_[1], $len );

   # Zero or undef leaves the buffer untouched
   substr( $_[1], 0, $count, "" ) if $count;

   return $count;
}
745
# Attempts one write call for the data at the head of the write queue.
#
# First, a CODE reference or Future at the head of the queue is resolved until
# the head holds a plain string. Adjacent plain-string elements are then
# coalesced where this cannot lose a callback, and a single call to the
# configured writer is made.
#
# Returns true if progress was made (data was written, or an exhausted CODE
# generator was removed), in which case the caller may try again. Returns
# false if nothing more can be done for now: the queue is empty, the head is
# a Future that is not yet ready, the write would block or was interrupted,
# or the write failed.
sub _flush_one_write
{
   my $self = shift;

   my $writequeue = $self->{writequeue};

   my $head;
   while( $head = $writequeue->[0] and ref $head->data ) {
      if( ref $head->data eq "CODE" ) {
         # A generator: ask it for the next piece of data
         my $data = $head->data->( $self );
         if( !defined $data ) {
            # Generator is exhausted; this element is now complete
            $head->on_flush->( $self ) if $head->on_flush;
            shift @$writequeue;
            return 1;
         }
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         # The generated data goes in front of the generator, which stays
         # queued to be asked again afterwards
         unshift @$writequeue, my $new = Writer(
            $data, $head->writelen, $head->on_write, undef, undef, 0
         );
         next;
      }
      elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
         my $f = $head->data;
         if( !$f->is_ready ) {
            # Attach the wake-up callback only once per element
            return 0 if $head->watching;
            $f->on_ready( sub { $self->_flush_one_write } );
            $head->watching++;
            return 0;
         }
         my $data = $f->get;
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         $head->data = $data;
         next;
      }
      else {
         die "Unsure what to do with reference ".ref($head->data)." in write queue";
      }
   }

   # Nothing is queued. Callers normally check _is_empty first, but the
   # on_ready callback attached to a Future above calls this method
   # unconditionally, and may fire after close_now has emptied the queue.
   return 0 unless $head;

   # Coalesce following plain-string elements into the head so they can share
   # a write call. This is only done when the head has no callbacks of its own
   # that would be overwritten; the merged element's on_flush and on_error are
   # carried over so they still fire for the combined data.
   my $second;
   while( $second = $writequeue->[1] and
          !ref $second->data and
          $head->writelen == $second->writelen and
          !$head->on_write and !$second->on_write and
          !$head->on_flush and !$head->on_error ) {
      $head->data .= $second->data;
      $head->on_write = $second->on_write;
      $head->on_flush = $second->on_flush;
      $head->on_error = $second->on_error;
      splice @$writequeue, 1, 1, ();
   }

   die "TODO: head data does not contain a plain string" if ref $head->data;

   if( $IO::Async::Debug::DEBUG > 1 ) {
      my $data = substr $head->data, 0, $head->writelen;
      $self->debug_printf( "WRITE len=%d", length $data );
      IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
   }

   # The writer removes whatever it wrote from the front of $head->data
   my $writer = $self->{writer};
   my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );

   if( !defined $len ) {
      # Take a copy immediately, before anything else can overwrite $!
      my $errno = $!;

      if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
         # Only report the transition from writeable to not-writeable
         $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
         $self->{writeable} = 0;
      }

      # Transient conditions leave the data queued for a later attempt
      return 0 if _nonfatal_error( $errno );

      $self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;

      if( $errno == EPIPE ) {
         $self->debug_printf( "WRITE-EOF" );
         $self->{write_eof} = 1;
         $self->maybe_invoke_event( on_write_eof => );
      }

      # Without an on_write_error handler the stream is closed at once
      $head->on_error->( $self, $errno ) if $head->on_error;
      $self->maybe_invoke_event( on_write_error => $errno )
         or $self->close_now;

      return 0;
   }

   $METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len;

   if( my $on_write = $head->on_write ) {
      $on_write->( $self, $len );
   }

   # Element fully written; notify and remove it
   if( !length $head->data ) {
      $head->on_flush->( $self ) if $head->on_flush;
      shift @{ $self->{writequeue} };
   }

   return 1;
}
850
# Queues $data for writing and, if autoflush is set, tries to write it
# immediately.
#
#   $stream->write( $data, %params )
#
# $data may be a plain string, a CODE reference or a Future. Recognised
# %params are write_len, on_write, on_flush and on_error; any other key
# causes a croak before anything is queued.
#
# In non-void context a Future is returned, which completes when the data has
# been flushed or fails if writing it fails. This requires the stream to be
# in a Loop. In void context nothing useful is returned.
#
# Warns and returns without queueing if the stream is already closing. Croaks
# if the stream is in a Loop but has no write handle.
sub write
{
   my $self = shift;
   my ( $data, %params ) = @_;

   carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};

   # Allow writes without a filehandle if we're not yet in a Loop, just don't
   # try to flush them
   my $handle = $self->write_handle;

   croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;

   if( !ref $data and my $encoding = $self->{encoding} ) {
      $data = $encoding->encode( $data );
   }

   my $on_write = delete $params{on_write};
   my $on_flush = delete $params{on_flush};
   my $on_error = delete $params{on_error};

   # write_len must be removed from %params like the other recognised keys,
   # otherwise the check below would reject it
   my $write_len = delete $params{write_len};
   defined $write_len or $write_len = $self->{write_len};

   # Reject unknown keys before a Future is created or anything is queued, so
   # that a failed call leaves no data behind in the write queue
   keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );

   my $f;
   if( defined wantarray ) {
      my $orig_on_flush = $on_flush;
      my $orig_on_error = $on_error;

      my $loop = $self->loop or
         croak "Cannot ->write data returning a Future to a Stream not in a Loop";
      $f = $loop->new_future;

      # Wrap the callbacks so that the Future is resolved first and the
      # caller's own callback, if any, still runs afterwards
      $on_flush = sub {
         $f->done;
         $orig_on_flush->( @_ ) if $orig_on_flush;
      };
      $on_error = sub {
         my $self = shift;
         my ( $errno ) = @_;

         $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;

         $orig_on_error->( $self, @_ ) if $orig_on_error;
      };
   }

   push @{ $self->{writequeue} }, Writer(
      $data, $write_len, $on_write, $on_flush, $on_error, 0
   );

   # No handle yet; the data stays queued until the stream is added to a Loop
   return $f unless $handle;

   if( $self->{autoflush} ) {
      # Write as much as possible right now
      1 while !$self->_is_empty and $self->_flush_one_write;

      if( $self->_is_empty ) {
         $self->want_writeready_for_write( 0 );
         return $f;
      }
   }

   # Whatever remains is written when the handle becomes write-ready
   $self->want_writeready_for_write( 1 );
   return $f;
}
917
# Write-readiness handler. On the first call while {writeable} is false it
# fires the optional on_writeable_start event and records the handle as
# writeable. It then dispatches to the write and/or read paths according to
# which WANT_WRITE_* bits are set in {want}.
sub on_write_ready
{
   my ( $self ) = @_;

   unless( $self->{writeable} ) {
      $self->maybe_invoke_event( on_writeable_start => );
      $self->{writeable} = 1;
   }

   # {want} is consulted afresh for each test because _do_write may alter it
   if( $self->{want} & WANT_WRITE_FOR_WRITE ) {
      $self->_do_write;
   }
   if( $self->{want} & WANT_WRITE_FOR_READ ) {
      $self->_do_read;
   }
}
930
# Flushes queued writes. At least one flush is attempted when the queue is
# non-empty; further flushes happen only while each one reports success and
# {write_all} is set. Once the queue is empty, write-readiness interest is
# dropped, on_outgoing_empty is fired, and a pending close is completed.
sub _do_write
{
   my ( $self ) = @_;

   until( $self->_is_empty ) {
      last unless $self->_flush_one_write;
      last unless $self->{write_all};
   }

   # All data successfully flushed
   if( $self->_is_empty ) {
      $self->want_writeready_for_write( 0 );

      $self->maybe_invoke_event( on_outgoing_empty => );

      # A close was requested while data was still pending; finish it now
      $self->close_now if $self->{stream_closing};
   }
}
946
# Runs one round of on_read handling against the read buffer.
#
#    $again = $stream->_flush_one_read( $eof )
#
# The handler at the head of the read queue is used if there is one;
# otherwise the object's own on_read event is invoked. The handler's return
# value decides what happens next:
#    CODE ref        - becomes the new head of the read queue; returns 1
#    undef           - the head of a non-empty queue is removed; returns 1
#    anything else   - returned as a "call me again" boolean, true only while
#                      the buffer is non-empty or $eof is set
# {flushing_read} is set for the duration of the call.
sub _flush_one_read
{
   my ( $self, $eof ) = @_;

   local $self->{flushing_read} = 1;

   my $queue = $self->{readqueue};

   # Prefer the temporary handler at the front of the queue
   my $head    = $queue->[0];
   my $handler = $head && $head->on_read;

   my $ret = $handler
      ? $handler->( $self, \$self->{readbuff}, $eof )
      : $self->invoke_event( on_read => \$self->{readbuff}, $eof );

   # The handler may have drained the buffer below the low watermark
   my $low = $self->{read_low_watermark};
   if( defined $low and $self->{at_read_high_watermark} and
       length( $self->{readbuff} ) < $low ) {
      undef $self->{at_read_high_watermark};
      $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
   }

   if( ref $ret eq "CODE" ) {
      # Replace the top CODE, or add it if there was none
      $queue->[0] = Reader( $ret, undef );
      return 1;
   }

   if( @$queue and !defined $ret ) {
      # The temporary handler has finished; move on to the next one
      shift @$queue;
      return 1;
   }

   return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
}
983
# Default reader: performs a sysread on $handle.
#
#    $len = $stream->_sysread( $handle, $buffer, $len )
#
# The buffer argument is passed on by alias (via @_) so that the handle's
# sysread fills the caller's variable directly. Returns whatever the handle's
# sysread method returns.
sub _sysread
{
   my $self = shift;

   my $handle = $_[0];
   my $len    = $_[2];

   return $handle->sysread( $_[1], $len );
}
990
# Read-readiness handler. Dispatches to the read and/or write paths according
# to which WANT_READ_* bits are set in {want}.
sub on_read_ready
{
   my ( $self ) = @_;

   # {want} is consulted afresh for each test because _do_read may alter it
   if( $self->{want} & WANT_READ_FOR_READ ) {
      $self->_do_read;
   }
   if( $self->{want} & WANT_READ_FOR_WRITE ) {
      $self->_do_write;
   }
}
998
# Reads from the read handle and feeds the data to the on_read handlers.
#
# The configured {reader} is invoked as
#    $len = $self->$reader( $handle, $buffer, $read_len )
# and is treated as: undef on error (with $! set), 0 at EOF, otherwise the
# number of bytes placed in $buffer. One read is performed per call unless
# {read_all} is set, in which case reads repeat until one fails or hits EOF.
#
# On a fatal error the on_read_error event is fired (the stream is closed if
# nothing handles it) and every queued read Future is failed. At EOF the
# on_read_eof event is fired, the stream is optionally closed, and every
# queued read Future is completed with undef.
#
# After reading, if the buffer has reached {read_high_watermark} the
# on_read_high_watermark event is fired once, until the low watermark resets
# it.
sub _do_read
{
   my $self = shift;

   my $handle = $self->read_handle;
   my $reader = $self->{reader};

   while(1) {
      my $data;
      my $len = $self->$reader( $handle, $data, $self->{read_len} );

      if( !defined $len ) {
         # Copy $! straight away, before anything else can overwrite it
         my $errno = $!;

         # A non-fatal error just means there is nothing more to read right
         # now. Leave the loop rather than returning: under read_all this is
         # the normal way the loop ends, and the high watermark check below
         # must still see the data gathered by the earlier iterations.
         last if _nonfatal_error( $errno );

         $self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;

         $self->maybe_invoke_event( on_read_error => $errno )
            or $self->close_now;

         foreach ( @{ $self->{readqueue} } ) {
            $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
         }
         undef @{ $self->{readqueue} };

         return;
      }

      if( $IO::Async::Debug::DEBUG > 1 ) {
         $self->debug_printf( "READ len=%d", $len );
         IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
      }

      $METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len;

      my $eof = $self->{read_eof} = ( $len == 0 );

      if( my $encoding = $self->{encoding} ) {
         # Prepend any undecoded tail left over from the previous read.
         # decode() is given STOP_AT_PARTIAL and $bytes is stored back
         # afterwards as the new remainder.
         my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
         $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
         $self->{bytes_remaining} = $bytes;
      }

      $self->{readbuff} .= $data if !$eof;

      # Keep invoking handlers for as long as they ask to be called again
      1 while $self->_flush_one_read( $eof );

      if( $eof ) {
         $self->debug_printf( "READ-EOF" );
         $self->maybe_invoke_event( on_read_eof => );
         $self->close_now if $self->{close_on_read_eof};
         foreach ( @{ $self->{readqueue} } ) {
            $_->future->done( undef ) if $_->future;
         }
         undef @{ $self->{readqueue} };
         return;
      }

      last unless $self->{read_all};
   }

   if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
      # Fire the event only on the transition into the high-watermark state
      $self->{at_read_high_watermark} or
         $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );

      $self->{at_read_high_watermark} = 1;
   }
}
1068
# Default handler for the on_read_high_watermark event: stop asking for
# read-readiness on behalf of reading.
sub on_read_high_watermark
{
   my ( $self ) = @_;
   return $self->want_readready_for_read( 0 );
}
1074
# Default handler for the on_read_low_watermark event: resume asking for
# read-readiness on behalf of reading.
sub on_read_low_watermark
{
   my ( $self ) = @_;
   return $self->want_readready_for_read( 1 );
}
1080
1081=head2 push_on_read
1082
1083   $stream->push_on_read( $on_read )
1084
1085Pushes a new temporary C<on_read> handler to the end of the queue. This queue,
1086if non-empty, is used to provide C<on_read> event handling code in preference
1087to using the object's main event handler or method. New handlers can be
1088supplied at any time, and they will be used in first-in first-out (FIFO)
1089order.
1090
As with the main C<on_read> event handler, each handler can return a (defined)
boolean to indicate whether it wishes to be invoked again, another C<CODE>
reference to replace itself with, or C<undef> to indicate that it is now
complete and should be removed. When a temporary handler returns C<undef> it
is shifted from the queue and the next one, if present, is invoked instead. If
there are no more then the object's main handler is invoked instead.
1097
1098=cut
1099
# Appends a temporary on_read handler to the read queue and, unless a read
# flush is already in progress, immediately runs the handlers against any data
# already sitting in the read buffer.
sub push_on_read
{
   my ( $self, $on_read, %args ) = @_;
   # %args undocumented for internal use

   my $entry = Reader( $on_read, $args{future} );
   push @{ $self->{readqueue} }, $entry;

   # TODO: Should this always defer?
   return if $self->{flushing_read};

   while( length $self->{readbuff} ) {
      $self->_flush_one_read( 0 ) or last;
   }
}
1112
1113=head1 FUTURE-RETURNING READ METHODS
1114
1115The following methods all return a L<Future> which will become ready when
1116enough data has been read by the Stream into its buffer. At this point, the
1117data is removed from the buffer and given to the C<Future> object to complete
1118it.
1119
1120   my $f = $stream->read_...
1121
1122   my ( $string ) = $f->get;
1123
1124Unlike the C<on_read> event handlers, these methods don't allow for access to
1125"partial" results; they only provide the final result once it is ready.
1126
1127If a C<Future> is cancelled before it completes it is removed from the read
1128queue without consuming any data; i.e. each C<Future> atomically either
1129completes or is cancelled.
1130
1131Since it is possible to use a readable C<Stream> entirely using these
1132C<Future>-returning methods instead of the C<on_read> event, it may be useful
1133to configure a trivial return-false event handler to keep it from consuming
1134any input, and to allow it to be added to a C<Loop> in the first place.
1135
1136   my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
1137   $loop->add( $stream );
1138
1139   my $f = $stream->read_...
1140
1141If a read EOF or error condition happens while there are read C<Future>s
1142pending, they are all completed. In the case of a read EOF, they are done with
1143C<undef>; in the case of a read error they are failed using the C<$!> error
1144value as the failure.
1145
1146   $f->fail( $message, sysread => $! )
1147
1148If a read EOF condition happens to the currently-processing read C<Future>, it
1149will return a partial result. The calling code can detect this by the fact
1150that the returned data is not complete according to the specification (too
1151short in C<read_exactly>'s case, or lacking the ending pattern in
1152C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof>
1153value in its results.
1154
1155   my ( $string, $eof ) = $f->get;
1156
1157=cut
1158
# Creates the Future used by the read_* methods.
#
# The Future comes from the containing Loop, so the stream must already have
# been added to one; croaks with an explanatory message otherwise. When the
# Future is cancelled the read queue is flushed again so that the cancelled
# entry's handler gets to run and remove itself.
sub _read_future
{
   my $self = shift;

   my $loop = $self->loop or
      croak "Cannot create a read Future for a Stream not in a Loop";

   my $f = $loop->new_future;
   $f->on_cancel( $self->_capture_weakself( sub {
      # The callback tolerates a missing stream (presumably because
      # _capture_weakself does not keep it alive - confirm in the base class)
      my $self = shift or return;
      1 while $self->_flush_one_read;
   }));
   return $f;
}
1169
1170=head2 read_atmost
1171
1172=head2 read_exactly
1173
1174   ( $string, $eof ) = $stream->read_atmost( $len )->get
1175
1176   ( $string, $eof ) = $stream->read_exactly( $len )->get
1177
1178Completes the C<Future> when the read buffer contains C<$len> or more
1179characters of input. C<read_atmost> will also complete after the first
1180invocation of C<on_read>, even if fewer characters are available, whereas
1181C<read_exactly> will wait until at least C<$len> are available.
1182
1183=cut
1184
# Queues a one-shot read handler that completes its Future on the first
# invocation with up to $len characters taken from the front of the buffer,
# along with the $eof flag.
sub read_atmost
{
   my ( $self, $len ) = @_;

   my $future = $self->_read_future;

   my $on_read = sub {
      my $buffref = $_[1];
      my $eof     = $_[2];

      # A cancelled Future consumes nothing; either way this handler is done
      unless( $future->is_cancelled ) {
         $future->done( substr( $$buffref, 0, $len, "" ), $eof );
      }
      return undef;
   };

   $self->push_on_read( $on_read, future => $future );
   return $future;
}
1199
# Queues a read handler that waits until the buffer holds at least $len
# characters (or EOF is seen), then completes its Future with up to $len
# characters taken from the front of the buffer, along with the $eof flag.
sub read_exactly
{
   my ( $self, $len ) = @_;

   my $future = $self->_read_future;

   my $on_read = sub {
      my $buffref = $_[1];
      my $eof     = $_[2];

      # A cancelled Future consumes nothing and simply leaves the queue
      return undef if $future->is_cancelled;

      # Not enough yet and more may still arrive: wait for the next read
      my $ready = $eof || length( $$buffref ) >= $len;
      return 0 unless $ready;

      $future->done( substr( $$buffref, 0, $len, "" ), $eof );
      return undef;
   };

   $self->push_on_read( $on_read, future => $future );
   return $future;
}
1215
1216=head2 read_until
1217
1218   ( $string, $eof ) = $stream->read_until( $end )->get
1219
1220Completes the C<Future> when the read buffer contains a match for C<$end>,
1221which may either be a plain string or a compiled C<Regexp> reference. Yields
1222the prefix of the buffer up to and including this match.
1223
1224=cut
1225
# Queues a read handler that completes its Future once the buffer contains a
# match for $until, yielding everything up to and including the match. At EOF
# without a match the whole remaining buffer is yielded instead.
sub read_until
{
   my ( $self, $until ) = @_;

   # A plain string is matched literally; a reference is used as given
   $until = qr/\Q$until\E/ unless ref $until;

   my $future = $self->_read_future;

   my $on_read = sub {
      my $buffref = $_[1];
      my $eof     = $_[2];

      return undef if $future->is_cancelled;

      if( $$buffref =~ $until ) {
         # $+[0] is the offset just past the end of the match
         my $end = $+[0];
         $future->done( substr( $$buffref, 0, $end, "" ), $eof );
         return undef;
      }

      # No match yet and the stream is still open: wait for more data
      return 0 unless $eof;

      $future->done( $$buffref, $eof );
      $$buffref = "";
      return undef;
   };

   $self->push_on_read( $on_read, future => $future );
   return $future;
}
1251
1252=head2 read_until_eof
1253
1254   ( $string, $eof ) = $stream->read_until_eof->get
1255
1256Completes the C<Future> when the stream is eventually closed at EOF, and
1257yields all of the data that was available.
1258
1259=cut
1260
# Queues a read handler that waits for EOF and then completes its Future with
# the entire contents of the read buffer, emptying it.
sub read_until_eof
{
   my ( $self ) = @_;

   my $future = $self->_read_future;

   my $on_read = sub {
      my $buffref = $_[1];
      my $eof     = $_[2];

      return undef if $future->is_cancelled;

      # Keep accumulating until the stream reports EOF
      return 0 if !$eof;

      $future->done( $$buffref, $eof );
      $$buffref = "";
      return undef;
   };

   $self->push_on_read( $on_read, future => $future );
   return $future;
}
1275
1276=head1 UTILITY CONSTRUCTORS
1277
1278=cut
1279
1280=head2 new_for_stdin
1281
1282=head2 new_for_stdout
1283
1284=head2 new_for_stdio
1285
1286   $stream = IO::Async::Stream->new_for_stdin
1287
1288   $stream = IO::Async::Stream->new_for_stdout
1289
1290   $stream = IO::Async::Stream->new_for_stdio
1291
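Each returns an C<IO::Async::Stream> object preconfigured with the appropriate
handles: C<new_for_stdin> sets C<read_handle> to C<STDIN>, C<new_for_stdout>
sets C<write_handle> to C<STDOUT>, and C<new_for_stdio> sets both. Any further
arguments are passed on to the constructor.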
1294
1295=cut
1296
# Constructor shortcut: STDIN as the read handle, plus any caller arguments.
sub new_for_stdin
{
   my $class = shift;
   return $class->new( read_handle => \*STDIN, @_ );
}
# Constructor shortcut: STDOUT as the write handle, plus any caller arguments.
sub new_for_stdout
{
   my $class = shift;
   return $class->new( write_handle => \*STDOUT, @_ );
}
1299
# Constructor shortcut: STDIN as the read handle and STDOUT as the write
# handle, plus any caller arguments.
sub new_for_stdio
{
   my $class = shift;
   return $class->new(
      read_handle  => \*STDIN,
      write_handle => \*STDOUT,
      @_,
   );
}
1301
1302=head2 connect
1303
1304   $future = $stream->connect( %args )
1305
1306A convenient wrapper for calling the C<connect> method on the underlying
1307L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
1308otherwise supplied.
1309
1310=cut
1311
# Delegates to the superclass connect, supplying socktype => "stream" first
# so that a socktype given by the caller later in the list takes precedence
# when the arguments are read as a hash.
sub connect
{
   my ( $self, @args ) = @_;
   return $self->SUPER::connect( socktype => "stream", @args );
}
1317
1318=head1 DEBUGGING FLAGS
1319
1320The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:
1321
1322=over 4
1323
1324=item C<Sr>
1325
1326Log byte buffers as data is read from a Stream
1327
1328=item C<Sw>
1329
1330Log byte buffers as data is written to a Stream
1331
1332=back
1333
1334=cut
1335
1336=head1 EXAMPLES
1337
1338=head2 A line-based C<on_read> method
1339
1340The following C<on_read> method accepts incoming C<\n>-terminated lines and
1341prints them to the program's C<STDOUT> stream.
1342
1343   sub on_read
1344   {
1345      my $self = shift;
1346      my ( $buffref, $eof ) = @_;
1347
1348      while( $$buffref =~ s/^(.*\n)// ) {
1349         print "Received a line: $1";
1350      }
1351
1352      return 0;
1353   }
1354
1355Because a reference to the buffer itself is passed, it is simple to use a
1356C<s///> regular expression on the scalar it points at, to both check if data
1357is ready (i.e. a whole line), and to remove it from the buffer. Since it
1358always removes as many complete lines as possible, it doesn't need invoking
1359again when it has finished, so it can return a constant C<0>.
1360
1361=head2 Reading binary data
1362
1363This C<on_read> method accepts incoming records in 16-byte chunks, printing
1364each one.
1365
1366   sub on_read
1367   {
1368      my ( $self, $buffref, $eof ) = @_;
1369
1370      if( length $$buffref >= 16 ) {
1371         my $record = substr( $$buffref, 0, 16, "" );
1372         print "Received a 16-byte record: $record\n";
1373
1374         return 1;
1375      }
1376
1377      if( $eof and length $$buffref ) {
1378         print "EOF: a partial record still exists\n";
1379      }
1380
1381      return 0;
1382   }
1383
1384This time, rather than a C<while()> loop we have decided to have the handler
1385just process one record, and use the C<return 1> mechanism to ask that the
1386handler be invoked again if there still remains data that might contain
1387another record; only stopping with C<return 0> when we know we can't find one.
1388
1389The 4-argument form of C<substr()> extracts the 16-byte record from the buffer
1390and assigns it to the C<$record> variable, if there was enough data in the
1391buffer to extract it.
1392
1393A lot of protocols use a fixed-size header, followed by a variable-sized body
1394of data, whose size is given by one of the fields of the header. The following
1395C<on_read> method extracts messages in such a protocol.
1396
1397   sub on_read
1398   {
1399      my ( $self, $buffref, $eof ) = @_;
1400
1401      return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
1402
1403      my ( $len, $x, $y ) = unpack "N n n", $$buffref;
1404
1405      return 0 unless length $$buffref >= 8 + $len;
1406
1407      substr( $$buffref, 0, 8, "" );
1408      my $data = substr( $$buffref, 0, $len, "" );
1409
1410      print "A record with values x=$x y=$y\n";
1411
1412      return 1;
1413   }
1414
1415In this example, the header is C<unpack()>ed first, to extract the body
1416length, and then the body is extracted. If the buffer does not have enough
1417data yet for a complete message then C<0> is returned, and the buffer is left
1418unmodified for next time. Only when there are enough bytes in total does it
1419use C<substr()> to remove them.
1420
1421=head2 Dynamic replacement of C<on_read>
1422
1423Consider the following protocol (inspired by IMAP), which consists of
1424C<\n>-terminated lines that may have an optional data block attached. The
1425presence of such a data block, as well as its size, is indicated by the line
1426prefix.
1427
1428   sub on_read
1429   {
1430      my $self = shift;
1431      my ( $buffref, $eof ) = @_;
1432
1433      if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
1434         my $length = $1;
1435         my $line   = $2;
1436
1437         return sub {
1438            my $self = shift;
1439            my ( $buffref, $eof ) = @_;
1440
1441            return 0 unless length $$buffref >= $length;
1442
1443            # Take and remove the data from the buffer
1444            my $data = substr( $$buffref, 0, $length, "" );
1445
1446            print "Received a line $line with some data ($data)\n";
1447
1448            return undef; # Restore the original method
1449         }
1450      }
1451      elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
1452         my $line = $1;
1453
1454         print "Received a line $line with no data\n";
1455
1456         return 1;
1457      }
1458      else {
1459         print STDERR "Unrecognised input\n";
1460         # Handle it somehow
1461      }
1462   }
1463
1464In the case where trailing data is supplied, a new temporary C<on_read>
1465callback is provided in a closure. This closure captures the C<$length>
1466variable so it knows how much data to expect. It also captures the C<$line>
1467variable so it can use it in the event report. When this method has finished
1468reading the data, it reports the event, then restores the original method by
1469returning C<undef>.
1470
1471=head1 SEE ALSO
1472
1473=over 4
1474
1475=item *
1476
1477L<IO::Handle> - Supply object methods for I/O handles
1478
1479=back
1480
1481=head1 AUTHOR
1482
1483Paul Evans <leonerd@leonerd.org.uk>
1484
1485=cut
1486
# A module file must finish by evaluating to a true value; any non-zero
# constant serves.
0x55AA;
1488