1package MogileFS::ProcManager;
2use strict;
3use warnings;
4use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
5use Symbol;
6use Socket;
7use MogileFS::Connection::Client;
8use MogileFS::Connection::Worker;
9use MogileFS::Util qw(apply_state_events);
10
11# This class handles keeping lists of workers and clients and
12# assigning them to each other when things happen.  You don't actually
13# instantiate a procmanager.  the class itself holds all state.
14
15# Mappings: fd => [ clientref, jobstring, starttime ]
16# queues are just lists of Client class objects
# ChildrenByJob: job => { pid => MogileFS::Connection::Worker }
# ErrorsTo: fd => Client
19# RecentQueries: [ string, string, string, ... ]
20# Stats: element => number
# Package globals.  The shapes of %Mappings, %ChildrenByJob, %ErrorsTo,
# @RecentQueries and %Stats are described in the comment block above;
# $IsChild is initialised further down.
our ($IsChild, @RecentQueries,
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);

our $starttime = time(); # epoch seconds captured when this module is loaded
# Accessor for $starttime, the epoch time recorded when this module was loaded.
sub server_starttime {
    return $starttime;
}
26
# Query dispatch state: idle query-worker connections and the client
# requests still waiting for one.  ProcessQueues pairs them up.
my @IdleQueryWorkers;  # workers that are idle, able to process commands  (MogileFS::Worker::Query, ...)
my @PendingQueries;    # [ MogileFS::Connection::Client, "$ip $query" ]

# Per-job work queues used by process_worker_queues.  %idle_workers also
# carries a special '_bored' key mapping a worker key to its connection.
my %idle_workers = (); # 'job' -> {href of idle workers}
my %pending_work = (); # 'job' -> [aref of pending work]

$IsChild = 0;  # either false if we're the parent, or a MogileFS::Worker object

# keep track of what all child pids are doing, and what jobs are being
# satisfied.
my %child  = ();    # pid -> MogileFS::Connection::Worker
my %todie  = ();    # pid -> level: 1 = we asked it to die, 2 = the watchdog killed it
my %jobs   = ();    # jobname -> [ min, current ]

# we start job_master after monitor has run, but this avoids an undef
# warning in job_needs_reduction
$jobs{job_master} = [ 0, 0 ];

our $allkidsup = 0;  # if true, all our kids are running. set to 0 when a kid dies.

my @prefork_cleanup;  # subrefs run in a freshly forked child to clean stuff up

# make error() in this package an alias for Mgd::error
*error = \&Mgd::error;

my $monitor_good = 0; # ticked after monitor executes once after startup

my $nowish;  # updated approximately once per second

