package Gearman::Server::Client;

=head1 NAME

Gearman::Server::Client - a client connection to a Gearman::Server

=head1 DESCRIPTION

Used by L<Gearman::Server> to instantiate connections from clients.
Clients speak either a binary protocol, for normal operation (calling
functions, grabbing function call requests, returning function values,
etc), or a text-based line protocol, for relatively rare
administrative / monitoring commands.

The binary protocol commands aren't currently documented.  (FIXME)  But
they're well-implemented in L<Gearman::Client>, L<Gearman::Worker>,
and L<Gearman::Client::Async>, if that's any consolation.

The line-based administrative commands are documented below.

=cut

use strict;
use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'can_do',  # { $job_name => $timeout } $timeout can be undef indicating no timeout
            'can_do_list',  # [ $job_name, ... ]: the keys of can_do, as a list
            'can_do_iter',  # numeric offset into can_do_list where the next job search starts
            'fast_read',    # number of bytes still to read as fast as we can, or undef
            'fast_buffer',  # array of chunks collected during a fast read
            'read_buf',     # received bytes not yet parsed into commands
            'sleeping', # 0/1:  they've said they're sleeping and we haven't woken them up
            'timer', # Timer for job cancellation
            'doing',  # { $job_handle => Job }
            'client_id',  # opaque string, no whitespace.  workers give this so checker scripts
                          # can tell apart the same worker connected to multiple jobservers.
            'server',     # pointer up to client's server
            'options',    # { $option_name => 1 } for options the client has enabled
            'jobs_done_since_sleep',  # initialised to 0 when the connection is created
            );

# 60k read buffer default, similar to perlbal's backend read.
use constant READ_SIZE => 60 * 1024;
use constant MAX_READ_SIZE => 512 * 1024;

# Class Method:
#
# Builds the connection object around $sock (handed to Danga::Socket's
# constructor) and remembers $server, the server this connection belongs to.
# Returns the new object.
sub new {
    my Gearman::Server::Client $self = shift;
    my ($sock, $server) = @_;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new($sock);

    $self->{fast_read}   = undef; # Number of bytes to read as fast as we can (don't try to process them)
    $self->{fast_buffer} = []; # Array of buffers used during fast read operation
    $self->{read_buf}    = '';
    $self->{sleeping}    = 0;
    $self->{can_do}      = {};
    $self->{doing}       = {}; # handle -> Job
    $self->{can_do_list} = [];
    $self->{can_do_iter} = 0;  # numeric iterator for where we start looking for jobs
    $self->{client_id}   = "-";
    $self->{server}      = $server;
    $self->{options}     = {};
    $self->{jobs_done_since_sleep} = 0;

    return $self;
}

# Returns the value stored for the named option (undef if it was never set).
sub option {
    my Gearman::Server::Client $self = shift;
    my $option = shift;

    return $self->{options}->{$option};
}

# Tears the connection down: fails every job this connection was working
# on, unregisters it from the server's sleeper and client lists, forgets its
# abilities and finally closes the socket.  An optional close reason is
# passed through to Danga::Socket::close.
sub close {
    my Gearman::Server::Client $self = shift;

    # A job-timeout timer that is still pending would otherwise fire after
    # the jobs below have already been failed, reporting the job a second
    # time.  Its closure also holds references to us and to the job.
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    my $doing = $self->{doing};

    while (my ($handle, $job) = each %$doing) {
        my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
        $job->relay_to_listeners($msg);
        $job->note_finished(0);
    }

    # Clear the doing list, since it may contain a set of jobs which contain
    # references back to us.
    %$doing = ();

    # Remove self from sleepers, otherwise it will be leaked if another worker
    # for the job never connects.
    my $sleepers = $self->{server}{sleepers};
    for my $job (@{ $self->{can_do_list} }) {
        my $sleeping = $sleepers->{$job};
        delete $sleeping->{$self};
        delete $sleepers->{$job} unless %$sleeping;
    }

    $self->{server}->note_disconnected_client($self);

    $self->CMD_reset_abilities;

    # Forward the caller's reason (if any) instead of silently dropping it.
    $self->SUPER::close(@_);
}

