#!/usr/bin/perl

# Tests for the writing side of IO::Async::Stream.
#
# Each bare block below is an independent test case: it creates a fresh
# non-blocking pipe pair, wraps the write end in a Stream, drives the loop
# until some condition is observed, and then checks what arrived on the read
# end of the pipe.

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Metrics::Any;
use Test::Refcount;

use Errno qw( EAGAIN EWOULDBLOCK ECONNRESET );

use IO::Async::Loop;

use IO::Async::OS;

use IO::Async::Stream;

my $loop = IO::Async::Loop->new_builtin;

testing_loop( $loop );

# Create a pipe pair with both ends switched to non-blocking mode.
# Returns the list ( $read_handle, $write_handle ); dies if the pipe cannot
# be created.
sub mkhandles
{
   my ( $rd, $wr ) = IO::Async::OS->pipepair or die "Cannot pipe() - $!";
   # Need handles in nonblocking mode
   $rd->blocking( 0 );
   $wr->blocking( 0 );

   return ( $rd, $wr );
}

# useful test function
#
# Perform one sysread() of up to 8192 bytes from the given handle.
# Returns the bytes read, or "" if nothing is available right now
# (EAGAIN / EWOULDBLOCK). Dies on EOF or on any other read error.
sub read_data
{
   my ( $s ) = @_;

   my $buffer;
   my $ret = $s->sysread( $buffer, 8192 );

   return $buffer if( defined $ret && $ret > 0 );
   die "Socket closed" if( defined $ret && $ret == 0 );
   return "" if $! == EAGAIN or $! == EWOULDBLOCK;
   die "Cannot sysread() - $!";
}

# Basic writing: on_outgoing_empty, per-write on_write/on_flush callbacks,
# the Future returned by ->write, autoflush, and reference counting.
{
   my ( $rd, $wr ) = mkhandles;

   my $empty;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      on_outgoing_empty => sub { $empty = 1 },
   );

   ok( defined $stream, 'writing $stream defined' );
   isa_ok( $stream, "IO::Async::Stream", 'writing $stream isa IO::Async::Stream' );

   is_oneref( $stream, 'writing $stream has refcount 1 initially' );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'writing $stream has refcount 2 after adding to Loop' );

   ok( !$stream->want_writeready, 'want_writeready before write' );
   $stream->write( "message\n" );

   ok( $stream->want_writeready, 'want_writeready after write' );

   wait_for { $empty };

   ok( !$stream->want_writeready, 'want_writeready after wait' );
   is( $empty, 1, '$empty after writing buffer' );

   is( read_data( $rd ), "message\n", 'data after writing buffer' );

   my $written = 0;
   my $flushed;

   # on_write accumulates the byte counts it is given; on_flush marks
   # completion of this particular write.
   my $f = $stream->write( "hello again\n",
      on_write => sub {
         is( $_[0], $stream, 'on_write $_[0] is $stream' );
         $written += $_[1];
      },
      on_flush => sub {
         is( $_[0], $stream, 'on_flush $_[0] is $stream' );
         $flushed++
      },
   );

   ok( !$f->is_ready, '->write future not yet ready' );

   wait_for { $flushed };

   ok( $f->is_ready, '->write future is ready after flush' );
   is( $written, 12, 'on_write given total write length after flush' );
   is( read_data( $rd ), "hello again\n", 'flushed data does get flushed' );

   # An empty write must still invoke its on_flush callback.
   $flushed = 0;
   $stream->write( "", on_flush => sub { $flushed++ } );
   wait_for { $flushed };

   ok( 1, "write empty data with on_flush" );

   # With autoflush the data is expected on the pipe without running the loop.
   $stream->configure( autoflush => 1 );
   $stream->write( "immediate\n" );

   ok( !$stream->want_writeready, 'not want_writeready after autoflush write' );
   is( read_data( $rd ), "immediate\n", 'data after autoflush write' );

   # Data queued while autoflush was off is expected to go out together with
   # the next autoflushed write.
   $stream->configure( autoflush => 0 );
   $stream->write( "partial " );
   $stream->configure( autoflush => 1 );
   $stream->write( "data\n" );

   ok( !$stream->want_writeready, 'not want_writeready after split autoflush write' );
   is( read_data( $rd ), "partial data\n", 'data after split autoflush write' );

   is_refcount( $stream, 2, 'writing $stream has refcount 2 before removing from Loop' );

   $loop->remove( $stream );

   is_oneref( $stream, 'writing $stream refcount 1 finally' );
}

# Abstract writing with writer function
{
   my ( $rd, $wr ) = mkhandles;
   my $buffer;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      # After shifting off the stream, $_[1] is the pending data and $_[2]
      # the length to write; consume that much into $buffer instead of
      # touching the handle, and report it all as written.
      writer => sub {
         my $self = shift;
         $buffer .= substr( $_[1], 0, $_[2], "" );
         return $_[2];
      },
   );

   $loop->add( $stream );

   my $flushed;
   $stream->write( "Some data for abstract buffer\n", on_flush => sub { $flushed++ } );

   wait_for { $flushed };

   is( $buffer, "Some data for abstract buffer\n", '$buffer after ->write to stream with abstract writer' );

   $loop->remove( $stream );
}

