package MogileFS::Worker::Replicate;
# replicates files around

use strict;
use base 'MogileFS::Worker';
use fields (
            'fidtodo',   # hashref { fid => 1 }
            );

use List::Util ();
use MogileFS::Server;
use MogileFS::Util qw(error every debug wait_for_readability);
use MogileFS::Config;
use MogileFS::ReplicationRequest qw(rr_upgrade);
use Digest;
use MIME::Base64 qw(encode_base64);

# Creates a replicate worker.  $psock is passed straight through to the
# MogileFS::Worker constructor (presumably the socket to the parent process).
sub new {
    my ($class, $psock) = @_;
    my $self = fields::new($class);
    $self->SUPER::new($psock);
    $self->{fidtodo} = {};
    return $self;
}

# replicator wants a generous watchdog window, since a single copy can be slow.
# NOTE(review): units/semantics are defined by MogileFS::Worker.
sub watchdog_timeout { 90; }

# Timeout handed to wait_for_readability() while reading HTTP data during a
# copy (presumably seconds -- see MogileFS::Util).
use constant SOCK_TIMEOUT => 45;

# Main loop.  Once per second: tell the parent we want more work, then drain
# every job it has queued for us from the 'replicate' and 'rebalance' queues.
sub work {
    my $self = shift;

    every(1.0, sub {
        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $queue_todo  = $self->queue_todo('replicate');
        my $queue_todo2 = $self->queue_todo('rebalance');
        return unless (@$queue_todo || @$queue_todo2);

        return unless $self->validate_dbh;
        my $sto = Mgd::get_store();

        while (my $todo = shift @$queue_todo) {
            $self->replicate_using_torepl_table($todo);
        }

        while (my $todo = shift @$queue_todo2) {
            $self->still_alive;
            # deserialize the arg :/  it arrives as a comma-separated list of
            # destination devids.
            $todo->{arg} = [split /,/, $todo->{arg}];
            my $devfid =
                MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid,
                { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue. Users may
            # manually re-run rebalance to retry.
            # NOTE(review): REBAL_QUEUE is not defined in this file; it is
            # presumably exported by one of the modules used above.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }

        $_[0]->(0); # don't sleep.
    });
}

# Handles one job ($todo) from the 'replicate' queue: tries to replicate
# $todo->{fid} and, if that fails, reschedules (or parks) the queue entry
# according to the error code replicate() reports.
sub replicate_using_torepl_table {
    my $self = shift;
    my $todo = shift;

    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts;
    $opts{errref}       = \$errcode;
    $opts{no_unlock}    = 1; # to make it return an $unlock subref
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # $status is 0 on failure (handled below), otherwise one of the true
        # values replicate() returns: 1 (we actually replicated this file),
        # "lost_race" (policy was already satisfied; someone else did it) or
        # "nofid" (the fid no longer exists).
        #
        # When $status eq "lost_race" this delete is normally unnecessary
        # (somebody else presumably already deleted it if they also
        # replicated it), but when running with old replicators from previous
        # versions -or- simply if the other worker's delete failed, this
        # cleans it up....
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        return 1;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:
    #
    # README: please keep this up to date if you update the replicate()
    # function, so we ensure that this code always does the right thing.
    #
    # -- HARMLESS --
    # failed_getting_lock        => harmless. skip. somebody else is probably doing it.
    #
    # -- ACTIONABLE --
    # too_happy                  => too many copies, attempt to rebalance.
    #
    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down                => no readable source is available.
    # policy_error_doing_failed  => policy plugin misbehaved: it's looping.
    # policy_error_already_there => policy plugin misbehaved: it suggested a device
    #                               the fid is already on.
    # policy_no_suggestions      => policy ran out of suggestions and is not happy.
    #                               (Individual copy failures are not reported
    #                               directly; they end up as this, as
    #                               policy_error_doing_failed, or as source_down.)
    #
    # -- FATAL; DON'T TRY AGAIN --
    # no_source                  => it simply exists nowhere. not that something's
    #                               down, but file_on is empty.

    # bail if we failed getting the lock, that means someone else probably
    # already did it, so we should just move on
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        return 1;
    }

    # logic for setting the next try time appropriately
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        my $sto = Mgd::get_store();
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
        } elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
        } else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
        }
    };

    # now let's handle any error we want to consider a total failure; do not
    # retry at any point.  push this file off to the end so someone has to come
    # along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        return 1;
    }

    # try to shake off extra copies.  fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # there's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        # release our lock first: rebalance_devfid() calls replicate() on
        # this same fid and needs to take it again.
        $unlock->() if $unlock;
        $unlock = undef;
        my $f = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;
        # First one we can delete from, we try to rebalance away from.
        for my $devid (@devs) {
            # skip devids the factory no longer knows about, rather than
            # calling methods on undef
            my $dev = Mgd::device_factory()->get_by_id($devid)
                or next;
            # Not positive 'should_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->should_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        if ($devfid) {
            if ($self->rebalance_devfid($devfid)) {
                # we rebalanced due to excessive replication: reset the
                # failure count so the backoff below uses its shortest delay.
                $todo->{failcount} = 0;
            }
        }
    }

    # at this point, the rest of the errors require exponential backoff.
    # failcount -> delay to next try (jittered by +/-20%):
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
    my $failcount = $todo->{failcount} || 0;
    $update_nexttry->( offset => int(($backoff[$failcount] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}

# Moves the copy of a fid held at $devfid somewhere else: the fid is
# replicated with $devfid's device masked out, then the copy on $devfid is
# deleted (if it is safe to do so).
#
# $opts (optional hashref):
#   target_devids => arrayref of devids to restrict destinations to
#   avoid_devids  => accepted by okay_args() but not passed on by this sub
#
# Returns 1 if the copy on $devfid was deleted (or the fid no longer exists),
# 0 on failure or when nothing was deleted.
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but whatever... it
    # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
    # dev machine...
    return 1 if ! $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate($fid,
                                   mask_devids   => { $devfid->devid => 1 },
                                   no_unlock     => 1,
                                   target_devids => $opts->{target_devids},
                                   errref        => \$errcode,
                                   );

    my $fail = sub {
        my $error = shift;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    # $errcode is undef on success
    my $too_happy = defined $errcode && $errcode eq "too_happy";

    unless ($ret || $too_happy) {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($too_happy || $ret eq "lost_race") {
        # for some reason, we did no work.  that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're over-replicated, else we just lost the race.
        if ($devfid->exists) {
            # over-replicated

            # see if some copy, besides this one we want
            # to delete, is currently alive & of right size..
            # just as extra paranoid check before we delete it
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason = "over_replicated";
                    last;
                }
            }
        } else {
            # lost race
            $should_delete = 0; # no-op
        }
    } elsif ($ret eq "would_worsen") {
        # replication has indicated we would be ruining this fid's day
        # if we deleted an existing copy, so let's not do that.
        # this indicates a condition where there are no suitable devices to
        # copy new data onto, so let's be loud about it.
        return $fail->("no suitable destination devices available");
    } else {
        $should_delete = 1;
        $del_reason = "did_rebalance;ret=$ret";
    }

    my %destroy_opts;

    $destroy_opts{ignore_missing} = 1
        if MogileFS::Config->config("rebalance_ignore_missing");

    if ($should_delete) {
        eval { $devfid->destroy(%destroy_opts) };
        if ($@) {
            return $fail->("HTTP delete (due to '$del_reason') failed: $@");
        }
    }

    $unlock->();
    return $should_delete;
}

# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# %opts:
#   errref        => scalar ref; receives the error code (undef on success)
#   no_unlock     => if true, return ($rv, $unlock_sub) and keep the
#                    replication lock until $unlock_sub is called
#   source_devid  => preferred devid to copy from; used while it is readable
#                    and hasn't failed, otherwise other copies are tried
#   mask_devids   => hashref { devid => 1 }: copies on these devices are not
#                    reported to the policy, and these devices are excluded
#                    from the policy's ideal destination suggestions
#   avoid_devids  => hashref { devid => 1 }: excluded from the policy's ideal
#                    destination suggestions
#   target_devids => arrayref of devids: only these may be destinations
#                    (devids unknown to the device factory are ignored)
#
# returns either:
#          $rv
#          ($rv, $unlock_sub)  -- when 'no_unlock' %opt is used. subref to release lock.
# $rv is one of:
#          0 = failure  (failure written to ${$opts{errref}})
#          1 = success
#          "lost_race"    = skipping, we did no work and policy was already met.
#          "nofid"        => fid no longer exists. skip replication.
#          "would_worsen" => with masked/avoided devices excluded there was no
#                            ideal destination; nothing was copied.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref        = delete $opts{'errref'};
    my $no_unlock     = delete $opts{'no_unlock'};
    my $fixed_source  = delete $opts{'source_devid'};
    my $mask_devids   = delete $opts{'mask_devids'}   || {};
    my $avoid_devids  = delete $opts{'avoid_devids'}  || {};
    my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    my $sdevid;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # Common exit: records the error code, logs failures (except lock
    # contention) and either releases the lock or hands the release sub back.
    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure.  not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = Mgd::device_factory()->map_by_id
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);

    # if the fid doesn't even exist, consider our job done!  no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $polobj = $cls->repl_policy_obj;

    # learn what devices this file is already on
    my @on_devs;         # all devices fid is on, reachable or not.
    my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
    my @on_up_devid;     # subset of @on_devs: just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = Mgd::device_factory()->get_by_id($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->should_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;

    if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
        error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
    }

    my %dest_failed;          # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;        # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0; # true once replication policy asks us to move something somewhere
    my $copy_err;

    my $dest_devs = $devs;
    if (@$target_devids) {
        # skip devids missing from the device map; mapping them would hand
        # the policy undefined device entries.
        $dest_devs = { map  { $_ => $devs->{$_} }
                       grep { $devs->{$_} } @$target_devids };
    }

    my $rr; # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($polobj->replicate_to(
                                               fid      => $fidid,
                                               on_devs  => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
                                               all_devs => $dest_devs,
                                               failed   => \%dest_failed,
                                               min      => $cls->mindevcount,
                                               ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                             ! $avoid_devids->{$_}
                                           }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # this used to return "lost_race" as a lie, but rebalance was
                # happily deleting the masked fid if at least one other fid
                # existed... because it assumed it was over replicated.
                # now we tell rebalance that touching this fid would be
                # stupid.
                return $retunlock->("would_worsen");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.  that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            if ($fixed_source && grep { $_ == $fixed_source } @choices) {
                $sdevid = $fixed_source;
            } else {
                @choices = List::Util::shuffle(@choices);
                MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
                $sdevid = shift @choices;
            }
        }

        my $worker = MogileFS::ProcManager->is_child or die;
        my $digest;
        my $fid_checksum = $fid->checksum;
        $digest = Digest->new($fid_checksum->hashname) if $fid_checksum;
        $digest ||= Digest->new($cls->hashname) if $cls->hashtype;

        my $rv = http_copy(
                           sdevid   => $sdevid,
                           ddevid   => $ddevid,
                           fid      => $fid,
                           errref   => \$copy_err,
                           callback => sub { $worker->still_alive; },
                           digest   => $digest,
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source && $fixed_source == $sdevid) {
                    error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
                }

            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;
        if ($digest && !$fid->checksum) {
            $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
        }

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
        push @on_up_devid, $ddevid;
    }

    # We are over replicated.  Let caller decide if it should rebalance.
    if ($rr->too_happy) {
        return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race"); # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}

# Reads an HTTP response header from $sock, calling $intercopy_cb before
# each read.
#
# Returns (\%rv, $data) where $data holds any body bytes read past the
# header terminator and %rv is:
# {
#    code => HTTP status code integer,
#    keep => boolean, whether to keep the connection after reading
#    len  => value of the Content-Length header (integer)
# }
# Returns an empty list on timeout, EOF, read error or an unparseable
# status line.
sub read_headers {
    my ($sock, $intercopy_cb) = @_;
    my $head = '';

    do {
        wait_for_readability(fileno($sock), SOCK_TIMEOUT) or return;
        $intercopy_cb->();
        my $r = sysread($sock, $head, 1024, length($head));
        if (defined $r) {
            return if $r == 0; # EOF
        } elsif ($!{EAGAIN} || $!{EINTR}) {
            # loop again
        } else {
            return;
        }
    } until ($head =~ /\r?\n\r?\n/);

    my $data;
    ($head, $data) = split(/\r?\n\r?\n/, $head, 2);
    my @head = split(/\r?\n/, $head);
    $head = shift(@head);
    $head =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)!
        or return;
    # HTTP/1.1+ defaults to keep-alive; headers below may override
    my %rv = ( keep => $1 >= 1.1, code => $2 );

    foreach my $line (@head) {
        if ($line =~ /\AConnection:\s*keep-alive\s*\z/is) {
            $rv{keep} = 1;
        } elsif ($line =~ /\AConnection:\s*close\s*\z/is) {
            $rv{keep} = 0;
        } elsif ($line =~ /\AContent-Length:\s*(\d+)\s*\z/is) {
            $rv{len} = $1;
        }
    }
    return (\%rv, $data);
}

