1#!/usr/bin/perl
2
3use strict;
4use warnings;
5
6use IO::Async::Test;
7
8use Test::More;
9use Test::Metrics::Any;
10use Test::Refcount;
11
12use Errno qw( EAGAIN EWOULDBLOCK ECONNRESET );
13
14use IO::Async::Loop;
15
16use IO::Async::OS;
17
18use IO::Async::Stream;
19
# One builtin Loop instance is shared by every test block in this file
my $loop = IO::Async::Loop->new_builtin;

# Hand that Loop to IO::Async::Test; the wait_for calls below use it
testing_loop( $loop );
23
# Create a fresh pipe for one test block.
# Returns the list ( $read_end, $write_end ), both already switched to
# non-blocking mode. Dies if the pipe pair cannot be created.
sub mkhandles
{
   my ( $reader, $writer ) = IO::Async::OS->pipepair
      or die "Cannot pipe() - $!";

   # Need handles in nonblocking mode
   $_->blocking( 0 ) for $reader, $writer;

   return ( $reader, $writer );
}
33
# Test helper: make one sysread() attempt of up to 8192 bytes on handle $s.
#   - returns the bytes read when any were available
#   - returns "" when the read failed with EAGAIN/EWOULDBLOCK
#   - dies with "Socket closed" on end-of-file
#   - dies with "Cannot sysread() - ..." on any other failure
sub read_data
{
   my ( $handle ) = @_;

   my $chunk;
   my $count = $handle->sysread( $chunk, 8192 );

   if( defined $count ) {
      return $chunk if $count > 0;
      die "Socket closed" if $count == 0;
   }

   # Nothing to read right now is not an error for the callers
   return "" if $! == EAGAIN or $! == EWOULDBLOCK;

   die "Cannot sysread() - $!";
}
47
# Plain buffered writing: want_writeready transitions, on_outgoing_empty,
# the per-write on_write/on_flush callbacks and returned Future, autoflush,
# and the extra reference held while the Stream is a member of the Loop
{
   my ( $rd, $wr ) = mkhandles;

   my $empty;

   my $stream = IO::Async::Stream->new(
      write_handle      => $wr,
      on_outgoing_empty => sub { $empty = 1 },
   );

   ok( defined $stream, 'writing $stream defined' );
   isa_ok( $stream, "IO::Async::Stream", 'writing $stream isa IO::Async::Stream' );

   is_oneref( $stream, 'writing $stream has refcount 1 initially' );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'writing $stream has refcount 2 after adding to Loop' );

   # A queued write turns want_writeready on until the buffer has drained
   ok( !$stream->want_writeready, 'want_writeready before write' );
   $stream->write( "message\n" );

   ok( $stream->want_writeready, 'want_writeready after write' );

   wait_for { $empty };

   ok( !$stream->want_writeready, 'want_writeready after wait' );
   is( $empty, 1, '$empty after writing buffer' );

   is( read_data( $rd ), "message\n", 'data after writing buffer' );

   # Per-write callbacks and the Future returned by ->write
   my $written = 0;
   my $flushed;

   my $write_f = $stream->write( "hello again\n",
      on_write => sub {
         my ( $self, $len ) = @_;
         is( $self, $stream, 'on_write $_[0] is $stream' );
         $written += $len;
      },
      on_flush => sub {
         my ( $self ) = @_;
         is( $self, $stream, 'on_flush $_[0] is $stream' );
         $flushed++;
      },
   );

   ok( !$write_f->is_ready, '->write future not yet ready' );

   wait_for { $flushed };

   ok( $write_f->is_ready, '->write future is ready after flush' );
   is( $written, 12, 'on_write given total write length after flush' );
   is( read_data( $rd ), "hello again\n", 'flushed data does get flushed' );

   # An empty write must still have its on_flush invoked
   $flushed = 0;
   $stream->write( "", on_flush => sub { $flushed++ } );
   wait_for { $flushed };

   ok( 1, "write empty data with on_flush" );

   # With autoflush the data is readable without running the Loop
   $stream->configure( autoflush => 1 );
   $stream->write( "immediate\n" );

   ok( !$stream->want_writeready, 'not want_writeready after autoflush write' );
   is( read_data( $rd ), "immediate\n", 'data after autoflush write' );

   # Data queued while autoflush was off goes out with the next autoflush write
   $stream->configure( autoflush => 0 );
   $stream->write( "partial " );
   $stream->configure( autoflush => 1 );
   $stream->write( "data\n" );

   ok( !$stream->want_writeready, 'not want_writeready after split autoflush write' );
   is( read_data( $rd ), "partial data\n", 'data after split autoflush write' );

   is_refcount( $stream, 2, 'writing $stream has refcount 2 before removing from Loop' );

   $loop->remove( $stream );

   is_oneref( $stream, 'writing $stream refcount 1 finally' );
}
127
# Abstract writing with writer function
# The custom writer diverts everything into $buffer instead of the handle
{
   my ( $rd, $wr ) = mkhandles;
   my $buffer;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      writer       => sub {
         # @_ is ( $self, <unused here>, $data, $len ). $data has to be
         # consumed in place through the @_ alias, hence the 4-arg substr
         my $len = $_[3];
         $buffer .= substr( $_[2], 0, $len, "" );
         return $len;
      },
   );

   $loop->add( $stream );

   my $flushed;
   $stream->write(
      "Some data for abstract buffer\n",
      on_flush => sub { $flushed++ },
   );

   wait_for { $flushed };

   is( $buffer, "Some data for abstract buffer\n", '$buffer after ->write to stream with abstract writer' );

   $loop->remove( $stream );
}
153
# ->want_writeready_for_read
# The custom reader only ever reports EAGAIN and counts its invocations; it
# should stay uncalled until ->want_writeready_for_read is switched on
{
   my ( $rd, $wr ) = mkhandles;

   my $reader_called;

   my $stream = IO::Async::Stream->new(
      handle  => $wr,
      on_read => sub { 0 }, # ignore reading
      reader  => sub {
         $reader_called++;
         $! = EAGAIN;
         return undef;
      },
   );

   $loop->add( $stream );

   # Hack: spin the Loop once so a premature reader call would be noticed
   $loop->loop_once( 0.1 );

   ok( !$reader_called, 'reader not yet called before ->want_writeready_for_read' );

   $stream->want_writeready_for_read( 1 );

   wait_for { $reader_called };

   ok( $reader_called, 'reader now invoked with ->want_writeready_for_read' );

   $loop->remove( $stream );
}
179
# on_writeable_{start,stop}
# The writer pretends the handle is full (EAGAIN) until $emulate_writeable is
# set, after which it appends the data to $buffer
{
   my ( $rd, $wr ) = mkhandles;
   my $buffer;

   my $emulate_writeable = 0;
   my $writeable;
   my $unwriteable;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      writer       => sub {
         unless( $emulate_writeable ) {
            $! = EAGAIN;
            return undef;
         }

         # @_ is ( $self, <unused here>, $data, $len ); consume $data in place
         my $len = $_[3];
         $buffer .= substr( $_[2], 0, $len, "" );
         return $len;
      },
      on_writeable_start => sub { $writeable++ },
      on_writeable_stop  => sub { $unwriteable++ },
   );

   $loop->add( $stream );

   $stream->write( "Something" );

   # First the emulated EAGAIN must be reported as "stopped being writeable"
   wait_for { $unwriteable };

   $emulate_writeable = 1;

   # ... then the recovery as "writeable again"
   wait_for { $writeable };

   is( $buffer, "Something", '$buffer after emulated EAGAIN' );

   $loop->remove( $stream );
}
215
# write_len / write_all
# With write_len => 2 one Loop iteration moves two bytes; enabling write_all
# lets a single iteration move the whole buffer
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      write_len    => 2,
   );

   $loop->add( $stream );

   $stream->write( "partial" );

   $loop->loop_once( 0.1 );

   is( read_data( $rd ), "pa", 'data after writing buffer with write_len=2 without write_all');

   # Three more iterations cover the remaining five bytes
   foreach ( 1 .. 3 ) {
      $loop->loop_once( 0.1 );
   }

   is( read_data( $rd ), "rtial", 'data finally after writing buffer with write_len=2 without write_all' );

   $stream->configure( write_all => 1 );

   $stream->write( "partial" );

   $loop->loop_once( 0.1 );

   is( read_data( $rd ), "partial", 'data after writing buffer with write_len=2 with write_all');

   $loop->remove( $stream );
}
246
# EOF
# Closing the read end of the pipe while data is still queued must be
# reported through on_write_eof / ->is_write_eof, remove the Stream from the
# Loop, and fail the pending write Future
SKIP: {
   # The count must match the seven assertions in this block so that a
   # skipped run reports the same number of tests as a real one
   skip "This loop cannot detect hangup condition", 7 unless $loop->_CAN_ON_HANGUP;

   my ( $rd, $wr ) = mkhandles;

   # Presumably so that writing to the closed pipe surfaces as an error
   # instead of a fatal signal - confirm if this is ever changed
   local $SIG{PIPE} = "IGNORE";

   my $eof = 0;

   my $stream = IO::Async::Stream->new( write_handle => $wr,
      on_write_eof => sub { $eof++ },
   );

   $loop->add( $stream );

   my $write_future = $stream->write( "Junk" );

   $rd->close;

   # Nothing may be reported before the Loop has had a chance to run
   ok( !$stream->is_write_eof, '$stream->is_write_eof before wait' );
   is( $eof, 0, 'EOF indication before wait' );

   wait_for { $eof };

   ok( $stream->is_write_eof, '$stream->is_write_eof after wait' );
   is( $eof, 1, 'EOF indication after wait' );

   ok( !defined $stream->loop, 'EOF stream no longer member of Loop' );

   ok( $write_future->is_ready,'write future ready after stream closed' );
   ok( $write_future->is_failed,'write future failed after stream closed' );
}
280
# Close
# ->close_when_empty defers the close until the queued data is gone;
# on_closed must still see the Loop, and afterwards the Loop must have
# dropped its reference to the Stream
{
   my ( $rd, $wr ) = mkhandles;

   my $closed = 0;
   my $loop_during_closed;

   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      on_closed    => sub {
         $closed = 1;
         # Record which Loop (if any) the Stream still belongs to right now
         $loop_during_closed = $_[0]->loop;
      },
   );

   is_oneref( $stream, 'closing $stream has refcount 1 initially' );

   # Queue data before the Stream even joins the Loop
   $stream->write( "hello" );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'closing $stream has refcount 2 after adding to Loop' );

   is( $closed, 0, 'closed before close' );

   $stream->close_when_empty;

   is( $closed, 0, 'closed after close' );

   wait_for { $closed };

   is( $closed, 1, 'closed after wait' );
   is( $loop_during_closed, $loop, 'loop during closed' );

   ok( !defined $stream->loop, 'Stream no longer member of Loop' );

   is_oneref( $stream, 'closing $stream refcount 1 finally' );
}
319
# ->write( Future )
# Nothing is written until the Future completes; its result is the data
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new( write_handle => $wr );
   $loop->add( $stream );

   my $written = 0;
   my $flushed;

   my $future = $loop->new_future;
   $stream->write(
      $future,
      on_write => sub {
         my ( undef, $len ) = @_;
         $written += $len;
      },
      on_flush => sub { $flushed++ },
   );

   $loop->loop_once( 0.1 );
   is( read_data( $rd ), "", 'stream idle before Future completes' );

   $future->done( "some data to write" );

   wait_for { $flushed };

   is( $written, 18, 'stream written by Future completion invokes on_write' );

   is( read_data( $rd ), "some data to write", 'stream written by Future completion' );

   $loop->remove( $stream );
}
349
# ->write( CODE )
# A generator sub is called for data until it returns undef
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new( write_handle => $wr );
   $loop->add( $stream );

   my $done;
   my $written = 0;
   my $flushed;

   # Generator yielding a single string, then undef
   $stream->write(
      sub {
         my ( $self ) = @_;
         is( $self, $stream, 'Writersub $_[0] is $stream' );
         return undef if $done++;
         return "a lazy message\n";
      },
      on_write => sub {
         my ( undef, $len ) = @_;
         $written += $len;
      },
      on_flush => sub { $flushed++ },
   );

   $flushed = 0;
   wait_for { $flushed };

   is( $written, 15, 'stream written by generator CODE invokes on_write' );

   is( read_data( $rd ), "a lazy message\n", 'lazy data was written' );

   # Generator yielding several strings; shift returns undef once it runs dry
   my @chunks = ( "some ", "message chunks ", "here\n" );

   $stream->write(
      sub { return shift @chunks },
      on_flush => sub { $flushed++ },
   );

   $flushed = 0;
   wait_for { $flushed };

   is( read_data( $rd ), "some message chunks here\n", 'multiple lazy data was written' );

   $loop->remove( $stream );
}
394
# ->write mixed returns
# A Future that yields a CODE generator, whose first result is a second
# Future that finally yields the string to be written
{
   my ( $rd, $wr ) = mkhandles;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
   );
   $loop->add( $stream );

   my $flushed;
   $stream->write( my $future = $loop->new_future, on_flush => sub { $flushed++ } );

   my $once = 0;
   $future->done( sub {
      # The first call hands back a fresh Future and stores it in $future so
      # the code below can complete it; any later call ends the generator
      return $once++ ? undef : ( $future = $loop->new_future );
   });

   # Wait until the generator has run and $future refers to the inner Future
   wait_for { $once };

   $future->done( "Eventual string" );

   wait_for { $flushed };

   # Description is unique to this block so a failure here is not confused
   # with the plain CODE-generator test
   is( read_data( $rd ), "Eventual string", 'mixed Future/CODE data was written' );

   $loop->remove( $stream );
}
421
# Prequeued data
# A write made before the Stream has any handle must be flushed once a
# write handle is configured and the Stream joins the Loop
{
   my ( $rd, $wr ) = mkhandles;

   my $flushed;
   my $stream = IO::Async::Stream->new;

   $stream->write(
      "Prequeued data",
      on_flush => sub { $flushed++ },
   );

   $stream->configure( write_handle => $wr );
   $loop->add( $stream );

   wait_for { $flushed };

   ok( 1, 'prequeued data gets flushed' );

   is( read_data( $rd ), "Prequeued data", 'prequeued data gets written' );

   $loop->remove( $stream );
}
443
# Errors
# IO::Handle::syswrite is replaced for the duration of this block by a stub
# that always fails with ECONNRESET; the failure must reach on_write_error
# and fail the write Future
{
   my ( $rd, $wr ) = mkhandles;

   no warnings 'redefine';
   local *IO::Handle::syswrite = sub {
      $! = ECONNRESET;
      return undef;
   };

   my $write_errno;

   my $stream = IO::Async::Stream->new(
      write_handle   => $wr,
      # Second callback argument is the errno value
      on_write_error => sub { $write_errno = $_[1] },
   );

   $loop->add( $stream );

   my $write_future = $stream->write( "hello" );

   wait_for { defined $write_errno };

   cmp_ok( $write_errno, "==", ECONNRESET, 'errno after failed write' );

   ok( $write_future->is_ready,'write future ready after failed write' );
   ok( $write_future->is_failed,'write future failed after failed write' );

   $loop->remove( $stream );
}
474
# ->new_for_stdout convenience constructor wraps the STDOUT glob
{
   my $stdout_stream = IO::Async::Stream->new_for_stdout;

   is( $stdout_stream->write_handle, \*STDOUT,
      'Stream->new_for_stdout->write_handle is STDOUT' );
}
479
# Metrics
# Writing 100 bytes must add 100 to the io_async_stream_written metric
SKIP: {
   # Exactly one assertion (is_metrics_from) is skipped when metrics are off
   skip "Metrics are unavailable", 1 unless $IO::Async::Metrics::METRICS;

   my ( $rd, $wr ) = mkhandles;

   my $done;
   my $stream = IO::Async::Stream->new(
      write_handle => $wr,
      on_outgoing_empty => sub { $done = 1 },
   );

   # Queue the data first; the metric is only sampled around the wait below
   $stream->write( "X" x 100 );

   $loop->add( $stream );

   is_metrics_from(
      sub { wait_for { $done } },
      { io_async_stream_written => 100 },
      'Stream writing increments metric'
   );

   $loop->remove( $stream );
}
502
# No fixed plan: declare the run complete now that every block has executed
done_testing();
504