# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk

package IO::Async::Function;

use strict;
use warnings;

our $VERSION = '0.800';

use base qw( IO::Async::Notifier );
use IO::Async::Timer::Countdown;

use Carp;

use List::Util qw( first );

use Struct::Dumb qw( readonly_struct );

# One entry in the queue of calls waiting for a free worker: the call's
# numeric priority, and the Future that is completed with ( $function,
# $worker ) once a worker has been found to run it.
readonly_struct Pending => [qw( priority f )];

=head1 NAME

C<IO::Async::Function> - call a function asynchronously

=head1 SYNOPSIS

   use IO::Async::Function;

   use IO::Async::Loop;
   my $loop = IO::Async::Loop->new;

   my $function = IO::Async::Function->new(
      code => sub {
         my ( $number ) = @_;
         return is_prime( $number );
      },
   );

   $loop->add( $function );

   $function->call(
      args => [ 123454321 ],
   )->on_done( sub {
      my $isprime = shift;
      print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
   })->on_fail( sub {
      print STDERR "Cannot determine if it's prime - $_[0]\n";
   })->get;

=head1 DESCRIPTION

This subclass of L<IO::Async::Notifier> wraps a function body in a collection
of worker processes, to allow it to execute independently of the main process.
The object acts as a proxy to the function, allowing invocations to be made by
passing in arguments, and invoking a continuation in the main process when the
function returns.

The object represents the function code itself, rather than one specific
invocation of it. It can be called multiple times, by the C<call> method.
Multiple outstanding invocations can be made; they will be dispatched in
the order they were queued, except that calls which have to wait for a free
worker are taken in order of their C<priority> (see C<call>), and in FIFO
order among calls of equal priority. If only one worker process is used then
results will be returned in the order the calls were dispatched to it. If
multiple are used, then each request will be sent in the order dispatched, but
timing differences between each worker may mean results are returned in a
different order.

Since the code block will be called multiple times within the same child
process, it must take care not to modify any of its state that might affect
subsequent calls. Since it executes in a child process, it cannot make any
modifications to the state of the parent program. Therefore, all the data
required to perform its task must be represented in the call arguments, and
all of the result must be represented in the return values.

The Function object is implemented using an L<IO::Async::Routine> with two
L<IO::Async::Channel> objects to pass calls into and results out from it.

The L<IO::Async> framework generally provides mechanisms for multiplexing IO
tasks between different handles, so there aren't many occasions when such an
asynchronous function is necessary. Two cases where this does become useful
are:

=over 4

=item 1.

When a large amount of computationally-intensive work needs to be performed
(for example, the C<is_prime> test in the example in the C<SYNOPSIS>).

=item 2.

When a blocking OS syscall or library-level function needs to be called, and
no nonblocking or asynchronous version is supplied. This is used by
L<IO::Async::Resolver>.

=back

This object is ideal for representing "pure" functions; that is, blocks of
code which have no stateful effect on the process, and whose result depends
only on the arguments passed in. For a more general co-routine ability, see
also L<IO::Async::Routine>.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 code => CODE

The body of the function to execute.

   @result = $code->( @args )

=head2 init_code => CODE

Optional. If defined, this is invoked exactly once in every child process or
thread, after it is created, but before the first invocation of the function
body itself.

   $init_code->()

=head2 module => STRING

=head2 func => STRING

I<Since version 0.79.>

An alternative to the C<code> argument, which names a module to load and a
function to call within it. C<module> should give a perl module name (e.g.
C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
the basename of a function within that module (i.e. without the module name
prefixed). It will be invoked, without extra arguments, as the main code
body of the object.

The task of loading this module and resolving the resulting function from it
is only performed on the remote worker side, so the controlling process will
not need to actually load the module.

=head2 init_func => STRING or ARRAY [ STRING, ... ]

Optional addition to the C<module> and C<func> alternatives. Names a function
within the module to call each time a new worker is created.

If this value is an array reference, its first element must be a string giving
the name of the function; the remaining values are passed to that function as
arguments.

=head2 model => "fork" | "thread" | "spawn"

Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
leaves the default choice up to Routine.

=head2 min_workers => INT

=head2 max_workers => INT

The lower and upper bounds of worker processes to try to keep running. The
actual number running at any time will be kept somewhere between these bounds
according to load.

=head2 max_worker_calls => INT

Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).

