1package MogileFS::Worker::Replicate;
2# replicates files around
3
4use strict;
5use base 'MogileFS::Worker';
6use fields (
7            'fidtodo',   # hashref { fid => 1 }
8            );
9
10use List::Util ();
11use MogileFS::Server;
12use MogileFS::Util qw(error every debug wait_for_readability);
13use MogileFS::Config;
14use MogileFS::ReplicationRequest qw(rr_upgrade);
15use Digest;
16use MIME::Base64 qw(encode_base64);
17
# Construct a replicate worker attached to the parent socket $psock.
# The object is a fields-restricted hash; base-class setup is delegated to
# MogileFS::Worker, and the fid bookkeeping hash starts out empty.
sub new {
    my $class = shift;
    my $psock = shift;

    my $worker = fields::new($class);
    $worker->SUPER::new($psock);    # base MogileFS::Worker initialisation
    $worker->{fidtodo} = {};        # fid => 1, nothing pending yet

    return $worker;
}
25
# Watchdog timeout for this worker type: how long it may go without
# reporting in before being treated as hung (presumably seconds, consumed
# by the MogileFS::Worker watchdog machinery -- TODO confirm).
sub watchdog_timeout {
    return 90;
}

# Timeout handed to wait_for_readability() while talking HTTP to storage
# nodes (presumably seconds).
use constant SOCK_TIMEOUT => 45;
29
# Main loop of the replicate worker.
#
# Once a second it sends the parent a "worker_bored" message for the
# "replicate" and "rebalance" job types, then drains every job the parent
# has queued for both.  When it had work, it invokes the every() callback
# with 0 (see "don't sleep") so the next batch is fetched right away.
sub work {
    my $self = shift;

    every(1.0, sub {
        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $queue_todo  = $self->queue_todo('replicate');
        my $queue_todo2 = $self->queue_todo('rebalance');
        return unless (@$queue_todo || @$queue_todo2);

        return unless $self->validate_dbh;
        my $sto = Mgd::get_store();

        # plain replication jobs from file_to_replicate
        while (my $todo = shift @$queue_todo) {
            $self->replicate_using_torepl_table($todo);
        }

        # rebalance jobs: move one devfid, optionally onto specific devices
        while (my $todo = shift @$queue_todo2) {
            $self->still_alive;
            # deserialize the arg :/  (comma-separated target devids)
            $todo->{arg} = [split /,/, $todo->{arg}];
            my $devfid =
                MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid,
                { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue. Users may
            # manually re-run rebalance to retry.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }
        $_[0]->(0); # don't sleep.
    });
}
64
# Handle one job taken from the file_to_replicate queue.
#
# $todo is the queued job hashref.  This sub reads its fid, fromdevid
# (optional preferred source device) and failcount (index into the retry
# backoff table at the bottom).
#
# On success the fid is removed from file_to_replicate.  On failure the
# row is rescheduled according to the error code replicate() reported (see
# the table below).  The replication lock handed back by replicate() is
# always released before returning.
#
# Always returns 1: we did, or tried to do, something with this job.
sub replicate_using_torepl_table {
    my $self = shift;
    my $todo = shift;

    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts;
    $opts{errref}       = \$errcode;
    $opts{no_unlock}    = 1; # to make it return an $unlock subref
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # Any true $status is success: 1 (we actually replicated this
        # file), "lost_race" (someone else already satisfied the policy)
        # or "nofid" (the file no longer exists).  0 is failure, below.

        # when $status eq "lost_race", this delete is unnecessary normally
        # (somebody else presumably already deleted it if they
        # also replicated it), but in the case of running with old
        # replicators from previous versions, -or- simply if the
        # other guy's delete failed, this cleans it up....
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        return 1;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:

    # README: please keep this up to date if you update the replicate() function so we ensure
    # that this code always does the right thing
    #
    # -- HARMLESS --
    # failed_getting_lock        => harmless.  skip.  somebody else probably doing.
    #
    # -- ACTIONABLE --
    # too_happy                  => too many copies, attempt to rebalance.
    #
    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down                => only source available is observed down.
    # policy_error_doing_failed  => policy plugin fucked up.  it's looping.
    # policy_error_already_there => policy plugin fucked up.  it's dumb.
    # policy_no_suggestions      => no copy was attempted.  policy is just not happy.
    # copy_error                 => policy said to do 1+ things, we failed, it ran out of suggestions.
    #
    # -- FATAL; DON'T TRY AGAIN --
    # no_source                  => it simply exists nowhere.  not that something's down, but file_on is empty.

    # bail if we failed getting the lock, that means someone else probably
    # already did it, so we should just move on
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        return 1;
    }

    # logic for setting the next try time appropriately
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        my $sto = Mgd::get_store();
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
        } elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
        } else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
        }
    };

    # now let's handle any error we want to consider a total failure; do not
    # retry at any point.  push this file off to the end so someone has to come
    # along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        return 1;
    }

    # try to shake off extra copies. fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # there's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        # rebalance_devfid() takes its own replication lock, so drop ours
        $unlock->() if $unlock;
        $unlock = undef;
        my $f = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;
        # First one we can delete from, we try to rebalance away from.
        for my $devid (@devs) {
            my $dev = Mgd::device_factory()->get_by_id($devid);
            # the device may have vanished from the factory; skip it
            # rather than calling methods on undef
            next unless $dev;
            # Not positive 'should_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->should_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        if ($devfid) {
            if ($self->rebalance_devfid($devfid)) {
                # disable exponential backoff below if we rebalanced due to
                # excessive replication:
                $todo->{failcount} = 0;
            }
        }
    }

    # at this point, the rest of the errors require exponential backoff.  define what this means
    # as far as failcount -> delay to next try.
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    # each delay is jittered to between 0.8x and 1.2x of its nominal value
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
    $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}
