1<?php
2
3/**
4 * Interface for Danga's Gearman job scheduling system
5 *
6 * PHP version 5.1.0+
7 *
8 * LICENSE: This source file is subject to the New BSD license that is
9 * available through the world-wide-web at the following URI:
10 * http://www.opensource.org/licenses/bsd-license.php. If you did not receive
11 * a copy of the New BSD License and are unable to obtain it through the web,
12 * please send a note to license@php.net so we can mail you a copy immediately.
13 *
14 * @category  Net
15 * @package   Net_Gearman
16 * @author    Joe Stump <joe@joestump.net>
17 * @copyright 2007-2008 Digg.com, Inc.
18 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
19 * @version   CVS: $Id$
20 * @link      http://pear.php.net/package/Net_Gearman
21 * @link      http://www.danga.com/gearman/
22 */
23
24require_once 'Net/Gearman/Connection.php';
25require_once 'Net/Gearman/Job.php';
26
27/**
28 * Gearman worker class
29 *
30 * Run an instance of a worker to listen for jobs. It then manages the running
31 * of jobs, etc.
32 *
33 * <code>
34 * <?php
35 *
36 * $servers = array(
37 *     '127.0.0.1:7003',
38 *     '127.0.0.1:7004'
39 * );
40 *
41 * $abilities = array('HelloWorld', 'Foo', 'Bar');
42 *
43 * try {
44 *     $worker = new Net_Gearman_Worker($servers);
45 *     foreach ($abilities as $ability) {
 *         $worker->addAbility($ability);
47 *     }
48 *     $worker->beginWork();
49 * } catch (Net_Gearman_Exception $e) {
50 *     echo $e->getMessage() . "\n";
51 *     exit;
52 * }
53 *
54 * ?>
55 * </code>
56 *
57 * @category  Net
58 * @package   Net_Gearman
59 * @author    Joe Stump <joe@joestump.net>
60 * @copyright 2007-2008 Digg.com, Inc.
61 * @license   http://www.opensource.org/licenses/bsd-license.php New BSD License
62 * @link      http://www.danga.com/gearman/
63 * @see       Net_Gearman_Job, Net_Gearman_Connection
64 */
class Net_Gearman_Worker
{
    /**
     * Pool of live connections to Gearman servers, keyed by server string
     *
     * @var array $conn
     */
    protected $conn = array();

    /**
     * Servers that are currently unreachable
     *
     * Keyed by server string; the value is the unix timestamp of the last
     * connection attempt (or of the failure that put the server here).
     *
     * @var array $retryConn
     */
    protected $retryConn = array();

    /**
     * Abilities announced by this worker
     *
     * Keyed by ability name; the value is the timeout passed to
     * {@link Net_Gearman_Worker::addAbility()} (null when none was given).
     *
     * @var array $abilities
     */
    protected $abilities = array();


    /**
     * Callbacks registered for this worker, grouped by callback type
     *
     * @var array $callback
     * @see Net_Gearman_Worker::JOB_START
     * @see Net_Gearman_Worker::JOB_COMPLETE
     * @see Net_Gearman_Worker::JOB_FAIL
     */
    protected $callback = array(
        self::JOB_START     => array(),
        self::JOB_COMPLETE  => array(),
        self::JOB_FAIL      => array()
    );

    /**
     * Unique id for this worker
     *
     * @var string $id
     */
    protected $id = "";


    /**
     * Callback types
     *
     * @const integer JOB_START    Ran when a job is started
     * @const integer JOB_COMPLETE Ran when a job is finished
     * @const integer JOB_FAIL     Ran when a job fails
     */
    const JOB_START    = 1;
    const JOB_COMPLETE = 2;
    const JOB_FAIL     = 3;

    /**
     * Constructor
     *
     * Connects to every given server. Servers that cannot be reached are put
     * into the retry pool; if no server at all can be reached an exception is
     * thrown.
     *
     * @param array|string $servers List of servers (or a single server)
     * @param string       $id      Optional unique id for this worker; one
     *                              is generated from the pid when empty
     *
     * @return void
     * @throws Net_Gearman_Exception When no servers are given or none of
     *                               them can be connected to
     * @see Net_Gearman_Connection
     */
    public function __construct($servers, $id = "")
    {
        if (!is_array($servers)) {
            // A single non-empty server string is accepted as a convenience;
            // anything else scalar (null, '') means "no servers".
            $servers = strlen((string) $servers) ? array($servers) : array();
        }

        if (!count($servers)) {
            throw new Net_Gearman_Exception('Invalid servers specified');
        }

        if (empty($id)) {
            $id = "pid_" . getmypid() . "_" . uniqid();
        }

        $this->id = $id;

        foreach ($servers as $s) {
            try {
                $this->conn[$s] = $this->connectServer($s);
            } catch (Net_Gearman_Exception $e) {
                $this->retryConn[$s] = time();
            }
        }

        if (empty($this->conn)) {
            throw new Net_Gearman_Exception(
                "Couldn't connect to any available servers"
            );
        }
    }

