1package MogileFS::Connection::Worker; 2# This class maintains a connection to one of the various classes of 3# workers. 4 5use strict; 6use Danga::Socket (); 7use base qw{Danga::Socket}; 8 9use fields ( 10 'read_buf', 11 'read_size', # bigger for monitor 12 'job', 13 'pid', 14 'reqid', 15 'last_alive', # unixtime 16 'known_state', # hashref of { "$what-$whatid" => $state } 17 'wants_todo', # count of how many jobs worker wants. 18 ); 19 20sub new { 21 my MogileFS::Connection::Worker $self = shift; 22 $self = fields::new($self) unless ref $self; 23 $self->SUPER::new( @_ ); 24 25 $self->{pid} = 0; 26 $self->{reqid} = 0; 27 $self->{wants_todo} = {}; 28 $self->{job} = undef; 29 $self->{last_alive} = time(); 30 $self->{known_state} = {}; 31 $self->{read_size} = 1024; 32 33 return $self; 34} 35 36sub note_alive { 37 my $self = shift; 38 $self->{last_alive} = time(); 39} 40 41sub watchdog_check { 42 my MogileFS::Connection::Worker $self = shift; 43 44 my $timeout = $self->worker_class->watchdog_timeout; 45 my $time_since_last_alive = time() - $self->{last_alive}; 46 return $time_since_last_alive < $timeout; 47} 48 49sub event_read { 50 my MogileFS::Connection::Worker $self = shift; 51 52 # if we read data from it, it's not blocked on something else. 53 $self->note_alive; 54 55 my $bref = $self->read($self->{read_size}); 56 return $self->close() unless defined $bref; 57 $self->{read_buf} .= $$bref; 58 59 while ($self->{read_buf} =~ s/^(.+?)\r?\n//) { 60 my $line = $1; 61 if ($self->job eq 'queryworker' && $line !~ /^(?:\:|error|debug)/) { 62 MogileFS::ProcManager->HandleQueryWorkerResponse($self, $line); 63 } else { 64 MogileFS::ProcManager->HandleChildRequest($self, $line); 65 } 66 } 67} 68 69sub event_write { 70 my $self = shift; 71 my $done = $self->write(undef); 72 $self->watch_write(0) if $done; 73} 74 75sub job { 76 my MogileFS::Connection::Worker $self = shift; 77 return $self->{job} unless @_; 78 my $j = shift; 79 80 # monitor may send huge state events (which we send to everyone else) 81 $self->{read_size} = Mgd::UNIX_RCVBUF_SIZE() if ($j eq 'monitor'); 82 $self->{job} = $j; 83} 84 85sub wants_todo { 86 my MogileFS::Connection::Worker $self = shift; 87 my $type = shift; 88 return $self->{wants_todo}->{$type}-- unless @_; 89 return $self->{wants_todo}->{$type} = shift; 90} 91 92sub worker_class { 93 my MogileFS::Connection::Worker $self = shift; 94 return MogileFS::ProcManager->job_to_class($self->{job}); 95} 96 97sub pid { 98 my MogileFS::Connection::Worker $self = shift; 99 return $self->{pid} unless @_; 100 return $self->{pid} = shift; 101} 102 103sub event_hup { my $self = shift; $self->close; } 104sub event_err { my $self = shift; $self->close; } 105 106sub close { 107 # mark us as being dead 108 my MogileFS::Connection::Worker $self = shift; 109 MogileFS::ProcManager->NoteDeadWorkerConn($self); 110 $self->SUPER::close(@_); 111} 112 1131; 114 115# Local Variables: 116# mode: perl 117# c-basic-indent: 4 118# indent-tabs-mode: nil 119# End: 120