package MogileFS::ProcManager;
use strict;
use warnings;
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
use Symbol;
use Socket;
use IO::Handle ();   # IO::Handle::blocking() is called fully-qualified in this file
use Time::HiRes ();  # Time::HiRes::time() is called fully-qualified; () keeps CORE::time()
use MogileFS::Connection::Client;
use MogileFS::Connection::Worker;
use MogileFS::Util qw(apply_state_events);

# This class handles keeping lists of workers and clients and
# assigning them to each other when things happen. You don't actually
# instantiate a procmanager; the class itself holds all state.

# Mappings:      worker fd => [ client, "$ip $query", starttime ]
# queues are just lists of Client class objects
# ChildrenByJob: job => { pid => MogileFS::Connection::Worker }
# ErrorsTo:      client fd => Client
# RecentQueries: [ string, string, string, ... ]
# Stats:         element => number
our ($IsChild, @RecentQueries,
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);

our $starttime = time(); # time we got going
sub server_starttime { return $starttime }

my @IdleQueryWorkers;  # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
my @PendingQueries;    # [ MogileFS::Connection::Client, "$ip $query" ]

my %idle_workers = (); # 'job' -> {href of idle workers}
my %pending_work = (); # 'job' -> [aref of pending work]

$IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object

# keep track of what all child pids are doing, and what jobs are being
# satisfied.
my %child = ();  # pid -> MogileFS::Connection::Worker
my %todie = ();  # pid -> level (1 = asked to die, 2 = watchdog killed it)
my %jobs = ();   # jobname -> [ min, current ]

# we start job_master after monitor has run, but this avoids an undef
# warning in job_needs_reduction
$jobs{job_master} = [ 0, 0 ];

our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.
my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child

*error = \&Mgd::error;

my $monitor_good = 0; # ticked after monitor executes once after startup

my $nowish; # updated approximately once per second

# it's pointless to spawn certain jobs without a job_master
my $want_job_master;
my %needs_job_master = (
    delete    => 1,
    fsck      => 1,
    replicate => 1,
);

# Register a code ref that make_new_child runs in every freshly forked
# child before the child starts its job.
sub push_pre_fork_cleanup {
    my ($class, $code) = @_;
    push @prefork_cleanup, $code;
}

# Returns the list of recently recorded query timing strings.
sub RecentQueries {
    return @RecentQueries;
}

# Write our pid into the configured "pidfile", if any.
# Returns 1 on success (or when no pidfile is configured) and 0 on failure;
# a pidfile that could not be fully written is removed again.
sub write_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return 1;
    my $fh;
    # three-arg open: the configured path is never parsed for an open
    # mode (leading '>', '|', '+<', ...)
    unless (open($fh, '>', $pidfile)) {
        Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
        return 0;
    }
    unless ((print $fh "$$\n") && close($fh)) {
        Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
        remove_pidfile();
        return 0;
    }
    return 1;
}

# Unlink the configured pidfile. Returns 1 when a pidfile is configured
# (the unlink result is not checked), otherwise an empty return.
sub remove_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return;
    unlink $pidfile;
    return 1;
}

# Set the desired worker count for $job, creating its [min, current]
# bookkeeping entry if it does not exist yet.
sub set_min_workers {
    my ($class, $job, $min) = @_;
    $jobs{$job} ||= [undef, 0]; # [min, current]
    $jobs{$job}->[0] = $min;

    # TODO: set allkidsup false, so spawner re-checks?
}

# Map a job name to the suffix of its worker class (undef if unknown).
sub job_to_class_suffix {
    my ($class, $job) = @_;
    return {
        fsck        => "Fsck",
        queryworker => "Query",
        delete      => "Delete",
        replicate   => "Replicate",
        reaper      => "Reaper",
        monitor     => "Monitor",
        job_master  => "JobMaster",
    }->{$job};
}

# Map a job name to its full worker class name ("" if unknown).
sub job_to_class {
    my ($class, $job) = @_;
    my $suffix = $class->job_to_class_suffix($job) or return "";
    return "MogileFS::Worker::$suffix";
}

sub child_pids {
    return keys %child;
}

