1# $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $ 2package IO::Lambda::Signal; 3use vars qw(@ISA %SIGDATA); 4@ISA = qw(Exporter); 5@EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process); 6%EXPORT_TAGS = ( all => \@EXPORT_OK); 7 8our $DEBUG = $IO::Lambda::DEBUG{signal} || 0; 9 10use strict; 11use Carp; 12use IO::Handle; 13use POSIX ":sys_wait_h"; 14use IO::Lambda qw(:all :dev); 15 16my $MASTER = bless {}, __PACKAGE__; 17 18# register yield handler 19IO::Lambda::add_loop($MASTER); 20END { IO::Lambda::remove_loop($MASTER) }; 21 22sub empty { 0 == keys %SIGDATA } 23 24sub remove 25{ 26 my $lambda = $_[1]; 27 my %rec; 28 keys %SIGDATA; 29 while ( my ($id, $v) = each %SIGDATA) { 30 for my $r (@{$v-> {lambdas}}) { 31 push @{$rec{$id}}, $r-> [0]; 32 } 33 } 34 while ( my ($id, $v) = each %rec) { 35 unwatch_signal( $id, $_ ) for @$v; 36 } 37} 38 39sub yield 40{ 41 my %v = %SIGDATA; 42 for my $id ( keys %v) { 43 my $v = $v{$id}; 44 # use mutex in case signal happens right here during handling 45 $v-> {mutex} = 0; 46 warn " yield sig $id\n" if $DEBUG > 1; 47 AGAIN: 48 next unless $v-> {signal}; 49 50 my @r = @{$v-> {lambdas}}; 51 warn " calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1; 52 for my $r ( @r) { 53 my ( $lambda, $callback, @param) = @$r; 54 $callback-> ( $lambda, @param); 55 } 56 57 my $sigs = $v-> {mutex}; 58 if ( $sigs) { 59 warn " caught $sigs signals during yield\n" if $DEBUG > 1; 60 $v-> {signal} = $sigs; 61 $v-> {mutex} -= $sigs; 62 goto AGAIN; 63 } 64 } 65} 66 67sub signal_handler 68{ 69 my $id = shift; 70 warn "SIG{$id}\n" if $DEBUG; 71 return unless exists $SIGDATA{$id}; 72 $SIGDATA{$id}-> {signal}++; 73 $SIGDATA{$id}-> {mutex}++; 74 $IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal'); 75} 76 77sub watch_signal 78{ 79 my ($id, $lambda, $callback, @param) = @_; 80 81 my $entry = [ $lambda, $callback, @param ]; 82 unless ( exists $SIGDATA{$id}) { 83 $SIGDATA{$id} = { 84 mutex => 0, 85 signal => 0, 86 save => $SIG{$id}, 87 lambdas => [$entry], 88 }; 89 $SIG{$id} = sub { signal_handler($id) }; 90 warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1; 91 } else { 92 push @{ $SIGDATA{$id}-> {lambdas} }, $entry; 93 warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2; 94 } 95} 96 97sub unwatch_signal 98{ 99 my ( $id, $lambda) = @_; 100 101 return unless exists $SIGDATA{$id}; 102 103 warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2; 104 105 @{ $SIGDATA{$id}-> {lambdas} } = 106 grep { $$_[0] != $lambda } 107 @{ $SIGDATA{$id}-> {lambdas} }; 108 109 return if @{ $SIGDATA{$id}-> {lambdas} }; 110 111 warn "uninstall signal handler for $id\n" if $DEBUG > 1; 112 113 if (defined($SIGDATA{$id}-> {save})) { 114 $SIG{$id} = $SIGDATA{$id}-> {save}; 115 } else { 116 delete $SIG{$id}; 117 } 118 delete $SIGDATA{$id}; 119} 120 121# create a lambda that either returns undef on timeout, 122# or some custom value based on passed callback 123sub signal_or_timeout_lambda 124{ 125 my ( $id, $deadline, $condition) = @_; 126 127 my $t; 128 my $q = IO::Lambda-> new; 129 130 # wait for signal 131 my $c = $q-> bind; 132 watch_signal( $id, $q, sub { 133 my @ret = $condition-> (); 134 return unless @ret; 135 136 unwatch_signal( $id, $q); 137 $q-> cancel_event($t) if $t; 138 $q-> resolve($c); 139 $q-> terminate(@ret); # result 140 undef $c; 141 undef $q; 142 }); 143 144 # or wait for timeout 145 $t = $q-> watch_timer( $deadline, sub { 146 unwatch_signal( $id, $q); 147 $q-> resolve($c); 148 undef $c; 149 undef $q; 150 return undef; #result 151 }) if $deadline; 152 153 return $q; 154} 155 156sub new_signal 157{ 158 my ( $id, $deadline) = @_; 159 signal_or_timeout_lambda( $id, $deadline, 160 sub { 1 }); 161} 162 163sub new_pid 164{ 165 my ( $pid, $deadline) = @_; 166 167 croak 'bad pid' unless $pid =~ /^\-?\d+$/; 168 warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG; 169 170 # avoid race conditions 171 my ( $savesig, $early_sigchld); 172 unless ( defined $SIGDATA{CHLD}) { 173 warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1; 174 $savesig = $SIG{CHLD}; 175 $early_sigchld = 0; 176 $SIG{CHLD} = sub { 177 warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1; 178 $early_sigchld++ 179 }; 180 } 181 182 # finished already 183 if ( waitpid( $pid, WNOHANG) > 0) { 184 if ( defined $early_sigchld) { 185 if ( defined( $savesig)) { 186 $SIG{CHLD} = $savesig; 187 } else { 188 delete $SIG{CHLD}; 189 } 190 } 191 warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1; 192 return IO::Lambda-> new-> call($?) 193 } 194 195 # wait 196 my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub { 197 my $wp = waitpid($pid, WNOHANG); 198 warn "waitpid($pid) = $wp\n" if $DEBUG > 1; 199 return if $wp == 0; 200 return $?; 201 }); 202 203 warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1; 204 205 # don't let unwatch_signal() to restore it back to us 206 $SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld; 207 208 # possibly have a race? gracefully remove the lambda 209 if ( $early_sigchld) { 210 211 # Got a signal, but that wasn't our pid. And neither it was 212 # pid that we're watching. 213 return $p if waitpid( $pid, WNOHANG) == 0; 214 215 # Our pid is finished. Unwatch the signal. 216 unwatch_signal( 'CHLD', $p); 217 # Lambda will also never get executed - cancel it 218 $p-> terminate; 219 220 warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1; 221 222 return IO::Lambda-> new-> call($?); 223 } 224 225 return $p; 226} 227 228sub new_process_posix 229{ 230lambda { 231 my $h = IO::Handle-> new; 232 my $pid = open( $h, '-|', @_); 233 234 return undef, undef, $! unless $pid; 235 236 this-> {pid} = $pid; 237 $h-> blocking(0); 238 239 my $buf; 240 context readbuf, $h, \$buf, undef; # wait for EOF 241tail { 242 my ($res, $error) = @_; 243 if ( defined $error) { 244 close $h; 245 return ($buf, $?, $error); 246 } 247 return ($buf, $?, $!) unless close $h; 248 # finished already 249 return ($buf, $?, $!) if waitpid($pid, WNOHANG) >= 0; 250 251 # wait for it 252 context $pid; 253pid { 254 return ($buf, shift); 255}}}} 256 257sub new_process_win32 258{ 259 lambda { 260 my @cmd = @_; 261 context IO::Lambda::Thread::threaded( sub { 262 my $k = `@cmd`; 263 return $? ? ( undef, $?, $! ) : ( $k, 0, undef ); 264 }); 265 &tail(); 266 } 267} 268 269sub new_process; 270if ( $^O !~ /win32/i) { 271 *new_process = \&new_process_posix; 272} else { 273 require IO::Lambda::Thread; 274 unless ( $IO::Lambda::Thread::DISABLED) { 275 *new_process = \&new_process_win32; 276 } else { 277 *new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } }; 278 } 279} 280 281# condition 282sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') } 283sub pid (&) { new_pid (context)-> condition(shift, \&pid, 'pid') } 284sub spawn (&) { new_process-> call(context)-> condition(shift, \&spawn, 'spawn') } 285 286 2871; 288 289__DATA__ 290 291=pod 292 293=head1 NAME 294 295IO::Lambda::Signal - wait for pids and signals 296 297=head1 DESCRIPTION 298 299The module provides access to the signal-based callbacks: generic signal listener 300C<signal>, process ID listener C<pid>, and the asynchronous version of I<system> 301call, C<spawn>. 302 303=head1 SYNOPSIS 304 305 use strict; 306 use IO::Lambda qw(:all); 307 use IO::Lambda::Signal qw(pid spawn); 308 309 # pid 310 my $pid = fork; 311 exec "/bin/ls" unless $pid; 312 lambda { 313 context $pid, 5; 314 pid { 315 my $ret = shift; 316 print defined($ret) ? ("exitcode(", $ret>>8, ")\n") : "timeout\n"; 317 } 318 }-> wait; 319 320 # spawn 321 this lambda { 322 context "perl -v"; 323 spawn { 324 my ( $buf, $exitcode, $error) = @_; 325 print "buf=[$buf], exitcode=$exitcode, error=$error\n"; 326 } 327 }-> wait; 328 329=head2 USAGE 330 331=over 332 333=item pid ($PID, $TIMEOUT) -> $?|undef 334 335Accepts PID and an optional deadline/timeout, returns either the process' exit status, 336or undef on timeout. The corresponding lambda is C<new_pid> : 337 338 new_pid ($PID, $TIMEOUT) :: () -> $?|undef 339 340=item signal ($SIG, $TIMEOUT) -> boolean 341 342Accepts signal name and optional deadline/timeout, returns 1 if the signal was caught, 343or C<undef> on timeout. The corresponding lambda is C<new_signal> : 344 345 new_signal ($SIG, $TIMEOUT) :: () -> boolean 346 347=item spawn (@LIST) -> ( output, $?, $!) 348 349Calls pipe open on C<@LIST>, reads all data printed by the child process, 350and awaits for the process to finish. Returns three scalars - collected output, 351process exitcode C<$?>, and an error string (usually C<$!>). The corresponding 352lambda is C<new_process> : 353 354 new_process (@LIST) :: () -> ( output, $?, $!) 355 356Lambda objects created by C<new_process> have an additional field C<'pid'> 357initialized with the process pid value. 358 359=back 360 361=head1 LIMITATION 362 363C<pid> and C<new_pid> don't work on win32 because win32 doesn't use 364SIGCHLD/waitpid. Native implementation of C<spawn> and C<new_process> doesn't 365work for the same reason on win32 as well, therefore those were reimplemented 366using threads, and require a threaded perl. 367 368=head1 SEE ALSO 369 370L<IO::Lambda>, L<perlipc>, L<IPC::Open2>, L<IPC::Run> 371 372=head1 AUTHOR 373 374Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>. 375 376=cut 377