1package Gearman::Util;
2use version ();
3$Gearman::Util::VERSION = version->declare("2.004.015");
4
5use strict;
6use warnings;
7
8# for sake of _read_sock
9no warnings "recursion";
10
# EAGAIN, see errno(3): "Resource temporarily unavailable"
# (may be the same value as EWOULDBLOCK) (POSIX.1).
# It is imported below through POSIX's :errno_h tag.
14use IO::Select;
15use POSIX qw(:errno_h);
16use Scalar::Util qw();
17use Time::HiRes qw();
18
19=head1 NAME
20
21Gearman::Util - Utility functions for gearman distributed job system
22
23=head1 METHODS
24
25=cut
26
# Compile-time switch for the diagnostic warn() calls in this module.
# The empty prototype together with a body that is a plain constant
# (deliberately no explicit "return") makes this sub a candidate for
# inlining, so "warn ... if DEBUG" can be optimised away while it is 0.
sub DEBUG () { 0 }
28
29# I: to jobserver
30# O: out of job server
31# W: worker
32# C: client of job server
33# J: jobserver
# Protocol command table: numeric command code => [DIRECTION, NAME].
# DIRECTION contains 'I' when the packet may be sent to the job server
# and 'O' when it may come out of the job server ('IO' for both).
our %cmd = (
    1  => [qw(I  can_do)],                # W->J:   FUNC
    2  => [qw(I  cant_do)],               # W->J:   FUNC
    3  => [qw(I  reset_abilities)],       # W->J:   ---
    4  => [qw(I  pre_sleep)],             # W->J:   ---
    6  => [qw(O  noop)],                  # J->W:   ---
    7  => [qw(I  submit_job)],            # C->J:   FUNC[0]UNIQ[0]ARGS
    8  => [qw(O  job_created)],           # J->C:   HANDLE
    9  => [qw(I  grab_job)],              # W->J:   ---
    10 => [qw(O  no_job)],                # J->W:   ---
    11 => [qw(O  job_assign)],            # J->W:   HANDLE[0]FUNC[0]ARG
    12 => [qw(IO work_status)],           # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
    13 => [qw(IO work_complete)],         # W->J/C: HANDLE[0]RES
    14 => [qw(IO work_fail)],             # W->J/C: HANDLE
    15 => [qw(I  get_status)],            # C->J:   HANDLE
    16 => [qw(I  echo_req)],              # ?->J:   TEXT
    17 => [qw(O  echo_res)],              # J->?:   TEXT
    18 => [qw(I  submit_job_bg)],         # C->J:   FUNC[0]UNIQ[0]ARGS
    19 => [qw(O  error)],                 # J->?:   ERRCODE[0]ERR_TEXT
    20 => [qw(O  status_res)],            # J->C:   HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
    21 => [qw(I  submit_job_high)],       # C->J:   FUNC[0]UNIQ[0]ARGS
    22 => [qw(I  set_client_id)],         # W->J:   [RANDOM_STRING_NO_WHITESPACE]
    23 => [qw(I  can_do_timeout)],        # W->J:   FUNC[0]TIMEOUT

    # for worker to declare to the jobserver that this worker is only connected
    # to one jobserver, so no polls/grabs will take place, and server is free
    # to push "job_assign" packets back down.
    24 => [qw(I  all_yours)],             # W->J:   ---
    25 => [qw(IO work_exception)],        # W->J/C: HANDLE[0]EXCEPTION
    26 => [qw(I  option_req)],            # C->J:   [OPT]
    27 => [qw(O  option_res)],            # J->C:   [OPT]
    28 => [qw(IO work_data)],             # W->J/C: HANDLE[0]RES
    29 => [qw(IO work_warning)],          # W->J/C: HANDLE[0]RES
    32 => [qw(I  submit_job_high_bg)],    # C->J:   FUNC[0]UNIQ[0]ARGS
    33 => [qw(I  submit_job_low)],        # C->J:   FUNC[0]UNIQ[0]ARGS
    34 => [qw(I  submit_job_low_bg)],     # C->J:   FUNC[0]UNIQ[0]ARGS
);
71
# Reverse lookup table: command name -> numeric command code,
# derived from %cmd at load time.
our %num;
for my $code (keys %cmd) {
    my $name = $cmd{$code}->[1];

    # the same name registered under two codes means %cmd is broken
    die if $num{$name};
    $num{$name} = $code;
}
77
=head2 cmd_name($num)