=head2 idle_timeout => NUM

Optional.
If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.

=head2 exit_on_die => BOOL

Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.

=head2 setup => ARRAY

Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.

=cut

# Sets the default worker bounds and the empty bookkeeping structures, before
# any constructor parameters are applied.
sub _init
{
   my $self = shift;
   $self->SUPER::_init( @_ );

   $self->{min_workers} = 1;
   $self->{max_workers} = 8;

   $self->{workers} = {}; # {$id} => IaFunction:Worker

   $self->{pending_queue} = []; # Pending structs awaiting a free worker
}

# Applies parameters given to ->new or ->configure.
#
# Worker-level settings (model, exit_on_die, max_worker_calls) are stored and
# also forwarded to every running worker. A false idle_timeout removes the idle
# timer; a true one creates or re-times it. Changing any parameter that defines
# what a worker runs (code, module, func, their init variants, setup) restarts
# the workers if the Function is already in a Loop.
#
# Croaks if both 'code' and 'func' end up set, or 'func' is set without
# 'module'.
sub configure
{
   my $self = shift;
   my %params = @_;

   my %worker_params;
   foreach (qw( model exit_on_die max_worker_calls )) {
      $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
   }

   # Workers know this limit as 'max_calls': _new_worker constructs them with
   # that name, and Worker's configure only consumes 'exit_on_die' and
   # 'max_calls', passing anything else on to its superclass. Translate the
   # name before forwarding it to already-running workers.
   $worker_params{max_calls} = delete $worker_params{max_worker_calls}
      if exists $worker_params{max_worker_calls};

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         # A false timeout disables idle shutdown altogether
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more? Re-arm to shed another one later.
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach (qw( min_workers max_workers )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
      # TODO: something about retuning
   }

   # Any of these changes what a worker runs, so running workers have to be
   # replaced for the change to take effect.
   my $need_restart;

   foreach (qw( init_code code module init_func func setup )) {
      $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   defined $self->{code} and defined $self->{func} and
      croak "Cannot ->configure both 'code' and 'func'";
   defined $self->{func} and !defined $self->{module} and
      croak "'func' parameter requires a 'module' as well";

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}

# Starts the initial pool of workers once the Function is in a Loop.
sub _add_to_loop
{
   my $self = shift;
   $self->SUPER::_add_to_loop( @_ );

   $self->start;
}

# Stops all the workers before the Function leaves its Loop.
sub _remove_from_loop
{
   my $self = shift;

   $self->stop;

   $self->SUPER::_remove_from_loop( @_ );
}

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 start

   $function->start

Start the worker processes

=cut

sub start
{
   my $self = shift;

   # A previous ->stop leaves this flag set. That includes a stop made by
   # ->restart, by a restarting ->configure, or by removal from the Loop.
   # While the flag is set, the workers' on_finish handler neither replaces
   # exited workers nor dispatches pending calls. Clear it now that we are
   # running again.
   delete $self->{stopping};

   $self->_new_worker for 1 .. $self->{min_workers};
}

=head2 stop

   $function->stop

Stop the worker processes

   $f = $function->stop

I<Since version 0.75.>

If called in non-void context, returns a L<IO::Async::Future> instance that
will complete once every worker process has stopped and exited. This may be
useful for waiting until all of the processes are waited on, or other
edge-cases, but is not otherwise particularly useful.

=cut

sub stop
{
   my $self = shift;

   # Suppresses worker replacement in the workers' on_finish handler
   $self->{stopping} = 1;

   my @f;

   foreach my $worker ( $self->_worker_objects ) {
      # Only collect the per-worker futures when the caller wants one back
      defined wantarray ? push @f, $worker->stop : $worker->stop;
   }

   return Future->needs_all( @f ) if defined wantarray;
}

=head2 restart

   $function->restart

Gracefully stop and restart all the worker processes.

=cut

sub restart
{
   my $self = shift;

   $self->stop;
   $self->start;
}

=head2 call

   @result = $function->call( %params )->get

Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.

The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.

The C<%params> hash takes the following keys:

=over 8

=item args => ARRAY

A reference to the array of arguments to pass to the code.

=item priority => NUM

Optional. Defines the sorting order when no workers are available and calls
must be queued for later. A default of zero will apply if not provided.

Higher values cause the call to be considered more important, and will be
placed earlier in the queue than calls with a smaller value.
Calls of equal priority are still handled in FIFO order.

=back

If the function body returns normally the list of results is provided as the
(successful) result of the returned future. If the function throws an
exception this results in a failed future. In the special case that the
exception is in fact an unblessed C<ARRAY> reference, this array is unpacked
and used as-is for the C<fail> result. If the exception is not such a
reference, it is used as the first argument to C<fail>, in the category of
C<error>.

   $f->done( @result )

   $f->fail( @{ $exception } )
   $f->fail( $exception, error => )

=head2 call (void)

   $function->call( %params )

When not returning a future, the C<on_result>, C<on_return> and C<on_error>
arguments give continuations to handle successful results or failure.

=over 8

=item on_result => CODE

A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:

   $on_result->( 'return', @values )

If the code threw an exception, or some other error occurred such as a closed
connection or the process died, it is called as:

   $on_result->( 'error', $exception_name )

=item on_return => CODE and on_error => CODE

An alternative to C<on_result>. Two continuations to use in either of the
circumstances given above. They will be called directly, without the leading
'return' or 'error' value.

=back

=cut

# Debug hooks invoked around each call; the arguments and results are passed
# in but only a fixed tag (or the failure message) is printed.
sub debug_printf_call
{
   my $self = shift;
   $self->debug_printf( "CALL" );
}

sub debug_printf_result
{
   my $self = shift;
   $self->debug_printf( "RESULT" );
}

sub debug_printf_failure
{
   my $self = shift;
   my ( $err ) = @_;
   $self->debug_printf( "FAIL $err" );
}

sub call
{
   my $self = shift;
   my %params = @_;

   # TODO: possibly just queue this?
   $self->loop or croak "Cannot ->call on a Function not yet in a Loop";

   my $args = delete $params{args};
   ref $args eq "ARRAY" or croak "Expected 'args' to be an array";

   my ( $on_done, $on_fail );
   if( defined $params{on_result} ) {
      my $on_result = delete $params{on_result};
      ref $on_result or croak "Expected 'on_result' to be a reference";

      $on_done = sub {
         $on_result->( return => @_ );
      };
      $on_fail = sub {
         # NOTE(review): the failure's first value ($err) is dropped and only
         # the remaining values are passed on; confirm this matches the
         # documented ( 'error', $exception_name ) form.
         my ( $err, @values ) = @_;
         $on_result->( error => @values );
      };
   }
   elsif( defined $params{on_return} and defined $params{on_error} ) {
      my $on_return = delete $params{on_return};
      ref $on_return or croak "Expected 'on_return' to be a reference";
      my $on_error = delete $params{on_error};
      ref $on_error or croak "Expected 'on_error' to be a reference";

      $on_done = $on_return;
      $on_fail = $on_error;
   }
   elsif( !defined wantarray ) {
      croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
   }

   $self->debug_printf_call( @$args );

   # Encode now, so the caller may modify @$args once we return
   my $request = IO::Async::Channel->encode( $args );

   my $future;
   if( my $worker = $self->_get_worker ) {
      $future = $self->_call_worker( $worker, $request );
   }
   else {
      $self->debug_printf( "QUEUE" );
      my $queue = $self->{pending_queue};

      my $next = Pending(
         my $priority = $params{priority} || 0,
         my $wait_f = $self->loop->new_future,
      );

      # Keep the queue ordered by descending priority, FIFO among equal
      # priorities. The new entry goes before the first entry of strictly
      # lower priority, and is appended when there is none. Checking the
      # tail first keeps the common all-equal-priority case O(1).
      #
      # This also applies to the default priority of zero: it must still go
      # ahead of any queued calls that have a negative priority.
      if( !@$queue or $queue->[-1]->priority >= $priority ) {
         push @$queue, $next;
      }
      else {
         my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
         splice @$queue, $idx, 0, $next;
      }

      # The Function arrives via $wait_f->done( $function, $worker ) rather
      # than being captured here (presumably to avoid a reference cycle
      # through pending_queue).
      $future = $wait_f->then( sub {
         my ( $function, $worker ) = @_;
         $function->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_result( @_ );
   }));
   $future->on_fail( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_failure( @_ );
   }));

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

