1<?php
2/*
3 * Copyright 2015-2017 MongoDB, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18namespace MongoDB\Operation;
19
20use ArrayIterator;
21use MongoDB\BSON\JavascriptInterface;
22use MongoDB\Driver\Command;
23use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
24use MongoDB\Driver\ReadConcern;
25use MongoDB\Driver\ReadPreference;
26use MongoDB\Driver\Server;
27use MongoDB\Driver\Session;
28use MongoDB\Driver\WriteConcern;
29use MongoDB\Exception\InvalidArgumentException;
30use MongoDB\Exception\UnexpectedValueException;
31use MongoDB\Exception\UnsupportedException;
32use MongoDB\MapReduceResult;
33use stdClass;
34use function current;
35use function is_array;
36use function is_bool;
37use function is_integer;
38use function is_object;
39use function is_string;
40use function MongoDB\create_field_path_type_map;
41use function MongoDB\is_mapreduce_output_inline;
42use function MongoDB\server_supports_feature;
43
44/**
45 * Operation for the mapReduce command.
46 *
47 * @api
48 * @see \MongoDB\Collection::mapReduce()
49 * @see https://docs.mongodb.com/manual/reference/command/mapReduce/
50 */
class MapReduce implements Executable
{
    /** @var integer Minimum wire version (MongoDB 3.4) required for the "collation" option */
    private static $wireVersionForCollation = 5;

    /** @var integer Minimum wire version (MongoDB 3.2) at which "bypassDocumentValidation" is sent */
    private static $wireVersionForDocumentLevelValidation = 4;

    /** @var integer Minimum wire version (MongoDB 3.2) required for the "readConcern" option */
    private static $wireVersionForReadConcern = 4;

    /**
     * Minimum wire version (MongoDB 3.4) required for the "writeConcern" option.
     *
     * mapReduce only accepts a write concern (when outputting to a collection)
     * starting with MongoDB 3.4, as documented for the "writeConcern" option on
     * the constructor; wire version 4 corresponds to MongoDB 3.2.
     *
     * @var integer
     */
    private static $wireVersionForWriteConcern = 5;

    /** @var string */
    private $databaseName;

    /** @var string */
    private $collectionName;

    /** @var JavascriptInterface */
    private $map;

    /** @var JavascriptInterface */
    private $reduce;

    /** @var array|object|string */
    private $out;

    /** @var array */
    private $options;