# Copies a fid's contents from one storage node to another over HTTP: a GET
# on the source device is streamed chunk by chunk into a PUT on the
# destination device.
#
# Named args:
#   sdevid, ddevid - source and destination devids
#   fid            - MogileFS::FID object or fid id
#   callback       - optional coderef called periodically during the copy
#   errref         - optional scalar ref; set to "src_error" or "dest_error"
#                    on every failure path
#   digest         - optional Digest object; fed every byte read from the
#                    source (reset first whenever the copy is retried), so
#                    the caller can read the file's digest from it afterwards
#
# Returns 1 on success; on failure returns the result of error() (presumably
# false) after setting *$errref.
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    callback
                                    errref
                                    digest
                                    );
    die if %opts;

    $fid = MogileFS::FID->new($fid) unless ref($fid);
    my $fidid = $fid->id;
    my $expected_clen = $fid->length;
    my $clen;
    my $content_md5 = '';
    my ($sconn, $dconn);
    my $fid_checksum = $fid->checksum;
    if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
        # some HTTP servers may be able to verify Content-MD5 on PUT
        # and reject corrupted requests.  no HTTP server should reject
        # a request for an unrecognized header
        my $b64digest = encode_base64($fid_checksum->{checksum}, "");
        $content_md5 = "\r\nContent-MD5: $b64digest";
    }

    $intercopy_cb ||= sub {};

    # records the error class, closes any open connections and logs $msg
    my $err_common = sub {
        my ($err, $msg) = @_;
        $$errref = $err if $errref;
        $sconn->close($err) if $sconn;
        $dconn->close($err) if $dconn;
        return error($msg);
    };

    # source-side failure, logged with an "unreachable" prefix
    my $error_unreachable = sub {
        return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        return $err_common->("dest_error", $_[0]);
    };

    my $src_error = sub {
        return $err_common->("src_error", $_[0]);
    };

    # get some information we'll need
    my $sdev = Mgd::device_factory()->get_by_id($sdevid);
    my $ddev = Mgd::device_factory()->get_by_id($ddevid);

    unless ($sdev && $ddev) {
        # blame whichever side is missing so the caller records a failed
        # source/destination instead of dying on an unset error code
        my $msg = "Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid";
        return $sdev ? $dest_error->($msg) : $src_error->($msg);
    }

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    if (MogileFS::Config->config("repl_use_get_port")) {
        $sport = $shost->http_get_port;
    }
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
        my $detail = " http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath";
        my $src_ok = defined $spath && defined $shostip && $sport;
        return $src_ok ? $dest_error->($detail) : $src_error->($detail);
    }

    my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" .
        "Content-length: $expected_clen$content_md5\r\n\r\n";

    # needed by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # call a hook for odd casing completely different source data
    # for specific files.
    my $shttphost;
    MogileFS::run_global_hook('replicate_alternate_source',
                              $fid, \$shostip, \$sport, \$spath, \$shttphost);

    my $durl = "http://$dhostip:$dport$dpath";
    my $surl = "http://$shostip:$sport$spath";
    # okay, now get the file
    my %sopts = ( ip => $shostip, port => $sport );

    my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n";
    # plugin set a custom host.
    $get .= "Host: $shttphost\r\n" if $shttphost;

    my ($sock, $dsock);
    my ($wcount, $bytes_to_read, $written, $remain);
    my ($stries, $dtries) = (0, 0);
    my ($sres, $data, $bytes);