193
# Move one copy of a file off the device that $devfid lives on.
#
# $devfid - MogileFS::DevFID naming the copy to move away
# $opts   - optional hashref:
#             target_devids => arrayref of devids destinations are limited to
#             avoid_devids  => hashref (devid => true) of devices that must
#                              not be picked as ideal destinations
#
# A new copy is made with replicate(), masking $devfid's device so the
# policy has to place the file elsewhere.  Then $devfid itself is deleted.
# If replicate() instead reports the file is over-replicated, $devfid is
# deleted only when another copy with a matching size exists.
#
# Return 1 on success (the copy was deleted, or the fid no longer exists),
# 0 on failure or no-op.
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but whatever... it
    # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
    # dev machine...
    return 1 if ! $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate($fid,
                                   mask_devids   => { $devfid->devid => 1 },
                                   # accepted by okay_args above, so honour it
                                   # (replicate() treats undef as "none")
                                   avoid_devids  => $opts->{avoid_devids},
                                   no_unlock     => 1,
                                   target_devids => $opts->{target_devids},
                                   errref        => \$errcode,
                                   );

    # log, release the replication lock, and report failure
    my $fail = sub {
        my $error = shift;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    unless ($ret || $errcode eq "too_happy") {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($errcode eq "too_happy" || $ret eq "lost_race") {
        # for some reason, we did no work. that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're over-replicated, else we just lost the race.
        if ($devfid->exists) {
            # over-replicated

            # see if some copy, besides this one we want
            # to delete, is currently alive & of right size..
            # just as extra paranoid check before we delete it
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason = "over_replicated";
                    last;
                }
            }
        } else {
            # lost race
            $should_delete = 0;  # no-op
        }
    } elsif ($ret eq "would_worsen") {
        # replication has indicated we would be making ruining this fid's day
        # if we delete an existing copy, so lets not do that.
        # this indicates a condition where there're no suitable devices to
        # copy new data onto, so lets be loud about it.
        return $fail->("no suitable destination devices available");
    } else {
        $should_delete = 1;
        $del_reason = "did_rebalance;ret=$ret";
    }

    my %destroy_opts;

    $destroy_opts{ignore_missing} = 1
        if MogileFS::Config->config("rebalance_ignore_missing");

    if ($should_delete) {
        eval { $devfid->destroy(%destroy_opts) };
        if ($@) {
            return $fail->("HTTP delete (due to '$del_reason') failed: $@");
        }
    }

    $unlock->();
    return $should_delete;
}
285
# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# $fid is a fid id or a MogileFS::FID object.  Options (unknown ones die):
#    errref        => scalar ref; receives the error code on failure
#    no_unlock     => keep the replication lock and return it as a subref
#                     (caller must then use list context)
#    source_devid  => preferred source devid; other sources are tried if it
#                     is unavailable or fails
#    mask_devids   => hashref devid => 1; hidden from the policy's view of
#                     where the fid lives and never taken as an ideal
#                     destination
#    avoid_devids  => hashref devid => 1; never taken as an ideal destination
#    target_devids => arrayref; if non-empty, only these devices are offered
#                     to the policy as destinations (unknown devids ignored)
#
# returns either:
#    $rv
#    ($rv, $unlock_sub)    -- when 'no_unlock' %opt is used. subref to release lock.
# $rv is one of:
#    0 = failure  (failure written to ${$opts{errref}})
#    1 = success
#    "lost_race" = skipping, we did no work and policy was already met.
#    "nofid" => fid no longer exists. skip replication.
#    "would_worsen" => with mask/avoid devids applied the policy had no
#                      ideal destination left; nothing was copied.
#
# Error codes written to errref: failed_getting_lock, no_source,
# source_down, policy_error_doing_failed, policy_error_already_there,
# too_happy, policy_no_suggestions.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref    = delete $opts{'errref'};
    my $no_unlock = delete $opts{'no_unlock'};
    my $fixed_source = delete $opts{'source_devid'};
    my $mask_devids  = delete $opts{'mask_devids'}  || {};
    my $avoid_devids = delete $opts{'avoid_devids'} || {};
    my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    my $sdevid;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # Common exit: records the error code, logs unless it is a lock
    # failure, then either releases the lock or hands it back (no_unlock).
    # Called as ->($rv), ->($rv, $errmsg) or ->($rv, $errcode, $errmsg).
    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure.  not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = Mgd::device_factory()->map_by_id
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);

    # if the fid doesn't even exist, consider our job done!  no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $polobj = $cls->repl_policy_obj;

    # learn what this devices file is already on
    my @on_devs;         # all devices fid is on, reachable or not.
    my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
    my @on_up_devid;     # subset of @on_devs:  just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = Mgd::device_factory()->get_by_id($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->should_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source",   "Source is no longer available replicating $fidid") if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;

    if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
        error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
    }

    my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;  # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere

    my $dest_devs = $devs;
    if (@$target_devids) {
        # Only offer the policy the requested devices.  Devids the device
        # factory doesn't know about are skipped instead of being passed
        # to the policy as undef device entries.
        $dest_devs = { map  { $_ => $devs->{$_} }
                       grep { $devs->{$_} } @$target_devids };
    }

    my $rr;  # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($polobj->replicate_to(
                                               fid       => $fidid,
                                               on_devs   => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
                                               all_devs  => $dest_devs,
                                               failed    => \%dest_failed,
                                               min       => $cls->mindevcount,
                                               ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                             ! $avoid_devids->{$_}
                                         }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # this used to return "lost_race" as a lie, but rebalance was
                # happily deleting the masked fid if at least one other fid
                # existed... because it assumed it was over replicated.
                # now we tell rebalance that touching this fid would be
                # stupid.
                return $retunlock->("would_worsen");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.  that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            if ($fixed_source && grep { $_ == $fixed_source } @choices) {
                $sdevid = $fixed_source;
            } else {
                @choices = List::Util::shuffle(@choices);
                MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
                $sdevid = shift @choices;
            }
        }

        my $worker = MogileFS::ProcManager->is_child or die;
        my $digest;
        my $fid_checksum = $fid->checksum;
        $digest = Digest->new($fid_checksum->hashname) if $fid_checksum;
        $digest ||= Digest->new($cls->hashname) if $cls->hashtype;

        # Fresh for every attempt, so a code left over from an earlier
        # failed attempt is never mistaken for this attempt's error.
        my $copy_err;
        my $rv = http_copy(
                           sdevid       => $sdevid,
                           ddevid       => $ddevid,
                           fid          => $fid,
                           errref       => \$copy_err,
                           callback     => sub { $worker->still_alive; },
                           digest       => $digest,
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source && $fixed_source == $sdevid) {
                    error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
                }

            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;
        if ($digest && !$fid->checksum) {
            $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
        }

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
        push @on_up_devid, $ddevid;
    }

    # We are over replicated. Let caller decide if it should rebalance.
    if ($rr->too_happy) {
        return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race");  # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}
533
# Read and parse an HTTP response header block from $sock.
#
# $intercopy_cb is invoked every time the socket becomes readable (callers
# use it as a heartbeat).
#
# Returns a two-element list ( \%res, $data ) where %res has:
#   code => HTTP status code integer,
#   keep => boolean, whether to keep the connection after reading
#   len  => value of the Content-Length header (integer), when present
# and $data is whatever body bytes arrived together with the headers.
# Returns the empty list on timeout, EOF, read error, or an
# unrecognisable status line.
sub read_headers {
    my ($sock, $intercopy_cb) = @_;
    my $buf = '';

    # accumulate until the blank line that terminates the headers
    while ($buf !~ /\r?\n\r?\n/) {
        return unless wait_for_readability(fileno($sock), SOCK_TIMEOUT);
        $intercopy_cb->();

        my $got = sysread($sock, $buf, 1024, length($buf));
        if (!defined $got) {
            # interrupted / would block: wait again; anything else is fatal
            next if $!{EAGAIN} || $!{EINTR};
            return;
        }
        return if $got == 0;    # EOF before the headers were complete
    }

    my ($header_block, $body) = split /\r?\n\r?\n/, $buf, 2;
    my ($status_line, @fields) = split /\r?\n/, $header_block;
    return unless $status_line =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)!;

    # HTTP/1.1+ defaults to persistent connections; headers may override
    my %res = ( keep => $1 >= 1.1, code => $2 );

    for my $field (@fields) {
        if ($field =~ /\AConnection:\s*keep-alive\s*\z/is) {
            $res{keep} = 1;
        }
        elsif ($field =~ /\AConnection:\s*close\s*\z/is) {
            $res{keep} = 0;
        }
        elsif ($field =~ /\AContent-Length:\s*(\d+)\s*\z/is) {
            $res{len} = $1;
        }
    }

    return (\%res, $body);
}
576
# copies a file from one Perlbal to another utilizing HTTP
#
# GETs the file from the source device and streams the body into a PUT on
# the destination device.  It then checks the PUT response and, when a
# digest is available, verifies the copy.
#
# Named options (any others cause a die):
#   sdevid   - source device id
#   ddevid   - destination device id
#   fid      - fid id or MogileFS::FID object
#   callback - optional coderef invoked while waiting/copying (heartbeat)
#   errref   - optional scalar ref; every failure sets it to "src_error" or
#              "dest_error" so the caller knows which side to blame
#   digest   - optional Digest object fed the file contents (left holding
#              the digest of exactly one copy of the file on success)
#
# Returns 1 on success.  On failure it logs via error() and returns that
# result (presumably false).
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    callback
                                    errref
                                    digest
                                    );
    die if %opts;

    $fid = MogileFS::FID->new($fid) unless ref($fid);
    my $fidid = $fid->id;
    my $expected_clen = $fid->length;
    my $clen;
    my $content_md5 = '';
    my ($sconn, $dconn);
    my $fid_checksum = $fid->checksum;
    if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
        # some HTTP servers may be able to verify Content-MD5 on PUT
        # and reject corrupted requests.  no HTTP server should reject
        # a request for an unrecognized header
        my $b64digest = encode_base64($fid_checksum->{checksum}, "");
        $content_md5 = "\r\nContent-MD5: $b64digest";
    }

    $intercopy_cb ||= sub {};

    # Common failure path: record the error class for the caller, close any
    # connections still held (never return them to the pool), and log.
    my $err_common = sub {
        my ($err, $msg) = @_;
        $$errref = $err if $errref;
        $sconn->close($err) if $sconn;
        $dconn->close($err) if $dconn;
        return error($msg);
    };

    # The source could not supply the file; $error_unreachable->("message")
    my $error_unreachable = sub {
        return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        return $err_common->("dest_error", $_[0]);
    };

    my $src_error = sub {
        return $err_common->("src_error", $_[0]);
    };

    # get some information we'll need
    my $sdev = Mgd::device_factory()->get_by_id($sdevid);
    my $ddev = Mgd::device_factory()->get_by_id($ddevid);

    # Like every other failure, this must set $$errref so the caller can
    # tell which side is at fault.
    unless ($sdev && $ddev) {
        my $msg = "Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid";
        return $sdev ? $dest_error->($msg) : $src_error->($msg);
    }

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host     } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    if (MogileFS::Config->config("repl_use_get_port")) {
        $sport = $shost->http_get_port;
    }
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    my $src_configured  = defined $spath && defined $shostip && $sport;
    my $dest_configured = defined $dpath && defined $dhostip && $dport;
    unless ($src_configured && $dest_configured) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
        my $detail = "       http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath";
        return $src_configured ? $dest_error->($detail) : $src_error->($detail);
    }

    my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" .
              "Content-length: $expected_clen$content_md5\r\n\r\n";

    # need by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # call a hook for odd casing completely different source data
    # for specific files.
    my $shttphost;
    MogileFS::run_global_hook('replicate_alternate_source',
                              $fid, \$shostip, \$sport, \$spath, \$shttphost);

    my $durl = "http://$dhostip:$dport$dpath";
    my $surl = "http://$shostip:$sport$spath";
    # okay, now get the file
    my %sopts = ( ip => $shostip, port => $sport );

    my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n";
    # plugin set a custom host.
    $get .= "Host: $shttphost\r\n" if $shttphost;

    my ($sock, $dsock);
    my ($wcount, $bytes_to_read, $written, $remain);
    my ($stries, $dtries) = (0, 0);
    my ($sres, $data, $bytes);