Look up the protocol command registered under the numeric code C<$num>.

B<return> the command name, or undef if C<$num> is not a known command code

=cut

sub cmd_name {
    my ($num) = @_;

    my $entry = $cmd{$num};
    return undef unless $entry;

    # entries are [direction, name]
    return $entry->[1];
}
89
=head2 pack_req_command($key, $arg)

Serialize the command named C<$key> with payload C<$arg> as a request
packet, i.e. a packet whose magic is C<\0REQ>.

Dies if C<$key> is not a known command name.

B<return> request string

=cut

sub pack_req_command {
    my @key_and_arg = @_;
    return _pack_command("REQ", @key_and_arg);
}
99
=head2 pack_res_command($cmd, $arg)

Serialize the command named C<$cmd> with payload C<$arg> as a response
packet, i.e. a packet whose magic is C<\0RES>.

Dies if C<$cmd> is not a known command name.

B<return> response string

=cut

sub pack_res_command {
    my @cmd_and_arg = @_;
    return _pack_command("RES", @cmd_and_arg);
}
109
=head2 read_res_packet($sock, $err_ref, $timeout)

Read one complete response packet from C<$sock>.

The socket is switched to non-blocking mode while reading and switched
back to blocking mode once a packet has been read successfully.

=over

=item C<$sock>

a blessed socket object, or (for the sake of Gearman::Client::Async) the
GLOB C<*Gearman::Worker::$sock>. Any other value makes the function die.

=item C<$err_ref>

optional scalar reference which receives the error code on failure:
C<timeout>, C<read_error>, C<eof>, C<malformed_magic: '...'>,
C<bogus_command> or C<bogus_command_type>

=item C<$timeout>

optional number of seconds to wait for the packet. Without a timeout the
function waits until the socket becomes readable.

=back

On failure a blessed, still connected socket gets closed.

B<return> a hash reference with the keys C<type> (command name),
C<len> (payload length as announced in the header) and C<blobref>
(reference to the payload); undef on timeout, closed socket or
malformed packet

=cut

sub read_res_packet {
    warn " Entering read_res_packet" if DEBUG;
    my $sock       = shift;
    my $err_ref    = shift;
    my $timeout    = shift;
    my $time_start = Time::HiRes::time();

    my $is_object = Scalar::Util::blessed($sock);
    unless ($is_object) {

        # for the sake of Gearman::Client::Async
        # see https://github.com/p-alik/perl-Gearman/issues/37
        (ref($sock) eq "GLOB") || die "provided value is not a blessed object";
        ($$sock && $$sock eq '*Gearman::Worker::$sock')
            || die
            "provided value is not a GLOB of type Gearman::Worker::\$sock";
    } ## end unless ($is_object)

    # Failure helper: close the socket, report the code, hand back undef.
    my $err = sub {
        my $code = shift;

        # The blessedness test has to come first: connected() and close()
        # are methods of socket objects. Written as
        # "blessed && close if connected", connected() would run before
        # the guard and be called on the permitted plain GLOB as well.
        if ($is_object && $sock->connected) {
            $sock->close();
        }
        $$err_ref = $code if ref $err_ref;

        # explicit undef (not a bare return) is kept so that callers see
        # the same value as before in every context
        return undef;
    };

    $sock->blocking(0);

    my $is = IO::Select->new($sock);

    # the first thing to read is the fixed size header of 12 bytes
    my $readlen   = 12;
    my $offset    = 0;
    my $buf       = '';
    my $using_ssl = $sock->isa("IO::Socket::SSL");

    my ($magic, $type, $len);

    warn " Starting up event loop\n" if DEBUG;
    while (1) {
        if ($using_ssl && $sock->pending()) {

            # the SSL layer reports pending bytes:
            # read right away instead of waiting in select
            warn "  We have @{[ $sock->pending() ]}  bytes...\n" if DEBUG;
        }
        else {
            my $time_remaining = undef;
            if (defined $timeout) {
                warn "  We have a timeout of $timeout\n" if DEBUG;
                $time_remaining = $time_start + $timeout - Time::HiRes::time();
                return $err->("timeout") if $time_remaining < 0;
            }

            # not readable yet: go round again, which re-checks the timeout
            $is->can_read($time_remaining) || next;
        } ## end else [ if ($using_ssl && $sock...)]
        warn "   Entering read loop\n" if DEBUG;

        # _read_sock returns an empty list if it has to be retried,
        # (0, $code) on failure and (1) once $readlen bytes are in $buf
        my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
        if (!defined($ok)) {
            next;
        }
        elsif ($ok == 0) {
            return $err->($err_code);
        }

        if (!defined $type) {

            # header not parsed yet
            next unless length($buf) >= 12;
            my $header = substr($buf, 0, 12, '');
            ($magic, $type, $len) = unpack("a4NN", $header);
            return $err->("malformed_magic: '$magic'") unless $magic eq "\0RES";

            # from now on $buf collects the payload only
            my $starting = length($buf);
            $readlen = $len - $starting;
            $offset  = $starting;

            if ($readlen) {
                my ($ok, $err_code)
                    = _read_sock($sock, \$buf, \$readlen, \$offset);
                if (!defined($ok)) {

                    # $type is defined by now, so the next successful
                    # read at the top of the loop completes the payload
                    next;
                }
                elsif ($ok == 0) {
                    return $err->($err_code);
                }
            } ## end if ($readlen)
        } ## end if (!defined $type)

        # translate the numeric type into its [direction, name] entry;
        # only commands which may come out of the job server are valid
        $type = $cmd{$type};
        return $err->("bogus_command") unless $type;
        return $err->("bogus_command_type") unless index($type->[0], "O") != -1;

        warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
            if DEBUG;

        $sock->blocking(1);

        return {
            type    => $type->[1],
            len     => $len,
            blobref => \$buf,
        };
    } ## end while (1)
} ## end sub read_res_packet
211
#
# _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref)
#
# Read $$readlen_ref bytes from $sock into $$buf_ref at position
# $$offset_ref.
#
# After a partial read $$offset_ref and $$readlen_ref are advanced by
# the number of bytes received and the remainder is read recursively
# with the socket in blocking mode; non-blocking mode is set again
# afterwards.
#
# Returns
#   an empty list      if sysread reported EAGAIN (try again later)
#   (0, "read_error")  if sysread failed with any other error
#   (0, "eof")         if sysread returned 0
#   (1)                once all requested bytes have been read
#
sub _read_sock {
    my ($sock, $buf_ref, $readlen_ref, $offset_ref) = @_;
    local $!;
    my $rv = sysread($sock, $$buf_ref, $$readlen_ref, $$offset_ref);
    unless ($rv) {
        warn "   Read error: $!\n" if DEBUG;
        $! == EAGAIN && return;
    }

    return (0, "read_error") unless defined $rv;
    return (0, "eof")        unless $rv;

    unless ($rv >= $$readlen_ref) {
        warn
            "   Partial read of $rv bytes, at offset $$offset_ref, readlen was $$readlen_ref\n"
            if DEBUG;
        $$offset_ref += $rv;
        $$readlen_ref -= $rv;

        $sock->blocking(1);

        # The result has to be taken in list context. Assigned to a
        # scalar, a failing (0, $code) would collapse to the bare $code
        # string, and the caller would end up with an undefined error
        # code.
        my @ret = _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref);
        $sock->blocking(0);
        return @ret;
    } ## end unless ($rv >= $$readlen_ref)

    warn "   Finished reading\n" if DEBUG;
    return (1);
} ## end sub _read_sock
240
=head2 read_text_status($sock, $err_ref)

Read the lines of a text status response from C<$sock> up to the
terminating line which consists of a single dot.

On failure the socket is closed if it is still connected, and the error
code is stored in C<$$err_ref> if C<$err_ref> is a reference. The error
code is the word following C<ERR> if the server answered with an error
line, C<eof> if the input ended before the terminating dot, or
C<can't read from unconnected socket>.