retry:
    # a previous attempt may already have fed part of the body into
    # $digest; start it over so it covers the file exactly once.
    $digest->reset if $digest && $stries;
    $sconn->close("retrying") if $sconn;
    $dconn->close("retrying") if $dconn;
    $sconn = undef;
    $dconn = undef;
    $stries++;
    $sconn = $shost->http_conn_get(\%sopts)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    $sock = $sconn->sock;
    unless ($sock->write("$get\r\n")) {
        goto retry if $sconn->retryable && $stries == 1;
        return $src_error->("Pipe closed retrieving $spath from $shostip:$sport");
    }

    # we just want a content length
    ($sres, $data) = read_headers($sock, $intercopy_cb);
    unless ($sres) {
        goto retry if $sconn->retryable && $stries == 1;
        return $error_unreachable->("Error: Resource $surl failed to return an HTTP response");
    }
    unless ($sres->{code} >= 200 && $sres->{code} <= 299) {
        return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}");
    }
    $clen = $sres->{len};

    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if $clen != $expected_clen;

    # open target for put
    $dtries++;
    $dconn = $dhost->http_conn_get
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock = $dconn->sock;

    unless ($dsock->write($put)) {
        goto retry if $dconn->retryable && $dtries == 1;
        return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport");
    }

    # now read data and print while we're reading.  any body bytes that
    # arrived with the response headers are written first.
    $bytes = length($data);
    ($written, $remain) = (0, $clen);
    $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    $wcount = 0;

    while ($bytes_to_read) {
        unless (defined $bytes) {
read_again:
            $bytes = sysread($sock, $data, $bytes_to_read);
            unless (defined $bytes) {
                if ($!{EAGAIN} || $!{EINTR}) {
                    wait_for_readability(fileno($sock), SOCK_TIMEOUT) and
                        goto read_again;
                }
                return $src_error->("error reading midway through source: $!");
            }
            if ($bytes == 0) {
                return $src_error->("EOF reading midway through source: $!");
            }
        }

        # now we've read in $bytes bytes
        $remain -= $bytes;
        $bytes_to_read = $remain if $remain < $bytes_to_read;
        $digest->add($data) if $digest;

        my $data_len = $bytes;
        $bytes = undef;
        my $data_off = 0;
        while (1) {
            my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
            unless (defined $wbytes) {
                # it can take two writes to determine if a socket is dead
                # (TCP_NODELAY and TCP_CORK are (and must be) zero here)
                goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
                return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
            }
            $wcount++;
            $written += $wbytes;
            $intercopy_cb->();
            last if ($data_len == $wbytes);

            $data_len -= $wbytes;
            $data_off += $wbytes;
        }

        die if $bytes_to_read < 0;
    }

    # source connection drained, return to pool
    if ($sres->{keep}) {
        $shost->http_conn_put($sconn);
    } else {
        $sconn->close("http_close");
    }
    # either way, it must not be closed again by $err_common or on retry
    $sconn = undef;

    # the caller will want this digest, too, so finalize a clone (digest()
    # is destructive) and keep the Digest object itself intact.
    my $digest_bin;
    $digest_bin = $digest->clone->digest if $digest;

    if ($fid_checksum && defined $digest_bin) {
        if ($digest_bin ne $fid_checksum->{checksum}) {
            my $expect = $fid_checksum->hexdigest;
            my $actual = unpack("H*", $digest_bin);
            return $src_error->("checksum mismatch on GET: expected: $expect actual: $actual");
        }
    }

    # now read in the response line (should be first line)
    my ($dres, $ddata) = read_headers($dsock, $intercopy_cb);
    unless ($dres) {
        goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
        return $dest_error->("Error: HTTP response line not recognized writing to $durl");
    }

    # drain the response body if there is one
    # there may be no dres->{len}/Content-Length if there is no body
    my $dlen = ($dres->{len} || 0) - length($ddata);
    if ($dlen > 0) {
        my $r = $dsock->read($data, $dlen); # dres->{len} should be tiny
        if (defined $r) {
            if ($r != $dlen) {
                Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl");
                $dres->{keep} = 0;
            }
        } else {
            Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)");
            $dres->{keep} = 0;
        }
    } elsif ($dlen < 0) {
        Mgd::error("strange response Content-Length:$dres->{len} with ".
                   length($ddata) .
                   " extra bytes for PUT response on $durl ($!)");
        $dres->{keep} = 0;
    }

    # return the connection back to the connection pool
    if ($dres->{keep}) {
        $dhost->http_conn_put($dconn);
    } else {
        $dconn->close("http_close");
    }
    # either way, it must not be closed again by $err_common
    $dconn = undef;

    if ($dres->{code} >= 200 && $dres->{code} <= 299) {
        if (defined $digest_bin) {
            # a device with reject_bad_md5 would have refused a corrupted
            # body, but only if we actually sent it a Content-MD5 header
            # (i.e. the fid has a stored MD5); otherwise verify by rereading.
            if ($content_md5 && $ddev->{reject_bad_md5}) {
                return 1;
            }
            my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;
            my $httpfile = MogileFS::HTTPFile->at($durl);
            my $actual = $httpfile->digest($alg, $intercopy_cb);
            if ($actual ne $digest_bin) {
                my $expect = unpack("H*", $digest_bin);
                $actual = unpack("H*", $actual);
                return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $actual");
            }
        }
        return 1;
    }
    return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl");
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

__END__

=head1 NAME

MogileFS::Worker::Replicate -- replicates files

=head1 OVERVIEW

This process replicates files enqueued in the B<file_to_replicate> table.

The replication policy (which devices to replicate to) is pluggable,
but only one policy comes with the server.  See
L<MogileFS::ReplicationPolicy::MultipleHosts>.

=head1 SEE ALSO

L<MogileFS::Worker>

L<MogileFS::ReplicationPolicy>

L<MogileFS::ReplicationPolicy::MultipleHosts>
