1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/commands/mr.h"
36
37 #include "mongo/base/status_with.h"
38 #include "mongo/bson/util/builder.h"
39 #include "mongo/client/connpool.h"
40 #include "mongo/db/auth/authorization_session.h"
41 #include "mongo/db/bson/dotted_path_support.h"
42 #include "mongo/db/catalog/collection.h"
43 #include "mongo/db/catalog/collection_catalog_entry.h"
44 #include "mongo/db/catalog/database_holder.h"
45 #include "mongo/db/catalog/document_validation.h"
46 #include "mongo/db/catalog/index_catalog.h"
47 #include "mongo/db/catalog/index_key_validate.h"
48 #include "mongo/db/client.h"
49 #include "mongo/db/clientcursor.h"
50 #include "mongo/db/commands.h"
51 #include "mongo/db/concurrency/write_conflict_exception.h"
52 #include "mongo/db/db.h"
53 #include "mongo/db/db_raii.h"
54 #include "mongo/db/dbhelpers.h"
55 #include "mongo/db/exec/working_set_common.h"
56 #include "mongo/db/index/index_descriptor.h"
57 #include "mongo/db/matcher/extensions_callback_real.h"
58 #include "mongo/db/op_observer.h"
59 #include "mongo/db/ops/insert.h"
60 #include "mongo/db/query/find_common.h"
61 #include "mongo/db/query/get_executor.h"
62 #include "mongo/db/query/plan_summary_stats.h"
63 #include "mongo/db/query/query_planner.h"
64 #include "mongo/db/repl/replication_coordinator_global.h"
65 #include "mongo/db/s/collection_metadata.h"
66 #include "mongo/db/s/collection_sharding_state.h"
67 #include "mongo/db/s/sharding_state.h"
68 #include "mongo/db/server_options.h"
69 #include "mongo/db/service_context.h"
70 #include "mongo/s/catalog_cache.h"
71 #include "mongo/s/client/parallel.h"
72 #include "mongo/s/client/shard_connection.h"
73 #include "mongo/s/client/shard_registry.h"
74 #include "mongo/s/grid.h"
75 #include "mongo/s/shard_key_pattern.h"
76 #include "mongo/s/stale_exception.h"
77 #include "mongo/scripting/engine.h"
78 #include "mongo/stdx/mutex.h"
79 #include "mongo/util/log.h"
80 #include "mongo/util/mongoutils/str.h"
81 #include "mongo/util/scopeguard.h"
82
83 namespace mongo {
84
85 using std::set;
86 using std::shared_ptr;
87 using std::string;
88 using std::stringstream;
89 using std::unique_ptr;
90 using std::vector;
91
92 using IndexVersion = IndexDescriptor::IndexVersion;
93
94 namespace dps = ::mongo::dotted_path_support;
95
96 namespace mr {
97
// Definition of the static job counter. Config's constructor calls fetchAndAdd(1) on it and
// appends the value to the "tmp.mr.<source>_" temporary collection name it builds.
AtomicUInt32 Config::JOB_NUMBER;
99
JSFunction(const std::string & type,const BSONElement & e)100 JSFunction::JSFunction(const std::string& type, const BSONElement& e) {
101 _type = type;
102 _code = e._asCode();
103
104 if (e.type() == CodeWScope)
105 _wantedScope = e.codeWScopeObject();
106 }
107
init(State * state)108 void JSFunction::init(State* state) {
109 _scope = state->scope();
110 verify(_scope);
111 _scope->init(&_wantedScope);
112
113 _func = _scope->createFunction(_code.c_str());
114 uassert(13598, str::stream() << "couldn't compile code for: " << _type, _func);
115
116 // install in JS scope so that it can be called in JS mode
117 _scope->setFunction(_type.c_str(), _code.c_str());
118 }
119
init(State * state)120 void JSMapper::init(State* state) {
121 _func.init(state);
122 _params = state->config().mapParams;
123 }
124
125 /**
126 * Applies the map function to an object, which should internally call emit()
127 */
map(const BSONObj & o)128 void JSMapper::map(const BSONObj& o) {
129 Scope* s = _func.scope();
130 verify(s);
131 if (s->invoke(_func.func(), &_params, &o, 0, true))
132 uasserted(9014, str::stream() << "map invoke failed: " << s->getError());
133 }
134
135 /**
136 * Applies the finalize function to a tuple obj (key, val)
137 * Returns tuple obj {_id: key, value: newval}
138 */
finalize(const BSONObj & o)139 BSONObj JSFinalizer::finalize(const BSONObj& o) {
140 Scope* s = _func.scope();
141
142 Scope::NoDBAccess no = s->disableDBAccess("can't access db inside finalize");
143 s->invokeSafe(_func.func(), &o, 0);
144
145 // don't want to use o.objsize() to size b
146 // since there are many cases where the point of finalize
147 // is converting many fields to 1
148 BSONObjBuilder b;
149 b.append(o.firstElement());
150 s->append(b, "value", "__returnValue");
151 return b.obj();
152 }
153
init(State * state)154 void JSReducer::init(State* state) {
155 _func.init(state);
156 }
157
158 /**
159 * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
160 */
reduce(const BSONList & tuples)161 BSONObj JSReducer::reduce(const BSONList& tuples) {
162 if (tuples.size() <= 1)
163 return tuples[0];
164 BSONObj key;
165 int endSizeEstimate = 16;
166 _reduce(tuples, key, endSizeEstimate);
167
168 BSONObjBuilder b(endSizeEstimate);
169 b.appendAs(key.firstElement(), "0");
170 _func.scope()->append(b, "1", "__returnValue");
171 return b.obj();
172 }
173
174 /**
175 * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val}
176 * Also applies a finalizer method if present.
177 */
finalReduce(const BSONList & tuples,Finalizer * finalizer)178 BSONObj JSReducer::finalReduce(const BSONList& tuples, Finalizer* finalizer) {
179 BSONObj res;
180 BSONObj key;
181
182 if (tuples.size() == 1) {
183 // 1 obj, just use it
184 key = tuples[0];
185 BSONObjBuilder b(key.objsize());
186 BSONObjIterator it(key);
187 b.appendAs(it.next(), "_id");
188 b.appendAs(it.next(), "value");
189 res = b.obj();
190 } else {
191 // need to reduce
192 int endSizeEstimate = 16;
193 _reduce(tuples, key, endSizeEstimate);
194 BSONObjBuilder b(endSizeEstimate);
195 b.appendAs(key.firstElement(), "_id");
196 _func.scope()->append(b, "value", "__returnValue");
197 res = b.obj();
198 }
199
200 if (finalizer) {
201 res = finalizer->finalize(res);
202 }
203
204 return res;
205 }
206
207 /**
208 * actually applies a reduce, to a list of tuples (key, value).
209 * After the call, tuples will hold a single tuple {"0": key, "1": value}
210 */
_reduce(const BSONList & tuples,BSONObj & key,int & endSizeEstimate)211 void JSReducer::_reduce(const BSONList& tuples, BSONObj& key, int& endSizeEstimate) {
212 uassert(10074, "need values", tuples.size());
213
214 int sizeEstimate = (tuples.size() * tuples.begin()->getField("value").size()) + 128;
215
216 // need to build the reduce args: ( key, [values] )
217 BSONObjBuilder reduceArgs(sizeEstimate);
218 std::unique_ptr<BSONArrayBuilder> valueBuilder;
219 unsigned n = 0;
220 for (; n < tuples.size(); n++) {
221 BSONObjIterator j(tuples[n]);
222 BSONElement keyE = j.next();
223 if (n == 0) {
224 reduceArgs.append(keyE);
225 key = keyE.wrap();
226 valueBuilder.reset(new BSONArrayBuilder(reduceArgs.subarrayStart("tuples")));
227 }
228
229 BSONElement ee = j.next();
230
231 uassert(13070, "value too large to reduce", ee.size() < (BSONObjMaxUserSize / 2));
232
233 // If adding this element to the array would cause it to be too large, break. The
234 // remainder of the tuples will be processed recursively at the end of this
235 // function.
236 if (valueBuilder->len() + ee.size() > BSONObjMaxUserSize) {
237 verify(n > 1); // if not, inf. loop
238 break;
239 }
240
241 valueBuilder->append(ee);
242 }
243 verify(valueBuilder);
244 valueBuilder->done();
245 BSONObj args = reduceArgs.obj();
246
247 Scope* s = _func.scope();
248
249 s->invokeSafe(_func.func(), &args, 0);
250 ++numReduces;
251
252 if (s->type("__returnValue") == Array) {
253 uasserted(10075, "reduce -> multiple not supported yet");
254 return;
255 }
256
257 endSizeEstimate = key.objsize() + (args.objsize() / tuples.size());
258
259 if (n == tuples.size())
260 return;
261
262 // the input list was too large, add the rest of elmts to new tuples and reduce again
263 // note: would be better to use loop instead of recursion to avoid stack overflow
264 BSONList x;
265 for (; n < tuples.size(); n++) {
266 x.push_back(tuples[n]);
267 }
268 BSONObjBuilder temp(endSizeEstimate);
269 temp.append(key.firstElement());
270 s->append(temp, "1", "__returnValue");
271 x.push_back(temp.obj());
272 _reduce(x, key, endSizeEstimate);
273 }
274
Config(const string & _dbname,const BSONObj & cmdObj)275 Config::Config(const string& _dbname, const BSONObj& cmdObj) {
276 dbname = _dbname;
277 uassert(ErrorCodes::TypeMismatch,
278 str::stream() << "'mapReduce' must be of type String",
279 cmdObj.firstElement().type() == BSONType::String);
280 nss = NamespaceString(dbname, cmdObj.firstElement().valueStringData());
281 uassert(ErrorCodes::InvalidNamespace,
282 str::stream() << "Invalid namespace: " << nss.ns(),
283 nss.isValid());
284
285 verbose = cmdObj["verbose"].trueValue();
286 jsMode = cmdObj["jsMode"].trueValue();
287 splitInfo = 0;
288
289 if (cmdObj.hasField("splitInfo")) {
290 splitInfo = cmdObj["splitInfo"].Int();
291 }
292
293 jsMaxKeys = 500000;
294 reduceTriggerRatio = 10.0;
295 maxInMemSize = 500 * 1024;
296
297 uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo());
298
299 outputOptions = parseOutputOptions(dbname, cmdObj);
300
301 shardedFirstPass = false;
302 if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) {
303 massert(16054,
304 "shardedFirstPass should only use replace outType",
305 outputOptions.outType == REPLACE);
306 shardedFirstPass = true;
307 }
308
309 if (outputOptions.outType != INMEMORY) { // setup temp collection name
310 tempNamespace = NamespaceString(
311 outputOptions.outDB.empty() ? dbname : outputOptions.outDB,
312 str::stream() << "tmp.mr." << cmdObj.firstElement().valueStringData() << "_"
313 << JOB_NUMBER.fetchAndAdd(1));
314 incLong = NamespaceString(str::stream() << tempNamespace.ns() << "_inc");
315 }
316
317 {
318 // scope and code
319
320 if (cmdObj["scope"].type() == Object)
321 scopeSetup = cmdObj["scope"].embeddedObjectUserCheck().getOwned();
322
323 mapper.reset(new JSMapper(cmdObj["map"]));
324 reducer.reset(new JSReducer(cmdObj["reduce"]));
325 if (cmdObj["finalize"].type() && cmdObj["finalize"].trueValue())
326 finalizer.reset(new JSFinalizer(cmdObj["finalize"]));
327
328 if (cmdObj["mapparams"].type() == Array) {
329 mapParams = cmdObj["mapparams"].embeddedObjectUserCheck().getOwned();
330 }
331 }
332
333 {
334 // query options
335 BSONElement q = cmdObj["query"];
336 if (q.type() == Object)
337 filter = q.embeddedObjectUserCheck();
338 else
339 uassert(13608, "query has to be blank or an Object", !q.trueValue());
340
341
342 BSONElement s = cmdObj["sort"];
343 if (s.type() == Object)
344 sort = s.embeddedObjectUserCheck();
345 else
346 uassert(13609, "sort has to be blank or an Object", !s.trueValue());
347
348 BSONElement collationElt = cmdObj["collation"];
349 if (collationElt.type() == Object)
350 collation = collationElt.embeddedObjectUserCheck();
351 else
352 uassert(40082,
353 str::stream()
354 << "mapReduce 'collation' parameter must be of type Object but found type: "
355 << typeName(collationElt.type()),
356 collationElt.eoo());
357
358 if (cmdObj["limit"].isNumber())
359 limit = cmdObj["limit"].numberLong();
360 else
361 limit = 0;
362 }
363 }
364
365 /**
366 * Clean up the temporary and incremental collections
367 */
dropTempCollections()368 void State::dropTempCollections() {
369 if (!_config.tempNamespace.isEmpty()) {
370 writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] {
371 AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
372 if (auto db = autoDb.getDb()) {
373 WriteUnitOfWork wunit(_opCtx);
374 uassert(
375 ErrorCodes::PrimarySteppedDown,
376 str::stream()
377 << "no longer primary while dropping temporary collection for mapReduce: "
378 << _config.tempNamespace.ns(),
379 repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(
380 _opCtx, _config.tempNamespace));
381 uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns()));
382 wunit.commit();
383 }
384 });
385 // Always forget about temporary namespaces, so we don't cache lots of them
386 ShardConnection::forgetNS(_config.tempNamespace.ns());
387 }
388 if (_useIncremental && !_config.incLong.isEmpty()) {
389 // We don't want to log the deletion of incLong as it isn't replicated. While
390 // harmless, this would lead to a scary looking warning on the secondaries.
391 repl::UnreplicatedWritesBlock uwb(_opCtx);
392
393 writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] {
394 Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X);
395 if (Database* db = dbHolder().get(_opCtx, _config.incLong.ns())) {
396 WriteUnitOfWork wunit(_opCtx);
397 uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns()));
398 wunit.commit();
399 }
400 });
401
402 ShardConnection::forgetNS(_config.incLong.ns());
403 }
404 }
405
406 /**
407 * Create temporary collection, set up indexes
408 */
prepTempCollection()409 void State::prepTempCollection() {
410 if (!_onDisk)
411 return;
412
413 dropTempCollections();
414 if (_useIncremental) {
415 // Create the inc collection and make sure we have index on "0" key.
416 // Intentionally not replicating the inc collection to secondaries.
417 repl::UnreplicatedWritesBlock uwb(_opCtx);
418
419 writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] {
420 OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
421 WriteUnitOfWork wuow(_opCtx);
422 Collection* incColl = incCtx.getCollection();
423 invariant(!incColl);
424
425 CollectionOptions options;
426 options.setNoIdIndex();
427 options.temp = true;
428 if (enableCollectionUUIDs &&
429 serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
430 options.uuid.emplace(UUID::gen());
431 }
432 incColl = incCtx.db()->createCollection(_opCtx, _config.incLong.ns(), options);
433 invariant(incColl);
434
435 auto rawIndexSpec =
436 BSON("key" << BSON("0" << 1) << "ns" << _config.incLong.ns() << "name"
437 << "_temp_0");
438 auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec(
439 _opCtx, rawIndexSpec, _config.incLong, serverGlobalParams.featureCompatibility));
440
441 Status status = incColl->getIndexCatalog()
442 ->createIndexOnEmptyCollection(_opCtx, indexSpec)
443 .getStatus();
444 if (!status.isOK()) {
445 uasserted(17305,
446 str::stream() << "createIndex failed for mr incLong ns: "
447 << _config.incLong.ns()
448 << " err: "
449 << status.code());
450 }
451 wuow.commit();
452 });
453 }
454
455 CollectionOptions finalOptions;
456 vector<BSONObj> indexesToInsert;
457
458 {
459 // copy indexes and collection options into temporary storage
460 OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns());
461 Collection* const finalColl = finalCtx.getCollection();
462 if (finalColl) {
463 finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx);
464
465 IndexCatalog::IndexIterator ii =
466 finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
467 // Iterate over finalColl's indexes.
468 while (ii.more()) {
469 IndexDescriptor* currIndex = ii.next();
470 BSONObjBuilder b;
471 b.append("ns", _config.tempNamespace.ns());
472
473 // Copy over contents of the index descriptor's infoObj.
474 BSONObjIterator j(currIndex->infoObj());
475 while (j.more()) {
476 BSONElement e = j.next();
477 if (str::equals(e.fieldName(), "_id") || str::equals(e.fieldName(), "ns"))
478 continue;
479 b.append(e);
480 }
481 indexesToInsert.push_back(b.obj());
482 }
483 }
484 }
485
486 writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] {
487 // create temp collection and insert the indexes from temporary storage
488 OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns());
489 WriteUnitOfWork wuow(_opCtx);
490 uassert(
491 ErrorCodes::PrimarySteppedDown,
492 str::stream() << "no longer primary while creating temporary collection for mapReduce: "
493 << _config.tempNamespace.ns(),
494 repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx,
495 _config.tempNamespace));
496 Collection* tempColl = tempCtx.getCollection();
497 invariant(!tempColl);
498
499 CollectionOptions options = finalOptions;
500 options.temp = true;
501 if (enableCollectionUUIDs && serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
502 // If a UUID for the final output collection was sent by mongos (i.e., the final output
503 // collection is sharded), use the UUID mongos sent when creating the temp collection.
504 // When the temp collection is renamed to the final output collection, the UUID will be
505 // preserved.
506 options.uuid.emplace(_config.finalOutputCollUUID ? *_config.finalOutputCollUUID
507 : UUID::gen());
508 } else {
509 // While downgrading, we can inherit a UUID from finalOptions and not override it
510 // above. If we didn't clear it, this would result in creating a collection with a
511 // duplicate UUID.
512 options.uuid = boost::none;
513 }
514 tempColl = tempCtx.db()->createCollection(_opCtx, _config.tempNamespace.ns(), options);
515
516 for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end();
517 ++it) {
518 Status status =
519 tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus();
520 if (!status.isOK()) {
521 if (status.code() == ErrorCodes::IndexAlreadyExists) {
522 continue;
523 }
524 uassertStatusOK(status);
525 }
526 // Log the createIndex operation.
527 auto uuid = tempColl->uuid();
528 getGlobalServiceContext()->getOpObserver()->onCreateIndex(
529 _opCtx, _config.tempNamespace, uuid, *it, false);
530 }
531 wuow.commit();
532 });
533 }
534
535 /**
536 * For inline mode, appends results to output object.
537 * Makes sure (key, value) tuple is formatted as {_id: key, value: val}
538 */
appendResults(BSONObjBuilder & final)539 void State::appendResults(BSONObjBuilder& final) {
540 if (_onDisk) {
541 if (!_config.outputOptions.outDB.empty()) {
542 BSONObjBuilder loc;
543 if (!_config.outputOptions.outDB.empty())
544 loc.append("db", _config.outputOptions.outDB);
545 if (!_config.outputOptions.collectionName.empty())
546 loc.append("collection", _config.outputOptions.collectionName);
547 final.append("result", loc.obj());
548 } else {
549 if (!_config.outputOptions.collectionName.empty())
550 final.append("result", _config.outputOptions.collectionName);
551 }
552
553 if (_config.splitInfo > 0) {
554 // add split points, used for shard
555 BSONObj res;
556 BSONObj idKey = BSON("_id" << 1);
557 if (!_db.runCommand("admin",
558 BSON("splitVector" << _config.outputOptions.finalNamespace.ns()
559 << "keyPattern"
560 << idKey
561 << "maxChunkSizeBytes"
562 << _config.splitInfo),
563 res)) {
564 uasserted(15921, str::stream() << "splitVector failed: " << res);
565 }
566 if (res.hasField("splitKeys"))
567 final.append(res.getField("splitKeys"));
568 }
569 return;
570 }
571
572 if (_jsMode) {
573 ScriptingFunction getResult = _scope->createFunction(
574 "var map = _mrMap;"
575 "var result = [];"
576 "for (key in map) {"
577 " result.push({_id: key, value: map[key]});"
578 "}"
579 "return result;");
580 _scope->invoke(getResult, 0, 0, 0, false);
581 BSONObj obj = _scope->getObject("__returnValue");
582 final.append("results", BSONArray(obj));
583 return;
584 }
585
586 uassert(13604, "too much data for in memory map/reduce", _size < BSONObjMaxUserSize);
587
588 BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys
589
590 for (const auto& entry : *_temp) {
591 const BSONObj& key = entry.first;
592 const BSONList& all = entry.second;
593
594 verify(all.size() == 1);
595
596 BSONObjIterator vi(all[0]);
597 vi.next();
598
599 BSONObjBuilder temp(b.subobjStart());
600 temp.appendAs(key.firstElement(), "_id");
601 temp.appendAs(vi.next(), "value");
602 temp.done();
603 }
604
605 BSONArray res = b.arr();
606 final.append("results", res);
607 }
608
609 /**
610 * Does post processing on output collection.
611 * This may involve replacing, merging or reducing.
612 */
postProcessCollection(OperationContext * opCtx,CurOp * curOp,ProgressMeterHolder & pm)613 long long State::postProcessCollection(OperationContext* opCtx,
614 CurOp* curOp,
615 ProgressMeterHolder& pm) {
616 if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY)
617 return numInMemKeys();
618
619 bool holdingGlobalLock = false;
620 if (_config.outputOptions.outNonAtomic)
621 return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
622
623 invariant(!opCtx->lockState()->isLocked());
624
625 // This must be global because we may write across different databases.
626 Lock::GlobalWrite lock(opCtx);
627 holdingGlobalLock = true;
628 return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
629 }
630
631 namespace {
632
633 // Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock,
634 // then this function does not acquire any additional locks.
_collectionCount(OperationContext * opCtx,const NamespaceString & nss,bool callerHoldsGlobalLock)635 unsigned long long _collectionCount(OperationContext* opCtx,
636 const NamespaceString& nss,
637 bool callerHoldsGlobalLock) {
638 Collection* coll = nullptr;
639 boost::optional<AutoGetCollectionForReadCommand> ctx;
640
641 // If the global write lock is held, we must avoid using AutoGetCollectionForReadCommand as it
642 // may lead to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
643 if (callerHoldsGlobalLock) {
644 Database* db = dbHolder().get(opCtx, nss.ns());
645 if (db) {
646 coll = db->getCollection(opCtx, nss);
647 }
648 } else {
649 ctx.emplace(opCtx, nss);
650 coll = ctx->getCollection();
651 }
652
653 return coll ? coll->numRecords(opCtx) : 0;
654 }
655
656 } // namespace
657
postProcessCollectionNonAtomic(OperationContext * opCtx,CurOp * curOp,ProgressMeterHolder & pm,bool callerHoldsGlobalLock)658 long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
659 CurOp* curOp,
660 ProgressMeterHolder& pm,
661 bool callerHoldsGlobalLock) {
662 auto outputCount =
663 _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
664
665 // Determine whether the temp collection should be renamed to the final output collection and
666 // thus preserve the UUID. This is possible in the following cases:
667 // * Output mode "replace"
668 // * If this mapReduce is creating a new sharded output collection, which can be determined by
669 // whether mongos sent the UUID that the final output collection should have (that is, whether
670 // _config.finalOutputCollUUID is set).
671 if (_config.outputOptions.outType == Config::REPLACE || _config.finalOutputCollUUID) {
672 // This must be global because we may write across different databases.
673 Lock::GlobalWrite lock(opCtx);
674 // replace: just rename from temp to final collection name, dropping previous collection
675 _db.dropCollection(_config.outputOptions.finalNamespace.ns());
676 BSONObj info;
677
678 if (!_db.runCommand("admin",
679 BSON("renameCollection" << _config.tempNamespace.ns() << "to"
680 << _config.outputOptions.finalNamespace.ns()
681 << "stayTemp"
682 << _config.shardedFirstPass),
683 info)) {
684 uasserted(10076, str::stream() << "rename failed: " << info);
685 }
686
687 _db.dropCollection(_config.tempNamespace.ns());
688 } else if (_config.outputOptions.outType == Config::MERGE) {
689 // merge: upsert new docs into old collection
690
691 {
692 stdx::unique_lock<Client> lk(*opCtx->getClient());
693 curOp->setMessage_inlock(
694 "m/r: merge post processing", "M/R Merge Post Processing Progress", outputCount);
695 }
696 unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
697 while (cursor->more()) {
698 Lock::DBLock lock(opCtx, _config.outputOptions.finalNamespace.db(), MODE_X);
699 BSONObj o = cursor->nextSafe();
700 Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o);
701 pm.hit();
702 }
703 _db.dropCollection(_config.tempNamespace.ns());
704 pm.finished();
705 } else if (_config.outputOptions.outType == Config::REDUCE) {
706 // reduce: apply reduce op on new result and existing one
707 BSONList values;
708
709 {
710 stdx::unique_lock<Client> lk(*opCtx->getClient());
711 curOp->setMessage_inlock(
712 "m/r: reduce post processing", "M/R Reduce Post Processing Progress", outputCount);
713 }
714
715 unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
716 while (cursor->more()) {
717 // This must be global because we may write across different databases.
718 Lock::GlobalWrite lock(opCtx);
719 BSONObj temp = cursor->nextSafe();
720 BSONObj old;
721
722 const bool found = [&] {
723 AutoGetCollection autoColl(opCtx, _config.outputOptions.finalNamespace, MODE_IS);
724 return Helpers::findOne(
725 opCtx, autoColl.getCollection(), temp["_id"].wrap(), old, true);
726 }();
727 if (found) {
728 // need to reduce
729 values.clear();
730 values.push_back(temp);
731 values.push_back(old);
732 Helpers::upsert(opCtx,
733 _config.outputOptions.finalNamespace.ns(),
734 _config.reducer->finalReduce(values, _config.finalizer.get()));
735 } else {
736 Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp);
737 }
738 pm.hit();
739 }
740 pm.finished();
741 }
742
743 return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
744 }
745
746 /**
747 * Insert doc in collection. This should be replicated.
748 */
insert(const NamespaceString & nss,const BSONObj & o)749 void State::insert(const NamespaceString& nss, const BSONObj& o) {
750 verify(_onDisk);
751
752 writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] {
753 OldClientWriteContext ctx(_opCtx, nss.ns());
754 WriteUnitOfWork wuow(_opCtx);
755 uassert(
756 ErrorCodes::PrimarySteppedDown,
757 str::stream() << "no longer primary while inserting mapReduce result into collection: "
758 << nss.ns()
759 << ": "
760 << redact(o),
761 repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, nss));
762 Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss);
763
764 BSONObjBuilder b;
765 if (!o.hasField("_id")) {
766 b.appendOID("_id", NULL, true);
767 }
768 b.appendElements(o);
769 BSONObj bo = b.obj();
770
771 StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo);
772 uassertStatusOK(res.getStatus());
773 if (!res.getValue().isEmpty()) {
774 bo = res.getValue();
775 }
776
777 // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
778 OpDebug* const nullOpDebug = nullptr;
779 uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(bo), nullOpDebug, true));
780 wuow.commit();
781 });
782 }
783
784 /**
785 * Insert doc into the inc collection. This should not be replicated.
786 */
_insertToInc(BSONObj & o)787 void State::_insertToInc(BSONObj& o) {
788 verify(_onDisk);
789
790 writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] {
791 OldClientWriteContext ctx(_opCtx, _config.incLong.ns());
792 WriteUnitOfWork wuow(_opCtx);
793 Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong);
794 repl::UnreplicatedWritesBlock uwb(_opCtx);
795
796 // The documents inserted into the incremental collection are of the form
797 // {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the
798 // check that the document has an "_id" field would fail. Instead, we directly verify that
799 // the size of the document to insert is smaller than 16MB.
800 if (o.objsize() > BSONObjMaxUserSize) {
801 uasserted(ErrorCodes::BadValue,
802 str::stream() << "object to insert too large for incremental collection"
803 << ". size in bytes: "
804 << o.objsize()
805 << ", max size: "
806 << BSONObjMaxUserSize);
807 }
808
809 // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
810 OpDebug* const nullOpDebug = nullptr;
811 uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(o), nullOpDebug, true, false));
812 wuow.commit();
813 });
814 }
815
State(OperationContext * opCtx,const Config & c)816 State::State(OperationContext* opCtx, const Config& c)
817 : _config(c),
818 _db(opCtx),
819 _useIncremental(true),
820 _opCtx(opCtx),
821 _size(0),
822 _dupCount(0),
823 _numEmits(0) {
824 _temp.reset(new InMemory());
825 _onDisk = _config.outputOptions.outType != Config::INMEMORY;
826 }
827
sourceExists()828 bool State::sourceExists() {
829 return _db.exists(_config.nss.ns());
830 }
831
~State()832 State::~State() {
833 if (_onDisk) {
834 try {
835 dropTempCollections();
836 } catch (...) {
837 error() << "Unable to drop temporary collection created by mapReduce: "
838 << _config.tempNamespace << ". This collection will be removed automatically "
839 "the next time the server starts up. "
840 << exceptionToStatus();
841 }
842 }
843 if (_scope && !_scope->isKillPending() && _scope->getError().empty()) {
844 // cleanup js objects
845 try {
846 ScriptingFunction cleanup =
847 _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
848 _scope->invoke(cleanup, 0, 0, 0, true);
849 } catch (const DBException&) {
850 // not important because properties will be reset if scope is reused
851 LOG(1) << "MapReduce terminated during state destruction";
852 }
853 }
854 }
855
856 /**
857 * Initialize the mapreduce operation, creating the inc collection
858 */
init()859 void State::init() {
860 // setup js
861 const string userToken =
862 AuthorizationSession::get(Client::getCurrent())->getAuthenticatedUserNamesToken();
863 _scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread());
864 _scope->requireOwnedObjects();
865 _scope->registerOperation(_opCtx);
866 _scope->setLocalDB(_config.dbname);
867 _scope->loadStored(_opCtx, true);
868
869 if (!_config.scopeSetup.isEmpty())
870 _scope->init(&_config.scopeSetup);
871
872 _config.mapper->init(this);
873 _config.reducer->init(this);
874 if (_config.finalizer)
875 _config.finalizer->init(this);
876 _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);
877
878 switchMode(_config.jsMode); // set up js-mode based on Config
879
880 // global JS map/reduce hashmap
881 // we use a standard JS object which means keys are only simple types
882 // we could also add a real hashmap from a library and object comparison methods
883 // for increased performance, we may want to look at v8 Harmony Map support
884 // _scope->setObject("_mrMap", BSONObj(), false);
885 ScriptingFunction init = _scope->createFunction(
886 "_emitCt = 0;"
887 "_keyCt = 0;"
888 "_dupCt = 0;"
889 "_redCt = 0;"
890 "if (typeof(_mrMap) === 'undefined') {"
891 " _mrMap = {};"
892 "}");
893 _scope->invoke(init, 0, 0, 0, true);
894
895 // js function to run reduce on all keys
896 // redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key);
897 // list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };");
898 _reduceAll = _scope->createFunction(
899 "var map = _mrMap;"
900 "var list, ret;"
901 "for (var key in map) {"
902 " list = map[key];"
903 " if (list.length != 1) {"
904 " ret = _reduce(key, list);"
905 " map[key] = [ret];"
906 " ++_redCt;"
907 " }"
908 "}"
909 "_dupCt = 0;");
910 massert(16717, "error initializing JavaScript reduceAll function", _reduceAll != 0);
911
912 _reduceAndEmit = _scope->createFunction(
913 "var map = _mrMap;"
914 "var list, ret;"
915 "for (var key in map) {"
916 " list = map[key];"
917 " if (list.length == 1)"
918 " ret = list[0];"
919 " else {"
920 " ret = _reduce(key, list);"
921 " ++_redCt;"
922 " }"
923 " emit(key, ret);"
924 "}"
925 "delete _mrMap;");
926 massert(16718, "error initializing JavaScript reduce/emit function", _reduceAndEmit != 0);
927
928 _reduceAndFinalize = _scope->createFunction(
929 "var map = _mrMap;"
930 "var list, ret;"
931 "for (var key in map) {"
932 " list = map[key];"
933 " if (list.length == 1) {"
934 " if (!_doFinal) { continue; }"
935 " ret = list[0];"
936 " }"
937 " else {"
938 " ret = _reduce(key, list);"
939 " ++_redCt;"
940 " }"
941 " if (_doFinal)"
942 " ret = _finalize(key, ret);"
943 " map[key] = ret;"
944 "}");
945 massert(16719, "error creating JavaScript reduce/finalize function", _reduceAndFinalize != 0);
946
947 _reduceAndFinalizeAndInsert = _scope->createFunction(
948 "var map = _mrMap;"
949 "var list, ret;"
950 "for (var key in map) {"
951 " list = map[key];"
952 " if (list.length == 1)"
953 " ret = list[0];"
954 " else {"
955 " ret = _reduce(key, list);"
956 " ++_redCt;"
957 " }"
958 " if (_doFinal)"
959 " ret = _finalize(key, ret);"
960 " _nativeToTemp({_id: key, value: ret});"
961 "}");
962 massert(16720, "error initializing JavaScript functions", _reduceAndFinalizeAndInsert != 0);
963 }
964
switchMode(bool jsMode)965 void State::switchMode(bool jsMode) {
966 _jsMode = jsMode;
967 if (jsMode) {
968 // emit function that stays in JS
969 _scope->setFunction("emit",
970 "function(key, value) {"
971 " if (typeof(key) === 'object') {"
972 " _bailFromJS(key, value);"
973 " return;"
974 " }"
975 " ++_emitCt;"
976 " var map = _mrMap;"
977 " var list = map[key];"
978 " if (!list) {"
979 " ++_keyCt;"
980 " list = [];"
981 " map[key] = list;"
982 " }"
983 " else"
984 " ++_dupCt;"
985 " list.push(value);"
986 "}");
987 _scope->injectNative("_bailFromJS", _bailFromJS, this);
988 } else {
989 // emit now populates C++ map
990 _scope->injectNative("emit", fast_emit, this);
991 }
992 }
993
bailFromJS()994 void State::bailFromJS() {
995 LOG(1) << "M/R: Switching from JS mode to mixed mode";
996
997 // reduce and reemit into c++
998 switchMode(false);
999 _scope->invoke(_reduceAndEmit, 0, 0, 0, true);
1000 // need to get the real number emitted so far
1001 _numEmits = _scope->getNumberInt("_emitCt");
1002 _config.reducer->numReduces = _scope->getNumberInt("_redCt");
1003 }
1004
getCollectionOrUassert(OperationContext * opCtx,Database * db,const NamespaceString & nss)1005 Collection* State::getCollectionOrUassert(OperationContext* opCtx,
1006 Database* db,
1007 const NamespaceString& nss) {
1008 Collection* out = db ? db->getCollection(opCtx, nss) : NULL;
1009 uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out);
1010 return out;
1011 }
1012
1013 /**
1014 * Applies last reduce and finalize on a list of tuples (key, val)
1015 * Inserts single result {_id: key, value: val} into temp collection
1016 */
finalReduce(BSONList & values)1017 void State::finalReduce(BSONList& values) {
1018 if (!_onDisk || values.size() == 0)
1019 return;
1020
1021 BSONObj res = _config.reducer->finalReduce(values, _config.finalizer.get());
1022 insert(_config.tempNamespace, res);
1023 }
1024
/**
 * Native function exposed to the JS scope: inserts the first argument (an object) into the
 * temp collection of the State passed through 'data'. Always returns an empty object.
 */
