<?php

namespace React\EventLoop;

use Ev;
use EvIo;
use EvLoop;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Timer\Timer;
use SplObjectStorage;

/**
 * An `ext-ev` based event loop.
 *
 * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev),
 * that provides an interface to `libev` library.
 *
 * This loop is known to work with PHP 5.4 through PHP 7+.
 *
 * @see http://php.net/manual/en/book.ev.php
 * @see https://bitbucket.org/osmanov/pecl-ev/overview
 */
class ExtEvLoop implements LoopInterface
{
    /**
     * Underlying libev loop that owns every watcher created by this class.
     *
     * @var EvLoop
     */
    private $loop;

    /**
     * Queue of callbacks registered through futureTick().
     *
     * @var FutureTickQueue
     */
    private $futureTickQueue;

    /**
     * Maps each active timer object to the watcher created for it.
     *
     * @var SplObjectStorage
     */
    private $timers;

    /**
     * Read watchers, keyed by the integer id of the stream resource.
     *
     * @var EvIo[]
     */
    private $readStreams = array();

    /**
     * Write watchers, keyed by the integer id of the stream resource.
     *
     * @var EvIo[]
     */
    private $writeStreams = array();

    /**
     * True while run() should keep iterating; cleared by stop().
     *
     * @var bool
     */
    private $running;

    /**
     * Registry of signal listeners.
     *
     * @var SignalsHandler
     */
    private $signals;

    /**
     * Signal watchers, keyed by signal number.
     *
     * @var \EvSignal[]
     */
    private $signalEvents = array();

    public function __construct()
    {
        $this->loop = new EvLoop();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->signals = new SignalsHandler();
    }

    /**
     * Starts watching a stream for readability.
     *
     * Does nothing when a read watcher is already registered for the stream.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->readStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $this->readStreams[$key] = $this->loop->io($stream, Ev::READ, $callback);
    }

    /**
     * Wraps a stream listener so the watcher callback passes the stream
     * to the listener instead of whatever arguments the watcher supplies.
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return \Closure
     */
    private function getStreamListenerClosure($stream, $listener)
    {
        return function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };
    }

    /**
     * Starts watching a stream for writability.
     *
     * Does nothing when a write watcher is already registered for the stream.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->writeStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $this->writeStreams[$key] = $this->loop->io($stream, Ev::WRITE, $callback);
    }

    /**
     * Stops and forgets the read watcher of a stream, if there is one.
     *
     * @param resource|int $stream stream resource (or its integer id)
     *
     * @return void
     */
    public function removeReadStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key]->stop();
        unset($this->readStreams[$key]);
    }

    /**
     * Stops and forgets the write watcher of a stream, if there is one.
     *
     * @param resource|int $stream stream resource (or its integer id)
     *
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key]->stop();
        unset($this->writeStreams[$key]);
    }

    /**
     * Schedules a callback to run once after the given interval.
     *
     * @param int|float $interval
     * @param callable  $callback invoked with the timer object
     *
     * @return Timer the timer handle, usable with cancelTimer()
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $that = $this;
        $timers = $this->timers;
        $callback = function () use ($timer, $timers, $that) {
            \call_user_func($timer->getCallback(), $timer);

            // The user callback may already have cancelled the timer;
            // only clean up when it is still registered.
            if ($timers->contains($timer)) {
                $that->cancelTimer($timer);
            }
        };

        $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Schedules a callback to run repeatedly, once per interval.
     *
     * @param int|float $interval
     * @param callable  $callback invoked with the timer object
     *
     * @return Timer the timer handle, usable with cancelTimer()
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        // Use the interval reported by the Timer object, exactly as addTimer()
        // does, so the watcher runs with the same value the caller can read
        // back from the returned timer. libev treats a repeat value of 0 as
        // "do not repeat", so the raw argument must not be passed through if
        // Timer adjusts it (NOTE(review): Timer is not visible here - confirm).
        $period = $timer->getInterval();
        $event = $this->loop->timer($period, $period, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Stops a timer previously returned by addTimer()/addPeriodicTimer().
     *
     * Unknown or already cancelled timers are ignored.
     *
     * @param TimerInterface $timer
     *
     * @return void
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!isset($this->timers[$timer])) {
            return;
        }

        $event = $this->timers[$timer];
        $event->stop();
        $this->timers->detach($timer);
    }

    /**
     * Queues a callback on the future tick queue, which run() drains at the
     * start of each of its iterations.
     *
     * @param callable $listener
     *
     * @return void
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Runs the loop until stop() is called or nothing is left to wait for.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            $flags = Ev::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                // Do not block: either we are shutting down or more tick
                // callbacks are waiting to be processed.
                $flags |= Ev::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            $this->loop->run($flags);
        }
    }

    /**
     * Asks run() to return after the current iteration.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Stops all timer and stream watchers still registered with the loop.
     */
    public function __destruct()
    {
        // cancelTimer() detaches from $this->timers; detaching from an
        // SplObjectStorage while iterating it can skip entries, so iterate
        // over a snapshot instead.
        /** @var TimerInterface $timer */
        foreach (\iterator_to_array($this->timers, false) as $timer) {
            $this->cancelTimer($timer);
        }

        // foreach over a PHP array iterates a copy, so unsetting entries
        // inside the remove*Stream() calls is safe here.
        foreach ($this->readStreams as $key => $stream) {
            $this->removeReadStream($key);
        }

        foreach ($this->writeStreams as $key => $stream) {
            $this->removeWriteStream($key);
        }
    }

    /**
     * Registers a listener for a signal, creating the signal watcher on
     * first use.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = $this->loop->signal($signal, function () use ($signal) {
                $this->signals->call($signal);
            });
        }
    }

    /**
     * Unregisters a listener for a signal and stops the signal watcher once
     * no listener for that signal remains.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            return;
        }

        // Keep the watcher alive while other listeners are still registered
        // for this signal; stopping it here would silently disable them.
        // NOTE(review): relies on SignalsHandler::count($signal) returning the
        // number of listeners left for the signal - confirm it is available.
        if ($this->signals->count($signal) > 0) {
            return;
        }

        $this->signalEvents[$signal]->stop();
        unset($this->signalEvents[$signal]);
    }
}