package AnyEvent::Task::Server::Worker;

use common::sense;

use AnyEvent::Util;
use Guard;

use POSIX; ## POSIX::_exit is used so we don't unlink the unix socket file created by our parent before the fork
use IO::Select;
use JSON::XS;
use Scalar::Util qw/blessed/;


## Per-process worker state.
my $setup_has_been_run;   ## true once $server->{setup} has returned without dying
my $json;                 ## JSON::XS codec whose incremental parser buffers request bytes
my $sel;                  ## IO::Select set watched by the main loop



## Entry point for a worker process.
##
## Arguments are passed through untouched to handle_worker_wrapped:
##   ($server, $fh, $monitor_fh)
##
## Never returns: the process always ends with POSIX::_exit(1). If the main
## loop dies, the exception is printed to STDERR before exiting so the reason
## the worker went away is not lost.
sub handle_worker {
    my $ok = eval {
        handle_worker_wrapped(@_);
        1;
    };

    if (!$ok) {
        my $msg = defined $@ ? "$@" : '';
        chomp $msg;
        $msg = 'unknown error' if $msg eq '';
        print STDERR "Worker ($$) exiting after exception: $msg\n";
    }

    POSIX::_exit(1);
}


## Main loop of the worker.
##
##   $server      hash ref holding the callbacks and settings used by process_data
##   $fh          handle that requests are read from and replies are written to
##   $monitor_fh  handle that becoming readable is treated as "lost connection
##                to server"
##
## Both handles are switched to blocking mode. The loop itself never returns;
## the process leaves it only through POSIX::_exit or an exception.
sub handle_worker_wrapped {
    my ($server, $fh, $monitor_fh) = @_;

    AnyEvent::Util::fh_nonblocking($fh, 0);
    AnyEvent::Util::fh_nonblocking($monitor_fh, 0);

    $json = JSON::XS->new->utf8;

    $sel = IO::Select->new;
    $sel->add($fh, $monitor_fh);

    while(1) {
        my @all_ready = $sel->can_read;

        foreach my $ready (@all_ready) {
            if ($ready == $monitor_fh) {
                ## Lost connection to server: stop watching the monitor handle
                ## but keep servicing $fh.
                $sel->remove($monitor_fh);
            } elsif ($ready == $fh) {
                process_data($server, $fh);
            }
        }
    }
}



## Read one chunk (up to 4096 bytes) from $fh and handle every complete JSON
## message the incremental parser can extract from what has arrived so far.
##
## Each message is an array: [ $cmd, $meta, @args ].
##   'do'  run $server->{setup} (until it has succeeded once in this process),
##         then $server->{interface}->(@args) in scalar context, and write back
##         ['ok', $meta_out, $value] or ['er', $meta_out, $message]
##   'dn'  call $server->{checkout_done}
## Any other command dies with "unknown command: ...".
##
## If $server->{hung_worker_timeout} is set, an alarm with that many seconds
## is armed for the duration of this call; when it fires the worker reports
## itself as hung and exits.
##
## Exits the process on EOF or on a read error other than EINTR.
sub process_data {
    my ($server, $fh) = @_;

    scope_guard { alarm 0 };
    local $SIG{ALRM} = sub { print STDERR "Killing hung worker ($$)\n"; POSIX::_exit(1); };
    alarm $server->{hung_worker_timeout} if $server->{hung_worker_timeout};

    my $read_rv = sysread $fh, my $buf, 4096;

    if (!defined $read_rv) {
        return if $!{EINTR};
        POSIX::_exit(1);
    } elsif ($read_rv == 0) {
        POSIX::_exit(1);
    }

    for my $input ($json->incr_parse($buf)) {
        my $cmd = shift @$input;
        shift @$input; ## request metadata: removed from the argument list, not read here

        if ($cmd eq 'do') {
            my $output_meta = {};
            my $val;

            local $AnyEvent::Task::Logger::log_defer_object;

            ## Success is signalled by the eval's own return value rather than
            ## by the truthiness of $@: an exception that evaluates to false
            ## (or a $@ cleared during unwinding) must still count as failure.
            my $ok = eval {
                if (!$setup_has_been_run) {
                    $server->{setup}->();
                    $setup_has_been_run = 1;
                }

                $val = scalar $server->{interface}->(@$input);
                1;
            };

            my $err = $@;

            ## The variable was localised (undefined) above, so if it is
            ## defined now it was set during the call; forward its msg field.
            $output_meta->{ld} = $AnyEvent::Task::Logger::log_defer_object->{msg}
                if defined $AnyEvent::Task::Logger::log_defer_object;

            my $output;

            if (!$ok) {
                $err = "$err" if blessed $err;
                $err = 'unknown error' if !defined $err || $err eq '';

                $err = "setup exception: $err" if !$setup_has_been_run;

                $output = ['er', $output_meta, $err,];
            } elsif (blessed $val) {
                ## Objects are reported as an error instead of being encoded.
                $val = "interface returned object: " . ref($val) . "=($val)";
                $output = ['er', $output_meta, $val,];
            } else {
                $output = ['ok', $output_meta, $val,];
            }

            ## encode_json returns a string on success, so an undefined result
            ## means the eval caught an encoding exception.
            my $output_json = eval { encode_json($output); };

            if (!defined $output_json) {
                $output = ['er', $output_meta, "error JSON encoding interface output: $@",];
                $output_json = encode_json($output);
            }

            my_syswrite($fh, $output_json);
        } elsif ($cmd eq 'dn') {
            $server->{checkout_done}->();
        } else {
            die "unknown command: $cmd";
        }
    }
}


## Write all of $output to $fh.
##
## Retries on EINTR and resumes after a short write, so a write that was cut
## short no longer kills the worker with half a message sent. Any other write
## error (or a write that makes no progress) exits the process.
sub my_syswrite {
    my ($fh, $output) = @_;

    my $total  = length($output);
    my $offset = 0;

    while ($offset < $total) {
        my $rv = syswrite $fh, $output, $total - $offset, $offset;

        if (!defined $rv) {
            next if $!{EINTR};
            POSIX::_exit(1); ## probably parent died and we're getting broken pipe
        }

        POSIX::_exit(1) if $rv == 0; ## no progress: bail out rather than spin

        $offset += $rv;
    }

    return;
}

1;