1package MogileFS::Connection::Worker;
2# This class maintains a connection to one of the various classes of
3# workers.
4
5use strict;
6use Danga::Socket ();
7use base qw{Danga::Socket};
8
9use fields (
10            'read_buf',
11            'read_size',   # bigger for monitor
12            'job',
13            'pid',
14            'reqid',
15            'last_alive',  # unixtime
16            'known_state', # hashref of { "$what-$whatid" => $state }
17            'wants_todo',  # count of how many jobs worker wants.
18            );
19
20sub new {
21    my MogileFS::Connection::Worker $self = shift;
22    $self = fields::new($self) unless ref $self;
23    $self->SUPER::new( @_ );
24
25    $self->{pid}         = 0;
26    $self->{reqid}       = 0;
27    $self->{wants_todo}  = {};
28    $self->{job}         = undef;
29    $self->{last_alive}  = time();
30    $self->{known_state} = {};
31    $self->{read_size}   = 1024;
32
33    return $self;
34}
35
36sub note_alive {
37    my $self = shift;
38    $self->{last_alive} = time();
39}
40
41sub watchdog_check {
42    my MogileFS::Connection::Worker $self = shift;
43
44    my $timeout               = $self->worker_class->watchdog_timeout;
45    my $time_since_last_alive = time() - $self->{last_alive};
46    return $time_since_last_alive < $timeout;
47}
48
49sub event_read {
50    my MogileFS::Connection::Worker $self = shift;
51
52    # if we read data from it, it's not blocked on something else.
53    $self->note_alive;
54
55    my $bref = $self->read($self->{read_size});
56    return $self->close() unless defined $bref;
57    $self->{read_buf} .= $$bref;
58
59    while ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
60        my $line = $1;
61        if ($self->job eq 'queryworker' && $line !~ /^(?:\:|error|debug)/) {
62            MogileFS::ProcManager->HandleQueryWorkerResponse($self, $line);
63        } else {
64            MogileFS::ProcManager->HandleChildRequest($self, $line);
65        }
66    }
67}
68
69sub event_write {
70    my $self = shift;
71    my $done = $self->write(undef);
72    $self->watch_write(0) if $done;
73}
74
75sub job {
76    my MogileFS::Connection::Worker $self = shift;
77    return $self->{job} unless @_;
78    my $j = shift;
79
80    # monitor may send huge state events (which we send to everyone else)
81    $self->{read_size} = Mgd::UNIX_RCVBUF_SIZE() if ($j eq 'monitor');
82    $self->{job} = $j;
83}
84
85sub wants_todo {
86    my MogileFS::Connection::Worker $self = shift;
87    my $type = shift;
88    return $self->{wants_todo}->{$type}-- unless @_;
89    return $self->{wants_todo}->{$type} = shift;
90}
91
92sub worker_class {
93    my MogileFS::Connection::Worker $self = shift;
94    return MogileFS::ProcManager->job_to_class($self->{job});
95}
96
97sub pid {
98    my MogileFS::Connection::Worker $self = shift;
99    return $self->{pid} unless @_;
100    return $self->{pid} = shift;
101}
102
103sub event_hup { my $self = shift; $self->close; }
104sub event_err { my $self = shift; $self->close; }
105
106sub close {
107    # mark us as being dead
108    my MogileFS::Connection::Worker $self = shift;
109    MogileFS::ProcManager->NoteDeadWorkerConn($self);
110    $self->SUPER::close(@_);
111}
112
1131;
114
115# Local Variables:
116# mode: perl
117# c-basic-indent: 4
118# indent-tabs-mode: nil
119# End:
120