# Kill (SIGKILL) every child whose connection fails watchdog_check, marking
# it with $todie level 2 so it is only killed once.
sub WatchDog {
    foreach my $pid (keys %child) {
        my MogileFS::Connection::Worker $child = $child{$pid};
        my $healthy = $child->watchdog_check;
        next if $healthy;

        # special $todie level of 2 means the watchdog tried to kill it.
        # TODO: Should be a CONSTANT?
        next if $todie{$pid} && $todie{$pid} == 2;
        note_pending_death($child->job, $pid, 2);

        error("Watchdog killing worker $pid (" . $child->job . ")");
        kill 9, $pid;
    }
}

# returns a sub that Danga::Socket calls after each event loop round.
# the sub must return 1 for the program to continue running.
sub PostEventLoopChecker {
    my $lastspawntime = 0; # time we last ran spawn_children sub

    return sub {
        MogileFS::Connection::Client->ProcessPipelined;
        # run only once per second
        $nowish = time();
        return 1 unless $nowish > $lastspawntime;
        $lastspawntime = $nowish;

        MogileFS::ProcManager->WatchDog;
        MogileFS::Connection::Client->WriterWatchDog;

        # see if anybody has died, but don't hang up on doing so
        while (my $pid = waitpid -1, WNOHANG) {
            last unless $pid > 0;
            $allkidsup = 0; # know something died

            # when a child dies, figure out what it was doing
            # and note that job has one less worker
            my $jobconn = delete $child{$pid};
            next unless $jobconn;

            my $job = $jobconn->job;
            my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
            error("Child $pid ($job) died: $? ($extra)");
            MogileFS::ProcManager->NoteDeadChild($pid);
            $jobconn->close;

            # if the pid is in %todie, then we have asked it to shut down
            # and have already decremented the jobstat counter and don't
            # want to do it again. The %todie entry is always cleared so it
            # cannot linger for a pid that no longer exists.
            my $asked_to_die = delete $todie{$pid};
            if (!$asked_to_die && (my $jobstat = $jobs{$job})) {
                # decrement the count of currently running jobs
                $jobstat->[1]--;
            }
        }

        return 1 if $allkidsup;

        # foreach job, fork enough children. Iterate over a snapshot of the
        # keys instead of each(): bailing out of an each() loop leaves the
        # hash iterator mid-way, so the next round would resume after the
        # failed job, skip it, and wrongly conclude that all kids are up.
        foreach my $job (keys %jobs) {
            my $jobstat = $jobs{$job};

            # do not spawn job_master-dependent workers if we have no job_master
            next if (!$want_job_master && $needs_job_master{$job});

            my $need = $jobstat->[0] - $jobstat->[1];
            next unless $need > 0;

            error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
            for (1..$need) {
                my $jobconn = make_new_child($job)
                    or return 1; # basically bail: true value keeps event loop running
                $child{$jobconn->pid} = $jobconn;

                # now increase the count of processes currently doing this job
                $jobstat->[1]++;
            }
        }

        # if we got this far, all jobs have been re-created. note that
        # so we avoid more CPU usage in this post-event-loop callback later
        $allkidsup = 1;

        # true value keeps us running:
        return 1;
    };
}

