package MogileFS::Worker::Reaper;
# deletes files

use strict;
use base 'MogileFS::Worker';
use MogileFS::Server;
use MogileFS::Util qw(error debug);
use MogileFS::Config qw(DEVICE_SUMMARY_CACHE_TIMEOUT);
use constant REAP_INTERVAL => 5;
use constant REAP_BACKOFF_MIN => 60;

# completely forget about devices we've reaped after 2 hours of idleness
use constant REAP_BACKOFF_MAX => 7200;

# Constructor.
#
#   $class - class name to instantiate
#   $psock - passed straight through to the parent class constructor
#
# Allocates the object with fields::new, lets the parent class
# initialise it, and returns the new object.
sub new {
    my $class = shift;
    my $psock = shift;

    my $worker = fields::new($class);
    $worker->SUPER::new($psock);

    return $worker;
}

# Watchdog timeout for this worker (presumably in seconds -- confirm
# against the parent class).  A quarter of this value is also used as
# the DB lock timeout when reaping a device.
sub watchdog_timeout {
    return 240;
}

# order is important here:
#
# first, add fid to file_to_replicate table.  it
# shouldn't matter if the replicator gets to this
# before the subsequent 'forget_about' method, as the
# replicator will treat dead file_on devices as
# non-existent anyway.  however, it is important that
# we enqueue it for replication first, before we
# forget about that file_on row, otherwise a failure
# after/during 'forget_about' could leave a stranded
# file on a dead device and we'd never fix it.
sub reap_fid {
    my ($self, $fid, $dev) = @_;

    # Queue the fid for replication *before* dropping its row for this
    # device, so a failure between the two steps cannot strand the file.
    $fid->enqueue_for_replication(in => 1);
    $dev->forget_about($fid);
}

# Returns how many fids may be injected into the replication queue on
# this wakeup.
#
# The base limit is the 'queue_rate_for_reaper' server setting, or 1000
# when that setting is unset or zero.  If 'queue_size_for_reaper' is set,
# the limit is additionally capped by the space left in the deferred
# replication queue, and never drops below zero.
sub reaper_inject_limit {
    my ($self) = @_;

    my $sto = Mgd::get_store();
    my $max = MogileFS::Config->server_setting_cached('queue_size_for_reaper');
    my $limit = MogileFS::Config->server_setting_cached('queue_rate_for_reaper') || 1000;

    # max defaults to zero, meaning we inject $limit every wakeup
    if ($max) {
        # if a queue size limit is configured for reaper, prevent too many
        # files from entering the repl queue:
        my $len = $sto->deferred_repl_queue_length;
        my $space_left = $max - $len;

        $limit = $space_left if ($limit > $space_left);

        # limit may end up being negative here since other processes
        # can inject into the deferred replication queue, reaper is
        # the only one which can respect this queue size
        $limit = 0 if $limit < 0;
    }

    return $limit;
}

