1package Gearman::Server::Client;
2
3=head1 NAME
4
5Gearman::Server::Client
6
=head1 DESCRIPTION
8
9Used by L<Gearman::Server> to instantiate connections from clients.
10Clients speak either a binary protocol, for normal operation (calling
11functions, grabbing function call requests, returning function values,
12etc), or a text-based line protocol, for relatively rare
13administrative / monitoring commands.
14
15The binary protocol commands aren't currently documented. (FIXME) But
16they're well-implemented in L<Gearman::Client>, L<Gearman::Worker>,
17and L<Gearman::Client::Async>, if that's any consolation.
18
19The line-based administrative commands are documented below.
20
21=cut
22
23use strict;
24use Danga::Socket;
25use base 'Danga::Socket';
# Per-connection state, declared through the 'fields' pragma.
use fields (
    'can_do',                 # { $job_name => $timeout }; an undef $timeout
                              # means no timeout
    'can_do_list',            # arrayref of the job names (keys of can_do)
    'can_do_iter',            # numeric iterator for where we start looking
                              # for jobs
    'fast_read',              # number of bytes still to read as fast as we
                              # can, or undef when not in fast-read mode
    'fast_buffer',            # arrayref of chunks collected during a fast read
    'read_buf',               # input received but not yet parsed
    'sleeping',               # 0/1: they've said they're sleeping and we
                              # haven't woken them up
    'timer',                  # timer for job cancellation
    'doing',                  # { $job_handle => Job }
    'client_id',              # opaque string, no whitespace.  Workers give
                              # this so checker scripts can tell apart the
                              # same worker connected to multiple jobservers.
    'server',                 # pointer up to client's server
    'options',                # { $option_name => value } set via option_req
    'jobs_done_since_sleep',
);

