# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2006-2020 -- leonerd@leonerd.org.uk

package IO::Async::Stream;

use strict;
use warnings;

our $VERSION = '0.800';

use base qw( IO::Async::Handle );

use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );

use Carp;

use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
use Scalar::Util qw( blessed );

use IO::Async::Debug;
use IO::Async::Metrics '$METRICS';

# Tuneable from outside
# Not yet documented
# These seed the per-object read_len / write_len defaults in _init.
our $READLEN  = 8192;
our $WRITELEN = 8192;

use Struct::Dumb;

# Element of the writequeue
#   data     - pending payload: a plain string, a CODE generator or a Future
#   writelen - maximum number of bytes handed to the writer per call
#   on_write - optional callback invoked after each successful write
#   on_flush - optional callback invoked once this element is fully written
#   on_error - optional callback invoked when writing this element fails
#   watching - true once an on_ready callback was attached to a Future in data
struct Writer => [qw( data writelen on_write on_flush on_error watching )];

# Element of the readqueue
#   on_read - temporary read handler taking precedence over the main one
#   future  - optional Future completed or failed on EOF / read error
struct Reader => [qw( on_read future )];

# Bitfields in the want flags
# WANT_<event>_FOR_<operation>: readiness for <event> should resume <operation>
use constant WANT_READ_FOR_READ   => 0x01;
use constant WANT_READ_FOR_WRITE  => 0x02;
use constant WANT_WRITE_FOR_READ  => 0x04;
use constant WANT_WRITE_FOR_WRITE => 0x08;
# Masks selecting every reason to watch for read- or write-readiness
use constant WANT_ANY_READ  => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;

=head1 NAME

C<IO::Async::Stream> - event callbacks and write buffering for a stream
filehandle

=head1 SYNOPSIS

   use IO::Async::Stream;

   use IO::Async::Loop;
   my $loop = IO::Async::Loop->new;

   my $stream = IO::Async::Stream->new(
      read_handle  => \*STDIN,
      write_handle => \*STDOUT,

      on_read => sub {
         my ( $self, $buffref, $eof ) = @_;

         while( $$buffref =~ s/^(.*\n)// ) {
            print "Received a line $1";
         }

         if( $eof ) {
            print "EOF; last partial line is $$buffref\n";
         }

         return 0;
      }
   );

   $loop->add( $stream );

   $stream->write( "An initial line here\n" );

=head1 DESCRIPTION

This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream.
It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.

This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 $ret = on_read \$buffer, $eof

Invoked when more data is available in the internal receiving buffer.

The first argument is a reference to a plain perl string. The code should
inspect and remove any data it likes, but is not required to remove all, or
indeed any of the data. Any data remaining in the buffer will be preserved for
the next call, the next time more data is received from the handle.

In this way, it is easy to implement code that reads records of some form when
completed, but ignores partially-received records, until all the data is
present. If the handler wishes to be immediately invoked a second time, to have
another attempt at consuming more content, it should return C<1>. Otherwise,
it should return C<0>, and the handler will next be invoked when more data has
arrived from the underlying read handle and appended to the buffer. This makes
it easy to implement code that handles multiple incoming records at the same
time. Alternatively, if the handler function already attempts to consume as
much as possible from the buffer, it will have no need to return C<1> at all.
See the examples at the end of this documentation for more detail.
120 121The second argument is a scalar indicating whether the stream has reported an 122end-of-file (EOF) condition. A reference to the buffer is passed to the 123handler in the usual way, so it may inspect data contained in it. Once the 124handler returns a false value, it will not be called again, as the handle is 125now at EOF and no more data can arrive. 126 127The C<on_read> code may also dynamically replace itself with a new callback 128by returning a CODE reference instead of C<0> or C<1>. The original callback 129or method that the object first started with may be restored by returning 130C<undef>. Whenever the callback is changed in this way, the new code is called 131again; even if the read buffer is currently empty. See the examples at the end 132of this documentation for more detail. 133 134The C<push_on_read> method can be used to insert new, temporary handlers that 135take precedence over the global C<on_read> handler. This event is only used if 136there are no further pending handlers created by C<push_on_read>. 137 138=head2 on_read_eof 139 140Optional. Invoked when the read handle indicates an end-of-file (EOF) 141condition. If there is any data in the buffer still to be processed, the 142C<on_read> event will be invoked first, before this one. 143 144=head2 on_write_eof 145 146Optional. Invoked when the write handle indicates an end-of-file (EOF) 147condition. Note that this condition can only be detected after a C<write> 148syscall returns the C<EPIPE> error. If there is no data pending to be written 149then it will not be detected yet. 150 151=head2 on_read_error $errno 152 153Optional. Invoked when the C<sysread> method on the read handle fails. 154 155=head2 on_write_error $errno 156 157Optional. Invoked when the C<syswrite> method on the write handle fails. 158 159The C<on_read_error> and C<on_write_error> handlers are passed the value of 160C<$!> at the time the error occurred. 
(The C<$!> variable itself, by its
nature, may have changed from the original error by the time this handler
runs so it should always use the value passed in).

If an error occurs when the corresponding error callback is not supplied, and
there is not a handler for it, then the C<close_now> method is called instead.

=head2 on_read_high_watermark $length

=head2 on_read_low_watermark $length

Optional. Invoked when the read buffer grows larger than the high watermark
or smaller than the low watermark respectively. These are edge-triggered
events; they will only be triggered once per crossing, not continuously while
the buffer remains above or below the given limit.

If these event handlers are not defined, the default behaviour is to disable
read-ready notifications if the read buffer grows larger than the high
watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using

   $self->want_readready_for_read(...)

=head2 on_outgoing_empty

Optional. Invoked when the writing data buffer becomes empty.

=head2 on_writeable_start

=head2 on_writeable_stop