BSONObj _nativeToTemp(const BSONObj& args, void* data) {
    auto* const mrState = static_cast<State*>(data);

    BSONObjIterator argIter(args);
    const BSONObj doc = argIter.next().Obj();
    mrState->insert(mrState->_config.tempNamespace, doc);

    return BSONObj();
}
1031
1032 // BSONObj _nativeToInc( const BSONObj& args, void* data ) {
1033 // State* state = (State*) data;
1034 // BSONObjIterator it(args);
1035 // const BSONObj& obj = it.next().Obj();
1036 // state->_insertToInc(const_cast<BSONObj&>(obj));
1037 // return BSONObj();
1038 // }
1039
/**
 * Applies last reduce and finalize.
 * After calling this method, the temp collection will be completed.
 * If inline, the results will be in the in memory map
 *
 * Three code paths exist:
 *  - JS mode: one of the precompiled JS helpers does all the work.
 *  - Not on disk: each key of the in-memory map is expected to hold exactly one value, which
 *    is run through the finalizer (when one is configured).
 *  - On disk: the inc collection is scanned in {"0": 1} order, runs of documents that compare
 *    equal under that sort are collected, and each run is passed to finalReduce(BSONList&).
 *
 * 'curOp' and 'pm' are used for progress reporting of the on-disk path.
 * Throws a user assertion (34428) if the plan executor ends in any state other than IS_EOF.
 */
