1<?php
2
3namespace Amp\Internal;
4
5use Amp\Deferred;
6use Amp\Failure;
7use Amp\Promise;
8use Amp\Success;
9use React\Promise\PromiseInterface as ReactPromise;
10
11/**
12 * Trait used by Iterator implementations. Do not use this trait in your code, instead compose your class from one of
13 * the available classes implementing \Amp\Iterator.
14 * Note that it is the responsibility of the user of this trait to ensure that listeners have a chance to listen first
15 * before emitting values.
16 *
17 * @internal
18 * @template-covariant TValue
19 */
trait Producer
{
    /** @var Promise|null Set by complete() or fail(); stays null while the iterator is still open. */
    private $complete;

    /** @var mixed[] Emitted values that have not been discarded yet, keyed by emit position. */
    private $values = [];

    /** @var Deferred[] Back-pressure deferreds for values that were emitted before being asked for. */
    private $backPressure = [];

    /** @var int Position of the value exposed by getCurrent(); -1 until advance() is first called. */
    private $consumePosition = -1;

    /** @var int Position given to the most recently emitted value; -1 until emit() stores one. */
    private $emitPosition = -1;

    /** @var Deferred|null Deferred behind the promise of an advance() call that is still pending. */
    private $waiting;

    /** @var null|array Backtrace recorded by complete() when assertions and AMP_DEBUG are enabled. */
    private $resolutionTrace;

    /**
     * {@inheritdoc}
     *
     * Discards the current value and moves to the next position. If a value is already stored there, its
     * back-pressure deferred is resolved and a promise succeeding with true is returned. Otherwise the completion
     * promise is returned when the iterator has ended, or a pending promise is returned for emit(), complete() or
     * fail() to resolve.
     *
     * @return Promise<bool>
     *
     * @throws \Error If the promise returned by the previous call has not been resolved yet.
     */
    public function advance(): Promise
    {
        if ($this->waiting !== null) {
            throw new \Error("The prior promise returned must resolve before invoking this method again");
        }

        // Release the value that was current so far, then step forward.
        unset($this->values[$this->consumePosition]);
        $this->consumePosition++;
        $next = $this->consumePosition;

        if (!\array_key_exists($next, $this->values)) {
            if ($this->complete) {
                return $this->complete;
            }

            // Nothing buffered yet: park the consumer until the producer side acts.
            $this->waiting = new Deferred;

            return $this->waiting->promise();
        }

        // A buffered value is being consumed, so the matching emit() promise can be released.
        \assert(isset($this->backPressure[$next]));
        $pressure = $this->backPressure[$next];
        unset($this->backPressure[$next]);
        $pressure->resolve();

        return new Success(true);
    }

    /**
     * {@inheritdoc}
     *
     * @return TValue The value stored at the current consume position.
     *
     * @throws \Error If the iterator has ended with no values left, or if no value is stored at the current
     *     position.
     */
    public function getCurrent()
    {
        if ($this->complete && empty($this->values)) {
            throw new \Error("The iterator has completed");
        }

        $current = $this->consumePosition;

        if (\array_key_exists($current, $this->values)) {
            return $this->values[$current];
        }

        throw new \Error("Promise returned from advance() must resolve before calling this method");
    }

    /**
     * Emits a value from the iterator. The returned promise is resolved once the emitted value has been consumed.
     *
     * A promise given as the value is not stored itself: its result is emitted once it resolves, and its failure
     * fails the iterator.
     *
     * @param mixed $value
     *
     * @return Promise
     * @psalm-return Promise<null>
     *
     * @throws \Error If the iterator has completed.
     */
    private function emit($value): Promise
    {
        if ($this->complete) {
            throw new \Error("Iterators cannot emit values after calling complete");
        }

        if ($value instanceof ReactPromise) {
            $value = Promise\adapt($value);
        }

        if ($value instanceof Promise) {
            $deferred = new Deferred;

            $value->onResolve(function ($exception, $result) use ($deferred) {
                if ($this->complete) {
                    // The iterator ended while the promise was pending; its result can no longer be emitted.
                    $deferred->fail(
                        new \Error("The iterator was completed before the promise result could be emitted")
                    );
                } elseif ($exception) {
                    $this->fail($exception);
                    $deferred->fail($exception);
                } else {
                    $deferred->resolve($this->emit($result));
                }
            });

            return $deferred->promise();
        }

        $this->emitPosition++;
        $slot = $this->emitPosition;
        $this->values[$slot] = $value;

        if ($this->waiting === null) {
            // No consumer is waiting: hand back a promise that advance() resolves when it reaches this slot.
            $pressure = new Deferred;
            $this->backPressure[$slot] = $pressure;

            return $pressure->promise();
        }

        // Clear the waiting deferred before resolving it.
        $consumer = $this->waiting;
        $this->waiting = null;
        $consumer->resolve(true);

        return new Success; // Consumer was already waiting for a new value, so back-pressure is unnecessary.
    }

    /**
     * Completes the iterator.
     *
     * Stores a promise succeeding with false as the completion result and resolves a pending advance() promise
     * with it.
     *
     * @return void
     *
     * @throws \Error If the iterator has already been completed.
     */
    private function complete()
    {
        if ($this->complete) {
            $message = "Iterator has already been completed";

            if (!isset($this->resolutionTrace)) {
                // @codeCoverageIgnoreStart
                $message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
                    . "for a stacktrace of the previous resolution.";
                // @codeCoverageIgnoreEnd
            } else {
                $trace = formatStacktrace($this->resolutionTrace);
                $message .= ". Previous completion trace:\n\n{$trace}\n\n";
            }

            throw new \Error($message);
        }

        // Wrapped in assert() so the backtrace is only captured when assertions are enabled.
        \assert((function () {
            $flag = \getenv("AMP_DEBUG") ?: "0";
            $debug = ($flag !== "0" && $flag !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG);

            if ($debug) {
                $frames = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
                \array_shift($frames); // remove current closure
                $this->resolutionTrace = $frames;
            }

            return true;
        })());

        $this->complete = new Success(false);

        $consumer = $this->waiting;

        if ($consumer === null) {
            return;
        }

        $this->waiting = null;
        $consumer->resolve($this->complete);
    }

    /**
     * Fails the iterator.
     *
     * Stores a failed promise carrying the exception as the completion result and resolves a pending advance()
     * promise with it. Unlike complete(), this does not check whether the iterator has already ended.
     *
     * @param \Throwable $exception Reason for the failure.
     *
     * @return void
     */
    private function fail(\Throwable $exception)
    {
        $this->complete = new Failure($exception);

        $consumer = $this->waiting;

        if ($consumer === null) {
            return;
        }

        $this->waiting = null;
        $consumer->resolve($this->complete);
    }
}
213