Optional. These two events inform when the filehandle becomes writeable, and
when it stops being writeable. C<on_writeable_start> is invoked by the
C<on_write_ready> event if previously it was known to be not writeable.
C<on_writeable_stop> is invoked after a C<syswrite> operation fails with
C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state,
and ensure that only state changes cause events to be invoked. A stream starts
off being presumed writeable, so the first of these events to be observed will
be C<on_writeable_stop>.

=cut

# Populate a freshly-constructed stream with its default state. Anything set
# here may later be overridden through configure().
sub _init
{
   my ( $self ) = @_;

   my %initial_state = (
      # Both queues start out empty
      writequeue => [],     # pending Writer elements
      readqueue  => [],     # pending Reader elements

      # Presumed writeable until a write attempt reports EAGAIN
      writeable  => 1,
      readbuff   => "",

      # Names of the methods performing the actual I/O
      reader     => "_sysread",
      writer     => "_syswrite",

      # Chunk sizes come from the package-level tuneables
      read_len   => $READLEN,
      write_len  => $WRITELEN,

      # Only reading-for-read is wanted to begin with
      want       => WANT_READ_FOR_READ,
   );

   $self->{$_} = $initial_state{$_} for keys %initial_state;

   $self->{close_on_read_eof} = 1;
}

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 read_handle => IO

The IO handle to read from. Must implement C<fileno> and C<sysread> methods.

=head2 write_handle => IO

The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.

=head2 handle => IO

Shortcut to specifying the same IO handle for both of the above.

=head2 on_read => CODE

=head2 on_read_error => CODE

=head2 on_outgoing_empty => CODE

=head2 on_write_error => CODE

=head2 on_writeable_start => CODE

=head2 on_writeable_stop => CODE

CODE references for event handlers.

=head2 autoflush => BOOL

Optional. If true, the C<write> method will attempt to write data to the
operating system immediately, without waiting for the loop to indicate the
filehandle is write-ready. This is useful, for example, on streams that should
contain up-to-date logging or console information.

It currently defaults to false for any file handle, but future versions of
L<IO::Async> may enable this by default on STDOUT and STDERR.

=head2 read_len => INT

Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.

=head2 read_all => BOOL

Optional. If true, attempt to read as much data from the kernel as possible
when the handle becomes readable.
By default this is turned off, meaning at
most one fixed-size buffer is read. If there is still more data in the
kernel's buffer, the handle will still be readable, and will be read from
again.

This behaviour allows multiple streams and sockets to be multiplexed
simultaneously, meaning that a large bulk transfer on one cannot starve other
filehandles of processing time. Turning this option on may improve bulk data
transfer rate, at the risk of delaying or stalling processing on other
filehandles.

=head2 write_len => INT

Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.

=head2 write_all => BOOL

Optional. Analogous to the C<read_all> option, but for writing. When
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.

=head2 read_high_watermark => INT

=head2 read_low_watermark => INT

Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.

If after more data is read from the underlying filehandle the read buffer is
now larger than the high watermark, the C<on_read_high_watermark> event is
triggered (which, by default, will disable read-ready notifications and pause
reading from the filehandle).

If after data is consumed by an C<on_read> handler the read buffer is now
smaller than the low watermark, the C<on_read_low_watermark> event is
triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For this to be possible, the read handler
would have to be one added by the C<push_on_read> method or one of the
Future-returning C<read_*> methods.

By default these options are not defined, so this behaviour will not happen.
C<read_low_watermark> may not be set to a larger value than
C<read_high_watermark>, but it may be set to a smaller value, creating a
hysteresis region. If either option is defined then both must be.

If these options are used with the default event handlers, be careful not to
cause deadlocks by having a high watermark sufficiently low that a single
C<on_read> invocation might not consider it finished yet.

=head2 reader => STRING|CODE

=head2 writer => STRING|CODE

Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
be invoked as

   $stream->reader( $read_handle, $buffer, $len )
   $stream->writer( $write_handle, $buffer, $len )

Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implementations using C<sysread> and
C<syswrite> on the underlying handle, respectively.

=head2 close_on_read_eof => BOOL

Optional. Usually true, but if set to a false value then the stream will not
be C<close>d when an EOF condition occurs on read. This is normally not useful
as at that point the underlying stream filehandle is no longer useable, but it
may be useful for reading regular files, or interacting with TTY devices.

=head2 encoding => STRING

If supplied, sets the name of encoding of the underlying stream. If an
encoding is set, then the C<write> method will expect to receive Unicode
strings and encodes them into bytes, and incoming bytes will be decoded into
Unicode strings for the C<on_read> event.

If an encoding is not supplied then C<write> and C<on_read> will work in byte
strings.
353 354I<IMPORTANT NOTE:> in order to handle reads of UTF-8 content or other 355multibyte encodings, the code implementing the C<on_read> event uses a feature 356of L<Encode>; the C<STOP_AT_PARTIAL> flag. While this flag has existed for a 357while and is used by the C<:encoding> PerlIO layer itself for similar 358purposes, the flag is not officially documented by the C<Encode> module. In 359principle this undocumented feature could be subject to change, in practice I 360believe it to be reasonably stable. 361 362This note applies only to the C<on_read> event; data written using the 363C<write> method does not rely on any undocumented features of C<Encode>. 364 365If a read handle is given, it is required that either an C<on_read> callback 366reference is configured, or that the object provides an C<on_read> method. It 367is optional whether either is true for C<on_outgoing_empty>; if neither is 368supplied then no action will be taken when the writing buffer becomes empty. 369 370An C<on_read> handler may be supplied even if no read handle is yet given, to 371be used when a read handle is eventually provided by the C<set_handles> 372method. 373 374This condition is checked at the time the object is added to a Loop; it is 375allowed to create a C<IO::Async::Stream> object with a read handle but without 376a C<on_read> handler, provided that one is later given using C<configure> 377before the stream is added to its containing Loop, either directly or by being 378a child of another Notifier already in a Loop, or added to one. 

=cut