# Client
#
# Read handler: pulls bytes off the socket into read_buf, then processes
# every complete command found there.  A binary packet is "\0REQ" followed by
# a 4 byte type and a 4 byte length (both network order) and the data; a
# text command is one newline-terminated line.
sub event_read {
    my Gearman::Server::Client $self = shift;

    my $read_size = $self->{fast_read} || READ_SIZE;
    my $bref = $self->read($read_size);

    # Delay close till after buffers are written on EOF. If we are unable
    # to write 'err' or 'hup' will be thrown and we'll close faster.
    return $self->write(sub { $self->close } ) unless defined $bref;

    if ($self->{fast_read}) {
        push @{$self->{fast_buffer}}, $$bref;
        $self->{fast_read} -= length($$bref);

        # If fast_read is still positive, then we need to read more data
        return if ($self->{fast_read} > 0);

        # Append the whole giant read buffer to our main read buffer
        $self->{read_buf} .= join('', @{$self->{fast_buffer}});

        # Reset the fast read state for next time.
        $self->{fast_buffer} = [];
        $self->{fast_read}   = undef;
    } else {
        # Exact read size length likely means we have more sitting on the
        # socket. Buffer up to half a meg in one go.
        if (length($$bref) == READ_SIZE) {
            my $limit = int(MAX_READ_SIZE / READ_SIZE);
            my @crefs = ($$bref);
            while (my $cref = $self->read(READ_SIZE)) {
                push(@crefs, $$cref);
                last if (length($$cref) < READ_SIZE || $limit-- < 1);
            }
            $bref = \join('', @crefs);
        }
        $self->{read_buf} .= $$bref;
    }

    my $found_cmd;
    do {
        $found_cmd = 1;
        my $blen = length($self->{read_buf});

        if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
            my ($cmd, $len) = unpack("NN", $1);
            if ($blen < $len + 12) {
                # Start a fast read loop to get all the data we need, less
                # what we already have in the buffer.
                $self->{fast_read} = $len + 12 - $blen;
                return;
            }

            $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));

            # and slide down buf:
            $self->{read_buf} = substr($self->{read_buf}, 12+$len);

        } elsif ($self->{read_buf} =~ s/^(\w.*?)?\r?\n//) {
            # ASCII command case (useful for telnetting in).
            #
            # The body after the first word character is ".*?", not ".+?":
            # with ".+?" a one-character line such as "x\n" could never
            # match, stayed at the front of the buffer forever and blocked
            # every later text command on this connection.
            my $line = $1;
            $self->process_line($line);
        } else {
            $found_cmd = 0;
        }
    } while ($found_cmd);
}

# Write handler: flushes queued output and stops watching for writability
# once the queue has drained.
sub event_write {
    my $self = shift;
    my $done = $self->write(undef);
    $self->watch_write(0) if $done;
}

# Line based command processor
#
# Splits $line into a command word and its argument string and dispatches to
# the matching TXTCMD_<command> method (command name is lower-cased).  Empty
# or unrecognised lines are answered with an "unknown_command" error line.
sub process_line {
    my Gearman::Server::Client $self = shift;
    my $line = shift;

    if ($line && $line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);
        my $code = $self->can("TXTCMD_$cmd");
        if ($code) {
            $code->($self, $args);
            return;
        }
    }

    return $self->err_line('unknown_command');
}

=head1 Binary Protocol Structure

All binary protocol exchanges between clients (which can be callers,
workers, or both) and the Gearman server have a common packet header:

   4 byte magic  -- either "\0REQ" for requests to the server, or
                    "\0RES" for responses from the server
   4 byte type   -- network order integer, representing the packet type
   4 byte length -- network order length, for data segment.
   data          -- optional, if length is non-zero

=head1 Binary Protocol Commands

=head2 echo_req (type=16)

A debug command.  The server will reply with the same data, in an echo_res (type=17) packet.

=head2 (and many more...)

FIXME: auto-generate protocol docs from internal Gearman::Util table,
once annotated with some English?
Until then, the C<CMD_*> methods below are the reference for how the
server handles each packet type.

=cut

# Each CMD_<name> method handles the binary packet of that name.  It is
# called by process_cmd with a reference to the packet's data segment.