# ->want_writeready_for_read
{
   my ( $rd, $wr ) = mkhandles;

   my $reader_called;
   # The write end of the pipe is used as the stream's only handle; the
   # custom reader just counts invocations and pretends nothing is available.
   my $stream = IO::Async::Stream->new(
      handle => $wr,
      on_read => sub { return 0; }, # ignore reading
      reader => sub { $reader_called++; $! = EAGAIN; return undef },
   );

   $loop->add( $stream );

   $loop->loop_once( 0.1 ); # haaaaack

   ok( !$reader_called, 'reader not yet called before ->want_writeready_for_read' );

   $stream->want_writeready_for_read( 1 );

   wait_for { $reader_called };

   ok( $reader_called, 'reader now invoked with ->want_writeready_for_read' );

   $loop->remove( $stream );
}

# on_writeable_{start,stop}
{
   my ( $rd, $wr ) = mkhandles;
   my $buffer;

   my $writeable;
   my $unwriteable;
   my $emulate_writeable = 0;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      # Fails with EAGAIN until $emulate_writeable is set, then behaves like
      # the buffer-capturing writer used in the abstract writer test.
      writer => sub {
         my $self = shift;
         $! = EAGAIN, return undef unless $emulate_writeable;

         $buffer .= substr( $_[1], 0, $_[2], "" );
         return $_[2];
      },
      on_writeable_start => sub { $writeable++ },
      on_writeable_stop  => sub { $unwriteable++ },
   );

   $loop->add( $stream );

   $stream->write( "Something" );

   wait_for { $unwriteable };

   $emulate_writeable = 1;

   wait_for { $writeable };

   is( $buffer, "Something", '$buffer after emulated EAGAIN' );

   $loop->remove( $stream );
}

# write_len / write_all
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      write_len => 2,
   );

   $loop->add( $stream );

   $stream->write( "partial" );

   # Without write_all, one loop iteration is expected to emit a single
   # write_len-sized chunk.
   $loop->loop_once( 0.1 );

   is( read_data( $rd ), "pa", 'data after writing buffer with write_len=2 without write_all');

   # Three more iterations cover the remaining five bytes (2 + 2 + 1).
   $loop->loop_once( 0.1 ) for 1 .. 3;

   is( read_data( $rd ), "rtial", 'data finally after writing buffer with write_len=2 without write_all' );

   $stream->configure( write_all => 1 );

   $stream->write( "partial" );

   # With write_all the whole buffer is expected after a single iteration.
   $loop->loop_once( 0.1 );

   is( read_data( $rd ), "partial", 'data after writing buffer with write_len=2 with write_all');

   $loop->remove( $stream );
}

# EOF
SKIP: {
   # The skip count matches the seven assertions in this block.
   skip "This loop cannot detect hangup condition", 7 unless $loop->_CAN_ON_HANGUP;

   my ( $rd, $wr ) = mkhandles;

   # Writing to a pipe whose read end is closed raises SIGPIPE; ignore it so
   # the failure surfaces through the stream instead of killing the test.
   local $SIG{PIPE} = "IGNORE";

   my $eof = 0;

   my $stream = IO::Async::Stream->new( write_handle => $wr,
      on_write_eof => sub { $eof++ },
   );

   $loop->add( $stream );

   my $write_future = $stream->write( "Junk" );

   $rd->close;

   ok( !$stream->is_write_eof, '$stream->is_write_eof before wait' );
   is( $eof, 0, 'EOF indication before wait' );

   wait_for { $eof };

   ok( $stream->is_write_eof, '$stream->is_write_eof after wait' );
   is( $eof, 1, 'EOF indication after wait' );

   ok( !defined $stream->loop, 'EOF stream no longer member of Loop' );

   ok( $write_future->is_ready,'write future ready after stream closed' );
   ok( $write_future->is_failed,'write future failed after stream closed' );
}

# Close
{
   my ( $rd, $wr ) = mkhandles;

   my $closed = 0;
   my $loop_during_closed;

   # on_closed records which loop the stream still reports while the
   # callback is running.
   my $stream = IO::Async::Stream->new( write_handle => $wr,
      on_closed => sub {
         my ( $self ) = @_;
         $closed = 1;
         $loop_during_closed = $self->loop;
      },
   );

   is_oneref( $stream, 'closing $stream has refcount 1 initially' );

   $stream->write( "hello" );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'closing $stream has refcount 2 after adding to Loop' );

   is( $closed, 0, 'closed before close' );

   # Data is still pending, so the close must be deferred until it has been
   # written.
   $stream->close_when_empty;

   is( $closed, 0, 'closed after close' );

   wait_for { $closed };

   is( $closed, 1, 'closed after wait' );
   is( $loop_during_closed, $loop, 'loop during closed' );

   ok( !defined $stream->loop, 'Stream no longer member of Loop' );

   is_oneref( $stream, 'closing $stream refcount 1 finally' );
}