retry:
    $sconn->close("retrying") if $sconn;
    $dconn->close("retrying") if $dconn;
    $dconn = undef;
    # A retry can come after the first chunk was already fed to the digest;
    # start it over so it covers the file exactly once.
    $digest->reset if $digest && $stries;
    $stries++;
    $sconn = $shost->http_conn_get(\%sopts)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    $sock = $sconn->sock;
    unless ($sock->write("$get\r\n")) {
        goto retry if $sconn->retryable && $stries == 1;
        return $src_error->("Pipe closed retrieving $spath from $shostip:$sport");
    }

    # we just want a content length
    ($sres, $data) = read_headers($sock, $intercopy_cb);
    unless ($sres) {
        goto retry if $sconn->retryable && $stries == 1;
        return $error_unreachable->("Error: Resource $surl failed to return an HTTP response");
    }
    unless ($sres->{code} >= 200 && $sres->{code} <= 299) {
        return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}");
    }
    $clen = $sres->{len};

    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if $clen != $expected_clen;

    # open target for put
    $dtries++;
    $dconn = $dhost->http_conn_get
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock = $dconn->sock;

    unless ($dsock->write($put)) {
        goto retry if $dconn->retryable && $dtries == 1;
        return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport");
    }

    # now read data and print while we're reading.
    # $bytes starts as the number of body bytes that arrived with the
    # headers (already sitting in $data).
    $bytes = length($data);
    ($written, $remain) = (0, $clen);
    $bytes_to_read = 1024*1024;  # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    $wcount = 0;

    while ($bytes_to_read) {
        # Read when nothing is buffered.  An empty header leftover also
        # counts as nothing buffered: a zero-length write would otherwise
        # be counted as a successful first write and defeat the
        # dead-socket retry below.
        unless ($bytes) {
read_again:
            $bytes = sysread($sock, $data, $bytes_to_read);
            unless (defined $bytes) {
                if ($!{EAGAIN} || $!{EINTR}) {
                    wait_for_readability(fileno($sock), SOCK_TIMEOUT) and
                        goto read_again;
                }
                return $src_error->("error reading midway through source: $!");
            }
            if ($bytes == 0) {
                return $src_error->("EOF reading midway through source: $!");
            }
        }

        # now we've read in $bytes bytes
        $remain -= $bytes;
        $bytes_to_read = $remain if $remain < $bytes_to_read;
        $digest->add($data) if $digest;

        my $data_len = $bytes;
        $bytes = undef;
        my $data_off = 0;
        while (1) {
            my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
            unless (defined $wbytes) {
                # it can take two writes to determine if a socket is dead
                # (TCP_NODELAY and TCP_CORK are (and must be) zero here)
                goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
                return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
            }
            $wcount++;
            $written += $wbytes;
            $intercopy_cb->();
            last if ($data_len == $wbytes);

            $data_len -= $wbytes;
            $data_off += $wbytes;
        }

        die if $bytes_to_read < 0;
    }

    # source connection drained, return to pool
    if ($sres->{keep}) {
        $shost->http_conn_put($sconn);
    } else {
        $sconn->close("http_close");
    }
    $sconn = undef;   # pooled or closed: error paths must not close it again

    # callee will want this digest, too, so clone as "digest" is destructive.
    # Keep $digest itself as the object so a later retry can still reset it.
    my $got_digest = $digest ? $digest->clone->digest : undef;

    if ($fid_checksum) {
        if ($got_digest ne $fid_checksum->{checksum}) {
            my $expect = $fid_checksum->hexdigest;
            my $actual = unpack("H*", $got_digest);
            return $src_error->("checksum mismatch on GET: expected: $expect actual: $actual");
        }
    }

    # now read in the response line (should be first line)
    my ($dres, $ddata) = read_headers($dsock, $intercopy_cb);
    unless ($dres) {
        goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
        return $dest_error->("Error: HTTP response line not recognized writing to $durl");
    }

    # drain the response body if there is one
    # there may be no dres->{len}/Content-Length if there is no body
    my $dlen = ($dres->{len} || 0) - length($ddata);
    if ($dlen > 0) {
        my $r = $dsock->read($data, $dlen); # dres->{len} should be tiny
        if (defined $r) {
            if ($r != $dlen) {
                Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl");
                $dres->{keep} = 0;
            }
        } else {
            Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)");
            $dres->{keep} = 0;
        }
    } elsif ($dlen < 0) {
        Mgd::error("strange response Content-Length:$dres->{len} with ".
                    length($ddata) .
                    " extra bytes for PUT response on $durl ($!)");
        $dres->{keep} = 0;
    }

    # return the connection back to the connection pool
    if ($dres->{keep}) {
        $dhost->http_conn_put($dconn);
    } else {
        $dconn->close("http_close");
    }
    $dconn = undef;   # pooled or closed: error paths must not close it again

    if ($dres->{code} >= 200 && $dres->{code} <= 299) {
        if (defined $got_digest) {
            my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;

            # Only trust the destination to have verified the body if we
            # actually sent it a Content-MD5 header.
            if ($ddev->{reject_bad_md5} && ($alg eq "MD5") && $content_md5) {
                # dest device would've rejected us with a error,
                # no need to reread the file
                return 1;
            }
            my $httpfile = MogileFS::HTTPFile->at($durl);
            my $actual = $httpfile->digest($alg, $intercopy_cb);
            if ($actual ne $got_digest) {
                my $expect = unpack("H*", $got_digest);
                $actual = unpack("H*", $actual);
                return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $actual");
            }
        }
        return 1;
    }
    return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl");
}
844
# A Perl module must return a true value when it is loaded.
1;
846
847# Local Variables:
848# mode: perl
849# c-basic-indent: 4
850# indent-tabs-mode: nil
851# End:
852
853__END__
854
855=head1 NAME
856
857MogileFS::Worker::Replicate -- replicates files
858
859=head1 OVERVIEW
860
This worker process replicates files enqueued in the B<file_to_replicate> table.
862
The replication policy (which devices to replicate to) is pluggable,
but only one policy comes with the server.  See
L<MogileFS::ReplicationPolicy::MultipleHosts>.
866
867=head1 SEE ALSO
868
869L<MogileFS::Worker>
870
871L<MogileFS::ReplicationPolicy>
872
873L<MogileFS::ReplicationPolicy::MultipleHosts>
874
875