1# You may distribute under the terms of either the GNU General Public License 2# or the Artistic License (the same terms as Perl itself) 3# 4# (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk 5 6package IO::Async::Routine; 7 8use strict; 9use warnings; 10 11our $VERSION = '0.800'; 12 13use base qw( IO::Async::Notifier ); 14 15use Carp; 16 17use IO::Async::OS; 18use IO::Async::Process; 19 20use Struct::Dumb qw( readonly_struct ); 21 22=head1 NAME 23 24C<IO::Async::Routine> - execute code in an independent sub-process or thread 25 26=head1 SYNOPSIS 27 28 use IO::Async::Routine; 29 use IO::Async::Channel; 30 31 use IO::Async::Loop; 32 my $loop = IO::Async::Loop->new; 33 34 my $nums_ch = IO::Async::Channel->new; 35 my $ret_ch = IO::Async::Channel->new; 36 37 my $routine = IO::Async::Routine->new( 38 channels_in => [ $nums_ch ], 39 channels_out => [ $ret_ch ], 40 41 code => sub { 42 my @nums = @{ $nums_ch->recv }; 43 my $ret = 0; $ret += $_ for @nums; 44 45 # Can only send references 46 $ret_ch->send( \$ret ); 47 }, 48 49 on_finish => sub { 50 say "The routine aborted early - $_[-1]"; 51 $loop->stop; 52 }, 53 ); 54 55 $loop->add( $routine ); 56 57 $nums_ch->send( [ 10, 20, 30 ] ); 58 $ret_ch->recv( 59 on_recv => sub { 60 my ( $ch, $totalref ) = @_; 61 say "The total of 10, 20, 30 is: $$totalref"; 62 $loop->stop; 63 } 64 ); 65 66 $loop->run; 67 68=head1 DESCRIPTION 69 70This L<IO::Async::Notifier> contains a body of code and executes it in a 71sub-process or thread, allowing it to act independently of the main program. 72Once set up, all communication with the code happens by values passed into or 73out of the Routine via L<IO::Async::Channel> objects. 74 75The code contained within the Routine is free to make blocking calls without 76stalling the rest of the program. This makes it useful for using existing code 77which has no option not to block within an L<IO::Async>-based program. 

To create asynchronous wrappers of functions that return a value based only on
their arguments, and do not generally maintain state within the process, it may
be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.

=head2 Models

A choice of detachment model is available. Each has various advantages and
disadvantages. Not all of them may be available on a particular system.

=head3 The C<fork> model

The code in this model runs within its own process, created by calling
C<fork()> from the main process. It is isolated from the rest of the program
in terms of memory, CPU time, and other resources. Because it is started
using C<fork()>, the initial process state is a clone of the main process.

This model performs well on UNIX-like operating systems which possess a true
native C<fork()> system call, but is not available on C<MSWin32> for example,
because the operating system does not provide full fork-like semantics.

=head3 The C<thread> model

The code in this model runs inside a separate thread within the main process.
It therefore shares memory and other resources such as open filehandles with
the main thread. As with the C<fork> model, the initial thread state is cloned
from the main controlling thread.

This model is only available on perls built to support threading.

=head3 The C<spawn> model

I<Since version 0.79.>

The code in this model runs within its own freshly-created process running
another copy of the perl interpreter. Similar to the C<fork> model it
therefore has its own memory, CPU time, and other resources. However, since it
is started freshly rather than by cloning the main process, it starts up in a
clean state, without any shared resources from its parent.
119 120Since this model creates a new fresh process rather than sharing existing 121state, it cannot use the C<code> argument to specify the routine body; it must 122instead use only the C<module> and C<func> arguments. 123 124In the current implementation this model requires exactly one input channel 125and exactly one output channel; both must be present, and there cannot be more 126than one of either. 127 128=cut 129 130=head1 EVENTS 131 132=head2 on_finish $exitcode 133 134For C<fork()>-based Routines, this is invoked after the process has exited and 135is passed the raw exitcode status. 136 137=head2 on_finish $type, @result 138 139For thread-based Routines, this is invoked after the thread has returned from 140its code block and is passed the C<on_joined> result. 141 142As the behaviour of these events differs per model, it may be more convenient 143to use C<on_return> and C<on_die> instead. 144 145=head2 on_return $result 146 147Invoked if the code block returns normally. Note that C<fork()>-based Routines 148can only transport an integer result between 0 and 255, as this is the actual 149C<exit()> value. 150 151=head2 on_die $exception 152 153Invoked if the code block fails with an exception. 154 155=cut 156 157=head1 PARAMETERS 158 159The following named parameters may be passed to C<new> or C<configure>: 160 161=head2 model => "fork" | "thread" | "spawn" 162 163Optional. Defines how the routine will detach itself from the main process. 164See the L</Models> section above for more detail. 165 166If the model is not specified, the environment variable 167C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined, 168C<fork> is preferred if it is available, otherwise C<thread>. 169 170=head2 channels_in => ARRAY of IO::Async::Channel 171 172ARRAY reference of L<IO::Async::Channel> objects to set up for passing values 173in to the Routine. 