# Read sizes: a 60k default read (similar to perlbal's backend read) and a
# 512k ceiling used when draining a busy socket in one go.
use constant {
    READ_SIZE     => 60 * 1024,
    MAX_READ_SIZE => 512 * 1024,
};
46
# Class method: build a client connection object.
#
#   $sock   - socket handed on to the Danga::Socket constructor
#   $server - server object this client belongs to (stored in {server})
#
# Returns the initialised object.
sub new {
    my Gearman::Server::Client $self = shift;
    my ($sock, $server) = @_;

    # When invoked on a class name, allocate the fields-based object first.
    ref $self or $self = fields::new($self);
    $self->SUPER::new($sock);

    # Who we are and who owns us.
    $self->{server}    = $server;
    $self->{client_id} = "-";
    $self->{options}   = {};

    # Input buffering.
    $self->{read_buf}    = '';
    $self->{fast_read}   = undef;    # bytes to read as fast as we can (no parsing)
    $self->{fast_buffer} = [];       # chunks gathered during a fast read

    # Worker-side bookkeeping.
    $self->{can_do}                = {};
    $self->{can_do_list}           = [];
    $self->{can_do_iter}           = 0;     # where we start looking for jobs
    $self->{doing}                 = {};    # handle -> Job
    $self->{sleeping}              = 0;
    $self->{jobs_done_since_sleep} = 0;

    return $self;
}
69
# Return the stored value of the named connection option (undef when the
# option has never been set).
sub option {
    my Gearman::Server::Client $self = shift;
    my ($name) = @_;

    my $options = $self->{options};
    return $options->{$name};
}
76
# Tear down this connection.
#
# Cancels any pending job-timeout timer, reports every job this client was
# working on as failed, removes the client from the server's sleeper lists,
# tells the server the client has gone, clears its abilities and finally
# closes the underlying socket.  Any arguments (Danga::Socket's close accepts
# an optional reason string) are passed through to the base class.
sub close {
    my Gearman::Server::Client $self = shift;

    # The timer's callback closes over this client and its job; cancel it so
    # it neither keeps us alive nor fires for a job we are failing right now.
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    my $doing = $self->{doing};

    # Walk a snapshot of the handles: the job callbacks below are free to
    # touch the doing list without disturbing our iteration.
    for my $handle (keys %$doing) {
        my $job = $doing->{$handle} or next;
        my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
        $job->relay_to_listeners($msg);
        $job->note_finished(0);
    }

    # Clear the doing list, since it may contain a set of jobs which contain
    # references back to us.
    %$doing = ();

    # Remove self from sleepers, otherwise it will be leaked if another worker
    # for the job never connects.
    my $sleepers = $self->{server}{sleepers};
    for my $func (@{ $self->{can_do_list} }) {
        my $sleeping = $sleepers->{$func} or next;
        delete $sleeping->{$self};
        delete $sleepers->{$func} unless %$sleeping;
    }

    $self->{server}->note_disconnected_client($self);

    $self->CMD_reset_abilities;

    $self->SUPER::close(@_);
}
107
# Readable-socket callback: pull bytes off the socket into read_buf, then
# dispatch every complete binary packet ("\0REQ" + type + length + data) or
# text line found at the front of the buffer.
sub event_read {
    my Gearman::Server::Client $self = shift;

    my $want  = $self->{fast_read} || READ_SIZE;
    my $chunk = $self->read($want);

    unless (defined $chunk) {
        # Delay close till after buffers are written on EOF. If we are unable
        # to write 'err' or 'hup' will be thrown and we'll close faster.
        return $self->write(sub { $self->close } );
    }

    if (!$self->{fast_read}) {
        # Exact read size length likely means we have more sitting on the
        # socket. Buffer up to half a meg in one go.
        if (length($$chunk) == READ_SIZE) {
            my $reads_left = int(MAX_READ_SIZE / READ_SIZE);
            my @pieces     = ($$chunk);
            while (my $more = $self->read(READ_SIZE)) {
                push @pieces, $$more;
                last if length($$more) < READ_SIZE || $reads_left-- < 1;
            }
            $chunk = \join('', @pieces);
        }
        $self->{read_buf} .= $$chunk;
    }
    else {
        # Fast-read mode: just stash the data until the whole packet is in.
        push @{ $self->{fast_buffer} }, $$chunk;
        $self->{fast_read} -= length($$chunk);

        # Still short of the packet: wait for the next read event.
        return if $self->{fast_read} > 0;

        # Move everything gathered so far onto the main read buffer and
        # leave fast-read mode.
        $self->{read_buf} .= join('', @{ $self->{fast_buffer} });
        $self->{fast_buffer} = [];
        $self->{fast_read}   = undef;
    }

    # Consume commands from the front of the buffer until none is complete.
    while (1) {
        my $buffered = length($self->{read_buf});

        if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
            my ($cmd, $len) = unpack("NN", $1);
            my $packet_len = $len + 12;    # 12-byte header plus data

            if ($buffered < $packet_len) {
                # Start a fast read loop to get all the data we need, less
                # what we already have in the buffer.
                $self->{fast_read} = $packet_len - $buffered;
                return;
            }

            $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));

            # Slide the buffer down past the packet just handled.
            $self->{read_buf} = substr($self->{read_buf}, $packet_len);
            next;
        }

        if ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
            # ASCII command case (useful for telnetting in)
            my $line = $1;
            $self->process_line($line);
            next;
        }

        last;
    }
}
175
# Writable-socket callback: let the base class continue writing queued data
# (write(undef)); once it reports completion, stop watching for writability.
sub event_write {
    my ($self) = @_;

    if ($self->write(undef)) {
        $self->watch_write(0);
    }
}
181
# Line based command processor.
#
# Splits the line into a command word and the remaining argument text, then
# dispatches to the matching TXTCMD_<command> method (command lower-cased).
# Anything else gets an 'unknown_command' error line.
sub process_line {
    my Gearman::Server::Client $self = shift;
    my ($line) = @_;

    unless ($line && $line =~ /^(\w+)\s*(.*)/) {
        return $self->err_line('unknown_command');
    }
    my ($verb, $rest) = (lc($1), $2);

    my $handler = $self->can("TXTCMD_$verb");
    unless ($handler) {
        return $self->err_line('unknown_command');
    }

    $handler->($self, $rest);
    return;
}
199
200=head1 Binary Protocol Structure
201
202All binary protocol exchanges between clients (which can be callers,
203workers, or both) and the Gearman server have common packet header:
204
205  4 byte magic  -- either "\0REQ" for requests to the server, or
206                   "\0RES" for responses from the server
207  4 byte type   -- network order integer, representing the packet type
208  4 byte length -- network order length, for data segment.
209  data          -- optional, if length is non-zero
210
211=head1 Binary Protocol Commands
212
213=head2 echo_req (type=16)
214
A debug command.  The server will reply with the same data, in an echo_res (type=17) packet.
216
217=head2 (and many more...)
218
219FIXME: auto-generate protocol docs from internal Gearman::Util table,
220once annotated with some English?
221
222=cut
223
# echo_req: send the received payload straight back in an echo_res packet.
# Takes a reference to the payload; returns res_packet's result.
sub CMD_echo_req {
    my Gearman::Server::Client $self = shift;
    my ($payload_ref) = @_;

    return $self->res_packet("echo_res", ${$payload_ref});
}
230
# work_status: a worker reports progress on a job it is doing.
#
# The payload is "handle\0numerator\0denominator".  The packet is relayed
# unchanged to the job's listeners and the fraction is recorded on the job.
# Returns 1, or sends a "not_worker" error if this client is not the job's
# worker.
sub CMD_work_status {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    my ($handle, $numerator, $denominator) = split(/\0/, $$ar);

    my $job = $self->{doing}{$handle};
    unless ($job && $job->worker == $self) {
        return $self->error_packet("not_worker");
    }

    $job->relay_to_listeners(
        Gearman::Util::pack_res_command("work_status", $$ar)
    );
    $job->status([$numerator, $denominator]);

    return 1;
}
244
# work_complete: a worker hands back the result of a job.
#
# The payload is "handle\0result".  The handle is stripped from the payload,
# the job is removed from this client's doing list, the result is relayed to
# the job's listeners, the job is marked finished and any pending timeout
# timer is cancelled.
#
# Returns 1 on success.  Sends an "invalid_args" error when the payload has
# no "handle\0" prefix, or "not_worker" when this client is not working on
# the named job.
sub CMD_work_complete {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # Only trust $1 when the substitution actually matched; otherwise it
    # still holds the capture of some earlier, unrelated match.
    unless ($$ar =~ s/^([^\0]+)\0//) {
        return $self->error_packet("invalid_args", "No handle header in work_complete.");
    }
    my $handle = $1;

    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar));
    $job->relay_to_listeners($msg);
    $job->note_finished(1);
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}
265
# work_fail: a worker reports that a job failed.
#
# The payload is the job handle.  The job is removed from this client's
# doing list, a work_fail packet is relayed to its listeners, the job is
# marked finished and any pending timeout timer is cancelled.
# Returns 1, or sends a "not_worker" error if this client is not the job's
# worker.
sub CMD_work_fail {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    my $handle = ${$ar};
    my $job    = delete $self->{doing}{$handle};
    unless ($job && $job->worker == $self) {
        return $self->error_packet("not_worker");
    }

    $job->relay_to_listeners(
        Gearman::Util::pack_res_command("work_fail", $handle)
    );
    $job->note_finished(1);

    my $timer = $self->{timer};
    if ($timer) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}
