1package AnyEvent::Task::Server::Worker;
2
3use common::sense;
4
5use AnyEvent::Util;
6use Guard;
7
8use POSIX; ## POSIX::_exit is used so we don't unlink the unix socket file created by our parent before the fork
9use IO::Select;
10use JSON::XS;
11use Scalar::Util qw/blessed/;
12
13
14my $setup_has_been_run;
15my $json;
16my $sel;
17
18
19
## Entry point of a worker process. Runs the request loop and never returns
## to the caller: once the loop is aborted by an exception the process is
## terminated with exit status 1.
##
##   @_ - ($server, $fh, $monitor_fh), forwarded as-is to handle_worker_wrapped
sub handle_worker {
  eval {
    handle_worker_wrapped(@_);
  };

  my $err = $@;

  ## Say why the worker is going away instead of silently discarding the
  ## exception. The print is guarded so that a failure to log can never
  ## prevent the exit below.
  eval { print STDERR "Worker ($$) terminating: $err\n" } if $err;

  ## POSIX::_exit is used so we don't unlink the unix socket file created by
  ## our parent before the fork.
  POSIX::_exit(1);
}
27
28
## Request loop of a worker process. Loops forever; it is only left through
## an exception or a process exit triggered further down the call chain.
##
##   $server     - server object, handed through to process_data
##   $fh         - handle on which request data arrives
##   $monitor_fh - handle that becomes readable once the connection to the
##                 server is lost
sub handle_worker_wrapped {
  my ($server, $fh, $monitor_fh) = @_;

  ## Both handles are used in blocking mode from here on.
  AnyEvent::Util::fh_nonblocking($fh, 0);
  AnyEvent::Util::fh_nonblocking($monitor_fh, 0);

  ## File-level state shared with process_data.
  $json = JSON::XS->new->utf8;
  $sel  = IO::Select->new;
  $sel->add($fh, $monitor_fh);

  for (;;) {
    for my $readable ($sel->can_read) {
      if ($readable == $monitor_fh) {
        ## Lost connection to server: stop watching the monitor handle but
        ## keep servicing $fh.
        $sel->remove($monitor_fh);
        next;
      }

      process_data($server, $fh) if $readable == $fh;
    }
  }
}
53
54
55
## Reads one chunk (up to 4096 bytes) of request data from $fh and handles
## every complete JSON message the incremental parser yields from it.
##
## Each message is an array of the form [ $cmd, $input_meta, @args ]:
##   'do' - runs $server->{setup} (until it has succeeded once), then calls
##          $server->{interface} with @args in scalar context and writes
##          ['ok', $meta, $value] or ['er', $meta, $error] back to $fh
##   'dn' - calls $server->{checkout_done}
## Any other command raises an exception.
##
## If $server->{hung_worker_timeout} is set, an alarm of that many seconds
## covers the whole call; when it fires the process logs to STDERR and exits.
## The process also exits on end-of-file and on any read error except EINTR
## (in which case the sub simply returns).
##
##   $server - server object holding the callbacks and settings named above
##   $fh     - handle to read requests from and write responses to
sub process_data {
  my ($server, $fh) = @_;

  ## Cancel any pending alarm whenever this sub is left.
  scope_guard { alarm 0 };
  local $SIG{ALRM} = sub { print STDERR "Killing hung worker ($$)\n"; POSIX::_exit(1); };
  alarm $server->{hung_worker_timeout} if $server->{hung_worker_timeout};

  my $read_rv = sysread $fh, my $buf, 4096;

  if (!defined $read_rv) {
    return if $!{EINTR};
    POSIX::_exit(1);
  } elsif ($read_rv == 0) {
    ## End of file on the request handle.
    POSIX::_exit(1);
  }

  for my $input ($json->incr_parse($buf)) {
    my $output;
    my $output_meta = {};

    my $cmd = shift @$input;
    shift @$input; ## request metadata: consumed but not used here

    if ($cmd eq 'do') {
      my $val;

      local $AnyEvent::Task::Logger::log_defer_object;

      eval {
        if (!$setup_has_been_run) {
          $server->{setup}->();
          $setup_has_been_run = 1;
        }

        $val = scalar $server->{interface}->(@$input);
      };

      my $err = $@;

      $output_meta->{ld} = $AnyEvent::Task::Logger::log_defer_object->{msg}
        if defined $AnyEvent::Task::Logger::log_defer_object;

      if ($err) {
        ## Exception objects are stringified before being sent back.
        $err = "$err" if blessed $err;

        ## The flag is still unset only if the setup callback itself threw.
        $err = "setup exception: $err" if !$setup_has_been_run;

        $output = ['er', $output_meta, $err,];
      } else {
        if (blessed $val) {
          $val = "interface returned object: " . ref($val) . "=($val)";
          $output = ['er', $output_meta, $val,];
        } else {
          $output = ['ok', $output_meta, $val,];
        }
      }

      my $output_json = eval { encode_json($output); };

      if ($@) {
        my $encode_err = "error JSON encoding interface output: $@";

        ## Report the failure to the client. The metadata may itself be the
        ## part that cannot be encoded, so fall back to empty metadata rather
        ## than letting a second exception take the worker down.
        $output_json = eval { encode_json(['er', $output_meta, $encode_err,]); };
        $output_json = encode_json(['er', {}, $encode_err,]) if !defined $output_json;
      }

      my_syswrite($fh, $output_json);
    } elsif ($cmd eq 'dn') {
      $server->{checkout_done}->();
    } else {
      die "unknown command: $cmd";
    }
  }
}
128
129
## Writes all of $output to $fh, retrying after EINTR and continuing after
## short writes until every byte has been handed to the handle.
##
## The process is terminated with exit status 1 if a write fails for any
## other reason (probably parent died and we're getting broken pipe, or the
## socket is set nonblocking) or if a write makes no progress at all.
##
##   $fh     - handle to write to
##   $output - string to write
sub my_syswrite {
  my ($fh, $output) = @_;

  my $total   = length($output);
  my $written = 0;

  while ($written < $total) {
    my $rv = syswrite $fh, $output, $total - $written, $written;

    if (!defined $rv) {
      next if $!{EINTR};
      POSIX::_exit(1); ## probably parent died and we're getting broken pipe
    }

    POSIX::_exit(1) if $rv == 0; ## no progress: bail out instead of spinning

    $written += $rv;
  }

  return;
}
146
1471;
148