# echo_req: send the data straight back in an echo_res packet.
sub CMD_echo_req {
    my Gearman::Server::Client $self = shift;
    my $blobref = shift;

    return $self->res_packet("echo_res", $$blobref);
}

# work_status: data is "handle\0numerator\0denominator".  Relays the status
# to the job's listeners and records it on the job.
sub CMD_work_status {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my ($handle, $nu, $de) = split(/\0/, $$ar);

    my $job = $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
    $job->relay_to_listeners($msg);
    $job->status([$nu, $de]);
    return 1;
}

# work_complete: data is "handle\0result".  Relays the result to the job's
# listeners, marks the job finished and cancels the pending timeout timer.
sub CMD_work_complete {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # Without a "handle\0" prefix there is no handle to look up.  Answer
    # directly instead of reading a stale $1 left over from the caller.
    return $self->error_packet("not_worker")
        unless $$ar =~ s/^(.+?)\0//;
    my $handle = $1;

    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar));
    $job->relay_to_listeners($msg);
    $job->note_finished(1);
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}

# work_fail: data is the job handle.  Relays the failure to the job's
# listeners, marks the job finished and cancels the pending timeout timer.
sub CMD_work_fail {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my $handle = $$ar;
    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
    $job->relay_to_listeners($msg);
    $job->note_finished(1);
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}

# work_exception: data is "handle\0exception".  Relayed only to listeners
# that enabled the "exceptions" option; the job stays in progress.
sub CMD_work_exception {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # Same guard as in CMD_work_complete: no "handle\0" prefix, no job.
    return $self->error_packet("not_worker")
        unless $$ar =~ s/^(.+?)\0//;
    my $handle = $1;
    my $job = $self->{doing}{$handle};

    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_exception", join("\0", $handle, $$ar));
    $job->relay_to_option_listeners($msg, "exceptions");

    return 1;
}

# pre_sleep: the worker announces it is going to sleep.
sub CMD_pre_sleep {
    my Gearman::Server::Client $self = shift;
    $self->{'sleeping'} = 1;
    $self->{server}->on_client_sleep($self);
    return 1;
}

# grab_job: hand the worker the first available job among the functions it
# registered, replying with job_assign, or with no_job when there is none.
sub CMD_grab_job {
    my Gearman::Server::Client $self = shift;

    my $job;
    my $can_do_size = scalar @{$self->{can_do_list}};

    unless ($can_do_size) {
        $self->res_packet("no_job");
        return;
    }

    # the offset where we start asking for jobs, to prevent starvation
    # of some job types.
    $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;

    my $tried = 0;
    while ($tried < $can_do_size) {
        my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
        $tried++;
        my $job_to_grab = $self->{can_do_list}->[$idx];
        $job = $self->{server}->grab_job($job_to_grab)
            or next;

        $job->worker($self);
        $self->{doing}{$job->handle} = $job;

        # A function registered with a timeout gets a timer that fails the
        # job if the worker has not reported back by then.
        my $timeout = $self->{can_do}->{$job_to_grab};
        if (defined $timeout) {
            my $timer = Danga::Socket->AddTimer($timeout, sub {
                return $self->error_packet("not_worker") unless $job->worker == $self;

                my $msg = Gearman::Util::pack_res_command("work_fail", $job->handle);
                $job->relay_to_listeners($msg);
                $job->note_finished(1);
                $job->clear_listeners;
                $self->{timer} = undef;
            });
            $self->{timer} = $timer;
        }
        return $self->res_packet("job_assign",
                                 join("\0",
                                      $job->handle,
                                      $job->func,
                                      ${$job->argref},
                                      ));
    }

    $self->res_packet("no_job");
}

# can_do: register a function name, without a timeout.
sub CMD_can_do {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    $self->{can_do}->{$$ar} = undef;
    $self->_setup_can_do_list;
}

# can_do_timeout: data is "function" or "function\0timeout".  Registers the
# function; a missing timeout means no timeout.
sub CMD_can_do_timeout {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/;

    # $timeout is undef when the packet carried no timeout part.
    $self->{can_do}->{$task} = $timeout;

    $self->_setup_can_do_list;
}

