<?php

namespace Amp\Parallel\Context\Internal;

use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use parallel\Events;
use parallel\Future;

/**
 * Keeps the channel registered for each context ID and polls a non-blocking
 * parallel\Events instance from a repeating loop watcher. When an event is
 * reported, the channel stored under the event's source ID is dropped from
 * the hub and closed.
 */
class ParallelHub extends ProcessHub
{
    /** Interval handed to Loop::repeat() between polls of the events object. */
    const EXIT_CHECK_FREQUENCY = 250;

    /** @var ChannelledSocket[] Channels indexed by context ID. */
    private $channels = [];

    /** @var string Identifier of the repeating watcher that polls for events. */
    private $watcher;

    /** @var Events */
    private $events;

    /**
     * Creates the events object in non-blocking mode and registers the polling
     * watcher. The watcher starts disabled and unreferenced; add() enables it.
     */
    public function __construct()
    {
        parent::__construct();

        $events = $this->events = new Events;
        $this->events->setBlocking(false);

        // The callback is static (no $this), so the channel map is shared by reference.
        $channels = &$this->channels;
        $this->watcher = Loop::repeat(
            self::EXIT_CHECK_FREQUENCY,
            static function (string $watcher) use (&$channels, $events): void {
                while ($event = $events->poll()) {
                    $id = (int) $event->source;
                    \assert(isset($channels[$id]), 'Channel for context ID not found');

                    // assert() may be compiled out; never call close() on a missing entry.
                    if (!isset($channels[$id])) {
                        continue;
                    }

                    $channel = $channels[$id];
                    unset($channels[$id]);
                    $channel->close();
                }

                // Mirror remove(): stop polling once nothing is registered.
                // add() re-enables the watcher when a new channel arrives.
                if (empty($channels)) {
                    Loop::disable($watcher);
                }
            }
        );
        Loop::disable($this->watcher);
        Loop::unreference($this->watcher);
    }

    /**
     * Registers a channel under the given context ID, attaches the future to
     * the events object under the same ID (as a string), and enables polling.
     *
     * @param int              $id      Context ID used as the key for both registrations.
     * @param ChannelledSocket $channel Channel to close when an event for $id is polled.
     * @param Future           $future  Future watched by the events object.
     */
    public function add(int $id, ChannelledSocket $channel, Future $future): void
    {
        $this->channels[$id] = $channel;
        $this->events->addFuture((string) $id, $future);

        Loop::enable($this->watcher);
    }

    /**
     * Unregisters the channel and the events target for the given context ID.
     * Does nothing if the ID is not registered. Disables polling when no
     * channels remain. The channel itself is not closed here.
     *
     * @param int $id Context ID previously passed to add().
     */
    public function remove(int $id): void
    {
        if (!isset($this->channels[$id])) {
            return;
        }

        unset($this->channels[$id]);
        $this->events->remove((string) $id);

        if (empty($this->channels)) {
            Loop::disable($this->watcher);
        }
    }
}