# Fork a worker process for $job, connected to us by a socketpair.
# Parent: returns the registered MogileFS::Connection::Worker, or the value
# of error() on failure. Child: runs the worker class and exits.
sub make_new_child {
    my $job = shift;

    my $pid;
    my $sigset;

    # Ensure our dbh is closed before we fork anything.
    # Causes problems on some platforms (Solaris+Postgres)
    Mgd::close_store();

    # block signal for fork
    $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT for fork: $!");

    my ($parents_ipc, $childs_ipc);
    unless (socketpair($parents_ipc, $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
        my $err = $!;
        # don't leave SIGINT blocked on the way out
        sigprocmask(SIG_UNBLOCK, $sigset);
        die("socketpair failed: $err");
    }

    unless (defined($pid = fork)) {
        my $err = $!;
        # don't leave the parent running with SIGINT blocked
        sigprocmask(SIG_UNBLOCK, $sigset);
        return error("fork failed creating $job: $err");
    }

    # enable auto-flush, so it's not pipe-buffered between parent/child
    select((select( $parents_ipc ), $|++)[0]);
    select((select( $childs_ipc ), $|++)[0]);

    # if i'm the parent
    if ($pid) {
        sigprocmask(SIG_UNBLOCK, $sigset)
            or return error("Can't unblock SIGINT for fork: $!");

        close($childs_ipc); # unnecessary but explicit
        IO::Handle::blocking($parents_ipc, 0);

        my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
        $worker_conn->pid($pid);
        $worker_conn->job($job);
        MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
        return $worker_conn;
    }

    # let children have different random number seeds
    srand();

    # as a child, we want to close these and ignore them
    $_->() foreach @prefork_cleanup;
    close($parents_ipc);
    undef $parents_ipc;

    $SIG{INT} = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals
    sigprocmask(SIG_UNBLOCK, $sigset)
        or return error("Can't unblock SIGINT for fork: $!");

    # now call our job function
    my $class = MogileFS::ProcManager->job_to_class($job)
        or die "No worker class defined for job '$job'\n";
    my $worker = $class->new($childs_ipc);

    # set our frontend into child mode
    MogileFS::ProcManager->SetAsChild($worker);

    $worker->work;
    exit 0;
}

sub PendingQueryCount {
    return scalar @PendingQueries;
}

sub BoredQueryWorkerCount {
    return scalar @IdleQueryWorkers;
}

sub QueriesInProgressCount {
    return scalar keys %Mappings;
}

# Toss in any queue depths.
sub StatsHash {
    for my $job (keys %pending_work) {
        $Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
    }
    return \%Stats;
}

# Call $cb->($job, $child_count, $desired_count, [ "pid pid ..." ]) for each
# job that has registered children, in job-name order.
sub foreach_job {
    my ($class, $cb) = @_;
    foreach my $job (sort keys %ChildrenByJob) {
        my $ct = scalar(keys %{$ChildrenByJob{$job}});
        $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
    }
}

sub foreach_pending_query {
    my ($class, $cb) = @_;
    foreach my $clq (@PendingQueries) {
        $cb->($clq->[0], # client object,
              $clq->[1], # "$ip $query"
              );
    }
}

sub is_monitor_good {
    return $monitor_good;
}

sub is_valid_job {
    my ($class, $job) = @_;
    return defined $jobs{$job};
}

sub valid_jobs {
    return sort keys %jobs;
}

# Change the desired worker count for $job to $n.
# Returns 0 for unknown jobs or for asking >1 job_master/monitor, else 1.
sub request_job_process {
    my ($class, $job, $n) = @_;
    return 0 unless $class->is_valid_job($job);
    return 0 if ($job =~ /^(?:job_master|monitor)$/i && $n > 1); # ghetto special case

    $want_job_master = $n if ($job eq "job_master");

    $jobs{$job}->[0] = $n;
    $allkidsup = 0;

    # try to clean out the queryworkers (if that's what we're doing?)
    MogileFS::ProcManager->CullQueryWorkers
        if $job eq 'queryworker';

    # other workers listening off of a queue should be pinging parent
    # frequently. shouldn't explicitly kill them.

    # explicit success value; otherwise the caller would see the value of
    # the statement above, which is false for every job but queryworker.
    return 1;
}


# when a child is spawned, they'll have copies of all the data from the
# parent, but they don't need it. this method is called when you want
# to indicate that this procmanager is running on a child and should clean.
sub SetAsChild {
    my ($class, $worker) = @_;

    @IdleQueryWorkers = ();
    @PendingQueries = ();
    %Mappings = ();
    $IsChild = $worker;
    %ErrorsTo = ();
    %idle_workers = ();
    %pending_work = ();
    %ChildrenByJob = ();
    %child = ();
    %todie = ();
    %jobs = ();

    # we just forked from our parent process, also using Danga::Socket,
    # so we need to lose all that state and start afresh.
    Danga::Socket->Reset;
    MogileFS::Connection::Client->Reset;
}

# called when a child has died. a child is someone doing a job for us,
# but it might be a queryworker or any other type of job. we just want
# to remove them from our list of children. they're actually respawned
# by the make_new_child function elsewhere in Mgd.
sub NoteDeadChild {
    my $pid = $_[1];
    foreach my $job (keys %ChildrenByJob) {
        return if # bail out if we actually delete one
            delete $ChildrenByJob{$job}->{$pid};
    }
}

# called when a client dies. clients are users, management or non.
# we just want to remove them from the error reporting interface, if
# they happen to be part of it.
sub NoteDeadClient {
    my $client = $_[1];
    delete $ErrorsTo{$client->{fd}};
}

# called when the error function in Mgd is called and we're in the parent,
# so it's pretty simple that basically we just spit it out to folks listening
# to errors
sub NoteError {
    return unless %ErrorsTo;

    my $msg = ":: ${$_[1]}\r\n";
    foreach my $client (values %ErrorsTo) {
        $client->write(\$msg);
    }
}

sub RemoveErrorWatcher {
    my ($class, $client) = @_;
    return delete $ErrorsTo{$client->{fd}};
}

sub AddErrorWatcher {
    my ($class, $client) = @_;
    $ErrorsTo{$client->{fd}} = $client;
}

# one-time initialization of a new worker connection
sub RegisterWorkerConn {
    my MogileFS::Connection::Worker $worker = $_[1];
    $worker->watch_read(1);

    #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);

    # now do any special case startup
    if ($worker->job eq 'queryworker') {
        MogileFS::ProcManager->NoteIdleQueryWorker($worker);
    }

    # add to normal list
    $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;

}

# Queue a client command as "$ip $line" and try to dispatch it at once;
# counts a stat when no query worker was free to take it.
sub EnqueueCommandRequest {
    my ($class, $line, $client) = @_;
    push @PendingQueries, [
                           $client,
                           ($client->peer_ip_string || '0.0.0.0') . " $line"
                           ];
    MogileFS::ProcManager->ProcessQueues;
    if (@PendingQueries) {
        # Don't like the name. Feel free to change if you find better.
        $Stats{times_out_of_qworkers}++;
    }
}

# puts a worker back in the queue, deleting any outstanding jobs in
# the mapping list for this fd.
sub NoteIdleQueryWorker {
    # first arg is class, second is worker
    my MogileFS::Connection::Worker $worker = $_[1];
    delete $Mappings{$worker->{fd}};

    # see if we need to kill off some workers
    if (job_needs_reduction('queryworker')) {
        Mgd::error("Reducing queryworker headcount by 1.");
        MogileFS::ProcManager->AskWorkerToDie($worker);
        return;
    }

    # must be okay, so put it in the queue
    push @IdleQueryWorkers, $worker;
    MogileFS::ProcManager->ProcessQueues;
}

# if we need to kill off a worker, this function takes in the WorkerConn
# object, tells it to die, marks us as having requested its death, and decrements
# the count of running jobs.
sub AskWorkerToDie {
    my MogileFS::Connection::Worker $worker = $_[1];
    note_pending_death($worker->job, $worker->pid);
    $worker->write(":shutdown\r\n");
}

# kill bored query workers so we can get down to the level requested. this
# continues killing until we run out of folks to kill.
sub CullQueryWorkers {
    while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
        MogileFS::ProcManager->AskWorkerToDie($worker);
    }
}