=head2 channels_out => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.

=head2 code => CODE

CODE reference to the body of the Routine, to execute once the channels are
set up.

When using the C<spawn> model, this is not permitted; you must use C<module>
and C<func> instead.

=head2 module => STRING

=head2 func => STRING

I<Since version 0.79.>

An alternative to the C<code> argument, which names a module to load and a
function to call within it. C<module> should give a perl module name (i.e.
C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
the basename of a function within that module (i.e. without the module name
prefixed). It will be invoked as the main code body of the object, and passed
in a list of all the channels; first the input ones then the output ones.

   module::func( @channels_in, @channels_out )

=head2 setup => ARRAY

Optional. For C<fork()>-based Routines, gives a reference to an array to pass
to the underlying C<Loop> C<fork_child> method. Ignored for thread-based and
C<spawn>-based Routines.

=cut

# Model used when neither the 'model' parameter nor the
# IO_ASYNC_ROUTINE_MODEL environment variable selects one.
use constant PREFERRED_MODEL =>
   IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
   IO::Async::OS->HAVE_THREADS    ? "thread" :
      die "No viable Routine models";

# Fills in the default 'model' parameter before the normal Notifier
# initialisation runs.
sub _init
{
   my $self = shift;
   my ( $params ) = @_;

   $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;

   $self->SUPER::_init( @_ );
}

# Cache of model name => CODE ref of the matching _setup_$model method
my %SETUP_CODE;

# Stores the Routine-specific parameters, validates the code/module/func
# combination and the requested model, then passes any remaining parameters
# up to the superclass.
sub configure
{
   my $self = shift;
   my %params = @_;

   # TODO: Can only reconfigure when not running
   foreach (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   defined $self->{code} and defined $self->{func} and
      croak "Cannot ->configure both 'code' and 'func'";
   defined $self->{func} and !defined $self->{module} and
      croak "'func' parameter requires a 'module' as well";

   if( defined( my $model = delete $params{model} ) ) {
      ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
         or die "Unrecognised Routine model $model";

      # TODO: optional plugin "configure" check here?
      $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
         croak "Cannot use 'fork' model as fork() is not available";
      $model eq "thread" and !IO::Async::OS->HAVE_THREADS and
         croak "Cannot use 'thread' model as threads are not available";

      $self->{model} = $model;
   }

   $self->SUPER::configure( %params );
}

# Once the Routine joins a loop, starts the code body by dispatching to the
# _setup_$model method for the configured model.
sub _add_to_loop
{
   my $self = shift;
   my ( $loop ) = @_;
   $self->SUPER::_add_to_loop( $loop );

   my $model = $self->{model};

   my $code = ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
      or die "Unrecognised Routine model $model";

   $self->$code();
}

# One record per channel: the Channel object, the filehandle kept by this
# (controlling) side, and the filehandle handed to the routine code.
readonly_struct ChannelSetup => [qw( chan myfd otherfd )];

# Builds a ChannelSetup for every input channel. If the channel does not
# already supply a read handle, a fresh pipe is created; the controlling side
# keeps the write end and the routine receives the read end.
sub _create_channels_in
{
   my $self = shift;

   my @channels_in;

   foreach my $ch ( @{ $self->{channels_in} || [] } ) {
      my ( $rd, $wr );
      unless( $rd = $ch->_extract_read_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_in, ChannelSetup( $ch, $wr, $rd );
   }

   return @channels_in;
}

# Builds a ChannelSetup for every output channel. If the channel does not
# already supply a write handle, a fresh pipe is created; the controlling
# side keeps the read end and the routine receives the write end.
sub _create_channels_out
{
   my $self = shift;

   my @channels_out;

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_out, ChannelSetup( $ch, $rd, $wr );
   }

   return @channels_out;
}

# Puts each input channel into async mode on our write handle, and adopts it
# as a child notifier unless it already has a parent.
sub _adopt_channels_in
{
   my $self = shift;
   my ( @channels_in ) = @_;

   foreach ( @channels_in ) {
      my $ch = $_->chan;
      $ch->setup_async_mode( write_handle => $_->myfd );
      $self->add_child( $ch ) unless $ch->parent;
   }
}

# Puts each output channel into async mode on our read handle, and adopts it
# as a child notifier unless it already has a parent.
sub _adopt_channels_out
{
   my $self = shift;
   my ( @channels_out ) = @_;

   foreach ( @channels_out ) {
      my $ch = $_->chan;
      $ch->setup_async_mode( read_handle => $_->myfd );
      $self->add_child( $ch ) unless $ch->parent;
   }
}

# Starts the code body in an IO::Async::Process child for the "fork" model.
sub _setup_fork
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   # The routine's ends of the channels must survive into the child
   my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in, @channels_out ) {
            $_->chan->setup_sync_mode( $_->otherfd );
         }

         # Keyed on 'func', as in the thread model: configure() guarantees
         # that 'func' implies 'module', but not the other way around.
         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Low 7 bits of the wait status hold the terminating signal; only a
         # normal exit counts as a return
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The child now owns the routine's ends; drop our copies
   $_->otherfd->close for @channels_in, @channels_out;
}

