package MogileFS::Worker;
use strict;
use warnings;
use fields ('psock',              # socket for parent/child communications
            'last_bcast_state',   # "{device|host}-$devid" => [$time, {alive|dead}]
            'readbuf',            # unparsed data from parent
            'monitor_has_run',    # true once we've heard of the monitor job being alive
            'last_ping',          # time we last said we're alive
            'woken_up',           # bool: if we've been woken up
            'last_wake',          # hashref: { $class -> time() } when we last woke up a certain job class
            'queue_depth',        # hashref: { $queue_name => depth the parent last told us }
            'queue_todo',         # hashref: { $queue_name => aref of hrefs of work sent from parent }
            );

use IO::Handle;   # new() calls IO::Handle::blocking directly
use MogileFS::Util qw(error eurl decode_url_args apply_state_events);
use MogileFS::Server;

use vars (
          '$got_live_vs_die',  # local'ized scalarref flag for whether we've
                               # gotten a live-vs-die instruction from parent
          );

# Constructor.
#   $psock - socket used to talk to the parent process; it is switched to
#            non-blocking mode here.
# May be invoked on a class name (a new fields-based object is allocated)
# or on an already-allocated object, which is then (re)initialized in place.
# Returns the initialized object.
sub new {
    my ($self, $psock) = @_;
    $self = fields::new($self) unless ref $self;

    $self->{psock}            = $psock;
    $self->{readbuf}          = '';
    $self->{last_bcast_state} = {};
    $self->{monitor_has_run}  = MogileFS::ProcManager->is_monitor_good;
    $self->{last_ping}        = 0;
    $self->{woken_up}         = 0;   # explicit "not woken" instead of undef
    $self->{last_wake}        = {};
    $self->{queue_depth}      = {};
    $self->{queue_todo}       = {};

    IO::Handle::blocking($psock, 0);
    return $self;
}

# Returns the file descriptor number of the parent socket.
sub psock_fd {
    my $self = shift;
    return fileno($self->{psock});
}

# Returns the parent socket handle itself.
sub psock {
    my $self = shift;
    return $self->{psock};
}

# Thin wrapper: delegates to Mgd::validate_dbh() and returns its result.
sub validate_dbh {
    return Mgd::validate_dbh();
}

# Returns 1 if we know the monitor job has run, 0 otherwise.
sub monitor_has_run {
    my $self = shift;
    return $self->{monitor_has_run} ? 1 : 0;
}

# Clears the "monitor has run" flag, so wait_for_monitor will block again
# until the flag is set once more.
sub forget_that_monitor_has_run {
    my $self = shift;
    $self->{monitor_has_run} = 0;
}

# Blocks until the monitor is known to have run.  Each iteration reads from
# the parent (waiting up to 1 second for data) and pings the watchdog so we
# aren't killed while waiting.
sub wait_for_monitor {
    my $self = shift;
    while (! $self->monitor_has_run) {
        $self->read_from_parent(1);
        $self->still_alive;
    }
}

# method that workers can call just to write something to the parent, so worker
# doesn't get killed.
# (during idle/slow operation, say)
# returns current time, so caller can avoid a time() call as well, for its loop
sub still_alive {
    my $self = shift;
    my $now = time();
    # only ping once a quarter of the watchdog timeout has elapsed
    if ($now > $self->{last_ping} + ($self->watchdog_timeout / 4)) {
        $self->send_to_parent(":still_alive");  # a no-op, just for the watchdog
        $self->{last_ping} = $now;
    }
    return $now;
}

