1#!/usr/bin/perl
2
3use strict;
4use warnings;
5
6use IO::Async::Test;
7
8use Test::More;
9use Test::Fatal;
10use Test::Refcount;
11
12use IO::File;
13use Errno qw( EAGAIN EWOULDBLOCK );
14
15use IO::Async::Loop;
16
17use IO::Async::OS;
18
19use IO::Async::Stream;
20
# All tests below run against the built-in Loop implementation, which is
# also handed to testing_loop() from IO::Async::Test
my $loop = IO::Async::Loop->new_builtin;
testing_loop( $loop );

# Two independent connected pairs: $S1 <-> $S2 and $S3 <-> $S4
my ( $S1, $S2 ) = IO::Async::OS->socketpair
   or die "Cannot create socket pair - $!";
my ( $S3, $S4 ) = IO::Async::OS->socketpair
   or die "Cannot create socket pair - $!";

# Every end has to be nonblocking; read_data() relies on an empty socket
# reporting EAGAIN/EWOULDBLOCK rather than stalling the test
foreach my $sock ( $S1, $S2, $S3, $S4 ) {
   $sock->blocking( 0 );
}
30
# Test helper: make a single sysread() attempt of up to 8192 bytes on the
# given handle.
#   Returns the bytes that were read, or "" when the read failed with
#   EAGAIN/EWOULDBLOCK (nothing pending on a nonblocking handle).
#   Dies with "Socket closed" on EOF and with "Cannot sysread() - $!" on any
#   other failure.
sub read_data
{
   my ( $handle ) = @_;

   my $chunk;
   my $got = $handle->sysread( $chunk, 8192 );

   if( defined $got ) {
      return $chunk if $got > 0;
      die "Socket closed" if $got == 0;
   }

   # No usable byte count: tell "nothing to read yet" apart from a real error
   my $would_block = ( $! == EAGAIN || $! == EWOULDBLOCK );
   return "" if $would_block;

   die "Cannot sysread() - $!";
}
44
# Complete lines gathered by the split stream's on_read handler
my @lines;

# A Stream that reads from $S2 but writes to $S3, so inbound data is fed in
# through $S1 and outbound data is observed on $S4
my $stream = IO::Async::Stream->new(
   read_handle  => $S2,
   write_handle => $S3,

   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;

      # Move every complete line out of the buffer and into @lines
      while( $$buffref =~ s/^(.*\n)// ) {
         push @lines, $1;
      }

      return 0;
   },
);

is_oneref( $stream, 'split read/write $stream has refcount 1 initially' );

undef @lines;

$loop->add( $stream );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 after adding to Loop' );

# Outbound direction: the data must show up on $S4 only, never on $S1
$stream->write( "message\n" );
$loop->loop_once( 0.1 );

is( read_data( $S4 ), "message\n", '$S4 receives data from split stream' );
is( read_data( $S1 ), "",          '$S1 empty from split stream' );

# Inbound direction: data written to $S1 reaches the on_read handler
$S1->syswrite( "reverse\n" );
$loop->loop_once( 0.1 );

is_deeply( \@lines, [ "reverse\n" ], '@lines on response to split stream' );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 before removing from Loop' );

$loop->remove( $stream );

is_oneref( $stream, 'split read/write $stream refcount 1 finally' );

undef $stream;
87
# State recorded by the callbacks of the late-handle stream
my $buffer = "";
my $closed;

# Construct the Stream without any handle; one is attached further down
$stream = IO::Async::Stream->new(
   on_read => sub {
      my $self = shift;
      my ( $buffref, $eof ) = @_;

      # Consume everything that has been received so far
      $buffer .= $$buffref;
      $$buffref = "";

      return 0;
   },

   on_closed => sub { $closed = 1 },
);

is_oneref( $stream, 'latehandle $stream has refcount 1 initially' );

$loop->add( $stream );

is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after adding to Loop' );

# Writing before a handle has been set is expected to throw
ok( exception { $stream->write( "some text" ) },
    '->write on stream with no IO handle fails' );

$stream->set_handle( $S1 );

is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after setting a handle' );

# Stream -> peer: text written to the stream is readable from $S2
$stream->write( "some text" );
$loop->loop_once( 0.1 );

my $buffer2;
$S2->sysread( $buffer2, 8192 );

is( $buffer2, "some text", 'stream-written text appears' );

# Peer -> stream: text written to $S2 ends up in $buffer via on_read
$S2->syswrite( "more text" );
wait_for { length $buffer };

is( $buffer, "more text", 'stream-read text appears' );

# The checks below expect the close to have taken effect by the time
# close_when_empty returns, without another Loop iteration
$stream->close_when_empty;

is( $closed, 1, 'closed after close' );

ok( !defined $stream->loop, 'Stream no longer member of Loop' );

is_oneref( $stream, 'latehandle $stream refcount 1 finally' );
140
# Re-open the stream on a different handle and confirm that it still
# delivers written data

$loop->add( $stream );
$stream->set_handle( $S3 );

$stream->write( "more text" );
$loop->loop_once( 0.1 );

# Whatever went out on $S3 should now be readable from its peer, $S4
$buffer2 = undef;
$S4->sysread( $buffer2, 8192 );

is( $buffer2, "more text", 'stream-written text appears after reopen' );

$loop->remove( $stream );
undef $stream;
160
# Fresh pair for the closed-peer scenario
( $S1, $S2 ) = IO::Async::OS->socketpair or die "Cannot socketpair - $!";
foreach my $sock ( $S1, $S2 ) {
   $sock->blocking( 0 );
}

$stream = IO::Async::Stream->new(
   handle  => $S1,
   on_read => sub { },
);

# Queue outgoing data before the stream joins the Loop
$stream->write( "hello" );
$loop->add( $stream );

is_refcount( $stream, 2, '$stream has two references' );

# Drop our own reference so the Loop holds the only remaining one
undef $stream;

$S2->close;

# With the peer closed and data still queued, $S1 should now be both read-
# and write-ready; handling both in one iteration must not throw
ok( !exception { $loop->loop_once }, "read+write-ready closed Stream doesn't die" );

undef $stream;
182
# binmode first: avoids a harmless warning in case -CS is in effect
binmode STDIN;

$stream = IO::Async::Stream->new_for_stdio;

# Each accessor must hand back the matching standard filehandle
foreach my $case (
   [ read_handle  => \*STDIN,  "STDIN"  ],
   [ write_handle => \*STDOUT, "STDOUT" ],
) {
   my ( $accessor, $expected, $label ) = @$case;

   is( $stream->$accessor, $expected, "Stream->new_for_stdio->${accessor} is ${label}" );
}

done_testing;
189