# Returns the list of current worker objects.
sub _worker_objects
{
   my $self = shift;
   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers

Returns the total number of worker processes available

=cut

sub workers
{
   my $self = shift;
   return scalar keys %{ $self->{workers} };
}

=head2 workers_busy

   $count = $function->workers_busy

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my $self = shift;
   return scalar grep { $_->{busy} } $self->_worker_objects;
}

=head2 workers_idle

   $count = $function->workers_idle

Returns the number of worker processes that are currently idle

=cut

sub workers_idle
{
   my $self = shift;
   return scalar grep { !$_->{busy} } $self->_worker_objects;
}

# Creates a new Worker from the current configuration, adds it as a child and
# registers it by its id. When a worker finishes (unless the Function is
# stopping), it is replaced if we have dropped below min_workers, and any
# pending calls are dispatched.
sub _new_worker
{
   my $self = shift;

   my $worker = IO::Async::Function::Worker->new(
      ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},

      on_finish => $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $worker ) = @_;

         return if $self->{stopping};

         $self->_new_worker if $self->workers < $self->{min_workers};

         $self->_dispatch_pending;
      } ),
   );

   $self->add_child( $worker );

   return $self->{workers}{$worker->id} = $worker;
}

# Returns an idle worker, preferring the lowest id. If none is idle, creates a
# new one when below max_workers. Otherwise returns undef.
sub _get_worker
{
   my $self = shift;

   foreach ( sort keys %{ $self->{workers} } ) {
      return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
   }

   if( $self->workers < $self->{max_workers} ) {
      return $self->_new_worker;
   }

   return undef;
}

