1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/commands/mr.h"
36 
37 #include "mongo/base/status_with.h"
38 #include "mongo/bson/util/builder.h"
39 #include "mongo/client/connpool.h"
40 #include "mongo/db/auth/authorization_session.h"
41 #include "mongo/db/bson/dotted_path_support.h"
42 #include "mongo/db/catalog/collection.h"
43 #include "mongo/db/catalog/collection_catalog_entry.h"
44 #include "mongo/db/catalog/database_holder.h"
45 #include "mongo/db/catalog/document_validation.h"
46 #include "mongo/db/catalog/index_catalog.h"
47 #include "mongo/db/catalog/index_key_validate.h"
48 #include "mongo/db/client.h"
49 #include "mongo/db/clientcursor.h"
50 #include "mongo/db/commands.h"
51 #include "mongo/db/concurrency/write_conflict_exception.h"
52 #include "mongo/db/db.h"
53 #include "mongo/db/db_raii.h"
54 #include "mongo/db/dbhelpers.h"
55 #include "mongo/db/exec/working_set_common.h"
56 #include "mongo/db/index/index_descriptor.h"
57 #include "mongo/db/matcher/extensions_callback_real.h"
58 #include "mongo/db/op_observer.h"
59 #include "mongo/db/ops/insert.h"
60 #include "mongo/db/query/find_common.h"
61 #include "mongo/db/query/get_executor.h"
62 #include "mongo/db/query/plan_summary_stats.h"
63 #include "mongo/db/query/query_planner.h"
64 #include "mongo/db/repl/replication_coordinator_global.h"
65 #include "mongo/db/s/collection_metadata.h"
66 #include "mongo/db/s/collection_sharding_state.h"
67 #include "mongo/db/s/sharding_state.h"
68 #include "mongo/db/server_options.h"
69 #include "mongo/db/service_context.h"
70 #include "mongo/s/catalog_cache.h"
71 #include "mongo/s/client/parallel.h"
72 #include "mongo/s/client/shard_connection.h"
73 #include "mongo/s/client/shard_registry.h"
74 #include "mongo/s/grid.h"
75 #include "mongo/s/shard_key_pattern.h"
76 #include "mongo/s/stale_exception.h"
77 #include "mongo/scripting/engine.h"
78 #include "mongo/stdx/mutex.h"
79 #include "mongo/util/log.h"
80 #include "mongo/util/mongoutils/str.h"
81 #include "mongo/util/scopeguard.h"
82 
83 namespace mongo {
84 
85 using std::set;
86 using std::shared_ptr;
87 using std::string;
88 using std::stringstream;
89 using std::unique_ptr;
90 using std::vector;
91 
92 using IndexVersion = IndexDescriptor::IndexVersion;
93 
94 namespace dps = ::mongo::dotted_path_support;
95 
96 namespace mr {
97 
// Definition of the static job counter. The Config constructor calls fetchAndAdd(1) on it and
// appends the returned value to the name of the job's temporary collection.
98 AtomicUInt32 Config::JOB_NUMBER;
99 
JSFunction(const std::string & type,const BSONElement & e)100 JSFunction::JSFunction(const std::string& type, const BSONElement& e) {
101     _type = type;
102     _code = e._asCode();
103 
104     if (e.type() == CodeWScope)
105         _wantedScope = e.codeWScopeObject();
106 }
107 
init(State * state)108 void JSFunction::init(State* state) {
109     _scope = state->scope();
110     verify(_scope);
111     _scope->init(&_wantedScope);
112 
113     _func = _scope->createFunction(_code.c_str());
114     uassert(13598, str::stream() << "couldn't compile code for: " << _type, _func);
115 
116     // install in JS scope so that it can be called in JS mode
117     _scope->setFunction(_type.c_str(), _code.c_str());
118 }
119 
init(State * state)120 void JSMapper::init(State* state) {
121     _func.init(state);
122     _params = state->config().mapParams;
123 }
124 
125 /**
126  * Applies the map function to an object, which should internally call emit()
127  */
map(const BSONObj & o)128 void JSMapper::map(const BSONObj& o) {
129     Scope* s = _func.scope();
130     verify(s);
131     if (s->invoke(_func.func(), &_params, &o, 0, true))
132         uasserted(9014, str::stream() << "map invoke failed: " << s->getError());
133 }
134 
135 /**
136  * Applies the finalize function to a tuple obj (key, val)
137  * Returns tuple obj {_id: key, value: newval}
138  */
finalize(const BSONObj & o)139 BSONObj JSFinalizer::finalize(const BSONObj& o) {
140     Scope* s = _func.scope();
141 
142     Scope::NoDBAccess no = s->disableDBAccess("can't access db inside finalize");
143     s->invokeSafe(_func.func(), &o, 0);
144 
145     // don't want to use o.objsize() to size b
146     // since there are many cases where the point of finalize
147     // is converting many fields to 1
148     BSONObjBuilder b;
149     b.append(o.firstElement());
150     s->append(b, "value", "__returnValue");
151     return b.obj();
152 }
153 
init(State * state)154 void JSReducer::init(State* state) {
155     _func.init(state);
156 }
157 
158 /**
159  * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
160  */
reduce(const BSONList & tuples)161 BSONObj JSReducer::reduce(const BSONList& tuples) {
162     if (tuples.size() <= 1)
163         return tuples[0];
164     BSONObj key;
165     int endSizeEstimate = 16;
166     _reduce(tuples, key, endSizeEstimate);
167 
168     BSONObjBuilder b(endSizeEstimate);
169     b.appendAs(key.firstElement(), "0");
170     _func.scope()->append(b, "1", "__returnValue");
171     return b.obj();
172 }
173 
174 /**
175  * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val}
176  * Also applies a finalizer method if present.
177  */
finalReduce(const BSONList & tuples,Finalizer * finalizer)178 BSONObj JSReducer::finalReduce(const BSONList& tuples, Finalizer* finalizer) {
179     BSONObj res;
180     BSONObj key;
181 
182     if (tuples.size() == 1) {
183         // 1 obj, just use it
184         key = tuples[0];
185         BSONObjBuilder b(key.objsize());
186         BSONObjIterator it(key);
187         b.appendAs(it.next(), "_id");
188         b.appendAs(it.next(), "value");
189         res = b.obj();
190     } else {
191         // need to reduce
192         int endSizeEstimate = 16;
193         _reduce(tuples, key, endSizeEstimate);
194         BSONObjBuilder b(endSizeEstimate);
195         b.appendAs(key.firstElement(), "_id");
196         _func.scope()->append(b, "value", "__returnValue");
197         res = b.obj();
198     }
199 
200     if (finalizer) {
201         res = finalizer->finalize(res);
202     }
203 
204     return res;
205 }
206 
207 /**
208  * actually applies a reduce, to a list of tuples (key, value).
209  * After the call, tuples will hold a single tuple {"0": key, "1": value}
210  */
_reduce(const BSONList & tuples,BSONObj & key,int & endSizeEstimate)211 void JSReducer::_reduce(const BSONList& tuples, BSONObj& key, int& endSizeEstimate) {
212     uassert(10074, "need values", tuples.size());
213 
214     int sizeEstimate = (tuples.size() * tuples.begin()->getField("value").size()) + 128;
215 
216     // need to build the reduce args: ( key, [values] )
217     BSONObjBuilder reduceArgs(sizeEstimate);
218     std::unique_ptr<BSONArrayBuilder> valueBuilder;
219     unsigned n = 0;
220     for (; n < tuples.size(); n++) {
221         BSONObjIterator j(tuples[n]);
222         BSONElement keyE = j.next();
223         if (n == 0) {
224             reduceArgs.append(keyE);
225             key = keyE.wrap();
226             valueBuilder.reset(new BSONArrayBuilder(reduceArgs.subarrayStart("tuples")));
227         }
228 
229         BSONElement ee = j.next();
230 
231         uassert(13070, "value too large to reduce", ee.size() < (BSONObjMaxUserSize / 2));
232 
233         // If adding this element to the array would cause it to be too large, break. The
234         // remainder of the tuples will be processed recursively at the end of this
235         // function.
236         if (valueBuilder->len() + ee.size() > BSONObjMaxUserSize) {
237             verify(n > 1);  // if not, inf. loop
238             break;
239         }
240 
241         valueBuilder->append(ee);
242     }
243     verify(valueBuilder);
244     valueBuilder->done();
245     BSONObj args = reduceArgs.obj();
246 
247     Scope* s = _func.scope();
248 
249     s->invokeSafe(_func.func(), &args, 0);
250     ++numReduces;
251 
252     if (s->type("__returnValue") == Array) {
253         uasserted(10075, "reduce -> multiple not supported yet");
254         return;
255     }
256 
257     endSizeEstimate = key.objsize() + (args.objsize() / tuples.size());
258 
259     if (n == tuples.size())
260         return;
261 
262     // the input list was too large, add the rest of elmts to new tuples and reduce again
263     // note: would be better to use loop instead of recursion to avoid stack overflow
264     BSONList x;
265     for (; n < tuples.size(); n++) {
266         x.push_back(tuples[n]);
267     }
268     BSONObjBuilder temp(endSizeEstimate);
269     temp.append(key.firstElement());
270     s->append(temp, "1", "__returnValue");
271     x.push_back(temp.obj());
272     _reduce(x, key, endSizeEstimate);
273 }
274 
Config(const string & _dbname,const BSONObj & cmdObj)275 Config::Config(const string& _dbname, const BSONObj& cmdObj) {
276     dbname = _dbname;
277     uassert(ErrorCodes::TypeMismatch,
278             str::stream() << "'mapReduce' must be of type String",
279             cmdObj.firstElement().type() == BSONType::String);
280     nss = NamespaceString(dbname, cmdObj.firstElement().valueStringData());
281     uassert(ErrorCodes::InvalidNamespace,
282             str::stream() << "Invalid namespace: " << nss.ns(),
283             nss.isValid());
284 
285     verbose = cmdObj["verbose"].trueValue();
286     jsMode = cmdObj["jsMode"].trueValue();
287     splitInfo = 0;
288 
289     if (cmdObj.hasField("splitInfo")) {
290         splitInfo = cmdObj["splitInfo"].Int();
291     }
292 
293     jsMaxKeys = 500000;
294     reduceTriggerRatio = 10.0;
295     maxInMemSize = 500 * 1024;
296 
297     uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo());
298 
299     outputOptions = parseOutputOptions(dbname, cmdObj);
300 
301     shardedFirstPass = false;
302     if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) {
303         massert(16054,
304                 "shardedFirstPass should only use replace outType",
305                 outputOptions.outType == REPLACE);
306         shardedFirstPass = true;
307     }
308 
309     if (outputOptions.outType != INMEMORY) {  // setup temp collection name
310         tempNamespace = NamespaceString(
311             outputOptions.outDB.empty() ? dbname : outputOptions.outDB,
312             str::stream() << "tmp.mr." << cmdObj.firstElement().valueStringData() << "_"
313                           << JOB_NUMBER.fetchAndAdd(1));
314         incLong = NamespaceString(str::stream() << tempNamespace.ns() << "_inc");
315     }
316 
317     {
318         // scope and code
319 
320         if (cmdObj["scope"].type() == Object)
321             scopeSetup = cmdObj["scope"].embeddedObjectUserCheck().getOwned();
322 
323         mapper.reset(new JSMapper(cmdObj["map"]));
324         reducer.reset(new JSReducer(cmdObj["reduce"]));
325         if (cmdObj["finalize"].type() && cmdObj["finalize"].trueValue())
326             finalizer.reset(new JSFinalizer(cmdObj["finalize"]));
327 
328         if (cmdObj["mapparams"].type() == Array) {
329             mapParams = cmdObj["mapparams"].embeddedObjectUserCheck().getOwned();
330         }
331     }
332 
333     {
334         // query options
335         BSONElement q = cmdObj["query"];
336         if (q.type() == Object)
337             filter = q.embeddedObjectUserCheck();
338         else
339             uassert(13608, "query has to be blank or an Object", !q.trueValue());
340 
341 
342         BSONElement s = cmdObj["sort"];
343         if (s.type() == Object)
344             sort = s.embeddedObjectUserCheck();
345         else
346             uassert(13609, "sort has to be blank or an Object", !s.trueValue());
347 
348         BSONElement collationElt = cmdObj["collation"];
349         if (collationElt.type() == Object)
350             collation = collationElt.embeddedObjectUserCheck();
351         else
352             uassert(40082,
353                     str::stream()
354                         << "mapReduce 'collation' parameter must be of type Object but found type: "
355                         << typeName(collationElt.type()),
356                     collationElt.eoo());
357 
358         if (cmdObj["limit"].isNumber())
359             limit = cmdObj["limit"].numberLong();
360         else
361             limit = 0;
362     }
363 }
364 
365 /**
366  * Clean up the temporary and incremental collections
367  */
dropTempCollections()368 void State::dropTempCollections() {
369     if (!_config.tempNamespace.isEmpty()) {
370         writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] {
371             AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
372             if (auto db = autoDb.getDb()) {
373                 WriteUnitOfWork wunit(_opCtx);
374                 uassert(
375                     ErrorCodes::PrimarySteppedDown,
376                     str::stream()
377                         << "no longer primary while dropping temporary collection for mapReduce: "
378                         << _config.tempNamespace.ns(),
379                     repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(
380                         _opCtx, _config.tempNamespace));
381                 uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns()));
382                 wunit.commit();
383             }
384         });
385         // Always forget about temporary namespaces, so we don't cache lots of them
386         ShardConnection::forgetNS(_config.tempNamespace.ns());
387     }
388     if (_useIncremental && !_config.incLong.isEmpty()) {
389         // We don't want to log the deletion of incLong as it isn't replicated. While
390         // harmless, this would lead to a scary looking warning on the secondaries.
391         repl::UnreplicatedWritesBlock uwb(_opCtx);
392 
393         writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] {
394             Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X);
395             if (Database* db = dbHolder().get(_opCtx, _config.incLong.ns())) {
396                 WriteUnitOfWork wunit(_opCtx);
397                 uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns()));
398                 wunit.commit();
399             }
400         });
401 
402         ShardConnection::forgetNS(_config.incLong.ns());
403     }
404 }
405 
406 /**
407  * Create temporary collection, set up indexes
408  */
prepTempCollection()409 void State::prepTempCollection() {
410     if (!_onDisk)
411         return;
412 
413     dropTempCollections();
414     if (_useIncremental) {
415         // Create the inc collection and make sure we have index on "0" key.
416         // Intentionally not replicating the inc collection to secondaries.
417         repl::UnreplicatedWritesBlock uwb(_opCtx);
418 
419         writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] {
420             OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
421             WriteUnitOfWork wuow(_opCtx);
422             Collection* incColl = incCtx.getCollection();
423             invariant(!incColl);
424 
425             CollectionOptions options;
426             options.setNoIdIndex();
427             options.temp = true;
428             if (enableCollectionUUIDs &&
429                 serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
430                 options.uuid.emplace(UUID::gen());
431             }
432             incColl = incCtx.db()->createCollection(_opCtx, _config.incLong.ns(), options);
433             invariant(incColl);
434 
435             auto rawIndexSpec =
436                 BSON("key" << BSON("0" << 1) << "ns" << _config.incLong.ns() << "name"
437                            << "_temp_0");
438             auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec(
439                 _opCtx, rawIndexSpec, _config.incLong, serverGlobalParams.featureCompatibility));
440 
441             Status status = incColl->getIndexCatalog()
442                                 ->createIndexOnEmptyCollection(_opCtx, indexSpec)
443                                 .getStatus();
444             if (!status.isOK()) {
445                 uasserted(17305,
446                           str::stream() << "createIndex failed for mr incLong ns: "
447                                         << _config.incLong.ns()
448                                         << " err: "
449                                         << status.code());
450             }
451             wuow.commit();
452         });
453     }
454 
455     CollectionOptions finalOptions;
456     vector<BSONObj> indexesToInsert;
457 
458     {
459         // copy indexes and collection options into temporary storage
460         OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns());
461         Collection* const finalColl = finalCtx.getCollection();
462         if (finalColl) {
463             finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx);
464 
465             IndexCatalog::IndexIterator ii =
466                 finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
467             // Iterate over finalColl's indexes.
468             while (ii.more()) {
469                 IndexDescriptor* currIndex = ii.next();
470                 BSONObjBuilder b;
471                 b.append("ns", _config.tempNamespace.ns());
472 
473                 // Copy over contents of the index descriptor's infoObj.
474                 BSONObjIterator j(currIndex->infoObj());
475                 while (j.more()) {
476                     BSONElement e = j.next();
477                     if (str::equals(e.fieldName(), "_id") || str::equals(e.fieldName(), "ns"))
478                         continue;
479                     b.append(e);
480                 }
481                 indexesToInsert.push_back(b.obj());
482             }
483         }
484     }
485 
486     writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] {
487         // create temp collection and insert the indexes from temporary storage
488         OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns());
489         WriteUnitOfWork wuow(_opCtx);
490         uassert(
491             ErrorCodes::PrimarySteppedDown,
492             str::stream() << "no longer primary while creating temporary collection for mapReduce: "
493                           << _config.tempNamespace.ns(),
494             repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx,
495                                                                         _config.tempNamespace));
496         Collection* tempColl = tempCtx.getCollection();
497         invariant(!tempColl);
498 
499         CollectionOptions options = finalOptions;
500         options.temp = true;
501         if (enableCollectionUUIDs && serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
502             // If a UUID for the final output collection was sent by mongos (i.e., the final output
503             // collection is sharded), use the UUID mongos sent when creating the temp collection.
504             // When the temp collection is renamed to the final output collection, the UUID will be
505             // preserved.
506             options.uuid.emplace(_config.finalOutputCollUUID ? *_config.finalOutputCollUUID
507                                                              : UUID::gen());
508         } else {
509             // While downgrading, we can inherit a UUID from finalOptions and not override it
510             // above.  If we didn't clear it, this would result in creating a collection with a
511             // duplicate UUID.
512             options.uuid = boost::none;
513         }
514         tempColl = tempCtx.db()->createCollection(_opCtx, _config.tempNamespace.ns(), options);
515 
516         for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end();
517              ++it) {
518             Status status =
519                 tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus();
520             if (!status.isOK()) {
521                 if (status.code() == ErrorCodes::IndexAlreadyExists) {
522                     continue;
523                 }
524                 uassertStatusOK(status);
525             }
526             // Log the createIndex operation.
527             auto uuid = tempColl->uuid();
528             getGlobalServiceContext()->getOpObserver()->onCreateIndex(
529                 _opCtx, _config.tempNamespace, uuid, *it, false);
530         }
531         wuow.commit();
532     });
533 }
534 
535 /**
536  * For inline mode, appends results to output object.
537  * Makes sure (key, value) tuple is formatted as {_id: key, value: val}
538  */
appendResults(BSONObjBuilder & final)539 void State::appendResults(BSONObjBuilder& final) {
540     if (_onDisk) {
541         if (!_config.outputOptions.outDB.empty()) {
542             BSONObjBuilder loc;
543             if (!_config.outputOptions.outDB.empty())
544                 loc.append("db", _config.outputOptions.outDB);
545             if (!_config.outputOptions.collectionName.empty())
546                 loc.append("collection", _config.outputOptions.collectionName);
547             final.append("result", loc.obj());
548         } else {
549             if (!_config.outputOptions.collectionName.empty())
550                 final.append("result", _config.outputOptions.collectionName);
551         }
552 
553         if (_config.splitInfo > 0) {
554             // add split points, used for shard
555             BSONObj res;
556             BSONObj idKey = BSON("_id" << 1);
557             if (!_db.runCommand("admin",
558                                 BSON("splitVector" << _config.outputOptions.finalNamespace.ns()
559                                                    << "keyPattern"
560                                                    << idKey
561                                                    << "maxChunkSizeBytes"
562                                                    << _config.splitInfo),
563                                 res)) {
564                 uasserted(15921, str::stream() << "splitVector failed: " << res);
565             }
566             if (res.hasField("splitKeys"))
567                 final.append(res.getField("splitKeys"));
568         }
569         return;
570     }
571 
572     if (_jsMode) {
573         ScriptingFunction getResult = _scope->createFunction(
574             "var map = _mrMap;"
575             "var result = [];"
576             "for (key in map) {"
577             "  result.push({_id: key, value: map[key]});"
578             "}"
579             "return result;");
580         _scope->invoke(getResult, 0, 0, 0, false);
581         BSONObj obj = _scope->getObject("__returnValue");
582         final.append("results", BSONArray(obj));
583         return;
584     }
585 
586     uassert(13604, "too much data for in memory map/reduce", _size < BSONObjMaxUserSize);
587 
588     BSONArrayBuilder b((int)(_size * 1.2));  // _size is data size, doesn't count overhead and keys
589 
590     for (const auto& entry : *_temp) {
591         const BSONObj& key = entry.first;
592         const BSONList& all = entry.second;
593 
594         verify(all.size() == 1);
595 
596         BSONObjIterator vi(all[0]);
597         vi.next();
598 
599         BSONObjBuilder temp(b.subobjStart());
600         temp.appendAs(key.firstElement(), "_id");
601         temp.appendAs(vi.next(), "value");
602         temp.done();
603     }
604 
605     BSONArray res = b.arr();
606     final.append("results", res);
607 }
608 
609 /**
610  * Does post processing on output collection.
611  * This may involve replacing, merging or reducing.
612  */
postProcessCollection(OperationContext * opCtx,CurOp * curOp,ProgressMeterHolder & pm)613 long long State::postProcessCollection(OperationContext* opCtx,
614                                        CurOp* curOp,
615                                        ProgressMeterHolder& pm) {
616     if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY)
617         return numInMemKeys();
618 
619     bool holdingGlobalLock = false;
620     if (_config.outputOptions.outNonAtomic)
621         return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
622 
623     invariant(!opCtx->lockState()->isLocked());
624 
625     // This must be global because we may write across different databases.
626     Lock::GlobalWrite lock(opCtx);
627     holdingGlobalLock = true;
628     return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
629 }
630 
631 namespace {
632 
633 // Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock,
634 // then this function does not acquire any additional locks.
_collectionCount(OperationContext * opCtx,const NamespaceString & nss,bool callerHoldsGlobalLock)635 unsigned long long _collectionCount(OperationContext* opCtx,
636                                     const NamespaceString& nss,
637                                     bool callerHoldsGlobalLock) {
638     Collection* coll = nullptr;
639     boost::optional<AutoGetCollectionForReadCommand> ctx;
640 
641     // If the global write lock is held, we must avoid using AutoGetCollectionForReadCommand as it
642     // may lead to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
643     if (callerHoldsGlobalLock) {
644         Database* db = dbHolder().get(opCtx, nss.ns());
645         if (db) {
646             coll = db->getCollection(opCtx, nss);
647         }
648     } else {
649         ctx.emplace(opCtx, nss);
650         coll = ctx->getCollection();
651     }
652 
653     return coll ? coll->numRecords(opCtx) : 0;
654 }
655 
656 }  // namespace
657 
postProcessCollectionNonAtomic(OperationContext * opCtx,CurOp * curOp,ProgressMeterHolder & pm,bool callerHoldsGlobalLock)658 long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
659                                                 CurOp* curOp,
660                                                 ProgressMeterHolder& pm,
661                                                 bool callerHoldsGlobalLock) {
662     auto outputCount =
663         _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
664 
665     // Determine whether the temp collection should be renamed to the final output collection and
666     // thus preserve the UUID. This is possible in the following cases:
667     //  * Output mode "replace"
668     //  * If this mapReduce is creating a new sharded output collection, which can be determined by
669     //  whether mongos sent the UUID that the final output collection should have (that is, whether
670     //  _config.finalOutputCollUUID is set).
671     if (_config.outputOptions.outType == Config::REPLACE || _config.finalOutputCollUUID) {
672         // This must be global because we may write across different databases.
673         Lock::GlobalWrite lock(opCtx);
674         // replace: just rename from temp to final collection name, dropping previous collection
675         _db.dropCollection(_config.outputOptions.finalNamespace.ns());
676         BSONObj info;
677 
678         if (!_db.runCommand("admin",
679                             BSON("renameCollection" << _config.tempNamespace.ns() << "to"
680                                                     << _config.outputOptions.finalNamespace.ns()
681                                                     << "stayTemp"
682                                                     << _config.shardedFirstPass),
683                             info)) {
684             uasserted(10076, str::stream() << "rename failed: " << info);
685         }
686 
687         _db.dropCollection(_config.tempNamespace.ns());
688     } else if (_config.outputOptions.outType == Config::MERGE) {
689         // merge: upsert new docs into old collection
690 
691         {
692             stdx::unique_lock<Client> lk(*opCtx->getClient());
693             curOp->setMessage_inlock(
694                 "m/r: merge post processing", "M/R Merge Post Processing Progress", outputCount);
695         }
696         unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
697         while (cursor->more()) {
698             Lock::DBLock lock(opCtx, _config.outputOptions.finalNamespace.db(), MODE_X);
699             BSONObj o = cursor->nextSafe();
700             Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o);
701             pm.hit();
702         }
703         _db.dropCollection(_config.tempNamespace.ns());
704         pm.finished();
705     } else if (_config.outputOptions.outType == Config::REDUCE) {
706         // reduce: apply reduce op on new result and existing one
707         BSONList values;
708 
709         {
710             stdx::unique_lock<Client> lk(*opCtx->getClient());
711             curOp->setMessage_inlock(
712                 "m/r: reduce post processing", "M/R Reduce Post Processing Progress", outputCount);
713         }
714 
715         unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
716         while (cursor->more()) {
717             // This must be global because we may write across different databases.
718             Lock::GlobalWrite lock(opCtx);
719             BSONObj temp = cursor->nextSafe();
720             BSONObj old;
721 
722             const bool found = [&] {
723                 AutoGetCollection autoColl(opCtx, _config.outputOptions.finalNamespace, MODE_IS);
724                 return Helpers::findOne(
725                     opCtx, autoColl.getCollection(), temp["_id"].wrap(), old, true);
726             }();
727             if (found) {
728                 // need to reduce
729                 values.clear();
730                 values.push_back(temp);
731                 values.push_back(old);
732                 Helpers::upsert(opCtx,
733                                 _config.outputOptions.finalNamespace.ns(),
734                                 _config.reducer->finalReduce(values, _config.finalizer.get()));
735             } else {
736                 Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp);
737             }
738             pm.hit();
739         }
740         pm.finished();
741     }
742 
743     return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
744 }
745 
746 /**
747  * Insert doc in collection. This should be replicated.
748  */
insert(const NamespaceString & nss,const BSONObj & o)749 void State::insert(const NamespaceString& nss, const BSONObj& o) {
750     verify(_onDisk);
751 
752     writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] {
753         OldClientWriteContext ctx(_opCtx, nss.ns());
754         WriteUnitOfWork wuow(_opCtx);
755         uassert(
756             ErrorCodes::PrimarySteppedDown,
757             str::stream() << "no longer primary while inserting mapReduce result into collection: "
758                           << nss.ns()
759                           << ": "
760                           << redact(o),
761             repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, nss));
762         Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss);
763 
764         BSONObjBuilder b;
765         if (!o.hasField("_id")) {
766             b.appendOID("_id", NULL, true);
767         }
768         b.appendElements(o);
769         BSONObj bo = b.obj();
770 
771         StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo);
772         uassertStatusOK(res.getStatus());
773         if (!res.getValue().isEmpty()) {
774             bo = res.getValue();
775         }
776 
777         // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
778         OpDebug* const nullOpDebug = nullptr;
779         uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(bo), nullOpDebug, true));
780         wuow.commit();
781     });
782 }
783 
784 /**
785  * Insert doc into the inc collection. This should not be replicated.
786  */
_insertToInc(BSONObj & o)787 void State::_insertToInc(BSONObj& o) {
788     verify(_onDisk);
789 
790     writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] {
791         OldClientWriteContext ctx(_opCtx, _config.incLong.ns());
792         WriteUnitOfWork wuow(_opCtx);
793         Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong);
794         repl::UnreplicatedWritesBlock uwb(_opCtx);
795 
796         // The documents inserted into the incremental collection are of the form
797         // {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the
798         // check that the document has an "_id" field would fail. Instead, we directly verify that
799         // the size of the document to insert is smaller than 16MB.
800         if (o.objsize() > BSONObjMaxUserSize) {
801             uasserted(ErrorCodes::BadValue,
802                       str::stream() << "object to insert too large for incremental collection"
803                                     << ". size in bytes: "
804                                     << o.objsize()
805                                     << ", max size: "
806                                     << BSONObjMaxUserSize);
807         }
808 
809         // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
810         OpDebug* const nullOpDebug = nullptr;
811         uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(o), nullOpDebug, true, false));
812         wuow.commit();
813     });
814 }
815 
State(OperationContext * opCtx,const Config & c)816 State::State(OperationContext* opCtx, const Config& c)
817     : _config(c),
818       _db(opCtx),
819       _useIncremental(true),
820       _opCtx(opCtx),
821       _size(0),
822       _dupCount(0),
823       _numEmits(0) {
824     _temp.reset(new InMemory());
825     _onDisk = _config.outputOptions.outType != Config::INMEMORY;
826 }
827 
sourceExists()828 bool State::sourceExists() {
829     return _db.exists(_config.nss.ns());
830 }
831 
~State()832 State::~State() {
833     if (_onDisk) {
834         try {
835             dropTempCollections();
836         } catch (...) {
837             error() << "Unable to drop temporary collection created by mapReduce: "
838                     << _config.tempNamespace << ". This collection will be removed automatically "
839                                                 "the next time the server starts up. "
840                     << exceptionToStatus();
841         }
842     }
843     if (_scope && !_scope->isKillPending() && _scope->getError().empty()) {
844         // cleanup js objects
845         try {
846             ScriptingFunction cleanup =
847                 _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
848             _scope->invoke(cleanup, 0, 0, 0, true);
849         } catch (const DBException&) {
850             // not important because properties will be reset if scope is reused
851             LOG(1) << "MapReduce terminated during state destruction";
852         }
853     }
854 }
855 
/**
 * Initializes the JavaScript side of the map/reduce operation.
 *
 * Creates a new scope for the current thread, binds it to this operation and to the
 * configured database, loads stored JS, applies the user-supplied scope variables, and
 * initializes the mapper, reducer and (optional) finalizer against this State. It then
 * selects the emit implementation via switchMode() and compiles the JS helper functions
 * (_reduceAll, _reduceAndEmit, _reduceAndFinalize, _reduceAndFinalizeAndInsert) used by
 * JS mode.
 *
 * Throws (massert 16717-16720) if any of the helper functions fails to compile.
 */
