1<?php
2
3/**
4 * Interface for Danga's Gearman job scheduling system
5 *
6 * PHP version 5.1.0+
7 *
8 * LICENSE: This source file is subject to the New BSD license that is
9 * available through the world-wide-web at the following URI:
10 * http://www.opensource.org/licenses/bsd-license.php. If you did not receive
11 * a copy of the New BSD License and are unable to obtain it through the web,
12 * please send a note to license@php.net so we can mail you a copy immediately.
13 *
14 * @category  Net
15 * @package   Net_Gearman
16 * @author    Joe Stump <joe@joestump.net>
17 * @copyright 2007-2008 Digg.com, Inc.
18 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
19 * @version   CVS: $Id$
20 * @link      http://pear.php.net/package/Net_Gearman
21 * @link      http://www.danga.com/gearman/
22 */
23
24require_once 'Net/Gearman/Connection.php';
25require_once 'Net/Gearman/Set.php';
26
27/**
28 * A client for submitting jobs to Gearman
29 *
30 * This class is used by code submitting jobs to the Gearman server. It handles
31 * taking tasks and sets of tasks and submitting them to the Gearman server.
32 *
33 * @category  Net
34 * @package   Net_Gearman
35 * @author    Joe Stump <joe@joestump.net>
36 * @copyright 2007-2008 Digg.com, Inc.
37 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
38 * @link      http://www.danga.com/gearman/
39 */
class Net_Gearman_Client
{
    /**
     * Open connections to the Gearman servers
     *
     * Holds one entry per server that reported as connected in the
     * constructor. A random entry is picked for every submitted task.
     *
     * @var array $conn A list of open sockets to Gearman
     */
    protected $conn = array();

    /**
     * A list of Gearman servers
     *
     * Servers that could not be connected to are removed from this list by
     * the constructor.
     *
     * @var array $servers A list of potential Gearman servers
     */
    protected $servers = array();

    /**
     * The timeout for Gearman connections
     *
     * @var integer $timeout
     */
    protected $timeout = 1000;

    /**
     * Constructor
     *
     * Connects to every given server and keeps the connections that report
     * as connected. Servers that cannot be reached are dropped silently; a
     * client without any live connection only fails once a task is
     * submitted (see getConnection()).
     *
     * @param array|string $servers An array of servers or a single server
     * @param integer      $timeout Timeout handed unchanged to
     *                              Net_Gearman_Connection::connect()
     *                              (documented here as microseconds; confirm
     *                              the unit against that method)
     *
     * @return void
     * @throws Net_Gearman_Exception When no server was specified
     * @see Net_Gearman_Connection
     */
    public function __construct($servers, $timeout = 1000)
    {
        if (!is_array($servers) && strlen($servers)) {
            $servers = array($servers);
        } elseif (!is_array($servers) || !count($servers)) {
            // Covers both an empty array and an empty scalar ('' / null),
            // which would otherwise reach the foreach below as a non-array.
            throw new Net_Gearman_Exception('Invalid servers specified');
        }

        $this->servers = $servers;
        foreach ($this->servers as $key => $server) {
            $conn = Net_Gearman_Connection::connect($server, $timeout);
            if (!Net_Gearman_Connection::isConnected($conn)) {
                unset($this->servers[$key]);
                continue;
            }

            $this->conn[] = $conn;
        }

        $this->timeout = $timeout;
    }

    /**
     * Get a connection to a Gearman server
     *
     * Picks one of the open connections at random.
     *
     * @return resource A connection to a Gearman server
     * @throws Net_Gearman_Exception When there is no open connection
     */
    protected function getConnection()
    {
        // array_rand() cannot be called with an empty array, so fail with a
        // meaningful exception instead.
        if (!is_array($this->conn) || !count($this->conn)) {
            throw new Net_Gearman_Exception(
                'No open connections to any Gearman server'
            );
        }

        return $this->conn[array_rand($this->conn)];
    }

    /**
     * Fire off a background task with the given arguments
     *
     * Any call to an undefined method is turned into a background job named
     * after that method, e.g. $client->resize($args) submits job "resize".
     *
     * @param string $func Name of job to run
     * @param array  $args First key should be args to send
     *
     * @return mixed The task's handle as recorded from the server's
     *               job_created response; whatever Net_Gearman_Task
     *               initialises it to if no such response was read
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Task, Net_Gearman_Set
     */
    public function __call($func, array $args = array())
    {
        // An empty first argument (missing, '', 0, array(), ...) is sent as
        // an empty string.
        $send = "";
        if (!empty($args[0])) {
            $send = $args[0];
        }

        $task       = new Net_Gearman_Task($func, $send);
        $task->type = Net_Gearman_Task::JOB_BACKGROUND;

        $set = new Net_Gearman_Set();
        $set->addTask($task);
        $this->runSet($set);
        return $task->handle;
    }