# option_req: enable a per-connection option.  Only "exceptions" is known;
# anything else is answered with an unknown_option error packet.
sub CMD_option_req {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    if ($$ar eq 'exceptions') {
        $self->{options}->{exceptions} = 1;
        return $self->res_packet("option_res", $$ar);
    }

    return $self->error_packet("unknown_option");
}

# set_client_id: store the id with all whitespace removed; "-" if empty.
sub CMD_set_client_id {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    $self->{client_id} = $$ar;
    $self->{client_id} =~ s/\s+//g;
    $self->{client_id} = "-" unless length $self->{client_id};
}

# cant_do: unregister a function name.
sub CMD_cant_do {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    delete $self->{can_do}->{$$ar};
    $self->_setup_can_do_list;
}

# get_status: data is a job handle.  Replies with a status_res packet of
# "handle\0known\0running\0numerator\0denominator".
sub CMD_get_status {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # handles can't contain nulls; validate before looking anything up
    return if $$ar =~ /\0/;

    my $job = $self->{server}->job_by_handle($$ar);

    my ($known, $running, $num, $den);
    $known = 0;
    $running = 0;
    if ($job) {
        $known = 1;
        $running = $job->worker ? 1 : 0;
        if (my $stat = $job->status) {
            ($num, $den) = @$stat;
        }
    }

    $num = '' unless defined $num;
    $den = '' unless defined $den;

    $self->res_packet("status_res", join("\0",
                                         $$ar,
                                         $known,
                                         $running,
                                         $num,
                                         $den));
}

# reset_abilities: forget every registered function.
sub CMD_reset_abilities {
    my Gearman::Server::Client $self = shift;

    $self->{can_do} = {};
    $self->_setup_can_do_list;
}

# Rebuilds can_do_list from the keys of can_do and restarts the job-search
# offset at 0.
sub _setup_can_do_list {
    my Gearman::Server::Client $self = shift;
    $self->{can_do_list} = [ keys %{$self->{can_do}} ];
    $self->{can_do_iter} = 0;
}

# submit_job variants: foreground, background and high priority.  Each one
# passes its (subscribe, high_pri) flags explicitly to _cmd_submit_job.
sub CMD_submit_job      { my ($self, $ar) = @_; return _cmd_submit_job($self, $ar, 1);    }
sub CMD_submit_job_bg   { my ($self, $ar) = @_; return _cmd_submit_job($self, $ar, 0);    }
sub CMD_submit_job_high { my ($self, $ar) = @_; return _cmd_submit_job($self, $ar, 1, 1); }

# Shared implementation of the submit_job commands.
#   $ar        - reference to "function\0uniq\0argument"; the header is
#                stripped in place, leaving the argument
#   $subscribe - true: this connection listens for the job's results;
#                false: background job that needs no listener
#   $high_pri  - passed on to the Job constructor
# Replies with job_created and wakes sleeping workers for the function.
sub _cmd_submit_job {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my $subscribe = shift;
    my $high_pri = shift;

    return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
        unless $$ar =~ s/^(.+?)\0(.*?)\0//;

    my ($func, $uniq) = ($1, $2);

    my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar, $high_pri);

    if ($subscribe) {
        $job->add_listener($self);
    } else {
        # background mode
        $job->require_listener(0);
    }

    $self->res_packet("job_created", $job->handle);
    $self->{server}->wake_up_sleepers($func);
}

# Queues a response packet of type $code carrying $arg.  Always returns 1.
sub res_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $arg) = @_;
    $self->write(Gearman::Util::pack_res_command($code, $arg));
    return 1;
}

# Queues an "error" packet whose data is "$code\0$msg".  $msg is optional
# and defaults to the empty string.  Always returns 0.
sub error_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $msg) = @_;

    # Most callers pass only a code; don't interpolate undef.
    $msg = '' unless defined $msg;

    $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
    return 0;
}

# Dispatches one binary packet: maps the numeric type to its name and calls
# the CMD_<name> method with a reference to the data.  An exception from the
# handler (including a missing handler method) is logged with warn and
# reported to the client as a server_error packet.
sub process_cmd {
    my Gearman::Server::Client $self = shift;
    my $cmd = shift;
    my $blob = shift;

    my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
    my $ret = eval {
        $self->$cmd_name(\$blob);
    };

    # Capture $@ once, before warn or anything else can overwrite it.
    my $err = $@;
    return $ret unless $err;

    warn "Error: $err\n";
    return $self->error_packet("server_error", $err);
}

