1# $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $
2package IO::Lambda::Signal;
3use vars qw(@ISA %SIGDATA);
4@ISA = qw(Exporter);
5@EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process);
6%EXPORT_TAGS = ( all => \@EXPORT_OK);
7
8our $DEBUG = $IO::Lambda::DEBUG{signal} || 0;
9
10use strict;
11use Carp;
12use IO::Handle;
13use POSIX ":sys_wait_h";
14use IO::Lambda qw(:all :dev);
15
16my $MASTER = bless {}, __PACKAGE__;
17
18# register yield handler
19IO::Lambda::add_loop($MASTER);
20END { IO::Lambda::remove_loop($MASTER) };
21
22sub empty { 0 == keys %SIGDATA }
23
24sub remove
25{
26	my $lambda = $_[1];
27	my %rec;
28	keys %SIGDATA;
29	while ( my ($id, $v) = each %SIGDATA) {
30		for my $r (@{$v-> {lambdas}}) {
31			push @{$rec{$id}}, $r-> [0];
32		}
33	}
34	while ( my ($id, $v) = each %rec) {
35		unwatch_signal( $id, $_ ) for @$v;
36	}
37}
38
39sub yield
40{
41	my %v = %SIGDATA;
42	for my $id ( keys %v) {
43		my $v = $v{$id};
44		# use mutex in case signal happens right here during handling
45		$v-> {mutex} = 0;
46		warn "  yield sig $id\n" if $DEBUG > 1;
47	AGAIN:
48		next unless $v-> {signal};
49
50		my @r = @{$v-> {lambdas}};
51		warn "  calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1;
52		for my $r ( @r) {
53			my ( $lambda, $callback, @param) = @$r;
54			$callback-> ( $lambda, @param);
55		}
56
57		my $sigs = $v-> {mutex};
58		if ( $sigs) {
59			warn "  caught $sigs signals during yield\n" if $DEBUG > 1;
60			$v-> {signal} = $sigs;
61			$v-> {mutex}  -= $sigs;
62			goto AGAIN;
63		}
64	}
65}
66
67sub signal_handler
68{
69	my $id = shift;
70	warn "SIG{$id}\n" if $DEBUG;
71	return unless exists $SIGDATA{$id};
72	$SIGDATA{$id}-> {signal}++;
73	$SIGDATA{$id}-> {mutex}++;
74	$IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal');
75}
76
77sub watch_signal
78{
79	my ($id, $lambda, $callback, @param) = @_;
80
81	my $entry = [ $lambda, $callback, @param ];
82	unless ( exists $SIGDATA{$id}) {
83		$SIGDATA{$id} = {
84			mutex   => 0,
85			signal  => 0,
86			save    => $SIG{$id},
87			lambdas => [$entry],
88		};
89		$SIG{$id} = sub { signal_handler($id) };
90		warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1;
91	} else {
92		push @{ $SIGDATA{$id}-> {lambdas} }, $entry;
93		warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
94	}
95}
96
97sub unwatch_signal
98{
99	my ( $id, $lambda) = @_;
100
101	return unless exists $SIGDATA{$id};
102
103	warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
104
105	@{ $SIGDATA{$id}-> {lambdas} } =
106		grep { $$_[0] != $lambda }
107		@{ $SIGDATA{$id}-> {lambdas} };
108
109	return if @{ $SIGDATA{$id}-> {lambdas} };
110
111	warn "uninstall signal handler for $id\n" if $DEBUG > 1;
112
113	if (defined($SIGDATA{$id}-> {save})) {
114		$SIG{$id} = $SIGDATA{$id}-> {save};
115	} else {
116		delete $SIG{$id};
117	}
118	delete $SIGDATA{$id};
119}
120
121# create a lambda that either returns undef on timeout,
122# or some custom value based on passed callback
123sub signal_or_timeout_lambda
124{
125	my ( $id, $deadline, $condition) = @_;
126
127	my $t;
128	my $q = IO::Lambda-> new;
129
130	# wait for signal
131	my $c = $q-> bind;
132	watch_signal( $id, $q, sub {
133		my @ret = $condition-> ();
134		return unless @ret;
135
136		unwatch_signal( $id, $q);
137		$q-> cancel_event($t) if $t;
138		$q-> resolve($c);
139		$q-> terminate(@ret); # result
140		undef $c;
141		undef $q;
142	});
143
144	# or wait for timeout
145	$t = $q-> watch_timer( $deadline, sub {
146		unwatch_signal( $id, $q);
147		$q-> resolve($c);
148		undef $c;
149		undef $q;
150		return undef; #result
151	}) if $deadline;
152
153	return $q;
154}
155
156sub new_signal
157{
158	my ( $id, $deadline) = @_;
159	signal_or_timeout_lambda( $id, $deadline,
160		sub { 1 });
161}
162
163sub new_pid
164{
165	my ( $pid, $deadline) = @_;
166
167	croak 'bad pid' unless $pid =~ /^\-?\d+$/;
168	warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG;
169
170	# avoid race conditions
171	my ( $savesig, $early_sigchld);
172	unless ( defined $SIGDATA{CHLD}) {
173		warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1;
174		$savesig       = $SIG{CHLD};
175		$early_sigchld = 0;
176		$SIG{CHLD} = sub {
177			warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1;
178			$early_sigchld++
179		};
180	}
181
182	# finished already
183	if ( waitpid( $pid, WNOHANG) > 0) {
184		if ( defined $early_sigchld) {
185			if ( defined( $savesig)) {
186				$SIG{CHLD} = $savesig;
187			} else {
188				delete $SIG{CHLD};
189			}
190		}
191		warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1;
192		return IO::Lambda-> new-> call($?)
193	}
194
195	# wait
196	my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub {
197		my $wp = waitpid($pid, WNOHANG);
198		warn "waitpid($pid) = $wp\n" if $DEBUG > 1;
199		return if $wp == 0;
200		return $?;
201	});
202
203	warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1;
204
205	# don't let unwatch_signal() to restore it back to us
206	$SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld;
207
208	# possibly have a race? gracefully remove the lambda
209	if ( $early_sigchld) {
210
211		# Got a signal, but that wasn't our pid. And neither it was
212		# pid that we're watching.
213		return $p if waitpid( $pid, WNOHANG) == 0;
214
215		# Our pid is finished. Unwatch the signal.
216		unwatch_signal( 'CHLD', $p);
217		# Lambda will also never get executed - cancel it
218		$p-> terminate;
219
220		warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1;
221
222		return IO::Lambda-> new-> call($?);
223	}
224
225	return $p;
226}
227
228sub new_process_posix
229{
230lambda {
231	my $h   = IO::Handle-> new;
232	my $pid = open( $h, '-|', @_);
233
234	return undef, undef, $! unless $pid;
235
236	this-> {pid} = $pid;
237	$h-> blocking(0);
238
239	my $buf;
240	context readbuf, $h, \$buf, undef; # wait for EOF
241tail {
242	my ($res, $error) = @_;
243	if ( defined $error) {
244		close $h;
245		return ($buf, $?, $error);
246	}
247	return ($buf, $?, $!) unless close $h;
248	# finished already
249	return ($buf, $?, $!) if waitpid($pid, WNOHANG) >= 0;
250
251	# wait for it
252	context $pid;
253pid {
254	return ($buf, shift);
255}}}}
256
257sub new_process_win32
258{
259	lambda {
260		my @cmd = @_;
261		context IO::Lambda::Thread::threaded( sub {
262			my $k = `@cmd`;
263			return $? ? ( undef, $?, $! ) : ( $k, 0, undef );
264		});
265		&tail();
266	}
267}
268
269sub new_process;
270if ( $^O !~ /win32/i) {
271	*new_process = \&new_process_posix;
272} else {
273	require IO::Lambda::Thread;
274	unless ( $IO::Lambda::Thread::DISABLED) {
275		*new_process = \&new_process_win32;
276	} else {
277		*new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } };
278	}
279}
280
281# condition
282sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') }
283sub pid    (&) { new_pid    (context)-> condition(shift, \&pid,    'pid') }
284sub spawn  (&) { new_process-> call(context)-> condition(shift, \&spawn,  'spawn') }
285
286
2871;
288
289__DATA__
290
291=pod
292
293=head1 NAME
294
295IO::Lambda::Signal - wait for pids and signals
296
297=head1 DESCRIPTION
298
299The module provides access to the signal-based callbacks: generic signal listener
300C<signal>, process ID listener C<pid>, and the asynchronous version of I<system>
301call, C<spawn>.
302
303=head1 SYNOPSIS
304
305   use strict;
306   use IO::Lambda qw(:all);
307   use IO::Lambda::Signal qw(pid spawn);
308
309   # pid
310   my $pid = fork;
311   exec "/bin/ls" unless $pid;
312   lambda {
313       context $pid, 5;
314       pid {
315          my $ret = shift;
316	  print defined($ret) ? ("exitcode(", $ret>>8, ")\n") : "timeout\n";
317       }
318   }-> wait;
319
320   # spawn
321   this lambda {
322      context "perl -v";
323      spawn {
324      	  my ( $buf, $exitcode, $error) = @_;
325   	  print "buf=[$buf], exitcode=$exitcode, error=$error\n";
326      }
327   }-> wait;
328
329=head2 USAGE
330
331=over
332
333=item pid ($PID, $TIMEOUT) -> $?|undef
334
335Accepts PID and an optional deadline/timeout, returns either the process' exit status,
336or undef on timeout.  The corresponding lambda is C<new_pid> :
337
338   new_pid ($PID, $TIMEOUT) :: () -> $?|undef
339
340=item signal ($SIG, $TIMEOUT) -> boolean
341
342Accepts signal name and optional deadline/timeout, returns 1 if the signal was caught,
343or C<undef> on timeout.  The corresponding lambda is C<new_signal> :
344
345   new_signal ($SIG, $TIMEOUT) :: () -> boolean
346
347=item spawn (@LIST) -> ( output, $?, $!)
348
349Calls pipe open on C<@LIST>, reads all data printed by the child process,
350and awaits for the process to finish. Returns three scalars - collected output,
351process exitcode C<$?>, and an error string (usually C<$!>). The corresponding
352lambda is C<new_process> :
353
354   new_process (@LIST) :: () -> ( output, $?, $!)
355
356Lambda objects created by C<new_process> have an additional field C<'pid'>
357initialized with the process pid value.
358
359=back
360
361=head1 LIMITATION
362
363C<pid> and C<new_pid> don't work on win32 because win32 doesn't use
364SIGCHLD/waitpid.  Native implementation of C<spawn> and C<new_process> doesn't
365work for the same reason on win32 as well, therefore those were reimplemented
366using threads, and require a threaded perl.
367
368=head1 SEE ALSO
369
370L<IO::Lambda>, L<perlipc>, L<IPC::Open2>, L<IPC::Run>
371
372=head1 AUTHOR
373
374Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.
375
376=cut
377