1<?php
2
3namespace React\EventLoop;
4
5use Ev;
6use EvIo;
7use EvLoop;
8use React\EventLoop\Tick\FutureTickQueue;
9use React\EventLoop\Timer\Timer;
10use SplObjectStorage;
11
/**
 * An `ext-ev` based event loop.
 *
 * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev),
 * that provides an interface to `libev` library.
 *
 * This loop is known to work with PHP 5.4 through PHP 7+.
 *
 * Every stream, timer and signal registered here is backed by an `ev` watcher
 * object created through the wrapped `EvLoop`; the watcher is kept in one of
 * the private collections below so that it can be stopped again later.
 *
 * @see http://php.net/manual/en/book.ev.php
 * @see https://bitbucket.org/osmanov/pecl-ev/overview
 */
class ExtEvLoop implements LoopInterface
{
    /**
     * The underlying `ext-ev` loop all watchers are created on.
     *
     * @var EvLoop
     */
    private $loop;

    /**
     * Queue of callbacks scheduled via futureTick().
     *
     * @var FutureTickQueue
     */
    private $futureTickQueue;

    /**
     * Maps each active timer object to the watcher returned by EvLoop::timer().
     *
     * @var SplObjectStorage
     */
    private $timers;

    /**
     * Read watchers, keyed by the stream resource cast to int.
     *
     * @var EvIo[]
     */
    private $readStreams = array();

    /**
     * Write watchers, keyed by the stream resource cast to int.
     *
     * @var EvIo[]
     */
    private $writeStreams = array();

    /**
     * True while run() should keep iterating; cleared by stop().
     *
     * @var bool
     */
    private $running = false;

    /**
     * Registry of the user-supplied signal listeners.
     *
     * @var SignalsHandler
     */
    private $signals;

    /**
     * Signal watchers, keyed by signal number (one watcher per signal).
     *
     * @var \EvSignal[]
     */
    private $signalEvents = array();

    /**
     * Creates a fresh `EvLoop` together with empty tick, timer and signal registries.
     */
    public function __construct()
    {
        $this->loop = new EvLoop();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->signals = new SignalsHandler();
    }

    /**
     * Registers a listener that is called with the stream when it becomes readable.
     *
     * Does nothing if a read watcher for this stream is already registered.
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return void
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->readStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $event = $this->loop->io($stream, Ev::READ, $callback);
        $this->readStreams[$key] = $event;
    }

    /**
     * Wraps a stream listener so that the watcher callback invokes it with
     * the stream as its only argument (whatever arguments the watcher passes
     * to the closure are ignored).
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return \Closure
     */
    private function getStreamListenerClosure($stream, $listener)
    {
        return function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };
    }

    /**
     * Registers a listener that is called with the stream when it becomes writable.
     *
     * Does nothing if a write watcher for this stream is already registered.
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return void
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->writeStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $event = $this->loop->io($stream, Ev::WRITE, $callback);
        $this->writeStreams[$key] = $event;
    }

    /**
     * Stops and forgets the read watcher of the given stream, if there is one.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function removeReadStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key]->stop();
        unset($this->readStreams[$key]);
    }

    /**
     * Stops and forgets the write watcher of the given stream, if there is one.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key]->stop();
        unset($this->writeStreams[$key]);
    }

    /**
     * Schedules a callback to be invoked once after the given interval.
     *
     * The callback receives the timer object as its only argument.
     *
     * @param int|float $interval
     * @param callable  $callback
     *
     * @return Timer the timer object, usable with cancelTimer()
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $that = $this;
        $timers = $this->timers;
        $callback = function () use ($timer, $timers, $that) {
            \call_user_func($timer->getCallback(), $timer);

            // A one-off timer is finished after firing: drop its watcher
            // unless the user callback already cancelled it itself.
            if ($timers->contains($timer)) {
                $that->cancelTimer($timer);
            }
        };

        // Repeat value 0.0: the watcher is not re-armed after it fires.
        $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Schedules a callback to be invoked repeatedly, once per interval.
     *
     * The callback receives the timer object as its only argument.
     *
     * @param int|float $interval
     * @param callable  $callback
     *
     * @return Timer the timer object, usable with cancelTimer()
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        // Use the interval as reported by the timer object, exactly like
        // addTimer() does, instead of the raw argument. Passing a raw repeat
        // value of 0 would create a watcher that does not repeat while still
        // being tracked in $this->timers, which keeps run() from finishing.
        // NOTE(review): presumably Timer normalises too-small intervals in
        // getInterval() - confirm against the Timer class.
        $period = $timer->getInterval();
        $event = $this->loop->timer($period, $period, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Stops the watcher of the given timer and forgets the timer.
     *
     * Does nothing if the timer is not (or no longer) registered on this loop.
     *
     * @param TimerInterface $timer
     *
     * @return void
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!isset($this->timers[$timer])) {
            return;
        }

        $event = $this->timers[$timer];
        $event->stop();
        $this->timers->detach($timer);
    }

    /**
     * Queues a callback on the future tick queue; run() processes that queue
     * at the start of each of its iterations.
     *
     * @param callable $listener
     *
     * @return void
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Runs the loop until stop() is called or no streams, timers or signal
     * listeners are left and the future tick queue is empty.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            // A tick callback may have called stop() in the meantime.
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            $flags = Ev::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                // Do not block waiting for events: either we are about to
                // exit, or queued tick callbacks are waiting to be run.
                $flags |= Ev::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            $this->loop->run($flags);
        }
    }

    /**
     * Asks a running loop to return from run() after its current iteration.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Stops every watcher that is still registered on this loop.
     */
    public function __destruct()
    {
        // Iterate over a snapshot: cancelTimer() detaches from $this->timers,
        // and detaching from an SplObjectStorage while iterating over that
        // same storage can cause entries to be skipped.
        /** @var TimerInterface $timer */
        foreach (\iterator_to_array($this->timers) as $timer) {
            $this->cancelTimer($timer);
        }

        foreach ($this->readStreams as $event) {
            $event->stop();
        }
        $this->readStreams = array();

        foreach ($this->writeStreams as $event) {
            $event->stop();
        }
        $this->writeStreams = array();

        // Signal watchers are watchers too and must not outlive the loop.
        foreach ($this->signalEvents as $event) {
            $event->stop();
        }
        $this->signalEvents = array();
    }

    /**
     * Registers a listener for the given signal.
     *
     * Only one watcher is created per signal number; it dispatches to the
     * listeners through the signals registry.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = $this->loop->signal($signal, function() use ($signal) {
                $this->signals->call($signal);
            });
        }
    }

    /**
     * Removes a listener from the given signal.
     *
     * The shared watcher of the signal is only stopped once the last listener
     * for that signal is gone, so removing one of several listeners (or a
     * listener that was never added) leaves the remaining ones functional.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        // NOTE(review): relies on SignalsHandler::count($signal) returning the
        // number of listeners still registered for $signal - confirm against
        // the SignalsHandler class.
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->stop();
            unset($this->signalEvents[$signal]);
        }
    }
}
253