1 // mr.h 2 3 4 /** 5 * Copyright (C) 2018-present MongoDB, Inc. 6 * 7 * This program is free software: you can redistribute it and/or modify 8 * it under the terms of the Server Side Public License, version 1, 9 * as published by MongoDB, Inc. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * Server Side Public License for more details. 15 * 16 * You should have received a copy of the Server Side Public License 17 * along with this program. If not, see 18 * <http://www.mongodb.com/licensing/server-side-public-license>. 19 * 20 * As a special exception, the copyright holders give permission to link the 21 * code of portions of this program with the OpenSSL library under certain 22 * conditions as described in each individual source file and distribute 23 * linked combinations including the program with the OpenSSL library. You 24 * must comply with the Server Side Public License in all respects for 25 * all of the code used other than as permitted herein. If you modify file(s) 26 * with this exception, you may extend this exception to your version of the 27 * file(s), but you are not obligated to do so. If you do not wish to do so, 28 * delete this exception statement from your version. If you delete this 29 * exception statement from all source files in the program, then also delete 30 * it in the license file. 31 */ 32 33 #pragma once 34 35 #include <string> 36 #include <vector> 37 38 #include "mongo/db/auth/privilege.h" 39 #include "mongo/db/curop.h" 40 #include "mongo/db/dbdirectclient.h" 41 #include "mongo/db/jsobj.h" 42 #include "mongo/db/namespace_string.h" 43 #include "mongo/platform/atomic_word.h" 44 #include "mongo/scripting/engine.h" 45 46 namespace mongo { 47 48 class Collection; 49 class Database; 50 class OperationContext; 51 52 namespace mr { 53 54 typedef std::vector<BSONObj> BSONList; 55 56 class State; 57 58 // ------------ function interfaces ----------- 59 60 class Mapper { 61 MONGO_DISALLOW_COPYING(Mapper); 62 63 public: ~Mapper()64 virtual ~Mapper() {} 65 virtual void init(State* state) = 0; 66 67 virtual void map(const BSONObj& o) = 0; 68 69 protected: 70 Mapper() = default; 71 }; 72 73 class Finalizer { 74 MONGO_DISALLOW_COPYING(Finalizer); 75 76 public: ~Finalizer()77 virtual ~Finalizer() {} 78 virtual void init(State* state) = 0; 79 80 /** 81 * this takes a tuple and returns a tuple 82 */ 83 virtual BSONObj finalize(const BSONObj& tuple) = 0; 84 85 protected: 86 Finalizer() = default; 87 }; 88 89 class Reducer { 90 MONGO_DISALLOW_COPYING(Reducer); 91 92 public: Reducer()93 Reducer() : numReduces(0) {} ~Reducer()94 virtual ~Reducer() {} 95 virtual void init(State* state) = 0; 96 97 virtual BSONObj reduce(const BSONList& tuples) = 0; 98 /** this means its a final reduce, even if there is no finalizer */ 99 virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer) = 0; 100 101 long long numReduces; 102 }; 103 104 // ------------ js function implementations ----------- 105 106 /** 107 * used as a holder for Scope and ScriptingFunction 108 * visitor like pattern as Scope is gotten from first access 109 */ 110 class JSFunction { 111 MONGO_DISALLOW_COPYING(JSFunction); 112 113 public: 114 /** 115 * @param type (map|reduce|finalize) 116 */ 117 JSFunction(const std::string& type, const BSONElement& e); ~JSFunction()118 virtual ~JSFunction() {} 119 120 virtual void init(State* state); 121 scope()122 Scope* scope() const { 123 return _scope; 124 } func()125 ScriptingFunction func() const { 126 return _func; 127 } 128 129 private: 130 std::string _type; 131 std::string _code; // actual javascript code 132 BSONObj _wantedScope; // this is for CodeWScope 133 134 Scope* _scope; // this is not owned by us, and might be shared 135 ScriptingFunction _func; 136 }; 137 138 class JSMapper : public Mapper { 139 public: JSMapper(const BSONElement & code)140 JSMapper(const BSONElement& code) : _func("_map", code) {} 141 virtual void map(const BSONObj& o); 142 virtual void init(State* state); 143 144 private: 145 JSFunction _func; 146 BSONObj _params; 147 }; 148 149 class JSReducer : public Reducer { 150 public: JSReducer(const BSONElement & code)151 JSReducer(const BSONElement& code) : _func("_reduce", code) {} 152 virtual void init(State* state); 153 154 virtual BSONObj reduce(const BSONList& tuples); 155 virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer); 156 157 private: 158 /** 159 * result in "__returnValue" 160 * @param key OUT 161 * @param endSizeEstimate OUT 162 */ 163 void _reduce(const BSONList& values, BSONObj& key, int& endSizeEstimate); 164 165 JSFunction _func; 166 }; 167 168 class JSFinalizer : public Finalizer { 169 public: JSFinalizer(const BSONElement & code)170 JSFinalizer(const BSONElement& code) : _func("_finalize", code) {} 171 virtual BSONObj finalize(const BSONObj& o); init(State * state)172 virtual void init(State* state) { 173 _func.init(state); 174 } 175 176 private: 177 JSFunction _func; 178 }; 179 180 // ----------------- 181 182 183 class TupleKeyCmp { 184 public: TupleKeyCmp()185 TupleKeyCmp() {} operator()186 bool operator()(const BSONObj& l, const BSONObj& r) const { 187 return l.firstElement().woCompare(r.firstElement()) < 0; 188 } 189 }; 190 191 typedef std::map<BSONObj, BSONList, TupleKeyCmp> InMemory; // from key to list of tuples 192 193 /** 194 * holds map/reduce config information 195 */ 196 class Config { 197 public: 198 Config(const std::string& _dbname, const BSONObj& cmdObj); 199 200 std::string dbname; 201 NamespaceString nss; 202 203 // options 204 bool verbose; 205 bool jsMode; 206 int splitInfo; 207 208 // query options 209 210 BSONObj filter; 211 BSONObj sort; 212 BSONObj collation; 213 long long limit; 214 215 // functions 216 217 std::unique_ptr<Mapper> mapper; 218 std::unique_ptr<Reducer> reducer; 219 std::unique_ptr<Finalizer> finalizer; 220 221 BSONObj mapParams; 222 BSONObj scopeSetup; 223 224 // output tables 225 NamespaceString incLong; 226 NamespaceString tempNamespace; 227 228 enum OutputType { 229 REPLACE, // atomically replace the collection 230 MERGE, // merge keys, override dups 231 REDUCE, // merge keys, reduce dups 232 INMEMORY // only store in memory, limited in size 233 }; 234 struct OutputOptions { 235 std::string outDB; 236 std::string collectionName; 237 NamespaceString finalNamespace; 238 // if true, no lock during output operation 239 bool outNonAtomic; 240 OutputType outType; 241 } outputOptions; 242 243 static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj); 244 245 // max number of keys allowed in JS map before switching mode 246 long jsMaxKeys; 247 // ratio of duplicates vs unique keys before reduce is triggered in js mode 248 float reduceTriggerRatio; 249 // maximum size of map before it gets dumped to disk 250 long maxInMemSize; 251 252 // true when called from mongos to do phase-1 of M/R 253 bool shardedFirstPass; 254 255 // if the output collection is sharded, we must be told what UUID to use for it 256 boost::optional<UUID> finalOutputCollUUID; 257 258 static AtomicUInt32 JOB_NUMBER; 259 }; // end MRsetup 260 261 /** 262 * stores information about intermediate map reduce state 263 * controls flow of data from map->reduce->finalize->output 264 */ 265 class State { 266 public: 267 /** 268 * opCtx must outlive this State. 269 */ 270 State(OperationContext* opCtx, const Config& c); 271 ~State(); 272 273 void init(); 274 275 // ---- prep ----- 276 bool sourceExists(); 277 278 // ---- map stage ---- 279 280 /** 281 * stages on in in-memory storage 282 */ 283 void emit(const BSONObj& a); 284 285 /** 286 * Checks the size of the transient in-memory results accumulated so far and potentially 287 * runs reduce in order to compact them. If the data is still too large, it will be 288 * spilled to the output collection. 289 * 290 * NOTE: Make sure that no DB locks are held, when calling this function, because it may 291 * try to acquire write DB lock for the write to the output collection. 292 */ 293 void reduceAndSpillInMemoryStateIfNeeded(); 294 295 /** 296 * run reduce on _temp 297 */ 298 void reduceInMemory(); 299 300 /** 301 * transfers in memory storage to temp collection 302 */ 303 void dumpToInc(); 304 void insertToInc(BSONObj& o); 305 void _insertToInc(BSONObj& o); 306 307 // ------ reduce stage ----------- 308 309 void prepTempCollection(); 310 311 void finalReduce(BSONList& values); 312 313 void finalReduce(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm); 314 315 // ------- cleanup/data positioning ---------- 316 317 /** 318 * Clean up the temporary and incremental collections 319 */ 320 void dropTempCollections(); 321 322 /** 323 @return number objects in collection 324 */ 325 long long postProcessCollection(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm); 326 long long postProcessCollectionNonAtomic(OperationContext* opCtx, 327 CurOp* op, 328 ProgressMeterHolder& pm, 329 bool callerHoldsGlobalLock); 330 331 /** 332 * if INMEMORY will append 333 * may also append stats or anything else it likes 334 */ 335 void appendResults(BSONObjBuilder& b); 336 337 // -------- util ------------ 338 339 /** 340 * inserts with correct replication semantics 341 */ 342 void insert(const NamespaceString& nss, const BSONObj& o); 343 344 // ------ simple accessors ----- 345 346 /** State maintains ownership, do no use past State lifetime */ scope()347 Scope* scope() { 348 return _scope.get(); 349 } 350 config()351 const Config& config() { 352 return _config; 353 } 354 isOnDisk()355 bool isOnDisk() { 356 return _onDisk; 357 } 358 numEmits()359 long long numEmits() const { 360 if (_jsMode) 361 return _scope->getNumberLongLong("_emitCt"); 362 return _numEmits; 363 } numReduces()364 long long numReduces() const { 365 if (_jsMode) 366 return _scope->getNumberLongLong("_redCt"); 367 return _config.reducer->numReduces; 368 } numInMemKeys()369 long long numInMemKeys() const { 370 if (_jsMode) 371 return _scope->getNumberLongLong("_keyCt"); 372 return _temp->size(); 373 } 374 jsMode()375 bool jsMode() { 376 return _jsMode; 377 } 378 void switchMode(bool jsMode); 379 void bailFromJS(); 380 381 static Collection* getCollectionOrUassert(OperationContext* opCtx, 382 Database* db, 383 const NamespaceString& nss); 384 385 const Config& _config; 386 DBDirectClient _db; 387 bool _useIncremental; // use an incremental collection 388 389 protected: 390 /** 391 * Appends a new document to the in-memory list of tuples, which are under that 392 * document's key. 393 * 394 * @return estimated in-memory size occupied by the newly added document. 395 */ 396 int _add(InMemory* im, const BSONObj& a); 397 398 OperationContext* _opCtx; 399 std::unique_ptr<Scope> _scope; 400 bool _onDisk; // if the end result of this map reduce is disk or not 401 402 std::unique_ptr<InMemory> _temp; 403 long _size; // bytes in _temp 404 long _dupCount; // number of duplicate key entries 405 406 long long _numEmits; 407 408 bool _jsMode; 409 ScriptingFunction _reduceAll; 410 ScriptingFunction _reduceAndEmit; 411 ScriptingFunction _reduceAndFinalize; 412 ScriptingFunction _reduceAndFinalizeAndInsert; 413 }; 414 415 BSONObj fast_emit(const BSONObj& args, void* data); 416 BSONObj _bailFromJS(const BSONObj& args, void* data); 417 418 void addPrivilegesRequiredForMapReduce(Command* commandTemplate, 419 const std::string& dbname, 420 const BSONObj& cmdObj, 421 std::vector<Privilege>* out); 422 423 /** 424 * Returns true if the provided mapReduce command has an 'out' parameter. 425 */ 426 bool mrSupportsWriteConcern(const BSONObj& cmd); 427 428 } // end mr namespace 429 } 430