# it's pointless to spawn certain jobs without a job_master
my $want_job_master;
my %needs_job_master = (
    delete => 1,
    fsck => 1,
    replicate => 1,
);
62
# Class method.  Appends $code (a subref) to @prefork_cleanup; every
# registered sub is invoked by make_new_child inside each newly forked child.
sub push_pre_fork_cleanup {
    my (undef, $code) = @_;
    push(@prefork_cleanup, $code);
}
67
# Returns the @RecentQueries list: formatted timing strings appended by
# HandleQueryWorkerResponse, which trims one entry from the front whenever
# the list grows past 50.
sub RecentQueries {
    return @RecentQueries;
}
71
# Class method.  Writes the current process id ($$) plus a newline to the
# file named by the "pidfile" config value.
#
# Returns 1 on success, and also when no pidfile is configured (nothing
# to do).  Returns 0, after logging an 'err' message, when the file cannot
# be created or written; in the write-failure case the pidfile is removed
# again via remove_pidfile.
sub write_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return 1;

    # Three-argument open: the configured path is taken literally, so
    # leading mode characters or surrounding whitespace in the value can
    # never be reinterpreted as part of the open mode.
    my $fh;
    unless (open($fh, '>', $pidfile)) {
        Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
        return 0;
    }

    # buffered write errors may only surface at close, so check both
    unless ((print {$fh} "$$\n") && close($fh)) {
        Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
        remove_pidfile();
        return 0;
    }
    return 1;
}
88
# Class method.  Unlinks the file named by the "pidfile" config value.
# Returns nothing when no pidfile is configured, otherwise 1 (the result
# of the unlink itself is not checked).
sub remove_pidfile {
    my $class = shift;

    my $pidfile = MogileFS->config("pidfile");
    return unless $pidfile;

    unlink($pidfile);
    return 1;
}
96
# Class method.  Sets the minimum worker count for $job to $min, creating
# the job's [min, current] record (with current = 0) if it does not exist yet.
sub set_min_workers {
    my ($class, $job, $min) = @_;

    # first time we hear about this job: start with nothing running
    $jobs{$job} = [undef, 0] unless $jobs{$job};

    # TODO: set allkidsup false, so spawner re-checks?
    $jobs{$job}[0] = $min;
}
104
# Class method.  Maps a job name (e.g. "queryworker") to the final
# component of its worker class name (e.g. "Query").  Returns undef for a
# job name that is not in the table.
sub job_to_class_suffix {
    my (undef, $job) = @_;

    my %suffix_for = (
        fsck        => "Fsck",
        queryworker => "Query",
        delete      => "Delete",
        replicate   => "Replicate",
        reaper      => "Reaper",
        monitor     => "Monitor",
        job_master  => "JobMaster",
    );

    return $suffix_for{$job};
}
117
# Class method.  Returns the full worker class name for $job, built from
# "MogileFS::Worker::" and the suffix reported by job_to_class_suffix, or
# the empty string when that lookup yields nothing.
sub job_to_class {
    my ($class, $job) = @_;

    my $suffix = $class->job_to_class_suffix($job);
    return "" unless $suffix;

    return "MogileFS::Worker::" . $suffix;
}
123
# Returns the pids of all children currently tracked in %child.
sub child_pids {
    return keys(%child);
}
127
# Asks every tracked child connection for its watchdog_check status and
# sends signal 9 to each one that reports unhealthy, recording the pending
# death at level 2 so the same pid is not killed twice.
sub WatchDog {
    for my $pid (keys %child) {
        my MogileFS::Connection::Worker $conn = $child{$pid};

        unless ($conn->watchdog_check) {
            # special $todie level of 2 means the watchdog tried to kill it.
            # TODO: Should be a CONSTANT?
            my $level = $todie{$pid};
            next if $level && $level == 2;

            note_pending_death($conn->job, $pid, 2);
            error("Watchdog killing worker $pid (" . $conn->job . ")");
            kill 9, $pid;
        }
    }
}
143
# returns a sub that Danga::Socket calls after each event loop round.
# the sub must return 1 for the program to continue running.
#
# On every call the returned sub processes pipelined client requests.  At
# most once per second it additionally:
#   - runs the worker and client-writer watchdogs,
#   - reaps dead children without blocking and fixes up the per-job
#     running counts,
#   - unless $allkidsup is set, forks enough children to bring every job
#     up to its configured minimum.
sub PostEventLoopChecker {
    my $lastspawntime = 0; # time we last ran the once-per-second section

    return sub {
        MogileFS::Connection::Client->ProcessPipelined;
        # run only once per second
        $nowish = time();
        return 1 unless $nowish > $lastspawntime;
        $lastspawntime = $nowish;

        MogileFS::ProcManager->WatchDog;
        MogileFS::Connection::Client->WriterWatchDog;

        # see if anybody has died, but don't hang up on doing so
        while (my $pid = waitpid(-1, WNOHANG)) {
            last unless $pid > 0;
            $allkidsup = 0; # know something died

            # when a child dies, figure out what it was doing
            # and note that job has one less worker
            my $jobconn;
            if (($jobconn = delete $child{$pid})) {
                my $job = $jobconn->job;
                my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
                error("Child $pid ($job) died: $? ($extra)");
                MogileFS::ProcManager->NoteDeadChild($pid);
                $jobconn->close;

                if (my $jobstat = $jobs{$job}) {
                    # if the pid is in %todie, then we have asked it to shut down
                    # and have already decremented the jobstat counter and don't
                    # want to do it again
                    unless (delete $todie{$pid}) {
                        # decrement the count of currently running jobs
                        $jobstat->[1]--;
                    }
                }
            }
        }

        return 1 if $allkidsup;

        # foreach job, fork enough children.
        #
        # Iterate over a key list rather than each(): we may bail out of
        # this loop early when a fork fails, and each() would keep its
        # position inside %jobs, so the next tick would resume after the
        # failed job and could mark all kids as up while some are missing.
        foreach my $job (keys %jobs) {
            my $jobstat = $jobs{$job};

            # do not spawn job_master-dependent workers if we have no job_master
            next if (! $want_job_master && $needs_job_master{$job});

            my $need = $jobstat->[0] - $jobstat->[1];
            if ($need > 0) {
                error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
                for (1..$need) {
                    my $jobconn = make_new_child($job)
                        or return 1;  # basically bail: true value keeps event loop running
                    $child{$jobconn->pid} = $jobconn;

                    # now increase the count of processes currently doing this job
                    $jobstat->[1]++;
                }
            }
        }

        # if we got this far, all jobs have been re-created.  note that
        # so we avoid more CPU usage in this post-event-loop callback later
        $allkidsup = 1;

        # true value keeps us running:
        return 1;
    };
}
216
# Plain function (not a method): forks a new worker process for $job.
#
# In the parent, returns the MogileFS::Connection::Worker that wraps the
# parent's end of a socketpair to the child, after registering it with
# RegisterWorkerConn.  On failure returns whatever error() returns.
# NOTE(review): the caller tests the result for truth, so this relies on
# error() returning a false value -- confirm against Mgd::error.
#
# In the child, the normal path never returns: the worker object's work()
# method is run and the process then exits with status 0.
#
# Dies if the socketpair cannot be created or no worker class is known
# for $job.
sub make_new_child {
    my $job = shift;

    my $pid;
    my $sigset;

    # Ensure our dbh is closed before we fork anything.
    # Causes problems on some platforms (Solaris+Postgres)
    Mgd::close_store();

    # block signal for fork
    $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT for fork: $!");

    socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
        or die( "socketpair failed: $!" );

    $pid = fork;
    unless (defined $pid) {
        # fork failed: we are still the parent, so SIGINT must not stay
        # blocked.  Save the fork error first; sigprocmask may change $!.
        my $fork_error = "$!";
        sigprocmask(SIG_UNBLOCK, $sigset);
        return error("fork failed creating $job: $fork_error");
    }

    # enable auto-flush, so it's not pipe-buffered between parent/child
    select((select( $parents_ipc ), $|++)[0]);
    select((select( $childs_ipc  ), $|++)[0]);

    # if i'm the parent
    if ($pid) {
        sigprocmask(SIG_UNBLOCK, $sigset)
            or return error("Can't unblock SIGINT for fork: $!");

        close($childs_ipc);  # unnecessary but explicit
        IO::Handle::blocking($parents_ipc, 0);

        my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
        $worker_conn->pid($pid);
        $worker_conn->job($job);
        MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
        return $worker_conn;
    }

    # let children have different random number seeds
    srand();

    # as a child, run the registered cleanup subs and drop the parent's
    # end of the socketpair
    $_->() foreach @prefork_cleanup;
    close($parents_ipc);
    undef $parents_ipc;

    $SIG{INT} = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals
    # NOTE(review): a return here happens inside the child process and
    # hands control back to the caller's loop -- confirm that is intended.
    sigprocmask(SIG_UNBLOCK, $sigset)
        or return error("Can't unblock SIGINT for fork: $!");

    # now call our job function
    my $class = MogileFS::ProcManager->job_to_class($job)
        or die "No worker class defined for job '$job'\n";
    my $worker = $class->new($childs_ipc);

    # set our frontend into child mode
    MogileFS::ProcManager->SetAsChild($worker);

    $worker->work;
    exit 0;
}
284
# Number of client requests still waiting in @PendingQueries.
sub PendingQueryCount {
    my $waiting = @PendingQueries;
    return $waiting;
}
288
# Number of query workers currently sitting in @IdleQueryWorkers.
sub BoredQueryWorkerCount {
    my $idle = @IdleQueryWorkers;
    return $idle;
}
292
# Number of entries in %Mappings, i.e. worker fds that currently have a
# client request assigned to them.
sub QueriesInProgressCount {
    my $in_flight = keys %Mappings;
    return $in_flight;
}
296
# Refreshes the 'work_queue_for_<job>' entries of %Stats with the current
# length of each pending work queue, then returns a reference to %Stats
# itself (not a copy).
sub StatsHash {
    foreach my $job (keys %pending_work) {
        my $depth = scalar @{ $pending_work{$job} };
        $Stats{'work_queue_for_' . $job} = $depth;
    }

    return \%Stats;
}
304
# Class method.  Calls $cb once per job in %ChildrenByJob, in sorted job
# name order, with four arguments: the job name, the number of registered
# children, the job's configured minimum, and a one-element array ref
# holding the children's pids joined by spaces in ascending numeric order.
sub foreach_job {
    my ($class, $cb) = @_;

    for my $job (sort keys %ChildrenByJob) {
        my @pids = sort { $a <=> $b } keys %{ $ChildrenByJob{$job} };
        $cb->($job, scalar(@pids), $jobs{$job}->[0], [ join(' ', @pids) ]);
    }
}
312
# Class method.  Calls $cb for every entry of @PendingQueries, in queue
# order, passing the client object and the "$ip $query" string.
sub foreach_pending_query {
    my ($class, $cb) = @_;

    for my $pending (@PendingQueries) {
        # only the first two slots are handed on: client, "$ip $query"
        $cb->(@{$pending}[0, 1]);
    }
}
321
# Returns $monitor_good, which send_monitor_has_run sets to 1 the first
# time it is called.
sub is_monitor_good {
    return $monitor_good;
}
325
# Class method.  True when %jobs has a defined entry for $job.
sub is_valid_job {
    my (undef, $job) = @_;
    return defined($jobs{$job});
}
330
# Returns the names of all jobs known to %jobs, sorted as strings.
sub valid_jobs {
    return sort(keys(%jobs));
}
334
# Class method.  Sets the desired number of workers for $job to $n and
# clears $allkidsup so the spawner re-checks.
#
# Returns 0 without changing anything when $job is unknown, or when more
# than one job_master or monitor is requested.  For 'queryworker' the idle
# query workers are culled right away; other workers are left alone here.
sub request_job_process {
    my ($class, $job, $n) = @_;

    unless ($class->is_valid_job($job)) {
        return 0;
    }
    if ($job =~ /^(?:job_master|monitor)$/i && $n > 1) {
        return 0; # ghetto special case
    }

    if ($job eq "job_master") {
        $want_job_master = $n;
    }

    $jobs{$job}->[0] = $n;
    $allkidsup = 0;

    # other workers listening off of a queue should be pinging parent
    # frequently. shouldn't explicitly kill them.  queryworkers, however,
    # get cleaned out immediately:
    MogileFS::ProcManager->CullQueryWorkers
        if $job eq 'queryworker';
}
352
353
# Class method, called in a freshly forked child.  The child inherits
# copies of all the parent's bookkeeping but needs none of it, so this
# records $worker in $IsChild, empties every queue and lookup table held
# by this package, and resets Danga::Socket and the client connection class.
sub SetAsChild {
    my ($class, $worker) = @_;

    $IsChild = $worker;

    # query dispatch state
    @IdleQueryWorkers = ();
    @PendingQueries   = ();
    %Mappings         = ();
    %ErrorsTo         = ();

    # work queue state
    %idle_workers = ();
    %pending_work = ();

    # child/job tracking
    %ChildrenByJob = ();
    %child         = ();
    %todie         = ();
    %jobs          = ();

    # we just forked from our parent process, also using Danga::Socket,
    # so we need to lose all that state and start afresh.
    Danga::Socket->Reset;
    MogileFS::Connection::Client->Reset;
}
377
# Class method, called with the pid of a child that has died (whatever
# job it was doing).  Removes the pid from %ChildrenByJob, stopping at the
# first job that had a true entry for it.  Respawning is handled elsewhere.
sub NoteDeadChild {
    my (undef, $pid) = @_;

    for my $children (values %ChildrenByJob) {
        # bail out as soon as we actually delete one
        return if delete $children->{$pid};
    }
}
389
# Class method, called when a client connection goes away.  Drops the
# client from the error watchers (%ErrorsTo, keyed by fd) if it was one.
sub NoteDeadClient {
    my (undef, $client) = @_;
    delete $ErrorsTo{ $client->{fd} };
}
397
# Class method.  Takes a reference to an error message as its argument and
# writes it, prefixed with ":: " and terminated with CRLF, to every client
# registered in %ErrorsTo.  Does nothing when nobody is watching.
sub NoteError {
    return unless %ErrorsTo;

    my $msg = ":: ${$_[1]}\r\n";
    $_->write(\$msg) for values %ErrorsTo;
}
409
# Class method.  Unregisters $client as an error watcher and returns the
# entry that was removed from %ErrorsTo (undef if there was none).
sub RemoveErrorWatcher {
    my ($class, $client) = @_;

    my $fd = $client->{fd};
    return delete $ErrorsTo{$fd};
}
414
# Class method.  Registers $client, keyed by its fd, to receive the
# messages broadcast by NoteError.
sub AddErrorWatcher {
    my ($class, $client) = @_;

    my $fd = $client->{fd};
    $ErrorsTo{$fd} = $client;
}
419
# Class method: one-time initialization of a new worker connection.
# Turns on read watching, queues the worker as idle if it is a
# queryworker, and records it in %ChildrenByJob under its job and pid.
sub RegisterWorkerConn {
    my MogileFS::Connection::Worker $worker = $_[1];

    $worker->watch_read(1);

    # special case startup: query workers are immediately available
    MogileFS::ProcManager->NoteIdleQueryWorker($worker)
        if $worker->job eq 'queryworker';

    # add to normal list
    $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
}
436
# Class method.  Queues the command $line from $client, tagged with the
# client's peer IP ('0.0.0.0' when that is unavailable), and tries to
# dispatch it at once.  If anything is still pending afterwards, the
# 'times_out_of_qworkers' stat is bumped.
sub EnqueueCommandRequest {
    my ($class, $line, $client) = @_;

    my $ip = $client->peer_ip_string || '0.0.0.0';
    push @PendingQueries, [ $client, $ip . " $line" ];

    MogileFS::ProcManager->ProcessQueues;

    # Don't like the name. Feel free to change if you find better.
    $Stats{times_out_of_qworkers}++ if @PendingQueries;
}
449
# Class method (first arg is class, second is the worker connection).
# Clears any request mapped to the worker's fd, then either returns the
# worker to the idle queue and runs the queues, or -- when there are more
# query workers than wanted -- asks this one to die instead.
sub NoteIdleQueryWorker {
    my MogileFS::Connection::Worker $worker = $_[1];
    delete $Mappings{$worker->{fd}};

    if (!job_needs_reduction('queryworker')) {
        # headcount is fine, so put it in the queue
        push @IdleQueryWorkers, $worker;
        return MogileFS::ProcManager->ProcessQueues;
    }

    # too many query workers: retire this one
    Mgd::error("Reducing queryworker headcount by 1.");
    MogileFS::ProcManager->AskWorkerToDie($worker);
    return;
}
468
# Class method.  Given a worker connection, records its pending death
# (which also decrements the job's running count, once per pid) and then
# sends it the ":shutdown" command.
sub AskWorkerToDie {
    my MogileFS::Connection::Worker $worker = $_[1];

    my ($job, $pid) = ($worker->job, $worker->pid);
    note_pending_death($job, $pid);

    $worker->write(":shutdown\r\n");
}
477
# Asks idle query workers to die, oldest queue entry first, until either
# the queryworker count is down to the requested level or there are no
# idle workers left to ask.
sub CullQueryWorkers {
    while (1) {
        last unless @IdleQueryWorkers;
        last unless job_needs_reduction('queryworker');

        my MogileFS::Connection::Worker $victim = shift @IdleQueryWorkers;
        MogileFS::ProcManager->AskWorkerToDie($victim);
    }
}
486
# Class method, called with (worker connection, response line) when a
# query worker sends us something.
#
# If the worker has no request mapped to it, nothing happens.  If the
# mapping has no client, the line is treated as an ordinary child request.
# Otherwise the line must look like "<pid>-<reqid> <seconds> <response>"
# and carry the id we sent out; the response is then relayed to the
# client, the timing is recorded in @RecentQueries, and the worker goes
# back into the idle queue.  On an id mismatch the client is closed and
# the worker is asked to die.
sub HandleQueryWorkerResponse {
    my MogileFS::Connection::Worker $worker = $_[1];
    my $line = $_[2];

    if ($IsChild) {
        return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line");
    }
    return unless $worker && $Mappings{$worker->{fd}};

    # the request this worker was handed: client, job string, start time
    my ($client, $query, $began) = @{ $Mappings{$worker->{fd}} };

    # no client: just a standard message from the queryworker that needs
    # to be passed up the line
    unless ($client) {
        return MogileFS::ProcManager->HandleChildRequest($worker, $line);
    }

    # it was a command response, but the client has gone away, so just
    # reenqueue this query worker
    if ($client->{closed}) {
        return MogileFS::ProcManager->NoteIdleQueryWorker($worker);
    }

    # <numeric id> [client-side time to complete] <response>
    # Note the optional negative sign in the regexp.  Somebody
    # on the mailing list was getting a time of -0.0000, causing
    # broken connections.
    my ($id, $time, $res) = $line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/;

    # unparsable line, or an answer to some other request
    unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
        $id   = "<undef>" unless defined $id;
        $line = "<undef>" unless defined $line;
        $line =~ s/\n/\\n/g;
        $line =~ s/\r/\\r/g;
        Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
        $client->close('worker_mismatch');
        return MogileFS::ProcManager->AskWorkerToDie($worker);
    }

    # time this interval and remember it, keeping the list bounded
    my $elapsed = Time::HiRes::time() - $began;
    push @RecentQueries, sprintf("%s %.4f %s", $query, $elapsed, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # send text to client, put worker back in queue
    $client->write("$res\r\n");
    MogileFS::ProcManager->NoteIdleQueryWorker($worker);
}
540
# Per-job internal queue runner (parent only).
# TODO: Since this fires only when a master asks or a worker reports
# in bored, it should just operate on that *one* queue?
#
# For every job with pending work, hands queued items to workers that
# registered interest in that job.  A worker only receives work while it
# is still listed under the special '_bored' key; being given work removes
# it from '_bored' only.  Workers that are no longer bored, or whose
# connection is closed, are dropped from the job's idle list.
sub process_worker_queues {
    return if $IsChild;

    JOB: while (my ($job, $queue) = each %pending_work) {
        next JOB unless @$queue;

        my $interested = $idle_workers{$job};
        next JOB unless $interested && keys %{$interested};

        WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
            my MogileFS::Connection::Worker $worker =
                delete $idle_workers{_bored}->{$worker_key};

            unless (defined $worker && !$worker->{closed}) {
                delete $idle_workers{$job}->{$worker_key};
                next WORKER;
            }

            # allow workers to grab a linear range of work.
            while (@$queue && $worker->wants_todo($job)) {
                my $item = shift(@$queue);
                $worker->write(":queue_todo $job " . $item . "\r\n");
                $Stats{'work_sent_to_' . $job}++;
            }

            # queue drained: nothing left for the remaining workers
            next JOB unless @$queue;
        }
    }
}
570
# Called from various spots (parent only) to pair up pending client
# requests with idle query workers until one of the two lists runs dry.
# Requests from closed clients are discarded; closed workers are skipped
# and the request is put back at the front of the queue.
sub ProcessQueues {
    return if $IsChild;

    PAIR: while (@IdleQueryWorkers && @PendingQueries) {
        # take requests off the front until one has a live client
        my $clref;
        while (@PendingQueries) {
            my $candidate = shift @PendingQueries;
            next unless $candidate;
            next if $candidate->[0]->{closed};
            $clref = $candidate;
            last;
        }
        next PAIR unless $clref;

        # get worker and make sure it's not closed already
        my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers;
        unless (defined $worker && !$worker->{closed}) {
            unshift @PendingQueries, $clref;
            next PAIR;
        }

        # stamp the request with its start time and map it to the worker
        push @$clref, Time::HiRes::time();
        $Mappings{$worker->{fd}} = $clref;
        $Stats{queries}++;

        # increment our counter so we know what request counter this is going out
        $worker->{reqid}++;

        # so we're writing a string of the form:
        #     123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
    }
}
608
# Class method.  Writes a short description of the supported admin
# commands to the given client.
sub SendHelp {
    my (undef, $client) = @_;

    # general purpose help text
    my $help_text = <<HELP;
Mogilefsd admin commands:

    !version    Server version
    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we\'re up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   Immediately kill all of mogilefsd.

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

HELP

    return $client->write($help_text);
}
637
# Class method, called in the parent with (worker connection, command
# line) when a child has contacted us with some command/status/something.
# The command is matched against the known forms in order:
#
#   error <text> / debug <text>   forwarded to Mgd::error / Mgd::debug,
#                                 tagged with the child's job and pid
#   queue_depth <job|all>         replies with pending work queue depth(s)
#   queue_todo <job> <item>       appends <item> to that job's work queue
#   worker_bored <batch> <types>  marks the child idle for the given types
#   :ping                         replies :stay_alive, or asks it to die
#   :still_alive                  ignored
#   :monitor_events ...           applied locally, relayed to other children
#   :monitor_just_ran             handled by send_monitor_has_run
#   :wake_a <job>                 wakes one child of that job
#   :set_config_from_child k v    passed to MogileFS::Config->set_config
#   :refresh_monitor              relayed to the monitor job
#
# Anything else is logged as an unknown command.
sub HandleChildRequest {
    if ($IsChild) {
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
    }

    # if they have no job set, then their first line is what job they are
    # and not a command.  they also specify their pid, just so we know what
    # connection goes with what pid, in case it's ever useful information.
    my MogileFS::Connection::Worker $child = $_[1];
    my $cmd = $_[2];

    # NOTE(review): the message mentions a pid, but the check is on the
    # child's job -- confirm which one is intended.
    die "Child $child with no pid?" unless $child->job;

    # at this point we've got a command of some sort
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^debug (.+)$/i) {
        # pass it on to our debug handler, prefaced with the child's job
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^queue_depth (\w+)/) {
        my $job   = $1;
        if ($job eq 'all') {
            # one reply line per known work queue
            for my $qname (keys %pending_work) {
                my $depth = @{$pending_work{$qname}};
                $child->write(":queue_depth $qname $depth\r\n");
            }
        } else {
            # a queue that does not exist yet is reported as depth 0
            my $depth = 0;
            if ($pending_work{$job}) {
                $depth = @{$pending_work{$job}};
            }
            $child->write(":queue_depth $job $depth\r\n");
        }
        MogileFS::ProcManager->process_worker_queues;
    } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
        my $job = $1;
        $pending_work{$job} ||= [];
        push(@{$pending_work{$job}}, $2);
        # Don't process queues immediately, to allow batch processing.
    } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
        my $batch = $1;
        my $types = $2;
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            unless (exists $idle_workers{$child->job}) {
                $idle_workers{$child->job} = {};
            }
            # the connection object (stringified) is the key under which
            # the child is tracked in '_bored' and in each requested type
            $idle_workers{_bored} ||= {};
            $idle_workers{_bored}->{$child} = $child;
            for my $type (split(/\s+/, $types)) {
                $idle_workers{$type} ||= {};
                $idle_workers{$type}->{$child}++;
                $child->wants_todo($type, $batch);
            }
            MogileFS::ProcManager->process_worker_queues;
        }
    } elsif ($cmd eq ":ping") {

        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());

        # this command expects a reply, either to die or stay alive.  beginning of worker's loops
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            $child->write(":stay_alive\r\n");
        }

    } elsif ($cmd eq ":still_alive") {
        # a no-op

    } elsif ($cmd =~ /^:monitor_events/) {
        # Apply the state locally, so when we fork children they have a
        # pre-parsed factory.
        # We do not replay the events back to where it came, since this
        # severely impacts startup performance for instances with several
        # thousand domains, classes, hosts or devices.
        apply_state_events(\$cmd);
        MogileFS::ProcManager->send_to_all_children($cmd, $child);

    } elsif ($cmd eq ":monitor_just_ran") {
        send_monitor_has_run($child);

    } elsif ($cmd =~ /^:wake_a (\w+)$/) {

        MogileFS::ProcManager->wake_a($1, $child);
    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
        # and this will rebroadcast it to all other children
        # (including the one that just set it to us, but eh)
        MogileFS::Config->set_config($1, $2);
    } elsif ($cmd =~ /^:refresh_monitor$/) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
    } else {
        # unknown command; only the first 80 characters are logged
        my $show = $cmd;
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
    }
}
741
# Class method.
#   ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child, [ $just_one ] ])
# given a job class, and a message, send it to all children of that job.  returns
# the number of children the message was actually sent to (0 when the job
# has no children, or when the only child is the excluded one).
#
# if child is specified, the message will be sent to members of the job class that
# aren't that child.  so you can exclude the one that originated the message.
#
# doesn't add to queue of things child gets on next interactive command: writes immediately
# (won't get in middle of partial write, though, as danga::socket queues things up)
#
# if $just_one is specified, only a single process is notified, then we stop.
sub ImmediateSendToChildrenByJob {
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;

    my $childref = $ChildrenByJob{$class};
    return 0 unless defined $childref && %$childref;

    # count real writes, so an excluded child is never reported as notified
    my $sent = 0;
    foreach my $child (values %$childref) {
        # ignore the child specified as the third arg if one is sent
        next if $exclude_child && $exclude_child == $child;

        # send the message to this child
        $child->write("$msg\r\n");
        $sent++;
        last if $just_one;
    }
    return $sent;
}
770
# Class method (parent only), called when a worker connection has died.
# If a request was still mapped to the worker's fd, that request is put
# back at the front of the pending queue and the queues are run again.
sub NoteDeadWorkerConn {
    return if $IsChild;

    # get parms and error check
    my MogileFS::Connection::Worker $worker = $_[1];
    return unless $worker;

    my $fd = $worker->{fd};
    return unless defined($fd);

    # a mapping for this fd means the worker had a job that didn't get done
    if (my $unfinished = $Mappings{$fd}) {
        delete $Mappings{$fd};

        # unshift, since this one already went through the queue once
        unshift @PendingQueries, $unfinished;

        # now try to get it processing again
        MogileFS::ProcManager->ProcessQueues;
    }
}
793
# Plain function.  Given (job, pid, [level]), records in %todie that this
# worker is about to die and decrements the job's running count, unless
# the pid was already marked.  $level defaults to 1; the watchdog passes 2
# so its kills can be told apart.  Dies if $job is unknown to %jobs.
sub note_pending_death {
    my ($job, $pid, $level) = @_;

    unless (defined $jobs{$job}) {
        die "$job not defined in call to note_pending_death.\n";
    }

    $level = 1 unless $level;

    # don't double decrement.
    $jobs{$job}->[1]-- if !$todie{$pid};
    $todie{$pid} = $level;
}
807
# Plain function.  Returns true when the number of active children for
# $job should be reduced: either more are running than the configured
# minimum, or the job depends on a job_master that is neither wanted nor
# running and the job has no previously queued work left.
sub job_needs_reduction {
    my ($job) = @_;

    # drop job_master-dependent workers if there is no job_master and no
    # previously queued work
    if (!$want_job_master && $needs_job_master{$job}) {
        # check if job_master is really dead
        if ($jobs{job_master}->[1] == 0) {
            my $queue = $pending_work{$job};
            return 1 if !$queue || !@$queue;
        }
    }

    return $jobs{$job}->[0] < $jobs{$job}->[1];
}
823
# Returns $IsChild: false in the parent, or the worker object that was
# passed to SetAsChild when running in a child.
sub is_child {
    return $IsChild;
}
827
# Class method.  Wakes up one worker of job class $class.  In a child the
# request is delegated to the worker object's own wake_a; in the parent a
# ":wake_up" message is sent to a single child of that class, skipping
# $fromchild (optional: the child that asked).
sub wake_a {
    my ($pkg, $class, $fromchild) = @_;

    my $self_worker = MogileFS::ProcManager->is_child;
    return $self_worker->wake_a($class) if $self_worker;

    return MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
}
837
# Class method.  Writes $msg, terminated with CRLF, to every child
# connection in %child except $exclude (optional).
sub send_to_all_children {
    my ($pkg, $msg, $exclude) = @_;

    for my $conn (values %child) {
        unless ($exclude && $conn == $exclude) {
            $conn->write($msg . "\r\n");
        }
    }
}
845
# Plain function, called with the child that reported a monitor run.
# The first time through it sets the minimum worker counts of the other
# jobs from the configuration, marks the monitor as good and clears
# $allkidsup so the spawner starts them.  On every call it then sends
# ":monitor_has_run" to the queryworkers, skipping $child.
sub send_monitor_has_run {
    my $child = shift;

    # Gas up other workers if monitor's completed for the first time.
    unless ($monitor_good) {
        my @job_config = (
            [ 'queryworker' => 'query_jobs'     ],
            [ 'delete'      => 'delete_jobs'    ],
            [ 'replicate'   => 'replicate_jobs' ],
            [ 'reaper'      => 'reaper_jobs'    ],
            [ 'fsck'        => 'fsck_jobs'      ],
        );
        for my $entry (@job_config) {
            my ($job, $config_key) = @$entry;
            MogileFS::ProcManager->set_min_workers($job => MogileFS->config($config_key));
        }

        # only one job_master at most
        $want_job_master = !!MogileFS->config('job_master');
        MogileFS::ProcManager->set_min_workers('job_master'  => $want_job_master);

        $monitor_good = 1;
        $allkidsup    = 0;
    }

    MogileFS::ProcManager->ImmediateSendToChildrenByJob('queryworker', ":monitor_has_run", $child);
}
867
8681;
869
870# Local Variables:
871# mode: perl
872# c-basic-indent: 4
873# indent-tabs-mode: nil
874# End:
875