1# You may distribute under the terms of either the GNU General Public License 2# or the Artistic License (the same terms as Perl itself) 3# 4# (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk 5 6package IO::Async::Channel; 7 8use strict; 9use warnings; 10use base qw( IO::Async::Notifier ); 11 12our $VERSION = '0.800'; 13 14use Carp; 15 16use IO::Async::Stream; 17 18=head1 NAME 19 20C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine> 21 22=head1 DESCRIPTION 23 24A C<IO::Async::Channel> object allows Perl values to be passed into or out of 25an L<IO::Async::Routine>. It is intended to be used primarily with a Routine 26object rather than independently. For more detail and examples on how to use 27this object see also the documentation for L<IO::Async::Routine>. 28 29A Channel object is shared between the main process of the program and the 30process running within the Routine. In the main process it will be used in 31asynchronous mode, and in the Routine process it will be used in synchronous 32mode. In asynchronous mode all methods return immediately and use 33L<IO::Async>-style futures or callback functions. In synchronous within the 34Routine process the methods block until they are ready and may be used for 35flow-control within the routine. Alternatively, a Channel may be shared 36between two different Routine objects, and not used directly by the 37controlling program. 38 39The channel itself represents a FIFO of Perl reference values. New values may 40be put into the channel by the C<send> method in either mode. Values may be 41retrieved from it by the C<recv> method. Values inserted into the Channel are 42snapshot by the C<send> method. Any changes to referred variables will not be 43observed by the other end of the Channel after the C<send> method returns. 44 45=head1 PARAMETERS 46 47The following named parameters may be passed to C<new> or C<configure>: 48 49=head2 codec => STR 50 51Gives the name of the encoding method used to represent values over the 52channel. 53 54This can be set to C<Storable> to use the core L<Storable> module. As this 55only supports references, to pass a single scalar value, C<send> a SCALAR 56reference to it, and dereference the result of C<recv>. 57 58If the L<Sereal::Encoder> and L<Sereal::Decoder> modules are installed, this 59can be set to C<Sereal> instead, and will use those to perform the encoding 60and decoding. This optional dependency may give higher performance than using 61C<Storable>. If these modules are available, then this option is picked by 62default. 63 64=cut 65 66=head1 CONSTRUCTOR 67 68=cut 69 70=head2 new 71 72 $channel = IO::Async::Channel->new 73 74Returns a new C<IO::Async::Channel> object. This object reference itself 75should be shared by both sides of a C<fork()>ed process. After C<fork()> the 76two C<setup_*> methods may be used to configure the object for operation on 77either end. 78 79While this object does in fact inherit from L<IO::Async::Notifier>, it should 80not be added to a Loop object directly; event management will be handled by 81its containing L<IO::Async::Routine> object. 82 83=cut 84 85# Undocumented convenience constructors for running IaRoutine in 'spawn' mode 86sub new_sync 87{ 88 my $class = shift; 89 my ( $fd ) = @_; 90 91 my $self = $class->new; 92 $self->setup_sync_mode( $fd ); 93 return $self; 94} 95 96sub new_stdin { shift->new_sync( \*STDIN ); } 97sub new_stdout { shift->new_sync( \*STDOUT ); } 98 99sub DESTROY 100{ 101 my $self = shift; 102 eval { $self->close }; # ignore any error 103} 104 105=head1 METHODS 106 107The following methods documented with a trailing call to C<< ->get >> return 108L<Future> instances. 109 110=cut 111 112=head2 configure 113 114 $channel->configure( %params ) 115 116Similar to the standard C<configure> method on L<IO::Async::Notifier>, this is 117used to change details of the Channel's operation. 118 119=over 4 120 121=item on_recv => CODE 122 123May only be set on an async mode channel. If present, will be invoked whenever 124a new value is received, rather than using the C<recv> method. 125 126 $on_recv->( $channel, $data ) 127 128=item on_eof => CODE 129 130May only be set on an async mode channel. If present, will be invoked when the 131channel gets closed by the peer. 132 133 $on_eof->( $channel ) 134 135=back 136 137=cut 138 139my $DEFAULT_CODEC; 140sub _default_codec 141{ 142 $DEFAULT_CODEC ||= do { 143 my $HAVE_SEREAL = defined eval { 144 require Sereal::Encoder; require Sereal::Decoder }; 145 $HAVE_SEREAL ? "Sereal" : "Storable"; 146 }; 147} 148 149sub _init 150{ 151 my $self = shift; 152 my ( $params ) = @_; 153 154 defined $params->{codec} or $params->{codec} = _default_codec; 155 156 $self->SUPER::_init( $params ); 157} 158 159sub configure 160{ 161 my $self = shift; 162 my %params = @_; 163 164 foreach (qw( on_recv on_eof )) { 165 next unless exists $params{$_}; 166 $self->{mode} and $self->{mode} eq "async" or 167 croak "Can only configure $_ in async mode"; 168 169 $self->{$_} = delete $params{$_}; 170 $self->_build_stream; 171 } 172 173 if( my $codec = delete $params{codec} ) { 174 @{ $self }{qw( encode decode )} = ( 175 $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'" 176 )->(); 177 } 178 179 $self->SUPER::configure( %params ); 180} 181 182sub _make_codec_Storable 183{ 184 require Storable; 185 186 return 187 \&Storable::freeze, 188 \&Storable::thaw; 189} 190 191sub _make_codec_Sereal 192{ 193 require Sereal::Encoder; 194 require Sereal::Decoder; 195 196 my $encoder; 197 my $decoder; 198 199 # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get 200 # reset to undef in new threads. We should defend against that. 201 202 return 203 sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) }, 204 sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) }; 205} 206 207=head2 send 208 209 $channel->send( $data ) 210 211Pushes the data stored in the given Perl reference into the FIFO of the 212Channel, where it can be received by the other end. When called on a 213synchronous mode Channel this method may block if a C<write()> call on the 214underlying filehandle blocks. When called on an asynchronous mode channel this 215method will not block. 216 217=cut 218 219my %SENDMETHODS; 220sub send 221{ 222 my $self = shift; 223 my ( $data ) = @_; 224 225 defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up"; 226 227 my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) ) 228 or die "IO::Async::Channel cannot send in unrecognised mode '$mode'"; 229 230 $self->$code( $data ); 231} 232 233*_send_sync = *_send_async = sub { 234 my ( $self, $data ) = @_; 235 $self->send_encoded( $self->{encode}->( $data ) ); 236}; 237 238=head2 send_encoded 239 240 $channel->send_encoded( $record ) 241 242A variant of the C<send> method; this method pushes the byte record given. 243This should be the result of a call to C<encode>. 244 245=cut 246 247sub send_encoded 248{ 249 my $self = shift; 250 my ( $record ) = @_; 251 252 my $bytes = pack( "I", length $record ) . $record; 253 254 defined $self->{mode} or die "Cannot ->send without being set up"; 255 256 return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync"; 257 return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async"; 258} 259 260=head2 encode 261 262 $record = $channel->encode( $data ) 263 264Takes a Perl reference and returns a serialised string that can be passed to 265C<send_encoded>. The following two forms are equivalent 266 267 $channel->send( $data ) 268 $channel->send_encoded( $channel->encode( $data ) ) 269 270This is provided for the use-case where data needs to be serialised into a 271fixed string to "snapshot it" but not sent yet; the returned string can be 272saved and sent at a later time. 273 274 $record = IO::Async::Channel->encode( $data ) 275 276This can also be used as a class method, in case it is inconvenient to operate 277on a particular object instance, or when one does not exist yet. In this case 278it will encode using whatever is the default codec for C<IO::Async::Channel>. 279 280=cut 281 282my $default_encode; 283sub encode 284{ 285 my $self = shift; 286 my ( $data ) = @_; 287 288 return ( ref $self ? 289 $self->{encode} : 290 $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] } 291 )->( $data ); 292} 293 294=head2 recv 295 296 $data = $channel->recv 297 298When called on a synchronous mode Channel this method will block until a Perl 299reference value is available from the other end and then return it. If the 300Channel is closed this method will return C<undef>. Since only references may 301be passed and all Perl references are true the truth of the result of this 302method can be used to detect that the channel is still open and has not yet 303been closed. 304 305 $data = $channel->recv->get 306 307When called on an asynchronous mode Channel this method returns a future which 308will eventually yield the next Perl reference value that becomes available 309from the other end. If the Channel is closed, the future will fail with an 310C<eof> failure. 311 312 $channel->recv( %args ) 313 314When not returning a future, takes the following named arguments: 315 316=over 8 317 318=item on_recv => CODE 319 320Called when a new Perl reference value is available. Will be passed the 321Channel object and the reference data. 322 323 $on_recv->( $channel, $data ) 324 325=item on_eof => CODE 326 327Called if the Channel was closed before a new value was ready. Will be passed 328the Channel object. 329 330 $on_eof->( $channel ) 331 332=back 333 334=cut 335 336my %RECVMETHODS; 337sub recv 338{ 339 my $self = shift; 340 341 defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up"; 342 343 my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) ) 344 or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'"; 345 346 return $self->$code( @_ ); 347} 348 349=head2 close 350 351 $channel->close 352 353Closes the channel. Causes a pending C<recv> on the other end to return undef 354or the queued C<on_eof> callbacks to be invoked. 355 356=cut 357 358my %CLOSEMETHODS; 359sub close 360{ 361 my $self = shift; 362 363 defined( my $mode = $self->{mode} ) or return; 364 365 my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) ) 366 or die "IO::Async::Channel cannot close in unrecognised mode '$mode'"; 367 368 return $self->$code; 369} 370 371# Leave this undocumented for now 372sub setup_sync_mode 373{ 374 my $self = shift; 375 ( $self->{fh} ) = @_; 376 377 $self->{mode} = "sync"; 378 379 # Since we're communicating binary structures and not Unicode text we need to 380 # enable binmode 381 binmode $self->{fh}; 382 383 defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle}; 384 $self->{fh}->autoflush(1); 385} 386 387sub _read_exactly 388{ 389 $_[1] = ""; 390 391 while( length $_[1] < $_[2] ) { 392 my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] ); 393 defined $n or return undef; 394 $n or return ""; 395 } 396 397 return $_[2]; 398} 399 400sub _recv_sync 401{ 402 my $self = shift; 403 404 my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 ); 405 defined $n or die "Cannot read - $!"; 406 length $n or return undef; 407 408 my $len = unpack( "I", $lenbuffer ); 409 410 $n = _read_exactly( $self->{fh}, my $record, $len ); 411 defined $n or die "Cannot read - $!"; 412 length $n or return undef; 413 414 return $self->{decode}->( $record ); 415} 416 417sub _sendbytes_sync 418{ 419 my $self = shift; 420 my ( $bytes ) = @_; 421 $self->{fh}->print( $bytes ); 422} 423 424sub _close_sync 425{ 426 my $self = shift; 427 $self->{fh}->close; 428} 429 430# Leave this undocumented for now 431sub setup_async_mode 432{ 433 my $self = shift; 434 my %args = @_; 435 436 exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle ); 437 438 keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args ); 439 440 defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle}; 441 $self->{mode} = "async"; 442} 443 444sub _build_stream 445{ 446 my $self = shift; 447 return $self->{stream} ||= do { 448 $self->{on_result_queue} = []; 449 450 my $stream = IO::Async::Stream->new( 451 read_handle => $self->{read_handle}, 452 write_handle => $self->{write_handle}, 453 autoflush => 1, 454 on_read => $self->_capture_weakself( '_on_stream_read' ) 455 ); 456 457 $self->add_child( $stream ); 458 459 $stream; 460 }; 461} 462 463sub _sendbytes_async 464{ 465 my $self = shift; 466 my ( $bytes ) = @_; 467 $self->_build_stream->write( $bytes ); 468} 469 470sub _recv_async 471{ 472 my $self = shift; 473 my %args = @_; 474 475 my $on_recv = $args{on_recv}; 476 my $on_eof = $args{on_eof}; 477 478 my $stream = $self->_build_stream; 479 480 my $f; 481 $f = $stream->loop->new_future unless !defined wantarray; 482 483 push @{ $self->{on_result_queue} }, sub { 484 my ( $self, $type, $result ) = @_; 485 if( $type eq "recv" ) { 486 $f->done( $result ) if $f and !$f->is_cancelled; 487 $on_recv->( $self, $result ) if $on_recv; 488 } 489 else { 490 $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled; 491 $on_eof->( $self ) if $on_eof; 492 } 493 }; 494 495 return $f; 496} 497 498sub _close_async 499{ 500 my $self = shift; 501 if( my $stream = $self->{stream} ) { 502 $stream->close_when_empty; 503 } 504 else { 505 $_ and $_->close for $self->{read_handle}, $self->{write_handle}; 506 } 507 508 undef $_ for $self->{read_handle}, $self->{write_handle}; 509} 510 511sub _on_stream_read 512{ 513 my $self = shift or return; 514 my ( $stream, $buffref, $eof ) = @_; 515 516 if( $eof ) { 517 while( my $on_result = shift @{ $self->{on_result_queue} } ) { 518 $on_result->( $self, eof => ); 519 } 520 $self->{on_eof}->( $self ) if $self->{on_eof}; 521 return; 522 } 523 524 return 0 unless length( $$buffref ) >= 4; 525 my $len = unpack( "I", $$buffref ); 526 return 0 unless length( $$buffref ) >= 4 + $len; 527 528 my $record = $self->{decode}->( substr( $$buffref, 4, $len ) ); 529 substr( $$buffref, 0, 4 + $len ) = ""; 530 531 if( my $on_result = shift @{ $self->{on_result_queue} } ) { 532 $on_result->( $self, recv => $record ); 533 } 534 else { 535 $self->{on_recv}->( $self, $record ); 536 } 537 538 return 1; 539} 540 541sub _extract_read_handle 542{ 543 my $self = shift; 544 545 return undef if !$self->{mode}; 546 547 croak "Cannot extract filehandle" if $self->{mode} ne "async"; 548 $self->{mode} = "dead"; 549 550 return $self->{read_handle}; 551} 552 553sub _extract_write_handle 554{ 555 my $self = shift; 556 557 return undef if !$self->{mode}; 558 559 croak "Cannot extract filehandle" if $self->{mode} ne "async"; 560 $self->{mode} = "dead"; 561 562 return $self->{write_handle}; 563} 564 565=head1 AUTHOR 566 567Paul Evans <leonerd@leonerd.org.uk> 568 569=cut 570 5710x55AA; 572