void State::init() {
    // setup js
    // NOTE(review): 'userToken' is not referenced again in this function - confirm whether it
    // is still needed.
    const string userToken =
        AuthorizationSession::get(Client::getCurrent())->getAuthenticatedUserNamesToken();
    _scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread());
    _scope->requireOwnedObjects();
    _scope->registerOperation(_opCtx);
    _scope->setLocalDB(_config.dbname);
    _scope->loadStored(_opCtx, true);

    // Apply the user-provided scope object, if any.
    if (!_config.scopeSetup.isEmpty())
        _scope->init(&_config.scopeSetup);

    _config.mapper->init(this);
    _config.reducer->init(this);
    if (_config.finalizer)
        _config.finalizer->init(this);
    // Expose to the JS helpers below whether a finalizer was configured.
    _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);

    switchMode(_config.jsMode);  // set up js-mode based on Config

    // global JS map/reduce hashmap
    // we use a standard JS object which means keys are only simple types
    // we could also add a real hashmap from a library and object comparison methods
    // for increased performance, we may want to look at v8 Harmony Map support
    // _scope->setObject("_mrMap", BSONObj(), false);
    //
    // Reset the JS-side counters and create the global key -> values map (_mrMap) unless one
    // is already defined in the scope.
    ScriptingFunction init = _scope->createFunction(
        "_emitCt = 0;"
        "_keyCt = 0;"
        "_dupCt = 0;"
        "_redCt = 0;"
        "if (typeof(_mrMap) === 'undefined') {"
        "  _mrMap = {};"
        "}");
    _scope->invoke(init, 0, 0, 0, true);

    // js function to run reduce on all keys
    // redfunc = _scope->createFunction("for (var key in hashmap) {  print('Key is ' + key);
    // list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };");
    //
    // _reduceAll: for every key whose value list does not have exactly one entry, replaces the
    // list with the single reduced value and bumps _redCt; finally resets _dupCt to 0.
    _reduceAll = _scope->createFunction(
        "var map = _mrMap;"
        "var list, ret;"
        "for (var key in map) {"
        "  list = map[key];"
        "  if (list.length != 1) {"
        "    ret = _reduce(key, list);"
        "    map[key] = [ret];"
        "    ++_redCt;"
        "  }"
        "}"
        "_dupCt = 0;");
    massert(16717, "error initializing JavaScript reduceAll function", _reduceAll != 0);

    // _reduceAndEmit: reduces each key (when it has more than one value), passes the result
    // to emit(key, ret), and finally deletes _mrMap.
    _reduceAndEmit = _scope->createFunction(
        "var map = _mrMap;"
        "var list, ret;"
        "for (var key in map) {"
        "  list = map[key];"
        "  if (list.length == 1)"
        "    ret = list[0];"
        "  else {"
        "    ret = _reduce(key, list);"
        "    ++_redCt;"
        "  }"
        "  emit(key, ret);"
        "}"
        "delete _mrMap;");
    massert(16718, "error initializing JavaScript reduce/emit function", _reduceAndEmit != 0);

    // _reduceAndFinalize: reduces each key, optionally finalizes it, and stores the resulting
    // value back into the map. Keys with a single value are left untouched when there is no
    // finalizer.
    _reduceAndFinalize = _scope->createFunction(
        "var map = _mrMap;"
        "var list, ret;"
        "for (var key in map) {"
        "  list = map[key];"
        "  if (list.length == 1) {"
        "    if (!_doFinal) { continue; }"
        "    ret = list[0];"
        "  }"
        "  else {"
        "    ret = _reduce(key, list);"
        "    ++_redCt;"
        "  }"
        "  if (_doFinal)"
        "    ret = _finalize(key, ret);"
        "  map[key] = ret;"
        "}");
    massert(16719, "error creating JavaScript reduce/finalize function", _reduceAndFinalize != 0);

    // _reduceAndFinalizeAndInsert: reduces each key, optionally finalizes it, and hands
    // {_id: key, value: ret} to the native function _nativeToTemp.
    _reduceAndFinalizeAndInsert = _scope->createFunction(
        "var map = _mrMap;"
        "var list, ret;"
        "for (var key in map) {"
        "  list = map[key];"
        "  if (list.length == 1)"
        "    ret = list[0];"
        "  else {"
        "    ret = _reduce(key, list);"
        "    ++_redCt;"
        "  }"
        "  if (_doFinal)"
        "    ret = _finalize(key, ret);"
        "  _nativeToTemp({_id: key, value: ret});"
        "}");
    massert(16720, "error initializing JavaScript functions", _reduceAndFinalizeAndInsert != 0);
}
964 
switchMode(bool jsMode)965 void State::switchMode(bool jsMode) {
966     _jsMode = jsMode;
967     if (jsMode) {
968         // emit function that stays in JS
969         _scope->setFunction("emit",
970                             "function(key, value) {"
971                             "  if (typeof(key) === 'object') {"
972                             "    _bailFromJS(key, value);"
973                             "    return;"
974                             "  }"
975                             "  ++_emitCt;"
976                             "  var map = _mrMap;"
977                             "  var list = map[key];"
978                             "  if (!list) {"
979                             "    ++_keyCt;"
980                             "    list = [];"
981                             "    map[key] = list;"
982                             "  }"
983                             "  else"
984                             "    ++_dupCt;"
985                             "  list.push(value);"
986                             "}");
987         _scope->injectNative("_bailFromJS", _bailFromJS, this);
988     } else {
989         // emit now populates C++ map
990         _scope->injectNative("emit", fast_emit, this);
991     }
992 }
993 
bailFromJS()994 void State::bailFromJS() {
995     LOG(1) << "M/R: Switching from JS mode to mixed mode";
996 
997     // reduce and reemit into c++
998     switchMode(false);
999     _scope->invoke(_reduceAndEmit, 0, 0, 0, true);
1000     // need to get the real number emitted so far
1001     _numEmits = _scope->getNumberInt("_emitCt");
1002     _config.reducer->numReduces = _scope->getNumberInt("_redCt");
1003 }
1004 
getCollectionOrUassert(OperationContext * opCtx,Database * db,const NamespaceString & nss)1005 Collection* State::getCollectionOrUassert(OperationContext* opCtx,
1006                                           Database* db,
1007                                           const NamespaceString& nss) {
1008     Collection* out = db ? db->getCollection(opCtx, nss) : NULL;
1009     uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out);
1010     return out;
1011 }
1012 
1013 /**
1014  * Applies last reduce and finalize on a list of tuples (key, val)
1015  * Inserts single result {_id: key, value: val} into temp collection
1016  */
finalReduce(BSONList & values)1017 void State::finalReduce(BSONList& values) {
1018     if (!_onDisk || values.size() == 0)
1019         return;
1020 
1021     BSONObj res = _config.reducer->finalReduce(values, _config.finalizer.get());
1022     insert(_config.tempNamespace, res);
1023 }
1024 
/**
 * Native function injected into the JS scope. Takes the first argument in 'args' as an
 * object and inserts it into the temp collection of the State passed through 'data'.
 * Always returns an empty object.
 */