# Apply named parameters to the stream. Keys this class understands are
# consumed here; everything else is handed on to the parent class. Croaks on
# an inconsistent watermark pair, an unknown encoding, or (once in a Loop with
# a read handle) the absence of any on_read handler.
sub configure
{
   my ( $self, %params ) = @_;

   # Parameters that are stored as-is under their own name
   my @plain_keys = qw(
      on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
      on_write_error on_writeable_start on_writeable_stop autoflush
      read_len read_all write_len write_all on_read_high_watermark
      on_read_low_watermark reader writer close_on_read_eof
   );

   foreach my $key ( @plain_keys ) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
      # A level that was not passed (or passed as undef) keeps its current value
      my $high = delete $params{read_high_watermark};
      $high = $self->{read_high_watermark} unless defined $high;

      my $low = delete $params{read_low_watermark};
      $low = $self->{read_low_watermark} unless defined $low;

      # The two levels only make sense as a pair
      if( defined $low and !defined $high ) {
         croak "Cannot set read_low_watermark without read_high_watermark";
      }
      if( defined $high and !defined $low ) {
         croak "Cannot set read_high_watermark without read_low_watermark";
      }

      if( defined $low and defined $high and $low > $high ) {
         croak "Cannot set read_low_watermark higher than read_high_watermark";
      }

      $self->{read_high_watermark} = $high;
      $self->{read_low_watermark}  = $low;

      # TODO: reassert levels if we've moved them
   }

   if( exists $params{encoding} ) {
      my $encoding = delete $params{encoding};

      # Keep the resolved Encode object rather than its name
      my $obj = find_encoding( $encoding );
      croak "Cannot handle an encoding of '$encoding'" unless defined $obj;

      $self->{encoding} = $obj;
   }

   $self->SUPER::configure( %params );

   if( $self->loop and $self->read_handle and !$self->can_event( "on_read" ) ) {
      croak 'Expected either an on_read callback or to be able to ->on_read';
   }

   if( $self->{autoflush} ) {
      my $write_handle = $self->write_handle;

      if( $write_handle and $write_handle->blocking ) {
         carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle";
      }
   }
}

# Verify that reads can be delivered before joining the Loop, then start
# watching for write-readiness if data was queued beforehand.
sub _add_to_loop
{
   my $self = shift;

   croak 'Expected either an on_read callback or to be able to ->on_read'
      if defined $self->read_handle and !$self->can_event( "on_read" );

   $self->SUPER::_add_to_loop( @_ );

   $self->want_writeready_for_write( 1 ) unless $self->_is_empty;
}

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 want_readready_for_read

=head2 want_readready_for_write

   $stream->want_readready_for_read( $set )

   $stream->want_readready_for_write( $set )

Mutators for the C<want_readready> property on L<IO::Async::Handle>, which
control whether the C<read> or C<write> behaviour should be continued once the
filehandle becomes ready for read.

Normally, C<want_readready_for_read> is always true (though the read watermark
behaviour can modify it), and C<want_readready_for_write> is not used.
However, if a custom C<writer> function is provided, it may find this useful
for being invoked again if it cannot proceed with a write operation until the
filehandle becomes readable (such as during transport negotiation or SSL key
management, for example).

=cut

sub want_readready_for_read
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_READ_FOR_READ;
   }
   else {
      $self->{want} &= ~WANT_READ_FOR_READ;
   }

   # Read-readiness is wanted while either read-side reason is still set
   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}

sub want_readready_for_write
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_READ_FOR_WRITE;
   }
   else {
      $self->{want} &= ~WANT_READ_FOR_WRITE;
   }

   # Read-readiness is wanted while either read-side reason is still set
   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}

=head2 want_writeready_for_read

=head2 want_writeready_for_write

   $stream->want_writeready_for_write( $set )

   $stream->want_writeready_for_read( $set )

Mutators for the C<want_writeready> property on L<IO::Async::Handle>, which
control whether the C<write> or C<read> behaviour should be continued once the
filehandle becomes ready for write.

Normally, C<want_writeready_for_write> is managed by the C<write> method and
associated flushing, and C<want_writeready_for_read> is not used. However, if
a custom C<reader> function is provided, it may find this useful for being
invoked again if it cannot proceed with a read operation until the filehandle
becomes writable (such as during transport negotiation or SSL key management,
for example).

=cut

sub want_writeready_for_write
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_WRITE_FOR_WRITE;
   }
   else {
      $self->{want} &= ~WANT_WRITE_FOR_WRITE;
   }

   # Write-readiness is wanted while either write-side reason is still set
   $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
}

sub want_writeready_for_read
{
   my ( $self, $set ) = @_;

   if( $set ) {
      $self->{want} |= WANT_WRITE_FOR_READ;
   }
   else {
      $self->{want} &= ~WANT_WRITE_FOR_READ;
   }

   # Write-readiness is wanted while either write-side reason is still set
   $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
}

# FUNCTION not method
# True when $errno only means "try again later": EAGAIN, EWOULDBLOCK or EINTR.
sub _nonfatal_error
{
   my ( $errno ) = @_;

   return !!1 if $errno == EAGAIN;
   return !!1 if $errno == EWOULDBLOCK;
   return $errno == EINTR;
}

# True when nothing is waiting in the write queue.
sub _is_empty
{
   my ( $self ) = @_;
   return scalar( @{ $self->{writequeue} } ) == 0;
}

=head2 close

   $stream->close

A synonym for C<close_when_empty>.
This should not be used when the deferred
wait behaviour is required, as the behaviour of C<close> may change in a
future version of L<IO::Async>. Instead, call C<close_when_empty> directly.

=cut

sub close
{
   my ( $self ) = @_;

   # Currently nothing more than an alias
   return $self->close_when_empty;
}

=head2 close_when_empty

   $stream->close_when_empty