# Socket error and hangup both simply close the connection.
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }

############################################################################

=head1 Line based commands

These commands are used for administrative or statistic tasks to be done on the gearman server. They can be entered using a line based client (telnet, etc.) by connecting to the listening port (7003) and are also intended to be machine parsable.

=head2 "workers"

Emits list of registered workers, their fds, IPs, client ids, and list of registered abilities (function names they can do).  Of format:

  fd ip.x.y.z client_id : func_a func_b func_c
  fd ip.x.y.z client_id : func_a func_b func_c
  fd ip.x.y.z client_id : func_a func_b func_c
  .

It ends with a line with just a period.

=cut

sub TXTCMD_workers {
    my Gearman::Server::Client $self = shift;

    # One line per connection, ordered by file descriptor.
    foreach my $cl (sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients) {
        my $fd = $cl->{fd};
        $self->write("$fd " . $cl->peer_ip_string . " $cl->{client_id} : @{$cl->{can_do_list}}\n");

    }
    $self->write(".\n");
}

=head2 "status"

The output format of this function is tab separated columns as follows, followed by a line consisting of a fullstop and a newline (".\n") to indicate the end of output.

=over

=item Function name

A string denoting the name of the function of the job

=item Number in queue

A positive integer indicating the total number of jobs for this function in the queue.
This includes currently running ones as well (next column)

=item Number of jobs running

A positive integer showing how many jobs of this function are currently running

=item Number of capable workers

A positive integer denoting the maximum possible count of workers that could be doing this job. Though they may not all be working on it due to other tasks holding them busy.

=back

=cut

sub TXTCMD_status {
    my Gearman::Server::Client $self = shift;
    my $server = $self->{server};

    # func -> number of connections that registered it
    my %capable;
    for my $conn ($server->clients) {
        $capable{$_}++ for @{ $conn->{can_do_list} };
    }

    # func -> number of jobs queued / number of those with a worker assigned
    my (%queued, %running);
    for my $job ($server->jobs) {
        my $name = $job->func;
        $queued{$name}++;
        $running{$name}++ if $job->worker;
    }

    # Report every function that has a capable worker or a queued job
    # (queued functions are shown even if nobody can do them).
    my %seen;
    my @names = sort grep { !$seen{$_}++ } (keys %capable, keys %queued);

    for my $name (@names) {
        my @columns = (
            $name,
            $queued{$name}  || 0,
            $running{$name} || 0,
            $capable{$name} || 0,
        );
        $self->write( join("\t", @columns) . "\n" );
    }

    $self->write( ".\n" );
}

=head2 "jobs"