    /**
     * Constructs a mapReduce command.
     *
     * Required arguments:
     *
     *  * map (MongoDB\BSON\Javascript): A JavaScript function that associates
     *    or "maps" a value with a key and emits the key and value pair.
     *
     *  * reduce (MongoDB\BSON\Javascript): A JavaScript function that "reduces"
     *    to a single object all the values associated with a particular key.
     *
     *  * out (string|document): Specifies where to output the result of the
     *    map-reduce operation. You can either output to a collection or return
     *    the result inline. On a primary member of a replica set you can output
     *    either to a collection or inline, but on a secondary, only inline
     *    output is possible.
     *
     * Supported options:
     *
     *  * bypassDocumentValidation (boolean): If true, allows the write to
     *    circumvent document level validation. This only applies when results
     *    are output to a collection.
     *
     *    For servers < 3.2, this option is ignored as document level validation
     *    is not available.
     *
     *  * collation (document): Collation specification.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     *  * finalize (MongoDB\BSON\JavascriptInterface): Follows the reduce method
     *    and modifies the output.
     *
     *  * jsMode (boolean): Specifies whether to convert intermediate data into
     *    BSON format between the execution of the map and reduce functions.
     *
     *  * limit (integer): Specifies a maximum number of documents for the input
     *    into the map function.
     *
     *  * maxTimeMS (integer): The maximum amount of time to allow the query to
     *    run.
     *
     *  * query (document): Specifies the selection criteria using query
     *    operators for determining the documents input to the map function.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern. This is not
     *    supported when results are returned inline.
     *
     *    This is not supported for server versions < 3.2 and will result in an
     *    exception at execution time if used. It is also rejected when the
     *    session is in a transaction.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
     *
     *    This option is ignored if results are output to a collection.
     *
     *  * scope (document): Specifies global variables that are accessible in
     *    the map, reduce and finalize functions.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * sort (document): Sorts the input documents. This option is useful for
     *    optimization. For example, specify the sort key to be the same as the
     *    emit key so that there are fewer reduce operations. The sort key must
     *    be in an existing index for this collection.
     *
     *  * typeMap (array): Type map for BSON deserialization. It is applied to
     *    the inline results, or to the cursor over the output collection (it is
     *    not sent to the server).
     *
     *  * verbose (boolean): Specifies whether to include the timing information
     *    in the result information.
     *
     *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only
     *    applies when results are output to a collection.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used. It is also rejected when the
     *    session is in a transaction.
     *
     * @param string              $databaseName   Database name
     * @param string              $collectionName Collection name
     * @param JavascriptInterface $map            Map function
     * @param JavascriptInterface $reduce         Reduce function
     * @param string|array|object $out            Output specification
     * @param array               $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct($databaseName, $collectionName, JavascriptInterface $map, JavascriptInterface $reduce, $out, array $options = [])
    {
        if (! is_string($out) && ! is_array($out) && ! is_object($out)) {
            throw InvalidArgumentException::invalidType('$out', $out, 'string or array or object');
        }

        if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
            throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
        }

        if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) {
            throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object');
        }

        if (isset($options['finalize']) && ! $options['finalize'] instanceof JavascriptInterface) {
            throw InvalidArgumentException::invalidType('"finalize" option', $options['finalize'], JavascriptInterface::class);
        }

        if (isset($options['jsMode']) && ! is_bool($options['jsMode'])) {
            throw InvalidArgumentException::invalidType('"jsMode" option', $options['jsMode'], 'boolean');
        }

        if (isset($options['limit']) && ! is_integer($options['limit'])) {
            throw InvalidArgumentException::invalidType('"limit" option', $options['limit'], 'integer');
        }

        if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
            throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
        }

        if (isset($options['query']) && ! is_array($options['query']) && ! is_object($options['query'])) {
            throw InvalidArgumentException::invalidType('"query" option', $options['query'], 'array or object');
        }

        if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
            throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
        }

        if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['scope']) && ! is_array($options['scope']) && ! is_object($options['scope'])) {
            throw InvalidArgumentException::invalidType('"scope" option', $options['scope'], 'array or object');
        }

        if (isset($options['session']) && ! $options['session'] instanceof Session) {
            throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
        }

        if (isset($options['sort']) && ! is_array($options['sort']) && ! is_object($options['sort'])) {
            throw InvalidArgumentException::invalidType('"sort" option', $options['sort'], 'array or object');
        }

        if (isset($options['typeMap']) && ! is_array($options['typeMap'])) {
            throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array');
        }

        if (isset($options['verbose']) && ! is_bool($options['verbose'])) {
            throw InvalidArgumentException::invalidType('"verbose" option', $options['verbose'], 'boolean');
        }

        if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
            throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
        }

        // Default (empty) concerns are dropped so they are neither sent nor
        // subjected to the server-version and transaction checks in execute().
        if (isset($options['readConcern']) && $options['readConcern']->isDefault()) {
            unset($options['readConcern']);
        }

        if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
            unset($options['writeConcern']);
        }

        $this->databaseName = (string) $databaseName;
        $this->collectionName = (string) $collectionName;
        $this->map = $map;
        $this->reduce = $reduce;
        $this->out = $out;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return MapReduceResult
     * @throws UnexpectedValueException if the command response was malformed
     * @throws UnsupportedException if collation, read concern, or write concern is used and unsupported,
     *                              or if read/write concern is used while the session is in a transaction
     * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        if (isset($this->options['collation']) && ! server_supports_feature($server, self::$wireVersionForCollation)) {
            throw UnsupportedException::collationNotSupported();
        }

        if (isset($this->options['readConcern']) && ! server_supports_feature($server, self::$wireVersionForReadConcern)) {
            throw UnsupportedException::readConcernNotSupported();
        }

        if (isset($this->options['writeConcern']) && ! server_supports_feature($server, self::$wireVersionForWriteConcern)) {
            throw UnsupportedException::writeConcernNotSupported();
        }

        $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
        if ($inTransaction) {
            if (isset($this->options['readConcern'])) {
                throw UnsupportedException::readConcernNotSupportedInTransaction();
            }
            if (isset($this->options['writeConcern'])) {
                throw UnsupportedException::writeConcernNotSupportedInTransaction();
            }
        }

        $hasOutputCollection = ! is_mapreduce_output_inline($this->out);

        $command = $this->createCommand($server);
        $options = $this->createOptions($hasOutputCollection);

        /* If the mapReduce operation results in a write, use
         * executeReadWriteCommand to ensure we're handling the writeConcern
         * option.
         * In other cases, we use executeCommand as this will prevent the
         * mapReduce operation from being retried when retryReads is enabled.
         * See https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst#unsupported-read-operations. */
        $cursor = $hasOutputCollection
            ? $server->executeReadWriteCommand($this->databaseName, $command, $options)
            : $server->executeCommand($this->databaseName, $command, $options);

        // The user's type map is applied to each inline result document; for
        // collection output it is applied later by the Find operation instead.
        if (isset($this->options['typeMap']) && ! $hasOutputCollection) {
            $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'results.$'));
        }

        $result = current($cursor->toArray());

        // current() yields false for an empty response; report that as a
        // malformed response rather than letting a TypeError escape below.
        if (! $result instanceof stdClass) {
            throw new UnexpectedValueException('mapReduce command did not return a document');
        }

        $getIterator = $this->createGetIteratorCallable($result, $server);

        return new MapReduceResult($getIterator, $result);
    }

    /**
     * Create the mapReduce command.
     *
     * Document-like options (collation, query, scope, sort) are cast to
     * objects so that empty arrays are serialized as BSON documents.
     * bypassDocumentValidation is only sent when truthy and supported by the
     * server.
     *
     * @param Server $server
     * @return Command
     */
    private function createCommand(Server $server)
    {
        $cmd = [
            'mapReduce' => $this->collectionName,
            'map' => $this->map,
            'reduce' => $this->reduce,
            'out' => $this->out,
        ];

        foreach (['finalize', 'jsMode', 'limit', 'maxTimeMS', 'verbose'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = $this->options[$option];
            }
        }

        foreach (['collation', 'query', 'scope', 'sort'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = (object) $this->options[$option];
            }
        }

        if (! empty($this->options['bypassDocumentValidation']) &&
            server_supports_feature($server, self::$wireVersionForDocumentLevelValidation)
        ) {
            $cmd['bypassDocumentValidation'] = $this->options['bypassDocumentValidation'];
        }

        return new Command($cmd);
    }

    /**
     * Creates a callable for MapReduceResult::getIterator().
     *
     * Inline results are wrapped in an ArrayIterator. Collection output is
     * read back lazily with a Find operation, targeting either this
     * operation's database (string result) or the database and collection
     * named in an object result.
     *
     * @param stdClass $result
     * @param Server   $server
     * @return callable
     * @throws UnexpectedValueException if the command response was malformed
     */
    private function createGetIteratorCallable(stdClass $result, Server $server)
    {
        // Inline results can be wrapped with an ArrayIterator
        if (isset($result->results) && is_array($result->results)) {
            $results = $result->results;

            return function () use ($results) {
                return new ArrayIterator($results);
            };
        }

        if (isset($result->result)) {
            $options = isset($this->options['typeMap']) ? ['typeMap' => $this->options['typeMap']] : [];
            $find = null;

            if (is_string($result->result)) {
                // A bare collection name lives in the database the command ran on
                $find = new Find($this->databaseName, $result->result, [], $options);
            } elseif (is_object($result->result) && isset($result->result->db, $result->result->collection)) {
                // An object result names both the output database and collection
                $find = new Find($result->result->db, $result->result->collection, [], $options);
            }

            if ($find !== null) {
                return function () use ($find, $server) {
                    return $find->execute($server);
                };
            }
        }

        throw new UnexpectedValueException('mapReduce command did not return inline results or an output collection');
    }

    /**
     * Create options for executing the command.
     *
     * Read preference is only forwarded for inline output and write concern
     * only for collection output; read concern and session are always
     * forwarded when set.
     *
     * @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
     * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
     * @param boolean $hasOutputCollection
     * @return array
     */
    private function createOptions($hasOutputCollection)
    {
        $options = [];

        if (isset($this->options['readConcern'])) {
            $options['readConcern'] = $this->options['readConcern'];
        }

        if (! $hasOutputCollection && isset($this->options['readPreference'])) {
            $options['readPreference'] = $this->options['readPreference'];
        }

        if (isset($this->options['session'])) {
            $options['session'] = $this->options['session'];
        }

        if ($hasOutputCollection && isset($this->options['writeConcern'])) {
            $options['writeConcern'] = $this->options['writeConcern'];
        }

        return $options;
    }
}
414