# called when we get a response from a worker. this reenqueues the
# worker so it can handle another response as well as passes the answer
# back on to the client.
sub HandleQueryWorkerResponse {
    # got a response from a worker
    my MogileFS::Connection::Worker $worker;
    my $line;
    (undef, $worker, $line) = @_;

    return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
    return unless $worker && $Mappings{$worker->{fd}};

    # get the client we're working with (if any)
    my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };

    # if we have no client, then we just got a standard message from
    # the queryworker and need to pass it up the line
    return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;

    # at this point it was a command response, but if the client has gone
    # away, just reenqueue this query worker
    return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};

    # <numeric id> [client-side time to complete] <response>
    my ($time, $id, $res);
    if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
        # save time and response for use later
        # Note the optional negative sign in the regexp. Somebody
        # on the mailing list was getting a time of -0.0000, causing
        # broken connections.
        ($id, $time, $res) = ($1, $2, $3);
    }

    # now, if it doesn't match
    unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
        $id = "<undef>" unless defined $id;
        $line = "<undef>" unless defined $line;
        $line =~ s/\n/\\n/g;
        $line =~ s/\r/\\r/g;
        Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
        $client->close('worker_mismatch');
        return MogileFS::ProcManager->AskWorkerToDie($worker);
    }

    # now time this interval and add to @RecentQueries
    my $tinterval = Time::HiRes::time() - $starttime;
    push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # send text to client, put worker back in queue
    $client->write("$res\r\n");
    MogileFS::ProcManager->NoteIdleQueryWorker($worker);
}