BSONObj _nativeToTemp(const BSONObj& args, void* data) {
    State* const mrState = static_cast<State*>(data);

    BSONObjIterator argIter(args);
    const BSONObj resultDoc = argIter.next().Obj();
    mrState->insert(mrState->_config.tempNamespace, resultDoc);

    return BSONObj();
}
1031 
1032 //        BSONObj _nativeToInc( const BSONObj& args, void* data ) {
1033 //            State* state = (State*) data;
1034 //            BSONObjIterator it(args);
1035 //            const BSONObj& obj = it.next().Obj();
1036 //            state->_insertToInc(const_cast<BSONObj&>(obj));
1037 //            return BSONObj();
1038 //        }
1039 
/**
 * Applies last reduce and finalize.
 * After calling this method, the temp collection will be completed.
 * If inline, the results will be in the in memory map
 *
 * Three paths are taken depending on the current mode:
 *  - JS mode: one of the precompiled JS functions does all the work (inserting into the temp
 *    collection via _nativeToTemp when on disk) and the method returns.
 *  - Not on disk: each key in the in-memory map must already hold exactly one value; when a
 *    finalizer is configured it is applied to that value and _size is recomputed.
 *  - On disk: the inc collection is read back sorted on "0", consecutive documents that
 *    compare equal under that sort are grouped, and each group is passed to
 *    finalReduce(BSONList&).
 *
 * 'pm' is expected to be the progress meter returned by curOp->setMessage_inlock() (verified
 * in the on-disk path).
 *
 * NOTE(review): this method uses both the 'opCtx' parameter and the '_opCtx' member;
 * presumably they refer to the same operation - confirm.
 */
void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) {
    if (_jsMode) {
        // apply the reduce within JS
        if (_onDisk) {
            // Make the native insert hook available to the JS function before invoking it.
            _scope->injectNative("_nativeToTemp", _nativeToTemp, this);
            _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true);
            return;
        } else {
            _scope->invoke(_reduceAndFinalize, 0, 0, 0, true);
            return;
        }
    }

    if (!_onDisk) {
        // all data has already been reduced, just finalize
        if (_config.finalizer) {
            long size = 0;
            for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
                BSONObj key = i->first;
                BSONList& all = i->second;

                verify(all.size() == 1);

                BSONObj res = _config.finalizer->finalize(all[0]);

                // Replace the single value with its finalized form.
                all.clear();
                all.push_back(res);
                size += res.objsize();
            }
            _size = size;
        }
        return;
    }

    // use index on "0" to pull sorted data
    verify(_temp->size() == 0);
    BSONObj sortKey = BSON("0" << 1);

    // Verify that the inc collection has an index whose key pattern equals 'sortKey'.
    writeConflictRetry(_opCtx, "finalReduce", _config.incLong.ns(), [&] {
        OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
        WriteUnitOfWork wuow(_opCtx);
        Collection* incColl = getCollectionOrUassert(_opCtx, incCtx.db(), _config.incLong);

        bool foundIndex = false;
        IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
        // Iterate over incColl's indexes.
        while (ii.more()) {
            IndexDescriptor* currIndex = ii.next();
            BSONObj x = currIndex->infoObj();
            if (sortKey.woCompare(x["key"].embeddedObject()) == 0) {
                foundIndex = true;
                break;
            }
        }

        verify(foundIndex);
        wuow.commit();
    });

    // Held through a pointer so it can be released and re-acquired around each finalReduce(all)
    // call in the loop below.
    unique_ptr<AutoGetCollectionForReadCommand> ctx(
        new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

    BSONObj prev;  // first document of the group currently being accumulated
    BSONList all;  // documents of the current group

    {
        // Size the progress meter by the number of documents in the inc collection.
        const auto count = _db.count(_config.incLong.ns(), BSONObj(), QueryOption_SlaveOk);
        stdx::lock_guard<Client> lk(*_opCtx->getClient());
        verify(pm ==
               curOp->setMessage_inlock("m/r: (3/3) final reduce to collection",
                                        "M/R: (3/3) Final Reduce Progress",
                                        count));
    }

    const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong);

    // Query: every document of the inc collection, sorted on "0".
    auto qr = stdx::make_unique<QueryRequest>(_config.incLong);
    qr->setSort(sortKey);

    const boost::intrusive_ptr<ExpressionContext> expCtx;
    auto statusWithCQ =
        CanonicalQuery::canonicalize(opCtx,
                                     std::move(qr),
                                     expCtx,
                                     extensionsCallback,
                                     MatchExpressionParser::kAllowAllSpecialFeatures &
                                         ~MatchExpressionParser::AllowedFeatures::kIsolated);
    verify(statusWithCQ.isOK());
    std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());

    Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong);
    invariant(coll);

    auto exec = uassertStatusOK(getExecutor(
        _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN));

    // Make sure the PlanExecutor is destroyed while holding a collection lock.
    // If 'ctx' was released (for example because an exception escaped while it was reset), a
    // collection lock is taken here just for the destruction of 'exec'.
    ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
        if (!ctx) {
            AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
            exec.reset();
        }
    });

    // iterate over all sorted objects
    BSONObj o;
    PlanExecutor::ExecState state;
    while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
        o = o.getOwned();  // we will be accessing outside of the lock
        pm.hit();

        if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) {
            // object is same as previous, add to array
            all.push_back(o);
            // Only check for interruption every 100 progress hits while accumulating a group.
            if (pm->hits() % 100 == 0) {
                _opCtx->checkForInterrupt();
            }
            continue;
        }

        // A new key starts here: flush the group collected so far. The executor state is saved
        // and 'ctx' released before finalReduce(all), which writes to the temp collection via
        // insert(); both are restored afterwards.
        exec->saveState();

        ctx.reset();

        // reduce a finalize array
        finalReduce(all);

        ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

        // Start the next group with the current document.
        all.clear();
        prev = o;
        all.push_back(o);

        _opCtx->checkForInterrupt();
        uassertStatusOK(exec->restoreState());
    }

    // Any terminal state other than EOF is reported as an error.
    uassert(34428,
            "Plan executor error during mapReduce command: " + WorkingSetCommon::toStatusString(o),
            PlanExecutor::IS_EOF == state);

    ctx.reset();
    // reduce and finalize last array
    finalReduce(all);
    // Re-acquire so that 'ctx' is non-null when the ON_BLOCK_EXIT guard above runs.
    ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));

    pm.finished();
}
1193 
1194 /**
1195  * Attempts to reduce objects in the memory map.
1196  * A new memory map will be created to hold the results.
1197  * If applicable, objects with unique key may be dumped to inc collection.
1198  * Input and output objects are both {"0": key, "1": val}
1199  */
reduceInMemory()1200 void State::reduceInMemory() {
1201     if (_jsMode) {
1202         // in js mode the reduce is applied when writing to collection
1203         return;
1204     }
1205 
1206     unique_ptr<InMemory> n(new InMemory());  // for new data
1207     long nSize = 0;
1208     _dupCount = 0;
1209 
1210     for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
1211         BSONList& all = i->second;
1212 
1213         if (all.size() == 1) {
1214             // only 1 value for this key
1215             if (_onDisk) {
1216                 // this key has low cardinality, so just write to collection
1217                 _insertToInc(*(all.begin()));
1218             } else {
1219                 // add to new map
1220                 nSize += _add(n.get(), all[0]);
1221             }
1222         } else if (all.size() > 1) {
1223             // several values, reduce and add to map
1224             BSONObj res = _config.reducer->reduce(all);
1225             nSize += _add(n.get(), res);
1226         }
1227     }
1228 
1229     // swap maps
1230     _temp.reset(n.release());
1231     _size = nSize;
1232 }
1233 
1234 /**
1235  * Dumps the entire in memory map to the inc collection.
1236  */
dumpToInc()1237 void State::dumpToInc() {
1238     if (!_onDisk)
1239         return;
1240 
1241     for (InMemory::iterator i = _temp->begin(); i != _temp->end(); i++) {
1242         BSONList& all = i->second;
1243         if (all.size() < 1)
1244             continue;
1245 
1246         for (BSONList::iterator j = all.begin(); j != all.end(); j++)
1247             _insertToInc(*j);
1248     }
1249     _temp->clear();
1250     _size = 0;
1251 }
1252 
1253 /**
1254  * Adds object to in memory map
1255  */
emit(const BSONObj & a)1256 void State::emit(const BSONObj& a) {
1257     _numEmits++;
1258     _size += _add(_temp.get(), a);
1259 }
1260 
_add(InMemory * im,const BSONObj & a)1261 int State::_add(InMemory* im, const BSONObj& a) {
1262     BSONList& all = (*im)[a];
1263     all.push_back(a);
1264     if (all.size() > 1) {
1265         ++_dupCount;
1266     }
1267 
1268     return a.objsize() + 16;
1269 }
1270 
reduceAndSpillInMemoryStateIfNeeded()1271 void State::reduceAndSpillInMemoryStateIfNeeded() {
1272     // Make sure no DB locks are held, because this method manages its own locking and
1273     // write units of work.
1274     invariant(!_opCtx->lockState()->isLocked());
1275 
1276     if (_jsMode) {
1277         // try to reduce if it is beneficial
1278         int dupCt = _scope->getNumberInt("_dupCt");
1279         int keyCt = _scope->getNumberInt("_keyCt");
1280 
1281         if (keyCt > _config.jsMaxKeys) {
1282             // too many keys for JS, switch to mixed
1283             _bailFromJS(BSONObj(), this);
1284             // then fall through to check map size
1285         } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) {
1286             // reduce now to lower mem usage
1287             Timer t;
1288             _scope->invoke(_reduceAll, 0, 0, 0, true);
1289             LOG(3) << "  MR - did reduceAll: keys=" << keyCt << " dups=" << dupCt
1290                    << " newKeys=" << _scope->getNumberInt("_keyCt") << " time=" << t.millis()
1291                    << "ms";
1292             return;
1293         }
1294     }
1295 
1296     if (_jsMode)
1297         return;
1298 
1299     if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) {
1300         // attempt to reduce in memory map, if memory is too high or we have many duplicates
1301         long oldSize = _size;
1302         Timer t;
1303         reduceInMemory();
1304         LOG(3) << "  MR - did reduceInMemory: size=" << oldSize << " dups=" << _dupCount
1305                << " newSize=" << _size << " time=" << t.millis() << "ms";
1306 
1307         // if size is still high, or values are not reducing well, dump
1308         if (_onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2)) {
1309             dumpToInc();
1310             LOG(3) << "  MR - dumping to db";
1311         }
1312     }
1313 }
1314 
1315 /**
1316  * emit that will be called by js function
1317  */
fast_emit(const BSONObj & args,void * data)1318 BSONObj fast_emit(const BSONObj& args, void* data) {
1319     uassert(10077, "fast_emit takes 2 args", args.nFields() == 2);
1320     uassert(13069,
1321             "an emit can't be more than half max bson size",
1322             args.objsize() < (BSONObjMaxUserSize / 2));
1323 
1324     State* state = (State*)data;
1325     if (args.firstElement().type() == Undefined) {
1326         BSONObjBuilder b(args.objsize());
1327         b.appendNull("");
1328         BSONObjIterator i(args);
1329         i.next();
1330         b.append(i.next());
1331         state->emit(b.obj());
1332     } else {
1333         state->emit(args);
1334     }
1335     return BSONObj();
1336 }
1337 
1338 /**
1339  * function is called when we realize we cant use js mode for m/r on the 1st key
1340  */
_bailFromJS(const BSONObj & args,void * data)1341 BSONObj _bailFromJS(const BSONObj& args, void* data) {
1342     State* state = (State*)data;
1343     state->bailFromJS();
1344 
1345     // emit this particular key if there is one
1346     if (!args.isEmpty()) {
1347         fast_emit(args, data);
1348     }
1349     return BSONObj();
1350 }
1351 
1352 /**
1353  * This class represents a map/reduce command executed on a single server
1354  */
1355 class MapReduceCommand : public ErrmsgCommandDeprecated {
1356 public:
MapReduceCommand()1357     MapReduceCommand() : ErrmsgCommandDeprecated("mapReduce", "mapreduce") {}
1358 
slaveOk() const1359     virtual bool slaveOk() const {
1360         return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
1361             repl::ReplicationCoordinator::modeReplSet;
1362     }
1363 
slaveOverrideOk() const1364     virtual bool slaveOverrideOk() const {
1365         return true;
1366     }
1367 
maintenanceOk() const1368     virtual bool maintenanceOk() const override {
1369         return false;
1370     }
1371 
reserveBytesForReply() const1372     std::size_t reserveBytesForReply() const override {
1373         return FindCommon::kInitReplyBufferSize;
1374     }
1375 
help(stringstream & help) const1376     virtual void help(stringstream& help) const {
1377         help << "Run a map/reduce operation on the server.\n";
1378         help << "Note this is used for aggregation, not querying, in MongoDB.\n";
1379         help << "http://dochub.mongodb.org/core/mapreduce";
1380     }
1381 
1382 
supportsWriteConcern(const BSONObj & cmd) const1383     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
1384         return mrSupportsWriteConcern(cmd);
1385     }
1386 
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)1387     virtual void addRequiredPrivileges(const std::string& dbname,
1388                                        const BSONObj& cmdObj,
1389                                        std::vector<Privilege>* out) {
1390         addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
1391     }
1392 
errmsgRun(OperationContext * opCtx,const string & dbname,const BSONObj & cmd,string & errmsg,BSONObjBuilder & result)1393     bool errmsgRun(OperationContext* opCtx,
1394                    const string& dbname,
1395                    const BSONObj& cmd,
1396                    string& errmsg,
1397                    BSONObjBuilder& result) {
1398         Timer t;
1399 
1400         boost::optional<DisableDocumentValidation> maybeDisableValidation;
1401         if (shouldBypassDocumentValidationForCommand(cmd))
1402             maybeDisableValidation.emplace(opCtx);
1403 
1404         auto client = opCtx->getClient();
1405 
1406         if (client->isInDirectClient()) {
1407             return appendCommandStatus(
1408                 result,
1409                 Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()"));
1410         }
1411 
1412         auto curOp = CurOp::get(opCtx);
1413 
1414         const Config config(dbname, cmd);
1415 
1416         LOG(1) << "mr ns: " << config.nss;
1417 
1418         uassert(16149, "cannot run map reduce without the js engine", getGlobalScriptEngine());
1419 
1420         // Prevent sharding state from changing during the MR.
1421         const auto collMetadata = [&] {
1422             // Get metadata before we check our version, to make sure it doesn't increment in the
1423             // meantime
1424             AutoGetCollectionForReadCommand autoColl(opCtx, config.nss);
1425             return CollectionShardingState::get(opCtx, config.nss)->getMetadata();
1426         }();
1427 
1428         bool shouldHaveData = false;
1429 
1430         BSONObjBuilder countsBuilder;
1431         BSONObjBuilder timingBuilder;
1432         try {
1433             State state(opCtx, config);
1434             if (!state.sourceExists()) {
1435                 return appendCommandStatus(
1436                     result,
1437                     Status(ErrorCodes::NamespaceNotFound,
1438                            str::stream() << "namespace does not exist: " << config.nss.ns()));
1439             }
1440 
1441             state.init();
1442             state.prepTempCollection();
1443 
1444             int progressTotal = 0;
1445             bool showTotal = true;
1446             if (state.config().filter.isEmpty()) {
1447                 const bool holdingGlobalLock = false;
1448                 const auto count = _collectionCount(opCtx, config.nss, holdingGlobalLock);
1449                 progressTotal =
1450                     (config.limit && (unsigned)config.limit < count) ? config.limit : count;
1451             } else {
1452                 showTotal = false;
1453                 // Set an arbitrary total > 0 so the meter will be activated.
1454                 progressTotal = 1;
1455             }
1456 
1457             stdx::unique_lock<Client> lk(*opCtx->getClient());
1458             ProgressMeter& progress(curOp->setMessage_inlock(
1459                 "m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal));
1460             lk.unlock();
1461             progress.showTotal(showTotal);
1462             ProgressMeterHolder pm(progress);
1463 
1464             // See cast on next line to 32 bit unsigned
1465             wassert(config.limit < 0x4000000);
1466 
1467             long long mapTime = 0;
1468             long long reduceTime = 0;
1469             long long numInputs = 0;
1470 
1471             {
1472                 // We've got a cursor preventing migrations off, now re-establish our
1473                 // useful cursor.
1474 
1475                 // Need lock and context to use it
1476                 unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
1477 
1478                 if (state.isOnDisk()) {
1479                     // this means that it will be doing a write operation, make sure it is safe to
1480                     // do so.
1481                     if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx,
1482                                                                                      config.nss)) {
1483                         uasserted(ErrorCodes::NotMaster, "not master");
1484                         return false;
1485                     }
1486                 }
1487 
1488                 auto qr = stdx::make_unique<QueryRequest>(config.nss);
1489                 qr->setFilter(config.filter);
1490                 qr->setSort(config.sort);
1491                 qr->setCollation(config.collation);
1492 
1493                 const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss);
1494 
1495                 const boost::intrusive_ptr<ExpressionContext> expCtx;
1496                 auto statusWithCQ = CanonicalQuery::canonicalize(
1497                     opCtx,
1498                     std::move(qr),
1499                     expCtx,
1500                     extensionsCallback,
1501                     MatchExpressionParser::kAllowAllSpecialFeatures &
1502                         ~MatchExpressionParser::AllowedFeatures::kIsolated);
1503                 if (!statusWithCQ.isOK()) {
1504                     uasserted(17238, "Can't canonicalize query " + config.filter.toString());
1505                     return 0;
1506                 }
1507                 std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
1508 
1509                 unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
1510                 {
1511                     Database* db = scopedAutoDb->getDb();
1512                     Collection* coll = State::getCollectionOrUassert(opCtx, db, config.nss);
1513                     invariant(coll);
1514 
1515                     exec = uassertStatusOK(
1516                         getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, 0));
1517                 }
1518 
1519                 // Make sure the PlanExecutor is destroyed while holding the necessary locks.
1520                 ON_BLOCK_EXIT([&exec, &scopedAutoDb, opCtx, &config] {
1521                     if (!scopedAutoDb) {
1522                         scopedAutoDb = stdx::make_unique<AutoGetDb>(opCtx, config.nss.db(), MODE_S);
1523                         exec.reset();
1524                     }
1525                 });
1526 
1527                 {
1528                     stdx::lock_guard<Client> lk(*opCtx->getClient());
1529                     CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
1530                 }
1531 
1532                 Timer mt;
1533 
1534                 // go through each doc
1535                 BSONObj o;
1536                 PlanExecutor::ExecState execState;
1537                 while (PlanExecutor::ADVANCED == (execState = exec->getNext(&o, NULL))) {
1538                     o = o.getOwned();  // we will be accessing outside of the lock
1539                     // check to see if this is a new object we don't own yet
1540                     // because of a chunk migration
1541                     if (collMetadata) {
1542                         ShardKeyPattern kp(collMetadata->getKeyPattern());
1543                         if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
1544                             continue;
1545                         }
1546                     }
1547 
1548                     // do map
1549                     if (config.verbose)
1550                         mt.reset();
1551                     config.mapper->map(o);
1552                     if (config.verbose)
1553                         mapTime += mt.micros();
1554 
1555                     // Check if the state accumulated so far needs to be written to a
1556                     // collection. This may yield the DB lock temporarily and then
1557                     // acquire it again.
1558                     //
1559                     numInputs++;
1560                     if (numInputs % 100 == 0) {
1561                         Timer t;
1562 
1563                         // TODO: As an optimization, we might want to do the save/restore
1564                         // state and yield inside the reduceAndSpillInMemoryState method, so
1565                         // it only happens if necessary.
1566                         exec->saveState();
1567 
1568                         scopedAutoDb.reset();
1569 
1570                         state.reduceAndSpillInMemoryStateIfNeeded();
1571 
1572                         scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
1573 
1574                         auto restoreStatus = exec->restoreState();
1575                         if (!restoreStatus.isOK()) {
1576                             return appendCommandStatus(result, restoreStatus);
1577                         }
1578 
1579                         reduceTime += t.micros();
1580 
1581                         opCtx->checkForInterrupt();
1582                     }
1583 
1584                     pm.hit();
1585 
1586                     if (config.limit && numInputs >= config.limit)
1587                         break;
1588                 }
1589 
1590                 if (PlanExecutor::DEAD == execState || PlanExecutor::FAILURE == execState) {
1591                     return appendCommandStatus(
1592                         result,
1593                         Status(ErrorCodes::OperationFailed,
1594                                str::stream() << "Executor error during mapReduce command: "
1595                                              << WorkingSetCommon::toStatusString(o)));
1596                 }
1597 
1598                 // Record the indexes used by the PlanExecutor.
1599                 PlanSummaryStats stats;
1600                 Explain::getSummaryStats(*exec, &stats);
1601 
1602                 // TODO SERVER-23261: Confirm whether this is the correct place to gather all
1603                 // metrics. There is no harm adding here for the time being.
1604                 curOp->debug().setPlanSummaryMetrics(stats);
1605 
1606                 Collection* coll = scopedAutoDb->getDb()->getCollection(opCtx, config.nss);
1607                 invariant(coll);  // 'exec' hasn't been killed, so collection must be alive.
1608                 coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed);
1609 
1610                 if (curOp->shouldDBProfile()) {
1611                     BSONObjBuilder execStatsBob;
1612                     Explain::getWinningPlanStats(exec.get(), &execStatsBob);
1613                     curOp->debug().execStats = execStatsBob.obj();
1614                 }
1615             }
1616             pm.finished();
1617 
1618             opCtx->checkForInterrupt();
1619 
1620             // update counters
1621             countsBuilder.appendNumber("input", numInputs);
1622             countsBuilder.appendNumber("emit", state.numEmits());
1623             if (state.numEmits())
1624                 shouldHaveData = true;
1625 
1626             timingBuilder.appendNumber("mapTime", mapTime / 1000);
1627             timingBuilder.append("emitLoop", t.millis());
1628 
1629             {
1630                 stdx::lock_guard<Client> lk(*opCtx->getClient());
1631                 curOp->setMessage_inlock("m/r: (2/3) final reduce in memory",
1632                                          "M/R: (2/3) Final In-Memory Reduce Progress");
1633             }
1634             Timer rt;
1635             // do reduce in memory
1636             // this will be the last reduce needed for inline mode
1637             state.reduceInMemory();
1638             // if not inline: dump the in memory map to inc collection, all data is on disk
1639             state.dumpToInc();
1640             // final reduce
1641             state.finalReduce(opCtx, curOp, pm);
1642             reduceTime += rt.micros();
1643 
1644             // Ensure the profile shows the source namespace. If the output was not inline, the
1645             // active namespace will be the temporary collection we inserted into.
1646             {
1647                 stdx::lock_guard<Client> lk(*opCtx->getClient());
1648                 curOp->setNS_inlock(config.nss.ns());
1649             }
1650 
1651             countsBuilder.appendNumber("reduce", state.numReduces());
1652             timingBuilder.appendNumber("reduceTime", reduceTime / 1000);
1653             timingBuilder.append("mode", state.jsMode() ? "js" : "mixed");
1654 
1655             long long finalCount = state.postProcessCollection(opCtx, curOp, pm);
1656             state.appendResults(result);
1657 
1658             timingBuilder.appendNumber("total", t.millis());
1659             result.appendNumber("timeMillis", t.millis());
1660             countsBuilder.appendNumber("output", finalCount);
1661             if (config.verbose)
1662                 result.append("timing", timingBuilder.obj());
1663             result.append("counts", countsBuilder.obj());
1664 
1665             if (finalCount == 0 && shouldHaveData) {
1666                 result.append("cmd", cmd);
1667                 errmsg = "there were emits but no data!";
1668                 return false;
1669             }
1670         } catch (StaleConfigException& e) {
1671             log() << "mr detected stale config, should retry" << redact(e);
1672             throw e;
1673         }
1674         // TODO:  The error handling code for queries is v. fragile,
1675         // *requires* rethrow AssertionExceptions - should probably fix.
1676         catch (AssertionException& e) {
1677             log() << "mr failed, removing collection" << redact(e);
1678             throw;
1679         } catch (std::exception& e) {
1680             log() << "mr failed, removing collection" << causedBy(e);
1681             throw;
1682         } catch (...) {
1683             log() << "mr failed for unknown reason, removing collection";
1684             throw;
1685         }
1686 
1687         return true;
1688     }
1689 
1690 } mapReduceCommand;
1691 
/**
 * This class represents a map/reduce command executed on the output server of a sharded
 * environment.
 */