# ->write( Future )
{
   my ( $rd, $wr ) = mkhandles;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
   );
   $loop->add( $stream );

   my $written = 0;
   my $flushed;
   $stream->write(
      my $future = $loop->new_future,
      on_write => sub { $written += $_[1] },
      on_flush => sub { $flushed++ },
   );

   # Nothing may be written while the Future is still pending.
   $loop->loop_once( 0.1 );
   is( read_data( $rd ), "", 'stream idle before Future completes' );

   $future->done( "some data to write" );

   wait_for { $flushed };

   is( $written, 18, 'stream written by Future completion invokes on_write' );

   is( read_data( $rd ), "some data to write", 'stream written by Future completion' );

   $loop->remove( $stream );
}

# ->write( CODE )
{
   my ( $rd, $wr ) = mkhandles;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
   );
   $loop->add( $stream );

   my $done;
   my $written = 0;
   my $flushed;

   # The generator yields one string and then undef to signal that it is
   # exhausted.
   $stream->write(
      sub {
         is( $_[0], $stream, 'Writersub $_[0] is $stream' );
         return $done++ ? undef : "a lazy message\n";
      },
      on_write => sub { $written += $_[1] },
      on_flush => sub { $flushed++ },
   );

   $flushed = 0;
   wait_for { $flushed };

   is( $written, 15, 'stream written by generator CODE invokes on_write' );

   is( read_data( $rd ), "a lazy message\n", 'lazy data was written' );

   my @chunks = ( "some ", "message chunks ", "here\n" );

   # shift on the emptied array returns undef, which ends the generator.
   $stream->write(
      sub {
         return shift @chunks;
      },
      on_flush => sub { $flushed++ },
   );

   $flushed = 0;
   wait_for { $flushed };

   is( read_data( $rd ), "some message chunks here\n", 'multiple lazy data was written' );

   $loop->remove( $stream );
}

# ->write mixed returns
{
   my ( $rd, $wr ) = mkhandles;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
   );
   $loop->add( $stream );

   my $flushed;
   $stream->write( my $future = $loop->new_future, on_flush => sub { $flushed++ } );

   # The first Future resolves to a CODE generator, whose first call returns
   # a second Future (reusing $future); that one finally yields the string.
   my $once = 0;
   $future->done( sub {
      return $once++ ? undef : ( $future = $loop->new_future );
   });

   wait_for { $once };

   $future->done( "Eventual string" );

   wait_for { $flushed };

   is( read_data( $rd ), "Eventual string", 'data from mixed Future/CODE returns was written' );

   $loop->remove( $stream );
}

# Data queued before the stream has a write handle
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new;

   my $flushed;

   $stream->write( "Prequeued data", on_flush => sub { $flushed++ } );

   $stream->configure( write_handle => $wr );

   $loop->add( $stream );

   wait_for { $flushed };

   ok( 1, 'prequeued data gets flushed' );

   is( read_data( $rd ), "Prequeued data", 'prequeued data gets written' );

   $loop->remove( $stream );
}

# Errors
{
   my ( $rd, $wr ) = mkhandles;

   # Force every syswrite in this scope to fail with ECONNRESET so that the
   # stream's error path is exercised.
   no warnings 'redefine';
   local *IO::Handle::syswrite = sub {
      $! = ECONNRESET;
      return undef;
   };

   my $write_errno;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      on_write_error => sub { ( undef, $write_errno ) = @_ },
   );

   $loop->add( $stream );

   my $write_future = $stream->write( "hello" );

   wait_for { defined $write_errno };

   cmp_ok( $write_errno, "==", ECONNRESET, 'errno after failed write' );

   ok( $write_future->is_ready,'write future ready after failed write' );
   ok( $write_future->is_failed,'write future failed after failed write' );

   $loop->remove( $stream );
}

# new_for_stdout constructor
{
   my $stream = IO::Async::Stream->new_for_stdout;
   is( $stream->write_handle, \*STDOUT, 'Stream->new_for_stdout->write_handle is STDOUT' );
}

# Metrics
SKIP: {
   skip "Metrics are unavailable", 1 unless $IO::Async::Metrics::METRICS;
   my ( $rd, $wr ) = mkhandles;

   my $done;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      on_outgoing_empty => sub { $done = 1 },
   );
   $stream->write( "X"x100 );

   $loop->add( $stream );

   # Flushing the 100 queued bytes should increase the written-bytes metric
   # by exactly 100.
   is_metrics_from(
      sub { wait_for { $done } },
      { io_async_stream_written => 100 },
      'Stream writing increments metric'
   );

   $loop->remove( $stream );
}

done_testing;