1package MogileFS::Worker::Query; 2# responds to queries from Mogile clients 3 4use strict; 5use warnings; 6 7use base 'MogileFS::Worker'; 8use fields qw(querystarttime reqid callid); 9use MogileFS::Util qw(error error_code first weighted_list 10 device_state eurl decode_url_args); 11use MogileFS::HTTPFile; 12use MogileFS::Rebalance; 13use MogileFS::Config; 14use MogileFS::Server; 15 16sub new { 17 my ($class, $psock) = @_; 18 my $self = fields::new($class); 19 $self->SUPER::new($psock); 20 21 $self->{querystarttime} = undef; 22 $self->{reqid} = undef; 23 $self->{callid} = undef; 24 return $self; 25} 26 27# no query should take 30 seconds, and we check in every 5 seconds. 28sub watchdog_timeout { 30 } 29 30# called by plugins to register a command in the namespace 31sub register_command { 32 my ($cmd, $sub) = @_; 33 34 # validate the command, then convert it to the actual thing the user 35 # will be calling 36 return 0 unless $cmd =~ /^[\w\d]+$/; 37 $cmd = "plugin_$cmd"; 38 39 # register in namespace with 'cmd_' which we will automatically find 40 no strict 'refs'; 41 *{"cmd_$cmd"} = $sub; 42 43 # all's well 44 return 1; 45} 46 47sub work { 48 my $self = shift; 49 my $psock = $self->{psock}; 50 my $rin = ''; 51 vec($rin, fileno($psock), 1) = 1; 52 my $buf; 53 54 while (1) { 55 my $rout; 56 unless (select($rout=$rin, undef, undef, 5.0)) { 57 $self->still_alive; 58 next; 59 } 60 61 my $newread; 62 my $rv = sysread($psock, $newread, Mgd::UNIX_RCVBUF_SIZE()); 63 if (!$rv) { 64 if (defined $rv) { 65 die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n"; 66 } else { 67 die "Error reading pipe from parent: $!\n"; 68 } 69 } 70 $buf .= $newread; 71 72 while ($buf =~ s/^(.+?)\r?\n//) { 73 my $line = $1; 74 if ($self->process_generic_command(\$line)) { 75 $self->still_alive; # no-op for watchdog 76 } else { 77 $self->validate_dbh; 78 $self->process_line(\$line); 79 } 80 } 81 } 82} 83 84sub process_line { 85 my MogileFS::Worker::Query $self = shift; 86 my $lineref = shift; 87 88 # see what kind of command this is 89 return $self->err_line('unknown_command') 90 unless $$lineref =~ /^(\d+-\d+)?\s*(\S+)\s*(.*)/; 91 92 $self->{reqid} = $1 || undef; 93 my ($client_ip, $line) = ($2, $3); 94 95 # set global variables for zone determination 96 local $MogileFS::REQ_client_ip = $client_ip; 97 98 # Use as array here, otherwise we get a string which breaks usage of 99 # Time::HiRes::tv_interval further on. 100 $self->{querystarttime} = [ Time::HiRes::gettimeofday() ]; 101 102 # fallback to normal command handling 103 if ($line =~ /^(\w+)\s*(.*)/) { 104 my ($cmd, $orig_args) = ($1, $2); 105 $cmd = lc($cmd); 106 107 no strict 'refs'; 108 my $cmd_handler = *{"cmd_$cmd"}{CODE}; 109 my $args = decode_url_args(\$orig_args); 110 $self->{callid} = $args->{callid}; 111 if ($cmd_handler) { 112 local $MogileFS::REQ_altzone = ($args->{zone} && $args->{zone} eq 'alt'); 113 eval { 114 $cmd_handler->($self, $args); 115 }; 116 if ($@) { 117 my $errc = error_code($@); 118 if ($errc eq "dup") { 119 return $self->err_line("dup"); 120 } else { 121 warn "Error: $@\n"; 122 error("Error running command '$cmd': $@"); 123 return $self->err_line("failure"); 124 } 125 } 126 return; 127 } 128 } 129 130 return $self->err_line('unknown_command'); 131} 132 133# this is a half-finished command. in particular, errors tend to 134# crash the parent or child or something. it's a quick hack for a quick 135# ops task that needs done. note in particular how it reaches across 136# package boundaries into an API that the Replicator probably doesn't 137# want exposed. 138sub cmd_httpcopy { 139 my MogileFS::Worker::Query $self = shift; 140 my $args = shift; 141 my $sdevid = $args->{sdevid}; 142 my $ddevid = $args->{ddevid}; 143 my $fid = $args->{fid}; 144 145 my $err; 146 my $rv = MogileFS::Worker::Replicate::http_copy(sdevid => $sdevid, 147 ddevid => $ddevid, 148 fid => $fid, 149 errref => \$err); 150 if ($rv) { 151 my $dfid = MogileFS::DevFID->new($ddevid, $fid); 152 $dfid->add_to_db 153 or return $self->err_line("copy_err", "failed to add link to database"); 154 return $self->ok_line; 155 } else { 156 return $self->err_line("copy_err", $err); 157 } 158} 159 160# returns 0 on error, or dmid of domain 161sub check_domain { 162 my MogileFS::Worker::Query $self = shift; 163 my $args = shift; 164 165 my $domain = $args->{domain}; 166 167 return $self->err_line("no_domain") unless defined $domain && length $domain; 168 169 # validate domain 170 my $dmid = eval { Mgd::domain_factory()->get_by_name($domain)->id } or 171 return $self->err_line("unreg_domain"); 172 173 return $dmid; 174} 175 176sub cmd_sleep { 177 my MogileFS::Worker::Query $self = shift; 178 my $args = shift; 179 sleep($args->{duration} || 10); 180 return $self->ok_line; 181} 182 183sub cmd_test { 184 my MogileFS::Worker::Query $self = shift; 185 my $args = shift; 186 die "Crashed on purpose" if $args->{crash}; 187 return $self->ok_line; 188} 189 190sub cmd_clear_cache { 191 my MogileFS::Worker::Query $self = shift; 192 193 $self->forget_that_monitor_has_run; 194 $self->send_to_parent(":refresh_monitor"); 195 $self->wait_for_monitor; 196 197 return $self->ok_line(@_); 198} 199 200sub cmd_create_open { 201 my MogileFS::Worker::Query $self = shift; 202 my $args = shift; 203 204 # has to be filled out for some plugins 205 $args->{dmid} = $self->check_domain($args) or return; 206 207 # first, pass this to a hook to do any manipulations needed 208 eval {MogileFS::run_global_hook('cmd_create_open', $args)}; 209 210 return $self->err_line("plugin_aborted", "$@") 211 if $@; 212 213 # validate parameters 214 my $dmid = $args->{dmid}; 215 my $key = $args->{key} || ""; 216 my $multi = $args->{multi_dest} ? 1 : 0; 217 my $size = $args->{size} || undef; # Size is optional at create time, 218 # but used to grep devices if available 219 220 # optional profiling of stages, if $args->{debug_profile} 221 my @profpoints; # array of [point,hires-starttime] 222 my $profstart = sub { 223 my $pt = shift; 224 push @profpoints, [$pt, Time::HiRes::time()]; 225 }; 226 $profstart = sub {} unless $args->{debug_profile}; 227 $profstart->("begin"); 228 229 # we want it to be undef if not explicit, else force to numeric 230 my $exp_fidid = $args->{fid} ? int($args->{fid}) : undef; 231 232 # get DB handle 233 my $sto = Mgd::get_store(); 234 235 # figure out what classid this file is for 236 my $class = $args->{class} || ""; 237 my $classid = 0; 238 if (length($class)) { 239 $classid = eval { Mgd::class_factory()->get_by_name($dmid, $class)->id } 240 or return $self->err_line("unreg_class"); 241 } 242 243 # if we haven't heard from the monitoring job yet, we need to chill a bit 244 # to prevent a race where we tell a user that we can't create a file when 245 # in fact we've just not heard from the monitor 246 $profstart->("wait_monitor"); 247 $self->wait_for_monitor; 248 249 $profstart->("find_deviceid"); 250 251 my @devices = Mgd::device_factory()->get_all; 252 if ($size) { 253 # We first ignore all the devices with an unknown space free. 254 @devices = grep { length($_->mb_free) && ($_->mb_free * 1024*1024) > $size } @devices; 255 256 # If we didn't find any, try all the devices with an unknown space free. 257 # This may happen if mogstored isn't running. 258 if (!@devices) { 259 @devices = grep { !length($_->mb_free) } Mgd::device_factory()->get_all; 260 } 261 } 262 263 unless (MogileFS::run_global_hook('cmd_create_open_order_devices', [ @devices ], \@devices)) { 264 @devices = sort_devs_by_freespace(@devices); 265 } 266 267 # find suitable device(s) to put this file on. 268 my @dests; # MogileFS::Device objects which are suitable 269 270 while (scalar(@dests) < ($multi ? 3 : 1)) { 271 my $ddev = shift @devices; 272 273 last unless $ddev; 274 next unless $ddev->not_on_hosts(map { $_->host } @dests); 275 276 push @dests, $ddev; 277 } 278 return $self->err_line("no_devices") unless @dests; 279 280 my $fidid = eval { 281 $sto->register_tempfile( 282 fid => $exp_fidid, # may be undef/NULL to mean auto-increment 283 dmid => $dmid, 284 key => $key, 285 classid => $classid, 286 devids => join(',', map { $_->id } @dests), 287 ); 288 }; 289 unless ($fidid) { 290 my $errc = error_code($@); 291 return $self->err_line("fid_in_use") if $errc eq "dup"; 292 warn "Error registering tempfile: $@\n"; 293 return $self->err_line("db"); 294 } 295 296 # make sure directories exist for client to be able to PUT into 297 my %dir_done; 298 $profstart->("vivify_dir_on_all_devs"); 299 300 my $t0 = Time::HiRes::time(); 301 foreach my $dev (@dests) { 302 my $dfid = MogileFS::DevFID->new($dev, $fidid); 303 $dfid->vivify_directories(sub { 304 $dir_done{$dfid->devid} = Time::HiRes::time() - $t0; 305 }); 306 } 307 308 # don't start the event loop if results are all cached 309 if (scalar keys %dir_done != scalar @dests) { 310 Danga::Socket->SetPostLoopCallback(sub { scalar keys %dir_done != scalar @dests }); 311 Danga::Socket->EventLoop; 312 } 313 $profstart->("end"); 314 315 # common reply variables 316 my $res = { 317 fid => $fidid, 318 }; 319 320 # add profiling data 321 if (@profpoints) { 322 $res->{profpoints} = 0; 323 for (my $i=0; $i<$#profpoints; $i++) { 324 my $ptnum = ++$res->{profpoints}; 325 $res->{"prof_${ptnum}_name"} = $profpoints[$i]->[0]; 326 $res->{"prof_${ptnum}_time"} = 327 sprintf("%0.03f", 328 $profpoints[$i+1]->[1] - $profpoints[$i]->[1]); 329 } 330 while (my ($devid, $time) = each %dir_done) { 331 my $ptnum = ++$res->{profpoints}; 332 $res->{"prof_${ptnum}_name"} = "vivify_dir_on_dev$devid"; 333 $res->{"prof_${ptnum}_time"} = sprintf("%0.03f", $time); 334 } 335 } 336 337 # add path info 338 if ($multi) { 339 my $ct = 0; 340 foreach my $dev (@dests) { 341 $ct++; 342 $res->{"devid_$ct"} = $dev->id; 343 $res->{"path_$ct"} = MogileFS::DevFID->new($dev, $fidid)->url; 344 } 345 $res->{dev_count} = $ct; 346 } else { 347 $res->{devid} = $dests[0]->id; 348 $res->{path} = MogileFS::DevFID->new($dests[0], $fidid)->url; 349 } 350 351 return $self->ok_line($res); 352} 353 354sub sort_devs_by_freespace { 355 my @devices_with_weights = map { 356 [$_, 100 * $_->percent_free] 357 } sort { 358 $b->percent_free <=> $a->percent_free; 359 } grep { 360 $_->should_get_new_files; 361 } @_; 362 363 my @list = 364 MogileFS::Util::weighted_list(splice(@devices_with_weights, 0, 20)); 365 366 return @list; 367} 368 369sub valid_key { 370 my ($key) = @_; 371 372 return defined($key) && length($key); 373} 374 375sub cmd_create_close { 376 my MogileFS::Worker::Query $self = shift; 377 my $args = shift; 378 379 # has to be filled out for some plugins 380 $args->{dmid} = $self->check_domain($args) or return; 381 382 # call out to a hook that might modify the arguments for us 383 MogileFS::run_global_hook('cmd_create_close', $args); 384 385 # late validation of parameters 386 my $dmid = $args->{dmid}; 387 my $key = $args->{key}; 388 my $fidid = $args->{fid} or return $self->err_line("no_fid"); 389 my $devid = $args->{devid} or return $self->err_line("no_devid"); 390 my $path = $args->{path} or return $self->err_line("no_path"); 391 my $checksum = $args->{checksum}; 392 393 if ($checksum) { 394 $checksum = eval { MogileFS::Checksum->from_string($fidid, $checksum) }; 395 return $self->err_line("invalid_checksum_format") if $@; 396 } 397 398 my $fid = MogileFS::FID->new($fidid); 399 my $dfid = MogileFS::DevFID->new($devid, $fid); 400 401 # is the provided path what we'd expect for this fid/devid? 402 return $self->err_line("bogus_args") 403 unless $path eq $dfid->url; 404 405 my $sto = Mgd::get_store(); 406 407 # find the temp file we're closing and making real. If another worker 408 # already has it, bail out---the client closed it twice. 409 # this is racy, but the only expected use case is a client retrying. 410 # should still be fixed better once more scalable locking is available. 411 my $trow = $sto->delete_and_return_tempfile_row($fidid) or 412 return $self->err_line("no_temp_file"); 413 414 # Protect against leaving orphaned uploads. 415 my $failed = sub { 416 $dfid->add_to_db; 417 $fid->delete; 418 }; 419 420 unless ($trow->{devids} =~ m/\b$devid\b/) { 421 $failed->(); 422 return $self->err_line("invalid_destdev", "File uploaded to invalid dest $devid. Valid devices were: " . $trow->{devids}); 423 } 424 425 # if a temp file is closed without a provided-key, that means to 426 # delete it. 427 unless (valid_key($key)) { 428 $failed->(); 429 return $self->ok_line; 430 } 431 432 # get size of file and verify that it matches what we were given, if anything 433 my $httpfile = MogileFS::HTTPFile->at($path); 434 my $size = $httpfile->size; 435 436 # size check is optional? Needs to support zero byte files. 437 $args->{size} = -1 unless $args->{size}; 438 if (!defined($size) || $size == MogileFS::HTTPFile::FILE_MISSING) { 439 # storage node is unreachable or the file is missing 440 my $type = defined $size ? "missing" : "cantreach"; 441 my $lasterr = MogileFS::Util::last_error(); 442 $failed->(); 443 return $self->err_line("size_verify_error", "Expected: $args->{size}; actual: 0 ($type); path: $path; error: $lasterr") 444 } 445 446 if ($args->{size} > -1 && ($args->{size} != $size)) { 447 $failed->(); 448 return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path") 449 } 450 451 # checksum validation is optional as it can be very expensive 452 # However, we /always/ verify it if the client wants us to, even 453 # if the class does not enforce or store it. 454 if ($checksum && $args->{checksumverify}) { 455 my $alg = $checksum->hashname; 456 my $actual = $httpfile->digest($alg, sub { $self->still_alive }); 457 if ($actual ne $checksum->{checksum}) { 458 $failed->(); 459 $actual = "$alg:" . unpack("H*", $actual); 460 return $self->err_line("checksum_mismatch", 461 "Expected: $checksum; actual: $actual; path: $path"); 462 } 463 } 464 465 # see if we have a fid for this key already 466 my $old_fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key); 467 if ($old_fid) { 468 # Fail if a file already exists for this fid. Should never 469 # happen, as it should not be possible to close a file twice. 470 return $self->err_line("fid_exists") 471 unless $old_fid->{fidid} != $fidid; 472 473 $old_fid->delete; 474 } 475 476 # TODO: check for EIO? 477 478 # insert file_on row 479 $dfid->add_to_db; 480 481 $checksum->maybe_save($dmid, $trow->{classid}) if $checksum; 482 483 $sto->replace_into_file( 484 fidid => $fidid, 485 dmid => $dmid, 486 key => $key, 487 length => $size, 488 classid => $trow->{classid}, 489 devcount => 1, 490 ); 491 492 # mark it as needing replicating: 493 $fid->enqueue_for_replication(); 494 495 # call the hook - if this fails, we need to back the file out 496 my $rv = MogileFS::run_global_hook('file_stored', $args); 497 if (defined $rv && ! $rv) { # undef = no hooks, 1 = success, 0 = failure 498 $fid->delete; 499 return $self->err_line("plugin_aborted"); 500 } 501 502 # all went well, we would've hit condthrow on DB errors 503 return $self->ok_line; 504} 505 506sub cmd_updateclass { 507 my MogileFS::Worker::Query $self = shift; 508 my $args = shift; 509 510 $args->{dmid} = $self->check_domain($args) or return; 511 512 # call out to a hook that might modify the arguments for us, abort if it tells us to 513 my $rv = MogileFS::run_global_hook('cmd_updateclass', $args); 514 return $self->err_line('plugin_aborted') if defined $rv && ! $rv; 515 516 my $dmid = $args->{dmid}; 517 my $key = $args->{key}; 518 valid_key($key) or return $self->err_line("no_key"); 519 my $class = $args->{class} or return $self->err_line("no_class"); 520 521 my $classobj = Mgd::class_factory()->get_by_name($dmid, $class) 522 or return $self->err_line('class_not_found'); 523 my $classid = $classobj->id; 524 525 my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key) 526 or return $self->err_line('invalid_key'); 527 528 my @devids = $fid->devids; 529 return $self->err_line("no_devices") unless @devids; 530 531 if ($fid->classid != $classid) { 532 $fid->update_class(classid => $classid); 533 $fid->enqueue_for_replication(); 534 } 535 536 return $self->ok_line; 537} 538 539sub cmd_delete { 540 my MogileFS::Worker::Query $self = shift; 541 my $args = shift; 542 543 # validate domain for plugins 544 $args->{dmid} = $self->check_domain($args) or return; 545 546 # now invoke the plugin, abort if it tells us to 547 my $rv = MogileFS::run_global_hook('cmd_delete', $args); 548 return $self->err_line('plugin_aborted') 549 if defined $rv && ! $rv; 550 551 # validate parameters 552 my $dmid = $args->{dmid}; 553 my $key = $args->{key}; 554 555 valid_key($key) or return $self->err_line("no_key"); 556 557 # is this fid still owned by this key? 558 my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key) 559 or return $self->err_line("unknown_key"); 560 561 $fid->delete; 562 563 return $self->ok_line; 564} 565 566# Takes either domain/dkey or fid and tries to return as much as possible. 567sub cmd_file_debug { 568 my MogileFS::Worker::Query $self = shift; 569 my $args = shift; 570 # Talk to the master since this is "debug mode" 571 my $sto = Mgd::get_store(); 572 my $ret = {}; 573 574 # If a FID is provided, just use that. 575 my $fid; 576 my $fidid; 577 if ($args->{fid}) { 578 $fidid = $args->{fid}+0; 579 # It's not fatal if we don't find the row here. 580 $fid = $sto->file_row_from_fidid($args->{fid}+0); 581 } else { 582 # If not, require dmid/dkey and pick up the fid from there. 583 $args->{dmid} = $self->check_domain($args) or return; 584 return $self->err_line("no_key") unless valid_key($args->{key}); 585 586 # now invoke the plugin, abort if it tells us to 587 my $rv = MogileFS::run_global_hook('cmd_file_debug', $args); 588 return $self->err_line('plugin_aborted') 589 if defined $rv && ! $rv; 590 591 $fid = $sto->file_row_from_dmid_key($args->{dmid}, $args->{key}); 592 return $self->err_line("unknown_key") unless $fid; 593 $fidid = $fid->{fid}; 594 } 595 596 if ($fid) { 597 $fid->{domain} = Mgd::domain_factory()->get_by_id($fid->{dmid})->name; 598 $fid->{class} = Mgd::class_factory()->get_by_id($fid->{dmid}, 599 $fid->{classid})->name; 600 } 601 602 # Fetch all of the queue data. 603 my $tfile = $sto->tempfile_row_from_fid($fidid); 604 my $repl = $sto->find_fid_from_file_to_replicate($fidid); 605 my $del = $sto->find_fid_from_file_to_delete2($fidid); 606 my $reb = $sto->find_fid_from_file_to_queue($fidid, REBAL_QUEUE); 607 my $fsck = $sto->find_fid_from_file_to_queue($fidid, FSCK_QUEUE); 608 609 # Fetch file_on rows, and turn into paths. 610 my @devids = $sto->fid_devids($fidid); 611 for my $devid (@devids) { 612 # Won't matter if we can't make the path (dev is dead/deleted/etc) 613 eval { 614 my $dfid = MogileFS::DevFID->new($devid, $fidid); 615 my $path = $dfid->get_url; 616 $ret->{'devpath_' . $devid} = $path; 617 }; 618 } 619 $ret->{devids} = join(',', @devids) if @devids; 620 621 # Always look for a checksum 622 my $checksum = Mgd::get_store()->get_checksum($fidid); 623 if ($checksum) { 624 $checksum = MogileFS::Checksum->new($checksum); 625 $ret->{checksum} = $checksum->info; 626 } else { 627 $ret->{checksum} = 'NONE'; 628 } 629 630 # Return file row (if found) and all other data. 631 my %toret = (fid => $fid, tempfile => $tfile, replqueue => $repl, 632 delqueue => $del, rebqueue => $reb, fsckqueue => $fsck); 633 while (my ($key, $hash) = each %toret) { 634 while (my ($name, $val) = each %$hash) { 635 $ret->{$key . '_' . $name} = $val; 636 } 637 } 638 639 return $self->err_line("unknown_fid") unless keys %$ret; 640 return $self->ok_line($ret); 641} 642 643sub cmd_file_info { 644 my MogileFS::Worker::Query $self = shift; 645 my $args = shift; 646 647 # validate domain for plugins 648 $args->{dmid} = $self->check_domain($args) or return; 649 650 # now invoke the plugin, abort if it tells us to 651 my $rv = MogileFS::run_global_hook('cmd_file_info', $args); 652 return $self->err_line('plugin_aborted') 653 if defined $rv && ! $rv; 654 655 # validate parameters 656 my $dmid = $args->{dmid}; 657 my $key = $args->{key}; 658 659 valid_key($key) or return $self->err_line("no_key"); 660 661 my $fid; 662 Mgd::get_store()->slaves_ok(sub { 663 $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key); 664 }); 665 $fid or return $self->err_line("unknown_key"); 666 667 my $ret = {}; 668 $ret->{fid} = $fid->id; 669 $ret->{domain} = Mgd::domain_factory()->get_by_id($fid->dmid)->name; 670 my $class = Mgd::class_factory()->get_by_id($fid->dmid, $fid->classid); 671 $ret->{class} = $class->name; 672 if ($class->{hashtype}) { 673 my $checksum = Mgd::get_store()->get_checksum($fid->id); 674 if ($checksum) { 675 $checksum = MogileFS::Checksum->new($checksum); 676 $ret->{checksum} = $checksum->info; 677 } else { 678 $ret->{checksum} = "MISSING"; 679 } 680 } 681 $ret->{key} = $key; 682 $ret->{'length'} = $fid->length; 683 $ret->{devcount} = $fid->devcount; 684 # Only if requested, also return the raw devids. 685 # Caller should use get_paths if they intend to fetch the file. 686 if ($args->{devices}) { 687 $ret->{devids} = join(',', $fid->devids); 688 } 689 690 return $self->ok_line($ret); 691} 692 693sub cmd_list_fids { 694 my MogileFS::Worker::Query $self = shift; 695 my $args = shift; 696 697 # validate parameters 698 my $fromfid = ($args->{from} || 0)+0; 699 my $count = ($args->{to} || 0)+0; 700 $count ||= 100; 701 $count = 500 if $count > 500 || $count < 0; 702 703 my $rows = Mgd::get_store()->file_row_from_fidid_range($fromfid, $count); 704 return $self->err_line('failure') unless $rows; 705 return $self->ok_line({ fid_count => 0 }) unless @$rows; 706 707 # setup temporary storage of class/host 708 my (%domains, %classes); 709 710 # now iterate over our data rows and construct result 711 my $ct = 0; 712 my $ret = {}; 713 foreach my $r (@$rows) { 714 $ct++; 715 my $fid = $r->{fid}; 716 $ret->{"fid_${ct}_fid"} = $fid; 717 $ret->{"fid_${ct}_domain"} = ($domains{$r->{dmid}} ||= 718 Mgd::domain_factory()->get_by_id($r->{dmid})->name); 719 $ret->{"fid_${ct}_class"} = ($classes{$r->{dmid}}{$r->{classid}} ||= 720 Mgd::class_factory()->get_by_id($r->{dmid}, $r->{classid})->name); 721 $ret->{"fid_${ct}_key"} = $r->{dkey}; 722 $ret->{"fid_${ct}_length"} = $r->{length}; 723 $ret->{"fid_${ct}_devcount"} = $r->{devcount}; 724 } 725 $ret->{fid_count} = $ct; 726 return $self->ok_line($ret); 727} 728 729sub cmd_list_keys { 730 my MogileFS::Worker::Query $self = shift; 731 my $args = shift; 732 733 # validate parameters 734 my $dmid = $self->check_domain($args) or return; 735 my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit}); 736 737 if (defined $prefix and $prefix ne '') { 738 # now validate that after matches prefix 739 return $self->err_line('after_mismatch') 740 if $after && $after !~ /^$prefix/; 741 } 742 743 $limit ||= 1000; 744 $limit += 0; 745 $limit = 1000 if $limit > 1000; 746 747 my $keys = Mgd::get_store()->get_keys_like($dmid, $prefix, $after, $limit); 748 749 # if we got nothing, say so 750 return $self->err_line('none_match') unless $keys && @$keys; 751 752 # construct the output and send 753 my $ret = { key_count => 0, next_after => '' }; 754 foreach my $key (@$keys) { 755 $ret->{key_count}++; 756 $ret->{next_after} = $key 757 if $key gt $ret->{next_after}; 758 $ret->{"key_$ret->{key_count}"} = $key; 759 } 760 return $self->ok_line($ret); 761} 762 763sub cmd_rename { 764 my MogileFS::Worker::Query $self = shift; 765 my $args = shift; 766 767 # validate parameters 768 my $dmid = $self->check_domain($args) or return; 769 my ($fkey, $tkey) = ($args->{from_key}, $args->{to_key}); 770 unless (valid_key($fkey) && valid_key($tkey)) { 771 return $self->err_line("no_key"); 772 } 773 774 my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $fkey) 775 or return $self->err_line("unknown_key"); 776 777 $fid->rename($tkey) or 778 $self->err_line("key_exists"); 779 780 return $self->ok_line; 781} 782 783sub cmd_get_hosts { 784 my MogileFS::Worker::Query $self = shift; 785 my $args = shift; 786 787 my $ret = { hosts => 0 }; 788 for my $host (Mgd::host_factory()->get_all) { 789 next if defined $args->{hostid} && $host->id != $args->{hostid}; 790 my $n = ++$ret->{hosts}; 791 my $fields = $host->fields(qw(hostid status hostname hostip http_port 792 http_get_port altip altmask)); 793 while (my ($key, $val) = each %$fields) { 794 # must be regular data so copy it in 795 $ret->{"host${n}_$key"} = $val; 796 } 797 } 798 799 return $self->ok_line($ret); 800} 801 802sub cmd_get_devices { 803 my MogileFS::Worker::Query $self = shift; 804 my $args = shift; 805 806 my $ret = { devices => 0 }; 807 for my $dev (Mgd::device_factory()->get_all) { 808 next if defined $args->{devid} && $dev->id != $args->{devid}; 809 my $n = ++$ret->{devices}; 810 811 my $sum = $dev->fields; 812 while (my ($key, $val) = each %$sum) { 813 $ret->{"dev${n}_$key"} = $val; 814 } 815 } 816 817 return $self->ok_line($ret); 818} 819 820sub cmd_create_device { 821 my MogileFS::Worker::Query $self = shift; 822 my $args = shift; 823 824 my $status = $args->{state} || "alive"; 825 return $self->err_line("invalid_state") unless 826 device_state($status); 827 828 my $devid = $args->{devid}; 829 return $self->err_line("invalid_devid") unless $devid && $devid =~ /^\d+$/; 830 831 my $hostid; 832 833 my $sto = Mgd::get_store(); 834 if ($args->{hostid} && $args->{hostid} =~ /^\d+$/) { 835 $hostid = $sto->get_hostid_by_id($args->{hostid}); 836 return $self->err_line("unknown_hostid") unless $hostid; 837 } elsif (my $hname = $args->{hostname}) { 838 $hostid = $sto->get_hostid_by_name($hname); 839 return $self->err_line("unknown_host") unless $hostid; 840 } else { 841 return $self->err_line("bad_args", "No hostid/hostname parameter"); 842 } 843 844 if (eval { $sto->create_device($devid, $hostid, $status) }) { 845 return $self->cmd_clear_cache; 846 } 847 848 my $errc = error_code($@); 849 return $self->err_line("existing_devid") if $errc; 850 die $@; # rethrow; 851} 852 853sub cmd_create_domain { 854 my MogileFS::Worker::Query $self = shift; 855 my $args = shift; 856 857 my $domain = $args->{domain} or 858 return $self->err_line('no_domain'); 859 860 my $dom = eval { Mgd::get_store()->create_domain($domain); }; 861 if ($@) { 862 if (error_code($@) eq "dup") { 863 return $self->err_line('domain_exists'); 864 } 865 return $self->err_line('failure', "$@"); 866 } 867 868 return $self->cmd_clear_cache({ domain => $domain }); 869} 870 871sub cmd_delete_domain { 872 my MogileFS::Worker::Query $self = shift; 873 my $args = shift; 874 875 my $domain = $args->{domain} or 876 return $self->err_line('no_domain'); 877 878 my $sto = Mgd::get_store(); 879 my $dmid = $sto->get_domainid_by_name($domain) or 880 return $self->err_line('domain_not_found'); 881 882 if (eval { $sto->delete_domain($dmid) }) { 883 return $self->cmd_clear_cache({ domain => $domain }); 884 } 885 886 my $err = error_code($@); 887 return $self->err_line('domain_has_files') if $err eq "has_files"; 888 return $self->err_line('domain_has_classes') if $err eq "has_classes"; 889 return $self->err_line("failure"); 890} 891 892sub cmd_create_class { 893 my MogileFS::Worker::Query $self = shift; 894 my $args = shift; 895 896 my $domain = $args->{domain}; 897 return $self->err_line('no_domain') unless length $domain; 898 899 my $class = $args->{class}; 900 return $self->err_line('no_class') unless length $class; 901 902 my $mindevcount = $args->{mindevcount}+0; 903 return $self->err_line('invalid_mindevcount') unless $mindevcount > 0; 904 905 my $replpolicy = $args->{replpolicy} || ''; 906 if ($replpolicy) { 907 eval { 908 MogileFS::ReplicationPolicy->new_from_policy_string($replpolicy); 909 }; 910 return $self->err_line('invalid_replpolicy', $@) if $@; 911 } 912 913 my $hashtype = $args->{hashtype}; 914 if ($hashtype && $hashtype ne 'NONE') { 915 my $tmp = $MogileFS::Checksum::NAME2TYPE{$hashtype}; 916 return $self->err_line('invalid_hashtype') unless $tmp; 917 $hashtype = $tmp; 918 } 919 920 my $sto = Mgd::get_store(); 921 my $dmid = $sto->get_domainid_by_name($domain) or 922 return $self->err_line('domain_not_found'); 923 924 my $clsid = $sto->get_classid_by_name($dmid, $class); 925 if (!defined $clsid && $args->{update} && $class eq 'default') { 926 $args->{update} = 0; 927 } 928 if ($args->{update}) { 929 return $self->err_line('class_not_found') if ! defined $clsid; 930 $sto->update_class_name(dmid => $dmid, classid => $clsid, 931 classname => $class); 932 } else { 933 $clsid = eval { $sto->create_class($dmid, $class); }; 934 if ($@) { 935 if (error_code($@) eq "dup") { 936 return $self->err_line('class_exists'); 937 } 938 return $self->err_line('failure', "$@"); 939 } 940 } 941 $sto->update_class_mindevcount(dmid => $dmid, classid => $clsid, 942 mindevcount => $mindevcount); 943 # don't erase an existing replpolicy if we're not setting a new one. 944 $sto->update_class_replpolicy(dmid => $dmid, classid => $clsid, 945 replpolicy => $replpolicy) if $replpolicy; 946 if ($hashtype) { 947 $sto->update_class_hashtype(dmid => $dmid, classid => $clsid, 948 hashtype => $hashtype eq 'NONE' ? undef : $hashtype); 949 } 950 951 # return success 952 return $self->cmd_clear_cache({ class => $class, mindevcount => $mindevcount, domain => $domain }); 953} 954 955sub cmd_update_class { 956 my MogileFS::Worker::Query $self = shift; 957 my $args = shift; 958 959 # simply passes through to create_class with update set 960 $self->cmd_create_class({ %$args, update => 1 }); 961} 962 963sub cmd_delete_class { 964 my MogileFS::Worker::Query $self = shift; 965 my $args = shift; 966 967 my $domain = $args->{domain}; 968 return $self->err_line('no_domain') unless length $domain; 969 my $class = $args->{class}; 970 return $self->err_line('no_class') unless length $domain; 971 972 return $self->err_line('nodel_default_class') if $class eq 'default'; 973 974 my $sto = Mgd::get_store(); 975 my $dmid = $sto->get_domainid_by_name($domain) or 976 return $self->err_line('domain_not_found'); 977 my $clsid = $sto->get_classid_by_name($dmid, $class); 978 return $self->err_line('class_not_found') unless defined $clsid; 979 980 if (eval { Mgd::get_store()->delete_class($dmid, $clsid) }) { 981 return $self->cmd_clear_cache({ domain => $domain, class => $class }); 982 } 983 984 my $errc = error_code($@); 985 return $self->err_line('class_has_files') if $errc eq "has_files"; 986 return $self->err_line('failure'); 987} 988 989sub cmd_create_host { 990 my MogileFS::Worker::Query $self = shift; 991 my $args = shift; 992 993 my $hostname = $args->{host} or 994 return $self->err_line('no_host'); 995 996 my $sto = Mgd::get_store(); 997 my $hostid = $sto->get_hostid_by_name($hostname); 998 999 # if we're creating a new host, require ip/port, and default to 1000 # host being down if client didn't specify 1001 if ($args->{update}) { 1002 return $self->err_line('host_not_found') unless $hostid; 1003 } else { 1004 return $self->err_line('host_exists') if $hostid; 1005 return $self->err_line('no_ip') unless $args->{ip}; 1006 return $self->err_line('no_port') unless $args->{port}; 1007 $args->{status} ||= 'down'; 1008 } 1009 1010 if ($args->{status}) { 1011 return $self->err_line('unknown_state') 1012 unless MogileFS::Host->valid_state($args->{status}); 1013 } 1014 1015 # arguments all good, let's do it. 1016 1017 $hostid ||= $sto->create_host($hostname, $args->{ip}); 1018 1019 # Protocol mismatch data fixup. 1020 $args->{hostip} = delete $args->{ip} if exists $args->{ip}; 1021 $args->{http_port} = delete $args->{port} if exists $args->{port}; 1022 $args->{http_get_port} = delete $args->{getport} if exists $args->{getport}; 1023 my @toupdate = grep { exists $args->{$_} } qw(status hostip http_port 1024 http_get_port altip altmask); 1025 $sto->update_host($hostid, { map { $_ => $args->{$_} } @toupdate }); 1026 1027 # return success 1028 return $self->cmd_clear_cache({ hostid => $hostid, hostname => $hostname }); 1029} 1030 1031sub cmd_update_host { 1032 my MogileFS::Worker::Query $self = shift; 1033 my $args = shift; 1034 1035 # simply passes through to create_host with update set 1036 $self->cmd_create_host({ %$args, update => 1 }); 1037} 1038 1039sub cmd_delete_host { 1040 my MogileFS::Worker::Query $self = shift; 1041 my $args = shift; 1042 1043 my $sto = Mgd::get_store(); 1044 my $hostid = $sto->get_hostid_by_name($args->{host}) 1045 or return $self->err_line('unknown_host'); 1046 1047 # TODO: $sto->delete_host should have a "has_devices" test internally 1048 for my $dev ($sto->get_all_devices) { 1049 return $self->err_line('host_not_empty') 1050 if $dev->{hostid} == $hostid; 1051 } 1052 1053 $sto->delete_host($hostid); 1054 1055 return $self->cmd_clear_cache; 1056} 1057 1058sub cmd_get_domains { 1059 my MogileFS::Worker::Query $self = shift; 1060 my $args = shift; 1061 1062 my $ret = {}; 1063 my $dm_n = 0; 1064 for my $dom (Mgd::domain_factory()->get_all) { 1065 $dm_n++; 1066 $ret->{"domain${dm_n}"} = $dom->name; 1067 my $cl_n = 0; 1068 foreach my $cl ($dom->classes) { 1069 $cl_n++; 1070 $ret->{"domain${dm_n}class${cl_n}name"} = $cl->name; 1071 $ret->{"domain${dm_n}class${cl_n}mindevcount"} = $cl->mindevcount; 1072 $ret->{"domain${dm_n}class${cl_n}replpolicy"} = 1073 $cl->repl_policy_string; 1074 $ret->{"domain${dm_n}class${cl_n}hashtype"} = $cl->hashtype_string; 1075 } 1076 $ret->{"domain${dm_n}classes"} = $cl_n; 1077 } 1078 $ret->{"domains"} = $dm_n; 1079 1080 return $self->ok_line($ret); 1081} 1082 1083sub cmd_get_paths { 1084 my MogileFS::Worker::Query $self = shift; 1085 my $args = shift; 1086 1087 # memcache mappings are as follows: 1088 # mogfid:<dmid>:<dkey> -> fidid 1089 # mogdevids:<fidid> -> \@devids (and TODO: invalidate when deletion is run!) 1090 1091 # if you specify 'noverify', that means a correct answer isn't needed and memcache can 1092 # be used. 1093 my $memc = MogileFS::Config->memcache_client; 1094 my $get_from_memc = $memc && $args->{noverify}; 1095 my $memcache_ttl = MogileFS::Config->server_setting_cached("memcache_ttl") || 3600; 1096 1097 # validate domain for plugins 1098 $args->{dmid} = $self->check_domain($args) or return; 1099 1100 # now invoke the plugin, abort if it tells us to 1101 my $rv = MogileFS::run_global_hook('cmd_get_paths', $args); 1102 return $self->err_line('plugin_aborted') 1103 if defined $rv && ! $rv; 1104 1105 # validate parameters 1106 my $dmid = $args->{dmid}; 1107 my $key = $args->{key}; 1108 1109 valid_key($key) or return $self->err_line("no_key"); 1110 1111 # We default to returning two possible paths. 1112 # but the client may ask for more if they want. 1113 my $pathcount = $args->{pathcount} || 2; 1114 $pathcount = 2 if $pathcount < 2; 1115 1116 # get DB handle 1117 my $fid; 1118 my $need_fid_in_memcache = 0; 1119 my $mogfid_memkey = "mogfid:$args->{dmid}:$key"; 1120 if ($get_from_memc) { 1121 if (my $fidid = $memc->get($mogfid_memkey)) { 1122 $fid = MogileFS::FID->new($fidid); 1123 } else { 1124 $need_fid_in_memcache = 1; 1125 } 1126 } 1127 unless ($fid) { 1128 Mgd::get_store()->slaves_ok(sub { 1129 $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key); 1130 }); 1131 $fid or return $self->err_line("unknown_key"); 1132 } 1133 1134 # add to memcache, if needed. for an hour. 1135 $memc->set($mogfid_memkey, $fid->id, $memcache_ttl ) if $need_fid_in_memcache || ($memc && !$get_from_memc); 1136 1137 my $dmap = Mgd::device_factory()->map_by_id; 1138 1139 my $ret = { 1140 paths => 0, 1141 }; 1142 1143 # find devids that FID is on in memcache or db. 1144 my @fid_devids; 1145 my $need_devids_in_memcache = 0; 1146 my $devid_memkey = "mogdevids:" . $fid->id; 1147 if ($get_from_memc) { 1148 if (my $list = $memc->get($devid_memkey)) { 1149 @fid_devids = @$list; 1150 } else { 1151 $need_devids_in_memcache = 1; 1152 } 1153 } 1154 unless (@fid_devids) { 1155 Mgd::get_store()->slaves_ok(sub { 1156 @fid_devids = $fid->devids; 1157 }); 1158 $memc->set($devid_memkey, \@fid_devids, $memcache_ttl ) if $need_devids_in_memcache || ($memc && !$get_from_memc); 1159 } 1160 1161 my @devices = map { $dmap->{$_} } @fid_devids; 1162 1163 my @sorted_devs; 1164 unless (MogileFS::run_global_hook('cmd_get_paths_order_devices', \@devices, \@sorted_devs)) { 1165 @sorted_devs = sort_devs_by_utilization(@devices); 1166 } 1167 1168 # keep one partially-bogus path around just in case we have nothing else to send. 1169 my $backup_path; 1170 1171 # files on devices set for drain may disappear soon. 1172 my @drain_paths; 1173 1174 # construct result paths 1175 foreach my $dev (@sorted_devs) { 1176 next unless $dev && $dev->host; 1177 1178 my $dfid = MogileFS::DevFID->new($dev, $fid); 1179 my $path = $dfid->get_url; 1180 my $currently_up = $dev->should_read_from; 1181 1182 if (! $currently_up) { 1183 $backup_path = $path; 1184 next; 1185 } 1186 1187 # only verify size one first one, and never verify if they've asked not to 1188 next unless 1189 $ret->{paths} || 1190 $args->{noverify} || 1191 $dfid->size_matches; 1192 1193 if ($dev->dstate->should_drain) { 1194 push @drain_paths, $path; 1195 next; 1196 } 1197 1198 my $n = ++$ret->{paths}; 1199 $ret->{"path$n"} = $path; 1200 last if $n == $pathcount; # one verified, one likely seems enough for now. time will tell. 1201 } 1202 1203 # deprioritize devices set for drain, they could disappear soon... 1204 # Clients /should/ try to use lower-numbered paths first to avoid this. 1205 if ($ret->{paths} < $pathcount && @drain_paths) { 1206 foreach my $path (@drain_paths) { 1207 my $n = ++$ret->{paths}; 1208 $ret->{"path$n"} = $path; 1209 last if $n == $pathcount; 1210 } 1211 } 1212 1213 # use our backup path if all else fails 1214 if ($backup_path && ! $ret->{paths}) { 1215 $ret->{paths} = 1; 1216 $ret->{path1} = $backup_path; 1217 } 1218 1219 return $self->ok_line($ret); 1220} 1221 1222sub sort_devs_by_utilization { 1223 my @devices_with_weights; 1224 1225 # is this fid still owned by this key? 1226 foreach my $dev (@_) { 1227 my $weight; 1228 my $util = $dev->observed_utilization; 1229 1230 if (defined($util) and $util =~ /\A\d+\Z/) { 1231 $weight = 102 - $util; 1232 $weight ||= 100; 1233 } else { 1234 $weight = $dev->weight; 1235 $weight ||= 100; 1236 } 1237 push @devices_with_weights, [$dev, $weight]; 1238 } 1239 1240 # randomly weight the devices 1241 my @list = MogileFS::Util::weighted_list(@devices_with_weights); 1242 1243 return @list; 1244} 1245 1246# ------------------------------------------------------------ 1247# 1248# NOTE: cmd_edit_file is EXPERIMENTAL. Please see the documentation 1249# for edit_file in L<MogileFS::Client>. 1250# It is not recommended to use cmd_edit_file on production systems. 1251# 1252# cmd_edit_file is similar to cmd_get_paths, except we: 1253# - take the device of the first path we would have returned 1254# - get a tempfile with a new fid (pointing to nothing) on the same device 1255# the tempfile has the same key, so will replace the old contents on 1256# create_close 1257# - detach the old fid from that device (leaving the file in place) 1258# - attach the new fid to that device 1259# - returns only the first path to the old fid and a path to new fid 1260# (the client then DAV-renames the old path to the new path) 1261# 1262# TODO - what to do about situations where we would be reducing the 1263# replica count to zero? 1264# TODO - what to do about pending replications where we remove the source? 1265# TODO - the current implementation of cmd_edit_file is based on a copy 1266# of cmd_get_paths. Once proven mature, consider factoring out common 1267# code from the two functions. 1268# ------------------------------------------------------------ 1269sub cmd_edit_file { 1270 my MogileFS::Worker::Query $self = shift; 1271 my $args = shift; 1272 1273 my $memc = MogileFS::Config->memcache_client; 1274 1275 # validate domain for plugins 1276 $args->{dmid} = $self->check_domain($args) or return; 1277 1278 # now invoke the plugin, abort if it tells us to 1279 my $rv = MogileFS::run_global_hook('cmd_get_paths', $args); 1280 return $self->err_line('plugin_aborted') 1281 if defined $rv && ! $rv; 1282 1283 # validate parameters 1284 my $dmid = $args->{dmid}; 1285 my $key = $args->{key}; 1286 1287 valid_key($key) or return $self->err_line("no_key"); 1288 1289 # get DB handle 1290 my $fid; 1291 my $need_fid_in_memcache = 0; 1292 my $mogfid_memkey = "mogfid:$args->{dmid}:$key"; 1293 if (my $fidid = $memc->get($mogfid_memkey)) { 1294 $fid = MogileFS::FID->new($fidid); 1295 } else { 1296 $need_fid_in_memcache = 1; 1297 } 1298 unless ($fid) { 1299 Mgd::get_store()->slaves_ok(sub { 1300 $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key); 1301 }); 1302 $fid or return $self->err_line("unknown_key"); 1303 } 1304 1305 # add to memcache, if needed. for an hour. 1306 $memc->add($mogfid_memkey, $fid->id, 3600) if $need_fid_in_memcache; 1307 1308 my $dmap = Mgd::device_factory()->map_by_id; 1309 1310 my @devices_with_weights; 1311 1312 # find devids that FID is on in memcache or db. 1313 my @fid_devids; 1314 my $need_devids_in_memcache = 0; 1315 my $devid_memkey = "mogdevids:" . $fid->id; 1316 if (my $list = $memc->get($devid_memkey)) { 1317 @fid_devids = @$list; 1318 } else { 1319 $need_devids_in_memcache = 1; 1320 } 1321 unless (@fid_devids) { 1322 Mgd::get_store()->slaves_ok(sub { 1323 @fid_devids = $fid->devids; 1324 }); 1325 $memc->add($devid_memkey, \@fid_devids, 3600) if $need_devids_in_memcache; 1326 } 1327 1328 # is this fid still owned by this key? 1329 foreach my $devid (@fid_devids) { 1330 my $weight; 1331 my $dev = $dmap->{$devid}; 1332 my $util = $dev->observed_utilization; 1333 1334 if (defined($util) and $util =~ /\A\d+\Z/) { 1335 $weight = 102 - $util; 1336 $weight ||= 100; 1337 } else { 1338 $weight = $dev->weight; 1339 $weight ||= 100; 1340 } 1341 push @devices_with_weights, [$devid, $weight]; 1342 } 1343 1344 # randomly weight the devices 1345 # TODO - should we reverse the order, to leave the best 1346 # one there for get_paths? 1347 my @list = MogileFS::Util::weighted_list(@devices_with_weights); 1348 1349 # Filter out bad devs 1350 @list = grep { 1351 my $devid = $_; 1352 my $dev = $dmap->{$devid}; 1353 1354 $dev && $dev->should_read_from; 1355 } @list; 1356 1357 # Take first remaining device from list 1358 my $devid = $list[0]; 1359 1360 my $classid = $fid->classid; 1361 my $newfid = eval { 1362 Mgd::get_store()->register_tempfile( 1363 fid => undef, # undef => let the store pick a fid 1364 dmid => $dmid, 1365 key => $key, # This tempfile will ultimately become this key 1366 classid => $classid, 1367 devids => $devid, 1368 ); 1369 }; 1370 unless ($newfid) { 1371 my $errc = error_code($@); 1372 return $self->err_line("fid_in_use") if $errc eq "dup"; 1373 warn "Error registering tempfile: $@\n"; 1374 return $self->err_line("db"); 1375 } 1376 unless (Mgd::get_store()->remove_fidid_from_devid($fid->id, $devid)) { 1377 warn "Error removing fidid from devid"; 1378 return $self->err_line("db"); 1379 } 1380 unless (Mgd::get_store()->add_fidid_to_devid($newfid, $devid)) { 1381 warn "Error removing fidid from devid"; 1382 return $self->err_line("db"); 1383 } 1384 1385 my @paths = map { 1386 my $dfid = MogileFS::DevFID->new($devid, $_); 1387 my $path = $dfid->get_url; 1388 } ($fid, $newfid); 1389 my $ret; 1390 $ret->{oldpath} = $paths[0]; 1391 $ret->{newpath} = $paths[1]; 1392 $ret->{fid} = $newfid; 1393 $ret->{devid} = $devid; 1394 $ret->{class} = $classid; 1395 return $self->ok_line($ret); 1396} 1397 1398sub cmd_set_weight { 1399 my MogileFS::Worker::Query $self = shift; 1400 my $args = shift; 1401 1402 # figure out what they want to do 1403 my ($hostname, $devid, $weight) = ($args->{host}, $args->{device}+0, $args->{weight}+0); 1404 return $self->err_line('bad_params') 1405 unless $hostname && $devid && $weight >= 0; 1406 1407 my $dev = Mgd::device_factory()->get_by_id($devid); 1408 return $self->err_line('no_device') unless $dev; 1409 return $self->err_line('host_mismatch') 1410 unless $dev->host->hostname eq $hostname; 1411 1412 Mgd::get_store()->set_device_weight($dev->id, $weight); 1413 1414 return $self->cmd_clear_cache; 1415} 1416 1417sub cmd_set_state { 1418 my MogileFS::Worker::Query $self = shift; 1419 my $args = shift; 1420 1421 # figure out what they want to do 1422 my ($hostname, $devid, $state) = ($args->{host}, $args->{device}+0, $args->{state}); 1423 1424 my $dstate = device_state($state); 1425 return $self->err_line('bad_params') 1426 unless $hostname && $devid && $dstate; 1427 1428 my $dev = Mgd::device_factory()->get_by_id($devid); 1429 return $self->err_line('no_device') unless $dev; 1430 return $self->err_line('host_mismatch') 1431 unless $dev->host->hostname eq $hostname; 1432 1433 # make sure the destination state isn't too high 1434 return $self->err_line('state_too_high') 1435 unless $dev->can_change_to_state($state); 1436 1437 Mgd::get_store()->set_device_state($dev->id, $state); 1438 return $self->cmd_clear_cache; 1439} 1440 1441sub cmd_noop { 1442 my MogileFS::Worker::Query $self = shift; 1443 my $args = shift; 1444 return $self->ok_line; 1445} 1446 1447sub cmd_replicate_now { 1448 my MogileFS::Worker::Query $self = shift; 1449 1450 my $rv = Mgd::get_store()->replicate_now; 1451 return $self->ok_line({ count => int($rv) }); 1452} 1453 1454sub cmd_set_server_setting { 1455 my MogileFS::Worker::Query $self = shift; 1456 my $args = shift; 1457 my $key = $args->{key} or 1458 return $self->err_line("bad_params"); 1459 my $val = $args->{value}; 1460 1461 my $chk = MogileFS::Config->server_setting_is_writable($key) or 1462 return $self->err_line("not_writable"); 1463 1464 my $cleanval = eval { $chk->($val); }; 1465 return $self->err_line("invalid_format", $@) if $@; 1466 1467 MogileFS::Config->set_server_setting($key, $cleanval); 1468 1469 # GROSS HACK: slave settings are managed directly by MogileFS::Client, but 1470 # I need to add a version key, so we check and inject that code here. 1471 # FIXME: Move this when slave keys are managed by query worker commands! 1472 if ($key =~ /^slave_/) { 1473 Mgd::get_store()->incr_server_setting('slave_version', 1); 1474 } 1475 1476 return $self->ok_line; 1477} 1478 1479sub cmd_server_setting { 1480 my MogileFS::Worker::Query $self = shift; 1481 my $args = shift; 1482 my $key = $args->{key}; 1483 return $self->err_line("bad_params") unless $key; 1484 my $value = MogileFS::Config->server_setting($key); 1485 return $self->ok_line({key => $key, value => $value}); 1486} 1487 1488sub cmd_server_settings { 1489 my MogileFS::Worker::Query $self = shift; 1490 my $ss = Mgd::get_store()->server_settings; 1491 my $ret = {}; 1492 my $n = 0; 1493 while (my ($k, $v) = each %$ss) { 1494 next unless MogileFS::Config->server_setting_is_readable($k); 1495 $ret->{"key_count"} = ++$n; 1496 $ret->{"key_$n"} = $k; 1497 $ret->{"value_$n"} = $v; 1498 } 1499 return $self->ok_line($ret); 1500} 1501 1502sub cmd_do_monitor_round { 1503 my MogileFS::Worker::Query $self = shift; 1504 my $args = shift; 1505 $self->forget_that_monitor_has_run; 1506 $self->wait_for_monitor; 1507 return $self->ok_line; 1508} 1509 1510sub cmd_fsck_start { 1511 my MogileFS::Worker::Query $self = shift; 1512 my $sto = Mgd::get_store(); 1513 1514 my $fsck_host = MogileFS::Config->server_setting("fsck_host"); 1515 my $rebal_host = MogileFS::Config->server_setting("rebal_host"); 1516 1517 return $self->err_line("fsck_running", "fsck is already running") if $fsck_host; 1518 return $self->err_line("rebal_running", "rebalance running; cannot run fsck at same time") if $rebal_host; 1519 1520 # reset position, if a previous fsck was already completed. 1521 my $intss = sub { MogileFS::Config->server_setting($_[0]) || 0 }; 1522 my $checked_fid = $intss->("fsck_highest_fid_checked"); 1523 my $final_fid = $intss->("fsck_fid_at_end"); 1524 if (($checked_fid && $final_fid && $checked_fid >= $final_fid) || 1525 (!$final_fid && !$checked_fid)) { 1526 $self->_do_fsck_reset or return $self->err_line("db"); 1527 } 1528 1529 # set params for stats: 1530 $sto->set_server_setting("fsck_start_time", $sto->get_db_unixtime); 1531 $sto->set_server_setting("fsck_stop_time", undef); 1532 $sto->set_server_setting("fsck_fids_checked", 0); 1533 my $start_fid = 1534 MogileFS::Config->server_setting('fsck_highest_fid_checked') || 0; 1535 $sto->set_server_setting("fsck_start_fid", $start_fid); 1536 1537 # and start it: 1538 $sto->set_server_setting("fsck_host", MogileFS::Config->hostname); 1539 MogileFS::ProcManager->wake_a("fsck"); 1540 1541 return $self->ok_line; 1542} 1543 1544sub cmd_fsck_stop { 1545 my MogileFS::Worker::Query $self = shift; 1546 my $sto = Mgd::get_store(); 1547 $sto->set_server_setting("fsck_host", undef); 1548 $sto->set_server_setting("fsck_stop_time", $sto->get_db_unixtime); 1549 return $self->ok_line; 1550} 1551 1552sub cmd_fsck_reset { 1553 my MogileFS::Worker::Query $self = shift; 1554 my $args = shift; 1555 1556 my $sto = Mgd::get_store(); 1557 $sto->set_server_setting("fsck_opt_policy_only", 1558 ($args->{policy_only} ? "1" : undef)); 1559 $sto->set_server_setting("fsck_highest_fid_checked", 1560 ($args->{startpos} ? $args->{startpos} : "0")); 1561 1562 $self->_do_fsck_reset or return $self->err_line("db"); 1563 return $self->ok_line; 1564} 1565 1566sub _do_fsck_reset { 1567 my MogileFS::Worker::Query $self = shift; 1568 eval { 1569 my $sto = Mgd::get_store(); 1570 $sto->set_server_setting("fsck_start_time", undef); 1571 $sto->set_server_setting("fsck_stop_time", undef); 1572 $sto->set_server_setting("fsck_fids_checked", 0); 1573 $sto->set_server_setting("fsck_fid_at_end", $sto->max_fidid); 1574 1575 # clear existing event counts summaries. 1576 my $ss = $sto->server_settings; 1577 foreach my $k (keys %$ss) { 1578 next unless $k =~ /^fsck_sum_evcount_/; 1579 $sto->set_server_setting($k, undef); 1580 } 1581 my $logid = $sto->max_fsck_logid; 1582 $sto->set_server_setting("fsck_start_maxlogid", $logid); 1583 $sto->set_server_setting("fsck_logid_processed", $logid); 1584 }; 1585 if ($@) { 1586 error("DB error in _do_fsck_reset: $@"); 1587 return 0; 1588 } 1589 return 1; 1590} 1591 1592sub cmd_fsck_clearlog { 1593 my MogileFS::Worker::Query $self = shift; 1594 my $sto = Mgd::get_store(); 1595 $sto->clear_fsck_log; 1596 return $self->ok_line; 1597} 1598 1599sub cmd_fsck_getlog { 1600 my MogileFS::Worker::Query $self = shift; 1601 my $args = shift; 1602 1603 my $sto = Mgd::get_store(); 1604 my @rows = $sto->fsck_log_rows($args->{after_logid}, 100); 1605 my $ret; 1606 my $n = 0; 1607 foreach my $row (@rows) { 1608 $n++; 1609 foreach my $k (keys %$row) { 1610 $ret->{"row_${n}_$k"} = $row->{$k} if defined $row->{$k}; 1611 } 1612 } 1613 $ret->{row_count} = $n; 1614 return $self->ok_line($ret); 1615} 1616 1617sub cmd_fsck_status { 1618 my MogileFS::Worker::Query $self = shift; 1619 1620 my $sto = Mgd::get_store(); 1621 # Kick up the summary before we read the values 1622 $sto->fsck_log_summarize; 1623 my $fsck_host = MogileFS::Config->server_setting('fsck_host'); 1624 my $intss = sub { MogileFS::Config->server_setting($_[0]) || 0 }; 1625 my $ret = { 1626 running => ($fsck_host ? 1 : 0), 1627 host => $fsck_host, 1628 max_fid_checked => $intss->('fsck_highest_fid_checked'), 1629 policy_only => $intss->('fsck_opt_policy_only'), 1630 end_fid => $intss->('fsck_fid_at_end'), 1631 start_time => $intss->('fsck_start_time'), 1632 stop_time => $intss->('fsck_stop_time'), 1633 current_time => $sto->get_db_unixtime, 1634 max_logid => $sto->max_fsck_logid, 1635 }; 1636 1637 # throw some stats in. 1638 my $ss = $sto->server_settings; 1639 foreach my $k (keys %$ss) { 1640 next unless $k =~ /^fsck_sum_evcount_(.+)/; 1641 $ret->{"num_$1"} += $ss->{$k}; 1642 } 1643 1644 return $self->ok_line($ret); 1645} 1646 1647sub cmd_rebalance_status { 1648 my MogileFS::Worker::Query $self = shift; 1649 1650 my $sto = Mgd::get_store(); 1651 1652 my $rebal_state = MogileFS::Config->server_setting('rebal_state'); 1653 return $self->err_line('no_rebal_state') unless $rebal_state; 1654 return $self->ok_line({ state => $rebal_state }); 1655} 1656 1657sub cmd_rebalance_start { 1658 my MogileFS::Worker::Query $self = shift; 1659 1660 my $rebal_host = MogileFS::Config->server_setting("rebal_host"); 1661 my $fsck_host = MogileFS::Config->server_setting("fsck_host"); 1662 1663 return $self->err_line("rebal_running", "rebalance is already running") if $rebal_host; 1664 return $self->err_line("fsck_running", "fsck running; cannot run rebalance at same time") if $fsck_host; 1665 1666 my $rebal_state = MogileFS::Config->server_setting('rebal_state'); 1667 unless ($rebal_state) { 1668 my $rebal_pol = MogileFS::Config->server_setting('rebal_policy'); 1669 return $self->err_line('no_rebal_policy') unless $rebal_pol; 1670 1671 my $rebal = MogileFS::Rebalance->new; 1672 $rebal->policy($rebal_pol); 1673 my @devs = Mgd::device_factory()->get_all; 1674 $rebal->init(\@devs); 1675 my $sdevs = $rebal->source_devices; 1676 1677 $rebal_state = $rebal->save_state; 1678 MogileFS::Config->set_server_setting('rebal_state', $rebal_state); 1679 } 1680 # TODO: register start time somewhere. 1681 MogileFS::Config->set_server_setting('rebal_host', MogileFS::Config->hostname); 1682 return $self->ok_line({ state => $rebal_state }); 1683} 1684 1685sub cmd_rebalance_test { 1686 my MogileFS::Worker::Query $self = shift; 1687 my $rebal_pol = MogileFS::Config->server_setting('rebal_policy'); 1688 my $rebal_state = MogileFS::Config->server_setting('rebal_state'); 1689 return $self->err_line('no_rebal_policy') unless $rebal_pol; 1690 1691 my $rebal = MogileFS::Rebalance->new; 1692 my @devs = Mgd::device_factory()->get_all; 1693 $rebal->policy($rebal_pol); 1694 $rebal->init(\@devs); 1695 1696 # client should display list of source, destination devices. 1697 # FIXME: can probably avoid calling this twice by pulling state? 1698 # *or* not running init. 1699 my $sdevs = $rebal->filter_source_devices(\@devs); 1700 my $ddevs = $rebal->filter_dest_devices(\@devs); 1701 my $ret = {}; 1702 $ret->{sdevs} = join(',', @$sdevs); 1703 $ret->{ddevs} = join(',', @$ddevs); 1704 1705 return $self->ok_line($ret); 1706} 1707 1708sub cmd_rebalance_reset { 1709 my MogileFS::Worker::Query $self = shift; 1710 my $host = MogileFS::Config->server_setting('rebal_host'); 1711 if ($host) { 1712 return $self->err_line("rebal_running", "rebalance is running") if $host; 1713 } 1714 MogileFS::Config->set_server_setting('rebal_state', undef); 1715 return $self->ok_line; 1716} 1717 1718sub cmd_rebalance_stop { 1719 my MogileFS::Worker::Query $self = shift; 1720 my $host = MogileFS::Config->server_setting('rebal_host'); 1721 unless ($host) { 1722 return $self->err_line('rebal_not_started'); 1723 } 1724 MogileFS::Config->set_server_setting('rebal_signal', 'stop'); 1725 return $self->ok_line; 1726} 1727 1728sub cmd_rebalance_set_policy { 1729 my MogileFS::Worker::Query $self = shift; 1730 my $args = shift; 1731 1732 my $rebal_host = MogileFS::Config->server_setting("rebal_host"); 1733 return $self->err_line("no_set_rebal", "cannot change rebalance policy while rebalance is running") if $rebal_host; 1734 1735 # load policy object, test policy, set policy. 1736 my $rebal = MogileFS::Rebalance->new; 1737 eval { 1738 $rebal->policy($args->{policy}); 1739 }; 1740 if ($@) { 1741 return $self->err_line("bad_rebal_pol", $@); 1742 } 1743 1744 MogileFS::Config->set_server_setting('rebal_policy', $args->{policy}); 1745 MogileFS::Config->set_server_setting('rebal_state', undef); 1746 return $self->ok_line; 1747} 1748 1749sub ok_line { 1750 my MogileFS::Worker::Query $self = shift; 1751 1752 my $delay = ''; 1753 if ($self->{querystarttime}) { 1754 $delay = sprintf("%.4f ", Time::HiRes::tv_interval( $self->{querystarttime} )); 1755 $self->{querystarttime} = undef; 1756 } 1757 1758 my $id = defined $self->{reqid} ? "$self->{reqid} " : ''; 1759 1760 my $args = shift || {}; 1761 $args->{callid} = $self->{callid} if defined $self->{callid}; 1762 my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args); 1763 $self->send_to_parent("${id}${delay}OK $argline"); 1764 return 1; 1765} 1766 1767# first argument: error code. 1768# second argument: optional error text. text will be taken from code if no text provided. 1769sub err_line { 1770 my MogileFS::Worker::Query $self = shift; 1771 1772 my $err_code = shift; 1773 my $err_text = shift || { 1774 'dup' => "Duplicate name/number used.", 1775 'after_mismatch' => "Pattern does not match the after-value?", 1776 'bad_params' => "Invalid parameters to command; please see documentation", 1777 'class_exists' => "That class already exists in that domain", 1778 'class_has_files' => "Class still has files, unable to delete", 1779 'class_not_found' => "Class not found", 1780 'db' => "Database error", 1781 'domain_has_files' => "Domain still has files, unable to delete", 1782 'domain_exists' => "That domain already exists", 1783 'domain_not_empty' => "Domain still has classes, unable to delete", 1784 'domain_not_found' => "Domain not found", 1785 'failure' => "Operation failed", 1786 'host_exists' => "That host already exists", 1787 'host_mismatch' => "The device specified doesn't belong to the host specified", 1788 'host_not_empty' => "Unable to delete host; it contains devices still", 1789 'host_not_found' => "Host not found", 1790 'invalid_checker_level' => "Checker level invalid. Please see documentation on this command.", 1791 'invalid_mindevcount' => "The mindevcount must be at least 1", 1792 'key_exists' => "Target key name already exists; can't overwrite.", 1793 'no_class' => "No class provided", 1794 'no_devices' => "No devices found to store file", 1795 'no_device' => "Device not found", 1796 'no_domain' => "No domain provided", 1797 'no_host' => "No host provided", 1798 'no_ip' => "IP required to create host", 1799 'no_port' => "Port required to create host", 1800 'no_temp_file' => "No tempfile or file already closed", 1801 'none_match' => "No keys match that pattern and after-value (if any).", 1802 'plugin_aborted' => "Action aborted by plugin", 1803 'state_too_high' => "Status cannot go from dead to alive; must use down", 1804 'unknown_command' => "Unknown server command", 1805 'unknown_host' => "Host not found", 1806 'unknown_state' => "Invalid/unknown state", 1807 'unreg_domain' => "Domain name invalid/not found", 1808 'rebal_not_started' => "Rebalance not running", 1809 'no_rebal_state' => "No available rebalance status", 1810 'no_rebal_policy' => "No rebalance policy available", 1811 'nodel_default_class' => "Cannot delete the default class", 1812 }->{$err_code} || $err_code; 1813 1814 my $delay = ''; 1815 if ($self->{querystarttime}) { 1816 $delay = sprintf("%.4f ", Time::HiRes::tv_interval($self->{querystarttime})); 1817 $self->{querystarttime} = undef; 1818 } else { 1819 # don't send another ERR line if we already sent one 1820 error("err_line called redundantly with $err_code ( " . eurl($err_text) . ")"); 1821 return 0; 1822 } 1823 1824 my $id = defined $self->{reqid} ? "$self->{reqid} " : ''; 1825 my $callid = defined $self->{callid} ? ' ' . eurl($self->{callid}) : ''; 1826 1827 $self->send_to_parent("${id}${delay}ERR $err_code " . eurl($err_text) . $callid); 1828 return 0; 1829} 1830 18311; 1832 1833# Local Variables: 1834# mode: perl 1835# c-basic-indent: 4 1836# indent-tabs-mode: nil 1837# End: 1838 1839__END__ 1840 1841=head1 NAME 1842 1843MogileFS::Worker::Query -- implements the MogileFS client protocol 1844 1845=head1 SEE ALSO 1846 1847L<MogileFS::Worker> 1848 1849 1850