If the write buffer is empty, this method calls C<close> on the underlying IO
handles, and removes the stream from its containing loop. If the write buffer
still contains data, then this is deferred until the buffer is empty. This is
intended for "write-then-close" one-shot streams.

   $stream->write( "Here is my final data\n" );
   $stream->close_when_empty;

Because of this deferred nature, it may not be suitable for error handling.
See instead the C<close_now> method.

=cut

sub close_when_empty
{
   my ( $self ) = @_;

   if( !$self->_is_empty ) {
      # Data is still queued; only mark the stream so that the close happens
      # once the write queue has drained
      return $self->{stream_closing} = 1;
   }

   return $self->SUPER::close;
}

=head2 close_now

   $stream->close_now

This method immediately closes the underlying IO handles and removes the
stream from the containing loop. It will not wait to flush the remaining data
in the write buffer.

=cut

sub close_now
{
   my ( $self ) = @_;

   # Tell every still-queued write that it will never be performed
   foreach my $writer ( @{ $self->{writequeue} } ) {
      my $on_error = $writer->on_error or next;
      $on_error->( $self, "stream closing" );
   }

   @{ $self->{writequeue} } = ();
   $self->{stream_closing} = undef;

   return $self->SUPER::close;
}

=head2 is_read_eof

=head2 is_write_eof

   $eof = $stream->is_read_eof

   $eof = $stream->is_write_eof

Returns true after an EOF condition is reported on either the read or the
write handle, respectively.

=cut

sub is_read_eof
{
   # Plain accessor for the flag kept under the read_eof key
   return $_[0]->{read_eof};
}

sub is_write_eof
{
   # Plain accessor for the flag kept under the write_eof key
   return $_[0]->{write_eof};
}

=head2 write

   $stream->write( $data, %params )

This method adds data to the outgoing data queue, or writes it immediately,
according to the C<autoflush> parameter.

If the C<autoflush> option is set, this method will try immediately to write
the data to the underlying filehandle. If this completes successfully then it
will have been written by the time this method returns. If it fails to write
completely, then the data is queued as if C<autoflush> were not set, and will
be flushed as normal.

C<$data> can either be a plain string, a L<Future>, or a CODE reference. If it
is a plain string it is written immediately. If it is not, its value will be
used to generate more C<$data> values, eventually leading to strings to be
written.

If C<$data> is a C<Future>, the Stream will wait until it is ready, and take
the single value it yields.

If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
values. Each time the filehandle is ready to write more data to it, the
function is invoked. Once the function has finished generating data it should
return undef. The function is passed the Stream object as its first argument.

It is allowed that C<Future>s yield CODE references, or CODE references return
C<Future>s, as well as plain strings.

For example, to stream the contents of an existing opened filehandle:

   open my $fileh, "<", $path or die "Cannot open $path - $!";

   $stream->write( sub {
      my ( $stream ) = @_;

      sysread $fileh, my $buffer, 8192 or return;
      return $buffer;
   } );

Takes the following optional named parameters in C<%params>:

=over 8

=item write_len => INT

Overrides the C<write_len> parameter for the data written by this call.

=item on_write => CODE

A CODE reference which will be invoked after every successful C<syswrite>
operation on the underlying filehandle. It will be passed the number of bytes
that were written by this call, which may not be the entire length of the
buffer - if it takes more than one C<syswrite> operation to empty the buffer
then this callback will be invoked multiple times.

   $on_write->( $stream, $len )

=item on_flush => CODE

A CODE reference which will be invoked once the data queued by this C<write>
call has been flushed. This will be invoked even if the buffer itself is not
yet empty; if more data has been queued since the call.

   $on_flush->( $stream )

=item on_error => CODE

A CODE reference which will be invoked if a C<syswrite> error happens while
performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.

   $on_error->( $stream, $errno )

=back

If the object is not yet a member of a loop and doesn't yet have a
C<write_handle>, then calls to the C<write> method will simply queue the data
and return. It will be flushed when the object is added to the loop.

If C<$data> is a defined but empty string, the write is still queued, and the
C<on_flush> continuation will be invoked, if supplied. This can be used to
obtain a marker, to invoke some code once the output queue has been flushed up
to this point.