# Writes one line to the parent process: the first argument with "\r\n"
# appended.  The parent socket is non-blocking, so a short write is finished
# in a loop that waits (up to 30 seconds per attempt) for writability.
#
# Returns 1 once the whole line has been written.  When called as a package
# method and MogileFS::ProcManager->is_child is false, returns without
# writing anything.
# Dies on a write error other than EAGAIN, or if the parent does not become
# writable within 30 seconds.
sub send_to_parent {
    my $self = shift;

    # can be called as package method:  MogileFS::Worker->send_to_parent...
    unless (ref $self) {
        $self = MogileFS::ProcManager->is_child
            or return;
    }

    my $write = "$_[0]\r\n";
    my $totallen = length $write;
    my $rv = syswrite($self->{psock}, $write);
    return 1 if defined $rv && $rv == $totallen;

    # $! is only meaningful when syswrite itself failed (returned undef).
    # After a short-but-successful write it may still hold a stale error
    # from some earlier call, which must not be treated as fatal.
    die "Error writing to parent process: $!" if !defined $rv && !$!{EAGAIN};

    $rv ||= 0;  # could've been undef, if EAGAIN immediately.
    my $remain = $totallen - $rv;
    my $offset = $rv;
    while ($remain > 0) {
        MogileFS::Util::wait_for_writeability(fileno($self->{psock}), 30)
            or die "Parent not writable in 30 seconds";

        $rv = syswrite($self->{psock}, $write, $remain, $offset);
        # same rule as above: consult $! only on an actual failure
        die "Error writing to parent process (in loop): $!" if !defined $rv && !$!{EAGAIN};
        if ($rv) {
            $remain -= $rv;
            $offset += $rv;
        }
    }
    die "remain is negative: $remain" if $remain < 0;
    return 1;
}

# override in children
sub watchdog_timeout {
    return 10;
}

# should be overridden by workers to process worker-specific directives
# from the parent process. return 1 if you recognize the command, 0 otherwise.
sub process_line {
    my ($self, $lineref) = @_;
    return 0;
}

# Reads whatever the parent has sent and dispatches every complete line.
#   $timeout - optional; seconds to wait for the first chunk of data
#              (default 0).  Later reads in the same call never wait.
# Data is appended to the object's read buffer; each complete line (with
# its trailing \r?\n removed) is offered to process_generic_command and,
# if that declines it, to process_line.  A line neither one recognizes is
# reported through error().
# Dies if the parent closed the pipe (EOF) or on a real read error.
sub read_from_parent {
    my $self = shift;
    my $timeout = shift || 0;
    my $psock = $self->{psock};

    # while things are immediately available,
    # (or optionally sleep a bit)
    while (MogileFS::Util::wait_for_readability(fileno($psock), $timeout)) {
        $timeout = 0;  # only wait on the timeout for the first read.
        my $buf;
        my $rv = sysread($psock, $buf, Mgd::UNIX_RCVBUF_SIZE());
        if (!$rv) {
            if (defined $rv) {
                die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n";
            }
            # The socket is non-blocking: a read that would block, or one
            # interrupted by a signal, is not a failure.  Stop for now; the
            # data will be picked up on the next call.
            last if $!{EAGAIN} || $!{EINTR};
            die "Error reading pipe from parent: $!\n";
        }

        if ($Mgd::POST_SLEEP_DEBUG) {
            my $out = $buf;
            $out =~ s/\s+$//;
            warn "proc ${self}[$$] read: [$out]\n";
        }
        $self->{readbuf} .= $buf;

        # peel complete lines off the front of the buffer; a trailing
        # partial line stays buffered until the rest of it arrives
        while ($self->{readbuf} =~ s/^(.+?)\r?\n//) {
            my $line = $1;

            next if $self->process_generic_command(\$line);
            my $ok = $self->process_line(\$line);
            unless ($ok) {
                error("Unrecognized command from parent: $line");
            }
        }
    }
}

# Sends ':ping' to the parent and waits for a reply.  While waiting,
# $got_live_vs_die is local'ized to point at our reply flag; the wait ends
# as soon as something sets that flag during read_from_parent.
# Polls every 0.2 seconds, warns on each poll after the 5th, and dies after
# more than 20 polls.  Also dies if called recursively.
sub parent_ping {
    my $self = shift;
    $self->send_to_parent(':ping');

    my $got_reply = 0;
    die "recursive parent_ping!" if $got_live_vs_die;
    local $got_live_vs_die = \$got_reply;

    my $loops = 0;

    while (!$got_reply) {
        $self->read_from_parent;
        return if $got_reply;

        $loops++;
        select undef, undef, undef, 0.20;
        if ($loops > 5) {
            warn "No simple reply from parent to child $self [$$] in $loops 0.2second loops.\n";
            die "No answer in 4 seconds from parent to child $self [$$], dying" if $loops > 20;
        }
    }
}

