<?php

/**
 * Interface for Danga's Gearman job scheduling system
 *
 * PHP version 5.1.0+
 *
 * LICENSE: This source file is subject to the New BSD license that is
 * available through the world-wide-web at the following URI:
 * http://www.opensource.org/licenses/bsd-license.php. If you did not receive
 * a copy of the New BSD License and are unable to obtain it through the web,
 * please send a note to license@php.net so we can mail you a copy immediately.
 *
 * @category  Net
 * @package   Net_Gearman
 * @author    Joe Stump <joe@joestump.net>
 * @copyright 2007-2008 Digg.com, Inc.
 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
 * @version   CVS: $Id$
 * @link      http://pear.php.net/package/Net_Gearman
 * @link      http://www.danga.com/gearman/
 */

require_once 'Net/Gearman/Connection.php';
require_once 'Net/Gearman/Set.php';

/**
 * A client for submitting jobs to Gearman
 *
 * This class is used by code submitting jobs to the Gearman server. It handles
 * taking tasks and sets of tasks and submitting them to the Gearman server.
 *
 * @category  Net
 * @package   Net_Gearman
 * @author    Joe Stump <joe@joestump.net>
 * @copyright 2007-2008 Digg.com, Inc.
 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
 * @link      http://www.danga.com/gearman/
 */
class Net_Gearman_Client
{
    /**
     * Every connection that was successfully opened by the constructor
     *
     * @var array $conn List of open sockets to Gearman servers
     */
    protected $conn = array();

    /**
     * A list of Gearman servers
     *
     * Servers that could not be connected to are removed from this list by
     * the constructor.
     *
     * @var array $servers A list of potential Gearman servers
     */
    protected $servers = array();

    /**
     * The timeout for Gearman connections
     *
     * @var integer $timeout
     */
    protected $timeout = 1000;

    /**
     * Constructor
     *
     * Accepts either a list of servers or a single server and tries to
     * connect to each of them. Servers that cannot be reached are silently
     * dropped; the remaining sockets are kept for later use.
     *
     * @param array   $servers An array of servers or a single server
     * @param integer $timeout Timeout handed to
     *                         Net_Gearman_Connection::connect()
     *
     * @return void
     * @throws Net_Gearman_Exception When no server was specified
     * @see Net_Gearman_Connection
     */
    public function __construct($servers, $timeout = 1000)
    {
        if (!is_array($servers)) {
            // A single server may be passed as a bare value. An empty value
            // ('', null, false) is normalised to an empty list so that it is
            // rejected below instead of being fed to foreach as a non-array.
            $usable  = is_scalar($servers) && strlen((string) $servers);
            $servers = $usable ? array($servers) : array();
        }

        if (!count($servers)) {
            throw new Net_Gearman_Exception('Invalid servers specified');
        }

        $this->servers = $servers;
        foreach ($this->servers as $key => $server) {
            $conn = Net_Gearman_Connection::connect($server, $timeout);
            if (!Net_Gearman_Connection::isConnected($conn)) {
                // Forget servers we could not reach.
                unset($this->servers[$key]);
                continue;
            }

            $this->conn[] = $conn;
        }

        $this->timeout = $timeout;
    }

    /**
     * Get a connection to a Gearman server
     *
     * Picks one of the open connections at random.
     *
     * @return resource A connection to a Gearman server
     * @throws Net_Gearman_Exception When there is no open connection
     */
    protected function getConnection()
    {
        // array_rand() cannot be called on an empty array, which happens
        // when every connection attempt failed or after disconnect().
        if (!is_array($this->conn) || !count($this->conn)) {
            throw new Net_Gearman_Exception(
                'No connection to a Gearman server is available'
            );
        }

        return $this->conn[array_rand($this->conn)];
    }

    /**
     * Fire off a background task with the given arguments
     *
     * Any call to an undefined method is turned into a background job whose
     * name is the method name.
     *
     * @param string $func Name of job to run
     * @param array  $args First key should be args to send
     *
     * @return mixed The handle stored on the task after the set has run
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Task, Net_Gearman_Set
     */
    public function __call($func, array $args = array())
    {
        // Only the first argument is forwarded; anything "empty" is sent as
        // an empty string.
        $send = !empty($args[0]) ? $args[0] : '';

        $task       = new Net_Gearman_Task($func, $send);
        $task->type = Net_Gearman_Task::JOB_BACKGROUND;

        $set = new Net_Gearman_Set();
        $set->addTask($task);
        $this->runSet($set);

        return $task->handle;
    }

