1<?php
2/*
3 * Copyright 2017 MongoDB, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18namespace MongoDB\Operation;
19
20use MongoDB\BSON\TimestampInterface;
21use MongoDB\ChangeStream;
22use MongoDB\Driver\Cursor;
23use MongoDB\Driver\Exception\RuntimeException;
24use MongoDB\Driver\Manager;
25use MongoDB\Driver\Monitoring\CommandFailedEvent;
26use MongoDB\Driver\Monitoring\CommandStartedEvent;
27use MongoDB\Driver\Monitoring\CommandSubscriber;
28use MongoDB\Driver\Monitoring\CommandSucceededEvent;
29use MongoDB\Driver\ReadPreference;
30use MongoDB\Driver\Server;
31use MongoDB\Exception\InvalidArgumentException;
32use MongoDB\Exception\UnexpectedValueException;
33use MongoDB\Exception\UnsupportedException;
34use MongoDB\Model\ChangeStreamIterator;
35use function array_intersect_key;
36use function array_unshift;
37use function count;
38use function is_array;
39use function is_object;
40use function is_string;
41use function MongoDB\Driver\Monitoring\addSubscriber;
42use function MongoDB\Driver\Monitoring\removeSubscriber;
43use function MongoDB\select_server;
44use function MongoDB\server_supports_feature;
45
46/**
47 * Operation for creating a change stream with the aggregate command.
48 *
49 * Note: the implementation of CommandSubscriber is an internal implementation
50 * detail and should not be considered part of the public API.
51 *
52 * @api
53 * @see \MongoDB\Collection::watch()
54 * @see https://docs.mongodb.com/manual/changeStreams/
55 */
class Watch implements Executable, /* @internal */ CommandSubscriber
{
    const FULL_DOCUMENT_DEFAULT = 'default';
    const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';

    /**
     * Wire version passed to server_supports_feature() when deciding whether
     * an operation time may be captured for "startAtOperationTime".
     *
     * @var integer
     */
    private static $wireVersionForStartAtOperationTime = 7;

    /**
     * Aggregate operation produced by createAggregate(); replaced on resume.
     *
     * @var Aggregate
     */
    private $aggregate;

    /**
     * Subset of the constructor options forwarded to the Aggregate operation.
     *
     * @var array
     */
    private $aggregateOptions;

    /**
     * Options embedded in the leading $changeStream pipeline stage.
     *
     * @var array
     */
    private $changeStreamOptions;

    /**
     * Null for database-level and cluster-level change streams.
     *
     * @var string|null
     */
    private $collectionName;

    /**
     * Set to "admin" when the constructor receives a null database name.
     *
     * @var string
     */
    private $databaseName;

    /**
     * Number of documents in "cursor.firstBatch" of the last successful
     * aggregate reply; cleared whenever an aggregate command starts.
     *
     * @var integer|null
     */
    private $firstBatchSize;

    /**
     * Becomes true the first time resume() is invoked.
     *
     * @var boolean
     */
    private $hasResumed = false;

    /**
     * Used to start an implicit session and to select a server on resume.
     *
     * @var Manager
     */
    private $manager;

    /**
     * Operation time captured from an aggregate reply, if any.
     *
     * @var TimestampInterface|null
     */
    private $operationTime;

    /**
     * User-supplied pipeline stages (without the $changeStream stage).
     *
     * @var array
     */
    private $pipeline;

    /**
     * "cursor.postBatchResumeToken" of the last successful aggregate reply;
     * cleared whenever an aggregate command starts.
     *
     * @var object|null
     */
    private $postBatchResumeToken;