    /**
     * Submit a task to Gearman
     *
     * Sends the task over a randomly chosen connection and appends it to
     * that connection's waiting queue, so that the matching job_created
     * response can be paired with it in handleResponse().
     *
     * @param object $task Task to submit to Gearman
     *
     * @return      void
     * @throws      Net_Gearman_Exception When the argument cannot be encoded
     *                                    or no connection is available
     * @see         Net_Gearman_Task, Net_Gearman_Client::runSet()
     */
    protected function submitTask(Net_Gearman_Task $task)
    {
        switch ($task->type) {
        case Net_Gearman_Task::JOB_BACKGROUND:
            $type = 'submit_job_bg';
            break;
        case Net_Gearman_Task::JOB_HIGH:
            $type = 'submit_job_high';
            break;
        default:
            $type = 'submit_job';
            break;
        }

        // Scalars are sent as they are, everything else is JSON encoded.
        if (!is_scalar($task->arg)) {
            $arg = json_encode($task->arg);
            if ($arg === false) {
                // Do not submit a job with a bogus payload.
                throw new Net_Gearman_Exception(
                    'Could not JSON encode the argument of ' . $task->func
                );
            }
        } else {
            $arg = $task->arg;
        }

        $params = array(
            'func' => $task->func,
            'uniq' => $task->uniq,
            'arg'  => $arg
        );

        $s = $this->getConnection();
        Net_Gearman_Connection::send($s, $type, $params);

        // The waiting queues are keyed by the integer id of the socket.
        $id = (int) $s;
        if (!isset(Net_Gearman_Connection::$waiting[$id])
            || !is_array(Net_Gearman_Connection::$waiting[$id])
        ) {
            Net_Gearman_Connection::$waiting[$id] = array();
        }

        array_push(Net_Gearman_Connection::$waiting[$id], $task);
    }

    /**
     * Run a set of tasks
     *
     * Every pass of the loop submits at most one not yet submitted task and
     * then waits up to 10 seconds for responses on any open connection. The
     * loop ends as soon as the set reports itself as finished.
     *
     * @param object $set A set of tasks to run
     *
     * @return void
     * @throws Net_Gearman_Exception When waiting on the sockets fails or a
     *                               response cannot be handled
     * @see Net_Gearman_Set, Net_Gearman_Task
     */
    public function runSet(Net_Gearman_Set $set)
    {
        // Remember the initial count, $set->tasksCount shrinks as tasks
        // complete.
        $totalTasks = $set->tasksCount;
        $taskKeys   = array_keys($set->tasks);
        $t          = 0;

        while (!$set->finished()) {
            if ($t < $totalTasks) {
                $k = $taskKeys[$t];
                $this->submitTask($set->tasks[$k]);
                // Background tasks are never waited for.
                if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND) {
                    $set->tasks[$k]->finished = true;
                    $set->tasksCount--;
                }

                $t++;
            }

            $write  = null;
            $except = null;
            $read   = $this->conn;
            $ready  = socket_select($read, $write, $except, 10);
            if ($ready === false) {
                // On failure $read is left untouched, so it must not be
                // treated as the list of readable sockets.
                $errno = socket_last_error();
                if (defined('SOCKET_EINTR') && $errno === SOCKET_EINTR) {
                    // Interrupted by a signal: simply try again.
                    socket_clear_error();
                    continue;
                }

                throw new Net_Gearman_Exception(
                    'socket_select() failed: ' . socket_strerror($errno)
                );
            }

            foreach ($read as $socket) {
                $resp = Net_Gearman_Connection::read($socket);
                if (count($resp)) {
                    $this->handleResponse($resp, $socket, $set);
                }
            }
        }
    }

    /**
     * Handle the response read in
     *
     * Dispatches on the response's function name: work_* responses update
     * the task identified by the handle in the response, job_created assigns
     * the handle to the oldest task waiting on the socket.
     *
     * @param array    $resp  The raw array response
     * @param resource $s     The socket
     * @param object   $tasks The tasks being ran
     *
     * @return void
     * @throws Net_Gearman_Exception On error responses, unknown functions
     *                               and responses that match no task
     */
    protected function handleResponse($resp, $s, Net_Gearman_Set $tasks)
    {
        $task = null;
        if (isset($resp['data']['handle']) &&
            $resp['function'] != 'job_created') {
            $task = $tasks->getTask($resp['data']['handle']);
        }

        // work_* responses are meaningless without the task they belong to;
        // bail out before any counter is touched.
        switch ($resp['function']) {
        case 'work_complete':
        case 'work_status':
        case 'work_fail':
            if (!is_object($task)) {
                throw new Net_Gearman_Exception(
                    'No task found for response ' . $resp['function']
                );
            }
            break;
        }

        switch ($resp['function']) {
        case 'work_complete':
            $tasks->tasksCount--;
            $task->complete(json_decode($resp['data']['result'], true));
            break;
        case 'work_status':
            $n = (int)$resp['data']['numerator'];
            $d = (int)$resp['data']['denominator'];
            $task->status($n, $d);
            break;
        case 'work_fail':
            $tasks->tasksCount--;
            $task->fail();
            break;
        case 'job_created':
            // Pair the response with the oldest task submitted on this
            // socket that has not received its handle yet.
            $id = (int) $s;
            if (empty(Net_Gearman_Connection::$waiting[$id])
                || !is_array(Net_Gearman_Connection::$waiting[$id])
            ) {
                throw new Net_Gearman_Exception(
                    'Received job_created without a waiting task'
                );
            }

            $task         = array_shift(Net_Gearman_Connection::$waiting[$id]);
            $task->handle = $resp['data']['handle'];
            if ($task->type == Net_Gearman_Task::JOB_BACKGROUND) {
                $task->finished = true;
            }
            $tasks->handles[$task->handle] = $task->uniq;
            break;
        case 'error':
            throw new Net_Gearman_Exception('An error occurred');
        default:
            throw new Net_Gearman_Exception(
                'Invalid function ' . $resp['function']
            );
        }
    }

    /**
     * Disconnect from Gearman
     *
     * Closes every open connection and forgets about them, so calling this
     * method more than once (e.g. explicitly and again from the destructor)
     * never closes a socket twice.
     *
     * @return      void
     */
    public function disconnect()
    {
        if (!is_array($this->conn) || !count($this->conn)) {
            return;
        }

        foreach ($this->conn as $conn) {
            Net_Gearman_Connection::close($conn);
        }

        $this->conn = array();
    }

    /**
     * Destructor
     *
     * @return      void
     */
    public function __destruct()
    {
        $this->disconnect();
    }
}
289
290?>
291