=head2 write (scalar)

   $stream->write( ...
)->get 727 728If called in non-void context, this method returns a L<Future> which will 729complete (with no value) when the write operation has been flushed. This may 730be used as an alternative to, or combined with, the C<on_flush> callback. 731 732=cut 733 734sub _syswrite 735{ 736 my $self = shift; 737 my ( $handle, undef, $len ) = @_; 738 739 my $written = $handle->syswrite( $_[1], $len ); 740 return $written if !$written; # zero or undef 741 742 substr( $_[1], 0, $written ) = ""; 743 return $written; 744} 745 746sub _flush_one_write 747{ 748 my $self = shift; 749 750 my $writequeue = $self->{writequeue}; 751 752 my $head; 753 while( $head = $writequeue->[0] and ref $head->data ) { 754 if( ref $head->data eq "CODE" ) { 755 my $data = $head->data->( $self ); 756 if( !defined $data ) { 757 $head->on_flush->( $self ) if $head->on_flush; 758 shift @$writequeue; 759 return 1; 760 } 761 if( !ref $data and my $encoding = $self->{encoding} ) { 762 $data = $encoding->encode( $data ); 763 } 764 unshift @$writequeue, my $new = Writer( 765 $data, $head->writelen, $head->on_write, undef, undef, 0 766 ); 767 next; 768 } 769 elsif( blessed $head->data and $head->data->isa( "Future" ) ) { 770 my $f = $head->data; 771 if( !$f->is_ready ) { 772 return 0 if $head->watching; 773 $f->on_ready( sub { $self->_flush_one_write } ); 774 $head->watching++; 775 return 0; 776 } 777 my $data = $f->get; 778 if( !ref $data and my $encoding = $self->{encoding} ) { 779 $data = $encoding->encode( $data ); 780 } 781 $head->data = $data; 782 next; 783 } 784 else { 785 die "Unsure what to do with reference ".ref($head->data)." 
in write queue"; 786 } 787 } 788 789 my $second; 790 while( $second = $writequeue->[1] and 791 !ref $second->data and 792 $head->writelen == $second->writelen and 793 !$head->on_write and !$second->on_write and 794 !$head->on_flush ) { 795 $head->data .= $second->data; 796 $head->on_write = $second->on_write; 797 $head->on_flush = $second->on_flush; 798 splice @$writequeue, 1, 1, (); 799 } 800 801 die "TODO: head data does not contain a plain string" if ref $head->data; 802 803 if( $IO::Async::Debug::DEBUG > 1 ) { 804 my $data = substr $head->data, 0, $head->writelen; 805 $self->debug_printf( "WRITE len=%d", length $data ); 806 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw}; 807 } 808 809 my $writer = $self->{writer}; 810 my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen ); 811 812 if( !defined $len ) { 813 my $errno = $!; 814 815 if( $errno == EAGAIN or $errno == EWOULDBLOCK ) { 816 $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable}; 817 $self->{writeable} = 0; 818 } 819 820 return 0 if _nonfatal_error( $errno ); 821 822 $self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1; 823 824 if( $errno == EPIPE ) { 825 $self->debug_printf( "WRITE-EOF" ); 826 $self->{write_eof} = 1; 827 $self->maybe_invoke_event( on_write_eof => ); 828 } 829 830 $head->on_error->( $self, $errno ) if $head->on_error; 831 $self->maybe_invoke_event( on_write_error => $errno ) 832 or $self->close_now; 833 834 return 0; 835 } 836 837 $METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len; 838 839 if( my $on_write = $head->on_write ) { 840 $on_write->( $self, $len ); 841 } 842 843 if( !length $head->data ) { 844 $head->on_flush->( $self ) if $head->on_flush; 845 shift @{ $self->{writequeue} }; 846 } 847 848 return 1; 849} 850 851sub write 852{ 853 my $self = shift; 854 my ( $data, %params ) = @_; 855 856 carp "Cannot write data to a Stream that is closing" and return if 
$self->{stream_closing}; 857 858 # Allow writes without a filehandle if we're not yet in a Loop, just don't 859 # try to flush them 860 my $handle = $self->write_handle; 861 862 croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop; 863 864 if( !ref $data and my $encoding = $self->{encoding} ) { 865 $data = $encoding->encode( $data ); 866 } 867 868 my $on_write = delete $params{on_write}; 869 my $on_flush = delete $params{on_flush}; 870 my $on_error = delete $params{on_error}; 871 872 my $f; 873 if( defined wantarray ) { 874 my $orig_on_flush = $on_flush; 875 my $orig_on_error = $on_error; 876 877 my $loop = $self->loop or 878 croak "Cannot ->write data returning a Future to a Stream not in a Loop"; 879 $f = $loop->new_future; 880 $on_flush = sub { 881 $f->done; 882 $orig_on_flush->( @_ ) if $orig_on_flush; 883 }; 884 $on_error = sub { 885 my $self = shift; 886 my ( $errno ) = @_; 887 888 $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready; 889 890 $orig_on_error->( $self, @_ ) if $orig_on_error; 891 }; 892 } 893 894 my $write_len = $params{write_len}; 895 defined $write_len or $write_len = $self->{write_len}; 896 897 push @{ $self->{writequeue} }, Writer( 898 $data, $write_len, $on_write, $on_flush, $on_error, 0 899 ); 900 901 keys %params and croak "Unrecognised keys for ->write - " . 
join( ", ", keys %params );

   return $f unless $handle;

   if( $self->{autoflush} ) {
      1 while !$self->_is_empty and $self->_flush_one_write;

      if( $self->_is_empty ) {
         $self->want_writeready_for_write( 0 );
         return $f;
      }
   }

   $self->want_writeready_for_write( 1 );
   return $f;
}

# Handles write-readiness on the handle.
#
# On the first call (while the 'writeable' flag is still false) the
# on_writeable_start event is offered via maybe_invoke_event and the flag is
# set. The readiness is then routed according to the bits in $self->{want}:
# to the writer for WANT_WRITE_FOR_WRITE and to the reader for
# WANT_WRITE_FOR_READ.
sub on_write_ready
{
   my $self = shift;

   if( !$self->{writeable} ) {
      $self->maybe_invoke_event( on_writeable_start => );
      $self->{writeable} = 1;
   }

   $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE;
   $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ;
}

# Pushes queued outgoing data at the handle.
#
# Calls _flush_one_write once, or keeps calling it for as long as it returns
# true when the write_all option is set. When _is_empty then reports true,
# interest in write-readiness is dropped, on_outgoing_empty is offered, and
# the stream is closed if a close was deferred (stream_closing).
sub _do_write
{
   my $self = shift;

   # The trailing test on write_all is what limits this to a single flush in
   # the default case
   1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all};

   # All data successfully flushed
   if( $self->_is_empty ) {
      $self->want_writeready_for_write( 0 );

      $self->maybe_invoke_event( on_outgoing_empty => );

      $self->close_now if $self->{stream_closing};
   }
}