# new per-worker magic internal queue runner.
# TODO: Since this fires only when a master asks or a worker reports
# in bored, it should just operate on that *one* queue?
#
# new change: if worker in $job, but not in _bored, do not send work.
# if work is received, only delete from _bored
sub process_worker_queues {
    return if $IsChild;

    JOB: while (my ($job, $queue) = each %pending_work) {
        next JOB unless @$queue;
        next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
        WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
            my MogileFS::Connection::Worker $worker =
                delete $idle_workers{_bored}->{$worker_key};
            if (!defined $worker || $worker->{closed}) {
                delete $idle_workers{$job}->{$worker_key};
                next WORKER;
            }

            # allow workers to grab a linear range of work.
            while (@$queue && $worker->wants_todo($job)) {
                $worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
                $Stats{'work_sent_to_' . $job}++;
            }
            next JOB unless @$queue;
        }
    }
}

# called from various spots to empty the queues of available pairs.
sub ProcessQueues {
    return if $IsChild;

    # try to match up a client with a worker
    while (@IdleQueryWorkers && @PendingQueries) {
        # get client that isn't closed
        my $clref;
        while (!$clref && @PendingQueries) {
            $clref = shift @PendingQueries
                or next;
            if ($clref->[0]->{closed}) {
                $clref = undef;
                next;
            }
        }
        next unless $clref;

        # get worker and make sure it's not closed already
        my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers;
        if (!defined $worker || $worker->{closed}) {
            unshift @PendingQueries, $clref;
            next;
        }

        # put in mapping and send data to worker. The record is
        # [ client, "$ip $query", starttime ]; a query re-queued after its
        # worker died already carries its first start time, so only add
        # one when missing instead of growing the record on every retry.
        push @$clref, Time::HiRes::time() if @$clref < 3;
        $Mappings{$worker->{fd}} = $clref;
        $Stats{queries}++;

        # increment our counter so we know what request counter this is going out
        $worker->{reqid}++;
        # so we're writing a string of the form:
        #     123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
    }
}

# send short descriptions of commands we support to the user
sub SendHelp {
    my $client = $_[1];

    # send general purpose help
    $client->write(<<HELP);
Mogilefsd admin commands:

    !version    Server version
    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we\'re up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   Immediately kill all of mogilefsd.

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

HELP

}

# a child has contacted us with some command/status/something.
sub HandleChildRequest {
    if ($IsChild) {
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
    }

    # if they have no job set, then their first line is what job they are
    # and not a command. they also specify their pid, just so we know what
    # connection goes with what pid, in case it's ever useful information.
    my MogileFS::Connection::Worker $child = $_[1];
    my $cmd = $_[2];

    die "Child $child with no job?" unless $child->job;

    # at this point we've got a command of some sort
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^debug (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^queue_depth (\w+)/) {
        my $job = $1;
        if ($job eq 'all') {
            for my $qname (keys %pending_work) {
                my $depth = @{$pending_work{$qname}};
                $child->write(":queue_depth $qname $depth\r\n");
            }
        } else {
            my $depth = 0;
            if ($pending_work{$job}) {
                $depth = @{$pending_work{$job}};
            }
            $child->write(":queue_depth $job $depth\r\n");
        }
        MogileFS::ProcManager->process_worker_queues;
    } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
        my $job = $1;
        $pending_work{$job} ||= [];
        push(@{$pending_work{$job}}, $2);
        # Don't process queues immediately, to allow batch processing.
    } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
        my $batch = $1;
        my $types = $2;
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            unless (exists $idle_workers{$child->job}) {
                $idle_workers{$child->job} = {};
            }
            $idle_workers{_bored} ||= {};
            $idle_workers{_bored}->{$child} = $child;
            for my $type (split(/\s+/, $types)) {
                $idle_workers{$type} ||= {};
                $idle_workers{$type}->{$child}++;
                $child->wants_todo($type, $batch);
            }
            MogileFS::ProcManager->process_worker_queues;
        }
    } elsif ($cmd eq ":ping") {

        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());

        # this command expects a reply, either to die or stay alive. beginning of worker's loops
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            $child->write(":stay_alive\r\n");
        }

    } elsif ($cmd eq ":still_alive") {
        # a no-op

    } elsif ($cmd =~ /^:monitor_events/) {
        # Apply the state locally, so when we fork children they have a
        # pre-parsed factory.
        # We do not replay the events back to where it came, since this
        # severely impacts startup performance for instances with several
        # thousand domains, classes, hosts or devices.
        apply_state_events(\$cmd);
        MogileFS::ProcManager->send_to_all_children($cmd, $child);

    } elsif ($cmd eq ":monitor_just_ran") {
        send_monitor_has_run($child);

    } elsif ($cmd =~ /^:wake_a (\w+)$/) {

        MogileFS::ProcManager->wake_a($1, $child);
    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
        # and this will rebroadcast it to all other children
        # (including the one that just set it to us, but eh)
        MogileFS::Config->set_config($1, $2);
    } elsif ($cmd =~ /^:refresh_monitor$/) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
    } else {
        # unknown command
        my $show = $cmd;
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
    }
}

