1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5
6package IO::Async::Channel;
7
8use strict;
9use warnings;
10use base qw( IO::Async::Notifier );
11
12our $VERSION = '0.800';
13
14use Carp;
15
16use IO::Async::Stream;
17
18=head1 NAME
19
20C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine>
21
22=head1 DESCRIPTION
23
24A C<IO::Async::Channel> object allows Perl values to be passed into or out of
25an L<IO::Async::Routine>. It is intended to be used primarily with a Routine
26object rather than independently. For more detail and examples on how to use
27this object see also the documentation for L<IO::Async::Routine>.
28
29A Channel object is shared between the main process of the program and the
30process running within the Routine. In the main process it will be used in
31asynchronous mode, and in the Routine process it will be used in synchronous
32mode. In asynchronous mode all methods return immediately and use
33L<IO::Async>-style futures or callback functions. In synchronous within the
34Routine process the methods block until they are ready and may be used for
35flow-control within the routine. Alternatively, a Channel may be shared
36between two different Routine objects, and not used directly by the
37controlling program.
38
39The channel itself represents a FIFO of Perl reference values. New values may
40be put into the channel by the C<send> method in either mode. Values may be
41retrieved from it by the C<recv> method. Values inserted into the Channel are
42snapshot by the C<send> method. Any changes to referred variables will not be
43observed by the other end of the Channel after the C<send> method returns.
44
45=head1 PARAMETERS
46
47The following named parameters may be passed to C<new> or C<configure>:
48
49=head2 codec => STR
50
51Gives the name of the encoding method used to represent values over the
52channel.
53
54This can be set to C<Storable> to use the core L<Storable> module. As this
55only supports references, to pass a single scalar value, C<send> a SCALAR
56reference to it, and dereference the result of C<recv>.
57
58If the L<Sereal::Encoder> and L<Sereal::Decoder> modules are installed, this
59can be set to C<Sereal> instead, and will use those to perform the encoding
60and decoding. This optional dependency may give higher performance than using
61C<Storable>. If these modules are available, then this option is picked by
62default.
63
64=cut
65
66=head1 CONSTRUCTOR
67
68=cut
69
70=head2 new
71
72   $channel = IO::Async::Channel->new
73
74Returns a new C<IO::Async::Channel> object. This object reference itself
75should be shared by both sides of a C<fork()>ed process. After C<fork()> the
76two C<setup_*> methods may be used to configure the object for operation on
77either end.
78
79While this object does in fact inherit from L<IO::Async::Notifier>, it should
80not be added to a Loop object directly; event management will be handled by
81its containing L<IO::Async::Routine> object.
82
83=cut
84
85# Undocumented convenience constructors for running IaRoutine in 'spawn' mode
86sub new_sync
87{
88   my $class = shift;
89   my ( $fd ) = @_;
90
91   my $self = $class->new;
92   $self->setup_sync_mode( $fd );
93   return $self;
94}
95
96sub new_stdin  { shift->new_sync( \*STDIN  ); }
97sub new_stdout { shift->new_sync( \*STDOUT ); }
98
99sub DESTROY
100{
101   my $self = shift;
102   eval { $self->close }; # ignore any error
103}
104
105=head1 METHODS
106
107The following methods documented with a trailing call to C<< ->get >> return
108L<Future> instances.
109
110=cut
111
112=head2 configure
113
114   $channel->configure( %params )
115
116Similar to the standard C<configure> method on L<IO::Async::Notifier>, this is
117used to change details of the Channel's operation.
118
119=over 4
120
121=item on_recv => CODE
122
123May only be set on an async mode channel. If present, will be invoked whenever
124a new value is received, rather than using the C<recv> method.
125
126   $on_recv->( $channel, $data )
127
128=item on_eof => CODE
129
130May only be set on an async mode channel. If present, will be invoked when the
131channel gets closed by the peer.
132
133   $on_eof->( $channel )
134
135=back
136
137=cut
138
139my $DEFAULT_CODEC;
140sub _default_codec
141{
142   $DEFAULT_CODEC ||= do {
143      my $HAVE_SEREAL = defined eval {
144         require Sereal::Encoder; require Sereal::Decoder };
145      $HAVE_SEREAL ? "Sereal" : "Storable";
146   };
147}
148
149sub _init
150{
151   my $self = shift;
152   my ( $params ) = @_;
153
154   defined $params->{codec} or $params->{codec} = _default_codec;
155
156   $self->SUPER::_init( $params );
157}
158
159sub configure
160{
161   my $self = shift;
162   my %params = @_;
163
164   foreach (qw( on_recv on_eof )) {
165      next unless exists $params{$_};
166      $self->{mode} and $self->{mode} eq "async" or
167         croak "Can only configure $_ in async mode";
168
169      $self->{$_} = delete $params{$_};
170      $self->_build_stream;
171   }
172
173   if( my $codec = delete $params{codec} ) {
174      @{ $self }{qw( encode decode )} = (
175         $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'"
176      )->();
177   }
178
179   $self->SUPER::configure( %params );
180}
181
182sub _make_codec_Storable
183{
184   require Storable;
185
186   return
187      \&Storable::freeze,
188      \&Storable::thaw;
189}
190
191sub _make_codec_Sereal
192{
193   require Sereal::Encoder;
194   require Sereal::Decoder;
195
196   my $encoder;
197   my $decoder;
198
199   # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
200   # reset to undef in new threads. We should defend against that.
201
202   return
203      sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
204      sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
205}
206
207=head2 send
208
209   $channel->send( $data )
210
211Pushes the data stored in the given Perl reference into the FIFO of the
212Channel, where it can be received by the other end. When called on a
213synchronous mode Channel this method may block if a C<write()> call on the
214underlying filehandle blocks. When called on an asynchronous mode channel this
215method will not block.
216
217=cut
218
219my %SENDMETHODS;
220sub send
221{
222   my $self = shift;
223   my ( $data ) = @_;
224
225   defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
226
227   my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
228      or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
229
230   $self->$code( $data );
231}
232
233*_send_sync = *_send_async = sub {
234   my ( $self, $data ) = @_;
235   $self->send_encoded( $self->{encode}->( $data ) );
236};
237
238=head2 send_encoded
239
240   $channel->send_encoded( $record )
241
242A variant of the C<send> method; this method pushes the byte record given.
243This should be the result of a call to C<encode>.
244
245=cut
246
247sub send_encoded
248{
249   my $self = shift;
250   my ( $record ) = @_;
251
252   my $bytes = pack( "I", length $record ) . $record;
253
254   defined $self->{mode} or die "Cannot ->send without being set up";
255
256   return $self->_sendbytes_sync( $bytes )  if $self->{mode} eq "sync";
257   return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
258}
259
260=head2 encode
261
262   $record = $channel->encode( $data )
263
264Takes a Perl reference and returns a serialised string that can be passed to
265C<send_encoded>. The following two forms are equivalent
266
267   $channel->send( $data )
268   $channel->send_encoded( $channel->encode( $data ) )
269
270This is provided for the use-case where data needs to be serialised into a
271fixed string to "snapshot it" but not sent yet; the returned string can be
272saved and sent at a later time.
273
274   $record = IO::Async::Channel->encode( $data )
275
276This can also be used as a class method, in case it is inconvenient to operate
277on a particular object instance, or when one does not exist yet. In this case
278it will encode using whatever is the default codec for C<IO::Async::Channel>.
279
280=cut
281
282my $default_encode;
283sub encode
284{
285   my $self = shift;
286   my ( $data ) = @_;
287
288   return ( ref $self ?
289      $self->{encode} :
290      $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] }
291   )->( $data );
292}
293
294=head2 recv
295
296   $data = $channel->recv
297
298When called on a synchronous mode Channel this method will block until a Perl
299reference value is available from the other end and then return it. If the
300Channel is closed this method will return C<undef>. Since only references may
301be passed and all Perl references are true the truth of the result of this
302method can be used to detect that the channel is still open and has not yet
303been closed.
304
305   $data = $channel->recv->get
306
307When called on an asynchronous mode Channel this method returns a future which
308will eventually yield the next Perl reference value that becomes available
309from the other end. If the Channel is closed, the future will fail with an
310C<eof> failure.
311
312   $channel->recv( %args )
313
314When not returning a future, takes the following named arguments:
315
316=over 8
317
318=item on_recv => CODE
319
320Called when a new Perl reference value is available. Will be passed the
321Channel object and the reference data.
322
323   $on_recv->( $channel, $data )
324
325=item on_eof => CODE
326
327Called if the Channel was closed before a new value was ready. Will be passed
328the Channel object.
329
330   $on_eof->( $channel )
331
332=back
333
334=cut
335
336my %RECVMETHODS;
337sub recv
338{
339   my $self = shift;
340
341   defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
342
343   my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
344      or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
345
346   return $self->$code( @_ );
347}
348
349=head2 close
350
351   $channel->close
352
353Closes the channel. Causes a pending C<recv> on the other end to return undef
354or the queued C<on_eof> callbacks to be invoked.
355
356=cut
357
358my %CLOSEMETHODS;
359sub close
360{
361   my $self = shift;
362
363   defined( my $mode = $self->{mode} ) or return;
364
365   my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
366      or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
367
368   return $self->$code;
369}
370
371# Leave this undocumented for now
372sub setup_sync_mode
373{
374   my $self = shift;
375   ( $self->{fh} ) = @_;
376
377   $self->{mode} = "sync";
378
379   # Since we're communicating binary structures and not Unicode text we need to
380   # enable binmode
381   binmode $self->{fh};
382
383   defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle};
384   $self->{fh}->autoflush(1);
385}
386
387sub _read_exactly
388{
389   $_[1] = "";
390
391   while( length $_[1] < $_[2] ) {
392      my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
393      defined $n or return undef;
394      $n or return "";
395   }
396
397   return $_[2];
398}
399
400sub _recv_sync
401{
402   my $self = shift;
403
404   my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
405   defined $n or die "Cannot read - $!";
406   length $n or return undef;
407
408   my $len = unpack( "I", $lenbuffer );
409
410   $n = _read_exactly( $self->{fh}, my $record, $len );
411   defined $n or die "Cannot read - $!";
412   length $n or return undef;
413
414   return $self->{decode}->( $record );
415}
416
417sub _sendbytes_sync
418{
419   my $self = shift;
420   my ( $bytes ) = @_;
421   $self->{fh}->print( $bytes );
422}
423
424sub _close_sync
425{
426   my $self = shift;
427   $self->{fh}->close;
428}
429
430# Leave this undocumented for now
431sub setup_async_mode
432{
433   my $self = shift;
434   my %args = @_;
435
436   exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
437
438   keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
439
440   defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle};
441   $self->{mode} = "async";
442}
443
444sub _build_stream
445{
446   my $self = shift;
447   return $self->{stream} ||= do {
448      $self->{on_result_queue} = [];
449
450      my $stream = IO::Async::Stream->new(
451         read_handle  => $self->{read_handle},
452         write_handle => $self->{write_handle},
453         autoflush    => 1,
454         on_read      => $self->_capture_weakself( '_on_stream_read' )
455      );
456
457      $self->add_child( $stream );
458
459      $stream;
460   };
461}
462
463sub _sendbytes_async
464{
465   my $self = shift;
466   my ( $bytes ) = @_;
467   $self->_build_stream->write( $bytes );
468}
469
470sub _recv_async
471{
472   my $self = shift;
473   my %args = @_;
474
475   my $on_recv = $args{on_recv};
476   my $on_eof = $args{on_eof};
477
478   my $stream = $self->_build_stream;
479
480   my $f;
481   $f = $stream->loop->new_future unless !defined wantarray;
482
483   push @{ $self->{on_result_queue} }, sub {
484      my ( $self, $type, $result ) = @_;
485      if( $type eq "recv" ) {
486         $f->done( $result ) if $f and !$f->is_cancelled;
487         $on_recv->( $self, $result ) if $on_recv;
488      }
489      else {
490         $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
491         $on_eof->( $self ) if $on_eof;
492      }
493   };
494
495   return $f;
496}
497
498sub _close_async
499{
500   my $self = shift;
501   if( my $stream = $self->{stream} ) {
502      $stream->close_when_empty;
503   }
504   else {
505      $_ and $_->close for $self->{read_handle}, $self->{write_handle};
506   }
507
508   undef $_ for $self->{read_handle}, $self->{write_handle};
509}
510
511sub _on_stream_read
512{
513   my $self = shift or return;
514   my ( $stream, $buffref, $eof ) = @_;
515
516   if( $eof ) {
517      while( my $on_result = shift @{ $self->{on_result_queue} } ) {
518         $on_result->( $self, eof => );
519      }
520      $self->{on_eof}->( $self ) if $self->{on_eof};
521      return;
522   }
523
524   return 0 unless length( $$buffref ) >= 4;
525   my $len = unpack( "I", $$buffref );
526   return 0 unless length( $$buffref ) >= 4 + $len;
527
528   my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
529   substr( $$buffref, 0, 4 + $len ) = "";
530
531   if( my $on_result = shift @{ $self->{on_result_queue} } ) {
532      $on_result->( $self, recv => $record );
533   }
534   else {
535      $self->{on_recv}->( $self, $record );
536   }
537
538   return 1;
539}
540
541sub _extract_read_handle
542{
543   my $self = shift;
544
545   return undef if !$self->{mode};
546
547   croak "Cannot extract filehandle" if $self->{mode} ne "async";
548   $self->{mode} = "dead";
549
550   return $self->{read_handle};
551}
552
553sub _extract_write_handle
554{
555   my $self = shift;
556
557   return undef if !$self->{mode};
558
559   croak "Cannot extract filehandle" if $self->{mode} ne "async";
560   $self->{mode} = "dead";
561
562   return $self->{write_handle};
563}
564
565=head1 AUTHOR
566
567Paul Evans <leonerd@leonerd.org.uk>
568
569=cut
570
5710x55AA;
572