    /**
     * Open a fully initialised connection to one server
     *
     * Connects, sends this worker's client id and announces every ability
     * registered so far. The connection is only returned once all of that
     * succeeded, so callers never store a half-initialised connection.
     *
     * @param string $server Server to connect to
     *
     * @return mixed The connection returned by Net_Gearman_Connection::connect()
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Connection::connect(), Net_Gearman_Connection::send()
     */
    protected function connectServer($server)
    {
        $conn = Net_Gearman_Connection::connect($server);

        Net_Gearman_Connection::send(
            $conn, 'set_client_id', array('client_id' => $this->id)
        );

        foreach ($this->abilities as $ability => $timeout) {
            $this->announceAbility($conn, $ability, $timeout);
        }

        return $conn;
    }

    /**
     * Send a single ability announcement over one connection
     *
     * Uses 'can_do_timeout' when a positive integer timeout is given and
     * plain 'can_do' otherwise.
     *
     * @param mixed   $conn    Connection to announce on
     * @param string  $ability Name of function/ability
     * @param integer $timeout How long to give this job
     *
     * @return void
     * @throws Net_Gearman_Exception
     * @see Net_Gearman_Connection::send()
     */
    protected function announceAbility($conn, $ability, $timeout = null)
    {
        $call   = 'can_do';
        $params = array('func' => $ability);
        if (is_int($timeout) && $timeout > 0) {
            $params['timeout'] = $timeout;
            $call              = 'can_do_timeout';
        }

        Net_Gearman_Connection::send($conn, $call, $params);
    }

    /**
     * Announce an ability to the job server
     *
     * The ability is remembered so it can be announced again to servers
     * that are (re)connected later, and is sent to every live connection.
     *
     * @param string  $ability Name of function/ability
     * @param integer $timeout How long to give this job
     *
     * @return void
     * @see Net_Gearman_Connection::send()
     */
    public function addAbility($ability, $timeout = null)
    {
        $this->abilities[$ability] = $timeout;

        foreach ($this->conn as $conn) {
            $this->announceAbility($conn, $ability, $timeout);
        }
    }

    /**
     * Begin working
     *
     * This starts the worker on its journey of actually working. The first
     * argument is a PHP callback to a function that can be used to monitor
     * the worker. If no callback is provided then the worker works until it
     * is killed. The monitor is passed two arguments; whether or not the
     * worker is idle and when the last job was ran. Returning true from the
     * monitor stops the loop.
     *
     * Connections that fail while grabbing work or while announcing
     * 'pre_sleep' are moved to the retry pool and re-established later.
     *
     * @param callback $monitor Function to monitor work
     *
     * @return void
     * @see Net_Gearman_Connection::send(), Net_Gearman_Connection::connect()
     * @see Net_Gearman_Worker::doWork(), Net_Gearman_Worker::addAbility()
     */
    public function beginWork($monitor = null)
    {
        if (!is_callable($monitor)) {
            $monitor = array($this, 'stopWork');
        }

        $write     = null;
        $except    = null;
        $working   = true;
        $lastJob   = time();
        $retryTime = 5;

        while ($working) {
            $sleep       = true;
            $currentTime = time();

            foreach ($this->conn as $server => $socket) {
                // Reset for every server: when doWork() throws, the flag
                // must not carry over the previous server's result.
                $worked = false;

                try {
                    $worked = $this->doWork($socket);
                } catch (Net_Gearman_Exception $e) {
                    unset($this->conn[$server]);
                    $this->retryConn[$server] = $currentTime;
                }

                if ($worked) {
                    $lastJob = time();
                    $sleep   = false;
                }
            }

            $idle = false;
            if ($sleep && count($this->conn)) {
                foreach ($this->conn as $server => $socket) {
                    try {
                        Net_Gearman_Connection::send($socket, 'pre_sleep');
                    } catch (Net_Gearman_Exception $e) {
                        // Treat like a doWork() failure: retry the server
                        // later rather than aborting the whole loop.
                        unset($this->conn[$server]);
                        $this->retryConn[$server] = $currentTime;
                    }
                }

                // Every connection may have been dropped just above;
                // socket_select() must not be called with an empty set.
                if (count($this->conn)) {
                    $read = $this->conn;
                    socket_select($read, $write, $except, 60);
                    $idle = (count($read) == 0);
                }
            }

            foreach ($this->retryConn as $s => $lastTry) {
                if (($lastTry + $retryTime) < $currentTime) {
                    try {
                        // connectServer() also sends the client id and all
                        // known abilities; the connection is only put back
                        // into the pool once that succeeded.
                        $this->conn[$s] = $this->connectServer($s);
                        unset($this->retryConn[$s]);
                    } catch (Net_Gearman_Exception $e) {
                        $this->retryConn[$s] = $currentTime;
                    }
                }
            }

            if (count($this->conn) == 0) {
                // sleep to avoid wasted cpu cycles if no connections to block on using socket_select
                sleep(1);
            }

            if (call_user_func($monitor, $idle, $lastJob) == true) {
                $working = false;
            }
        }
    }

