1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk
5
6package IO::Async::Routine;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.800';
12
13use base qw( IO::Async::Notifier );
14
15use Carp;
16
17use IO::Async::OS;
18use IO::Async::Process;
19
20use Struct::Dumb qw( readonly_struct );
21
22=head1 NAME
23
24C<IO::Async::Routine> - execute code in an independent sub-process or thread
25
26=head1 SYNOPSIS
27
28   use IO::Async::Routine;
29   use IO::Async::Channel;
30
31   use IO::Async::Loop;
32   my $loop = IO::Async::Loop->new;
33
34   my $nums_ch = IO::Async::Channel->new;
35   my $ret_ch  = IO::Async::Channel->new;
36
37   my $routine = IO::Async::Routine->new(
38      channels_in  => [ $nums_ch ],
39      channels_out => [ $ret_ch ],
40
41      code => sub {
42         my @nums = @{ $nums_ch->recv };
43         my $ret = 0; $ret += $_ for @nums;
44
45         # Can only send references
46         $ret_ch->send( \$ret );
47      },
48
49      on_finish => sub {
50         say "The routine aborted early - $_[-1]";
51         $loop->stop;
52      },
53   );
54
55   $loop->add( $routine );
56
57   $nums_ch->send( [ 10, 20, 30 ] );
58   $ret_ch->recv(
59      on_recv => sub {
60         my ( $ch, $totalref ) = @_;
61         say "The total of 10, 20, 30 is: $$totalref";
62         $loop->stop;
63      }
64   );
65
66   $loop->run;
67
68=head1 DESCRIPTION
69
70This L<IO::Async::Notifier> contains a body of code and executes it in a
71sub-process or thread, allowing it to act independently of the main program.
72Once set up, all communication with the code happens by values passed into or
73out of the Routine via L<IO::Async::Channel> objects.
74
75The code contained within the Routine is free to make blocking calls without
76stalling the rest of the program. This makes it useful for using existing code
77which has no option not to block within an L<IO::Async>-based program.
78
To create asynchronous wrappers of functions that return a value based only on
their arguments, and do not generally maintain state within the process, it may
be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.
84
85=head2 Models
86
87A choice of detachment model is available. Each has various advantages and
88disadvantages. Not all of them may be available on a particular system.
89
90=head3 The C<fork> model
91
92The code in this model runs within its own process, created by calling
93C<fork()> from the main process. It is isolated from the rest of the program
94in terms of memory, CPU time, and other resources. Because it is started
95using C<fork()>, the initial process state is a clone of the main process.
96
97This model performs well on UNIX-like operating systems which possess a true
98native C<fork()> system call, but is not available on C<MSWin32> for example,
99because the operating system does not provide full fork-like semantics.
100
101=head3 The C<thread> model
102
103The code in this model runs inside a separate thread within the main process.
104It therefore shares memory and other resources such as open filehandles with
105the main thread. As with the C<fork> model, the initial thread state is cloned
106from the main controlling thread.
107
108This model is only available on perls built to support threading.
109
110=head3 The C<spawn> model
111
112I<Since version 0.79.>
113
114The code in this model runs within its own freshly-created process running
115another copy of the perl interpreter. Similar to the C<fork> model it
116therefore has its own memory, CPU time, and other resources. However, since it
117is started freshly rather than by cloning the main process, it starts up in a
118clean state, without any shared resources from its parent.
119
120Since this model creates a new fresh process rather than sharing existing
121state, it cannot use the C<code> argument to specify the routine body; it must
122instead use only the C<module> and C<func> arguments.
123
124In the current implementation this model requires exactly one input channel
125and exactly one output channel; both must be present, and there cannot be more
126than one of either.
127
128=cut
129
130=head1 EVENTS
131
132=head2 on_finish $exitcode
133
For C<fork>- and C<spawn>-based Routines, this is invoked after the process
has exited and is passed the raw exitcode status.
136
137=head2 on_finish $type, @result
138
139For thread-based Routines, this is invoked after the thread has returned from
140its code block and is passed the C<on_joined> result.
141
142As the behaviour of these events differs per model, it may be more convenient
143to use C<on_return> and C<on_die> instead.
144
145=head2 on_return $result
146
Invoked if the code block returns normally. Note that C<fork>- and
C<spawn>-based Routines can only transport an integer result between 0 and
255, as this is the actual C<exit()> value.
150
151=head2 on_die $exception
152
153Invoked if the code block fails with an exception.
154
155=cut
156
157=head1 PARAMETERS
158
159The following named parameters may be passed to C<new> or C<configure>:
160
161=head2 model => "fork" | "thread" | "spawn"
162
163Optional. Defines how the routine will detach itself from the main process.
164See the L</Models> section above for more detail.
165
166If the model is not specified, the environment variable
167C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
168C<fork> is preferred if it is available, otherwise C<thread>.
169
170=head2 channels_in => ARRAY of IO::Async::Channel
171
172ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
173in to the Routine.
174
175=head2 channels_out => ARRAY of IO::Async::Channel
176
177ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
178out of the Routine.
179
180=head2 code => CODE
181
182CODE reference to the body of the Routine, to execute once the channels are
183set up.
184
185When using the C<spawn> model, this is not permitted; you must use C<module>
186and C<func> instead.
187
188=head2 module => STRING
189
190=head2 func => STRING
191
192I<Since version 0.79.>
193
194An alternative to the C<code> argument, which names a module to load and a
195function to call within it. C<module> should give a perl module name (i.e.
196C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
197the basename of a function within that module (i.e. without the module name
198prefixed). It will be invoked as the main code body of the object, and passed
199in a list of all the channels; first the input ones then the output ones.
200
201   module::func( @channels_in, @channels_out )
202
203=head2 setup => ARRAY
204
Optional. For C<fork()>-based Routines, gives a reference to an array to pass
to the underlying C<Loop> C<fork_child> method. Ignored for thread-based and
C<spawn>-based Routines.
208
209=cut
210
# Fallback detachment model, used by _init when neither the 'model' parameter
# nor $ENV{IO_ASYNC_ROUTINE_MODEL} picks one: "fork" when IO::Async::OS reports
# HAVE_POSIX_FORK, else "thread" when it reports HAVE_THREADS. Loading this
# module fails if neither is reported.
use constant PREFERRED_MODEL => do {
   my $model;

   if( IO::Async::OS->HAVE_POSIX_FORK ) {
      $model = "fork";
   }
   elsif( IO::Async::OS->HAVE_THREADS ) {
      $model = "thread";
   }
   else {
      die "No viable Routine models";
   }

   $model;
};
215
# Notifier initialisation hook. Fills in a default 'model' in the constructor
# parameters hash before handing over to the superclass, so that configure
# always receives one.
sub _init
{
   my $self = shift;
   my $params = $_[0];

   # Explicit parameter wins, then the environment, then the built-in default
   $params->{model} = $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL
      unless $params->{model};

   return $self->SUPER::_init( @_ );
}
225
# Cache mapping a model name to its "_setup_$model" method CODE reference.
# Filled lazily by configure and _add_to_loop.
my %SETUP_CODE;
227
# Applies named parameters to the Routine.
#
# Stores the Routine-specific keys directly on the object, validates the
# 'code' / 'module' / 'func' combination and the requested 'model', then passes
# whatever is left over to the superclass.
#
# Croaks on conflicting or incomplete body parameters, or when the requested
# model is not available; dies on a model with no matching _setup_* method.
sub configure
{
   my ( $self, %params ) = @_;

   # TODO: Can only reconfigure when not running
   for my $key (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   my $have_func = defined $self->{func};

   croak "Cannot ->configure both 'code' and 'func'"
      if $have_func and defined $self->{code};
   croak "'func' parameter requires a 'module' as well"
      if $have_func and !defined $self->{module};

   my $model = delete $params{model};
   if( defined $model ) {
      # Look up (and remember) the setup method to prove the model exists
      my $setup = $SETUP_CODE{$model} ||= $self->can( "_setup_$model" );
      $setup or die "Unrecognised Routine model $model";

      # TODO: optional plugin "configure" check here?
      if( $model eq "fork" ) {
         IO::Async::OS->HAVE_POSIX_FORK
            or croak "Cannot use 'fork' model as fork() is not available";
      }
      elsif( $model eq "thread" ) {
         IO::Async::OS->HAVE_THREADS
            or croak "Cannot use 'thread' model as threads are not available";
      }

      $self->{model} = $model;
   }

   return $self->SUPER::configure( %params );
}
258
# Notifier hook invoked when the Routine joins a Loop. After the superclass
# has done its part, starts the routine body by dispatching to the
# "_setup_$model" method for the configured model.
sub _add_to_loop
{
   my ( $self, $loop ) = @_;

   $self->SUPER::_add_to_loop( $loop );

   my $model = $self->{model};

   my $setup = $SETUP_CODE{$model} ||= $self->can( "_setup_$model" );
   $setup or die "Unrecognised Routine model $model";

   return $self->$setup();
}
272
# Record describing one channel while a routine is being started:
#   chan    - the IO::Async::Channel object itself
#   myfd    - handle adopted on this side by _adopt_channels_*; left undef when
#             the channel supplied its own far-end handle
#   otherfd - handle handed to the routine body's side
readonly_struct ChannelSetup => [qw( chan myfd otherfd )];
274
# Builds one ChannelSetup per configured input channel.
#
# The routine body reads from an input channel, so 'otherfd' is the read end
# and 'myfd' the write end. If the channel already provides a read handle it
# is used as-is and no write end is created here; otherwise a fresh pipe
# supplies both ends.
#
# Returns the list of ChannelSetup records (empty if no channels are set).
sub _create_channels_in
{
   my $self = shift;

   return map {
      my $chan = $_;
      my ( $read_end, $write_end );

      $read_end = $chan->_extract_read_handle
         or ( $read_end, $write_end ) = IO::Async::OS->pipepair;

      ChannelSetup( $chan, $write_end, $read_end );
   } @{ $self->{channels_in} || [] };
}
291
# Builds one ChannelSetup per configured output channel.
#
# The routine body writes to an output channel, so 'otherfd' is the write end
# and 'myfd' the read end. If the channel already provides a write handle it
# is used as-is and no read end is created here; otherwise a fresh pipe
# supplies both ends.
#
# Returns the list of ChannelSetup records (empty if no channels are set).
sub _create_channels_out
{
   my $self = shift;

   return map {
      my $chan = $_;
      my ( $read_end, $write_end );

      $write_end = $chan->_extract_write_handle
         or ( $read_end, $write_end ) = IO::Async::OS->pipepair;

      ChannelSetup( $chan, $read_end, $write_end );
   } @{ $self->{channels_out} || [] };
}
308
# Takes ownership of this side of each input channel: puts the channel into
# asynchronous mode writing to the record's 'myfd', and makes it a child
# notifier of the Routine unless it already has a parent.
sub _adopt_channels_in
{
   my ( $self, @setups ) = @_;

   for my $setup ( @setups ) {
      my $channel = $setup->chan;

      $channel->setup_async_mode( write_handle => $setup->myfd );

      next if $channel->parent;
      $self->add_child( $channel );
   }
}
320
# Takes ownership of this side of each output channel: puts the channel into
# asynchronous mode reading from the record's 'myfd', and makes it a child
# notifier of the Routine unless it already has a parent.
sub _adopt_channels_out
{
   my ( $self, @setups ) = @_;

   for my $setup ( @setups ) {
      my $channel = $setup->chan;

      $channel->setup_async_mode( read_handle => $setup->myfd );

      next if $channel->parent;
      $self->add_child( $channel );
   }
}
332
# Starts the routine body using the 'fork' model.
#
# Creates the handles for every configured channel and wraps the body in an
# IO::Async::Process. Inside the child the channels are switched to
# synchronous mode on their 'otherfd' handles, the body ('code', or
# module::func when 'func' is configured) is called with all the channels, the
# channels are closed, and the body's return value is returned from the
# process code.
#
# On this side the 'myfd' handles are adopted by the channels, the process is
# added as a child notifier, the id becomes "P<pid>", and this side's copies of
# the 'otherfd' handles are closed.
sub _setup_fork
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   # Every far-end handle is marked "keep" in the child setup; any
   # user-supplied 'setup' entries are appended after them
   my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in, @channels_out ) {
            $_->chan->setup_sync_mode( $_->otherfd );
         }

         # Key on 'func', as _setup_thread does. configure permits 'module'
         # alongside 'code' with no 'func'; in that case the configured code
         # must run rather than failing on a lookup of an undefined name.
         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Low 7 bits of the wait status hold a terminating signal number;
         # only report a return value when there was none
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in  );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The far ends belong to the child now
   $_->otherfd->close for @channels_in, @channels_out;
}
400
# Starts the routine body using the 'thread' model.
#
# Creates the handles for every configured channel and asks the Loop to
# create a thread. Inside the thread the channels are switched to synchronous
# mode on their 'otherfd' handles and the thread's copies of the 'myfd'
# handles are closed; then the body ('code', or module::func when 'func' is
# configured) is called with all the channels, the channels are closed, and
# the body's return value is returned.
#
# When the thread is joined, on_finish is invoked with the raw join result,
# followed by on_return / on_die and the matching completion of
# result_future; the stored thread ID is then discarded.
sub _setup_thread
{
   my $self = shift;

   my @inputs  = $self->_create_channels_in;
   my @outputs = $self->_create_channels_out;

   my $body = $self->{code};
   my ( $module, $func ) = @{$self}{qw( module func )};

   my $thread_main = sub {
      for my $setup ( @inputs, @outputs ) {
         $setup->chan->setup_sync_mode( $setup->otherfd );

         # 'myfd' may be absent if the channel supplied its own handle
         my $near_end = $setup->myfd;
         $near_end->close if defined $near_end;
      }

      if( defined $func ) {
         ( my $file = "$module.pm" ) =~ s{::}{/}g;
         require $file;

         $body = $module->can( $func ) or
            die "Module '$module' has no '$func'\n";
      }

      my $ret = $body->( map { $_->chan } @inputs, @outputs );

      $_->chan->close for @inputs, @outputs;

      return $ret;
   };

   my $on_joined = $self->_capture_weakself( sub {
      my $self = shift or return;
      my ( $event, @result ) = @_;

      $self->maybe_invoke_event( on_finish => $event, @result );

      if( $event eq "return" ) {
         $self->maybe_invoke_event( on_return => @result );
         $self->result_future->done( @result );
      }
      elsif( $event eq "died" ) {
         my $exception = $result[0];
         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, "routine" );
      }

      delete $self->{tid};
   });

   my $tid = $self->loop->create_thread(
      code      => $thread_main,
      on_joined => $on_joined,
   );

   $self->{tid} = $tid;
   $self->{id}  = "T$tid";

   $self->_adopt_channels_in ( @inputs  );
   $self->_adopt_channels_out( @outputs );

   $_->otherfd->close for @inputs, @outputs;
}
462
# The injected program that goes into spawn mode.
#
# _setup_spawn passes this text to a fresh perl via -E, followed by the module
# name, the function name and the parent's non-reference @INC entries. The
# program takes the first two arguments as module and function and uses the
# remainder as its own @INC, loads the module, and calls the function with the
# channels built by IO::Async::Channel->new_stdin and ->new_stdout. The
# function's return value becomes the process exit code.
use constant PERL_RUNNER => <<'EOF';
( my ( $module, $func ), @INC ) = @ARGV;
( my $file = "$module.pm" ) =~ s{::}{/}g;
require $file;
my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
require IO::Async::Channel;
exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
EOF
472
# Starts the routine body using the 'spawn' model.
#
# Runs a fresh perl executing PERL_RUNNER, with the single input channel wired
# to its STDIN and the single output channel wired to its STDOUT. The body is
# named by 'module' and 'func'; 'code' cannot be used with this model.
#
# Dies (with a newline-terminated message) if 'code' is set, if there is not
# exactly one channel in each direction, or if 'module' / 'func' are missing.
# All of these are checked before any handles are created.
#
# On this side the 'myfd' handles are adopted by the channels, the process is
# added as a child notifier, the id becomes "P<pid>", and this side's copies of
# the 'otherfd' handles are closed.
sub _setup_spawn
{
   my $self = shift;

   $self->{code} and
      die "Cannot run IO::Async::Routine in 'spawn' with code\n";

   # '|| []' makes an unconfigured channel list count as zero channels, so
   # the caller gets the message below rather than a strict-refs failure on
   # dereferencing undef
   @{ $self->{channels_in} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
   @{ $self->{channels_out} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";

   my $module = $self->{module};
   my $func   = $self->{func};

   # Both names go onto the child's command line; undef there cannot work
   defined $module and defined $func or
      die "IO::Async::Routine in 'spawn' mode requires 'module' and 'func'\n";

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $process = IO::Async::Process->new(
      setup => [
         stdin  => $channels_in[0]->otherfd,
         stdout => $channels_out[0]->otherfd,
      ],
      # Only plain-string @INC entries can be passed as arguments
      command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Low 7 bits of the wait status hold a terminating signal number;
         # only report a return value when there was none
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in  );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The far ends belong to the child now
   $_->otherfd->close for @channels_in, @channels_out;
}
524
525=head1 METHODS
526
527=cut
528
529=head2 id
530
531   $id = $routine->id
532
533Returns an ID string that uniquely identifies the Routine out of all the
534currently-running ones. (The ID of already-exited Routines may be reused,
535however.)
536
537=cut
538
sub id
{
   my ( $self ) = @_;

   # Set by the _setup_* methods once the routine has been started
   return $self->{id};
}
544
545=head2 model
546
547   $model = $routine->model
548
549Returns the detachment model in use by the Routine.
550
551=cut
552
sub model
{
   my ( $self ) = @_;

   # Stored by configure after the model name has been validated
   return $self->{model};
}
558
559=head2 kill
560
561   $routine->kill( $signal )
562
563Sends the specified signal to the routine code. This is either implemented by
564C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
565this has the usual limits of signal delivery to threads; namely, that it works
566at the Perl interpreter level, and cannot actually interrupt blocking system
567calls.
568
569=cut
570
# Delivers $signal to the running routine body.
#
# Thread model: signals the thread identified by the stored thread ID.
# Process models ('fork' and 'spawn'): both keep their child in {process} and
# are signalled through it.
#
# Does nothing when there is nothing to signal: the routine has not been
# started, or (thread model) the thread has already been joined, at which point
# the stored ID has been discarded and threads->object has no thread to return.
sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   if( $self->{model} eq "thread" ) {
      my $tid = $self->{tid};
      defined $tid or return;

      my $thread = threads->object( $tid ) or return;
      $thread->kill( $signal );
   }
   elsif( my $process = $self->{process} ) {
      $process->kill( $signal );
   }

   return;
}
579
580=head2 result_future
581
582   $f = $routine->result_future
583
584I<Since version 0.75.>
585
Returns an C<IO::Async::Future> which will complete with the eventual return
value or exception when the routine finishes. The future is created on the
first call; later calls return the same instance.
588
589If the routine finishes with a successful result then this will be the C<done>
590result of the future. If the routine fails with an exception then this will be
591the C<fail> result.
592
593=cut
594
sub result_future
{
   my $self = shift;

   # Hand back the existing future if one has already been made
   return $self->{result_future} if defined $self->{result_future};

   my $future = $self->loop->new_future;

   # The callback closes over $self, so the future strongly retains the
   # Routine until it becomes ready; this ensures it definitely gets notified.
   # The reference is dropped at that point.
   $future->on_ready( sub { undef $self } );

   return $self->{result_future} = $future;
}
607
608=head1 AUTHOR
609
610Paul Evans <leonerd@leonerd.org.uk>
611
612=cut
613
# A module file must finish by evaluating to a true value
0x55AA;
615