1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5
6package IO::Async::Function;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.800';
12
13use base qw( IO::Async::Notifier );
14use IO::Async::Timer::Countdown;
15
16use Carp;
17
18use List::Util qw( first );
19
20use Struct::Dumb qw( readonly_struct );
21
22readonly_struct Pending => [qw( priority f )];
23
24=head1 NAME
25
26C<IO::Async::Function> - call a function asynchronously
27
28=head1 SYNOPSIS
29
30   use IO::Async::Function;
31
32   use IO::Async::Loop;
33   my $loop = IO::Async::Loop->new;
34
35   my $function = IO::Async::Function->new(
36      code => sub {
37         my ( $number ) = @_;
38         return is_prime( $number );
39      },
40   );
41
42   $loop->add( $function );
43
44   $function->call(
45      args => [ 123454321 ],
46   )->on_done( sub {
47      my $isprime = shift;
48      print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
49   })->on_fail( sub {
50      print STDERR "Cannot determine if it's prime - $_[0]\n";
51   })->get;
52
53=head1 DESCRIPTION
54
55This subclass of L<IO::Async::Notifier> wraps a function body in a collection
56of worker processes, to allow it to execute independently of the main process.
57The object acts as a proxy to the function, allowing invocations to be made by
58passing in arguments, and invoking a continuation in the main process when the
59function returns.
60
61The object represents the function code itself, rather than one specific
62invocation of it. It can be called multiple times, by the C<call> method.
63Multiple outstanding invocations can be called; they will be dispatched in
64the order they were queued. If only one worker process is used then results
65will be returned in the order they were called. If multiple are used, then
66each request will be sent in the order called, but timing differences between
67each worker may mean results are returned in a different order.
68
69Since the code block will be called multiple times within the same child
70process, it must take care not to modify any of its state that might affect
71subsequent calls. Since it executes in a child process, it cannot make any
72modifications to the state of the parent program. Therefore, all the data
73required to perform its task must be represented in the call arguments, and
74all of the result must be represented in the return values.
75
76The Function object is implemented using an L<IO::Async::Routine> with two
77L<IO::Async::Channel> objects to pass calls into and results out from it.
78
79The L<IO::Async> framework generally provides mechanisms for multiplexing IO
80tasks between different handles, so there aren't many occasions when such an
81asynchronous function is necessary. Two cases where this does become useful
82are:
83
84=over 4
85
86=item 1.
87
88When a large amount of computationally-intensive work needs to be performed
89(for example, the C<is_prime> test in the example in the C<SYNOPSIS>).
90
91=item 2.
92
93When a blocking OS syscall or library-level function needs to be called, and
94no nonblocking or asynchronous version is supplied. This is used by
95L<IO::Async::Resolver>.
96
97=back
98
99This object is ideal for representing "pure" functions; that is, blocks of
100code which have no stateful effect on the process, and whose result depends
101only on the arguments passed in. For a more general co-routine ability, see
102also L<IO::Async::Routine>.
103
104=cut
105
106=head1 PARAMETERS
107
108The following named parameters may be passed to C<new> or C<configure>:
109
110=head2 code => CODE
111
112The body of the function to execute.
113
114   @result = $code->( @args )
115
116=head2 init_code => CODE
117
118Optional. If defined, this is invoked exactly once in every child process or
119thread, after it is created, but before the first invocation of the function
120body itself.
121
122   $init_code->()
123
124=head2 module => STRING
125
126=head2 func => STRING
127
128I<Since version 0.79.>
129
130An alternative to the C<code> argument, which names a module to load and a
131function to call within it. C<module> should give a perl module name (i.e.
132C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
133the basename of a function within that module (i.e. without the module name
134prefixed). It will be invoked, without extra arguments, as the main code
135body of the object.
136
137The task of loading this module and resolving the resulting function from it
138is only performed on the remote worker side, so the controlling process will
139not need to actually load the module.
140
141=head2 init_func => STRING or ARRAY [ STRING, ... ]
142
143Optional addition to the C<module> and C<func> alternatives. Names a function
144within the module to call each time a new worker is created.
145
146If this value is an array reference, its first element must be a string giving
147the name of the function; the remaining values are passed to that function as
148arguments.
149
150=head2 model => "fork" | "thread" | "spawn"
151
152Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
153leaves the default choice up to Routine.
154
155=head2 min_workers => INT
156
157=head2 max_workers => INT
158
159The lower and upper bounds of worker processes to try to keep running. The
160actual number running at any time will be kept somewhere between these bounds
161according to load.
162
163=head2 max_worker_calls => INT
164
165Optional. If provided, stop a worker process after it has processed this
166number of calls. (New workers may be started to replace stopped ones, within
167the bounds given above).
168
169=head2 idle_timeout => NUM
170
171Optional. If provided, idle worker processes will be shut down after this
172amount of time, if there are more than C<min_workers> of them.
173
174=head2 exit_on_die => BOOL
175
176Optional boolean, controls what happens after the C<code> throws an
177exception. If missing or false, the worker will continue running to process
178more requests. If true, the worker will be shut down. A new worker might be
179constructed by the C<call> method to replace it, if necessary.
180
181=head2 setup => ARRAY
182
183Optional array reference. Specifies the C<setup> key to pass to the underlying
184L<IO::Async::Process> when setting up new worker processes.
185
186=cut
187
# Notifier initialisation hook: lets the parent class initialise first, then
# installs this object's default pool bounds and empty bookkeeping structures.
sub _init
{
   my ( $self, @init_args ) = @_;
   $self->SUPER::_init( @init_args );

   # Pool size bounds that apply until ->configure overrides them
   @{$self}{qw( min_workers max_workers )} = ( 1, 8 );

   # Live workers, keyed by worker ID => IO::Async::Function::Worker
   $self->{workers} = {};

   # Pending structs for calls that are waiting for a free worker
   $self->{pending_queue} = [];
}
200
# Applies the named parameters described in the PARAMETERS section.
#
# Worker-level settings are stored here and also pushed to any workers that
# already exist; 'idle_timeout' creates, retunes or removes the idle-shutdown
# timer; changing any of the code-defining parameters restarts the workers if
# this object is already in a Loop. Remaining keys are handed to the parent
# class. Croaks if both 'code' and 'func' end up defined, or if 'func' is
# defined without a 'module'.
sub configure
{
   my $self = shift;
   my %params = @_;

   # Settings that existing workers need to hear about too. The worker class
   # knows the call limit as 'max_calls' (that is the key its own ->configure
   # recognises and the one _new_worker passes), so translate
   # 'max_worker_calls' rather than forwarding it under a name the worker
   # does not handle.
   my %worker_params;
   foreach my $key (qw( model exit_on_die max_worker_calls )) {
      next unless exists $params{$key};

      my $value = $self->{$key} = delete $params{$key};
      $worker_params{ $key eq "max_worker_calls" ? "max_calls" : $key } = $value;
   }

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         # A false timeout disables idle shutdown entirely
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach my $key (qw( min_workers max_workers )) {
      $self->{$key} = delete $params{$key} if exists $params{$key};
      # TODO: something about retuning
   }

   # Any change to what the workers actually run means the existing workers
   # are stale and have to be replaced
   my $need_restart;

   foreach my $key (qw( init_code code module init_func func setup )) {
      next unless exists $params{$key};

      $need_restart++;
      $self->{$key} = delete $params{$key};
   }

   defined $self->{code} and defined $self->{func} and
      croak "Cannot ->configure both 'code' and 'func'";
   defined $self->{func} and !defined $self->{module} and
      croak "'func' parameter requires a 'module' as well";

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}
274
# Loop-attachment hook: after the parent class has done its part, bring up
# the initial set of workers.
sub _add_to_loop
{
   my ( $self, @args ) = @_;
   $self->SUPER::_add_to_loop( @args );

   $self->start;
}
282
# Loop-detachment hook: stop the workers first, then let the parent class
# finish the removal.
sub _remove_from_loop
{
   my ( $self, @args ) = @_;

   $self->stop;

   $self->SUPER::_remove_from_loop( @args );
}
291
292=head1 METHODS
293
294The following methods documented with a trailing call to C<< ->get >> return
295L<Future> instances.
296
297=cut
298
299=head2 start
300
301   $function->start
302
303Start the worker processes
304
305=cut
306
# Starts 'min_workers' new worker processes.
sub start
{
   my $self = shift;

   # ->stop sets the 'stopping' flag, and while it is set each worker's
   # on_finish handler returns early without replacing lost workers or
   # dispatching queued calls. Starting again means we are no longer
   # stopping, so clear it; otherwise a stop/start cycle would leave that
   # handling disabled for good.
   $self->{stopping} = 0;

   $self->_new_worker for 1 .. $self->{min_workers};
}
313
314=head2 stop
315
316   $function->stop
317
318Stop the worker processes
319
320   $f = $function->stop
321
322I<Since version 0.75.>
323
324If called in non-void context, returns a L<IO::Async::Future> instance that
325will complete once every worker process has stopped and exited. This may be
326useful for waiting until all of the processes are waited on, or other
327edge-cases, but is not otherwise particularly useful.
328
329=cut
330
# Marks the function as stopping and asks every current worker to stop.
# In void context nothing is returned; otherwise returns a Future built by
# Future->needs_all over the values returned by each worker's ->stop.
sub stop
{
   my ( $self ) = @_;
   my $want_future = defined wantarray;

   $self->{stopping} = 1;

   my @workers = $self->_worker_objects;

   if( !$want_future ) {
      # Nobody is waiting on the result, so don't make the workers build one
      $_->stop for @workers;
      return;
   }

   my @stopped;
   push @stopped, $_->stop for @workers;

   return Future->needs_all( @stopped );
}
345
346=head2 restart
347
348   $function->restart
349
350Gracefully stop and restart all the worker processes.
351
352=cut
353
# Stops all the current workers, then starts a fresh set.
sub restart
{
   my ( $self ) = @_;

   $self->stop;    # void context: no Future is requested
   $self->start;
}
361
362=head2 call
363
364   @result = $function->call( %params )->get
365
366Schedules an invocation of the contained function to be executed on one of the
367worker processes. If a non-busy worker is available now, it will be called
368immediately. If not, it will be queued and sent to the next free worker that
369becomes available.
370
371The request will already have been serialised by the marshaller, so it will be
372safe to modify any referenced data structures in the arguments after this call
373returns.
374
375The C<%params> hash takes the following keys:
376
377=over 8
378
379=item args => ARRAY
380
381A reference to the array of arguments to pass to the code.
382
383=item priority => NUM
384
385Optional. Defines the sorting order when no workers are available and calls
386must be queued for later. A default of zero will apply if not provided.
387
388Higher values cause the call to be considered more important, and will be
389placed earlier in the queue than calls with a smaller value. Calls of equal
390priority are still handled in FIFO order.
391
392=back
393
If the function body returns normally the list of results is provided as the
(successful) result of the returned future. If the function throws an exception
396this results in a failed future. In the special case that the exception is in
397fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
398for the C<fail> result. If the exception is not such a reference, it is used
399as the first argument to C<fail>, in the category of C<error>.
400
401   $f->done( @result )
402
403   $f->fail( @{ $exception } )
404   $f->fail( $exception, error => )
405
406=head2 call (void)
407
408   $function->call( %params )
409
410When not returning a future, the C<on_result>, C<on_return> and C<on_error>
411arguments give continuations to handle successful results or failure.
412
413=over 8
414
415=item on_result => CODE
416
417A continuation that is invoked when the code has been executed. If the code
418returned normally, it is called as:
419
420   $on_result->( 'return', @values )
421
422If the code threw an exception, or some other error occurred such as a closed
423connection or the process died, it is called as:
424
425   $on_result->( 'error', $exception_name )
426
427=item on_return => CODE and on_error => CODE
428
429An alternative to C<on_result>. Two continuations to use in either of the
430circumstances given above. They will be called directly, without the leading
431'return' or 'error' value.
432
433=back
434
435=cut
436
# Debug hook invoked when a call is made. It receives the call arguments but
# logs only the fixed marker "CALL".
sub debug_printf_call
{
   my ( $self ) = @_;

   $self->debug_printf( "CALL" );
}
442
# Debug hook invoked when a call succeeds. It receives the result values but
# logs only the fixed marker "RESULT".
sub debug_printf_result
{
   my ( $self ) = @_;

   $self->debug_printf( "RESULT" );
}
448
# Debug hook invoked when a call fails; logs "FAIL" followed by the failure
# message (the first argument after the invocant).
sub debug_printf_failure
{
   my $self = shift;
   my ( $err ) = @_;

   # The message is arbitrary text coming from the failed call, so pass it as
   # a printf argument instead of splicing it into the format string, where
   # any '%' it contains would be taken as a conversion.
   $self->debug_printf( "FAIL %s", $err );
}
455
# Schedules one invocation of the function.
#
# Takes named parameters: 'args' (required ARRAY ref), optional 'priority',
# and optionally either 'on_result' or both 'on_return' and 'on_error'
# continuations. The call goes to a free worker straight away if there is
# one; otherwise it is queued until a worker becomes available.
#
# In non-void context returns the Future for the call. In void context the
# continuations are mandatory and the Future is kept alive internally.
# Croaks if the object is not in a Loop, if 'args' is not an ARRAY ref, if a
# continuation is not a reference, or if called in void context with no
# continuations.
sub call
{
   my $self = shift;
   my %params = @_;

   # TODO: possibly just queue this?
   $self->loop or croak "Cannot ->call on a Function not yet in a Loop";

   my $args = delete $params{args};
   ref $args eq "ARRAY" or croak "Expected 'args' to be an array";

   my ( $on_done, $on_fail );
   if( defined $params{on_result} ) {
      my $on_result = delete $params{on_result};
      ref $on_result or croak "Expected 'on_result' to be a reference";

      $on_done = sub {
         $on_result->( return => @_ );
      };
      $on_fail = sub {
         my ( $err, @values ) = @_;
         $on_result->( error => @values );
      };
   }
   elsif( defined $params{on_return} and defined $params{on_error} ) {
      my $on_return = delete $params{on_return};
      ref $on_return or croak "Expected 'on_return' to be a reference";
      my $on_error  = delete $params{on_error};
      ref $on_error or croak "Expected 'on_error' to be a reference";

      $on_done = $on_return;
      $on_fail = $on_error;
   }
   elsif( !defined wantarray ) {
      croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
   }

   $self->debug_printf_call( @$args );

   # Encode now, so the caller may freely modify @$args once we return even
   # if the call has to sit in the queue for a while
   my $request = IO::Async::Channel->encode( $args );

   my $future;
   if( my $worker = $self->_get_worker ) {
      $future = $self->_call_worker( $worker, $request );
   }
   else {
      $self->debug_printf( "QUEUE" );
      my $queue = $self->{pending_queue};

      my $priority = $params{priority} || 0;
      my $wait_f   = $self->loop->new_future;
      my $next     = Pending( $priority, $wait_f );

      # The queue is kept in descending priority order, FIFO among equals.
      # If nothing queued ranks below this call it simply goes on the end;
      # otherwise it goes in front of the first lower-priority entry. This
      # has to be checked for the default priority of zero as well, since
      # negative-priority calls may already be waiting.
      if( !@$queue or $queue->[-1]->priority >= $priority ) {
         push @$queue, $next;
      }
      else {
         my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
         splice @$queue, $idx, 0, ( $next );
      }

      # _dispatch_pending completes $wait_f with ( $function, $worker )
      $future = $wait_f->then( sub {
         my ( $self, $worker ) = @_;
         $self->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_result( @_ );
   }));
   $future->on_fail( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_failure( @_ );
   }));

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}
542
# Returns the worker objects currently registered (the values of the
# workers table), in no particular order.
sub _worker_objects
{
   my ( $self ) = @_;

   my $by_id = $self->{workers};
   return values %$by_id;
}
548
549=head2 workers
550
551   $count = $function->workers
552
553Returns the total number of worker processes available
554
555=cut
556
# Returns how many workers are currently registered.
sub workers
{
   my ( $self ) = @_;

   my $by_id = $self->{workers};
   return scalar( keys %$by_id );
}
562
563=head2 workers_busy
564
565   $count = $function->workers_busy
566
567Returns the number of worker processes that are currently busy
568
569=cut
570
# Returns how many registered workers have their 'busy' flag set.
sub workers_busy
{
   my ( $self ) = @_;

   my $count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $count++ if $worker->{busy};
   }

   return $count;
}
576
577=head2 workers_idle
578
579   $count = $function->workers_idle
580
581Returns the number of worker processes that are currently idle
582
583=cut
584
# Returns how many registered workers do not have their 'busy' flag set.
sub workers_idle
{
   my ( $self ) = @_;

   my $count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $count++ unless $worker->{busy};
   }

   return $count;
}
590
# Constructs a new worker from this object's current settings, adds it as a
# child, records it in the workers table under its ID, and returns it.
sub _new_worker
{
   my ( $self ) = @_;

   # Runs when the worker has finished. Unless we are deliberately stopping,
   # top the pool back up to the minimum and hand out any queued call.
   my $on_finish = $self->_capture_weakself( sub {
      my $self = shift or return;

      return if $self->{stopping};

      if( $self->workers < $self->{min_workers} ) {
         $self->_new_worker;
      }

      $self->_dispatch_pending;
   } );

   my @inherited = qw( model init_code code module init_func func setup exit_on_die );

   my $worker = IO::Async::Function::Worker->new(
      ( map { ( $_ => $self->{$_} ) } @inherited ),
      max_calls => $self->{max_worker_calls},
      on_finish => $on_finish,
   );

   $self->add_child( $worker );

   $self->{workers}{ $worker->id } = $worker;
   return $worker;
}
615
# Picks a worker to run a call on: the idle worker whose ID sorts first, or
# else a newly created one if the pool is still below 'max_workers'.
# Returns undef when neither is possible.
sub _get_worker
{
   my ( $self ) = @_;

   my $pool = $self->{workers};

   my $idle_id = first { !$pool->{$_}{busy} } sort keys %$pool;
   return $pool->{$idle_id} if defined $idle_id;

   return $self->_new_worker if $self->workers < $self->{max_workers};

   return undef;
}
630
# Sends an already-encoded request to the given worker and returns the
# Future for its result. If that leaves no idle workers, the idle-shutdown
# timer (when there is one) is stopped.
sub _call_worker
{
   my $self = shift;
   my ( $worker, $request ) = @_;

   # The worker's ->call takes just the one encoded request
   my $future = $worker->call( $request );

   if( $self->workers_idle == 0 ) {
      $self->{idle_timer}->stop if $self->{idle_timer};
   }

   return $future;
}
644
# Hands the next queued call, if any, to an available worker by completing
# its waiting future with ( $function, $worker ). At most one call is
# dispatched per invocation. When the queue is empty, the idle-shutdown timer
# is started if there are more idle workers than 'min_workers'.
sub _dispatch_pending
{
   my $self = shift;

   my $queue = $self->{pending_queue};

   while( @$queue ) {
      # Cancelled calls are discarded first, so we never obtain (and perhaps
      # create) a worker just to throw the entry away
      if( $queue->[0]->f->is_cancelled ) {
         shift @$queue;
         next;
      }

      # Only take the entry off the queue once there is a worker to give it
      # to. If none is available it must stay queued, or its future would
      # never be completed.
      my $worker = $self->_get_worker or return;

      my $next = shift @$queue;

      $self->debug_printf( "UNQUEUE" );
      $next->f->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}
665
666package # hide from indexer
667   IO::Async::Function::Worker;
668
669use base qw( IO::Async::Routine );
670
671use Carp;
672
673use IO::Async::Channel;
674
675use IO::Async::Internals::FunctionWorker;
676
# Constructs a worker Routine with one inbound (argument) and one outbound
# (result) channel.
#
# With 'code', the routine body optionally runs 'init_code' and then the
# FunctionWorker run loop around the given code. With 'func', the routine
# runs IO::Async::Internals::FunctionWorker's run_worker instead, and the
# real module/function names (plus any init function and its arguments) are
# remembered so they can be sent as the first message over the argument
# channel.
sub new
{
   my ( $class, %params ) = @_;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   # These two are consumed here and are not passed on to the Routine
   my ( $init_code, $init_func ) = delete @params{qw( init_code init_func )};

   my $send_initial;

   if( defined( my $code = $params{code} ) ) {
      $params{code} = sub {
         $init_code->() if defined $init_code;

         IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
      };
   }
   elsif( defined( my $func = $params{func} ) ) {
      # init_func may be a plain name, or an ARRAY of [ name, @args ]
      my @init_args;
      ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";

      $send_initial = [ $params{module}, $func, $init_func, @init_args ];

      # Swap in the generic bootstrap entry point for the Routine itself
      @params{qw( module func )} = ( "IO::Async::Internals::FunctionWorker", "run_worker" );
   }

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   $worker->{send_initial} = $send_initial if $send_initial;

   return $worker;
}
724
# Loop-attachment hook: once the parent class has done its part, send the
# stored initial message (if any) over the argument channel, once only.
sub _add_to_loop
{
   my ( $self, @args ) = @_;
   $self->SUPER::_add_to_loop( @args );

   if( $self->{send_initial} ) {
      my $initial = delete $self->{send_initial};
      $self->{arg_channel}->send( $initial );
   }
}
732
# Stores the worker-specific settings 'exit_on_die' and 'max_calls' on the
# object and passes every other parameter up to the parent class.
sub configure
{
   my ( $self, %params ) = @_;

   foreach my $key (qw( exit_on_die max_calls )) {
      next unless exists $params{$key};

      $self->{$key} = delete $params{$key};
   }

   $self->SUPER::configure( %params );
}
742
# Stops this worker: closes its argument channel and, if it has a parent,
# removes it from the parent's workers table. An idle worker is removed from
# the parent at once; a busy one is flagged for removal when it next goes
# idle. In non-void context returns the worker's result_future.
sub stop
{
   my ( $worker ) = @_;
   my $want_result = defined wantarray;

   $worker->{arg_channel}->close;

   my $result_f;
   $result_f = $worker->result_future if $want_result;

   my $function = $worker->parent
      or return $result_f;

   delete $function->{workers}{ $worker->id };

   if( !$worker->{busy} ) {
      $function->remove_child( $worker );
   }
   else {
      $worker->{remove_on_idle}++;
   }

   return $result_f;
}
764
# Sends one encoded request to the worker and returns a Future for its reply.
#
# The worker is marked busy and its remaining-call counter is decremented.
# A reply of type "r" completes the Future with the reply values; type "e"
# fails it with them; any other type dies. If the result channel closes
# instead, the worker is stopped and the Future fails with
# ( "closed", "closed" ). Whatever the outcome, the worker is then marked
# idle again, the parent is asked to dispatch pending work, and the worker is
# removed from the parent if it was flagged for removal.
sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   $worker->{max_calls}--;

   my $on_reply = $worker->_capture_weakself( sub {
      my ( $worker, $result ) = @_;
      my ( $type, @values ) = @$result;

      # Retire the worker once its call allowance is used up, or after an
      # exception if it is configured to exit on die
      if( !$worker->{max_calls} or
          ( $worker->{exit_on_die} and $type eq "e" ) ) {
         $worker->stop;
      }

      return Future->done( @values ) if $type eq "r";
      return Future->fail( @values ) if $type eq "e";

      die "Unrecognised type from worker - $type\n";
   } );

   my $on_eof = $worker->_capture_weakself( sub {
      my ( $worker ) = @_;

      $worker->stop;

      return Future->fail( "closed", "closed" );
   } );

   my $on_settled = $worker->_capture_weakself( sub {
      my ( $worker, $f ) = @_;
      $worker->{busy} = 0;

      my $function = $worker->parent or return;

      $function->_dispatch_pending;

      $function->remove_child( $worker ) if $worker->{remove_on_idle};
   } );

   return $worker->{ret_channel}->recv
      ->then( $on_reply, $on_eof )
      ->on_ready( $on_settled );
}
812
813=head1 EXAMPLES
814
815=head2 Extended Error Information on Failure
816
The array-unpacking form of exception indication allows the function body to
more precisely control the resulting failure from the C<call> future.
819
820   my $divider = IO::Async::Function->new(
821      code => sub {
822         my ( $numerator, $divisor ) = @_;
823         $divisor == 0 and
824            die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
825
826         return $numerator / $divisor;
827      }
828   );
829
830=head1 NOTES
831
832For the record, 123454321 is 11111 * 11111, a square number, and therefore not
833prime.
834
835=head1 AUTHOR
836
837Paul Evans <leonerd@leonerd.org.uk>
838
839=cut
840
8410x55AA;
842