void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) {
    if (_jsMode) {
        // apply the reduce within JS
        if (_onDisk) {
            // The JS helper hands every result to _nativeToTemp, so it must be injected first.
            _scope->injectNative("_nativeToTemp", _nativeToTemp, this);
            _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true);
            return;
        } else {
            _scope->invoke(_reduceAndFinalize, 0, 0, 0, true);
            return;
        }
    }

    if (!_onDisk) {
        // all data has already been reduced, just finalize
        if (_config.finalizer) {
            long size = 0;
            for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
                BSONObj key = i->first;
                BSONList& all = i->second;

                verify(all.size() == 1);

                BSONObj res = _config.finalizer->finalize(all[0]);

                // Replace the single value with its finalized form and re-total the sizes.
                all.clear();
                all.push_back(res);
                size += res.objsize();
            }
            _size = size;
        }
        return;
    }

    // use index on "0" to pull sorted data
    verify(_temp->size() == 0);
    BSONObj sortKey = BSON("0" << 1);

    // Check that the inc collection actually has an index whose key pattern equals sortKey.
    writeConflictRetry(_opCtx, "finalReduce", _config.incLong.ns(), [&] {
        OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
        WriteUnitOfWork wuow(_opCtx);
        Collection* incColl = getCollectionOrUassert(_opCtx, incCtx.db(), _config.incLong);

        bool foundIndex = false;
        IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
        // Iterate over incColl's indexes.
        while (ii.more()) {
            IndexDescriptor* currIndex = ii.next();
            BSONObj x = currIndex->infoObj();
            if (sortKey.woCompare(x["key"].embeddedObject()) == 0) {
                foundIndex = true;
                break;
            }
        }

        verify(foundIndex);
        wuow.commit();
    });

    // Held through a unique_ptr so it can be released and re-created around the
    // finalReduce(all) calls below.
    unique_ptr<AutoGetCollectionForReadCommand> ctx(
        new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

    // 'prev' is the first document of the run currently being collected in 'all'.
    BSONObj prev;
    BSONList all;

    {
        // The progress meter total is the number of documents in the inc collection.
        const auto count = _db.count(_config.incLong.ns(), BSONObj(), QueryOption_SlaveOk);
        stdx::lock_guard<Client> lk(*_opCtx->getClient());
        verify(pm ==
               curOp->setMessage_inlock("m/r: (3/3) final reduce to collection",
                                        "M/R: (3/3) Final Reduce Progress",
                                        count));
    }

    const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong);

    // Unfiltered query over the inc collection, sorted by sortKey.
    auto qr = stdx::make_unique<QueryRequest>(_config.incLong);
    qr->setSort(sortKey);

    const boost::intrusive_ptr<ExpressionContext> expCtx;
    auto statusWithCQ =
        CanonicalQuery::canonicalize(opCtx,
                                     std::move(qr),
                                     expCtx,
                                     extensionsCallback,
                                     MatchExpressionParser::kAllowAllSpecialFeatures &
                                         ~MatchExpressionParser::AllowedFeatures::kIsolated);
    verify(statusWithCQ.isOK());
    std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());

    Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong);
    invariant(coll);

    auto exec = uassertStatusOK(getExecutor(
        _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN));

    // Make sure the PlanExecutor is destroyed while holding a collection lock.
    // (Only needed when leaving the function at a point where 'ctx' has been released.)
    ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
        if (!ctx) {
            AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
            exec.reset();
        }
    });

    // iterate over all sorted objects
    BSONObj o;
    PlanExecutor::ExecState state;
    while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
        o = o.getOwned();  // we will be accessing outside of the lock
        pm.hit();

        if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) {
            // object is same as previous, add to array
            all.push_back(o);
            // Within a run, interruption is only checked every 100 progress hits.
            if (pm->hits() % 100 == 0) {
                _opCtx->checkForInterrupt();
            }
            continue;
        }

        // A new key starts here: save the executor state and release 'ctx' before reducing
        // the finished run, then re-acquire and restore afterwards.
        exec->saveState();

        ctx.reset();

        // reduce a finalize array
        finalReduce(all);

        ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

        // Start the next run with the current document.
        all.clear();
        prev = o;
        all.push_back(o);

        _opCtx->checkForInterrupt();
        uassertStatusOK(exec->restoreState());
    }

    uassert(34428,
            "Plan executor error during mapReduce command: " + WorkingSetCommon::toStatusString(o),
            PlanExecutor::IS_EOF == state);

    // The loop only reduces a run when the next key shows up, so the last run is still pending.
    ctx.reset();
    // reduce and finalize last array
    finalReduce(all);
    ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

    pm.finished();
}
1193
1194 /**
1195 * Attempts to reduce objects in the memory map.
1196 * A new memory map will be created to hold the results.
1197 * If applicable, objects with unique key may be dumped to inc collection.
1198 * Input and output objects are both {"0": key, "1": val}
1199 */
reduceInMemory()1200 void State::reduceInMemory() {
1201 if (_jsMode) {
1202 // in js mode the reduce is applied when writing to collection
1203 return;
1204 }
1205
1206 unique_ptr<InMemory> n(new InMemory()); // for new data
1207 long nSize = 0;
1208 _dupCount = 0;
1209
1210 for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
1211 BSONList& all = i->second;
1212
1213 if (all.size() == 1) {
1214 // only 1 value for this key
1215 if (_onDisk) {
1216 // this key has low cardinality, so just write to collection
1217 _insertToInc(*(all.begin()));
1218 } else {
1219 // add to new map
1220 nSize += _add(n.get(), all[0]);
1221 }
1222 } else if (all.size() > 1) {
1223 // several values, reduce and add to map
1224 BSONObj res = _config.reducer->reduce(all);
1225 nSize += _add(n.get(), res);
1226 }
1227 }
1228
1229 // swap maps
1230 _temp.reset(n.release());
1231 _size = nSize;
1232 }
1233
1234 /**
1235 * Dumps the entire in memory map to the inc collection.
1236 */
dumpToInc()1237 void State::dumpToInc() {
1238 if (!_onDisk)
1239 return;
1240
1241 for (InMemory::iterator i = _temp->begin(); i != _temp->end(); i++) {
1242 BSONList& all = i->second;
1243 if (all.size() < 1)
1244 continue;
1245
1246 for (BSONList::iterator j = all.begin(); j != all.end(); j++)
1247 _insertToInc(*j);
1248 }
1249 _temp->clear();
1250 _size = 0;
1251 }
1252
1253 /**
1254 * Adds object to in memory map
1255 */
emit(const BSONObj & a)1256 void State::emit(const BSONObj& a) {
1257 _numEmits++;
1258 _size += _add(_temp.get(), a);
1259 }
1260
_add(InMemory * im,const BSONObj & a)1261 int State::_add(InMemory* im, const BSONObj& a) {
1262 BSONList& all = (*im)[a];
1263 all.push_back(a);
1264 if (all.size() > 1) {
1265 ++_dupCount;
1266 }
1267
1268 return a.objsize() + 16;
1269 }
1270
reduceAndSpillInMemoryStateIfNeeded()1271 void State::reduceAndSpillInMemoryStateIfNeeded() {
1272 // Make sure no DB locks are held, because this method manages its own locking and
1273 // write units of work.
1274 invariant(!_opCtx->lockState()->isLocked());
1275
1276 if (_jsMode) {
1277 // try to reduce if it is beneficial
1278 int dupCt = _scope->getNumberInt("_dupCt");
1279 int keyCt = _scope->getNumberInt("_keyCt");
1280
1281 if (keyCt > _config.jsMaxKeys) {
1282 // too many keys for JS, switch to mixed
1283 _bailFromJS(BSONObj(), this);
1284 // then fall through to check map size
1285 } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) {
1286 // reduce now to lower mem usage
1287 Timer t;
1288 _scope->invoke(_reduceAll, 0, 0, 0, true);
1289 LOG(3) << " MR - did reduceAll: keys=" << keyCt << " dups=" << dupCt
1290 << " newKeys=" << _scope->getNumberInt("_keyCt") << " time=" << t.millis()
1291 << "ms";
1292 return;
1293 }
1294 }
1295
1296 if (_jsMode)
1297 return;
1298
1299 if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) {
1300 // attempt to reduce in memory map, if memory is too high or we have many duplicates
1301 long oldSize = _size;
1302 Timer t;
1303 reduceInMemory();
1304 LOG(3) << " MR - did reduceInMemory: size=" << oldSize << " dups=" << _dupCount
1305 << " newSize=" << _size << " time=" << t.millis() << "ms";
1306
1307 // if size is still high, or values are not reducing well, dump
1308 if (_onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2)) {
1309 dumpToInc();
1310 LOG(3) << " MR - dumping to db";
1311 }
1312 }
1313 }
1314
1315 /**
1316 * emit that will be called by js function
1317 */
fast_emit(const BSONObj & args,void * data)1318 BSONObj fast_emit(const BSONObj& args, void* data) {
1319 uassert(10077, "fast_emit takes 2 args", args.nFields() == 2);
1320 uassert(13069,
1321 "an emit can't be more than half max bson size",
1322 args.objsize() < (BSONObjMaxUserSize / 2));
1323
1324 State* state = (State*)data;
1325 if (args.firstElement().type() == Undefined) {
1326 BSONObjBuilder b(args.objsize());
1327 b.appendNull("");
1328 BSONObjIterator i(args);
1329 i.next();
1330 b.append(i.next());
1331 state->emit(b.obj());
1332 } else {
1333 state->emit(args);
1334 }
1335 return BSONObj();
1336 }
1337
1338 /**
1339 * function is called when we realize we cant use js mode for m/r on the 1st key
1340 */
_bailFromJS(const BSONObj & args,void * data)1341 BSONObj _bailFromJS(const BSONObj& args, void* data) {
1342 State* state = (State*)data;
1343 state->bailFromJS();
1344
1345 // emit this particular key if there is one
1346 if (!args.isEmpty()) {
1347 fast_emit(args, data);
1348 }
1349 return BSONObj();
1350 }
1351
1352 /**
1353 * This class represents a map/reduce command executed on a single server
1354 */
1355 class MapReduceCommand : public ErrmsgCommandDeprecated {
1356 public:
MapReduceCommand()1357 MapReduceCommand() : ErrmsgCommandDeprecated("mapReduce", "mapreduce") {}
1358
slaveOk() const1359 virtual bool slaveOk() const {
1360 return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
1361 repl::ReplicationCoordinator::modeReplSet;
1362 }
1363
slaveOverrideOk() const1364 virtual bool slaveOverrideOk() const {
1365 return true;
1366 }
1367
maintenanceOk() const1368 virtual bool maintenanceOk() const override {
1369 return false;
1370 }
1371
reserveBytesForReply() const1372 std::size_t reserveBytesForReply() const override {
1373 return FindCommon::kInitReplyBufferSize;
1374 }
1375
help(stringstream & help) const1376 virtual void help(stringstream& help) const {
1377 help << "Run a map/reduce operation on the server.\n";
1378 help << "Note this is used for aggregation, not querying, in MongoDB.\n";
1379 help << "http://dochub.mongodb.org/core/mapreduce";
1380 }
1381
1382
supportsWriteConcern(const BSONObj & cmd) const1383 virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
1384 return mrSupportsWriteConcern(cmd);
1385 }
1386
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)1387 virtual void addRequiredPrivileges(const std::string& dbname,
1388 const BSONObj& cmdObj,
1389 std::vector<Privilege>* out) {
1390 addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
1391 }
1392
errmsgRun(OperationContext * opCtx,const string & dbname,const BSONObj & cmd,string & errmsg,BSONObjBuilder & result)1393 bool errmsgRun(OperationContext* opCtx,
1394 const string& dbname,
1395 const BSONObj& cmd,
1396 string& errmsg,
1397 BSONObjBuilder& result) {
1398 Timer t;
1399
1400 boost::optional<DisableDocumentValidation> maybeDisableValidation;
1401 if (shouldBypassDocumentValidationForCommand(cmd))
1402 maybeDisableValidation.emplace(opCtx);
1403
1404 auto client = opCtx->getClient();
1405
1406 if (client->isInDirectClient()) {
1407 return appendCommandStatus(
1408 result,
1409 Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()"));
1410 }
1411
1412 auto curOp = CurOp::get(opCtx);
1413
1414 const Config config(dbname, cmd);
1415
1416 LOG(1) << "mr ns: " << config.nss;
1417
1418 uassert(16149, "cannot run map reduce without the js engine", getGlobalScriptEngine());
1419
1420 // Prevent sharding state from changing during the MR.
1421 const auto collMetadata = [&] {
1422 // Get metadata before we check our version, to make sure it doesn't increment in the
1423 // meantime
1424 AutoGetCollectionForReadCommand autoColl(opCtx, config.nss);
1425 return CollectionShardingState::get(opCtx, config.nss)->getMetadata();
1426 }();
1427
1428 bool shouldHaveData = false;
1429
1430 BSONObjBuilder countsBuilder;
1431 BSONObjBuilder timingBuilder;
1432 try {
1433 State state(opCtx, config);
1434 if (!state.sourceExists()) {
1435 return appendCommandStatus(
1436 result,
1437 Status(ErrorCodes::NamespaceNotFound,
1438 str::stream() << "namespace does not exist: " << config.nss.ns()));
1439 }
1440
1441 state.init();
1442 state.prepTempCollection();
1443
1444 int progressTotal = 0;
1445 bool showTotal = true;
1446 if (state.config().filter.isEmpty()) {
1447 const bool holdingGlobalLock = false;
1448 const auto count = _collectionCount(opCtx, config.nss, holdingGlobalLock);
1449 progressTotal =
1450 (config.limit && (unsigned)config.limit < count) ? config.limit : count;
1451 } else {
1452 showTotal = false;
1453 // Set an arbitrary total > 0 so the meter will be activated.
1454 progressTotal = 1;
1455 }
1456
1457 stdx::unique_lock<Client> lk(*opCtx->getClient());
1458 ProgressMeter& progress(curOp->setMessage_inlock(
1459 "m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal));
1460 lk.unlock();
1461 progress.showTotal(showTotal);
1462 ProgressMeterHolder pm(progress);
1463
1464 // See cast on next line to 32 bit unsigned
1465 wassert(config.limit < 0x4000000);
1466
1467 long long mapTime = 0;
1468 long long reduceTime = 0;
1469 long long numInputs = 0;
1470
1471 {
1472 // We've got a cursor preventing migrations off, now re-establish our
1473 // useful cursor.
1474
1475 // Need lock and context to use it
1476 unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
1477
1478 if (state.isOnDisk()) {
1479 // this means that it will be doing a write operation, make sure it is safe to
1480 // do so.
1481 if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx,
1482 config.nss)) {
1483 uasserted(ErrorCodes::NotMaster, "not master");
1484 return false;
1485 }
1486 }
1487
1488 auto qr = stdx::make_unique<QueryRequest>(config.nss);
1489 qr->setFilter(config.filter);
1490 qr->setSort(config.sort);
1491 qr->setCollation(config.collation);
1492
1493 const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss);
1494
1495 const boost::intrusive_ptr<ExpressionContext> expCtx;
1496 auto statusWithCQ = CanonicalQuery::canonicalize(
1497 opCtx,
1498 std::move(qr),
1499 expCtx,
1500 extensionsCallback,
1501 MatchExpressionParser::kAllowAllSpecialFeatures &
1502 ~MatchExpressionParser::AllowedFeatures::kIsolated);
1503 if (!statusWithCQ.isOK()) {
1504 uasserted(17238, "Can't canonicalize query " + config.filter.toString());
1505 return 0;
1506 }
1507 std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
1508
1509 unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
1510 {
1511 Database* db = scopedAutoDb->getDb();
1512 Collection* coll = State::getCollectionOrUassert(opCtx, db, config.nss);
1513 invariant(coll);
1514
1515 exec = uassertStatusOK(
1516 getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, 0));
1517 }
1518
1519 // Make sure the PlanExecutor is destroyed while holding the necessary locks.
1520 ON_BLOCK_EXIT([&exec, &scopedAutoDb, opCtx, &config] {
1521 if (!scopedAutoDb) {
1522 scopedAutoDb = stdx::make_unique<AutoGetDb>(opCtx, config.nss.db(), MODE_S);
1523 exec.reset();
1524 }
1525 });
1526
1527 {
1528 stdx::lock_guard<Client> lk(*opCtx->getClient());
1529 CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
1530 }
1531
1532 Timer mt;
1533
1534 // go through each doc
1535 BSONObj o;
1536 PlanExecutor::ExecState execState;
1537 while (PlanExecutor::ADVANCED == (execState = exec->getNext(&o, NULL))) {
1538 o = o.getOwned(); // we will be accessing outside of the lock
1539 // check to see if this is a new object we don't own yet
1540 // because of a chunk migration
1541 if (collMetadata) {
1542 ShardKeyPattern kp(collMetadata->getKeyPattern());
1543 if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
1544 continue;
1545 }
1546 }
1547
1548 // do map
1549 if (config.verbose)
1550 mt.reset();
1551 config.mapper->map(o);
1552 if (config.verbose)
1553 mapTime += mt.micros();
1554
1555 // Check if the state accumulated so far needs to be written to a
1556 // collection. This may yield the DB lock temporarily and then
1557 // acquire it again.
1558 //
1559 numInputs++;
1560 if (numInputs % 100 == 0) {
1561 Timer t;
1562
1563 // TODO: As an optimization, we might want to do the save/restore
1564 // state and yield inside the reduceAndSpillInMemoryState method, so
1565 // it only happens if necessary.
1566 exec->saveState();
1567
1568 scopedAutoDb.reset();
1569
1570 state.reduceAndSpillInMemoryStateIfNeeded();
1571
1572 scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
1573
1574 auto restoreStatus = exec->restoreState();
1575 if (!restoreStatus.isOK()) {
1576 return appendCommandStatus(result, restoreStatus);
1577 }
1578
1579 reduceTime += t.micros();
1580
1581 opCtx->checkForInterrupt();
1582 }
1583
1584 pm.hit();
1585
1586 if (config.limit && numInputs >= config.limit)
1587 break;
1588 }
1589
1590 if (PlanExecutor::DEAD == execState || PlanExecutor::FAILURE == execState) {
1591 return appendCommandStatus(
1592 result,
1593 Status(ErrorCodes::OperationFailed,
1594 str::stream() << "Executor error during mapReduce command: "
1595 << WorkingSetCommon::toStatusString(o)));
1596 }
1597
1598 // Record the indexes used by the PlanExecutor.
1599 PlanSummaryStats stats;
1600 Explain::getSummaryStats(*exec, &stats);
1601
1602 // TODO SERVER-23261: Confirm whether this is the correct place to gather all
1603 // metrics. There is no harm adding here for the time being.
1604 curOp->debug().setPlanSummaryMetrics(stats);
1605
1606 Collection* coll = scopedAutoDb->getDb()->getCollection(opCtx, config.nss);
1607 invariant(coll); // 'exec' hasn't been killed, so collection must be alive.
1608 coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed);
1609
1610 if (curOp->shouldDBProfile()) {
1611 BSONObjBuilder execStatsBob;
1612 Explain::getWinningPlanStats(exec.get(), &execStatsBob);
1613 curOp->debug().execStats = execStatsBob.obj();
1614 }
1615 }
1616 pm.finished();
1617
1618 opCtx->checkForInterrupt();
1619
1620 // update counters
1621 countsBuilder.appendNumber("input", numInputs);
1622 countsBuilder.appendNumber("emit", state.numEmits());
1623 if (state.numEmits())
1624 shouldHaveData = true;
1625
1626 timingBuilder.appendNumber("mapTime", mapTime / 1000);
1627 timingBuilder.append("emitLoop", t.millis());
1628
1629 {
1630 stdx::lock_guard<Client> lk(*opCtx->getClient());
1631 curOp->setMessage_inlock("m/r: (2/3) final reduce in memory",
1632 "M/R: (2/3) Final In-Memory Reduce Progress");
1633 }
1634 Timer rt;
1635 // do reduce in memory
1636 // this will be the last reduce needed for inline mode
1637 state.reduceInMemory();
1638 // if not inline: dump the in memory map to inc collection, all data is on disk
1639 state.dumpToInc();
1640 // final reduce
1641 state.finalReduce(opCtx, curOp, pm);
1642 reduceTime += rt.micros();
1643
1644 // Ensure the profile shows the source namespace. If the output was not inline, the
1645 // active namespace will be the temporary collection we inserted into.
1646 {
1647 stdx::lock_guard<Client> lk(*opCtx->getClient());
1648 curOp->setNS_inlock(config.nss.ns());
1649 }
1650
1651 countsBuilder.appendNumber("reduce", state.numReduces());
1652 timingBuilder.appendNumber("reduceTime", reduceTime / 1000);
1653 timingBuilder.append("mode", state.jsMode() ? "js" : "mixed");
1654
1655 long long finalCount = state.postProcessCollection(opCtx, curOp, pm);
1656 state.appendResults(result);
1657
1658 timingBuilder.appendNumber("total", t.millis());
1659 result.appendNumber("timeMillis", t.millis());
1660 countsBuilder.appendNumber("output", finalCount);
1661 if (config.verbose)
1662 result.append("timing", timingBuilder.obj());
1663 result.append("counts", countsBuilder.obj());
1664
1665 if (finalCount == 0 && shouldHaveData) {
1666 result.append("cmd", cmd);
1667 errmsg = "there were emits but no data!";
1668 return false;
1669 }
1670 } catch (StaleConfigException& e) {
1671 log() << "mr detected stale config, should retry" << redact(e);
1672 throw e;
1673 }
1674 // TODO: The error handling code for queries is v. fragile,
1675 // *requires* rethrow AssertionExceptions - should probably fix.
1676 catch (AssertionException& e) {
1677 log() << "mr failed, removing collection" << redact(e);
1678 throw;
1679 } catch (std::exception& e) {
1680 log() << "mr failed, removing collection" << causedBy(e);
1681 throw;
1682 } catch (...) {
1683 log() << "mr failed for unknown reason, removing collection";
1684 throw;
1685 }
1686
1687 return true;
1688 }
1689
1690 } mapReduceCommand;
1691
/**
 * This class represents a map/reduce command executed on the output server of a sharded env
 *
 * Registered as "mapreduce.shardedfinish". run() pulls the per-shard results through a
 * ParallelSortClusteredCursor sorted by _id, applies the final reduce/finalize to each run of
 * documents sharing a key, stores the results (temp collection or in-memory map), and then
 * post-processes the output collection.
 */