    /**
     * Listen on the socket for work
     *
     * Sends the 'grab_job' command and then listens for either the 'noop' or
     * the 'no_job' command to come back. If the 'job_assign' comes down the
     * pipe then we run that job.
     *
     * The job argument is JSON-decoded into an array when possible; if it
     * does not decode, the raw argument string is passed to the job as-is.
     *
     * @param resource $socket The socket to work on
     *
     * @return boolean Returns true if work was done, false if not
     * @throws Net_Gearman_Exception On an empty or unexpected response
     * @see Net_Gearman_Connection::send()
     */
    protected function doWork($socket)
    {
        Net_Gearman_Connection::send($socket, 'grab_job');

        $resp = array('function' => 'noop');
        while (count($resp) && $resp['function'] == 'noop') {
            $resp = Net_Gearman_Connection::blockingRead($socket);
        }

        if (!is_array($resp) || !isset($resp['function'])) {
            // Nothing usable came back. Previously this fell through to an
            // undefined-index access and the generic exception below; fail
            // explicitly with the same exception type instead.
            throw new Net_Gearman_Exception(
                'Empty response received from job server'
            );
        }

        if (in_array($resp['function'], array('noop', 'no_job'), true)) {
            return false;
        }

        if ($resp['function'] != 'job_assign') {
            throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
        }

        $name   = $resp['data']['func'];
        $handle = $resp['data']['handle'];
        $arg    = array();

        if (isset($resp['data']['arg']) &&
            Net_Gearman_Connection::stringLength($resp['data']['arg'])) {
            $arg = json_decode($resp['data']['arg'], true);
            if ($arg === null) {
                // Not JSON: hand the raw payload to the job
                $arg = $resp['data']['arg'];
            }
        }

        $job = Net_Gearman_Job::factory($name, $socket, $handle);
        try {
            $this->start($handle, $name, $arg);
            $res = $job->run($arg);
            if (!is_array($res)) {
                $res = array('result' => $res);
            }

            $job->complete($res);
            $this->complete($handle, $name, $res);
        } catch (Net_Gearman_Job_Exception $e) {
            $job->fail();
            $this->fail($handle, $name, $e);
        }

        // Force the job's destructor to run
        $job = null;

        return true;
    }

    /**
     * Attach a callback
     *
     * @param callback $callback A valid PHP callback
     * @param integer  $type     Type of callback; one of JOB_START,
     *                           JOB_COMPLETE or JOB_FAIL
     *
     * @return void
     * @throws Net_Gearman_Exception When the callback is not callable or the
     *                               type is not a known callback type
     */
    public function attachCallback($callback, $type = self::JOB_COMPLETE)
    {
        if (!is_callable($callback)) {
            throw new Net_Gearman_Exception('Invalid callback specified');
        }

        // Callbacks stored under an unknown type would never be run, so
        // reject them instead of silently accepting them.
        if (!isset($this->callback[$type])) {
            throw new Net_Gearman_Exception('Invalid callback type specified');
        }

        $this->callback[$type][] = $callback;
    }

    /**
     * Run the job start callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param mixed  $args   The job's argument list
     *
     * @return void
     */
    protected function start($handle, $job, $args)
    {
        foreach ($this->callback[self::JOB_START] as $callback) {
            call_user_func($callback, $handle, $job, $args);
        }
    }

    /**
     * Run the complete callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param array  $result The job's returned result
     *
     * @return void
     */
    protected function complete($handle, $job, array $result)
    {
        foreach ($this->callback[self::JOB_COMPLETE] as $callback) {
            call_user_func($callback, $handle, $job, $result);
        }
    }

    /**
     * Run the fail callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param object $error  The exception thrown
     *
     * @return void
     */
    protected function fail($handle, $job, PEAR_Exception $error)
    {
        foreach ($this->callback[self::JOB_FAIL] as $callback) {
            call_user_func($callback, $handle, $job, $error);
        }
    }

    /**
     * Stop working
     *
     * Closes every live connection and removes it from the pool, so calling
     * this more than once (e.g. explicitly and again from the destructor)
     * never closes the same connection twice.
     *
     * @return void
     */
    public function endWork()
    {
        foreach ($this->conn as $server => $conn) {
            Net_Gearman_Connection::close($conn);
            unset($this->conn[$server]);
        }
    }

    /**
     * Destructor
     *
     * @return void
     * @see Net_Gearman_Worker::endWork()
     */
    public function __destruct()
    {
        $this->endWork();
    }

    /**
     * Should we stop work?
     *
     * Default monitor used by beginWork() when none is supplied; it never
     * asks the worker to stop.
     *
     * @return boolean Always false
     */
    public function stopWork()
    {
        return false;
    }
}
454
455?>
456