    /**
     * Prepares an aggregate command that opens a change stream.
     *
     * Supported options:
     *
     *  * batchSize (integer): Number of documents returned in each batch.
     *
     *  * collation (document): Collation specification.
     *
     *  * fullDocument (string): Controls whether update events carry a
     *    "fullDocument" field. Without it, an update event only reports the
     *    changed fields through "updateDescription". Pass "updateLookup" to
     *    also receive the most current majority-committed version of the
     *    updated document. Defaults to "default".
     *
     *    Insert and replace events always contain "fullDocument"; delete
     *    events never do, because the document no longer exists.
     *
     *  * maxAwaitTimeMS (integer): Upper bound on how long the server waits
     *    for new documents to satisfy a change stream query.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference,
     *    which is also used to pick a new server when resuming. Defaults to a
     *    "primary" read preference.
     *
     *  * resumeAfter (document): Logical starting point for the new change
     *    stream.
     *
     *    Mutually exclusive with "startAfter" and "startAtOperationTime";
     *    combining them results in a server error.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * startAfter (document): Logical starting point for the new change
     *    stream. In contrast to "resumeAfter", a resume token taken from an
     *    "invalidate" event is accepted here.
     *
     *    Mutually exclusive with "resumeAfter" and "startAtOperationTime";
     *    combining them results in a server error.
     *
     *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): When given,
     *    only changes that occurred at or after this timestamp are reported.
     *    Every command run against the server returns an operation time that
     *    is suitable here; one may also be obtained from
     *    MongoDB\Driver\Server::getInfo().
     *
     *    Mutually exclusive with "resumeAfter" and "startAfter"; combining
     *    them results in a server error.
     *
     *    This option is not supported for server versions < 4.0.
     *
     *  * typeMap (array): Type map for BSON deserialization. It is applied to
     *    the returned Cursor and is not sent to the server.
     *
     * Note: Passing null as the collection name creates a database-level
     * change stream. Passing null for both the database and the collection
     * name creates a cluster-level change stream.
     *
     * @param Manager     $manager        Manager instance from the driver
     * @param string|null $databaseName   Database name
     * @param string|null $collectionName Collection name
     * @param array       $pipeline       List of pipeline operations
     * @param array       $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
    {
        if ($databaseName === null && $collectionName !== null) {
            throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
        }

        // Defaults only fill in keys the caller did not provide at all.
        $defaults = [
            'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
            'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
        ];
        $options = $options + $defaults;

        $fullDocument = $options['fullDocument'];
        if (! is_string($fullDocument)) {
            throw InvalidArgumentException::invalidType('"fullDocument" option', $fullDocument, 'string');
        }

        $readPreference = $options['readPreference'];
        if (! ($readPreference instanceof ReadPreference)) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $readPreference, ReadPreference::class);
        }

        if (isset($options['resumeAfter'])) {
            $resumeAfter = $options['resumeAfter'];

            if (! (is_array($resumeAfter) || is_object($resumeAfter))) {
                throw InvalidArgumentException::invalidType('"resumeAfter" option', $resumeAfter, 'array or object');
            }
        }

        if (isset($options['startAfter'])) {
            $startAfter = $options['startAfter'];

            if (! (is_array($startAfter) || is_object($startAfter))) {
                throw InvalidArgumentException::invalidType('"startAfter" option', $startAfter, 'array or object');
            }
        }

        if (isset($options['startAtOperationTime'])) {
            $startAtOperationTime = $options['startAtOperationTime'];

            if (! ($startAtOperationTime instanceof TimestampInterface)) {
                throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $startAtOperationTime, TimestampInterface::class);
            }
        }

        /* When the caller supplies no session, start one here so that the
         * first aggregation and every later resume attempt share a single
         * session ("implicit from the user's perspective" per PHPLIB-342).
         * Because it stands in for an implicit session, "causalConsistency"
         * is turned off. */
        if (! isset($options['session'])) {
            try {
                $implicitSession = $manager->startSession(['causalConsistency' => false]);
                $options['session'] = $implicitSession;
            } catch (RuntimeException $e) {
                /* Safe to ignore: libmongoc is then likely unable to create a
                 * session of its own, so no session mismatch can occur. */
            }
        }

        // Only keys matter for array_intersect_key(); the values are filler.
        $aggregateOptionKeys = [
            'batchSize' => 1,
            'collation' => 1,
            'maxAwaitTimeMS' => 1,
            'readConcern' => 1,
            'readPreference' => 1,
            'session' => 1,
            'typeMap' => 1,
        ];
        $changeStreamOptionKeys = [
            'fullDocument' => 1,
            'resumeAfter' => 1,
            'startAfter' => 1,
            'startAtOperationTime' => 1,
        ];

        $this->manager = $manager;
        $this->pipeline = $pipeline;
        $this->aggregateOptions = array_intersect_key($options, $aggregateOptionKeys);
        $this->changeStreamOptions = array_intersect_key($options, $changeStreamOptionKeys);

        // A null database name requests a cluster-wide change stream, which
        // is run against the "admin" database.
        $isClusterWide = $databaseName === null;

        if ($isClusterWide) {
            $this->changeStreamOptions['allChangesForCluster'] = true;
        }

        $this->databaseName = $isClusterWide ? 'admin' : (string) $databaseName;
        $this->collectionName = $collectionName === null ? null : (string) $collectionName;

        $this->aggregate = $this->createAggregate();
    }

    /** @internal */
    final public function commandFailed(CommandFailedEvent $event)
    {
        // Intentionally empty: nothing is recorded for failed commands.
    }

    /** @internal */
    final public function commandStarted(CommandStartedEvent $event)
    {
        // Discard data captured from a previous aggregate before a new one runs.
        if ($event->getCommandName() === 'aggregate') {
            $this->firstBatchSize = null;
            $this->postBatchResumeToken = null;
        }
    }

    /** @internal */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $reply = $event->getReply();

        $hasFirstBatch = isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch);

        if (! $hasFirstBatch) {
            throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
        }

        $cursor = $reply->cursor;

        $this->firstBatchSize = count($cursor->firstBatch);

        if (isset($cursor->postBatchResumeToken) && is_object($cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $cursor->postBatchResumeToken;
        }

        // The decision below relies on the two fields assigned just above.
        if (! $this->shouldCaptureOperationTime($event->getServer())) {
            return;
        }

        if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
            $this->operationTime = $reply->operationTime;
        }
    }

    /**
     * Runs the operation and wraps the result in a ChangeStream.
     *
     * The ChangeStream also receives a callback that delegates to resume().
     *
     * @see Executable::execute()
     * @param Server $server
     * @return ChangeStream
     * @throws UnsupportedException if collation or read concern is used and unsupported
     * @throws RuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        $iterator = $this->createChangeStreamIterator($server);

        $resumeCallable = function ($resumeToken, $hasAdvanced) {
            return $this->resume($resumeToken, $hasAdvanced);
        };

        return new ChangeStream($iterator, $resumeCallable);
    }

    /**
     * Builds the Aggregate operation backing the change stream.
     *
     * The current change stream options are placed in a $changeStream stage
     * ahead of the user pipeline. resume() calls this again after it has
     * adjusted those options.
     *
     * @return Aggregate
     */
    private function createAggregate()
    {
        $changeStreamStage = ['$changeStream' => (object) $this->changeStreamOptions];

        $stages = $this->pipeline;
        array_unshift($stages, $changeStreamStage);

        return new Aggregate($this->databaseName, $this->collectionName, $stages, $this->aggregateOptions);
    }

    /**
     * Runs the aggregate command and builds a ChangeStreamIterator from it.
     *
     * @param Server $server
     * @return ChangeStreamIterator
     */
    private function createChangeStreamIterator(Server $server)
    {
        /* The command must run first: the subscriber callbacks populate
         * $firstBatchSize and $postBatchResumeToken, which are read below. */
        $cursor = $this->executeAggregate($server);

        return new ChangeStreamIterator(
            $cursor,
            $this->firstBatchSize,
            $this->getInitialResumeToken(),
            $this->postBatchResumeToken
        );
    }

    /**
     * Runs the aggregate command while subscribed to command monitoring.
     *
     * APM is used so that data can be captured from the command's response
     * (e.g. firstBatch size, postBatchResumeToken). The subscription is
     * removed again whether or not the command succeeds.
     *
     * @param Server $server
     * @return Cursor
     */
    private function executeAggregate(Server $server)
    {
        addSubscriber($this);

        try {
            $cursor = $this->aggregate->execute($server);
        } finally {
            removeSubscriber($this);
        }

        return $cursor;
    }

    /**
     * Determines the resume token a new ChangeStreamIterator starts out with.
     *
     * Precedence: the postBatchResumeToken when the first batch was empty,
     * then the "startAfter" option, then the "resumeAfter" option.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @return array|object|null
     */
    private function getInitialResumeToken()
    {
        if ($this->firstBatchSize === 0 && $this->postBatchResumeToken !== null) {
            return $this->postBatchResumeToken;
        }

        foreach (['startAfter', 'resumeAfter'] as $optionName) {
            if (isset($this->changeStreamOptions[$optionName])) {
                return $this->changeStreamOptions[$optionName];
            }
        }

        return null;
    }

    /**
     * Re-creates the change stream after it has been interrupted.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
     * @param array|object|null $resumeToken
     * @param bool              $hasAdvanced
     * @return ChangeStreamIterator
     * @throws InvalidArgumentException
     */
    private function resume($resumeToken = null, $hasAdvanced = false)
    {
        $isDocument = is_array($resumeToken) || is_object($resumeToken);

        if ($resumeToken !== null && ! $isDocument) {
            throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
        }

        $this->hasResumed = true;

        /* Pick a new server with the original read preference. Change streams
         * cannot be used inside a transaction, but a pinned session is still
         * checked for; otherwise the user could get an ambiguous error about
         * running a command on the wrong server. */
        $server = select_server($this->manager, $this->aggregateOptions);

        // "startAfter" is reused only if it was the original starting point
        // and the stream has not advanced since.
        if (! $hasAdvanced && isset($this->changeStreamOptions['startAfter'])) {
            $resumeOption = 'startAfter';
        } else {
            $resumeOption = 'resumeAfter';
        }

        // Drop every previous starting point before choosing the new one.
        unset(
            $this->changeStreamOptions['resumeAfter'],
            $this->changeStreamOptions['startAfter'],
            $this->changeStreamOptions['startAtOperationTime']
        );

        if ($resumeToken !== null) {
            $this->changeStreamOptions[$resumeOption] = $resumeToken;
        } elseif ($this->operationTime !== null) {
            // Without a token, fall back to the captured operation time.
            $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
        }

        // Rebuild the aggregate command and hand back a fresh iterator.
        $this->aggregate = $this->createAggregate();

        return $this->createChangeStreamIterator($server);
    }

    /**
     * Decides whether the operation time of an aggregate reply is recorded.
     *
     * It is recorded only for the initial (non-resumed) aggregate, when no
     * explicit starting point was configured, the first batch is empty, no
     * postBatchResumeToken was received and the server passes the wire
     * version check.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
     * @param Server $server
     * @return boolean
     */
    private function shouldCaptureOperationTime(Server $server)
    {
        $hasExplicitStartingPoint = isset($this->changeStreamOptions['resumeAfter'])
            || isset($this->changeStreamOptions['startAfter'])
            || isset($this->changeStreamOptions['startAtOperationTime']);

        if ($this->hasResumed || $hasExplicitStartingPoint) {
            return false;
        }

        if ($this->firstBatchSize > 0 || $this->postBatchResumeToken !== null) {
            return false;
        }

        return (bool) server_supports_feature($server, self::$wireVersionForStartAtOperationTime);
    }
}
443