Output format is zero or more lines of:

    [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n

Followed by a single line of:

    .\n

\t is a literal tab character
\n is perl's definition of newline (literal \n on linux, something else on win32)

=cut

sub TXTCMD_jobs {
    my Gearman::Server::Client $self = shift;

    for my $job ($self->{server}->jobs) {
        my $func   = $job->func;
        my $uniq   = $job->uniq;
        my $worker = $job->worker;

        # "-" stands in for the address when no worker holds the job.
        my $worker_addr = $worker ? $worker->peer_addr_string : "-";

        # Evaluated in scalar context on purpose.
        my $listeners = $job->listeners;

        $self->write(join("\t", $func, $uniq, $worker_addr, $listeners) . "\n");
    }

    $self->write(".\n");
}

=head2 "clients"

Output format is zero or more sections of:

=over

One line of:

    [Client Address]\n

Followed by zero or more lines of:

    \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n

=back

Followed by a single line of:

    .\n

\t is a literal tab character
\n is perl's definition of newline (literal \n on linux, something else on win32)

=cut

sub TXTCMD_clients {
    my Gearman::Server::Client $self = shift;
    my $server = $self->{server};

    # stringified listener -> [ jobs that listener is subscribed to ]
    my %jobs_for;
    for my $job ($server->jobs) {
        for my $listener ($job->listeners) {
            push @{ $jobs_for{$listener} }, $job;
        }
    }

    for my $conn ($server->clients) {
        my $conn_addr = $conn->peer_addr_string;
        $self->write("$conn_addr\n");

        for my $job (@{ $jobs_for{$conn} || [] }) {
            my $func   = $job->func;
            my $uniq   = $job->uniq;
            my $worker = $job->worker;
            my $worker_addr = $worker ? $worker->peer_addr_string : "-";

            $self->write("\t$func\t$uniq\t$worker_addr\n");
        }
    }

    $self->write(".\n");
}

# "gladiator" [all]: debugging aid.  When Devel::Gladiator and Devel::Peek
# can be loaded, writes a count per reference type found in the arena
# (anonymous subs are additionally counted under their glob name), in
# ascending order.  Entries with a count of 1 are shown only for "all".
# The output always ends with a line containing a single period.
sub TXTCMD_gladiator {
    my Gearman::Server::Client $self = shift;
    my $args = shift || "";

    my $has_gladiator = eval "use Devel::Gladiator; use Devel::Peek; 1;";
    return $self->write(".\n") unless $has_gladiator;

    my $arena = Devel::Gladiator::walk_arena();
    my %count;
    for my $item (@$arena) {
        my $type = ref $item;
        $count{$type}++;
        next unless $type eq "CODE";

        my $gv = Devel::Peek::CvGV($item);
        $count{$gv}++ if $gv =~ /ANON/;
    }
    $arena = undef; # required to free memory

    my $show_all = $args eq "all";
    for my $name (sort { $count{$a} <=> $count{$b} } keys %count) {
        next unless $count{$name} > 1 || $show_all;
        $self->write(sprintf("%7d $name\n", $count{$name}));
    }

    $self->write(".\n");
}

=head2 "maxqueue" function [max_queue_size]

For a given function of job, the maximum queue size is adjusted to be max_queue_size jobs long. A negative value indicates unlimited queue size.

If the max_queue_size value is not supplied then it is unset (and the default maximum queue size will apply to this function).

This function will return OK upon success, and will return ERR incomplete_args upon an invalid number of arguments.

=cut

sub TXTCMD_maxqueue {
    my Gearman::Server::Client $self = shift;
    my $args = shift;

    # First word is the function name, the optional second the new limit.
    my ($func, $max) = split /\s+/, $args;

    return $self->err_line('incomplete_args')
        unless length $func;

    $self->{server}->set_max_queue($func, $max);
    $self->write("OK\n");
}

=head2 "shutdown" ["graceful"]

Close the server.  Or "shutdown graceful" to close the listening socket, then close the server when traffic has died away.
A plain "shutdown" replies OK and exits the server process at once; any
argument other than "graceful" is answered with ERR unknown_args.

=cut

sub TXTCMD_shutdown {
    my Gearman::Server::Client $self = shift;
    my $args = shift;

    if ($args eq "graceful") {
        $self->write("OK\n");
        return Gearmand::shutdown_graceful();
    }

    # Anything else that is non-empty is not an argument we understand.
    return $self->err_line('unknown_args') if $args;

    $self->write("OK\n");
    exit 0;
}

=head2 "version"

Returns server version.

=cut

sub TXTCMD_version {
    my Gearman::Server::Client $self = shift;
    $self->write($Gearman::Server::VERSION . "\n");
}

# Writes "ERR <code> <url-encoded description>\r\n" to the client for one of
# the known error codes and returns 0.
sub err_line {
    my Gearman::Server::Client $self = shift;
    my ($err_code) = @_;

    my %text_for = (
        'unknown_command' => "Unknown server command",
        'unknown_args'    => "Unknown arguments to server command",
        'incomplete_args' => "An incomplete set of arguments was sent to this command",
    );

    my $reply = join(' ', 'ERR', $err_code, eurl($text_for{$err_code}));
    $self->write($reply . "\r\n");
    return 0;
}

# URL-style encoding of a string: every character outside the safe set
# becomes an upper-case %XX escape, then spaces are turned into "+".
sub eurl {
    my ($text) = @_;
    $text =~ s{([^a-zA-Z0-9_\,\-.\/\\\: ])}{ uc sprintf("%%%02x",ord($1)) }eg;
    $text =~ tr/ /+/;
    return $text;
}

1;