1 // mr.h
2 
3 
4 /**
5  *    Copyright (C) 2018-present MongoDB, Inc.
6  *
7  *    This program is free software: you can redistribute it and/or modify
8  *    it under the terms of the Server Side Public License, version 1,
9  *    as published by MongoDB, Inc.
10  *
11  *    This program is distributed in the hope that it will be useful,
12  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *    Server Side Public License for more details.
15  *
16  *    You should have received a copy of the Server Side Public License
17  *    along with this program. If not, see
18  *    <http://www.mongodb.com/licensing/server-side-public-license>.
19  *
20  *    As a special exception, the copyright holders give permission to link the
21  *    code of portions of this program with the OpenSSL library under certain
22  *    conditions as described in each individual source file and distribute
23  *    linked combinations including the program with the OpenSSL library. You
24  *    must comply with the Server Side Public License in all respects for
25  *    all of the code used other than as permitted herein. If you modify file(s)
26  *    with this exception, you may extend this exception to your version of the
27  *    file(s), but you are not obligated to do so. If you do not wish to do so,
28  *    delete this exception statement from your version. If you delete this
29  *    exception statement from all source files in the program, then also delete
30  *    it in the license file.
31  */
32 
33 #pragma once
34 
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "mongo/db/auth/privilege.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/scripting/engine.h"
45 
46 namespace mongo {
47 
// Forward declarations: within this header these types are only used through pointers.
class Collection;
class Database;
class OperationContext;
51 
52 namespace mr {
53 
54 typedef std::vector<BSONObj> BSONList;
55 
56 class State;
57 
58 // ------------  function interfaces -----------
59 
60 class Mapper {
61     MONGO_DISALLOW_COPYING(Mapper);
62 
63 public:
~Mapper()64     virtual ~Mapper() {}
65     virtual void init(State* state) = 0;
66 
67     virtual void map(const BSONObj& o) = 0;
68 
69 protected:
70     Mapper() = default;
71 };
72 
73 class Finalizer {
74     MONGO_DISALLOW_COPYING(Finalizer);
75 
76 public:
~Finalizer()77     virtual ~Finalizer() {}
78     virtual void init(State* state) = 0;
79 
80     /**
81      * this takes a tuple and returns a tuple
82      */
83     virtual BSONObj finalize(const BSONObj& tuple) = 0;
84 
85 protected:
86     Finalizer() = default;
87 };
88 
89 class Reducer {
90     MONGO_DISALLOW_COPYING(Reducer);
91 
92 public:
Reducer()93     Reducer() : numReduces(0) {}
~Reducer()94     virtual ~Reducer() {}
95     virtual void init(State* state) = 0;
96 
97     virtual BSONObj reduce(const BSONList& tuples) = 0;
98     /** this means its a final reduce, even if there is no finalizer */
99     virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer) = 0;
100 
101     long long numReduces;
102 };
103 
104 // ------------  js function implementations -----------
105 
106 /**
107  * used as a holder for Scope and ScriptingFunction
108  * visitor like pattern as Scope is gotten from first access
109  */
110 class JSFunction {
111     MONGO_DISALLOW_COPYING(JSFunction);
112 
113 public:
114     /**
115      * @param type (map|reduce|finalize)
116      */
117     JSFunction(const std::string& type, const BSONElement& e);
~JSFunction()118     virtual ~JSFunction() {}
119 
120     virtual void init(State* state);
121 
scope()122     Scope* scope() const {
123         return _scope;
124     }
func()125     ScriptingFunction func() const {
126         return _func;
127     }
128 
129 private:
130     std::string _type;
131     std::string _code;     // actual javascript code
132     BSONObj _wantedScope;  // this is for CodeWScope
133 
134     Scope* _scope;  // this is not owned by us, and might be shared
135     ScriptingFunction _func;
136 };
137 
138 class JSMapper : public Mapper {
139 public:
JSMapper(const BSONElement & code)140     JSMapper(const BSONElement& code) : _func("_map", code) {}
141     virtual void map(const BSONObj& o);
142     virtual void init(State* state);
143 
144 private:
145     JSFunction _func;
146     BSONObj _params;
147 };
148 
149 class JSReducer : public Reducer {
150 public:
JSReducer(const BSONElement & code)151     JSReducer(const BSONElement& code) : _func("_reduce", code) {}
152     virtual void init(State* state);
153 
154     virtual BSONObj reduce(const BSONList& tuples);
155     virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer);
156 
157 private:
158     /**
159      * result in "__returnValue"
160      * @param key OUT
161      * @param endSizeEstimate OUT
162     */
163     void _reduce(const BSONList& values, BSONObj& key, int& endSizeEstimate);
164 
165     JSFunction _func;
166 };
167 
168 class JSFinalizer : public Finalizer {
169 public:
JSFinalizer(const BSONElement & code)170     JSFinalizer(const BSONElement& code) : _func("_finalize", code) {}
171     virtual BSONObj finalize(const BSONObj& o);
init(State * state)172     virtual void init(State* state) {
173         _func.init(state);
174     }
175 
176 private:
177     JSFunction _func;
178 };
179 
180 // -----------------
181 
182 
183 class TupleKeyCmp {
184 public:
TupleKeyCmp()185     TupleKeyCmp() {}
operator()186     bool operator()(const BSONObj& l, const BSONObj& r) const {
187         return l.firstElement().woCompare(r.firstElement()) < 0;
188     }
189 };
190 
191 typedef std::map<BSONObj, BSONList, TupleKeyCmp> InMemory;  // from key to list of tuples
192 
193 /**
194  * holds map/reduce config information
195  */
196 class Config {
197 public:
198     Config(const std::string& _dbname, const BSONObj& cmdObj);
199 
200     std::string dbname;
201     NamespaceString nss;
202 
203     // options
204     bool verbose;
205     bool jsMode;
206     int splitInfo;
207 
208     // query options
209 
210     BSONObj filter;
211     BSONObj sort;
212     BSONObj collation;
213     long long limit;
214 
215     // functions
216 
217     std::unique_ptr<Mapper> mapper;
218     std::unique_ptr<Reducer> reducer;
219     std::unique_ptr<Finalizer> finalizer;
220 
221     BSONObj mapParams;
222     BSONObj scopeSetup;
223 
224     // output tables
225     NamespaceString incLong;
226     NamespaceString tempNamespace;
227 
228     enum OutputType {
229         REPLACE,  // atomically replace the collection
230         MERGE,    // merge keys, override dups
231         REDUCE,   // merge keys, reduce dups
232         INMEMORY  // only store in memory, limited in size
233     };
234     struct OutputOptions {
235         std::string outDB;
236         std::string collectionName;
237         NamespaceString finalNamespace;
238         // if true, no lock during output operation
239         bool outNonAtomic;
240         OutputType outType;
241     } outputOptions;
242 
243     static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj);
244 
245     // max number of keys allowed in JS map before switching mode
246     long jsMaxKeys;
247     // ratio of duplicates vs unique keys before reduce is triggered in js mode
248     float reduceTriggerRatio;
249     // maximum size of map before it gets dumped to disk
250     long maxInMemSize;
251 
252     // true when called from mongos to do phase-1 of M/R
253     bool shardedFirstPass;
254 
255     // if the output collection is sharded, we must be told what UUID to use for it
256     boost::optional<UUID> finalOutputCollUUID;
257 
258     static AtomicUInt32 JOB_NUMBER;
259 };  // end MRsetup
260 
261 /**
262  * stores information about intermediate map reduce state
263  * controls flow of data from map->reduce->finalize->output
264  */
265 class State {
266 public:
267     /**
268      * opCtx must outlive this State.
269      */
270     State(OperationContext* opCtx, const Config& c);
271     ~State();
272 
273     void init();
274 
275     // ---- prep  -----
276     bool sourceExists();
277 
278     // ---- map stage ----
279 
280     /**
281      * stages on in in-memory storage
282      */
283     void emit(const BSONObj& a);
284 
285     /**
286     * Checks the size of the transient in-memory results accumulated so far and potentially
287     * runs reduce in order to compact them. If the data is still too large, it will be
288     * spilled to the output collection.
289     *
290     * NOTE: Make sure that no DB locks are held, when calling this function, because it may
291     * try to acquire write DB lock for the write to the output collection.
292     */
293     void reduceAndSpillInMemoryStateIfNeeded();
294 
295     /**
296      * run reduce on _temp
297      */
298     void reduceInMemory();
299 
300     /**
301      * transfers in memory storage to temp collection
302      */
303     void dumpToInc();
304     void insertToInc(BSONObj& o);
305     void _insertToInc(BSONObj& o);
306 
307     // ------ reduce stage -----------
308 
309     void prepTempCollection();
310 
311     void finalReduce(BSONList& values);
312 
313     void finalReduce(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm);
314 
315     // ------- cleanup/data positioning ----------
316 
317     /**
318      * Clean up the temporary and incremental collections
319      */
320     void dropTempCollections();
321 
322     /**
323        @return number objects in collection
324      */
325     long long postProcessCollection(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm);
326     long long postProcessCollectionNonAtomic(OperationContext* opCtx,
327                                              CurOp* op,
328                                              ProgressMeterHolder& pm,
329                                              bool callerHoldsGlobalLock);
330 
331     /**
332      * if INMEMORY will append
333      * may also append stats or anything else it likes
334      */
335     void appendResults(BSONObjBuilder& b);
336 
337     // -------- util ------------
338 
339     /**
340      * inserts with correct replication semantics
341      */
342     void insert(const NamespaceString& nss, const BSONObj& o);
343 
344     // ------ simple accessors -----
345 
346     /** State maintains ownership, do no use past State lifetime */
scope()347     Scope* scope() {
348         return _scope.get();
349     }
350 
config()351     const Config& config() {
352         return _config;
353     }
354 
isOnDisk()355     bool isOnDisk() {
356         return _onDisk;
357     }
358 
numEmits()359     long long numEmits() const {
360         if (_jsMode)
361             return _scope->getNumberLongLong("_emitCt");
362         return _numEmits;
363     }
numReduces()364     long long numReduces() const {
365         if (_jsMode)
366             return _scope->getNumberLongLong("_redCt");
367         return _config.reducer->numReduces;
368     }
numInMemKeys()369     long long numInMemKeys() const {
370         if (_jsMode)
371             return _scope->getNumberLongLong("_keyCt");
372         return _temp->size();
373     }
374 
jsMode()375     bool jsMode() {
376         return _jsMode;
377     }
378     void switchMode(bool jsMode);
379     void bailFromJS();
380 
381     static Collection* getCollectionOrUassert(OperationContext* opCtx,
382                                               Database* db,
383                                               const NamespaceString& nss);
384 
385     const Config& _config;
386     DBDirectClient _db;
387     bool _useIncremental;  // use an incremental collection
388 
389 protected:
390     /**
391      * Appends a new document to the in-memory list of tuples, which are under that
392      * document's key.
393      *
394      * @return estimated in-memory size occupied by the newly added document.
395      */
396     int _add(InMemory* im, const BSONObj& a);
397 
398     OperationContext* _opCtx;
399     std::unique_ptr<Scope> _scope;
400     bool _onDisk;  // if the end result of this map reduce is disk or not
401 
402     std::unique_ptr<InMemory> _temp;
403     long _size;      // bytes in _temp
404     long _dupCount;  // number of duplicate key entries
405 
406     long long _numEmits;
407 
408     bool _jsMode;
409     ScriptingFunction _reduceAll;
410     ScriptingFunction _reduceAndEmit;
411     ScriptingFunction _reduceAndFinalize;
412     ScriptingFunction _reduceAndFinalizeAndInsert;
413 };
414 
415 BSONObj fast_emit(const BSONObj& args, void* data);
416 BSONObj _bailFromJS(const BSONObj& args, void* data);
417 
418 void addPrivilegesRequiredForMapReduce(Command* commandTemplate,
419                                        const std::string& dbname,
420                                        const BSONObj& cmdObj,
421                                        std::vector<Privilege>* out);
422 
423 /**
424  * Returns true if the provided mapReduce command has an 'out' parameter.
425  */
426 bool mrSupportsWriteConcern(const BSONObj& cmd);
427 
428 }  // end mr namespace
429 }
430