# Runs a single read handler over the receive buffer.
#
# Takes:
#   $eof - true if the read handle has reached EOF
#
# Returns true if the caller should invoke it again straight away.
#
# The handler used is the on_read of the entry at the head of the readqueue
# if there is one, otherwise the object's own on_read event. Its result is
# interpreted as:
#   CODE ref - becomes the new head of the readqueue; go around again
#   undef    - (only when the readqueue is non-empty) the head entry is
#              finished and is shifted off; go around again
#   other    - go around again only if it is true and there is still
#              something to offer (buffered data, or the EOF condition)
sub _flush_one_read
{
   my $self = shift;
   my ( $eof ) = @_;

   # Visible to push_on_read for the duration of the handler, so that a
   # handler pushing further handlers does not re-enter the flush
   local $self->{flushing_read} = 1;

   my $readqueue = $self->{readqueue};

   my $ret;
   if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
      $ret = $on_read->( $self, \$self->{readbuff}, $eof );
   }
   else {
      $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof );
   }

   # The handler may have drained the buffer; if it was marked as being at
   # the high watermark and has now fallen below the low one, clear the mark
   # and report it
   if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and
       length $self->{readbuff} < $self->{read_low_watermark} ) {
      undef $self->{at_read_high_watermark};
      $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
   }

   if( ref $ret eq "CODE" ) {
      # Replace the top CODE, or add it if there was none
      $readqueue->[0] = Reader( $ret, undef );
      return 1;
   }
   elsif( @$readqueue and !defined $ret ) {
      shift @$readqueue;
      return 1;
   }
   else {
      return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
   }
}

# Reader function with the calling convention used by _do_read:
#   $len = $self->$reader( $handle, $buffer, $maxlen )
# (presumably the default value of $self->{reader} - confirm against the
# configuration code).
#
# The buffer argument is deliberately not copied into a lexical; it is
# passed on as $_[1] so that sysread fills the caller's own variable.
# Returns whatever the handle's sysread method returns.
sub _sysread
{
   my $self = shift;
   my ( $handle, undef, $len ) = @_;
   return $handle->sysread( $_[1], $len );
}

# Handles read-readiness on the handle; the mirror image of on_write_ready.
# Routes to the reader for WANT_READ_FOR_READ and to the writer for
# WANT_READ_FOR_WRITE.
sub on_read_ready
{
   my $self = shift;

   $self->_do_read if $self->{want} & WANT_READ_FOR_READ;
   $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE;
}

# Pulls data from the read handle into the receive buffer and runs the read
# handlers over it.
#
# Performs one read of up to read_len per iteration, looping only while the
# read_all option is set. Returns nothing useful. It returns early on a
# non-fatal error, on a fatal read error, and on EOF; in the latter two
# cases every Future still waiting in the readqueue is completed and the
# queue is emptied.
sub _do_read
{
   my $self = shift;

   my $handle = $self->read_handle;
   my $reader = $self->{reader};

   while(1) {
      my $data;
      my $len = $self->$reader( $handle, $data, $self->{read_len} );

      if( !defined $len ) {
         # Take a copy before anything else gets a chance to overwrite $!
         my $errno = $!;

         # Errors that _nonfatal_error classifies as harmless just mean
         # "try again on the next readiness notification"
         return if _nonfatal_error( $errno );

         $self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;

         # With no handler accepting the error, fall back to closing
         $self->maybe_invoke_event( on_read_error => $errno )
            or $self->close_now;

         foreach ( @{ $self->{readqueue} } ) {
            $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
         }
         undef @{ $self->{readqueue} };

         return;
      }

      if( $IO::Async::Debug::DEBUG > 1 ) {
         $self->debug_printf( "READ len=%d", $len );
         IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
      }

      $METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len;

      # A defined, zero-length read is the EOF indication
      my $eof = $self->{read_eof} = ( $len == 0 );

      if( my $encoding = $self->{encoding} ) {
         # Prepend any bytes held over from the previous read before
         # decoding. NOTE(review): this relies on decode() with
         # STOP_AT_PARTIAL leaving an incomplete trailing sequence behind in
         # $bytes - confirm against the Encode documentation
         my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
         $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
         $self->{bytes_remaining} = $bytes;
      }

      $self->{readbuff} .= $data if !$eof;

      # Keep invoking handlers for as long as they ask to be re-invoked
      1 while $self->_flush_one_read( $eof );

      if( $eof ) {
         $self->debug_printf( "READ-EOF" );
         $self->maybe_invoke_event( on_read_eof => );
         $self->close_now if $self->{close_on_read_eof};
         # Anything still queued can never be satisfied now
         foreach ( @{ $self->{readqueue} } ) {
            $_->future->done( undef ) if $_->future;
         }
         undef @{ $self->{readqueue} };
         return;
      }

      last unless $self->{read_all};
   }

   # Report crossing the high watermark only on the transition; the flag is
   # cleared again by _flush_one_read once the buffer has drained
   if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
      $self->{at_read_high_watermark} or
         $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );

      $self->{at_read_high_watermark} = 1;
   }
}

# Method form of the on_read_high_watermark event: stops asking for
# read-readiness on behalf of the reader.
sub on_read_high_watermark
{
   my $self = shift;
   $self->want_readready_for_read( 0 );
}

# Method form of the on_read_low_watermark event: resumes asking for
# read-readiness on behalf of the reader.
sub on_read_low_watermark
{
   my $self = shift;
   $self->want_readready_for_read( 1 );
}

=head2 push_on_read

   $stream->push_on_read( $on_read )

Pushes a new temporary C<on_read> handler to the end of the queue. This queue,
if non-empty, is used to provide C<on_read> event handling code in preference
to using the object's main event handler or method. New handlers can be
supplied at any time, and they will be used in first-in first-out (FIFO)
order.

As with the main C<on_read> event handler, each can return a (defined) boolean
to indicate if they wish to be invoked again or not, another C<CODE> reference
to replace themselves with, or C<undef> to indicate it is now complete and
should be removed. When a temporary handler returns C<undef> it is shifted
from the queue and the next one, if present, is invoked instead. If there are
no more then the object's main handler is invoked instead.

=cut

sub push_on_read
{
   my $self = shift;
   my ( $on_read, %args ) = @_;
   # %args undocumented for internal use
   #   future => the Future this handler will complete, if any; it is kept
   #             alongside the handler in the queue entry

   push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );

   # When called from inside a running read handler, the flush already in
   # progress will reach the new entry; don't start a nested one
   # TODO: Should this always defer?
   return if $self->{flushing_read};

   # Otherwise offer any data that is already buffered right away
   1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
}