B<return> the list of chomped lines without the terminating dot line,
undef on failure

=cut

sub read_text_status {
    my ($sock, $err_ref) = @_;

    # Failure helper: close the socket, report the code, hand back undef.
    my $fail = sub {
        my ($reason) = @_;
        if ($sock->connected) {
            $sock->close();
        }
        if (ref $err_ref) {
            $$err_ref = $reason;
        }
        return undef;
    };

    unless ($sock->connected) {
        return $fail->("can't read from unconnected socket");
    }

    my @status;
    my $terminated = 0;
LINE:
    while (defined(my $row = <$sock>)) {
        chomp $row;

        if ($row =~ /^ERR (\w+) /) {
            return $fail->($1);
        }

        # a single dot marks the end of the response
        if ($row eq '.') {
            $terminated = 1;
            last LINE;
        }

        push @status, $row;
    } ## end while (defined(my $row = <$sock>))

    # input ended without the closing dot
    if (!$terminated) {
        return $fail->("eof");
    }

    return @status;
} ## end sub read_text_status
274
=head2 send_req($sock, $reqref)

Write the request string referenced by C<$reqref> to C<$sock>.

Writing is repeated until the whole string has been sent. It stops
early if syswrite fails, or after syswrite has written zero bytes five
times. SIGPIPE is ignored for the duration of the call.

B<return> true if the request is not empty and has been written
completely, false otherwise (also if C<$sock> is false)

=cut

sub send_req {
    my ($sock, $reqref) = @_;
    return 0 unless $sock;

    my $payload   = ${$reqref};
    my $total     = length($payload);
    my $remaining = $total;
    my $sent      = 0;
    my $stalls    = 0;

    # a vanished peer must surface as a syswrite error, not kill us
    local $SIG{PIPE} = "IGNORE";

    while ($remaining && ($stalls < 5)) {
        my $written = $sock->syswrite($payload, $remaining, $sent);

        unless (defined $written) {
            warn "send_req: syswrite error: $!" if DEBUG;
            return 0;
        }

        if ($written > 0) {
            $remaining -= $written;
            $sent += $written;
        }
        else {
            # nothing went out this time; give up after five such rounds
            $stalls++;
        }
    } ## end while ($remaining && ($stalls...))

    return ($total > 0 && $sent == $total);
} ## end sub send_req
305
=head2 wait_for_readability($fileno, $timeout)

Given a file descriptor number and a timeout in seconds, wait for that
descriptor to become readable.

A false C<$fileno> or C<$timeout> (undef or 0) yields 0 without waiting.

B<return> 1 if the descriptor became readable within the timeout,
0 otherwise

=cut

sub wait_for_readability {
    my ($fileno, $timeout) = @_;

    if (!$fileno || !$timeout) {
        return 0;
    }

    # bit mask for select() with only the bit of our descriptor set
    my $read_mask = '';
    vec($read_mask, $fileno, 1) = 1;

    my $ready = select($read_mask, undef, undef, $timeout);

    # $ready can be undef or 0, both failures, or 1, a success
    return 1 if $ready;
    return 0;
} ## end sub wait_for_readability
327
#
# _pack_command($prefix, $key, $arg)
#
# Build a protocol packet: a NUL byte followed by $prefix as magic,
# then the command number of $key and the length of $arg as two 32 bit
# big-endian integers, then $arg itself.
#
# $prefix  magic without the leading NUL ("REQ" or "RES")
# $key     command name, has to be a key of %num; dies otherwise
# $arg     payload, optional; an undefined $arg is sent as empty payload
#
sub _pack_command {
    my ($prefix, $key, $arg) = @_;
    ($key && $num{$key}) || die sprintf("Bogus type arg of '%s'", $key || '');

    # Only a missing argument defaults to the empty payload. A truth
    # test here would also throw away the legitimate payload "0".
    $arg = '' unless defined $arg;
    my $len = length($arg);
    return "\0$prefix" . pack("NN", $num{$key}, $len) . $arg;
} ## end sub _pack_command
339
3401;
341