1package MogileFS::Worker;
2use strict;
# Per-object fields of a worker (objects are built with fields::new in new()).
use fields ('psock',              # socket for parent/child communications
            'last_bcast_state',   # "{device|host}-$devid" => [$time, {alive|dead}]
            'readbuf',            # unparsed data from parent (a partial trailing line waits here)
            'monitor_has_run',    # true once we've heard of the monitor job being alive
            'last_ping',          # time() we last sent ":still_alive" to the parent
            'woken_up',           # bool: set when the parent sends ":wake_up"; cleared by forget_woken_up
            'last_wake',          # hashref: { $class -> time() } when we last woke up a certain job class
            'queue_depth',        # hashref: { $queue_name -> depth } as last reported by the parent
            'queue_todo',         # hashref: { $queue_name -> aref of decoded work items sent from parent }
            );
13
14use MogileFS::Util qw(error eurl decode_url_args apply_state_events);
15use MogileFS::Server;
16
17use vars (
18          '$got_live_vs_die',    # local'ized scalarref flag for whether we've
19                                 # gotten a live-vs-die instruction from parent
20          );
21
# Constructor.  Called on a class name, it creates a fresh fields-based
# object; called on an existing object, it (re)initializes that object in
# place, leaving fields not listed here untouched.  $psock is the handle
# used to talk to the parent; it is switched to non-blocking mode.
sub new {
    my ($self, $psock) = @_;
    $self = fields::new($self) if !ref $self;

    # Hash-slice initialization.  scalar() keeps is_monitor_good in scalar
    # context, exactly as a plain scalar assignment would.
    @{$self}{qw(psock readbuf last_bcast_state monitor_has_run
                last_ping last_wake queue_depth queue_todo)} = (
        $psock,
        '',
        {},
        scalar(MogileFS::ProcManager->is_monitor_good),
        0,
        {},
        {},
        {},
    );

    IO::Handle::blocking($psock, 0);
    return $self;
}
38
# Numeric file descriptor of the parent socket, as reported by fileno().
sub psock_fd {
    my ($self) = @_;
    my $sock = $self->{psock};
    return fileno $sock;
}
43
# Accessor: the handle used to communicate with the parent process.
sub psock {
    my ($self) = @_;
    my $parent_sock = $self->{psock};
    return $parent_sock;
}
48
# Delegates to Mgd::validate_dbh and passes its result through unchanged.
# The worker object is not needed; Mgd::validate_dbh is called with no args.
sub validate_dbh {
    my $self = shift;    # unused
    return Mgd::validate_dbh();
}
52
# Returns 1 if the monitor_has_run flag is set (by construction or by a
# ":monitor_has_run" message from the parent), otherwise 0.  The result
# is always a plain 1/0, whatever true/false value the flag holds.
sub monitor_has_run {
    my ($self) = @_;
    return 0 unless $self->{monitor_has_run};
    return 1;
}
57
# Clear the monitor_has_run flag; returns the new value (0).
sub forget_that_monitor_has_run {
    my ($self) = @_;
    return $self->{monitor_has_run} = 0;
}
62
# Block until monitor_has_run becomes true.  Each pass services messages
# from the parent (read timeout of 1, presumably seconds) and then calls
# still_alive so the parent keeps hearing from us while we wait.
sub wait_for_monitor {
    my ($self) = @_;
    until ($self->monitor_has_run) {
        $self->read_from_parent(1);   # a ":monitor_has_run" line sets the flag
        $self->still_alive;
    }
}
70
# Tell the parent we're still alive, so a worker that is idle or busy with a
# slow operation isn't killed by the parent's watchdog.  A ":still_alive"
# line (a no-op for the parent) is sent at most once per quarter of
# watchdog_timeout; calls in between do nothing.
# Returns the current time(), so callers can reuse it in their own loops.
sub still_alive {
    my ($self) = @_;
    my $now        = time();
    my $ping_every = $self->watchdog_timeout / 4;
    my $next_ping  = $self->{last_ping} + $ping_every;
    return $now unless $now > $next_ping;

    $self->send_to_parent(":still_alive");  # a no-op, just for the watchdog
    $self->{last_ping} = $now;
    return $now;
}
83
# Write one line (terminated with "\r\n") to the parent over psock.
#
# May be called on a worker object, or as a package method
# (MogileFS::Worker->send_to_parent(...)); in the latter case the worker is
# looked up via MogileFS::ProcManager->is_child, and if there is none we
# return without sending anything.
#
# psock is non-blocking, so a write may be short or fail with EAGAIN; the
# rest is written as the socket becomes writable again.  Dies on a real
# write error, or if the parent stays unwritable for 30 seconds.
# Returns 1 once the whole line has been written.
sub send_to_parent {
    my $self = shift;

    # can be called as package method:  MogileFS::Worker->send_to_parent...
    unless (ref $self) {
        $self = MogileFS::ProcManager->is_child
            or return;
    }

    my $write    = "$_[0]\r\n";
    my $totallen = length $write;

    # $! is only meaningful when syswrite actually failed (returned undef).
    # After a successful or short write it still holds whatever errno some
    # earlier, unrelated syscall left behind, so it must not be consulted.
    # EAGAIN (buffer full) and EINTR (interrupted by a signal) mean "retry".
    my $rv = syswrite($self->{psock}, $write);
    return 1 if defined $rv && $rv == $totallen;
    die "Error writing to parent process: $!"
        if !defined $rv && !($!{EAGAIN} || $!{EINTR});

    $rv ||= 0;  # could've been undef, if EAGAIN immediately.
    my $remain = $totallen - $rv;
    my $offset = $rv;
    while ($remain > 0) {
        MogileFS::Util::wait_for_writeability(fileno($self->{psock}), 30)
            or die "Parent not writable in 30 seconds";

        $rv = syswrite($self->{psock}, $write, $remain, $offset);
        unless (defined $rv) {
            next if $!{EAGAIN} || $!{EINTR};
            die "Error writing to parent process (in loop): $!";
        }
        $remain -= $rv;
        $offset += $rv;
    }
    die "remain is negative:  $remain" if $remain < 0;
    return 1;
}
116
# Watchdog interval for this worker type; subclasses override it.
# still_alive pings the parent once a quarter of this interval has elapsed
# since the last ping.  Base-class default: 10 (compared against time()
# deltas, so presumably seconds).
sub watchdog_timeout {
    my $default_timeout = 10;
    return $default_timeout;
}
121
# Hook for worker-specific commands from the parent.  read_from_parent calls
# it as ($self, $lineref) for every line that process_generic_command did
# not handle.  Subclasses override it and return 1 for commands they
# recognize, 0 otherwise.  This default recognizes nothing.
sub process_line {
    # my ($self, $lineref) = @_;  -- both ignored by the default
    return 0;
}
128
# Read and dispatch whatever the parent has sent.
#
# $timeout (default 0) is passed to MogileFS::Util::wait_for_readability for
# the first wait only; after that we keep reading only while data is
# immediately available.  Every complete line ("\n" or "\r\n" terminated)
# goes first to process_generic_command and, if not handled there, to the
# worker-specific process_line; unrecognized lines are logged via error().
# Empty lines are skipped.  A partial trailing line stays in readbuf until
# the rest of it arrives.  Dies if the parent closed the pipe or on a real
# read error.
sub read_from_parent {
    my $self    = shift;
    my $timeout = shift || 0;
    my $psock   = $self->{psock};

    # while things are immediately available,
    # (or optionally sleep a bit)
    while (MogileFS::Util::wait_for_readability(fileno($psock), $timeout)) {
        $timeout = 0; # only wait on the timeout for the first read.
        my $buf;
        my $rv = sysread($psock, $buf, Mgd::UNIX_RCVBUF_SIZE());
        unless (defined $rv) {
            # psock is non-blocking: a spurious readiness report (EAGAIN)
            # or a signal (EINTR) is not fatal, just re-check readability.
            next if $!{EAGAIN} || $!{EINTR};
            die "Error reading pipe from parent: $!\n";
        }
        die "While reading pipe from parent, got EOF.  Parent's gone.  Quitting.\n"
            if $rv == 0;

        if ($Mgd::POST_SLEEP_DEBUG) {
            my $out = $buf;
            $out =~ s/\s+$//;
            warn "proc ${self}[$$] read: [$out]\n"
        }
        $self->{readbuf} .= $buf;

        # ".*?" rather than ".+?": with ".+?" an empty line ("\n" at the
        # front of readbuf) never matches, so it and every command queued
        # behind it would sit in the buffer forever.
        while ($self->{readbuf} =~ s/^(.*?)\r?\n//) {
            my $line = $1;
            next unless length $line;

            next if $self->process_generic_command(\$line);
            my $ok = $self->process_line(\$line);
            unless ($ok) {
                error("Unrecognized command from parent: $line");
            }
        }
    }
}
166
# Check that the parent is responsive: send ":ping", then keep reading until
# the parent answers with ":stay_alive" (process_generic_command sets the
# flag that $got_live_vs_die points at; ":shutdown" sets it and then exits
# the process).  Polls every 0.2s, warns on each poll after the 5th, and
# dies after 20 empty polls (about 4 seconds).  Not reentrant: dies if
# another parent_ping is already waiting for its reply.
sub parent_ping {
    my $self = shift;

    # Guard against recursion *before* sending, so a nested call doesn't
    # leave a stray ":ping" for the parent to answer.
    die "recursive parent_ping!" if $got_live_vs_die;

    my $got_reply = 0;
    local $got_live_vs_die = \$got_reply;

    $self->send_to_parent(':ping');

    my $loops = 0;

    while (!$got_reply) {
        $self->read_from_parent;
        return if $got_reply;

        $loops++;
        select undef, undef, undef, 0.20;
        if ($loops > 5) {
            warn "No simple reply from parent to child $self [$$] in $loops 0.2second loops.\n";
            die "No answer in 4 seconds from parent to child $self [$$], dying" if $loops > 20;
        }
    }
}
190
# Try to handle a generic (not job-specific) command sent from the parent.
#
# $lineref is a reference to the line, without its trailing \r\n.
# Returns 1 if the command was recognized and handled here, or 0 if it is
# not a generic command (no leading ':', or an unknown ':' command), in
# which case the worker's own process_line should parse it.
# ":shutdown" does not return: it exits the process.
sub process_generic_command {
    my ($self, $lineref) = @_;
    return 0 unless $$lineref =~ /^:/;  # all generic commands start with colon

    if ($$lineref =~ /^:shutdown/) {
        $$got_live_vs_die = 1 if $got_live_vs_die;
        exit 0;
    }

    if ($$lineref =~ /^:stay_alive/) {
        # answers a pending parent_ping, if any
        $$got_live_vs_die = 1 if $got_live_vs_die;
        return 1;
    }

    if ($$lineref =~ /^:monitor_events/) {
        apply_state_events($lineref);
        return 1;
    }

    if ($$lineref =~ /^:monitor_has_run/) {
        $self->{monitor_has_run} = 1;
        return 1;
    }

    if ($$lineref =~ /^:wake_up/) {
        $self->{woken_up} = 1;
        return 1;
    }

    # Below, captures are copied into lexicals before calling out: $1/$2 are
    # globals, and a regex match inside the callee would change what a
    # reference to (or an @_ alias of) them sees.

    if (my ($key, $value) = $$lineref =~ /^:set_config_from_parent (\S+) (.+)/) {
        # the 'no_broadcast' API keeps us from looping forever.
        MogileFS::Config->set_config_no_broadcast($key, $value);
        return 1;
    }

    # queue_name depth
    if (my ($queue, $depth) = $$lineref =~ /^:queue_depth (\w+) (\d+)/) {
        $self->queue_depth($queue, $depth);
        return 1;
    }

    # queue_name encoded_item
    if (my ($queue, $encoded) = $$lineref =~ /^:queue_todo (\w+) (.+)/) {
        $self->queue_todo($queue, decode_url_args(\$encoded));
        return 1;
    }

    # TODO: warn on unknown commands?

    return 0;
}
247
# Accessor for the last known depth of queue $type, as reported by the
# parent.  With an extra argument, stores that value first.  Always returns
# the (possibly just-stored) depth; an unknown queue reads as, and is
# recorded as, 0.
sub queue_depth {
    my MogileFS::Worker $self = shift;
    my ($type, @value) = @_;

    $self->{queue_depth}->{$type} ||= 0;
    $self->{queue_depth}->{$type} = $value[0] if @value;
    return $self->{queue_depth}->{$type};
}
255
# Accessor for the per-queue list of work items sent by the parent.
# Any extra arguments are appended to queue $type.  Returns the live
# arrayref for that queue, creating an empty one on first use.
sub queue_todo {
    my MogileFS::Worker $self = shift;
    my ($type, @items) = @_;
    my $todo = $self->{queue_todo}->{$type} ||= [];
    push @$todo, @items;
    return $todo;
}
263
# Returns the woken_up flag: true once the parent has sent ":wake_up" and
# forget_woken_up hasn't been called since.  Undef if never set.
sub was_woken_up {
    my MogileFS::Worker $self = $_[0];
    my $flag = $self->{woken_up};
    return $flag;
}
268
# Clear the woken_up flag; returns the new value (0).
sub forget_woken_up {
    my MogileFS::Worker $self = shift;
    return $self->{woken_up} = 0;
}
273
# Ask the parent to wake up a process of job class $class.  Requests are
# rate-limited per class to one per time() second (waking more often isn't
# necessary); a repeat within the same second returns without sending.
# Otherwise returns whatever send_to_parent returns.
sub wake_a {
    my ($self, $class) = @_;
    my $now  = time();
    my $seen = $self->{last_wake}{$class} || 0;

    unless ($seen == $now) {
        $self->{last_wake}{$class} = $now;
        return $self->send_to_parent(":wake_a $class");
    }
    return;
}
282
2831;
284
285# Local Variables:
286# mode: perl
287# c-basic-indent: 4
288# indent-tabs-mode: nil
289# End:
290
291