283
# work_exception: a worker reports an exception raised while doing a job.
#
# The payload is "handle\0exception".  The packet is relayed only to those
# listeners that enabled the "exceptions" option; the job stays on this
# client's doing list.
#
# Returns 1 on success.  Sends an "invalid_args" error when the payload has
# no "handle\0" prefix, or "not_worker" when this client is not working on
# the named job.
sub CMD_work_exception {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # Only trust $1 when the substitution actually matched; otherwise it
    # still holds the capture of some earlier, unrelated match.
    unless ($$ar =~ s/^([^\0]+)\0//) {
        return $self->error_packet("invalid_args", "No handle header in work_exception.");
    }
    my $handle = $1;

    my $job = $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_exception", join("\0", $handle, $$ar));
    $job->relay_to_option_listeners($msg, "exceptions");

    return 1;
}
299
# pre_sleep: the worker announces it is about to sleep.  Flag the client as
# sleeping and notify the server.  Always returns 1.
sub CMD_pre_sleep {
    my Gearman::Server::Client $self = shift;

    $self->{sleeping} = 1;

    my $server = $self->{server};
    $server->on_client_sleep($self);

    return 1;
}
306
# grab_job: a worker asks for something to do.
#
# Tries each registered ability once, starting from a rotating offset, and
# assigns the first job the server hands out (job_assign packet carrying
# handle, function name and argument).  If the ability was registered with a
# timeout, a timer is armed that fails the job when it fires.  Replies with
# no_job when nothing is available.
sub CMD_grab_job {
    my Gearman::Server::Client $self = shift;

    my $ability_count = scalar @{ $self->{can_do_list} };

    if (!$ability_count) {
        $self->res_packet("no_job");
        return;
    }

    # Advance the offset where we start asking for jobs, to prevent
    # starvation of some job types.
    $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $ability_count;

    for my $step (0 .. $ability_count - 1) {
        my $slot = ($step + $self->{can_do_iter}) % $ability_count;
        my $func = $self->{can_do_list}->[$slot];

        my $job = $self->{server}->grab_job($func)
            or next;

        $job->worker($self);
        $self->{doing}{$job->handle} = $job;

        my $timeout = $self->{can_do}->{$func};
        if (defined $timeout) {
            # When the timer fires and we are still the job's worker, the
            # job is reported as failed to its listeners.
            $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
                return $self->error_packet("not_worker") unless $job->worker == $self;

                my $msg = Gearman::Util::pack_res_command("work_fail", $job->handle);
                $job->relay_to_listeners($msg);
                $job->note_finished(1);
                $job->clear_listeners;
                $self->{timer} = undef;
            });
        }

        my $assignment = join("\0", $job->handle, $job->func, ${$job->argref});
        return $self->res_packet("job_assign", $assignment);
    }

    return $self->res_packet("no_job");
}
356
# can_do: register the function named by the payload as an ability of this
# worker, with no timeout, and rebuild the ability list.
sub CMD_can_do {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    my $abilities = $self->{can_do};
    $abilities->{ ${$ar} } = undef;

    $self->_setup_can_do_list;
}
364
# can_do_timeout: register an ability together with an optional timeout.
#
# The payload is "function" or "function\0timeout".  Without a timeout part
# the ability is stored with an undef timeout, i.e. no timeout.
sub CMD_can_do_timeout {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    # $timeout stays undef when the optional "\0timeout" part is absent.
    my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/;

    $self->{can_do}->{$task} = $timeout;

    $self->_setup_can_do_list;
}
379
# option_req: enable a connection option.
#
# Only 'exceptions' is recognised; it is recorded in {options} and confirmed
# with an option_res packet echoing the option name.  Any other name gets an
# "unknown_option" error packet.
sub CMD_option_req {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    unless ($$ar eq 'exceptions') {
        return $self->error_packet("unknown_option");
    }

    $self->{options}->{exceptions} = 1;
    return $self->res_packet("option_res", $$ar);
}
395
# set_client_id: remember the identifier supplied by the client, with all
# whitespace removed.  An identifier that ends up empty is stored as "-".
sub CMD_set_client_id {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    my $id = $$ar;
    $id =~ s/\s+//g;

    $self->{client_id} = length($id) ? $id : "-";
}
404
# cant_do: withdraw the ability named by the payload and rebuild the
# ability list.
sub CMD_cant_do {
    my Gearman::Server::Client $self = shift;
    my ($ar) = @_;

    my $abilities = $self->{can_do};
    delete $abilities->{ ${$ar} };

    $self->_setup_can_do_list;
}
412
# get_status: report what the server knows about a job handle.
#
# Replies with a status_res packet of five NUL-separated values: the handle,
# a known flag (0/1), a running flag (0/1), and the numerator and
# denominator of the last recorded status (empty strings when there is
# none).  A handle containing a NUL is ignored without any reply.
sub CMD_get_status {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # handles can't contain nulls -- reject before bothering the server
    # with a lookup for a handle that cannot exist.
    return if $$ar =~ /\0/;

    my $job = $self->{server}->job_by_handle($$ar);

    my ($known, $running) = (0, 0);
    my ($num, $den);
    if ($job) {
        $known   = 1;
        $running = $job->worker ? 1 : 0;
        if (my $stat = $job->status) {
            ($num, $den) = @$stat;
        }
    }

    $num = '' unless defined $num;
    $den = '' unless defined $den;

    $self->res_packet("status_res", join("\0",
                                         $$ar,
                                         $known,
                                         $running,
                                         $num,
                                         $den));
}
442
# reset_abilities: forget every registered ability by installing a fresh,
# empty can_do table and rebuilding the ability list.
sub CMD_reset_abilities {
    my Gearman::Server::Client $self = shift;

    my %no_abilities;
    $self->{can_do} = \%no_abilities;

    $self->_setup_can_do_list;
}
449
# Rebuild can_do_list from the keys of can_do and restart the iterator used
# when looking for jobs.
sub _setup_can_do_list {
    my Gearman::Server::Client $self = shift;

    my @job_names = keys %{ $self->{can_do} };

    $self->{can_do_list} = \@job_names;
    $self->{can_do_iter} = 0;
}
455
# submit_job variants.  Each forwards its own arguments to _cmd_submit_job,
# appending the subscribe flag and, for the high-priority form, the
# high-priority flag.  Arguments are passed explicitly instead of mutating
# @_ and relying on the implicit @_ sharing of the "&name;" call form.

# Foreground job: the submitting client is subscribed to the job.
sub CMD_submit_job      { return _cmd_submit_job(@_, 1); }

# Background job: nobody is subscribed.
sub CMD_submit_job_bg   { return _cmd_submit_job(@_, 0); }

# Foreground job submitted with high priority.
sub CMD_submit_job_high { return _cmd_submit_job(@_, 1, 1); }
459
# Shared implementation of the submit_job commands.
#
#   $ar        - reference to the payload "func\0uniq\0argument"; the
#                "func\0uniq\0" header is stripped from it in place
#   $subscribe - true to add the submitting client as a listener, false for
#                background mode
#   $high_pri  - passed on to the Job constructor
#
# Replies with job_created (carrying the new handle) and wakes up sleeping
# workers for the function, or sends an "invalid_args" error when the header
# is missing.
sub _cmd_submit_job {
    my Gearman::Server::Client $self = shift;
    my ($ar, $subscribe, $high_pri) = @_;

    unless ($$ar =~ s/^(.+?)\0(.*?)\0//) {
        return $self->error_packet("invalid_args", "No func/uniq header [$$ar].");
    }
    my $func = $1;
    my $uniq = $2;

    my $server = $self->{server};
    my $job    = Gearman::Server::Job->new($server, $func, $uniq, $ar, $high_pri);

    if (!$subscribe) {
        # background mode
        $job->require_listener(0);
    }
    else {
        $job->add_listener($self);
    }

    $self->res_packet("job_created", $job->handle);
    $server->wake_up_sleepers($func);
}
483
# Queue a binary response packet of the given type ($code) with an optional
# payload ($arg) for this client.  Always returns 1.
sub res_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $arg) = @_;

    my $packet = Gearman::Util::pack_res_command($code, $arg);
    $self->write($packet);

    return 1;
}
490
# Queue a binary "error" packet whose payload is the error code and message
# joined by a NUL.  Always returns 0, so callers can "return" it directly.
sub error_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $msg) = @_;

    my $packet = Gearman::Util::pack_res_command("error", "$code\0$msg");
    $self->write($packet);

    return 0;
}
497
# Dispatch one binary command.
#
#   $cmd  - numeric packet type, mapped to a name by Gearman::Util::cmd_name
#   $blob - the packet's data segment
#
# Calls the matching CMD_<name> method with a reference to the data and
# returns its result.  If the handler dies, the error is logged with warn
# and reported to the client in a "server_error" packet (returning 0).
sub process_cmd {
    my Gearman::Server::Client $self = shift;
    my $cmd  = shift;
    my $blob = shift;

    my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);

    # Judge success by an explicit flag rather than by $@ alone: $@ can be
    # emptied or clobbered between the failure and our inspection of it.
    my $ret;
    my $ok = eval {
        $ret = $self->$cmd_name(\$blob);
        1;
    };
    return $ret if $ok;

    # Capture the error once, before warn (or a warn handler) can change $@.
    my $err = $@;
    $err = "unknown error" unless defined $err && length $err;

    warn "Error: $err\n";
    return $self->error_packet("server_error", $err);
}
511
# Socket error callback: close the connection.
sub event_err {
    my ($self) = @_;
    $self->close;
}

# Socket hang-up callback: close the connection.
sub event_hup {
    my ($self) = @_;
    $self->close;
}
514
515############################################################################
516
517=head1 Line based commands
518
These commands are used for administrative or statistical tasks on the gearman server. They can be entered using a line-based client (telnet, etc.) by connecting to the listening port (7003), and are also intended to be machine-parsable.
520
521=head2 "workers"
522
523Emits list of registered workers, their fds, IPs, client ids, and list of registered abilities (function names they can do).  Of format:
524
525  fd ip.x.y.z client_id : func_a func_b func_c
526  fd ip.x.y.z client_id : func_a func_b func_c
527  fd ip.x.y.z client_id : func_a func_b func_c
528  .
529
530It ends with a line with just a period.
531
532=cut
533
# "workers": one line per connected client, ordered by file descriptor,
# giving fd, peer IP, client id and the client's abilities; terminated by a
# line holding a single period.
sub TXTCMD_workers {
    my Gearman::Server::Client $self = shift;

    my @by_fd = sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients;

    for my $client (@by_fd) {
        my $abilities = "@{ $client->{can_do_list} }";
        my $line = join(' ',
                        $client->{fd},
                        $client->peer_ip_string,
                        $client->{client_id},
                        ':',
                        $abilities);
        $self->write("$line\n");
    }

    $self->write(".\n");
}
544
545=head2 "status"
546
547The output format of this function is tab separated columns as follows, followed by a line consisting of a fullstop and a newline (".\n") to indicate the end of output.
548
549=over
550
551=item Function name
552
553A string denoting the name of the function of the job
554
555=item Number in queue
556
557A positive integer indicating the total number of jobs for this function in the queue. This includes currently running ones as well (next column)
558
559=item Number of jobs running
560
561A positive integer showing how many jobs of this function are currently running
562
563=item Number of capable workers
564
565A positive integer denoting the maximum possible count of workers that could be doing this job. Though they may not all be working on it due to other tasks holding them busy.
566
567=back
568
569=cut
570
# "status": one tab-separated line per function -- name, jobs queued, jobs
# running, capable workers -- sorted by function name and terminated by a
# line holding a single period.
sub TXTCMD_status {
    my Gearman::Server::Client $self = shift;

    my $server = $self->{server};

    # How many connected clients have registered each function.
    my %capable;
    for my $client ($server->clients) {
        $capable{$_}++ for @{ $client->{can_do_list} };
    }

    # How many jobs exist per function, and how many of those have a worker.
    my (%queued, %running);
    for my $job ($server->jobs) {
        my $func = $job->func;
        $queued{$func}++;
        $running{$func}++ if $job->worker;
    }

    # Show every function that has workers or queued jobs (or both).
    my %listed;
    @listed{ keys %capable, keys %queued } = ();

    for my $func (sort keys %listed) {
        my $row = join("\t",
                       $func,
                       $queued{$func}  || 0,
                       $running{$func} || 0,
                       $capable{$func} || 0);
        $self->write("$row\n");
    }

    $self->write( ".\n" );
}
609
610=head2 "jobs"
611
612Output format is zero or more lines of:
613
614    [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n
615
Followed by a single line of:
617
618    .\n
619
620\t is a literal tab character
621\n is perl's definition of newline (literal \n on linux, something else on win32)
622
623=cut
624
# "jobs": one tab-separated line per job known to the server -- function,
# uniq key, worker address ("-" when no worker has it) and the job's
# listeners value -- terminated by a line holding a single period.
sub TXTCMD_jobs {
    my Gearman::Server::Client $self = shift;

    for my $job ($self->{server}->jobs) {
        my $func   = $job->func;
        my $uniq   = $job->uniq;
        my $worker = $job->worker;

        my $worker_addr = $worker ? $worker->peer_addr_string : "-";

        # Deliberately evaluated in scalar context; presumably this yields
        # the number of listeners -- confirm against the Job class.
        my $listeners = $job->listeners;

        $self->write(join("\t", $func, $uniq, $worker_addr, $listeners) . "\n");
    }

    $self->write(".\n");
}
644
645=head2 "clients"
646
647Output format is zero or more sections of:
648
649=over
650
651One line of:
652
653    [Client Address]\n
654
655Followed by zero or more lines of:
656
657    \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n
658
659=back
660
Followed by a single line of:
662
663    .\n
664
665\t is a literal tab character
666\n is perl's definition of newline (literal \n on linux, something else on win32)
667
668=cut
669
# "clients": for every connected client, its address on one line followed by
# one tab-indented line (function, uniq key, worker address) per job it is
# listening to; terminated by a line holding a single period.
sub TXTCMD_clients {
    my Gearman::Server::Client $self = shift;

    my $server = $self->{server};

    # Index the jobs by listener; the key is the stringified client object.
    my %jobs_for;
    for my $job ($server->jobs) {
        for my $listener ($job->listeners) {
            push @{ $jobs_for{$listener} ||= [] }, $job;
        }
    }

    for my $client ($server->clients) {
        my $client_addr = $client->peer_addr_string;
        $self->write("$client_addr\n");

        for my $job (@{ $jobs_for{$client} || [] }) {
            my $func   = $job->func;
            my $uniq   = $job->uniq;
            my $worker = $job->worker;

            my $worker_addr = $worker ? $worker->peer_addr_string : "-";
            $self->write("\t$func\t$uniq\t$worker_addr\n");
        }
    }

    $self->write(".\n");
}
702
# "gladiator" [all]: debugging aid that counts the values Devel::Gladiator
# finds in the interpreter's arena, grouped by ref() type, with anonymous
# subs additionally counted under their glob name.
#
# Writes one "count name" line per group in ascending count order; groups
# seen only once are shown only with the "all" argument.  The output ends
# with a line holding a single period.  When Devel::Gladiator or Devel::Peek
# cannot be loaded, only the terminating line is written.
sub TXTCMD_gladiator {
    my Gearman::Server::Client $self = shift;
    my $args = shift || "";

    # Load the optional modules with require inside a block eval: no code is
    # recompiled per call and nothing gets imported into this package.
    my $has_gladiator = eval {
        require Devel::Gladiator;
        require Devel::Peek;
        1;
    };

    if ($has_gladiator) {
        my $all = Devel::Gladiator::walk_arena();
        my %ct;
        foreach my $it (@$all) {
            $ct{ref $it}++;
            if (ref $it eq "CODE") {
                my $name = Devel::Peek::CvGV($it);
                $ct{$name}++ if $name =~ /ANON/;
            }
        }
        $all = undef;  # required to free memory
        foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) {
            next unless $ct{$n} > 1 || $args eq "all";
            # Pass the name as data, never as part of the format string, so
            # a '%' in it cannot be taken for a conversion.
            $self->write(sprintf("%7d %s\n", $ct{$n}, $n));
        }
    }
    $self->write(".\n");
}
725
726=head2 "maxqueue" function [max_queue_size]
727
For a given job function, the maximum queue size is adjusted to be max_queue_size jobs long. A negative value indicates unlimited queue size.
729
730If the max_queue_size value is not supplied then it is unset (and the default maximum queue size will apply to this function).
731
732This function will return OK upon success, and will return ERR incomplete_args upon an invalid number of arguments.
733
734=cut
735
# "maxqueue" function [max_queue_size]: hand the function name and the
# optional maximum to the server's set_max_queue and answer "OK".  A missing
# function name yields an 'incomplete_args' error line.
sub TXTCMD_maxqueue {
    my Gearman::Server::Client $self = shift;
    my ($args) = @_;

    my ($func, $max) = split /\s+/, $args;

    return $self->err_line('incomplete_args') unless length $func;

    my $server = $self->{server};
    $server->set_max_queue($func, $max);

    $self->write("OK\n");
}
748
749=head2 "shutdown" ["graceful"]
750
751Close the server.  Or "shutdown graceful" to close the listening socket, then close the server when traffic has died away.
752
753=cut
754
# "shutdown" ["graceful"]: answer "OK" and then either exit the process at
# once (no argument) or call Gearmand::shutdown_graceful ("graceful").
# Any other argument yields an 'unknown_args' error line.
sub TXTCMD_shutdown {
    my Gearman::Server::Client $self = shift;
    my ($args) = @_;

    unless ($args eq "graceful" || !$args) {
        return $self->err_line('unknown_args');
    }

    $self->write("OK\n");

    if ($args) {
        Gearmand::shutdown_graceful();
    }
    else {
        exit 0;
    }
}
768
769=head2 "version"
770
771Returns server version.
772
773=cut
774
# "version": write $Gearman::Server::VERSION followed by a newline.
sub TXTCMD_version {
    my Gearman::Server::Client $self = shift;

    my $version = $Gearman::Server::VERSION;
    $self->write($version . "\n");
}
779
# Write a text-protocol error line of the form
# "ERR <code> <url-escaped description>\r\n" and return 0.  Codes without a
# known description produce an empty description.
sub err_line {
    my Gearman::Server::Client $self = shift;
    my ($err_code) = @_;

    my %text_for = (
        'unknown_command' => "Unknown server command",
        'unknown_args'    => "Unknown arguments to server command",
        'incomplete_args' => "An incomplete set of arguments was sent to this command",
    );
    my $err_text = $text_for{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}
792
# URL-escape a string for use in a text-protocol response line.
#
# Every character other than letters, digits and "_ , - . / \ :" and the
# space is replaced by "%XX" (upper-case hex of its ordinal); spaces are
# then turned into "+".  Returns the escaped copy; the argument itself is
# not modified.  An undefined argument yields the empty string.
sub eurl {
    my ($text) = @_;
    return '' unless defined $text;

    $text =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/sprintf("%%%02X", ord($1))/eg;
    $text =~ tr/ /+/;

    return $text;
}
799
8001;
801