# Reaps up to reaper_inject_limit() fids from one dead device and then
# reschedules itself via a Danga::Socket timer.
#
#   $devid - id of the device to reap
#   $delay - the delay we were scheduled at; used to compute the backoff
#
# we pass the $devid here (instead of a Device object) to avoid
# potential memory leaks since this sub reschedules itself to run
# forever.
#
# Rescheduling stops (the sub returns without adding a timer) when:
#   - the device row no longer exists,
#   - the device is no longer permanently dead, or
#   - the backoff delay has grown past REAP_BACKOFF_MAX.
sub reap_dev {
    my ($self, $devid, $delay) = @_;

    # ensure the master DB is up, retry in REAP_INTERVAL if down
    unless ($self->validate_dbh) {
        $delay = REAP_INTERVAL;
        Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
        return;
    }

    my $limit = $self->reaper_inject_limit;

    # just in case a user mistakenly nuked a devid from the device table:
    # bail out right away, there is nothing to reap and nothing to
    # reschedule.  (Falling through would call a method on undef below.)
    my $dev = Mgd::device_factory()->get_by_id($devid);
    unless ($dev) {
        error("No device row for dev$devid, cannot reap");
        return;
    }

    # user resurrected a "dead" device, not supported, really...
    if (!$dev->dstate->is_perm_dead) {
        Mgd::log("dev$devid is no longer dead to reaper");
        return;
    }

    # limit == 0 if we hit the queue size limit, we'll just reschedule
    if ($limit) {
        my $sto = Mgd::get_store();
        my $lock = "mgfs:reaper";
        my $lock_timeout = $self->watchdog_timeout / 4;
        my @fids;

        if ($sto->get_lock($lock, $lock_timeout)) {
            @fids = $dev->fid_list(limit => $limit);
            if (@fids) {
                $self->still_alive;
                foreach my $fid (@fids) {
                    $self->reap_fid($fid, $dev);
                }
            }
            $sto->release_lock($lock);

            # if we've found any FIDs (perhaps even while backing off)
            # ensure we try to find more soon:
            if (@fids) {
                $delay = REAP_INTERVAL;
            } else {
                $delay = $self->reap_dev_backoff_delay($delay);
            }
        } else {
            # No lock after a long lock_timeout?  Try again soon.
            # We should never get here under MySQL, and rarely for other DBs.
            debug("get_lock($lock, $lock_timeout) failed");
            $delay = REAP_INTERVAL;
        }
    }

    # reap_dev_backoff_delay returns undef once we should give up
    return unless defined $delay;

    # schedule another update, delay could be REAP_BACKOFF_MAX
    Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
}

# called when we're hopefully all done with a device, but reschedule
# into the future in case the replicator had an out-of-date cache and the
# "dead" device was actually writable.
#
#   $delay - the delay the caller was last scheduled at
#
# Returns REAP_BACKOFF_MIN if $delay is below it, otherwise double the
# delay, or undef once the doubled delay exceeds REAP_BACKOFF_MAX.
sub reap_dev_backoff_delay {
    my ($self, $delay) = @_;

    return REAP_BACKOFF_MIN if ($delay < REAP_BACKOFF_MIN);

    $delay *= 2;
    return $delay > REAP_BACKOFF_MAX ? undef : $delay;
}

# looks for dead devices
#
# Worker main loop: every REAP_INTERVAL, scans all devices for
# permanently dead ones and starts a reap_dev cycle for each device id
# not seen before.  Ends by entering the Danga::Socket event loop.
sub work {
    my $self = shift;

    # ensure we get monitor updates
    Danga::Socket->AddOtherFds($self->psock_fd, sub{ $self->read_from_parent });

    my %devid_seen;
    my $reap_check;
    $reap_check = sub {
        # get db and note we're starting a run
        $self->parent_ping;
        debug("Reaper running; looking for dead devices");

        foreach my $dev (grep { $_->dstate->is_perm_dead }
                         Mgd::device_factory()->get_all)
        {
            # only the id is kept from here on, so the timer closure
            # below does not hold on to the Device object
            my $devid = $dev->id;
            next if $devid_seen{$devid};

            # delay the initial device reap in case any replicator cache
            # thinks the device is still alive
            Danga::Socket->AddTimer(DEVICE_SUMMARY_CACHE_TIMEOUT + 1, sub {
                $self->reap_dev($devid, REAP_INTERVAL);
            });

            # once we've seen a device, reap_dev will take over scheduling
            # reaping for the given device.
            # NOTE(review): entries are never removed, so a device that
            # reap_dev has stopped rescheduling is not picked up again by
            # this process -- confirm that is intended.
            $devid_seen{$devid} = 1;
        }

        Danga::Socket->AddTimer(REAP_INTERVAL, $reap_check);
    };

    # kick off the reaper and loop forever
    $reap_check->();
    Danga::Socket->EventLoop;
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: