1<?php
2
3namespace GuzzleHttp\Promise;
4
5/**
6 * Represents a promise that iterates over many promises and invokes
7 * side-effect functions in the process.
8 */
9class EachPromise implements PromisorInterface
10{
11    private $pending = [];
12
13    private $nextPendingIndex = 0;
14
15    /** @var \Iterator|null */
16    private $iterable;
17
18    /** @var callable|int|null */
19    private $concurrency;
20
21    /** @var callable|null */
22    private $onFulfilled;
23
24    /** @var callable|null */
25    private $onRejected;
26
27    /** @var Promise|null */
28    private $aggregate;
29
30    /** @var bool|null */
31    private $mutex;
32
33    /**
34     * Configuration hash can include the following key value pairs:
35     *
36     * - fulfilled: (callable) Invoked when a promise fulfills. The function
37     *   is invoked with three arguments: the fulfillment value, the index
38     *   position from the iterable list of the promise, and the aggregate
39     *   promise that manages all of the promises. The aggregate promise may
40     *   be resolved from within the callback to short-circuit the promise.
41     * - rejected: (callable) Invoked when a promise is rejected. The
42     *   function is invoked with three arguments: the rejection reason, the
43     *   index position from the iterable list of the promise, and the
44     *   aggregate promise that manages all of the promises. The aggregate
45     *   promise may be resolved from within the callback to short-circuit
46     *   the promise.
47     * - concurrency: (integer) Pass this configuration option to limit the
48     *   allowed number of outstanding concurrently executing promises,
49     *   creating a capped pool of promises. There is no limit by default.
50     *
51     * @param mixed $iterable Promises or values to iterate.
52     * @param array $config   Configuration options
53     */
54    public function __construct($iterable, array $config = [])
55    {
56        $this->iterable = Create::iterFor($iterable);
57
58        if (isset($config['concurrency'])) {
59            $this->concurrency = $config['concurrency'];
60        }
61
62        if (isset($config['fulfilled'])) {
63            $this->onFulfilled = $config['fulfilled'];
64        }
65
66        if (isset($config['rejected'])) {
67            $this->onRejected = $config['rejected'];
68        }
69    }
70
71    /** @psalm-suppress InvalidNullableReturnType */
72    public function promise()
73    {
74        if ($this->aggregate) {
75            return $this->aggregate;
76        }
77
78        try {
79            $this->createPromise();
80            /** @psalm-assert Promise $this->aggregate */
81            $this->iterable->rewind();
82            if (!$this->checkIfFinished()) {
83                $this->refillPending();
84            }
85        } catch (\Throwable $e) {
86            /**
87             * @psalm-suppress NullReference
88             * @phpstan-ignore-next-line
89             */
90            $this->aggregate->reject($e);
91        } catch (\Exception $e) {
92            /**
93             * @psalm-suppress NullReference
94             * @phpstan-ignore-next-line
95             */
96            $this->aggregate->reject($e);
97        }
98
99        /**
100         * @psalm-suppress NullableReturnStatement
101         * @phpstan-ignore-next-line
102         */
103        return $this->aggregate;
104    }
105
106    private function createPromise()
107    {
108        $this->mutex = false;
109        $this->aggregate = new Promise(function () {
110            reset($this->pending);
111            // Consume a potentially fluctuating list of promises while
112            // ensuring that indexes are maintained (precluding array_shift).
113            while ($promise = current($this->pending)) {
114                next($this->pending);
115                $promise->wait();
116                if (Is::settled($this->aggregate)) {
117                    return;
118                }
119            }
120        });
121
122        // Clear the references when the promise is resolved.
123        $clearFn = function () {
124            $this->iterable = $this->concurrency = $this->pending = null;
125            $this->onFulfilled = $this->onRejected = null;
126            $this->nextPendingIndex = 0;
127        };
128
129        $this->aggregate->then($clearFn, $clearFn);
130    }
131
132    private function refillPending()
133    {
134        if (!$this->concurrency) {
135            // Add all pending promises.
136            while ($this->addPending() && $this->advanceIterator());
137            return;
138        }
139
140        // Add only up to N pending promises.
141        $concurrency = is_callable($this->concurrency)
142            ? call_user_func($this->concurrency, count($this->pending))
143            : $this->concurrency;
144        $concurrency = max($concurrency - count($this->pending), 0);
145        // Concurrency may be set to 0 to disallow new promises.
146        if (!$concurrency) {
147            return;
148        }
149        // Add the first pending promise.
150        $this->addPending();
151        // Note this is special handling for concurrency=1 so that we do
152        // not advance the iterator after adding the first promise. This
153        // helps work around issues with generators that might not have the
154        // next value to yield until promise callbacks are called.
155        while (--$concurrency
156            && $this->advanceIterator()
157            && $this->addPending());
158    }
159
160    private function addPending()
161    {
162        if (!$this->iterable || !$this->iterable->valid()) {
163            return false;
164        }
165
166        $promise = Create::promiseFor($this->iterable->current());
167        $key = $this->iterable->key();
168
169        // Iterable keys may not be unique, so we use a counter to
170        // guarantee uniqueness
171        $idx = $this->nextPendingIndex++;
172
173        $this->pending[$idx] = $promise->then(
174            function ($value) use ($idx, $key) {
175                if ($this->onFulfilled) {
176                    call_user_func(
177                        $this->onFulfilled,
178                        $value,
179                        $key,
180                        $this->aggregate
181                    );
182                }
183                $this->step($idx);
184            },
185            function ($reason) use ($idx, $key) {
186                if ($this->onRejected) {
187                    call_user_func(
188                        $this->onRejected,
189                        $reason,
190                        $key,
191                        $this->aggregate
192                    );
193                }
194                $this->step($idx);
195            }
196        );
197
198        return true;
199    }
200
201    private function advanceIterator()
202    {
203        // Place a lock on the iterator so that we ensure to not recurse,
204        // preventing fatal generator errors.
205        if ($this->mutex) {
206            return false;
207        }
208
209        $this->mutex = true;
210
211        try {
212            $this->iterable->next();
213            $this->mutex = false;
214            return true;
215        } catch (\Throwable $e) {
216            $this->aggregate->reject($e);
217            $this->mutex = false;
218            return false;
219        } catch (\Exception $e) {
220            $this->aggregate->reject($e);
221            $this->mutex = false;
222            return false;
223        }
224    }
225
226    private function step($idx)
227    {
228        // If the promise was already resolved, then ignore this step.
229        if (Is::settled($this->aggregate)) {
230            return;
231        }
232
233        unset($this->pending[$idx]);
234
235        // Only refill pending promises if we are not locked, preventing the
236        // EachPromise to recursively invoke the provided iterator, which
237        // cause a fatal error: "Cannot resume an already running generator"
238        if ($this->advanceIterator() && !$this->checkIfFinished()) {
239            // Add more pending promises if possible.
240            $this->refillPending();
241        }
242    }
243
244    private function checkIfFinished()
245    {
246        if (!$this->pending && !$this->iterable->valid()) {
247            // Resolve the promise if there's nothing left to do.
248            $this->aggregate->resolve(null);
249            return true;
250        }
251
252        return false;
253    }
254}
255