class MapReduceFinishCommand : public BasicCommand {
public:
    void help(stringstream& h) const {
        h << "internal";
    }
    MapReduceFinishCommand() : BasicCommand("mapreduce.shardedfinish") {}
    // True unless the node's replication mode is modeReplSet.
    virtual bool slaveOk() const {
        return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
            repl::ReplicationCoordinator::modeReplSet;
    }
    virtual bool slaveOverrideOk() const {
        return true;
    }
    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
        return true;
    }
    // Requires the 'internal' action on the cluster resource.
    virtual void addRequiredPrivileges(const std::string& dbname,
                                       const BSONObj& cmdObj,
                                       std::vector<Privilege>* out) {
        ActionSet actions;
        actions.addAction(ActionType::internal);
        out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
    }
    /**
     * Merges and finishes the sharded map/reduce described by 'cmdObj'.
     *
     * Fields read from 'cmdObj': the first element (the original mapReduce command, used to
     * build the Config), "shardedOutputCollection", optional "inputDB",
     * "finalOutputCollIsSharded", "shardedOutputCollUUID", "shardCounts" and "counts".
     *
     * Appends "chunkSizes", the State's results and "counts" {input, reduce, output} to
     * 'result'. Returns true on success; returns a failed command status when run on a
     * config server or when the output routing info cannot be obtained.
     */
    bool run(OperationContext* opCtx,
             const string& dbname,
             const BSONObj& cmdObj,
             BSONObjBuilder& result) {
        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
            return appendCommandStatus(
                result,
                Status(ErrorCodes::CommandNotSupported,
                       str::stream() << "Can not execute mapReduce with output database " << dbname
                                     << " which lives on config servers"));
        }

        boost::optional<DisableDocumentValidation> maybeDisableValidation;
        if (shouldBypassDocumentValidationForCommand(cmdObj))
            maybeDisableValidation.emplace(opCtx);

        // legacy name
        const auto shardedOutputCollectionElt = cmdObj["shardedOutputCollection"];
        uassert(ErrorCodes::InvalidNamespace,
                "'shardedOutputCollection' must be of type String",
                shardedOutputCollectionElt.type() == BSONType::String);
        const std::string shardedOutputCollection = shardedOutputCollectionElt.str();
        verify(shardedOutputCollection.size() > 0);

        // The namespace the per-shard results are read from: the sharded output collection
        // name in "inputDB" when that field is a string, otherwise in 'dbname'.
        std::string inputNS;
        if (cmdObj["inputDB"].type() == String) {
            inputNS =
                NamespaceString(cmdObj["inputDB"].valueStringData(), shardedOutputCollection).ns();
        } else {
            inputNS = NamespaceString(dbname, shardedOutputCollection).ns();
        }

        CurOp* curOp = CurOp::get(opCtx);

        Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck());

        if (cmdObj["finalOutputCollIsSharded"].trueValue() &&
            serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
            uassert(ErrorCodes::InvalidOptions,
                    "This shard has feature compatibility version 3.6, so it expects mongos to "
                    "send the UUID to use for the sharded output collection. Was the mapReduce "
                    "request sent from a 3.4 mongos?",
                    cmdObj.hasField("shardedOutputCollUUID"));
            config.finalOutputCollUUID =
                uassertStatusOK(UUID::parse(cmdObj["shardedOutputCollUUID"]));
        }

        State state(opCtx, config);
        state.init();

        // no need for incremental collection because records are already sorted
        state._useIncremental = false;
        // NOTE(review): 'config' is modified after 'state' was built from it; presumably
        // State refers to this Config rather than copying it - confirm.
        config.incLong = config.tempNamespace;

        BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
        BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();

        stdx::unique_lock<Client> lk(*opCtx->getClient());
        ProgressMeterHolder pm(curOp->setMessage_inlock("m/r: merge sort and reduce",
                                                        "M/R Merge Sort and Reduce Progress"));
        lk.unlock();
        set<string> servers;

        {
            // Parse per shard results
            // Each field name of "shardCounts" is taken as a server; it must resolve to a
            // shard in the shard registry.
            BSONObjIterator i(shardCounts);
            while (i.more()) {
                BSONElement e = i.next();
                std::string server = e.fieldName();
                servers.insert(server);

                uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server));
            }
        }

        state.prepTempCollection();

        // Chunks of the final output collection that belong to this shard; stays empty for
        // in-memory output or when the routing info has no chunk manager.
        std::vector<std::shared_ptr<Chunk>> chunks;

        if (config.outputOptions.outType != Config::OutputType::INMEMORY) {
            auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
                opCtx, config.outputOptions.finalNamespace);
            if (!outRoutingInfoStatus.isOK()) {
                return appendCommandStatus(result, outRoutingInfoStatus.getStatus());
            }

            if (auto cm = outRoutingInfoStatus.getValue().cm()) {
                // Fetch result from other shards 1 chunk at a time. It would be better to do just
                // one big $or query, but then the sorting would not be efficient.
                const string shardName = ShardingState::get(opCtx)->getShardName();

                for (const auto& chunk : cm->chunks()) {
                    if (chunk->getShardId() == shardName) {
                        chunks.push_back(chunk);
                    }
                }
            }
        }

        long long inputCount = 0;
        unsigned int index = 0;
        BSONObj query;
        BSONArrayBuilder chunkSizes;
        BSONList values;

        // One pass per owned chunk, or a single pass with an empty query when 'chunks' is
        // empty.
        while (true) {
            shared_ptr<Chunk> chunk;
            if (chunks.size() > 0) {
                // Restrict the query to _id values in [chunk min, chunk max).
                chunk = chunks[index];
                BSONObjBuilder b;
                b.appendAs(chunk->getMin().firstElement(), "$gte");
                b.appendAs(chunk->getMax().firstElement(), "$lt");
                query = BSON("_id" << b.obj());
                // chunkSizes.append(min);
            }

            // reduce from each shard for a chunk
            BSONObj sortKey = BSON("_id" << 1);
            ParallelSortClusteredCursor cursor(
                servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
            cursor.init(opCtx);

            int chunkSize = 0;

            // 'values' collects consecutive documents that compare equal under sortKey. The
            // loop keeps going after the cursor is exhausted until the last run is flushed.
            while (cursor.more() || !values.empty()) {
                BSONObj t;
                if (cursor.more()) {
                    t = cursor.next().getOwned();
                    ++inputCount;

                    if (values.size() == 0) {
                        values.push_back(t);
                        continue;
                    }

                    if (dps::compareObjectsAccordingToSort(t, *(values.begin()), sortKey) == 0) {
                        values.push_back(t);
                        continue;
                    }
                }

                // Either 't' starts a new key or the cursor is exhausted: reduce the run.
                BSONObj res = config.reducer->finalReduce(values, config.finalizer.get());
                chunkSize += res.objsize();
                if (state.isOnDisk())
                    state.insert(config.tempNamespace, res);
                else
                    state.emit(res);

                values.clear();

                // 't' is empty only when the cursor had nothing more to return.
                if (!t.isEmpty())
                    values.push_back(t);
            }

            if (chunk) {
                // "chunkSizes" is a flat array of alternating chunk min and reduced byte size.
                chunkSizes.append(chunk->getMin());
                chunkSizes.append(chunkSize);
            }

            if (++index >= chunks.size())
                break;
        }

        // Forget temporary input collection, if output is sharded collection
        ShardConnection::forgetNS(inputNS);

        result.append("chunkSizes", chunkSizes.arr());

        long long outputCount = state.postProcessCollection(opCtx, curOp, pm);
        state.appendResults(result);

        BSONObjBuilder countsB(32);
        countsB.append("input", inputCount);
        countsB.append("reduce", state.numReduces());
        countsB.append("output", outputCount);
        result.append("counts", countsB.obj());

        return true;
    }

} mapReduceFinishCommand;
1899
1900 } // namespace mr
1901 } // namespace mongo
1902