# Starts the code body in a thread created by the loop for the "thread" model.
sub _setup_thread
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   my $tid = $self->loop->create_thread(
      code => sub {
         foreach my $setup ( @channels_in, @channels_out ) {
            $setup->chan->setup_sync_mode( $setup->otherfd );

            # Close the controlling side's handle, if there is one, within
            # the thread
            my $myfd = $setup->myfd;
            $myfd->close if defined $myfd;
         }

         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_joined => $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $ev, @result ) = @_;
         $self->maybe_invoke_event( on_finish => @_ );

         if( $ev eq "return" ) {
            $self->maybe_invoke_event( on_return => @result );
            $self->result_future->done( @result );
         }
         if( $ev eq "died" ) {
            $self->maybe_invoke_event( on_die => $result[0] );
            $self->result_future->fail( $result[0], routine => );
         }

         delete $self->{tid};
      }),
   );

   $self->{tid} = $tid;
   $self->{id} = "T" . $tid;

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $_->otherfd->close for @channels_in, @channels_out;
}

# The injected program that goes into spawn mode
use constant PERL_RUNNER => <<'EOF';
( my ( $module, $func ), @INC ) = @ARGV;
( my $file = "$module.pm" ) =~ s{::}{/}g;
require $file;
my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
require IO::Async::Channel;
exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
EOF

# Starts the code body in a freshly-executed perl interpreter for the "spawn"
# model. The single input channel becomes the child's STDIN and the single
# output channel its STDOUT.
sub _setup_spawn
{
   my $self = shift;

   $self->{code} and
      die "Cannot run IO::Async::Routine in 'spawn' with code\n";

   my $module = $self->{module};
   my $func   = $self->{func};

   defined $module and defined $func or
      die "IO::Async::Routine in 'spawn' mode requires 'module' and 'func'\n";

   # Fall back to an empty list so that an unconfigured channel list reports
   # the intended message rather than a strict-refs failure
   @{ $self->{channels_in} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
   @{ $self->{channels_out} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $process = IO::Async::Process->new(
      setup => [
         stdin  => $channels_in[0]->otherfd,
         stdout => $channels_out[0]->otherfd,
      ],
      # Non-reference @INC entries are passed along for the runner to adopt
      command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   $_->otherfd->close for @channels_in, @channels_out;
}

=head1 METHODS

=cut

=head2 id

   $id = $routine->id

Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)

=cut

sub id
{
   my $self = shift;
   return $self->{id};
}

=head2 model

   $model = $routine->model

Returns the detachment model in use by the Routine.

=cut

sub model
{
   my $self = shift;
   return $self->{model};
}

=head2 kill

   $routine->kill( $signal )

Sends the specified signal to the routine code. This is either implemented by
C<CORE::kill()> or C<threads::kill> as required; the process-based C<fork> and
C<spawn> models signal the child process, and the C<thread> model signals the
thread. Note that in the thread case this has the usual limits of signal
delivery to threads; namely, that it works at the Perl interpreter level, and
cannot actually interrupt blocking system calls.

=cut

sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   my $model = $self->{model};

   # Both the "fork" and "spawn" models keep their child in $self->{process}
   $self->{process}->kill( $signal ) if $model eq "fork" or $model eq "spawn";
   threads->object( $self->{tid} )->kill( $signal ) if $model eq "thread";
}

=head2 result_future

   $f = $routine->result_future

I<Since version 0.75.>

Returns a new C<IO::Async::Future> which will complete with the eventual
return value or exception when the routine finishes.

If the routine finishes with a successful result then this will be the C<done>
result of the future. If the routine fails with an exception then this will be
the C<fail> result.

=cut

sub result_future
{
   my $self = shift;

   # Created on first request and cached, so every caller shares one future
   return $self->{result_future} //= do {
      my $f = $self->loop->new_future;
      # This future needs to strongly retain $self to ensure it definitely gets
      # notified
      $f->on_ready( sub { undef $self } );
      $f;
   };
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;