    /**
     * Submit a task to Gearman
     *
     * Sends the task over a randomly chosen connection and queues it on that
     * connection's waiting list so the following "job_created" response can
     * be matched back to it.
     *
     * @param object $task Task to submit to Gearman
     *
     * @return void
     * @throws Net_Gearman_Exception When there is no open connection
     * @see Net_Gearman_Task, Net_Gearman_Client::runSet()
     */
    protected function submitTask(Net_Gearman_Task $task)
    {
        switch ($task->type) {
        case Net_Gearman_Task::JOB_BACKGROUND:
            $type = 'submit_job_bg';
            break;
        case Net_Gearman_Task::JOB_HIGH:
            $type = 'submit_job_high';
            break;
        default:
            $type = 'submit_job';
            break;
        }

        // Scalars are sent untouched, anything else is JSON encoded.
        if (!is_scalar($task->arg)) {
            $arg = json_encode($task->arg);
        } else {
            $arg = $task->arg;
        }

        $params = array(
            'func' => $task->func,
            'uniq' => $task->uniq,
            'arg'  => $arg
        );

        $s = $this->getConnection();
        Net_Gearman_Connection::send($s, $type, $params);

        // The waiting list is keyed by the integer value of the socket. The
        // slot does not exist the first time a socket is used, so test with
        // isset() rather than reading an undefined index.
        $key = (int) $s;
        if (!isset(Net_Gearman_Connection::$waiting[$key])
            || !is_array(Net_Gearman_Connection::$waiting[$key])
        ) {
            Net_Gearman_Connection::$waiting[$key] = array();
        }

        array_push(Net_Gearman_Connection::$waiting[$key], $task);
    }

    /**
     * Run a set of tasks
     *
     * Submits one task per loop iteration and, in the same iteration, reads
     * and dispatches whatever responses are available, until the set reports
     * that it is finished.
     *
     * @param object $set A set of tasks to run
     *
     * @return void
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Set, Net_Gearman_Task
     */
    public function runSet(Net_Gearman_Set $set)
    {
        $totalTasks = $set->tasksCount;
        $taskKeys   = array_keys($set->tasks);
        $t          = 0;

        while (!$set->finished()) {
            if ($t < $totalTasks) {
                $k = $taskKeys[$t];
                $this->submitTask($set->tasks[$k]);

                // Background jobs are never waited on, so they count as
                // finished as soon as they have been submitted.
                if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND) {
                    $set->tasks[$k]->finished = true;
                    $set->tasksCount--;
                }

                $t++;
            }

            // Wait (up to 10 seconds) for any socket to become readable.
            $write  = null;
            $except = null;
            $read   = $this->conn;
            socket_select($read, $write, $except, 10);
            foreach ($read as $socket) {
                $resp = Net_Gearman_Connection::read($socket);
                if (count($resp)) {
                    $this->handleResponse($resp, $socket, $set);
                }
            }
        }
    }

    /**
     * Handle the response read in
     *
     * Dispatches on the response's 'function' and updates the matching task
     * and the set's bookkeeping.
     *
     * @param array    $resp  The raw array response
     * @param resource $s     The socket
     * @param object   $tasks The tasks being ran
     *
     * @return void
     * @throws Net_Gearman_Exception On an error response, an unknown
     *                               function, or a response that cannot be
     *                               matched to a task
     */
    protected function handleResponse($resp, $s, Net_Gearman_Set $tasks)
    {
        // For everything except "job_created" the task is looked up by the
        // handle carried in the response.
        $task = null;
        if (isset($resp['data']['handle'])
            && $resp['function'] != 'job_created'
        ) {
            $task = $tasks->getTask($resp['data']['handle']);
        }

        switch ($resp['function']) {
        case 'work_complete':
        case 'work_status':
        case 'work_fail':
            // Without a task there is nothing to update; fail with an
            // exception rather than a fatal call on a non-object.
            if (!is_object($task)) {
                throw new Net_Gearman_Exception(
                    'No task found for response ' . $resp['function']
                );
            }

            if ($resp['function'] == 'work_complete') {
                $tasks->tasksCount--;
                $task->complete(json_decode($resp['data']['result'], true));
            } elseif ($resp['function'] == 'work_status') {
                $n = (int) $resp['data']['numerator'];
                $d = (int) $resp['data']['denominator'];
                $task->status($n, $d);
            } else {
                $tasks->tasksCount--;
                $task->fail();
            }
            break;
        case 'job_created':
            // "job_created" is matched to the oldest task still waiting on
            // this socket (queued by submitTask()).
            $key  = (int) $s;
            $task = null;
            if (isset(Net_Gearman_Connection::$waiting[$key])
                && is_array(Net_Gearman_Connection::$waiting[$key])
            ) {
                $task = array_shift(Net_Gearman_Connection::$waiting[$key]);
            }

            if (!is_object($task)) {
                throw new Net_Gearman_Exception(
                    'No task found for response job_created'
                );
            }

            $task->handle = $resp['data']['handle'];
            if ($task->type == Net_Gearman_Task::JOB_BACKGROUND) {
                $task->finished = true;
            }
            $tasks->handles[$task->handle] = $task->uniq;
            break;
        case 'error':
            throw new Net_Gearman_Exception('An error occurred');
        default:
            throw new Net_Gearman_Exception(
                'Invalid function ' . $resp['function']
            );
        }
    }

    /**
     * Disconnect from Gearman
     *
     * Closes every open connection. Calling it more than once is harmless.
     *
     * @return void
     */
    public function disconnect()
    {
        if (!is_array($this->conn) || !count($this->conn)) {
            return;
        }

        foreach ($this->conn as $conn) {
            Net_Gearman_Connection::close($conn);
        }

        // Forget the closed sockets so that a later disconnect() (for
        // instance from the destructor) does not close them a second time.
        $this->conn = array();
    }

    /**
     * Destructor
     *
     * @return void
     */
    public function __destruct()
    {
        $this->disconnect();
    }
}

?>