#!/usr/bin/perl

# Tests for IO::Async::Stream, exercising in turn:
#   * a Stream using separate handles for reading and writing
#   * a Stream constructed with no handle, given one later, closed, and then
#     reopened on a different handle
#   * a Stream with pending write data whose peer has closed
#   * the new_for_stdio constructor

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Fatal;
use Test::Refcount;

use IO::File;
use Errno qw( EAGAIN EWOULDBLOCK );

use IO::Async::Loop;

use IO::Async::OS;

use IO::Async::Stream;

my $loop = IO::Async::Loop->new_builtin;

# Hand the loop to IO::Async::Test; wait_for (used further down) relies on it
testing_loop( $loop );

my ( $S1, $S2 ) = IO::Async::OS->socketpair or die "Cannot create socket pair - $!";
my ( $S3, $S4 ) = IO::Async::OS->socketpair or die "Cannot create socket pair - $!";

# Need sockets in nonblocking mode
$_->blocking( 0 ) for $S1, $S2, $S3, $S4;

# read_data( $s )
#
# Performs one sysread() of up to 8192 bytes on the handle $s.
#
# Returns the bytes that were read, or "" when the read failed with EAGAIN or
# EWOULDBLOCK (i.e. nothing available on a nonblocking handle).
# Dies with "Socket closed" when sysread() reports EOF, and with
# "Cannot sysread() - $!" on any other failure.
sub read_data
{
   my ( $s ) = @_;

   my $buffer;
   my $ret = $s->sysread( $buffer, 8192 );

   return $buffer if( defined $ret && $ret > 0 );
   die "Socket closed" if( defined $ret && $ret == 0 );
   return "" if $! == EAGAIN or $! == EWOULDBLOCK;
   die "Cannot sysread() - $!";
}

#
# Split read/write handles: the Stream reads from $S2 (peer $S1) and writes
# to $S3 (peer $S4)
#

# Complete lines pulled out of the read buffer by the on_read handler below
my @lines;

my $stream = IO::Async::Stream->new(
   read_handle  => $S2,
   write_handle => $S3,
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;

      # Consume every complete line; a trailing partial line stays buffered
      push @lines, $1 while $$buffref =~ s/^(.*\n)//;
      return 0;
   },
);

is_oneref( $stream, 'split read/write $stream has refcount 1 initially' );

$loop->add( $stream );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 after adding to Loop' );

$stream->write( "message\n" );

$loop->loop_once( 0.1 );

# Written data must come out of the write side's peer only
is( read_data( $S4 ), "message\n", '$S4 receives data from split stream' );
is( read_data( $S1 ), "",          '$S1 empty from split stream' );

defined $S1->syswrite( "reverse\n" ) or die "Cannot syswrite() - $!";

$loop->loop_once( 0.1 );

is_deeply( \@lines, [ "reverse\n" ], '@lines on response to split stream' );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 before removing from Loop' );

$loop->remove( $stream );

is_oneref( $stream, 'split read/write $stream refcount 1 finally' );

undef $stream;

#
# Late handle: construct the Stream with no handle at all and supply one
# afterwards with set_handle
#

my $buffer = "";   # everything the on_read handler has consumed so far
my $closed;        # set to 1 by the on_closed handler

$stream = IO::Async::Stream->new(
   # No handle yet
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;
      $buffer .= $$buffref;
      $$buffref = "";
      return 0;
   },
   on_closed => sub {
      my ( $self ) = @_;
      $closed = 1;
   },
);

is_oneref( $stream, 'latehandle $stream has refcount 1 initially' );

$loop->add( $stream );

is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after adding to Loop' );

ok( exception { $stream->write( "some text" ) },
    '->write on stream with no IO handle fails' );

$stream->set_handle( $S1 );

is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after setting a handle' );

$stream->write( "some text" );

$loop->loop_once( 0.1 );

# Go through read_data rather than a bare sysread so that a genuine read
# error dies with a message instead of being silently ignored
my $buffer2 = read_data( $S2 );

is( $buffer2, "some text", 'stream-written text appears' );

defined $S2->syswrite( "more text" ) or die "Cannot syswrite() - $!";

wait_for { length $buffer };

is( $buffer, "more text", 'stream-read text appears' );

$stream->close_when_empty;

# The assertions below expect the close to have taken effect already, with
# no further loop iteration in between
is( $closed, 1, 'closed after close' );

ok( !defined $stream->loop, 'Stream no longer member of Loop' );

is_oneref( $stream, 'latehandle $stream refcount 1 finally' );

# Now try re-opening the stream with a new handle, and check it continues to
# work

$loop->add( $stream );

$stream->set_handle( $S3 );

$stream->write( "more text" );

$loop->loop_once( 0.1 );

$buffer2 = read_data( $S4 );

is( $buffer2, "more text", 'stream-written text appears after reopen' );

$loop->remove( $stream );

undef $stream;

#
# Peer closes while the Stream still has data queued for writing
#

( $S1, $S2 ) = IO::Async::OS->socketpair or die "Cannot socketpair - $!";
$_->blocking( 0 ) for $S1, $S2;

$stream = IO::Async::Stream->new(
   handle => $S1,
   on_read => sub { },
);

# Queue the write before the Stream joins the Loop
$stream->write( "hello" );

$loop->add( $stream );

is_refcount( $stream, 2, '$stream has two references' );
undef $stream; # Only ref is now in the Loop

$S2->close;

# $S1 should now be both read- and write-ready.
ok( !exception { $loop->loop_once }, 'read+write-ready closed Stream doesn\'t die' );

#
# new_for_stdio
#

binmode STDIN; # Avoid harmless warning in case -CS is in effect
$stream = IO::Async::Stream->new_for_stdio;
is( $stream->read_handle,  \*STDIN,  'Stream->new_for_stdio->read_handle is STDIN' );
is( $stream->write_handle, \*STDOUT, 'Stream->new_for_stdio->write_handle is STDOUT' );

done_testing;