=head1 FUTURE-RETURNING READ METHODS

The following methods all return a L<Future> which will become ready when
enough data has been read by the Stream into its buffer. At this point, the
data is removed from the buffer and given to the C<Future> object to complete
it.

   my $f = $stream->read_...

   my ( $string ) = $f->get;

Unlike the C<on_read> event handlers, these methods don't allow for access to
"partial" results; they only provide the final result once it is ready.

If a C<Future> is cancelled before it completes it is removed from the read
queue without consuming any data; i.e. each C<Future> atomically either
completes or is cancelled.

Since it is possible to use a readable C<Stream> entirely using these
C<Future>-returning methods instead of the C<on_read> event, it may be useful
to configure a trivial return-false event handler to keep it from consuming
any input, and to allow it to be added to a C<Loop> in the first place.

   my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
   $loop->add( $stream );

   my $f = $stream->read_...

The C<Future> instances are created by the containing L<IO::Async::Loop>, so
these methods can only be used once the Stream has been added to one; calling
them any earlier throws an exception.

If a read EOF or error condition happens while there are read C<Future>s
pending, they are all completed. In the case of a read EOF, they are done with
C<undef>; in the case of a read error they are failed using the C<$!> error
value as the failure.

   $f->fail( $message, sysread => $! )

If a read EOF condition happens to the currently-processing read C<Future>, it
will return a partial result. The calling code can detect this by the fact
that the returned data is not complete according to the specification (too
short in C<read_exactly>'s case, or lacking the ending pattern in
C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof>
value in its results.

   my ( $string, $eof ) = $f->get;

=cut

# Creates the Future handed back by each of the read_* methods.
#
# Returns a new Future obtained from the containing Loop. Croaks if the
# Stream is not currently in a Loop, in the same manner as ->write does when
# asked to return a Future, rather than failing with an unhelpful "Can't
# call method on an undefined value".
#
# Cancelling the Future re-runs the read handlers, which gives the queued
# handler the opportunity to see the cancellation and remove itself. The
# callback holds the stream through _capture_weakself and does nothing if
# the stream has already gone away.
sub _read_future
{
   my $self = shift;

   my $loop = $self->loop or
      croak "Cannot create a read Future on a Stream that is not in a Loop";

   my $f = $loop->new_future;
   $f->on_cancel( $self->_capture_weakself( sub {
      my $self = shift or return;
      1 while $self->_flush_one_read;
   }));
   return $f;
}

=head2 read_atmost

=head2 read_exactly

   ( $string, $eof ) = $stream->read_atmost( $len )->get

   ( $string, $eof ) = $stream->read_exactly( $len )->get

Completes the C<Future> when the read buffer contains C<$len> or more
characters of input. C<read_atmost> will also complete after the first
invocation of C<on_read>, even if fewer characters are available, whereas
C<read_exactly> will wait until at least C<$len> are available.

=cut

sub read_atmost
{
   my ( $self, $len ) = @_;

   my $future = $self->_read_future;

   # One-shot handler: hands over whatever is buffered, up to $len, the
   # first time it runs and then retires itself
   my $take_available = sub {
      my ( undef, $buffref, $eof ) = @_;

      unless( $future->is_cancelled ) {
         my $chunk = substr( $$buffref, 0, $len, "" );
         $future->done( $chunk, $eof );
      }

      # undef removes this handler from the queue, cancelled or not
      return undef;
   };

   $self->push_on_read( $take_available, future => $future );

   return $future;
}

sub read_exactly
{
   my ( $self, $len ) = @_;

   my $future = $self->_read_future;

   my $take_exact = sub {
      my ( undef, $buffref, $eof ) = @_;

      return undef if $future->is_cancelled;

      # Stay queued until enough has arrived, unless EOF means that no more
      # ever will, in which case a short result is delivered
      my $satisfied = $eof || length( $$buffref ) >= $len;
      return 0 if !$satisfied;

      my $chunk = substr( $$buffref, 0, $len, "" );
      $future->done( $chunk, $eof );

      return undef;
   };

   $self->push_on_read( $take_exact, future => $future );

   return $future;
}

=head2 read_until

   ( $string, $eof ) = $stream->read_until( $end )->get

Completes the C<Future> when the read buffer contains a match for C<$end>,
which may either be a plain string or a compiled C<Regexp> reference. Yields
the prefix of the buffer up to and including this match.

=cut

sub read_until
{
   my ( $self, $until ) = @_;

   # A plain string is matched literally, not as a pattern
   $until = qr/\Q$until\E/ unless ref $until;

   my $future = $self->_read_future;

   my $take_until = sub {
      my ( undef, $buffref, $eof ) = @_;

      return undef if $future->is_cancelled;

      my $matched = $$buffref =~ $until;

      # Neither the terminator nor EOF yet; wait for more data
      return 0 unless $matched or $eof;

      if( $matched ) {
         # $+[0] is the offset just past the end of the match
         $future->done( substr( $$buffref, 0, $+[0], "" ), $eof );
      }
      else {
         # EOF without the terminator: deliver everything that is left
         $future->done( $$buffref, $eof );
         $$buffref = "";
      }

      return undef;
   };

   $self->push_on_read( $take_until, future => $future );

   return $future;
}

=head2 read_until_eof

   ( $string, $eof ) = $stream->read_until_eof->get

Completes the C<Future> when the stream is eventually closed at EOF, and
yields all of the data that was available.

=cut

sub read_until_eof
{
   my ( $self ) = @_;

   my $future = $self->_read_future;

   my $take_all_at_eof = sub {
      my ( undef, $buffref, $eof ) = @_;

      return undef if $future->is_cancelled;

      # Leave everything in the buffer until the stream has ended
      return 0 if !$eof;

      $future->done( $$buffref, $eof );
      $$buffref = "";

      return undef;
   };

   $self->push_on_read( $take_all_at_eof, future => $future );

   return $future;
}

=head1 UTILITY CONSTRUCTORS

=cut

=head2 new_for_stdin

=head2 new_for_stdout

=head2 new_for_stdio

   $stream = IO::Async::Stream->new_for_stdin

   $stream = IO::Async::Stream->new_for_stdout

   $stream = IO::Async::Stream->new_for_stdio

Returns an C<IO::Async::Stream> object preconfigured with the correct
C<read_handle>, C<write_handle> or both.

=cut

# In each of these the preset handle arguments come first and the caller's
# own arguments follow them

sub new_for_stdin
{
   my $class = shift;
   return $class->new( read_handle => \*STDIN, @_ );
}

sub new_for_stdout
{
   my $class = shift;
   return $class->new( write_handle => \*STDOUT, @_ );
}

sub new_for_stdio
{
   my $class = shift;
   return $class->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ );
}