# tries to parse generic (not job-specific) commands sent from parent
# to child. returns 1 on success, or 0 if command given isn't generic,
# and child should parse.
# lineref doesn't have \r\n at end.
sub process_generic_command {
    my ($self, $lineref) = @_;
    return 0 unless $$lineref =~ /^:/;  # all generic commands start with colon

    if ($$lineref =~ /^:shutdown/) {
        # tell a waiting parent_ping (if any) that an answer arrived
        $$got_live_vs_die = 1 if $got_live_vs_die;
        exit 0;
    }

    if ($$lineref =~ /^:stay_alive/) {
        $$got_live_vs_die = 1 if $got_live_vs_die;
        return 1;
    }

    if ($$lineref =~ /^:monitor_events/) {
        apply_state_events($lineref);
        return 1;
    }

    if ($$lineref =~ /^:monitor_has_run/) {
        $self->{monitor_has_run} = 1;
        return 1;
    }

    if ($$lineref =~ /^:wake_up/) {
        $self->{woken_up} = 1;
        return 1;
    }

    # Captures are copied into lexicals before being handed to other code:
    # $1/$2 are magic variables tied to the last successful match and
    # should not be passed around by alias or by reference.

    if (my ($key, $value) = $$lineref =~ /^:set_config_from_parent (\S+) (.+)/) {
        # the 'no_broadcast' API keeps us from looping forever.
        MogileFS::Config->set_config_no_broadcast($key, $value);
        return 1;
    }

    # queue_name depth
    if (my ($queue, $depth) = $$lineref =~ /^:queue_depth (\w+) (\d+)/) {
        $self->queue_depth($queue, $depth);
        return 1;
    }

    # queue_name encoded_item
    if (my ($queue, $encoded) = $$lineref =~ /^:queue_todo (\w+) (.+)/) {
        $self->queue_todo($queue, decode_url_args(\$encoded));
        return 1;
    }

    # TODO: warn on unknown commands?

    return 0;
}

# Accessor for the recorded depth of queue $type.
#   $obj->queue_depth($type)          returns the depth (0 if never set)
#   $obj->queue_depth($type, $depth)  stores and returns $depth
sub queue_depth {
    my MogileFS::Worker $self = shift;
    my $type = shift;
    $self->{queue_depth}->{$type} ||= 0;
    return $self->{queue_depth}->{$type} unless @_;
    return $self->{queue_depth}->{$type} = shift;
}

# Accessor for the list of pending work items of queue $type.
#   $obj->queue_todo($type)           returns the arrayref (created if missing)
#   $obj->queue_todo($type, @items)   appends @items, then returns the arrayref
# The returned arrayref is the live list, not a copy.
sub queue_todo {
    my MogileFS::Worker $self = shift;
    my $type = shift;
    $self->{queue_todo}->{$type} ||= [];
    push(@{$self->{queue_todo}->{$type}}, @_) if @_;
    return $self->{queue_todo}->{$type};
}

# Returns the woken_up flag (true after a ':wake_up' command from the parent).
sub was_woken_up {
    my MogileFS::Worker $self = shift;
    return $self->{woken_up};
}

# Clears the woken_up flag.
sub forget_woken_up {
    my MogileFS::Worker $self = shift;
    $self->{woken_up} = 0;
}

# don't wake processes more than once a second... not necessary.
sub wake_a {
    my ($self, $class) = @_;

    # Throttle: if we already asked for this job class during the current
    # clock second, do nothing.
    my $this_second = time();
    my $previous    = $self->{last_wake}{$class} || 0;
    return if $previous == $this_second;

    # Remember when we asked, then pass the request on to the parent.
    $self->{last_wake}{$class} = $this_second;
    return $self->send_to_parent(":wake_a $class");
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: