1<?php 2 3/** 4 * Interface for Danga's Gearman job scheduling system 5 * 6 * PHP version 5.1.0+ 7 * 8 * LICENSE: This source file is subject to the New BSD license that is 9 * available through the world-wide-web at the following URI: 10 * http://www.opensource.org/licenses/bsd-license.php. If you did not receive 11 * a copy of the New BSD License and are unable to obtain it through the web, 12 * please send a note to license@php.net so we can mail you a copy immediately. 13 * 14 * @category Net 15 * @package Net_Gearman 16 * @author Joe Stump <joe@joestump.net> 17 * @copyright 2007-2008 Digg.com, Inc. 18 * @license http://www.opensource.org/licenses/bsd-license.php New BSD License 19 * @version CVS: $Id$ 20 * @link http://pear.php.net/package/Net_Gearman 21 * @link http://www.danga.com/gearman/ 22 */ 23 24require_once 'Net/Gearman/Connection.php'; 25require_once 'Net/Gearman/Job.php'; 26 27/** 28 * Gearman worker class 29 * 30 * Run an instance of a worker to listen for jobs. It then manages the running 31 * of jobs, etc. 32 * 33 * <code> 34 * <?php 35 * 36 * $servers = array( 37 * '127.0.0.1:7003', 38 * '127.0.0.1:7004' 39 * ); 40 * 41 * $abilities = array('HelloWorld', 'Foo', 'Bar'); 42 * 43 * try { 44 * $worker = new Net_Gearman_Worker($servers); 45 * foreach ($abilities as $ability) { 46 * $worker->addAbility('HelloWorld'); 47 * } 48 * $worker->beginWork(); 49 * } catch (Net_Gearman_Exception $e) { 50 * echo $e->getMessage() . "\n"; 51 * exit; 52 * } 53 * 54 * ?> 55 * </code> 56 * 57 * @category Net 58 * @package Net_Gearman 59 * @author Joe Stump <joe@joestump.net> 60 * @copyright 2007-2008 Digg.com, Inc. 61 * @license http://www.opensource.org/licenses/bsd-license.php New BSD License 62 * @link http://www.danga.com/gearman/ 63 * @see Net_Gearman_Job, Net_Gearman_Connection 64 */ 65class Net_Gearman_Worker 66{ 67 /** 68 * Pool of connections to Gearman servers 69 * 70 * @var array $conn 71 */ 72 protected $conn = array(); 73 74 /** 75 * Pool of retry connections 76 * 77 * @var array $conn 78 */ 79 protected $retryConn = array(); 80 81 /** 82 * Pool of worker abilities 83 * 84 * @var array $conn 85 */ 86 protected $abilities = array(); 87 88 89 /** 90 * Callbacks registered for this worker 91 * 92 * @var array $callback 93 * @see Net_Gearman_Worker::JOB_START 94 * @see Net_Gearman_Worker::JOB_COMPLETE 95 * @see Net_Gearman_Worker::JOB_FAIL 96 */ 97 protected $callback = array( 98 self::JOB_START => array(), 99 self::JOB_COMPLETE => array(), 100 self::JOB_FAIL => array() 101 ); 102 103 /** 104 * Unique id for this worker 105 * 106 * @var string $id 107 */ 108 protected $id = ""; 109 110 111 /** 112 * Callback types 113 * 114 * @const integer JOB_START Ran when a job is started 115 * @const integer JOB_COMPLETE Ran when a job is finished 116 * @const integer JOB_FAIL Ran when a job fails 117 */ 118 const JOB_START = 1; 119 const JOB_COMPLETE = 2; 120 const JOB_FAIL = 3; 121 122 /** 123 * Constructor 124 * 125 * @param array $servers List of servers to connect to 126 * @param string $id Optional unique id for this worker 127 * 128 * @return void 129 * @throws Net_Gearman_Exception 130 * @see Net_Gearman_Connection 131 */ 132 public function __construct($servers, $id = "") 133 { 134 if (!is_array($servers) && strlen($servers)) { 135 $servers = array($servers); 136 } elseif (is_array($servers) && !count($servers)) { 137 throw new Net_Gearman_Exception('Invalid servers specified'); 138 } 139 140 if(empty($id)){ 141 $id = "pid_".getmypid()."_".uniqid(); 142 } 143 144 $this->id = $id; 145 146 foreach ($servers as $s) { 147 try { 148 $conn = Net_Gearman_Connection::connect($s); 149 150 Net_Gearman_Connection::send($conn, "set_client_id", array("client_id" => $this->id)); 151 152 $this->conn[$s] = $conn; 153 154 } catch (Net_Gearman_Exception $e) { 155 156 $this->retryConn[$s] = time(); 157 } 158 } 159 160 if (empty($this->conn)) { 161 throw new Net_Gearman_Exception( 162 "Couldn't connect to any available servers" 163 ); 164 } 165 } 166 167 /** 168 * Announce an ability to the job server 169 * 170 * @param string $ability Name of functcion/ability 171 * @param integer $timeout How long to give this job 172 * 173 * @return void 174 * @see Net_Gearman_Connection::send() 175 */ 176 public function addAbility($ability, $timeout = null) 177 { 178 $call = 'can_do'; 179 $params = array('func' => $ability); 180 if (is_int($timeout) && $timeout > 0) { 181 $params['timeout'] = $timeout; 182 $call = 'can_do_timeout'; 183 } 184 185 $this->abilities[$ability] = $timeout; 186 187 foreach ($this->conn as $conn) { 188 Net_Gearman_Connection::send($conn, $call, $params); 189 } 190 } 191 192 /** 193 * Begin working 194 * 195 * This starts the worker on its journey of actually working. The first 196 * argument is a PHP callback to a function that can be used to monitor 197 * the worker. If no callback is provided then the worker works until it 198 * is killed. The monitor is passed two arguments; whether or not the 199 * worker is idle and when the last job was ran. 200 * 201 * @param callback $monitor Function to monitor work 202 * 203 * @return void 204 * @see Net_Gearman_Connection::send(), Net_Gearman_Connection::connect() 205 * @see Net_Gearman_Worker::doWork(), Net_Gearman_Worker::addAbility() 206 */ 207 public function beginWork($monitor = null) 208 { 209 if (!is_callable($monitor)) { 210 $monitor = array($this, 'stopWork'); 211 } 212 213 $write = null; 214 $except = null; 215 $working = true; 216 $lastJob = time(); 217 $retryTime = 5; 218 219 while ($working) { 220 $sleep = true; 221 $currentTime = time(); 222 223 foreach ($this->conn as $server => $socket) { 224 try { 225 $worked = $this->doWork($socket); 226 } catch (Net_Gearman_Exception $e) { 227 unset($this->conn[$server]); 228 $this->retryConn[$server] = $currentTime; 229 } 230 if ($worked) { 231 $lastJob = time(); 232 $sleep = false; 233 } 234 } 235 236 $idle = false; 237 if ($sleep && count($this->conn)) { 238 foreach ($this->conn as $socket) { 239 Net_Gearman_Connection::send($socket, 'pre_sleep'); 240 } 241 242 $read = $this->conn; 243 socket_select($read, $write, $except, 60); 244 $idle = (count($read) == 0); 245 } 246 247 $retryChange = false; 248 foreach ($this->retryConn as $s => $lastTry) { 249 if (($lastTry + $retryTime) < $currentTime) { 250 try { 251 $conn = Net_Gearman_Connection::connect($s); 252 $this->conn[$s] = $conn; 253 $retryChange = true; 254 unset($this->retryConn[$s]); 255 Net_Gearman_Connection::send($conn, "set_client_id", array("client_id" => $this->id)); 256 } catch (Net_Gearman_Exception $e) { 257 $this->retryConn[$s] = $currentTime; 258 } 259 } 260 } 261 262 if (count($this->conn) == 0) { 263 // sleep to avoid wasted cpu cycles if no connections to block on using socket_select 264 sleep(1); 265 } 266 267 if ($retryChange === true) { 268 // broadcast all abilities to all servers 269 foreach ($this->abilities as $ability => $timeout) { 270 $this->addAbility($ability, $timeout); 271 } 272 } 273 274 if (call_user_func($monitor, $idle, $lastJob) == true) { 275 $working = false; 276 } 277 } 278 } 279 280 /** 281 * Listen on the socket for work 282 * 283 * Sends the 'grab_job' command and then listens for either the 'noop' or 284 * the 'no_job' command to come back. If the 'job_assign' comes down the 285 * pipe then we run that job. 286 * 287 * @param resource $socket The socket to work on 288 * 289 * @return boolean Returns true if work was done, false if not 290 * @throws Net_Gearman_Exception 291 * @see Net_Gearman_Connection::send() 292 */ 293 protected function doWork($socket) 294 { 295 Net_Gearman_Connection::send($socket, 'grab_job'); 296 297 $resp = array('function' => 'noop'); 298 while (count($resp) && $resp['function'] == 'noop') { 299 $resp = Net_Gearman_Connection::blockingRead($socket); 300 } 301 302 if (in_array($resp['function'], array('noop', 'no_job'))) { 303 return false; 304 } 305 306 if ($resp['function'] != 'job_assign') { 307 throw new Net_Gearman_Exception('Holy Cow! What are you doing?!'); 308 } 309 310 $name = $resp['data']['func']; 311 $handle = $resp['data']['handle']; 312 $arg = array(); 313 314 if (isset($resp['data']['arg']) && 315 Net_Gearman_Connection::stringLength($resp['data']['arg'])) { 316 $arg = json_decode($resp['data']['arg'], true); 317 if($arg === null){ 318 $arg = $resp['data']['arg']; 319 } 320 } 321 322 $job = Net_Gearman_Job::factory($name, $socket, $handle); 323 try { 324 $this->start($handle, $name, $arg); 325 $res = $job->run($arg); 326 if (!is_array($res)) { 327 $res = array('result' => $res); 328 } 329 330 $job->complete($res); 331 $this->complete($handle, $name, $res); 332 } catch (Net_Gearman_Job_Exception $e) { 333 $job->fail(); 334 $this->fail($handle, $name, $e); 335 } 336 337 // Force the job's destructor to run 338 $job = null; 339 340 return true; 341 } 342 343 /** 344 * Attach a callback 345 * 346 * @param callback $callback A valid PHP callback 347 * @param integer $type Type of callback 348 * 349 * @return void 350 * @throws Net_Gearman_Exception 351 */ 352 public function attachCallback($callback, $type = self::JOB_COMPLETE) 353 { 354 if (!is_callable($callback)) { 355 throw new Net_Gearman_Exception('Invalid callback specified'); 356 } 357 358 $this->callback[$type][] = $callback; 359 } 360 361 /** 362 * Run the job start callbacks 363 * 364 * @param string $handle The job's Gearman handle 365 * @param string $job The name of the job 366 * @param mixed $args The job's argument list 367 * 368 * @return void 369 */ 370 protected function start($handle, $job, $args) 371 { 372 if (!count($this->callback[self::JOB_START])) { 373 return; // No callbacks to run 374 } 375 376 foreach ($this->callback[self::JOB_START] as $callback) { 377 call_user_func($callback, $handle, $job, $args); 378 } 379 } 380 381 /** 382 * Run the complete callbacks 383 * 384 * @param string $handle The job's Gearman handle 385 * @param string $job The name of the job 386 * @param array $result The job's returned result 387 * 388 * @return void 389 */ 390 protected function complete($handle, $job, array $result) 391 { 392 if (!count($this->callback[self::JOB_COMPLETE])) { 393 return; // No callbacks to run 394 } 395 396 foreach ($this->callback[self::JOB_COMPLETE] as $callback) { 397 call_user_func($callback, $handle, $job, $result); 398 } 399 } 400 401 /** 402 * Run the fail callbacks 403 * 404 * @param string $handle The job's Gearman handle 405 * @param string $job The name of the job 406 * @param object $error The exception thrown 407 * 408 * @return void 409 */ 410 protected function fail($handle, $job, PEAR_Exception $error) 411 { 412 if (!count($this->callback[self::JOB_FAIL])) { 413 return; // No callbacks to run 414 } 415 416 foreach ($this->callback[self::JOB_FAIL] as $callback) { 417 call_user_func($callback, $handle, $job, $error); 418 } 419 } 420 421 /** 422 * Stop working 423 * 424 * @return void 425 */ 426 public function endWork() 427 { 428 foreach ($this->conn as $conn) { 429 Net_Gearman_Connection::close($conn); 430 } 431 } 432 433 /** 434 * Destructor 435 * 436 * @return void 437 * @see Net_Gearman_Worker::stop() 438 */ 439 public function __destruct() 440 { 441 $this->endWork(); 442 } 443 444 /** 445 * Should we stop work? 446 * 447 * @return boolean 448 */ 449 public function stopWork() 450 { 451 return false; 452 } 453} 454 455?> 456