1package MogileFS::Worker::Query;
2# responds to queries from Mogile clients
3
4use strict;
5use warnings;
6
7use base 'MogileFS::Worker';
8use fields qw(querystarttime reqid callid);
9use MogileFS::Util qw(error error_code first weighted_list
10                      device_state eurl decode_url_args);
11use MogileFS::HTTPFile;
12use MogileFS::Rebalance;
13use MogileFS::Config;
14use MogileFS::Server;
15
sub new {
    my ($class, $psock) = @_;

    # allocate the restricted hash declared via "use fields", then let the
    # parent worker class set itself up with the socket to the parent
    my $self = fields::new($class);
    $self->SUPER::new($psock);

    # per-request state; process_line fills these in for every query
    $self->{$_} = undef for qw(querystarttime reqid callid);

    return $self;
}
26
# Watchdog limit, in seconds: no query should take 30 seconds.  The work
# loop checks in whenever 5 seconds pass without input from the parent.
sub watchdog_timeout {
    return 30;
}
29
# Called by plugins to register a command in this package's namespace.
#
# Arguments:
#   $cmd - bare command name; must consist solely of word characters
#   $sub - code reference implementing the command
#
# The handler is installed as "cmd_plugin_<cmd>"; process_line finds
# handlers by looking for "cmd_<name>" subs, so the command is reachable
# under the name "plugin_<cmd>".
#
# Returns 1 on success, 0 if the command name is not acceptable.
sub register_command {
    my ($cmd, $sub) = @_;

    # Anchor with \A and \z rather than ^ and $: "$" also matches just
    # before a trailing newline, which would accept "foo\n" and install a
    # sub whose name can never be dispatched.  (\w already covers digits.)
    return 0 unless defined $cmd && $cmd =~ /\A\w+\z/;

    # register in namespace with 'cmd_' which we will automatically find
    no strict 'refs';
    *{"cmd_plugin_$cmd"} = $sub;

    # all's well
    return 1;
}
46
sub work {
    my $self = shift;
    my $parent = $self->{psock};

    # select() bitmask holding only the socket to the parent
    my $watch_bits = '';
    vec($watch_bits, fileno($parent), 1) = 1;

    my $pending;    # bytes read so far that do not yet form a full line

    while (1) {
        # wait up to five seconds for input; when nothing arrives just
        # tell the watchdog we are alive and wait again
        my $ready_bits;
        if (!select($ready_bits = $watch_bits, undef, undef, 5.0)) {
            $self->still_alive;
            next;
        }

        my $chunk;
        my $nread = sysread($parent, $chunk, Mgd::UNIX_RCVBUF_SIZE());
        die "Error reading pipe from parent: $!\n"
            unless defined $nread;
        die "While reading pipe from parent, got EOF.  Parent's gone.  Quitting.\n"
            unless $nread;
        $pending .= $chunk;

        # peel complete lines (LF or CRLF terminated) off the front of
        # the buffer and handle them one at a time
        while ($pending =~ s/^(.+?)\r?\n//) {
            my $line = $1;
            if (!$self->process_generic_command(\$line)) {
                # a real client query: make sure the DB handle is usable
                $self->validate_dbh;
                $self->process_line(\$line);
            } else {
                $self->still_alive;  # no-op for watchdog
            }
        }
    }
}
83
sub process_line {
    my MogileFS::Worker::Query $self = shift;
    my $lineref = shift;

    # a request is "[reqid] client_ip rest-of-line", where the request id
    # (digits-digits) is optional
    unless ($$lineref =~ /^(\d+-\d+)?\s*(\S+)\s*(.*)/) {
        return $self->err_line('unknown_command');
    }
    my ($reqid, $client_ip, $line) = ($1, $2, $3);
    $self->{reqid} = $reqid || undef;

    # set global variables for zone determination
    local $MogileFS::REQ_client_ip = $client_ip;

    # Kept as an array ref: a plain string here would break the later use
    # of Time::HiRes::tv_interval.
    $self->{querystarttime} = [ Time::HiRes::gettimeofday() ];

    # the rest of the line is "command url-encoded-args"
    return $self->err_line('unknown_command')
        unless $line =~ /^(\w+)\s*(.*)/;
    my ($cmd, $orig_args) = (lc($1), $2);

    my $args = decode_url_args(\$orig_args);
    $self->{callid} = $args->{callid};

    # commands are plain subs named cmd_<command> in this package
    my $cmd_handler = do {
        no strict 'refs';
        *{"cmd_$cmd"}{CODE};
    };
    return $self->err_line('unknown_command') unless $cmd_handler;

    local $MogileFS::REQ_altzone = ($args->{zone} && $args->{zone} eq 'alt');
    eval { $cmd_handler->($self, $args) };
    return unless $@;

    # the handler died: "dup" errors get their own error line, anything
    # else is logged and reported as a generic failure
    if (error_code($@) eq "dup") {
        return $self->err_line("dup");
    }
    warn "Error: $@\n";
    error("Error running command '$cmd': $@");
    return $self->err_line("failure");
}
132
133# this is a half-finished command.  in particular, errors tend to
134# crash the parent or child or something.  it's a quick hack for a quick
135# ops task that needs done.  note in particular how it reaches across
136# package boundaries into an API that the Replicator probably doesn't
137# want exposed.
sub cmd_httpcopy {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;
    my ($sdevid, $ddevid, $fid) = @{$args}{qw(sdevid ddevid fid)};

    # the replicator's copy routine reports failure details through errref
    my $err;
    my $copied = MogileFS::Worker::Replicate::http_copy(
        sdevid => $sdevid,
        ddevid => $ddevid,
        fid    => $fid,
        errref => \$err,
    );
    return $self->err_line("copy_err", $err) unless $copied;

    # copy succeeded: record the new location of the fid
    my $dfid = MogileFS::DevFID->new($ddevid, $fid);
    $dfid->add_to_db
        or return $self->err_line("copy_err", "failed to add link to database");

    return $self->ok_line;
}
159
# Resolves $args->{domain} to its domain id and returns that dmid.
# When the domain is missing or unknown, an error line ("no_domain" or
# "unreg_domain") is sent and the result of err_line is returned instead;
# callers in this file test the return value for truth.
sub check_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    unless (defined $domain && length $domain) {
        return $self->err_line("no_domain");
    }

    # validate domain: an exception or a false id both count as unknown
    my $dmid = eval { Mgd::domain_factory()->get_by_name($domain)->id };
    return $dmid if $dmid;

    return $self->err_line("unreg_domain");
}
175
sub cmd_sleep {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # ten seconds unless a (non-zero) duration was supplied
    my $seconds = $args->{duration} || 10;
    sleep($seconds);

    return $self->ok_line;
}
182
sub cmd_test {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # lets a client deliberately trigger the handler-died path
    if ($args->{crash}) {
        die "Crashed on purpose";
    }

    return $self->ok_line;
}
189
sub cmd_clear_cache {
    my MogileFS::Worker::Query $self = shift;

    # forget the monitor state we hold, send ":refresh_monitor" to the
    # parent, then wait for the monitor before answering
    $self->forget_that_monitor_has_run;
    $self->send_to_parent(":refresh_monitor");
    $self->wait_for_monitor;

    # whatever else the caller passed (other commands in this file hand
    # in a hashref of reply values) goes straight through to ok_line
    return $self->ok_line(@_);
}
199
sub cmd_create_open {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # has to be filled out for some plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # let a hook manipulate the arguments first; an exception thrown by
    # the hook aborts the request
    eval { MogileFS::run_global_hook('cmd_create_open', $args) };
    return $self->err_line("plugin_aborted", "$@") if $@;

    # validate parameters
    my $dmid  = $args->{dmid};
    my $key   = $args->{key} || "";
    my $multi = $args->{multi_dest} ? 1 : 0;
    # size is optional at create time, but used to filter devices if given
    my $size  = $args->{size} || undef;

    # optional profiling of stages, if $args->{debug_profile}; without it
    # the recorder is a no-op and @profpoints stays empty
    my @profpoints;  # array of [point,hires-starttime]
    my $profstart = $args->{debug_profile}
        ? sub { push @profpoints, [ $_[0], Time::HiRes::time() ] }
        : sub {};
    $profstart->("begin");

    # undef when not given explicitly, otherwise forced to an integer
    my $exp_fidid;
    $exp_fidid = int($args->{fid}) if $args->{fid};

    # get DB handle
    my $sto = Mgd::get_store();

    # figure out what classid this file is for (0 when no class is named)
    my $class   = $args->{class} || "";
    my $classid = 0;
    if (length($class)) {
        $classid = eval { Mgd::class_factory()->get_by_name($dmid, $class)->id };
        return $self->err_line("unreg_class") unless $classid;
    }

    # if we haven't heard from the monitoring job yet, we need to chill a
    # bit to prevent a race where we tell a user that we can't create a
    # file when in fact we've just not heard from the monitor
    $profstart->("wait_monitor");
    $self->wait_for_monitor;

    $profstart->("find_deviceid");

    my @devices = Mgd::device_factory()->get_all;
    if ($size) {
        # prefer devices whose known free space can hold the file ...
        my @roomy = grep {
            length($_->mb_free) && ($_->mb_free * 1024*1024) > $size
        } @devices;

        # ... and fall back to devices with unknown free space if there
        # are none.  This may happen if mogstored isn't running.
        @roomy = grep { !length($_->mb_free) } Mgd::device_factory()->get_all
            unless @roomy;

        @devices = @roomy;
    }

    # a hook may impose its own ordering; otherwise order by free space
    my $hook_ordered = MogileFS::run_global_hook('cmd_create_open_order_devices', [ @devices ], \@devices);
    @devices = sort_devs_by_freespace(@devices) unless $hook_ordered;

    # find suitable device(s) to put this file on: up to three for a
    # multi_dest request, one otherwise, each on a different host
    my $wanted = $multi ? 3 : 1;
    my @dests; # MogileFS::Device objects which are suitable
    while (@dests < $wanted) {
        my $ddev = shift @devices;

        last unless $ddev;
        next unless $ddev->not_on_hosts(map { $_->host } @dests);

        push @dests, $ddev;
    }
    return $self->err_line("no_devices") unless @dests;

    my $fidid = eval {
        $sto->register_tempfile(
            fid     => $exp_fidid, # may be undef/NULL to mean auto-increment
            dmid    => $dmid,
            key     => $key,
            classid => $classid,
            devids  => join(',', map { $_->id } @dests),
        );
    };
    if (!$fidid) {
        return $self->err_line("fid_in_use") if error_code($@) eq "dup";
        warn "Error registering tempfile: $@\n";
        return $self->err_line("db");
    }

    # make sure directories exist for client to be able to PUT into;
    # %dir_done maps devid => seconds taken, filled in by the callbacks
    my %dir_done;
    $profstart->("vivify_dir_on_all_devs");

    my $t0 = Time::HiRes::time();
    for my $dest (@dests) {
        my $dfid = MogileFS::DevFID->new($dest, $fidid);
        $dfid->vivify_directories(sub {
            $dir_done{$dfid->devid} = Time::HiRes::time() - $t0;
        });
    }

    # don't start the event loop if results are all cached; otherwise
    # keep looping for as long as some device has not reported back
    my $vivify_pending = sub { scalar(keys %dir_done) != scalar(@dests) };
    if ($vivify_pending->()) {
        Danga::Socket->SetPostLoopCallback($vivify_pending);
        Danga::Socket->EventLoop;
    }
    $profstart->("end");

    # common reply variables
    my $res = { fid => $fidid };

    # add profiling data: each stage lasts until the next recorded point,
    # followed by one entry per device for the directory creation
    if (@profpoints) {
        $res->{profpoints} = 0;
        for my $i (0 .. $#profpoints - 1) {
            my $ptnum = ++$res->{profpoints};
            my ($stage, $began) = @{ $profpoints[$i] };
            $res->{"prof_${ptnum}_name"} = $stage;
            $res->{"prof_${ptnum}_time"} =
                sprintf("%0.03f", $profpoints[$i+1]->[1] - $began);
        }
        for my $devid (keys %dir_done) {
            my $ptnum = ++$res->{profpoints};
            $res->{"prof_${ptnum}_name"} = "vivify_dir_on_dev$devid";
            $res->{"prof_${ptnum}_time"} = sprintf("%0.03f", $dir_done{$devid});
        }
    }

    # add path info: numbered devid_N/path_N pairs for multi_dest,
    # a single devid/path pair otherwise
    if ($multi) {
        for my $n (1 .. scalar @dests) {
            my $dest = $dests[$n - 1];
            $res->{"devid_$n"} = $dest->id;
            $res->{"path_$n"}  = MogileFS::DevFID->new($dest, $fidid)->url;
        }
        $res->{dev_count} = scalar @dests;
    } else {
        my $dest = $dests[0];
        $res->{devid} = $dest->id;
        $res->{path}  = MogileFS::DevFID->new($dest, $fidid)->url;
    }

    return $self->ok_line($res);
}
353
# Takes a list of device objects, keeps those that should get new files,
# and passes the 20 with the highest percent_free (as [device, weight]
# pairs, weight = 100 * percent_free) to MogileFS::Util::weighted_list.
# Returns whatever list weighted_list produces.
sub sort_devs_by_freespace {
    my @writable = grep { $_->should_get_new_files } @_;

    # emptiest devices first
    my @emptiest_first =
        sort { $b->percent_free <=> $a->percent_free } @writable;

    my @devices_with_weights =
        map { [ $_, 100 * $_->percent_free ] } @emptiest_first;

    # only the leading 20 candidates take part in the weighted pick
    my @top = splice(@devices_with_weights, 0, 20);
    my @list = MogileFS::Util::weighted_list(@top);

    return @list;
}
368
# A key is usable when it is defined and not the empty string.
# Returns a false value for undef, otherwise the key's length.
sub valid_key {
    my $candidate = shift;

    return !1 unless defined $candidate;
    return length($candidate);
}
374
# Completes an upload started with create_open: checks what the client
# says it stored, turns the tempfile row into a real file row and queues
# the file for replication.
#
# Reads from $args: domain, key, fid, devid, path and, optionally, size,
# checksum and checksumverify.
#
# Closing without a usable key discards the upload and still answers OK.
# Once the tempfile row has been claimed, the size/checksum/device checks
# run the $failed cleanup before sending their error line.
sub cmd_create_close {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # has to be filled out for some plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # call out to a hook that might modify the arguments for us
    MogileFS::run_global_hook('cmd_create_close', $args);

    # late validation of parameters
    my $dmid  = $args->{dmid};
    my $key   = $args->{key};
    my $fidid = $args->{fid}    or return $self->err_line("no_fid");
    my $devid = $args->{devid}  or return $self->err_line("no_devid");
    my $path  = $args->{path}   or return $self->err_line("no_path");
    my $checksum = $args->{checksum};

    # turn a supplied checksum string into a checksum object up front so
    # that malformed input is rejected before anything is touched
    if ($checksum) {
        $checksum = eval { MogileFS::Checksum->from_string($fidid, $checksum) };
        return $self->err_line("invalid_checksum_format") if $@;
    }

    my $fid  = MogileFS::FID->new($fidid);
    my $dfid = MogileFS::DevFID->new($devid, $fid);

    # is the provided path what we'd expect for this fid/devid?
    return $self->err_line("bogus_args")
        unless $path eq $dfid->url;

    my $sto = Mgd::get_store();

    # find the temp file we're closing and making real.  If another worker
    # already has it, bail out---the client closed it twice.
    # this is racy, but the only expected use case is a client retrying.
    # should still be fixed better once more scalable locking is available.
    my $trow = $sto->delete_and_return_tempfile_row($fidid) or
        return $self->err_line("no_temp_file");

    # Protect against leaving orphaned uploads: record where the upload
    # landed, then delete the fid.
    my $failed = sub {
        $dfid->add_to_db;
        $fid->delete;
    };

    # The upload must have gone to one of the devices handed out at
    # create_open time.  $devid comes straight from the client, so it is
    # quoted before being used in a pattern; unquoted, a value such as
    # "." or "1|2" could match device ids it is not equal to.
    unless ($trow->{devids} =~ m/\b\Q$devid\E\b/) {
        $failed->();
        return $self->err_line("invalid_destdev", "File uploaded to invalid dest $devid. Valid devices were: " . $trow->{devids});
    }

    # if a temp file is closed without a provided-key, that means to
    # delete it.
    unless (valid_key($key)) {
        $failed->();
        return $self->ok_line;
    }

    # get size of file and verify that it matches what we were given, if anything
    my $httpfile = MogileFS::HTTPFile->at($path);
    my $size = $httpfile->size;

    # size check is optional? Needs to support zero byte files.
    # (a missing or zero size argument becomes -1, i.e. "don't compare")
    $args->{size} = -1 unless $args->{size};
    if (!defined($size) || $size == MogileFS::HTTPFile::FILE_MISSING) {
        # storage node is unreachable or the file is missing
        my $type    = defined $size ? "missing" : "cantreach";
        my $lasterr = MogileFS::Util::last_error();
        $failed->();
        return $self->err_line("size_verify_error", "Expected: $args->{size}; actual: 0 ($type); path: $path; error: $lasterr")
    }

    if ($args->{size} > -1 && ($args->{size} != $size)) {
        $failed->();
        return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path")
    }

    # checksum validation is optional as it can be very expensive
    # However, we /always/ verify it if the client wants us to, even
    # if the class does not enforce or store it.
    if ($checksum && $args->{checksumverify}) {
        my $alg = $checksum->hashname;
        # the callback keeps the watchdog fed while the digest runs
        my $actual = $httpfile->digest($alg, sub { $self->still_alive });
        if ($actual ne $checksum->{checksum}) {
            $failed->();
            $actual = "$alg:" . unpack("H*", $actual);
            return $self->err_line("checksum_mismatch",
                           "Expected: $checksum; actual: $actual; path: $path");
        }
    }

    # see if we have a fid for this key already
    my $old_fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    if ($old_fid) {
        # Fail if a file already exists for this fid.  Should never
        # happen, as it should not be possible to close a file twice.
        return $self->err_line("fid_exists")
            unless $old_fid->{fidid} != $fidid;

        # the key is being overwritten: drop the previous file
        $old_fid->delete;
    }

    # TODO: check for EIO?

    # insert file_on row
    $dfid->add_to_db;

    $checksum->maybe_save($dmid, $trow->{classid}) if $checksum;

    $sto->replace_into_file(
                            fidid   => $fidid,
                            dmid    => $dmid,
                            key     => $key,
                            length  => $size,
                            classid => $trow->{classid},
                            devcount => 1,
                            );

    # mark it as needing replicating:
    $fid->enqueue_for_replication();

    # call the hook - if this fails, we need to back the file out
    my $rv = MogileFS::run_global_hook('file_stored', $args);
    if (defined $rv && ! $rv) { # undef = no hooks, 1 = success, 0 = failure
        $fid->delete;
        return $self->err_line("plugin_aborted");
    }

    # all went well, we would've hit condthrow on DB errors
    return $self->ok_line;
}
505
sub cmd_updateclass {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    $args->{dmid} = $self->check_domain($args) or return;

    # a hook may rewrite the arguments; a defined-but-false result from
    # it aborts the request
    my $hook_rv = MogileFS::run_global_hook('cmd_updateclass', $args);
    if (defined $hook_rv && !$hook_rv) {
        return $self->err_line('plugin_aborted');
    }

    my ($dmid, $key) = @{$args}{qw(dmid key)};
    return $self->err_line("no_key") unless valid_key($key);

    my $class = $args->{class};
    return $self->err_line("no_class") unless $class;

    my $classobj = Mgd::class_factory()->get_by_name($dmid, $class);
    return $self->err_line('class_not_found') unless $classobj;
    my $classid = $classobj->id;

    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    return $self->err_line('invalid_key') unless $fid;

    my @devids = $fid->devids;
    return $self->err_line("no_devices") unless @devids;

    # only touch the file (and requeue it for replication) when the
    # class actually changes
    if ($fid->classid != $classid) {
        $fid->update_class(classid => $classid);
        $fid->enqueue_for_replication();
    }

    return $self->ok_line;
}
538
sub cmd_delete {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate domain for plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # now invoke the plugin; a defined-but-false result vetoes the delete
    my $hook_rv = MogileFS::run_global_hook('cmd_delete', $args);
    if (defined $hook_rv && !$hook_rv) {
        return $self->err_line('plugin_aborted');
    }

    # validate parameters
    my ($dmid, $key) = @{$args}{qw(dmid key)};
    return $self->err_line("no_key") unless valid_key($key);

    # is this fid still owned by this key?
    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    return $self->err_line("unknown_key") unless $fid;

    $fid->delete;

    return $self->ok_line;
}
565
566# Takes either domain/dkey or fid and tries to return as much as possible.
sub cmd_file_debug {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;
    # Talk to the master since this is "debug mode"
    my $sto = Mgd::get_store();
    my $ret = {};

    # $fid is the file row (a hashref), $fidid the numeric id
    my ($fid, $fidid);
    if ($args->{fid}) {
        # If a FID is provided, just use that.
        $fidid = $args->{fid}+0;
        # It's not fatal if we don't find the row here.
        $fid = $sto->file_row_from_fidid($fidid);
    } else {
        # If not, require dmid/dkey and pick up the fid from there.
        $args->{dmid} = $self->check_domain($args) or return;
        valid_key($args->{key}) or return $self->err_line("no_key");

        # now invoke the plugin, abort if it tells us to
        my $hook_rv = MogileFS::run_global_hook('cmd_file_debug', $args);
        if (defined $hook_rv && !$hook_rv) {
            return $self->err_line('plugin_aborted');
        }

        $fid = $sto->file_row_from_dmid_key($args->{dmid}, $args->{key})
            or return $self->err_line("unknown_key");
        $fidid = $fid->{fid};
    }

    # decorate the file row with human readable domain and class names
    if ($fid) {
        my $dmid = $fid->{dmid};
        $fid->{domain} = Mgd::domain_factory()->get_by_id($dmid)->name;
        $fid->{class}  = Mgd::class_factory()->get_by_id($dmid,
            $fid->{classid})->name;
    }

    # Fetch all of the queue data.
    my $tfile = $sto->tempfile_row_from_fid($fidid);
    my $repl  = $sto->find_fid_from_file_to_replicate($fidid);
    my $del   = $sto->find_fid_from_file_to_delete2($fidid);
    my $reb   = $sto->find_fid_from_file_to_queue($fidid, REBAL_QUEUE);
    my $fsck  = $sto->find_fid_from_file_to_queue($fidid, FSCK_QUEUE);

    # Fetch file_on rows, and turn into paths.
    my @devids = $sto->fid_devids($fidid);
    for my $devid (@devids) {
        # Won't matter if we can't make the path (dev is dead/deleted/etc)
        eval {
            my $url = MogileFS::DevFID->new($devid, $fidid)->get_url;
            $ret->{"devpath_$devid"} = $url;
        };
    }
    $ret->{devids} = join(',', @devids) if @devids;

    # Always look for a checksum
    my $checksum = Mgd::get_store()->get_checksum($fidid);
    if (!$checksum) {
        $ret->{checksum} = 'NONE';
    } else {
        $checksum = MogileFS::Checksum->new($checksum);
        $ret->{checksum} = $checksum->info;
    }

    # Return file row (if found) and all other data, flattened into
    # "<section>_<column>" keys.
    my %toret = (
        fid       => $fid,
        tempfile  => $tfile,
        replqueue => $repl,
        delqueue  => $del,
        rebqueue  => $reb,
        fsckqueue => $fsck,
    );
    for my $section (keys %toret) {
        my $hash = $toret{$section};
        while (my ($name, $val) = each %$hash) {
            $ret->{"${section}_${name}"} = $val;
        }
    }

    # NOTE(review): $ret always holds a "checksum" entry by this point, so
    # this guard looks like it can never fire -- confirm the intent.
    return $self->err_line("unknown_fid") unless keys %$ret;
    return $self->ok_line($ret);
}
642
sub cmd_file_info {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate domain for plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # now invoke the plugin, abort if it tells us to
    my $hook_rv = MogileFS::run_global_hook('cmd_file_info', $args);
    if (defined $hook_rv && !$hook_rv) {
        return $self->err_line('plugin_aborted');
    }

    # validate parameters
    my ($dmid, $key) = @{$args}{qw(dmid key)};
    return $self->err_line("no_key") unless valid_key($key);

    # the lookup runs inside the store's slaves_ok wrapper
    my $fid;
    Mgd::get_store()->slaves_ok(sub {
        $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    });
    return $self->err_line("unknown_key") unless $fid;

    my $ret = {};
    $ret->{fid}    = $fid->id;
    $ret->{domain} = Mgd::domain_factory()->get_by_id($fid->dmid)->name;

    my $class = Mgd::class_factory()->get_by_id($fid->dmid, $fid->classid);
    $ret->{class} = $class->name;

    # a checksum is only reported for classes that have a hashtype set
    if ($class->{hashtype}) {
        my $checksum = Mgd::get_store()->get_checksum($fid->id);
        if (!$checksum) {
            $ret->{checksum} = "MISSING";
        } else {
            $checksum = MogileFS::Checksum->new($checksum);
            $ret->{checksum} = $checksum->info;
        }
    }

    $ret->{key}      = $key;
    $ret->{'length'} = $fid->length;
    $ret->{devcount} = $fid->devcount;

    # Only if requested, also return the raw devids.
    # Caller should use get_paths if they intend to fetch the file.
    $ret->{devids} = join(',', $fid->devids) if $args->{devices};

    return $self->ok_line($ret);
}
692
sub cmd_list_fids {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate parameters: "from" is the starting fid; "to" is used as
    # the number of rows to fetch (default 100, clamped to 500)
    my $fromfid = ($args->{from} || 0)+0;
    my $count   = (($args->{to} || 0)+0) || 100;
    if ($count > 500 || $count < 0) {
        $count = 500;
    }

    my $rows = Mgd::get_store()->file_row_from_fidid_range($fromfid, $count);
    return $self->err_line('failure') unless $rows;
    return $self->ok_line({ fid_count => 0 }) unless @$rows;

    # per-request caches so every domain/class name is looked up once
    my (%domains, %classes);

    # now iterate over our data rows and construct result
    my $ret = {};
    my $ct  = 0;
    for my $row (@$rows) {
        my $pfx = 'fid_' . ++$ct;
        my ($dmid, $classid) = @{$row}{qw(dmid classid)};

        $ret->{"${pfx}_fid"} = $row->{fid};

        $domains{$dmid} ||= Mgd::domain_factory()->get_by_id($dmid)->name;
        $ret->{"${pfx}_domain"} = $domains{$dmid};

        $classes{$dmid}{$classid} ||=
            Mgd::class_factory()->get_by_id($dmid, $classid)->name;
        $ret->{"${pfx}_class"} = $classes{$dmid}{$classid};

        $ret->{"${pfx}_key"}      = $row->{dkey};
        $ret->{"${pfx}_length"}   = $row->{length};
        $ret->{"${pfx}_devcount"} = $row->{devcount};
    }
    $ret->{fid_count} = $ct;

    return $self->ok_line($ret);
}
728
# Lists keys of a domain.
#
# Reads from $args:
#   domain - required
#   prefix - optional; passed to the store's get_keys_like
#   after  - optional; must itself start with prefix when both are given
#   limit  - optional; defaults to 1000 and is capped at 1000
#
# Answers with key_count, key_1..key_N and next_after (the greatest key
# returned, by string comparison), or with the error lines
# "after_mismatch" / "none_match".
sub cmd_list_keys {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return;
    my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit});

    if (defined $prefix and $prefix ne '') {
        # now validate that after matches prefix.  The prefix is client
        # input and is meant literally, so it is quoted: unquoted, a
        # prefix containing regex metacharacters would match the wrong
        # thing or fail to compile as a pattern.
        return $self->err_line('after_mismatch')
            if $after && $after !~ /^\Q$prefix\E/;
    }

    $limit ||= 1000;
    $limit += 0;
    $limit = 1000 if $limit > 1000;

    my $keys = Mgd::get_store()->get_keys_like($dmid, $prefix, $after, $limit);

    # if we got nothing, say so
    return $self->err_line('none_match') unless $keys && @$keys;

    # construct the output and send
    my $ret = { key_count => 0, next_after => '' };
    foreach my $key (@$keys) {
        $ret->{key_count}++;
        # remember the greatest key seen
        $ret->{next_after} = $key
            if $key gt $ret->{next_after};
        $ret->{"key_$ret->{key_count}"} = $key;
    }
    return $self->ok_line($ret);
}
762
# Renames a key within a domain.
#
# Reads from $args: domain, from_key, to_key.
#
# Answers OK on success, otherwise one of the error lines "no_key"
# (either key missing/empty), "unknown_key" (from_key does not exist) or
# "key_exists" (the rename itself reported failure).
sub cmd_rename {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return;
    my ($fkey, $tkey) = ($args->{from_key}, $args->{to_key});
    unless (valid_key($fkey) && valid_key($tkey)) {
        return $self->err_line("no_key");
    }

    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $fkey)
        or return $self->err_line("unknown_key");

    # Stop here when the rename fails.  Without the return, the error
    # line would be followed by an OK line for the same request.
    $fid->rename($tkey)
        or return $self->err_line("key_exists");

    return $self->ok_line;
}
782
sub cmd_get_hosts {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # optional filter: report only the host with this id
    my $only = $args->{hostid};

    my $ret = { hosts => 0 };
    for my $host (Mgd::host_factory()->get_all) {
        next if defined $only && $host->id != $only;

        my $n = ++$ret->{hosts};
        my $fields = $host->fields(qw(hostid status hostname hostip http_port
            http_get_port altip altmask));

        # must be regular data so copy it in, as host<N>_<field>
        for my $name (keys %$fields) {
            $ret->{"host${n}_$name"} = $fields->{$name};
        }
    }

    return $self->ok_line($ret);
}
801
sub cmd_get_devices {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # optional filter: report only the device with this id
    my $only = $args->{devid};

    my $ret = { devices => 0 };
    for my $dev (Mgd::device_factory()->get_all) {
        next if defined $only && $dev->id != $only;

        my $n = ++$ret->{devices};

        # copy every field of the device in as dev<N>_<field>
        my $sum = $dev->fields;
        for my $name (keys %$sum) {
            $ret->{"dev${n}_$name"} = $sum->{$name};
        }
    }

    return $self->ok_line($ret);
}
819
sub cmd_create_device {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # new devices start out "alive" unless a state is given
    my $status = $args->{state} || "alive";
    device_state($status)
        or return $self->err_line("invalid_state");

    my $devid = $args->{devid};
    unless ($devid && $devid =~ /^\d+$/) {
        return $self->err_line("invalid_devid");
    }

    # the owning host may be named by numeric id or by hostname; a
    # numeric hostid wins when both are supplied
    my $sto = Mgd::get_store();
    my $hostid;
    if ($args->{hostid} && $args->{hostid} =~ /^\d+$/) {
        $hostid = $sto->get_hostid_by_id($args->{hostid})
            or return $self->err_line("unknown_hostid");
    } elsif (my $hname = $args->{hostname}) {
        $hostid = $sto->get_hostid_by_name($hname)
            or return $self->err_line("unknown_host");
    } else {
        return $self->err_line("bad_args", "No hostid/hostname parameter");
    }

    my $created = eval { $sto->create_device($devid, $hostid, $status) };
    return $self->cmd_clear_cache if $created;

    # any recognised error code is reported as a duplicate devid;
    # everything else is rethrown
    return $self->err_line("existing_devid") if error_code($@);
    die $@;  # rethrow;
}
852
sub cmd_create_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless $domain;

    # only $@ is inspected afterwards; the return value is not used
    my $dom = eval { Mgd::get_store()->create_domain($domain); };
    if ($@) {
        return $self->err_line('domain_exists')
            if error_code($@) eq "dup";
        return $self->err_line('failure', "$@");
    }

    # clear the cache and report the new domain back
    return $self->cmd_clear_cache({ domain => $domain });
}
870
sub cmd_delete_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless $domain;

    my $sto  = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain);
    return $self->err_line('domain_not_found') unless $dmid;

    my $deleted = eval { $sto->delete_domain($dmid) };
    return $self->cmd_clear_cache({ domain => $domain }) if $deleted;

    # translate the store's error code into a client-facing error line
    my $err = error_code($@);
    if ($err eq "has_files") {
        return $self->err_line('domain_has_files');
    }
    if ($err eq "has_classes") {
        return $self->err_line('domain_has_classes');
    }
    return $self->err_line("failure");
}
891
sub cmd_create_class {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    length $domain or return $self->err_line('no_domain');

    my $class = $args->{class};
    length $class or return $self->err_line('no_class');

    my $mindevcount = $args->{mindevcount}+0;
    $mindevcount > 0 or return $self->err_line('invalid_mindevcount');

    # a replication policy, when given, must at least parse
    my $replpolicy = $args->{replpolicy} || '';
    if ($replpolicy) {
        eval {
            MogileFS::ReplicationPolicy->new_from_policy_string($replpolicy);
        };
        return $self->err_line('invalid_replpolicy', $@) if $@;
    }

    # map a named hash type through NAME2TYPE; 'NONE' is kept as is
    my $hashtype = $args->{hashtype};
    if ($hashtype && $hashtype ne 'NONE') {
        $hashtype = $MogileFS::Checksum::NAME2TYPE{$hashtype}
            or return $self->err_line('invalid_hashtype');
    }

    my $sto  = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain);
    return $self->err_line('domain_not_found') unless $dmid;

    my $clsid = $sto->get_classid_by_name($dmid, $class);

    # updating a 'default' class that has no row yet becomes a create
    $args->{update} = 0
        if $args->{update} && $class eq 'default' && !defined $clsid;

    if ($args->{update}) {
        return $self->err_line('class_not_found') unless defined $clsid;
        $sto->update_class_name(dmid => $dmid, classid => $clsid,
            classname => $class);
    } else {
        $clsid = eval { $sto->create_class($dmid, $class); };
        if ($@) {
            return $self->err_line('class_exists')
                if error_code($@) eq "dup";
            return $self->err_line('failure', "$@");
        }
    }

    $sto->update_class_mindevcount(dmid => $dmid, classid => $clsid,
        mindevcount => $mindevcount);

    # don't erase an existing replpolicy if we're not setting a new one.
    if ($replpolicy) {
        $sto->update_class_replpolicy(dmid => $dmid, classid => $clsid,
            replpolicy => $replpolicy);
    }

    # 'NONE' is passed to the store as undef; no hashtype argument at all
    # leaves the stored value alone
    if ($hashtype) {
        my $stored = $hashtype eq 'NONE' ? undef : $hashtype;
        $sto->update_class_hashtype(dmid => $dmid, classid => $clsid,
            hashtype => $stored);
    }

    # return success
    return $self->cmd_clear_cache({ class => $class, mindevcount => $mindevcount, domain => $domain });
}
954
# Update an existing class: a thin wrapper that forwards a copy of the
# arguments to cmd_create_class with the 'update' flag forced on.
sub cmd_update_class {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my %forwarded = (%$args, update => 1);
    return $self->cmd_create_class(\%forwarded);
}
962
# Delete a class from a domain.
#
# Args (hashref):
#   domain - domain name (required)
#   class  - class name (required; 'default' can never be deleted)
#
# Replies no_domain / no_class when an argument is missing,
# nodel_default_class for the default class, domain_not_found /
# class_not_found when the lookup fails, class_has_files when the store
# fails with the "has_files" error code, and a generic failure otherwise.
# On success the reply is delegated to cmd_clear_cache.
sub cmd_delete_class {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;
    my $class = $args->{class};
    # this guard used to re-test $domain, so a missing class was never caught
    return $self->err_line('no_class') unless length $class;

    return $self->err_line('nodel_default_class') if $class eq 'default';

    my $sto = Mgd::get_store();
    my $dmid  = $sto->get_domainid_by_name($domain) or
        return $self->err_line('domain_not_found');
    my $clsid = $sto->get_classid_by_name($dmid, $class);
    return $self->err_line('class_not_found') unless defined $clsid;

    # reuse the store handle fetched above rather than asking for it again
    if (eval { $sto->delete_class($dmid, $clsid) }) {
        return $self->cmd_clear_cache({ domain => $domain, class => $class });
    }

    my $errc = error_code($@);
    return $self->err_line('class_has_files') if $errc eq "has_files";
    return $self->err_line('failure');
}
988
# Create a host, or (when $args->{update} is true) update an existing one.
#
# Args (hashref): host (required), ip and port (required when creating),
# plus optional status, getport, altip and altmask.  A new host whose
# status was not supplied is created as 'down'.
#
# NOTE: $args is modified in place (status default, ip/port/getport renamed).
# Failures are reported through err_line, success through cmd_clear_cache.
sub cmd_create_host {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $hostname = $args->{host};
    return $self->err_line('no_host') unless $hostname;

    my $store  = Mgd::get_store();
    my $hostid = $store->get_hostid_by_name($hostname);

    if ($args->{update}) {
        # updates need an existing host
        return $self->err_line('host_not_found') unless $hostid;
    } else {
        # creation needs a fresh name plus ip/port; status defaults to down
        return $self->err_line('host_exists') if $hostid;
        return $self->err_line('no_ip')       unless $args->{ip};
        return $self->err_line('no_port')     unless $args->{port};
        $args->{status} ||= 'down';
    }

    my $status = $args->{status};
    if ($status && !MogileFS::Host->valid_state($status)) {
        return $self->err_line('unknown_state');
    }

    # everything validated; create the host row if there isn't one yet
    $hostid ||= $store->create_host($hostname, $args->{ip});

    # the wire protocol and the store use different names for these fields
    my %store_name_for = (
        ip      => 'hostip',
        port    => 'http_port',
        getport => 'http_get_port',
    );
    foreach my $wire_name (qw(ip port getport)) {
        next unless exists $args->{$wire_name};
        $args->{ $store_name_for{$wire_name} } = delete $args->{$wire_name};
    }

    # only fields the client actually sent are written
    my %changes;
    foreach my $field (qw(status hostip http_port http_get_port altip altmask)) {
        $changes{$field} = $args->{$field} if exists $args->{$field};
    }
    $store->update_host($hostid, \%changes);

    return $self->cmd_clear_cache({ hostid => $hostid, hostname => $hostname });
}
1030
# Update an existing host: forwards a copy of the arguments to
# cmd_create_host with the 'update' flag forced on.
sub cmd_update_host {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my %forwarded = (%$args, update => 1);
    return $self->cmd_create_host(\%forwarded);
}
1038
# Delete a host by name.
#
# Args (hashref): host => name of the host.
# Replies unknown_host if the name does not resolve to a host id and
# host_not_empty if any device still references that host id.  On success
# the reply is delegated to cmd_clear_cache.
sub cmd_delete_host {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $store  = Mgd::get_store();
    my $hostid = $store->get_hostid_by_name($args->{host});
    return $self->err_line('unknown_host') unless $hostid;

    # TODO: $sto->delete_host should have a "has_devices" test internally
    foreach my $device ($store->get_all_devices) {
        next unless $device->{hostid} == $hostid;
        return $self->err_line('host_not_empty');
    }

    $store->delete_host($hostid);

    return $self->cmd_clear_cache;
}
1057
# List every domain together with its classes.
#
# The reply is a flat set of numbered keys:
#   domains                      - number of domains
#   domain<N>                    - name of the Nth domain
#   domain<N>classes             - number of classes in that domain
#   domain<N>class<M>name        - name of its Mth class
#   domain<N>class<M>mindevcount - that class's mindevcount
#   domain<N>class<M>replpolicy  - that class's replication policy string
#   domain<N>class<M>hashtype    - that class's hash type string
sub cmd_get_domains {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my %reply;
    my $domain_count = 0;
    foreach my $domain (Mgd::domain_factory()->get_all) {
        my $dprefix = "domain" . ++$domain_count;
        $reply{$dprefix} = $domain->name;

        my $class_count = 0;
        foreach my $class ($domain->classes) {
            my $cprefix = $dprefix . "class" . ++$class_count;
            $reply{"${cprefix}name"}        = $class->name;
            $reply{"${cprefix}mindevcount"} = $class->mindevcount;
            $reply{"${cprefix}replpolicy"}  = $class->repl_policy_string;
            $reply{"${cprefix}hashtype"}    = $class->hashtype_string;
        }
        $reply{"${dprefix}classes"} = $class_count;
    }
    $reply{"domains"} = $domain_count;

    return $self->ok_line(\%reply);
}
1082
# Return URLs from which the file stored under a domain/key can be read.
#
# Args (hashref):
#   domain    - validated by check_domain, which supplies $args->{dmid}
#   key       - the file's key (must pass valid_key)
#   noverify  - if true, memcache may be consulted and the size check on the
#               first returned path is skipped
#   pathcount - how many paths the client wants; never less than 2
#
# Reply keys: paths (count) and path1..pathN.  Paths on devices flagged for
# drain are only appended after all other usable paths, and a single path on
# a device that should not currently be read from is returned only when
# nothing else is available.
sub cmd_get_paths {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # memcache mappings are as follows:
    #  mogfid:<dmid>:<dkey> -> fidid
    #  mogdevids:<fidid>    -> \@devids  (and TODO: invalidate when deletion is run!)

    # if you specify 'noverify', that means a correct answer isn't needed and memcache can
    # be used.
    my $memc          = MogileFS::Config->memcache_client;
    my $get_from_memc = $memc && $args->{noverify};
    my $memcache_ttl  = MogileFS::Config->server_setting_cached("memcache_ttl") || 3600;

    # validate domain for plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # now invoke the plugin, abort if it tells us to
    # (only a defined-but-false return value aborts; undef lets us continue)
    my $rv = MogileFS::run_global_hook('cmd_get_paths', $args);
    return $self->err_line('plugin_aborted')
        if defined $rv && ! $rv;

    # validate parameters
    my $dmid = $args->{dmid};
    my $key = $args->{key};

    valid_key($key) or return $self->err_line("no_key");

    # We default to returning two possible paths.
    # but the client may ask for more if they want.
    my $pathcount = $args->{pathcount} || 2;
    $pathcount = 2 if $pathcount < 2;

    # resolve the key to a FID: memcache first (when permitted), then the store
    my $fid;
    my $need_fid_in_memcache = 0;
    my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
    if ($get_from_memc) {
        if (my $fidid = $memc->get($mogfid_memkey)) {
            $fid = MogileFS::FID->new($fidid);
        } else {
            $need_fid_in_memcache = 1;
        }
    }
    unless ($fid) {
        Mgd::get_store()->slaves_ok(sub {
            $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
        });
        $fid or return $self->err_line("unknown_key");
    }

    # add to memcache, if needed, for $memcache_ttl seconds.  This happens on a
    # cache miss, and also whenever memcache exists but was not consulted.
    $memc->set($mogfid_memkey, $fid->id, $memcache_ttl ) if $need_fid_in_memcache || ($memc && !$get_from_memc);

    my $dmap = Mgd::device_factory()->map_by_id;

    my $ret = {
        paths => 0,
    };

    # find devids that FID is on in memcache or db.
    my @fid_devids;
    my $need_devids_in_memcache = 0;
    my $devid_memkey = "mogdevids:" . $fid->id;
    if ($get_from_memc) {
        if (my $list = $memc->get($devid_memkey)) {
            @fid_devids = @$list;
        } else {
            $need_devids_in_memcache = 1;
        }
    }
    # an empty cached list is treated like a miss and re-read from the store
    unless (@fid_devids) {
        Mgd::get_store()->slaves_ok(sub {
            @fid_devids = $fid->devids;
        });
        $memc->set($devid_memkey, \@fid_devids, $memcache_ttl ) if $need_devids_in_memcache || ($memc && !$get_from_memc);
    }

    # devids missing from the device map become undef here and are skipped
    # by the loop below
    my @devices = map { $dmap->{$_} } @fid_devids;

    # a plugin may supply the ordering; if the hook returns false we fall back
    # to the utilization-weighted ordering
    my @sorted_devs;
    unless (MogileFS::run_global_hook('cmd_get_paths_order_devices', \@devices, \@sorted_devs)) {
        @sorted_devs = sort_devs_by_utilization(@devices);
    }

    # keep one partially-bogus path around just in case we have nothing else to send.
    my $backup_path;

    # files on devices set for drain may disappear soon.
    my @drain_paths;

    # construct result paths
    foreach my $dev (@sorted_devs) {
        next unless $dev && $dev->host;

        my $dfid = MogileFS::DevFID->new($dev, $fid);
        my $path = $dfid->get_url;
        my $currently_up = $dev->should_read_from;

        # remember (only the latest) path on a device we shouldn't read from
        if (! $currently_up) {
            $backup_path = $path;
            next;
        }

        # only verify size on the first one, and never verify if they've asked not to
        next unless
            $ret->{paths}        ||
            $args->{noverify}    ||
            $dfid->size_matches;

        if ($dev->dstate->should_drain) {
            push @drain_paths, $path;
            next;
        }

        my $n = ++$ret->{paths};
        $ret->{"path$n"} = $path;
        last if $n == $pathcount;   # one verified, one likely seems enough for now.  time will tell.
    }

    # deprioritize devices set for drain, they could disappear soon...
    # Clients /should/ try to use lower-numbered paths first to avoid this.
    if ($ret->{paths} < $pathcount && @drain_paths) {
        foreach my $path (@drain_paths) {
            my $n = ++$ret->{paths};
            $ret->{"path$n"} = $path;
            last if $n == $pathcount;
        }
    }

    # use our backup path if all else fails
    if ($backup_path && ! $ret->{paths}) {
        $ret->{paths} = 1;
        $ret->{path1} = $backup_path;
    }

    return $self->ok_line($ret);
}
1221
# Order a list of device objects using a weighted random shuffle.
#
# Each device gets a weight of (102 - observed utilization) when the
# observed utilization is an all-digit value, otherwise the device's own
# configured weight; a false weight is replaced by 100.  The
# [device, weight] pairs are then handed to MogileFS::Util::weighted_list,
# whose result is returned as-is.
sub sort_devs_by_utilization {
    my @weighted = map {
        my $device = $_;
        my $util   = $device->observed_utilization;
        my $weight =
            (defined($util) && $util =~ /\A\d+\Z/)
            ? 102 - $util
            : $device->weight;
        [ $device, $weight || 100 ];
    } @_;

    # randomly weight the devices
    my @list = MogileFS::Util::weighted_list(@weighted);

    return @list;
}
1245
1246# ------------------------------------------------------------
1247#
1248# NOTE: cmd_edit_file is EXPERIMENTAL. Please see the documentation
1249# for edit_file in L<MogileFS::Client>.
1250# It is not recommended to use cmd_edit_file on production systems.
1251#
1252# cmd_edit_file is similar to cmd_get_paths, except we:
1253# - take the device of the first path we would have returned
1254# - get a tempfile with a new fid (pointing to nothing) on the same device
1255#   the tempfile has the same key, so will replace the old contents on
1256#   create_close
1257# - detach the old fid from that device (leaving the file in place)
1258# - attach the new fid to that device
1259# - returns only the first path to the old fid and a path to new fid
1260# (the client then DAV-renames the old path to the new path)
1261#
1262# TODO - what to do about situations where we would be reducing the
1263# replica count to zero?
1264# TODO - what to do about pending replications where we remove the source?
1265# TODO - the current implementation of cmd_edit_file is based on a copy
1266#   of cmd_get_paths. Once proven mature, consider factoring out common
1267#   code from the two functions.
1268# ------------------------------------------------------------
1269sub cmd_edit_file {
1270    my MogileFS::Worker::Query $self = shift;
1271    my $args = shift;
1272
1273    my $memc = MogileFS::Config->memcache_client;
1274
1275    # validate domain for plugins
1276    $args->{dmid} = $self->check_domain($args) or return;
1277
1278    # now invoke the plugin, abort if it tells us to
1279    my $rv = MogileFS::run_global_hook('cmd_get_paths', $args);
1280    return $self->err_line('plugin_aborted')
1281        if defined $rv && ! $rv;
1282
1283    # validate parameters
1284    my $dmid = $args->{dmid};
1285    my $key = $args->{key};
1286
1287    valid_key($key) or return $self->err_line("no_key");
1288
1289    # get DB handle
1290    my $fid;
1291    my $need_fid_in_memcache = 0;
1292    my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
1293    if (my $fidid = $memc->get($mogfid_memkey)) {
1294        $fid = MogileFS::FID->new($fidid);
1295    } else {
1296        $need_fid_in_memcache = 1;
1297    }
1298    unless ($fid) {
1299        Mgd::get_store()->slaves_ok(sub {
1300            $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
1301        });
1302        $fid or return $self->err_line("unknown_key");
1303    }
1304
1305    # add to memcache, if needed.  for an hour.
1306    $memc->add($mogfid_memkey, $fid->id, 3600) if $need_fid_in_memcache;
1307
1308    my $dmap = Mgd::device_factory()->map_by_id;
1309
1310    my @devices_with_weights;
1311
1312    # find devids that FID is on in memcache or db.
1313    my @fid_devids;
1314    my $need_devids_in_memcache = 0;
1315    my $devid_memkey = "mogdevids:" . $fid->id;
1316    if (my $list = $memc->get($devid_memkey)) {
1317        @fid_devids = @$list;
1318    } else {
1319        $need_devids_in_memcache = 1;
1320    }
1321    unless (@fid_devids) {
1322        Mgd::get_store()->slaves_ok(sub {
1323            @fid_devids = $fid->devids;
1324        });
1325        $memc->add($devid_memkey, \@fid_devids, 3600) if $need_devids_in_memcache;
1326    }
1327
1328    # is this fid still owned by this key?
1329    foreach my $devid (@fid_devids) {
1330        my $weight;
1331        my $dev = $dmap->{$devid};
1332        my $util = $dev->observed_utilization;
1333
1334        if (defined($util) and $util =~ /\A\d+\Z/) {
1335            $weight = 102 - $util;
1336            $weight ||= 100;
1337        } else {
1338            $weight = $dev->weight;
1339            $weight ||= 100;
1340        }
1341        push @devices_with_weights, [$devid, $weight];
1342    }
1343
1344    # randomly weight the devices
1345    # TODO - should we reverse the order, to leave the best
1346    # one there for get_paths?
1347    my @list = MogileFS::Util::weighted_list(@devices_with_weights);
1348
1349    # Filter out bad devs
1350    @list = grep {
1351        my $devid = $_;
1352        my $dev = $dmap->{$devid};
1353
1354        $dev && $dev->should_read_from;
1355    } @list;
1356
1357    # Take first remaining device from list
1358    my $devid = $list[0];
1359
1360    my $classid = $fid->classid;
1361    my $newfid = eval {
1362        Mgd::get_store()->register_tempfile(
1363            fid     => undef,   # undef => let the store pick a fid
1364            dmid    => $dmid,
1365            key     => $key,    # This tempfile will ultimately become this key
1366            classid => $classid,
1367            devids  => $devid,
1368        );
1369    };
1370    unless ($newfid) {
1371        my $errc = error_code($@);
1372        return $self->err_line("fid_in_use") if $errc eq "dup";
1373        warn "Error registering tempfile: $@\n";
1374        return $self->err_line("db");
1375    }
1376    unless (Mgd::get_store()->remove_fidid_from_devid($fid->id, $devid)) {
1377        warn "Error removing fidid from devid";
1378        return $self->err_line("db");
1379    }
1380    unless (Mgd::get_store()->add_fidid_to_devid($newfid, $devid)) {
1381        warn "Error removing fidid from devid";
1382        return $self->err_line("db");
1383    }
1384
1385    my @paths = map {
1386        my $dfid = MogileFS::DevFID->new($devid, $_);
1387        my $path = $dfid->get_url;
1388    } ($fid, $newfid);
1389    my $ret;
1390    $ret->{oldpath} = $paths[0];
1391    $ret->{newpath} = $paths[1];
1392    $ret->{fid} = $newfid;
1393    $ret->{devid} = $devid;
1394    $ret->{class} = $classid;
1395    return $self->ok_line($ret);
1396}
1397
# Set the weight of a device.
#
# Args (hashref): host (hostname the device must belong to), device
# (numeric device id), weight (number, must be >= 0).
# Replies bad_params, no_device or host_mismatch on failure; on success the
# reply is delegated to cmd_clear_cache.
sub cmd_set_weight {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    # device and weight are forced to numbers before validation
    my $hostname = $args->{host};
    my $devid    = $args->{device} + 0;
    my $weight   = $args->{weight} + 0;
    return $self->err_line('bad_params')
        unless $hostname && $devid && $weight >= 0;

    my $device = Mgd::device_factory()->get_by_id($devid)
        or return $self->err_line('no_device');

    # the caller must name the host that actually owns the device
    if ($device->host->hostname ne $hostname) {
        return $self->err_line('host_mismatch');
    }

    Mgd::get_store()->set_device_weight($device->id, $weight);

    return $self->cmd_clear_cache;
}
1416
# Change the state of a device.
#
# Args (hashref): host (hostname the device must belong to), device
# (numeric device id), state (must be recognised by device_state).
# Replies bad_params, no_device, host_mismatch or state_too_high on
# failure; on success the reply is delegated to cmd_clear_cache.
sub cmd_set_state {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $hostname = $args->{host};
    my $devid    = $args->{device} + 0;
    my $state    = $args->{state};

    my $dstate = device_state($state);
    if (!$hostname || !$devid || !$dstate) {
        return $self->err_line('bad_params');
    }

    my $device = Mgd::device_factory()->get_by_id($devid)
        or return $self->err_line('no_device');
    if ($device->host->hostname ne $hostname) {
        return $self->err_line('host_mismatch');
    }

    # the device itself decides whether this transition is allowed
    if (!$device->can_change_to_state($state)) {
        return $self->err_line('state_too_high');
    }

    Mgd::get_store()->set_device_state($device->id, $state);
    return $self->cmd_clear_cache;
}
1440
# Do nothing and reply OK; any arguments are ignored.
sub cmd_noop {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    return $self->ok_line;
}
1446
# Ask the store to replicate now and reply with the integer part of
# whatever it returned, under the key 'count'.
sub cmd_replicate_now {
    my MogileFS::Worker::Query $self = shift;

    my $store  = Mgd::get_store();
    my $result = $store->replicate_now;

    my %reply = (count => int($result));
    return $self->ok_line(\%reply);
}
1453
# Set a server setting.
#
# Args (hashref): key (required) and value.
# The key must be writable according to
# MogileFS::Config->server_setting_is_writable, which hands back a code ref
# used to validate/clean the value.  Replies bad_params, not_writable or
# invalid_format on failure.
sub cmd_set_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my ($key, $val) = @{$args}{qw(key value)};
    return $self->err_line("bad_params") unless $key;

    my $validator = MogileFS::Config->server_setting_is_writable($key);
    return $self->err_line("not_writable") unless $validator;

    # the validator dies on bad input and returns the cleaned value otherwise
    my $cleanval = eval { $validator->($val); };
    return $self->err_line("invalid_format", $@) if $@;

    MogileFS::Config->set_server_setting($key, $cleanval);

    # GROSS HACK: slave settings are managed directly by MogileFS::Client, but
    # I need to add a version key, so we check and inject that code here.
    # FIXME: Move this when slave keys are managed by query worker commands!
    Mgd::get_store()->incr_server_setting('slave_version', 1)
        if $key =~ /^slave_/;

    return $self->ok_line;
}
1478
# Read one server setting.
#
# Args (hashref): key (required, else bad_params).
# Replies with the key and the value MogileFS::Config reports for it.
sub cmd_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $key = $args->{key}
        or return $self->err_line("bad_params");

    my $value = MogileFS::Config->server_setting($key);
    my %reply = (key => $key, value => $value);
    return $self->ok_line(\%reply);
}
1487
# List every readable server setting.
#
# The reply holds key_<N>/value_<N> pairs plus key_count; key_count is only
# present when at least one readable setting exists.
sub cmd_server_settings {
    my MogileFS::Worker::Query $self = shift;

    my $settings = Mgd::get_store()->server_settings;
    my %reply;
    my $count = 0;
    foreach my $name (keys %$settings) {
        next unless MogileFS::Config->server_setting_is_readable($name);
        $count++;
        $reply{"key_$count"}   = $name;
        $reply{"value_$count"} = $settings->{$name};
    }
    $reply{"key_count"} = $count if $count;

    return $self->ok_line(\%reply);
}
1501
# Forget that the monitor has run, block in wait_for_monitor, then reply OK.
# Arguments are ignored.
sub cmd_do_monitor_round {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    $self->forget_that_monitor_has_run;
    $self->wait_for_monitor;

    return $self->ok_line;
}
1509
# Start an fsck run.
#
# Refuses with fsck_running / rebal_running when the fsck_host or
# rebal_host setting is already set.  If the previous run finished
# (highest checked fid >= fid at end) or no position was ever recorded, the
# fsck state is reset first.  Then the start statistics are recorded,
# fsck_host is set to this host and an fsck worker is woken.
sub cmd_fsck_start {
    my MogileFS::Worker::Query $self = shift;
    my $store = Mgd::get_store();

    my $fsck_host  = MogileFS::Config->server_setting("fsck_host");
    my $rebal_host = MogileFS::Config->server_setting("rebal_host");

    if ($fsck_host) {
        return $self->err_line("fsck_running", "fsck is already running");
    }
    if ($rebal_host) {
        return $self->err_line("rebal_running", "rebalance running; cannot run fsck at same time");
    }

    # reset position, if a previous fsck was already completed.
    my $setting_or_zero = sub { MogileFS::Config->server_setting($_[0]) || 0 };
    my $checked_fid = $setting_or_zero->("fsck_highest_fid_checked");
    my $final_fid   = $setting_or_zero->("fsck_fid_at_end");

    my $previous_run_done = $checked_fid && $final_fid && $checked_fid >= $final_fid;
    my $nothing_recorded  = !$final_fid && !$checked_fid;
    if ($previous_run_done || $nothing_recorded) {
        $self->_do_fsck_reset or return $self->err_line("db");
    }

    # set params for stats:
    $store->set_server_setting("fsck_start_time", $store->get_db_unixtime);
    $store->set_server_setting("fsck_stop_time", undef);
    $store->set_server_setting("fsck_fids_checked", 0);

    # re-read the position: the reset above does not touch it, so this is
    # where the run begins
    my $start_fid =
        MogileFS::Config->server_setting('fsck_highest_fid_checked') || 0;
    $store->set_server_setting("fsck_start_fid", $start_fid);

    # and start it:
    $store->set_server_setting("fsck_host", MogileFS::Config->hostname);
    MogileFS::ProcManager->wake_a("fsck");

    return $self->ok_line;
}
1543
# Stop fsck: clear the fsck_host setting and record the stop time using the
# database's clock.
sub cmd_fsck_stop {
    my MogileFS::Worker::Query $self = shift;

    my $store = Mgd::get_store();
    $store->set_server_setting("fsck_host",      undef);
    $store->set_server_setting("fsck_stop_time", $store->get_db_unixtime);

    return $self->ok_line;
}
1551
# Reset fsck state.
#
# Args (hashref):
#   policy_only - if true, fsck_opt_policy_only is set to "1", else cleared
#   startpos    - fid to resume from; stored as "0" when not given
# Replies 'db' if the shared reset helper fails.
sub cmd_fsck_reset {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $store       = Mgd::get_store();
    my $policy_only = $args->{policy_only} ? "1" : undef;
    my $start_pos   = $args->{startpos} || "0";

    $store->set_server_setting("fsck_opt_policy_only",     $policy_only);
    $store->set_server_setting("fsck_highest_fid_checked", $start_pos);

    return $self->err_line("db") unless $self->_do_fsck_reset;
    return $self->ok_line;
}
1565
# Shared fsck reset helper.
#
# Clears the start/stop times, zeroes the checked-fid counter, records the
# current max fid as the end point, wipes every fsck_sum_evcount_* summary
# setting and marks the fsck log as processed up to its current max id.
# Returns 1 on success; on any exception logs the error and returns 0.
sub _do_fsck_reset {
    my MogileFS::Worker::Query $self = shift;

    eval {
        my $store = Mgd::get_store();
        $store->set_server_setting("fsck_start_time",   undef);
        $store->set_server_setting("fsck_stop_time",    undef);
        $store->set_server_setting("fsck_fids_checked", 0);
        $store->set_server_setting("fsck_fid_at_end",   $store->max_fidid);

        # clear existing event counts summaries.
        my $settings = $store->server_settings;
        foreach my $name (grep { /^fsck_sum_evcount_/ } keys %$settings) {
            $store->set_server_setting($name, undef);
        }

        my $max_logid = $store->max_fsck_logid;
        $store->set_server_setting("fsck_start_maxlogid",  $max_logid);
        $store->set_server_setting("fsck_logid_processed", $max_logid);
    };
    return 1 unless $@;

    error("DB error in _do_fsck_reset: $@");
    return 0;
}
1591
# Clear the fsck log in the store and reply OK.
sub cmd_fsck_clearlog {
    my MogileFS::Worker::Query $self = shift;

    Mgd::get_store()->clear_fsck_log;

    return $self->ok_line;
}
1598
# Fetch fsck log rows.
#
# Args (hashref): after_logid - passed to the store together with a
# limit of 100.
# The reply holds row_count plus row_<N>_<column> for every defined column
# of the Nth row.
sub cmd_fsck_getlog {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    my $store = Mgd::get_store();
    my @rows  = $store->fsck_log_rows($args->{after_logid}, 100);

    my %reply;
    my $row_no = 0;
    foreach my $row (@rows) {
        $row_no++;
        # undefined columns are simply left out of the reply
        foreach my $col (grep { defined $row->{$_} } keys %$row) {
            $reply{"row_${row_no}_$col"} = $row->{$col};
        }
    }
    $reply{row_count} = $row_no;

    return $self->ok_line(\%reply);
}
1616
# Report fsck status.
#
# Asks the store to summarize the fsck log first, then replies with the
# running flag, host, position/time settings (0 when unset), the database's
# current time, the max fsck log id and one num_<event> counter for every
# fsck_sum_evcount_<event> server setting.
sub cmd_fsck_status {
    my MogileFS::Worker::Query $self = shift;

    my $store = Mgd::get_store();

    # Kick up the summary before we read the values
    $store->fsck_log_summarize;

    my $fsck_host       = MogileFS::Config->server_setting('fsck_host');
    my $setting_or_zero = sub { MogileFS::Config->server_setting($_[0]) || 0 };

    my $status = {
        running         => ($fsck_host ? 1 : 0),
        host            => $fsck_host,
        max_fid_checked => $setting_or_zero->('fsck_highest_fid_checked'),
        policy_only     => $setting_or_zero->('fsck_opt_policy_only'),
        end_fid         => $setting_or_zero->('fsck_fid_at_end'),
        start_time      => $setting_or_zero->('fsck_start_time'),
        stop_time       => $setting_or_zero->('fsck_stop_time'),
        current_time    => $store->get_db_unixtime,
        max_logid       => $store->max_fsck_logid,
    };

    # throw some stats in.
    my $settings = $store->server_settings;
    foreach my $name (keys %$settings) {
        if ($name =~ /^fsck_sum_evcount_(.+)/) {
            $status->{"num_$1"} += $settings->{$name};
        }
    }

    return $self->ok_line($status);
}
1646
# Report the saved rebalance state, or no_rebal_state when none is stored.
sub cmd_rebalance_status {
    my MogileFS::Worker::Query $self = shift;

    # the handle itself is not used below; the call is kept as-is
    my $store = Mgd::get_store();

    my $state = MogileFS::Config->server_setting('rebal_state');
    if (!$state) {
        return $self->err_line('no_rebal_state');
    }
    return $self->ok_line({ state => $state });
}
1656
# Start a rebalance.
#
# Refuses with rebal_running / fsck_running when the rebal_host or
# fsck_host setting is already set.  When no saved rebal_state exists a new
# one is built from the rebal_policy setting (no_rebal_policy if that is
# missing) and stored.  Finally rebal_host is set to this host and the state
# is returned to the client.
sub cmd_rebalance_start {
    my MogileFS::Worker::Query $self = shift;

    my $rebal_host = MogileFS::Config->server_setting("rebal_host");
    my $fsck_host  = MogileFS::Config->server_setting("fsck_host");

    if ($rebal_host) {
        return $self->err_line("rebal_running", "rebalance is already running");
    }
    if ($fsck_host) {
        return $self->err_line("fsck_running", "fsck running; cannot run rebalance at same time");
    }

    my $state = MogileFS::Config->server_setting('rebal_state');
    if (!$state) {
        my $policy = MogileFS::Config->server_setting('rebal_policy')
            or return $self->err_line('no_rebal_policy');

        my $rebal = MogileFS::Rebalance->new;
        $rebal->policy($policy);
        my @all_devices = Mgd::device_factory()->get_all;
        $rebal->init(\@all_devices);
        # result is not used here; the call is kept ahead of save_state
        my $source_devices = $rebal->source_devices;

        $state = $rebal->save_state;
        MogileFS::Config->set_server_setting('rebal_state', $state);
    }

    # TODO: register start time somewhere.
    MogileFS::Config->set_server_setting('rebal_host', MogileFS::Config->hostname);
    return $self->ok_line({ state => $state });
}
1684
# Dry-run the rebalance policy.
#
# Builds a rebalance object from the rebal_policy setting (no_rebal_policy
# when missing) and replies with comma-joined lists of what the source and
# destination filters return, under 'sdevs' and 'ddevs'.
sub cmd_rebalance_test {
    my MogileFS::Worker::Query $self = shift;

    my $policy = MogileFS::Config->server_setting('rebal_policy');
    # read but not used further in this command
    my $state  = MogileFS::Config->server_setting('rebal_state');
    return $self->err_line('no_rebal_policy') unless $policy;

    my $rebal       = MogileFS::Rebalance->new;
    my @all_devices = Mgd::device_factory()->get_all;
    $rebal->policy($policy);
    $rebal->init(\@all_devices);

    # client should display list of source, destination devices.
    # FIXME: can probably avoid calling this twice by pulling state?
    # *or* not running init.
    my $sources      = $rebal->filter_source_devices(\@all_devices);
    my $destinations = $rebal->filter_dest_devices(\@all_devices);

    my %reply;
    $reply{sdevs} = join(',', @$sources);
    $reply{ddevs} = join(',', @$destinations);

    return $self->ok_line(\%reply);
}
1707
# Discard the saved rebalance state.  Refused with rebal_running while the
# rebal_host setting is set.
sub cmd_rebalance_reset {
    my MogileFS::Worker::Query $self = shift;

    my $running_on = MogileFS::Config->server_setting('rebal_host');
    return $self->err_line("rebal_running", "rebalance is running")
        if $running_on;

    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
1717
# Request that a running rebalance stop by setting the rebal_signal setting
# to 'stop'.  Replies rebal_not_started when the rebal_host setting is unset.
sub cmd_rebalance_stop {
    my MogileFS::Worker::Query $self = shift;

    my $running_on = MogileFS::Config->server_setting('rebal_host');
    return $self->err_line('rebal_not_started') if !$running_on;

    MogileFS::Config->set_server_setting('rebal_signal', 'stop');
    return $self->ok_line;
}
1727
# Set the rebalance policy.
#
# Args (hashref): policy - the policy string.
# Refused with no_set_rebal while the rebal_host setting is set, and with
# bad_rebal_pol when a MogileFS::Rebalance object rejects the policy.
# Storing a new policy also discards any saved rebalance state.
sub cmd_rebalance_set_policy {
    my MogileFS::Worker::Query $self = shift;
    my ($args) = @_;

    if (MogileFS::Config->server_setting("rebal_host")) {
        return $self->err_line("no_set_rebal", "cannot change rebalance policy while rebalance is running");
    }

    # load policy object, test policy, set policy.
    my $policy = $args->{policy};
    my $rebal  = MogileFS::Rebalance->new;
    eval { $rebal->policy($policy); };
    return $self->err_line("bad_rebal_pol", $@) if $@;

    MogileFS::Config->set_server_setting('rebal_policy', $policy);
    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
1748
# Send a success reply to the parent process.
#
# Optional argument: hashref of reply values (an empty one is used when
# omitted).  The worker's callid, when defined, is added to that hashref.
# The line has the form "<reqid> <delay> OK <url-encoded args>", where
# reqid and delay are only present when known; the query start time is
# cleared once the delay has been computed.  Always returns 1.
sub ok_line {
    my MogileFS::Worker::Query $self = shift;

    my $delay = '';
    if (my $started = $self->{querystarttime}) {
        $delay = sprintf("%.4f ", Time::HiRes::tv_interval($started));
        $self->{querystarttime} = undef;
    }

    my $id = '';
    $id = "$self->{reqid} " if defined $self->{reqid};

    my $args = shift || {};
    if (defined $self->{callid}) {
        $args->{callid} = $self->{callid};
    }

    my @pairs;
    foreach my $name (keys %$args) {
        push @pairs, eurl($name) . "=" . eurl($args->{$name});
    }
    my $argline = join('&', @pairs);

    $self->send_to_parent("${id}${delay}OK $argline");
    return 1;
}
1766
# Send a failure ("ERR") reply for the current request via send_to_parent.
#
# first argument: error code.
# second argument: optional error text.  When it is missing (or false) the
# stock message for the code is used; if the code has no stock message, the
# code itself doubles as the text.
#
# The line sent is "<reqid> <elapsed> ERR <code> <text> <callid>", with the
# text and callid passed through eurl(); reqid and callid only appear when
# they are set.  Always returns 0.
sub err_line {
    my MogileFS::Worker::Query $self = shift;
    my ($err_code, $err_text) = @_;

    # Only build the table of stock messages when the caller gave no text.
    unless ($err_text) {
        my %stock_text_for = (
            'dup'                   => "Duplicate name/number used.",
            'after_mismatch'        => "Pattern does not match the after-value?",
            'bad_params'            => "Invalid parameters to command; please see documentation",
            'class_exists'          => "That class already exists in that domain",
            'class_has_files'       => "Class still has files, unable to delete",
            'class_not_found'       => "Class not found",
            'db'                    => "Database error",
            'domain_has_files'      => "Domain still has files, unable to delete",
            'domain_exists'         => "That domain already exists",
            'domain_not_empty'      => "Domain still has classes, unable to delete",
            'domain_not_found'      => "Domain not found",
            'failure'               => "Operation failed",
            'host_exists'           => "That host already exists",
            'host_mismatch'         => "The device specified doesn't belong to the host specified",
            'host_not_empty'        => "Unable to delete host; it contains devices still",
            'host_not_found'        => "Host not found",
            'invalid_checker_level' => "Checker level invalid.  Please see documentation on this command.",
            'invalid_mindevcount'   => "The mindevcount must be at least 1",
            'key_exists'            => "Target key name already exists; can't overwrite.",
            'no_class'              => "No class provided",
            'no_devices'            => "No devices found to store file",
            'no_device'             => "Device not found",
            'no_domain'             => "No domain provided",
            'no_host'               => "No host provided",
            'no_ip'                 => "IP required to create host",
            'no_port'               => "Port required to create host",
            'no_temp_file'          => "No tempfile or file already closed",
            'none_match'            => "No keys match that pattern and after-value (if any).",
            'plugin_aborted'        => "Action aborted by plugin",
            'state_too_high'        => "Status cannot go from dead to alive; must use down",
            'unknown_command'       => "Unknown server command",
            'unknown_host'          => "Host not found",
            'unknown_state'         => "Invalid/unknown state",
            'unreg_domain'          => "Domain name invalid/not found",
            'rebal_not_started'     => "Rebalance not running",
            'no_rebal_state'        => "No available rebalance status",
            'no_rebal_policy'       => "No rebalance policy available",
            'nodel_default_class'   => "Cannot delete the default class",
        );
        $err_text = $stock_text_for{$err_code} || $err_code;
    }

    my $encoded_text = eurl($err_text);

    # The start time is cleared below once a reply has gone out, so finding it
    # unset here means this request was already answered: log the redundant
    # call and do not send a second line.
    unless ($self->{querystarttime}) {
        error("err_line called redundantly with $err_code ( $encoded_text)");
        return 0;
    }

    my $delay = sprintf("%.4f ", Time::HiRes::tv_interval($self->{querystarttime}));
    $self->{querystarttime} = undef;

    my $id     = defined $self->{reqid}  ? "$self->{reqid} "           : '';
    my $callid = defined $self->{callid} ? ' ' . eurl($self->{callid}) : '';

    $self->send_to_parent("${id}${delay}ERR $err_code ${encoded_text}${callid}");
    return 0;
}
1830
# A module file must end by evaluating to a true value, otherwise
# loading it with use/require fails.
1;
1832
1833# Local Variables:
1834# mode: perl
1835# c-basic-indent: 4
1836# indent-tabs-mode: nil
1837# End:
1838
1839__END__
1840
1841=head1 NAME
1842
1843MogileFS::Worker::Query -- implements the MogileFS client protocol
1844
1845=head1 SEE ALSO
1846
1847L<MogileFS::Worker>
1848
1849
1850