# Class method.
#   ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
# given a job class, and a message, send it to all children of that job. returns
# the number of children the message was sent to.
#
# if child is specified, the message will be sent to members of the job class that
# aren't that child. so you can exclude the one that originated the message.
#
# doesn't add to queue of things child gets on next interactive command: writes immediately
# (won't get in middle of partial write, though, as danga::socket queues things up)
#
# if $just_one is specified, only a single process is notified, then we stop.
sub ImmediateSendToChildrenByJob {
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;

    my $childref = $ChildrenByJob{$class};
    return 0 unless defined $childref && %$childref;

    # count the children actually written to, so an excluded child is
    # not reported as a recipient
    my $sent = 0;
    foreach my $child (values %$childref) {
        # ignore the child specified as the third arg if one is sent
        next if $exclude_child && $exclude_child == $child;

        # send the message to this child
        $child->write("$msg\r\n");
        $sent++;
        return $sent if $just_one;
    }
    return $sent;
}

# called when we notice that a worker has bit it. we might have to restart a
# job that they had been working on.
sub NoteDeadWorkerConn {
    return if $IsChild;

    # get parms and error check
    my MogileFS::Connection::Worker $worker = $_[1];
    return unless $worker;

    my $fd = $worker->{fd};
    return unless defined($fd);

    # if there's a mapping for this worker's fd, they had a job that didn't get done
    if (my $clref = delete $Mappings{$fd}) {
        # unshift, since this one already went through the queue once
        unshift @PendingQueries, $clref;

        # now try to get it processing again
        MogileFS::ProcManager->ProcessQueues;
    }
}

# given (job, pid), record that this worker is about to die
# $level is so we can tell if watchdog requested the death (2) rather than
# a normal shutdown request (defaults to 1).
sub note_pending_death {
    my ($job, $pid, $level) = @_;

    die "$job not defined in call to note_pending_death.\n"
        unless defined $jobs{$job};

    $level ||= 1;
    # don't double decrement.
    $jobs{$job}->[1]-- unless $todie{$pid};
    $todie{$pid} = $level;
}

# see if we should reduce the number of active children
sub job_needs_reduction {
    my $job = shift;

    # drop job_master-dependent workers if there is no job_master and no
    # previously queued work
    if (!$want_job_master && $needs_job_master{$job}
        && $jobs{job_master}->[1] == 0) { # check if job_master is really dead
        my $q = $pending_work{$job};
        return 1 if !$q || !@$q;
    }

    return $jobs{$job}->[0] < $jobs{$job}->[1];
}

sub is_child {
    return $IsChild;
}

# Wake one worker of job $class. In a child this is delegated to the
# worker object; in the parent one child of that job (other than
# $fromchild) is sent ":wake_up".
sub wake_a {
    my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)
    my $child = MogileFS::ProcManager->is_child;
    if ($child) {
        $child->wake_a($class);
    } else {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
    }
}

# Write $msg to every child connection except $exclude (if given).
sub send_to_all_children {
    my ($pkg, $msg, $exclude) = @_;
    foreach my $child (values %child) {
        next if $exclude && $child == $exclude;
        $child->write($msg . "\r\n");
    }
}

sub send_monitor_has_run {
    my $child = shift;
    # Gas up other workers if monitor's completed for the first time.
    if (! $monitor_good) {
        MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
        MogileFS::ProcManager->set_min_workers('delete'      => MogileFS->config('delete_jobs'));
        MogileFS::ProcManager->set_min_workers('replicate'   => MogileFS->config('replicate_jobs'));
        MogileFS::ProcManager->set_min_workers('reaper'      => MogileFS->config('reaper_jobs'));
        MogileFS::ProcManager->set_min_workers('fsck'        => MogileFS->config('fsck_jobs'));

        # only one job_master at most
        $want_job_master = !!MogileFS->config('job_master');
        MogileFS::ProcManager->set_min_workers('job_master' => $want_job_master);

        $monitor_good = 1;
        $allkidsup    = 0;
    }
    for my $type (qw(queryworker)) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
    }
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: