<?php
/*
 * Copyright 2015-2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

use ArrayIterator;
use MongoDB\BSON\JavascriptInterface;
use MongoDB\Driver\Command;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Session;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use MongoDB\MapReduceResult;
use stdClass;
use function current;
use function is_array;
use function is_bool;
use function is_integer;
use function is_object;
use function is_string;
use function MongoDB\create_field_path_type_map;
use function MongoDB\is_mapreduce_output_inline;
use function MongoDB\server_supports_feature;

/**
 * Operation for the mapReduce command.
 *
 * @api
 * @see \MongoDB\Collection::mapReduce()
 * @see https://docs.mongodb.com/manual/reference/command/mapReduce/
 */
class MapReduce implements Executable
{
    /**
     * Minimum wire version for the "collation" option (MongoDB 3.4).
     *
     * @var integer
     */
    private static $wireVersionForCollation = 5;

    /**
     * Minimum wire version for "bypassDocumentValidation" (MongoDB 3.2).
     *
     * @var integer
     */
    private static $wireVersionForDocumentLevelValidation = 4;

    /**
     * Minimum wire version for the "readConcern" option (MongoDB 3.2).
     *
     * @var integer
     */
    private static $wireVersionForReadConcern = 4;

    /**
     * Minimum wire version for the "writeConcern" option (MongoDB 3.4).
     *
     * This must match the wire version used for collation, since both
     * options are documented as unsupported for server versions < 3.4.
     *
     * @var integer
     */
    private static $wireVersionForWriteConcern = 5;

    /** @var string */
    private $databaseName;

    /** @var string */
    private $collectionName;

    /** @var JavascriptInterface */
    private $map;

    /** @var JavascriptInterface */
    private $reduce;

    /** @var array|object|string */
    private $out;

    /** @var array */
    private $options;

    /**
     * Constructs a mapReduce command.
     *
     * Required arguments:
     *
     *  * map (MongoDB\BSON\JavascriptInterface): A JavaScript function that
     *    associates or "maps" a value with a key and emits the key and value
     *    pair.
     *
     *  * reduce (MongoDB\BSON\JavascriptInterface): A JavaScript function
     *    that "reduces" to a single object all the values associated with a
     *    particular key.
     *
     *  * out (string|document): Specifies where to output the result of the
     *    map-reduce operation. You can either output to a collection or return
     *    the result inline. On a primary member of a replica set you can output
     *    either to a collection or inline, but on a secondary, only inline
     *    output is possible.
     *
     * Supported options:
     *
     *  * bypassDocumentValidation (boolean): If true, allows the write to
     *    circumvent document level validation. This only applies when results
     *    are output to a collection.
     *
     *    For servers < 3.2, this option is ignored as document level validation
     *    is not available.
     *
     *  * collation (document): Collation specification.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     *  * finalize (MongoDB\BSON\JavascriptInterface): Follows the reduce method
     *    and modifies the output.
     *
     *  * jsMode (boolean): Specifies whether to convert intermediate data into
     *    BSON format between the execution of the map and reduce functions.
     *
     *  * limit (integer): Specifies a maximum number of documents for the input
     *    into the map function.
     *
     *  * maxTimeMS (integer): The maximum amount of time to allow the query to
     *    run.
     *
     *  * query (document): Specifies the selection criteria using query
     *    operators for determining the documents input to the map function.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern. This is not
     *    supported when results are returned inline.
     *
     *    This is not supported for server versions < 3.2 and will result in an
     *    exception at execution time if used.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
     *
     *    This option is ignored if results are output to a collection.
     *
     *  * scope (document): Specifies global variables that are accessible in
     *    the map, reduce and finalize functions.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * sort (document): Sorts the input documents. This option is useful for
     *    optimization. For example, specify the sort key to be the same as the
     *    emit key so that there are fewer reduce operations. The sort key must
     *    be in an existing index for this collection.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     *  * verbose (boolean): Specifies whether to include the timing information
     *    in the result information.
     *
     *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only
     *    applies when results are output to a collection.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     * @param string              $databaseName   Database name
     * @param string              $collectionName Collection name
     * @param JavascriptInterface $map            Map function
     * @param JavascriptInterface $reduce         Reduce function
     * @param string|array|object $out            Output specification
     * @param array               $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct($databaseName, $collectionName, JavascriptInterface $map, JavascriptInterface $reduce, $out, array $options = [])
    {
        if (! is_string($out) && ! is_array($out) && ! is_object($out)) {
            throw InvalidArgumentException::invalidType('$out', $out, 'string or array or object');
        }

        if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
            throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
        }

        if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) {
            throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object');
        }

        if (isset($options['finalize']) && ! $options['finalize'] instanceof JavascriptInterface) {
            throw InvalidArgumentException::invalidType('"finalize" option', $options['finalize'], JavascriptInterface::class);
        }

        if (isset($options['jsMode']) && ! is_bool($options['jsMode'])) {
            throw InvalidArgumentException::invalidType('"jsMode" option', $options['jsMode'], 'boolean');
        }

        if (isset($options['limit']) && ! is_integer($options['limit'])) {
            throw InvalidArgumentException::invalidType('"limit" option', $options['limit'], 'integer');
        }

        if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
            throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
        }

        if (isset($options['query']) && ! is_array($options['query']) && ! is_object($options['query'])) {
            throw InvalidArgumentException::invalidType('"query" option', $options['query'], 'array or object');
        }

        if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
            throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
        }

        if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['scope']) && ! is_array($options['scope']) && ! is_object($options['scope'])) {
            throw InvalidArgumentException::invalidType('"scope" option', $options['scope'], 'array or object');
        }

        if (isset($options['session']) && ! $options['session'] instanceof Session) {
            throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
        }

        if (isset($options['sort']) && ! is_array($options['sort']) && ! is_object($options['sort'])) {
            throw InvalidArgumentException::invalidType('"sort" option', $options['sort'], 'array or object');
        }

        if (isset($options['typeMap']) && ! is_array($options['typeMap'])) {
            throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array');
        }

        if (isset($options['verbose']) && ! is_bool($options['verbose'])) {
            throw InvalidArgumentException::invalidType('"verbose" option', $options['verbose'], 'boolean');
        }

        if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
            throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
        }

        // Default read/write concerns are dropped so that they are neither
        // version-checked in execute() nor forwarded to the driver.
        if (isset($options['readConcern']) && $options['readConcern']->isDefault()) {
            unset($options['readConcern']);
        }

        if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
            unset($options['writeConcern']);
        }

        $this->databaseName = (string) $databaseName;
        $this->collectionName = (string) $collectionName;
        $this->map = $map;
        $this->reduce = $reduce;
        $this->out = $out;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return MapReduceResult
     * @throws UnexpectedValueException if the command response was malformed
     * @throws UnsupportedException if collation, read concern, or write concern
     *                              is used and unsupported, or if a read or
     *                              write concern is specified while the session
     *                              is in a transaction
     * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        if (isset($this->options['collation']) && ! server_supports_feature($server, self::$wireVersionForCollation)) {
            throw UnsupportedException::collationNotSupported();
        }

        if (isset($this->options['readConcern']) && ! server_supports_feature($server, self::$wireVersionForReadConcern)) {
            throw UnsupportedException::readConcernNotSupported();
        }

        if (isset($this->options['writeConcern']) && ! server_supports_feature($server, self::$wireVersionForWriteConcern)) {
            throw UnsupportedException::writeConcernNotSupported();
        }

        $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
        if ($inTransaction) {
            if (isset($this->options['readConcern'])) {
                throw UnsupportedException::readConcernNotSupportedInTransaction();
            }
            if (isset($this->options['writeConcern'])) {
                throw UnsupportedException::writeConcernNotSupportedInTransaction();
            }
        }

        $hasOutputCollection = ! is_mapreduce_output_inline($this->out);

        $command = $this->createCommand($server);
        $options = $this->createOptions($hasOutputCollection);

        /* If the mapReduce operation results in a write, use
         * executeReadWriteCommand to ensure we're handling the writeConcern
         * option.
         * In other cases, we use executeCommand as this will prevent the
         * mapReduce operation from being retried when retryReads is enabled.
         * See https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst#unsupported-read-operations. */
        $cursor = $hasOutputCollection
            ? $server->executeReadWriteCommand($this->databaseName, $command, $options)
            : $server->executeCommand($this->databaseName, $command, $options);

        // The type map only applies to inline results, which are nested under
        // the "results" array of the command response.
        if (isset($this->options['typeMap']) && ! $hasOutputCollection) {
            $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'results.$'));
        }

        // The command reply is the first (and only consumed) cursor document.
        $result = current($cursor->toArray());

        $getIterator = $this->createGetIteratorCallable($result, $server);

        return new MapReduceResult($getIterator, $result);
    }

    /**
     * Create the mapReduce command.
     *
     * @param Server $server
     * @return Command
     */
    private function createCommand(Server $server)
    {
        $cmd = [
            'mapReduce' => $this->collectionName,
            'map' => $this->map,
            'reduce' => $this->reduce,
            'out' => $this->out,
        ];

        // Scalar and Javascript options are passed through unchanged.
        foreach (['finalize', 'jsMode', 'limit', 'maxTimeMS', 'verbose'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = $this->options[$option];
            }
        }

        // Document options are cast to objects so that arrays (including
        // empty ones) are sent as documents.
        foreach (['collation', 'query', 'scope', 'sort'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = (object) $this->options[$option];
            }
        }

        // Only sent when truthy and when the server supports document level
        // validation; otherwise the option is silently omitted.
        if (! empty($this->options['bypassDocumentValidation']) &&
            server_supports_feature($server, self::$wireVersionForDocumentLevelValidation)
        ) {
            $cmd['bypassDocumentValidation'] = $this->options['bypassDocumentValidation'];
        }

        return new Command($cmd);
    }

    /**
     * Creates a callable for MapReduceResult::getIterator().
     *
     * Inline results are wrapped in an ArrayIterator. When the response names
     * an output collection instead, the callable runs a Find on it, using the
     * same server and the configured type map (if any).
     *
     * @param stdClass $result
     * @param Server   $server
     * @return callable
     * @throws UnexpectedValueException if the command response was malformed
     */
    private function createGetIteratorCallable(stdClass $result, Server $server)
    {
        // Inline results can be wrapped with an ArrayIterator
        if (isset($result->results) && is_array($result->results)) {
            $results = $result->results;

            return function () use ($results) {
                return new ArrayIterator($results);
            };
        }

        if (isset($result->result) && (is_string($result->result) || is_object($result->result))) {
            $options = isset($this->options['typeMap']) ? ['typeMap' => $this->options['typeMap']] : [];

            // A string names a collection in the current database; an object
            // carries its own "db" and "collection" fields.
            $find = is_string($result->result)
                ? new Find($this->databaseName, $result->result, [], $options)
                : new Find($result->result->db, $result->result->collection, [], $options);

            return function () use ($find, $server) {
                return $find->execute($server);
            };
        }

        throw new UnexpectedValueException('mapReduce command did not return inline results or an output collection');
    }

    /**
     * Create options for executing the command.
     *
     * The read preference is only forwarded for inline output, and the write
     * concern only when results are output to a collection.
     *
     * @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
     * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
     * @param boolean $hasOutputCollection
     * @return array
     */
    private function createOptions($hasOutputCollection)
    {
        $options = [];

        if (isset($this->options['readConcern'])) {
            $options['readConcern'] = $this->options['readConcern'];
        }

        if (! $hasOutputCollection && isset($this->options['readPreference'])) {
            $options['readPreference'] = $this->options['readPreference'];
        }

        if (isset($this->options['session'])) {
            $options['session'] = $this->options['session'];
        }

        if ($hasOutputCollection && isset($this->options['writeConcern'])) {
            $options['writeConcern'] = $this->options['writeConcern'];
        }

        return $options;
    }
}