# Sends an already-encoded request to $worker and returns the Future of its
# result. If that leaves no idle workers, the idle timer is stopped.
sub _call_worker
{
   my $self = shift;
   my ( $worker, $request ) = @_;

   my $future = $worker->call( $request );

   if( $self->workers_idle == 0 ) {
      $self->{idle_timer}->stop if $self->{idle_timer};
   }

   return $future;
}

# Hands the first not-yet-cancelled queued call to a free (or newly created)
# worker. A queued call is only removed from the queue once a worker is
# available for it. Once the queue is empty, starts the idle timer if there
# are more idle workers than min_workers.
sub _dispatch_pending
{
   my $self = shift;
   my $queue = $self->{pending_queue};

   while( @$queue ) {
      # Find a worker before dequeueing, so that a call is never dropped from
      # the queue (leaving its future forever pending) when none is free.
      my $worker = $self->_get_worker or return;

      my $f = ( shift @$queue )->f;

      # Cancelled calls are discarded; the worker is offered to the next one
      next if $f->is_cancelled;

      $self->debug_printf( "UNQUEUE" );
      $f->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}

package # hide from indexer
   IO::Async::Function::Worker;

use base qw( IO::Async::Routine );

use Carp;

use IO::Async::Channel;

use IO::Async::Internals::FunctionWorker;

# Builds a worker Routine with one channel carrying calls in and one carrying
# results out.
#
# For the 'code' form, the Routine's code is a closure that runs init_code (if
# any) and then the FunctionWorker runloop. For the 'module'/'func' form, the
# Routine instead runs IO::Async::Internals::FunctionWorker::run_worker. The
# real module, func, init_func and its arguments are sent as the first message
# once the worker is added to a Loop.
sub new
{
   my $class = shift;
   my %params = @_;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my $send_initial;

   if( defined( my $code = $params{code} ) ) {
      my $init_code = $params{init_code};

      $params{code} = sub {
         $init_code->() if defined $init_code;

         IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
      };
   }
   elsif( defined( my $func = $params{func} ) ) {
      my $module = $params{module};
      my $init_func = $params{init_func};
      my @init_args;

      $params{module} = "IO::Async::Internals::FunctionWorker";
      $params{func} = "run_worker";

      # init_func may be NAME or [ NAME, @args ]
      ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";

      $send_initial = [ $module, $func, $init_func, @init_args ];
   }

   # Consumed above; not parameters of the underlying Routine
   delete @params{qw( init_code init_func )};

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   $worker->{send_initial} = $send_initial if $send_initial;

   return $worker;
}

# Once running, sends the deferred module/func setup message (if any) ahead
# of any calls.
sub _add_to_loop
{
   my $self = shift;
   $self->SUPER::_add_to_loop( @_ );

   $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
}

# Consumes the Worker-specific settings and passes the rest to the Routine.
sub configure
{
   my $self = shift;
   my %params = @_;

   exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );

   $self->SUPER::configure( %params );
}

