1<?php
2/*
3 * Copyright 2019 MongoDB, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18namespace MongoDB\Model;
19
20use IteratorIterator;
21use MongoDB\BSON\Serializable;
22use MongoDB\Driver\Cursor;
23use MongoDB\Driver\Monitoring\CommandFailedEvent;
24use MongoDB\Driver\Monitoring\CommandStartedEvent;
25use MongoDB\Driver\Monitoring\CommandSubscriber;
26use MongoDB\Driver\Monitoring\CommandSucceededEvent;
27use MongoDB\Driver\Server;
28use MongoDB\Exception\InvalidArgumentException;
29use MongoDB\Exception\ResumeTokenException;
30use MongoDB\Exception\UnexpectedValueException;
31use function count;
32use function is_array;
33use function is_integer;
34use function is_object;
35use function MongoDB\Driver\Monitoring\addSubscriber;
36use function MongoDB\Driver\Monitoring\removeSubscriber;
37
/**
 * ChangeStreamIterator wraps a change stream's tailable cursor.
 *
 * This iterator tracks the size of each batch in order to determine when the
 * postBatchResumeToken is applicable. It also ensures that initial calls to
 * rewind() do not execute getMore commands.
 *
 * Batch bookkeeping is driven by command monitoring (APM): the iterator
 * registers itself as a CommandSubscriber only for the duration of a next()
 * call that is expected to issue a getMore, and unregisters itself afterwards.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /** @var integer Zero-based position of the iterator within the current batch */
    private $batchPosition = 0;

    /** @var integer Number of documents in the current batch */
    private $batchSize;

    /** @var boolean Whether rewind() should return without touching the cursor */
    private $isRewindNop;

    /** @var boolean Cached validity of the wrapped cursor's current position */
    private $isValid = false;

    /** @var object|null postBatchResumeToken reported for the current batch, if any */
    private $postBatchResumeToken;

    /** @var array|object|null Most recently cached resume token */
    private $resumeToken;

    /** @var Server Server that the wrapped cursor reported via getServer() */
    private $server;

    /**
     * @internal
     * @param Cursor            $cursor               Cursor to wrap
     * @param integer           $firstBatchSize       Number of documents in the first batch
     * @param array|object|null $initialResumeToken   Resume token to report before any iteration
     * @param object|null       $postBatchResumeToken postBatchResumeToken for the first batch, if any
     * @throws InvalidArgumentException if an argument has an unexpected type
     */
    public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
    {
        if (! is_integer($firstBatchSize)) {
            throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
        }

        if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
            throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
        }

        parent::__construct($cursor);

        $this->batchSize = $firstBatchSize;
        /* With an empty first batch, rewind() is treated as a NOP until the
         * iterator reaches a valid position (see onIteration()). */
        $this->isRewindNop = ($firstBatchSize === 0);
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->resumeToken = $initialResumeToken;
        $this->server = $cursor->getServer();
    }

    /**
     * APM callback for failed commands. Intentionally does nothing.
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * APM callback that resets batch tracking when a getMore command starts.
     *
     * Events for any other command are ignored.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $this->batchPosition = 0;
        /* Use 0 rather than null so that $batchSize always holds an integer.
         * isAtEndOfBatch() evaluates identically for both values; the actual
         * size is assigned in commandSucceeded(). */
        $this->batchSize = 0;
        $this->postBatchResumeToken = null;
    }

    /**
     * APM callback that records the size and postBatchResumeToken of the batch
     * returned by a successful getMore command.
     *
     * Events for any other command are ignored.
     *
     * @internal
     * @throws UnexpectedValueException if the reply has no "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($reply->cursor->nextBatch);

        // The token is optional in the reply; when absent it stays null (reset in commandStarted()).
        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }
    }

    /**
     * Returns the current element, or null if the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    #[\ReturnTypeWillChange]
    public function current()
    {
        return $this->isValid ? parent::current() : null;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the server the cursor is running on.
     */
    public function getServer() : Server
    {
        return $this->server;
    }

    /**
     * Returns the current key, or null if the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    #[\ReturnTypeWillChange]
    public function key()
    {
        return $this->isValid ? parent::key() : null;
    }

    /**
     * Advances the iterator and updates the cached resume token.
     *
     * @see https://php.net/iteratoriterator.next
     * @return void
     * @throws ResumeTokenException if a resume token cannot be extracted from
     *                              the document at the new position
     */
    #[\ReturnTypeWillChange]
    public function next()
    {
        /* Determine if advancing the iterator will execute a getMore command
         * (i.e. we are already positioned at the end of the current batch). If
         * so, rely on the APM callbacks to reset $batchPosition and update
         * $batchSize. Otherwise, we can forgo APM and manually increment
         * $batchPosition after calling next(). */
        $getMore = $this->isAtEndOfBatch();

        if ($getMore) {
            addSubscriber($this);
        }

        try {
            parent::next();
            $this->onIteration(! $getMore);
        } finally {
            // Always unregister, even if advancing or housekeeping threw.
            if ($getMore) {
                removeSubscriber($this);
            }
        }
    }

    /**
     * Rewinds the iterator, unless rewinding is currently a NOP.
     *
     * @see https://php.net/iteratoriterator.rewind
     * @return void
     * @throws ResumeTokenException if a resume token cannot be extracted from
     *                              the document at the rewound position
     */
    #[\ReturnTypeWillChange]
    public function rewind()
    {
        if ($this->isRewindNop) {
            return;
        }

        parent::rewind();
        $this->onIteration(false);
    }

    /**
     * Returns the validity cached by the most recent iteration event.
     *
     * @see https://php.net/iteratoriterator.valid
     * @return boolean
     */
    #[\ReturnTypeWillChange]
    public function valid()
    {
        return $this->isValid;
    }

    /**
     * Extracts the resume token (i.e. "_id" field) from a change document.
     *
     * Serializable documents are converted with bsonSerialize() before the
     * "_id" field is read. The iterator is marked invalid before a
     * ResumeTokenException is thrown.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! is_array($document) && ! is_object($document)) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        $resumeToken = is_array($document)
            ? ($document['_id'] ?? null)
            : ($document->_id ?? null);

        if (! isset($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::notFound();
        }

        if (! is_array($resumeToken) && ! is_object($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::invalidType($resumeToken);
        }

        return $resumeToken;
    }

    /**
     * Return whether the iterator is positioned at the end of the batch.
     *
     * An empty batch (size 0) is always considered to be at its end.
     *
     * @return boolean
     */
    private function isAtEndOfBatch()
    {
        return $this->batchPosition + 1 >= $this->batchSize;
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * Refreshes the cached validity, optionally advances the batch position,
     * and updates the cached resume token.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param boolean $incrementBatchPosition Whether to advance $batchPosition
     *                                        when the new position is valid
     * @throws ResumeTokenException if a resume token cannot be extracted from
     *                              the current document
     */
    private function onIteration($incrementBatchPosition)
    {
        $this->isValid = parent::valid();

        /* Disable rewind()'s NOP behavior once we advance to a valid position.
         * This will allow the driver to throw a LogicException if rewind() is
         * called after the cursor has advanced past its first element. */
        if ($this->isRewindNop && $this->isValid) {
            $this->isRewindNop = false;
        }

        if ($incrementBatchPosition && $this->isValid) {
            $this->batchPosition++;
        }

        /* If the iterator is positioned at the end of the batch, apply the
         * postBatchResumeToken if it's available. This handles both the case
         * where the current batch is empty (since onIteration() will be called
         * after a successful getMore) and when the iterator has advanced to the
         * last document in its current batch. Otherwise, extract a resume token
         * from the current document if possible. */
        if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
            $this->resumeToken = $this->postBatchResumeToken;
        } elseif ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}
306