1package MogileFS::Worker::Reaper;
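# reaps dead devices: re-enqueues their fids for replication, then
# forgets the file_on rows that pointed at the dead device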
3
4use strict;
5use base 'MogileFS::Worker';
6use MogileFS::Server;
7use MogileFS::Util qw(error debug);
8use MogileFS::Config qw(DEVICE_SUMMARY_CACHE_TIMEOUT);
9use constant REAP_INTERVAL => 5;
10use constant REAP_BACKOFF_MIN => 60;
11
12# completely forget about devices we've reaped after 2 hours of idleness
13use constant REAP_BACKOFF_MAX => 7200;
14
# Constructor.  Allocates the object through fields::new and then hands
# $psock to the parent class constructor.
sub new {
    my $class = shift;
    my ($psock) = @_;

    my $self = fields::new($class);
    $self->SUPER::new($psock);
    return $self;
}
22
# Watchdog timeout for this worker: a fixed 240 (presumably seconds --
# confirm against MogileFS::Worker).  reap_dev derives its lock timeout
# from this value.
sub watchdog_timeout {
    my $timeout = 240;
    return $timeout;
}
26
# Reaps a single fid from a dead device.  The order of the two steps
# matters:
#
#   1. enqueue the fid in the file_to_replicate table
#   2. forget the file_on row for the dead device
#
# It shouldn't matter if the replicator gets to the fid before step 2
# has run, as the replicator treats file_on rows for dead devices as
# non-existent anyway.  Doing it the other way around is the dangerous
# case: a failure during/after 'forget_about' could leave a stranded
# file on a dead device that nothing would ever fix.
#
# Returns whatever $dev->forget_about returns.
sub reap_fid {
    my $self = shift;
    my ($fid, $dev) = @_;

    # step 1: make sure the replicator will look at this fid
    $fid->enqueue_for_replication(in => 1);

    # step 2: only now is it safe to drop the file_on row
    return $dev->forget_about($fid);
}
44
# Works out how many fids may be injected into the replication queue on
# this wakeup.
#
# The per-wakeup rate is the 'queue_rate_for_reaper' server setting,
# falling back to 1000 when that setting is unset or false.  If
# 'queue_size_for_reaper' is set (it defaults to zero, meaning "inject
# the full rate every wakeup"), the result is additionally capped to
# the room left in the deferred replication queue and never drops
# below zero.
sub reaper_inject_limit {
    my $self = shift;

    my $store      = Mgd::get_store();
    my $queue_cap  = MogileFS::Config->server_setting_cached('queue_size_for_reaper');
    my $per_wakeup = MogileFS::Config->server_setting_cached('queue_rate_for_reaper') || 1000;

    # no queue size limit configured: inject the full rate
    return $per_wakeup unless $queue_cap;

    # a queue size limit is configured for reaper, so prevent too many
    # files from entering the repl queue
    my $room    = $queue_cap - $store->deferred_repl_queue_length;
    my $allowed = $per_wakeup > $room ? $room : $per_wakeup;

    # the room may be negative since other processes can inject into
    # the deferred replication queue; reaper is the only one which can
    # respect this queue size
    return $allowed < 0 ? 0 : $allowed;
}
70
# Reaps one batch of fids from a dead device, then reschedules itself
# through a Danga::Socket timer.
#
# We pass the $devid here (instead of a Device object) to avoid
# potential memory leaks, since this sub reschedules itself to run
# forever.
#
#   $devid - id of the device to reap
#   $delay - the delay this invocation was scheduled at
#
# No further run is scheduled when:
#   - the device row no longer exists,
#   - the device is no longer permanently dead, or
#   - reap_dev_backoff_delay gives up and returns undef.
sub reap_dev {
    my ($self, $devid, $delay) = @_;

    # ensure the master DB is up, retry in REAP_INTERVAL if down
    unless ($self->validate_dbh) {
        $delay = REAP_INTERVAL;
        Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
        return;
    }

    my $limit = $self->reaper_inject_limit;

    # just in case a user mistakenly nuked a devid from the device table.
    # Return right away: there is no object to call methods on below and
    # nothing left to reap, so we must not reschedule either.
    my $dev = Mgd::device_factory()->get_by_id($devid);
    unless ($dev) {
        error("No device row for dev$devid, cannot reap");
        return;
    }

    # user resurrected a "dead" device, not supported, really...
    if (!$dev->dstate->is_perm_dead) {
        Mgd::log("dev$devid is no longer dead to reaper");
        return;
    }

    # limit == 0 if we hit the queue size limit, we'll just reschedule
    # with the delay we were called with
    if ($limit) {
        my $sto = Mgd::get_store();
        my $lock = "mgfs:reaper";
        my $lock_timeout = $self->watchdog_timeout / 4;
        my @fids;

        if ($sto->get_lock($lock, $lock_timeout)) {
            @fids = $dev->fid_list(limit => $limit);
            if (@fids) {
                $self->still_alive;
                foreach my $fid (@fids) {
                    $self->reap_fid($fid, $dev);
                }
            }
            $sto->release_lock($lock);

            # if we've found any FIDs (perhaps even while backing off)
            # ensure we try to find more soon:
            if (@fids) {
                $delay = REAP_INTERVAL;
            } else {
                # nothing found: back off; this may yield undef, which
                # ends the rescheduling below
                $delay = $self->reap_dev_backoff_delay($delay);
            }
        } else {
            # No lock after a long lock_timeout?  Try again soon.
            # We should never get here under MySQL, and rarely for other DBs.
            debug("get_lock($lock, $lock_timeout) failed");
            $delay = REAP_INTERVAL;
        }
    }

    # an undefined delay means the backoff ran out: stop for good
    return unless defined $delay;

    # schedule another update, delay could be REAP_BACKOFF_MAX
    Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
}
136
# Computes the next delay for a device that had nothing left to reap.
#
# Called when we're hopefully all done with a device; we still want to
# look again in the future in case the replicator had an out-of-date
# cache and the "dead" device was actually writable.
#
# Any delay below REAP_BACKOFF_MIN jumps straight to REAP_BACKOFF_MIN.
# Otherwise the delay doubles, and once the doubled value would exceed
# REAP_BACKOFF_MAX we return undef, which makes reap_dev stop
# rescheduling the device.
sub reap_dev_backoff_delay {
    my $self = shift;
    my ($current) = @_;

    if ($current < REAP_BACKOFF_MIN) {
        return REAP_BACKOFF_MIN;
    }

    my $doubled = $current * 2;

    # explicit undef (not a bare return) so callers always get one value
    return undef if $doubled > REAP_BACKOFF_MAX;
    return $doubled;
}
148
# Worker entry point: periodically looks for permanently dead devices
# and starts a reap_dev cycle for each one it has not seen before, then
# runs the Danga::Socket event loop.
sub work {
    my ($self) = @_;

    # ensure we get monitor updates
    Danga::Socket->AddOtherFds($self->psock_fd, sub { $self->read_from_parent });

    # devids that already have a reap_dev cycle scheduled
    my %scheduled;

    my $scan_for_dead;
    $scan_for_dead = sub {
        # ping the parent and note we're starting a run
        $self->parent_ping;
        debug("Reaper running; looking for dead devices");

        my @dead_devs = grep { $_->dstate->is_perm_dead }
                        Mgd::device_factory()->get_all;

        for my $dev (@dead_devs) {
            next if $scheduled{$dev->id};

            # delay the initial device reap in case any replicator cache
            # thinks the device is still alive
            my $first_reap = sub {
                $self->reap_dev($dev->id, REAP_INTERVAL);
            };
            Danga::Socket->AddTimer(DEVICE_SUMMARY_CACHE_TIMEOUT + 1, $first_reap);

            # from here on reap_dev takes over scheduling for this
            # device, so never start a second cycle for it
            $scheduled{$dev->id} = 1;
        }

        # look for newly dead devices again after REAP_INTERVAL
        Danga::Socket->AddTimer(REAP_INTERVAL, $scan_for_dead);
    };

    # kick off the reaper and loop forever
    $scan_for_dead->();
    Danga::Socket->EventLoop;
}
186
1871;
188
189# Local Variables:
190# mode: perl
191# c-basic-indent: 4
192# indent-tabs-mode: nil
193# End:
194