# Closes the argument channel and unregisters this worker from its Function,
# so _get_worker no longer offers it work. It is removed from the Function's
# children at once if idle, or after its in-flight call completes. In
# non-void context, returns the Routine's result_future.
sub stop
{
   my $worker = shift;
   $worker->{arg_channel}->close;

   my $ret;
   $ret = $worker->result_future if defined wantarray;

   if( my $function = $worker->parent ) {
      delete $function->{workers}{$worker->id};

      if( $worker->{busy} ) {
         $worker->{remove_on_idle}++;
      }
      else {
         $function->remove_child( $worker );
      }
   }

   return $ret;
}

# Sends one encoded request and marks the worker busy. Returns a Future that:
#   - succeeds with the returned values ("r" result);
#   - fails with the thrown values ("e" result);
#   - fails with ( "closed", "closed" ) on EOF of the result channel.
# The worker stops itself when its call allowance reaches zero, or after an
# exception when exit_on_die is set.
sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   # With no limit configured this counts down from undef (-1, -2, ...) and
   # therefore never reaches zero.
   $worker->{max_calls}--;

   return $worker->{ret_channel}->recv->then(
      # on recv
      $worker->_capture_weakself( sub {
         my ( $worker, $result ) = @_;
         my ( $type, @values ) = @$result;

         $worker->stop if !$worker->{max_calls} or
            $worker->{exit_on_die} && $type eq "e";

         if( $type eq "r" ) {
            return Future->done( @values );
         }
         elsif( $type eq "e" ) {
            return Future->fail( @values );
         }
         else {
            die "Unrecognised type from worker - $type\n";
         }
      } ),
      # on EOF
      $worker->_capture_weakself( sub {
         my ( $worker ) = @_;

         $worker->stop;

         return Future->fail( "closed", "closed" );
      } )
   )->on_ready( $worker->_capture_weakself( sub {
      my ( $worker, $f ) = @_;
      $worker->{busy} = 0;

      my $function = $worker->parent;
      $function->_dispatch_pending if $function;

      # Deferred removal for a worker that was stopped mid-call
      $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
   }));
}

=head1 EXAMPLES

=head2 Extended Error Information on Failure

The array-unpacking form of exception indication allows the function body to
more precisely control the resulting failure from the C<call> future.

   my $divider = IO::Async::Function->new(
      code => sub {
         my ( $numerator, $divisor ) = @_;
         $divisor == 0 and
            die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];

         return $numerator / $divisor;
      }
   );

=head1 NOTES

For the record, 123454321 is 11111 * 11111, a square number, and therefore not
prime.

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;