=head2 connect

   $future = $stream->connect( %args )

A convenient wrapper for calling the C<connect> method on the underlying
L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
otherwise supplied.

=cut

sub connect
{
   my $self = shift;

   # The hint goes ahead of the caller's arguments, so a socktype supplied
   # by the caller appears later in the list
   return $self->SUPER::connect( socktype => "stream", @_ );
}

=head1 DEBUGGING FLAGS

The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:

=over 4

=item C<Sr>

Log byte buffers as data is read from a Stream

=item C<Sw>

Log byte buffers as data is written to a Stream

=back

=cut

=head1 EXAMPLES

=head2 A line-based C<on_read> method

The following C<on_read> method accepts incoming C<\n>-terminated lines and
prints them to the program's C<STDOUT> stream.
1342 1343 sub on_read 1344 { 1345 my $self = shift; 1346 my ( $buffref, $eof ) = @_; 1347 1348 while( $$buffref =~ s/^(.*\n)// ) { 1349 print "Received a line: $1"; 1350 } 1351 1352 return 0; 1353 } 1354 1355Because a reference to the buffer itself is passed, it is simple to use a 1356C<s///> regular expression on the scalar it points at, to both check if data 1357is ready (i.e. a whole line), and to remove it from the buffer. Since it 1358always removes as many complete lines as possible, it doesn't need invoking 1359again when it has finished, so it can return a constant C<0>. 1360 1361=head2 Reading binary data 1362 1363This C<on_read> method accepts incoming records in 16-byte chunks, printing 1364each one. 1365 1366 sub on_read 1367 { 1368 my ( $self, $buffref, $eof ) = @_; 1369 1370 if( length $$buffref >= 16 ) { 1371 my $record = substr( $$buffref, 0, 16, "" ); 1372 print "Received a 16-byte record: $record\n"; 1373 1374 return 1; 1375 } 1376 1377 if( $eof and length $$buffref ) { 1378 print "EOF: a partial record still exists\n"; 1379 } 1380 1381 return 0; 1382 } 1383 1384This time, rather than a C<while()> loop we have decided to have the handler 1385just process one record, and use the C<return 1> mechanism to ask that the 1386handler be invoked again if there still remains data that might contain 1387another record; only stopping with C<return 0> when we know we can't find one. 1388 1389The 4-argument form of C<substr()> extracts the 16-byte record from the buffer 1390and assigns it to the C<$record> variable, if there was enough data in the 1391buffer to extract it. 1392 1393A lot of protocols use a fixed-size header, followed by a variable-sized body 1394of data, whose size is given by one of the fields of the header. The following 1395C<on_read> method extracts messages in such a protocol. 
1396 1397 sub on_read 1398 { 1399 my ( $self, $buffref, $eof ) = @_; 1400 1401 return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes 1402 1403 my ( $len, $x, $y ) = unpack "N n n", $$buffref; 1404 1405 return 0 unless length $$buffref >= 8 + $len; 1406 1407 substr( $$buffref, 0, 8, "" ); 1408 my $data = substr( $$buffref, 0, $len, "" ); 1409 1410 print "A record with values x=$x y=$y\n"; 1411 1412 return 1; 1413 } 1414 1415In this example, the header is C<unpack()>ed first, to extract the body 1416length, and then the body is extracted. If the buffer does not have enough 1417data yet for a complete message then C<0> is returned, and the buffer is left 1418unmodified for next time. Only when there are enough bytes in total does it 1419use C<substr()> to remove them. 1420 1421=head2 Dynamic replacement of C<on_read> 1422 1423Consider the following protocol (inspired by IMAP), which consists of 1424C<\n>-terminated lines that may have an optional data block attached. The 1425presence of such a data block, as well as its size, is indicated by the line 1426prefix. 

   sub on_read
   {
      my $self = shift;
      my ( $buffref, $eof ) = @_;

      if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
         my $length = $1;
         my $line   = $2;

         return sub {
            my $self = shift;
            my ( $buffref, $eof ) = @_;

            return 0 unless length $$buffref >= $length;

            # Take and remove the data from the buffer
            my $data = substr( $$buffref, 0, $length, "" );

            print "Received a line $line with some data ($data)\n";

            return undef; # Restore the original method
         }
      }
      elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
         my $line = $1;

         print "Received a line $line with no data\n";

         return 1;
      }
      else {
         print STDERR "Unrecognised input\n";
         # Handle it somehow

         return 0;
      }
   }

In the case where trailing data is supplied, a new temporary C<on_read>
callback is provided in a closure. This closure captures the C<$length>
variable so it knows how much data to expect. It also captures the C<$line>
variable so it can use it in the event report. When this method has finished
reading the data, it reports the event, then restores the original method by
returning C<undef>.

The final C<else> branch returns C<0> explicitly. Without it the handler would
yield the true result of the C<print> call, asking to be invoked again
immediately on the same unconsumed input.

=head1 SEE ALSO

=over 4

=item *

L<IO::Handle> - Supply object methods for I/O handles

=back

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;