1package Gearman::Util; 2use version (); 3$Gearman::Util::VERSION = version->declare("2.004.015"); 4 5use strict; 6use warnings; 7 8# for sake of _read_sock 9no warnings "recursion"; 10 11# man errno 12# Resource temporarily unavailable 13# (may be the same value as EWOULDBLOCK) (POSIX.1) 14use IO::Select; 15use POSIX qw(:errno_h); 16use Scalar::Util qw(); 17use Time::HiRes qw(); 18 19=head1 NAME 20 21Gearman::Util - Utility functions for gearman distributed job system 22 23=head1 METHODS 24 25=cut 26 27sub DEBUG () {0} 28 29# I: to jobserver 30# O: out of job server 31# W: worker 32# C: client of job server 33# J: jobserver 34our %cmd = ( 35 1 => ['I', "can_do"], # from W: [FUNC] 36 2 => ['I', "cant_do"], # from W: [FUNC] 37 3 => ['I', "reset_abilities"], # from W: --- 38 4 => ['I', "pre_sleep"], # from W: --- 39 6 => ['O', "noop"], # J->W --- 40 7 => ['I', "submit_job"], # C->J FUNC[0]UNIQ[0]ARGS 41 8 => ['O', "job_created"], # J->C HANDLE 42 9 => ['I', "grab_job"], # W->J -- 43 10 => ['O', "no_job"], # J->W -- 44 11 => ['O', "job_assign"], # J->W HANDLE[0]FUNC[0]ARG 45 12 => ['IO', "work_status"], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR 46 13 => ['IO', "work_complete"], # W->J/C: HANDLE[0]RES 47 14 => ['IO', "work_fail"], # W->J/C: HANDLE 48 15 => ['I', "get_status"], # C->J: HANDLE 49 16 => ['I', "echo_req"], # ?->J TEXT 50 17 => ['O', "echo_res"], # J->? TEXT 51 18 => ['I', "submit_job_bg"], # C->J " " " " " 52 19 => ['O', "error"], # J->? ERRCODE[0]ERR_TEXT 53 20 => ['O', "status_res"], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM 54 21 => ['I', "submit_job_high"], # C->J FUNC[0]UNIQ[0]ARGS 55 22 => ['I', "set_client_id"], # W->J: [RANDOM_STRING_NO_WHITESPACE] 56 23 => ['I', "can_do_timeout"], # from W: FUNC[0]TIMEOUT 57 58 # for worker to declare to the jobserver that this worker is only connected 59 # to one jobserver, so no polls/grabs will take place, and server is free 60 # to push "job_assign" packets back down. 61 24 => ['I', "all_yours"], # W->J --- 62 25 => ['IO', "work_exception"], # W->J/C: HANDLE[0]EXCEPTION 63 26 => ['I', "option_req"], # C->J: [OPT] 64 27 => ['O', "option_res"], # J->C: [OPT] 65 28 => ['IO', "work_data"], # W->J/C: HANDLE[0]RES 66 29 => ['IO', "work_warning"], # W->J/C: HANDLE[0]RES 67 32 => ['I', "submit_job_high_bg"], # C->J FUNC[0]UNIQ[0]ARGS 68 33 => ['I', "submit_job_low"], # C->J FUNC[0]UNIQ[0]ARGS 69 34 => ['I', "submit_job_low_bg"], # C->J FUNC[0]UNIQ[0]ARGS 70); 71 72our %num; # name -> num 73while (my ($num, $ary) = each %cmd) { 74 die if $num{ $ary->[1] }; 75 $num{ $ary->[1] } = $num; 76} 77 78=head2 cmd_name($num) 79 80B<return> cmd 81 82=cut 83 84sub cmd_name { 85 my $num = shift; 86 my $c = $cmd{$num}; 87 return $c ? $c->[1] : undef; 88} 89 90=head2 pack_req_command($key, $arg) 91 92B<return> request string 93 94=cut 95 96sub pack_req_command { 97 return _pack_command("REQ", @_); 98} 99 100=head2 pack_res_command($cmd, $arg) 101 102B<return> response string 103 104=cut 105 106sub pack_res_command { 107 return _pack_command("RES", @_); 108} 109 110=head2 read_res_packet($sock, $err_ref, $timeout) 111 112B<return> undef on closed socket or malformed packet 113 114=cut 115 116sub read_res_packet { 117 warn " Entering read_res_packet" if DEBUG; 118 my $sock = shift; 119 my $err_ref = shift; 120 my $timeout = shift; 121 my $time_start = Time::HiRes::time(); 122 unless (Scalar::Util::blessed($sock)) { 123 # for the sake of Gearman::Client::Async 124 # see https://github.com/p-alik/perl-Gearman/issues/37 125 (ref($sock) eq "GLOB") || die "provided value is not a blessed object"; 126 ($$sock && $$sock eq '*Gearman::Worker::$sock') 127 || die 128 "provided value is not a GLOB of type Gearman::Worker::\$sock"; 129 } ## end unless (Scalar::Util::blessed...) 130 131 my $err = sub { 132 my $code = shift; 133 Scalar::Util::blessed($sock) && $sock->close() if $sock->connected; 134 $$err_ref = $code if ref $err_ref; 135 return undef; 136 }; 137 138 $sock->blocking(0); 139 140 my $is = IO::Select->new($sock); 141 142 my $readlen = 12; 143 my $offset = 0; 144 my $buf = ''; 145 my $using_ssl = $sock->isa("IO::Socket::SSL"); 146 147 my ($magic, $type, $len); 148 149 warn " Starting up event loop\n" if DEBUG; 150 while (1) { 151 if ($using_ssl && $sock->pending()) { 152 warn " We have @{[ $sock->pending() ]} bytes...\n" if DEBUG; 153 } 154 else { 155 my $time_remaining = undef; 156 if (defined $timeout) { 157 warn " We have a timeout of $timeout\n" if DEBUG; 158 $time_remaining = $time_start + $timeout - Time::HiRes::time(); 159 return $err->("timeout") if $time_remaining < 0; 160 } 161 162 $is->can_read($time_remaining) || next; 163 } ## end else [ if ($using_ssl && $sock...)] 164 warn " Entering read loop\n" if DEBUG; 165 166 my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset); 167 if (!defined($ok)) { 168 next; 169 } 170 elsif ($ok == 0) { 171 return $err->($err_code); 172 } 173 174 if (!defined $type) { 175 next unless length($buf) >= 12; 176 my $header = substr($buf, 0, 12, ''); 177 ($magic, $type, $len) = unpack("a4NN", $header); 178 return $err->("malformed_magic: '$magic'") unless $magic eq "\0RES"; 179 my $starting = length($buf); 180 $readlen = $len - $starting; 181 $offset = $starting; 182 183 if ($readlen) { 184 my ($ok, $err_code) 185 = _read_sock($sock, \$buf, \$readlen, \$offset); 186 if (!defined($ok)) { 187 next; 188 } 189 elsif ($ok == 0) { 190 return $err->($err_code); 191 } 192 } ## end if ($readlen) 193 } ## end if (!defined $type) 194 195 $type = $cmd{$type}; 196 return $err->("bogus_command") unless $type; 197 return $err->("bogus_command_type") unless index($type->[0], "O") != -1; 198 199 warn " Fully formed res packet, returning; type=$type->[1] len=$len\n" 200 if DEBUG; 201 202 $sock->blocking(1); 203 204 return { 205 type => $type->[1], 206 len => $len, 207 blobref => \$buf, 208 }; 209 } ## end while (1) 210} ## end sub read_res_packet 211 212sub _read_sock { 213 my ($sock, $buf_ref, $readlen_ref, $offset_ref) = @_; 214 local $!; 215 my $rv = sysread($sock, $$buf_ref, $$readlen_ref, $$offset_ref); 216 unless ($rv) { 217 warn " Read error: $!\n" if DEBUG; 218 $! == EAGAIN && return; 219 } 220 221 return (0, "read_error") unless defined $rv; 222 return (0, "eof") unless $rv; 223 224 unless ($rv >= $$readlen_ref) { 225 warn 226 " Partial read of $rv bytes, at offset $$offset_ref, readlen was $$readlen_ref\n" 227 if DEBUG; 228 $$offset_ref += $rv; 229 $$readlen_ref -= $rv; 230 231 $sock->blocking(1); 232 my $ret = _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref); 233 $sock->blocking(0); 234 return $ret; 235 } ## end unless ($rv >= $$readlen_ref) 236 237 warn " Finished reading\n" if DEBUG; 238 return (1); 239} ## end sub _read_sock 240 241=head2 read_text_status($sock, $err_ref) 242 243=cut 244 245sub read_text_status { 246 my $sock = shift; 247 my $err_ref = shift; 248 249 my $err = sub { 250 my $code = shift; 251 $sock->close() if $sock->connected; 252 $$err_ref = $code if ref $err_ref; 253 return undef; 254 }; 255 256 $sock->connected || return $err->("can't read from unconnected socket"); 257 my @lines; 258 my $complete = 0; 259 while (my $line = <$sock>) { 260 chomp $line; 261 return $err->($1) if $line =~ /^ERR (\w+) /; 262 263 if ($line eq '.') { 264 $complete++; 265 last; 266 } 267 268 push @lines, $line; 269 } ## end while (my $line = <$sock>) 270 return $err->("eof") unless $complete; 271 272 return @lines; 273} ## end sub read_text_status 274 275=head2 send_req($sock, $reqref) 276 277=cut 278 279sub send_req { 280 my ($sock, $reqref) = @_; 281 return 0 unless $sock; 282 283 my $data = ${$reqref}; 284 (my $total_len) = (my $len) = length($data); 285 my ($num_zero_writes, $offset) = (0, 0); 286 local $SIG{PIPE} = "IGNORE"; 287 288 while ($len && ($num_zero_writes < 5)) { 289 my $written = $sock->syswrite($data, $len, $offset); 290 if (!defined $written) { 291 warn "send_req: syswrite error: $!" if DEBUG; 292 return 0; 293 } 294 elsif ($written > 0) { 295 $len -= $written; 296 $offset += $written; 297 } 298 else { 299 $num_zero_writes++; 300 } 301 } ## end while ($len && ($num_zero_writes...)) 302 303 return ($total_len > 0 && $offset == $total_len); 304} ## end sub send_req 305 306=head2 wait_for_readability($fileno, $timeout) 307 308given a file descriptor number and a timeout, 309 310wait for that descriptor to become readable 311 312B<return> 0 or 1 on if it did or not 313 314=cut 315 316sub wait_for_readability { 317 my ($fileno, $timeout) = @_; 318 return 0 unless $fileno && $timeout; 319 320 my $rin = ''; 321 vec($rin, $fileno, 1) = 1; 322 my $nfound = select($rin, undef, undef, $timeout); 323 324 # nfound can be undef or 0, both failures, or 1, a success 325 return $nfound ? 1 : 0; 326} ## end sub wait_for_readability 327 328# 329# _pack_command($prefix, $key, $arg) 330# 331sub _pack_command { 332 my ($prefix, $key, $arg) = @_; 333 ($key && $num{$key}) || die sprintf("Bogus type arg of '%s'", $key || ''); 334 335 $arg ||= ''; 336 my $len = length($arg); 337 return "\0$prefix" . pack("NN", $num{$key}, $len) . $arg; 338} ## end sub _pack_command 339 3401; 341