<?php

namespace Amp\Internal;

use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use React\Promise\PromiseInterface as ReactPromise;

/**
 * Shared machinery for Iterator implementations: buffers emitted values, hands them to the consumer one at a
 * time through advance()/getCurrent(), and gives the emitter a promise per value for back-pressure.
 *
 * Do not use this trait in your own code; compose your class from one of the available classes implementing
 * \Amp\Iterator instead. The user of this trait is responsible for making sure listeners get a chance to
 * listen before values are emitted.
 *
 * @internal
 * @template-covariant TValue
 */
trait Producer
{
    /** @var Promise|null Completion promise; null until complete() or fail() has been called. */
    private $complete;

    /** @var mixed[] Emitted values that have not been discarded yet, keyed by emit position. */
    private $values = [];

    /** @var Deferred[] Back-pressure deferreds keyed by emit position, resolved when that value is reached. */
    private $backPressure = [];

    /** @var int Position of the value currently exposed through getCurrent(); -1 before the first advance(). */
    private $consumePosition = -1;

    /** @var int Position of the most recently emitted value; -1 before the first emit(). */
    private $emitPosition = -1;

    /** @var Deferred|null Set while the consumer is waiting on advance() for a value that was not emitted yet. */
    private $waiting;

    /** @var null|array Backtrace captured by complete() when AMP_DEBUG and assertions are enabled. */
    private $resolutionTrace;

    /**
     * {@inheritdoc}
     *
     * Discards the current value and moves on to the next position. If a value is already buffered there, the
     * matching back-pressure deferred is resolved and a promise succeeding with true is returned. Otherwise the
     * completion promise is returned when the iterator is done, or a pending promise that emit(), complete() or
     * fail() will resolve later.
     *
     * @return Promise<bool>
     *
     * @throws \Error If the promise returned by the previous call is still pending.
     */
    public function advance(): Promise
    {
        if ($this->waiting !== null) {
            throw new \Error("The prior promise returned must resolve before invoking this method again");
        }

        // The consumer is done with the current value; release it before stepping forward.
        unset($this->values[$this->consumePosition]);

        $this->consumePosition++;
        $next = $this->consumePosition;

        if (!\array_key_exists($next, $this->values)) {
            // Nothing buffered: either the iterator is finished or the consumer has to wait.
            if ($this->complete) {
                return $this->complete;
            }

            $this->waiting = new Deferred;

            return $this->waiting->promise();
        }

        // A buffered value always has a back-pressure deferred, because emit() creates one
        // whenever no consumer was waiting.
        \assert(isset($this->backPressure[$next]));
        $pressure = $this->backPressure[$next];
        unset($this->backPressure[$next]);
        $pressure->resolve();

        return new Success(true);
    }

    /**
     * {@inheritdoc}
     *
     * @return TValue
     *
     * @throws \Error If the iterator has completed with no buffered values left, or if no value is available
     *                at the current position.
     */
    public function getCurrent()
    {
        if ($this->complete && empty($this->values)) {
            throw new \Error("The iterator has completed");
        }

        $position = $this->consumePosition;

        if (\array_key_exists($position, $this->values)) {
            return $this->values[$position];
        }

        throw new \Error("Promise returned from advance() must resolve before calling this method");
    }

    /**
     * Emits a value from the iterator. The returned promise is resolved once the emitted value has been consumed.
     *
     * React promises are adapted to Amp promises first. When the value is a promise, its result is emitted once
     * it resolves; if it fails, the iterator is failed with the same reason.
     *
     * @param mixed $value
     *
     * @return Promise
     * @psalm-return Promise<null>
     *
     * @throws \Error If the iterator has completed.
     */
    private function emit($value): Promise
    {
        if ($this->complete) {
            throw new \Error("Iterators cannot emit values after calling complete");
        }

        if ($value instanceof ReactPromise) {
            $value = Promise\adapt($value);
        }

        if ($value instanceof Promise) {
            $deferred = new Deferred;

            $value->onResolve(function ($exception, $result) use ($deferred) {
                if ($this->complete) {
                    // The iterator finished while the promise was still pending.
                    $deferred->fail(
                        new \Error("The iterator was completed before the promise result could be emitted")
                    );
                } elseif ($exception) {
                    $this->fail($exception);
                    $deferred->fail($exception);
                } else {
                    $deferred->resolve($this->emit($result));
                }
            });

            return $deferred->promise();
        }

        $this->emitPosition++;
        $slot = $this->emitPosition;

        $this->values[$slot] = $value;

        if ($this->waiting === null) {
            // Nobody is waiting yet: hand back a promise that advance() resolves on reaching this slot.
            $pressure = new Deferred;
            $this->backPressure[$slot] = $pressure;

            return $pressure->promise();
        }

        // Clear the waiting slot before resolving it.
        $consumer = $this->waiting;
        $this->waiting = null;
        $consumer->resolve(true);

        return new Success; // Consumer was already waiting for a new value, so back-pressure is unnecessary.
    }

    /**
     * Completes the iterator.
     *
     * Stores a successful completion promise holding false and resolves a waiting advance() promise with it.
     *
     * @return void
     *
     * @throws \Error If the iterator has already been completed.
     */
    private function complete()
    {
        if ($this->complete) {
            $message = "Iterator has already been completed";

            if (!isset($this->resolutionTrace)) {
                // @codeCoverageIgnoreStart
                $message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
                    . "for a stacktrace of the previous resolution.";
                // @codeCoverageIgnoreEnd
            } else {
                $trace = formatStacktrace($this->resolutionTrace);
                $message .= ". Previous completion trace:\n\n{$trace}\n\n";
            }

            throw new \Error($message);
        }

        // Wrapped in assert() so the backtrace is only captured when assertions are enabled.
        \assert((function () {
            $env = \getenv("AMP_DEBUG") ?: "0";
            $enabledByEnv = $env !== "0" && $env !== "false";
            $enabledByConst = \defined("AMP_DEBUG") && \AMP_DEBUG;

            if (!$enabledByEnv && !$enabledByConst) {
                return true;
            }

            $trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
            \array_shift($trace); // remove current closure
            $this->resolutionTrace = $trace;

            return true;
        })());

        $this->complete = new Success(false);

        if ($this->waiting === null) {
            return;
        }

        $consumer = $this->waiting;
        $this->waiting = null;
        $consumer->resolve($this->complete);
    }

    /**
     * Fails the iterator with the given reason.
     *
     * Stores a failed completion promise and resolves a waiting advance() promise with it. Unlike complete(),
     * this method does not check whether the iterator was already completed.
     *
     * @param \Throwable $exception
     *
     * @return void
     */
    private function fail(\Throwable $exception)
    {
        $this->complete = new Failure($exception);

        if ($this->waiting === null) {
            return;
        }

        $consumer = $this->waiting;
        $this->waiting = null;
        $consumer->resolve($this->complete);
    }
}