1695 class MapReduceFinishCommand : public BasicCommand {
1696 public:
help(stringstream & h) const1697     void help(stringstream& h) const {
1698         h << "internal";
1699     }
MapReduceFinishCommand()1700     MapReduceFinishCommand() : BasicCommand("mapreduce.shardedfinish") {}
slaveOk() const1701     virtual bool slaveOk() const {
1702         return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
1703             repl::ReplicationCoordinator::modeReplSet;
1704     }
slaveOverrideOk() const1705     virtual bool slaveOverrideOk() const {
1706         return true;
1707     }
supportsWriteConcern(const BSONObj & cmd) const1708     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
1709         return true;
1710     }
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)1711     virtual void addRequiredPrivileges(const std::string& dbname,
1712                                        const BSONObj& cmdObj,
1713                                        std::vector<Privilege>* out) {
1714         ActionSet actions;
1715         actions.addAction(ActionType::internal);
1716         out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
1717     }
run(OperationContext * opCtx,const string & dbname,const BSONObj & cmdObj,BSONObjBuilder & result)1718     bool run(OperationContext* opCtx,
1719              const string& dbname,
1720              const BSONObj& cmdObj,
1721              BSONObjBuilder& result) {
1722         if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
1723             return appendCommandStatus(
1724                 result,
1725                 Status(ErrorCodes::CommandNotSupported,
1726                        str::stream() << "Can not execute mapReduce with output database " << dbname
1727                                      << " which lives on config servers"));
1728         }
1729 
1730         boost::optional<DisableDocumentValidation> maybeDisableValidation;
1731         if (shouldBypassDocumentValidationForCommand(cmdObj))
1732             maybeDisableValidation.emplace(opCtx);
1733 
1734         // legacy name
1735         const auto shardedOutputCollectionElt = cmdObj["shardedOutputCollection"];
1736         uassert(ErrorCodes::InvalidNamespace,
1737                 "'shardedOutputCollection' must be of type String",
1738                 shardedOutputCollectionElt.type() == BSONType::String);
1739         const std::string shardedOutputCollection = shardedOutputCollectionElt.str();
1740         verify(shardedOutputCollection.size() > 0);
1741 
1742         std::string inputNS;
1743         if (cmdObj["inputDB"].type() == String) {
1744             inputNS =
1745                 NamespaceString(cmdObj["inputDB"].valueStringData(), shardedOutputCollection).ns();
1746         } else {
1747             inputNS = NamespaceString(dbname, shardedOutputCollection).ns();
1748         }
1749 
1750         CurOp* curOp = CurOp::get(opCtx);
1751 
1752         Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck());
1753 
1754         if (cmdObj["finalOutputCollIsSharded"].trueValue() &&
1755             serverGlobalParams.featureCompatibility.isSchemaVersion36()) {
1756             uassert(ErrorCodes::InvalidOptions,
1757                     "This shard has feature compatibility version 3.6, so it expects mongos to "
1758                     "send the UUID to use for the sharded output collection. Was the mapReduce "
1759                     "request sent from a 3.4 mongos?",
1760                     cmdObj.hasField("shardedOutputCollUUID"));
1761             config.finalOutputCollUUID =
1762                 uassertStatusOK(UUID::parse(cmdObj["shardedOutputCollUUID"]));
1763         }
1764 
1765         State state(opCtx, config);
1766         state.init();
1767 
1768         // no need for incremental collection because records are already sorted
1769         state._useIncremental = false;
1770         config.incLong = config.tempNamespace;
1771 
1772         BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
1773         BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
1774 
1775         stdx::unique_lock<Client> lk(*opCtx->getClient());
1776         ProgressMeterHolder pm(curOp->setMessage_inlock("m/r: merge sort and reduce",
1777                                                         "M/R Merge Sort and Reduce Progress"));
1778         lk.unlock();
1779         set<string> servers;
1780 
1781         {
1782             // Parse per shard results
1783             BSONObjIterator i(shardCounts);
1784             while (i.more()) {
1785                 BSONElement e = i.next();
1786                 std::string server = e.fieldName();
1787                 servers.insert(server);
1788 
1789                 uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server));
1790             }
1791         }
1792 
1793         state.prepTempCollection();
1794 
1795         std::vector<std::shared_ptr<Chunk>> chunks;
1796 
1797         if (config.outputOptions.outType != Config::OutputType::INMEMORY) {
1798             auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
1799                 opCtx, config.outputOptions.finalNamespace);
1800             if (!outRoutingInfoStatus.isOK()) {
1801                 return appendCommandStatus(result, outRoutingInfoStatus.getStatus());
1802             }
1803 
1804             if (auto cm = outRoutingInfoStatus.getValue().cm()) {
1805                 // Fetch result from other shards 1 chunk at a time. It would be better to do just
1806                 // one big $or query, but then the sorting would not be efficient.
1807                 const string shardName = ShardingState::get(opCtx)->getShardName();
1808 
1809                 for (const auto& chunk : cm->chunks()) {
1810                     if (chunk->getShardId() == shardName) {
1811                         chunks.push_back(chunk);
1812                     }
1813                 }
1814             }
1815         }
1816 
1817         long long inputCount = 0;
1818         unsigned int index = 0;
1819         BSONObj query;
1820         BSONArrayBuilder chunkSizes;
1821         BSONList values;
1822 
1823         while (true) {
1824             shared_ptr<Chunk> chunk;
1825             if (chunks.size() > 0) {
1826                 chunk = chunks[index];
1827                 BSONObjBuilder b;
1828                 b.appendAs(chunk->getMin().firstElement(), "$gte");
1829                 b.appendAs(chunk->getMax().firstElement(), "$lt");
1830                 query = BSON("_id" << b.obj());
1831                 //                        chunkSizes.append(min);
1832             }
1833 
1834             // reduce from each shard for a chunk
1835             BSONObj sortKey = BSON("_id" << 1);
1836             ParallelSortClusteredCursor cursor(
1837                 servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
1838             cursor.init(opCtx);
1839 
1840             int chunkSize = 0;
1841 
1842             while (cursor.more() || !values.empty()) {
1843                 BSONObj t;
1844                 if (cursor.more()) {
1845                     t = cursor.next().getOwned();
1846                     ++inputCount;
1847 
1848                     if (values.size() == 0) {
1849                         values.push_back(t);
1850                         continue;
1851                     }
1852 
1853                     if (dps::compareObjectsAccordingToSort(t, *(values.begin()), sortKey) == 0) {
1854                         values.push_back(t);
1855                         continue;
1856                     }
1857                 }
1858 
1859                 BSONObj res = config.reducer->finalReduce(values, config.finalizer.get());
1860                 chunkSize += res.objsize();
1861                 if (state.isOnDisk())
1862                     state.insert(config.tempNamespace, res);
1863                 else
1864                     state.emit(res);
1865 
1866                 values.clear();
1867 
1868                 if (!t.isEmpty())
1869                     values.push_back(t);
1870             }
1871 
1872             if (chunk) {
1873                 chunkSizes.append(chunk->getMin());
1874                 chunkSizes.append(chunkSize);
1875             }
1876 
1877             if (++index >= chunks.size())
1878                 break;
1879         }
1880 
1881         // Forget temporary input collection, if output is sharded collection
1882         ShardConnection::forgetNS(inputNS);
1883 
1884         result.append("chunkSizes", chunkSizes.arr());
1885 
1886         long long outputCount = state.postProcessCollection(opCtx, curOp, pm);
1887         state.appendResults(result);
1888 
1889         BSONObjBuilder countsB(32);
1890         countsB.append("input", inputCount);
1891         countsB.append("reduce", state.numReduces());
1892         countsB.append("output", outputCount);
1893         result.append("counts", countsB.obj());
1894 
1895         return true;
1896     }
1897 
1898 } mapReduceFinishCommand;
1899 
1900 }  // namespace mr
1901 }  // namespace mongo
1902