1<?php
2
3namespace Amp\Parallel\Context\Internal;
4
5use Amp\Loop;
6use Amp\Parallel\Sync\ChannelledSocket;
7use parallel\Events;
8use parallel\Future;
9
10class ParallelHub extends ProcessHub
11{
12    const EXIT_CHECK_FREQUENCY = 250;
13
14    /** @var ChannelledSocket[] */
15    private $channels;
16
17    /** @var string */
18    private $watcher;
19
20    /** @var Events */
21    private $events;
22
23    public function __construct()
24    {
25        parent::__construct();
26
27        $events = $this->events = new Events;
28        $this->events->setBlocking(false);
29
30        $channels = &$this->channels;
31        $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use (&$channels, $events): void {
32            while ($event = $events->poll()) {
33                $id = (int) $event->source;
34                \assert(isset($channels[$id]), 'Channel for context ID not found');
35                $channel = $channels[$id];
36                unset($channels[$id]);
37                $channel->close();
38            }
39        });
40        Loop::disable($this->watcher);
41        Loop::unreference($this->watcher);
42    }
43
44    public function add(int $id, ChannelledSocket $channel, Future $future): void
45    {
46        $this->channels[$id] = $channel;
47        $this->events->addFuture((string) $id, $future);
48
49        Loop::enable($this->watcher);
50    }
51
52    public function remove(int $id): void
53    {
54        if (!isset($this->channels[$id])) {
55            return;
56        }
57
58        unset($this->channels[$id]);
59        $this->events->remove((string) $id);
60
61        if (empty($this->channels)) {
62            Loop::disable($this->watcher);
63        }
64    }
65}
66