1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 /**
32  * This is the implementation for the Sorter.
33  *
34  * It is intended to be included in other cpp files like this:
35  *
36  * #include <normal/include/files.h>
37  *
38  * #include "mongo/db/sorter/sorter.h"
39  *
40  * namespace mongo {
41  *     // Your code
42  * }
43  *
44  * #include "mongo/db/sorter/sorter.cpp"
45  * MONGO_CREATE_SORTER(MyKeyType, MyValueType, MyComparatorType);
46  *
47  * Do this once for each unique set of parameters to MONGO_CREATE_SORTER.
48  */
49 
50 #include "mongo/db/sorter/sorter.h"
51 
52 #include <boost/filesystem/operations.hpp>
53 #include <snappy.h>
54 #include <vector>
55 
56 #include "mongo/base/string_data.h"
57 #include "mongo/config.h"
58 #include "mongo/db/jsobj.h"
59 #include "mongo/db/service_context.h"
60 #include "mongo/db/storage/encryption_hooks.h"
61 #include "mongo/db/storage/storage_options.h"
62 #include "mongo/platform/atomic_word.h"
63 #include "mongo/s/is_mongos.h"
64 #include "mongo/util/assert_util.h"
65 #include "mongo/util/bufreader.h"
66 #include "mongo/util/destructor_guard.h"
67 #include "mongo/util/mongoutils/str.h"
68 #include "mongo/util/unowned_ptr.h"
69 
70 namespace mongo {
71 
72 namespace sorter {
73 
74 using std::shared_ptr;
75 using namespace mongoutils;
76 
// Formats the C runtime's errno as "errno:<number> <description>". We deliberately read the "real"
// errno everywhere, not GetLastError() on Windows.
inline std::string myErrnoWithDescription() {
    const int savedErrno = errno;  // snapshot first so nothing below can clobber it
    return "errno:" + std::to_string(savedErrno) + ' ' + strerror(savedErrno);
}
84 
/**
 * Debug-only check that `comp` behaves like a consistent three-way comparator on `lhs`/`rhs`:
 * swapping the arguments must flip the sign of the result, and each value must compare equal to
 * itself. A violation trips an invariant.
 *
 * Compiled out of release builds. It is also skipped under MSVC, whose debug STL already does
 * similar verification with algorithms that compare more; stacking ours on top would only make
 * debug builds much slower without adding safety.
 */
template <typename Data, typename Comparator>
void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER)
    // Antisymmetry: reversing the operands must reverse the sign.
    const int forward = comp(lhs, rhs);
    const int backward = comp(rhs, lhs);
    if (forward < 0) {
        invariant(backward > 0);
    } else if (forward > 0) {
        invariant(backward < 0);
    } else {
        invariant(backward == 0);
    }

    // Reflexivity: every value is equal to itself.
    invariant(comp(lhs, lhs) == 0);
    invariant(comp(rhs, rhs) == 0);
#endif
}
107 
108 /**
109  * Returns results from sorted in-memory storage.
110  */
111 template <typename Key, typename Value>
112 class InMemIterator : public SortIteratorInterface<Key, Value> {
113 public:
114     typedef std::pair<Key, Value> Data;
115 
116     /// No data to iterate
InMemIterator()117     InMemIterator() {}
118 
119     /// Only a single value
InMemIterator(const Data & singleValue)120     InMemIterator(const Data& singleValue) : _data(1, singleValue) {}
121 
122     /// Any number of values
123     template <typename Container>
InMemIterator(const Container & input)124     InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
125 
openSource()126     void openSource() {}
closeSource()127     void closeSource() {}
128 
more()129     bool more() {
130         return !_data.empty();
131     }
next()132     Data next() {
133         Data out = _data.front();
134         _data.pop_front();
135         return out;
136     }
137 
138 private:
139     std::deque<Data> _data;
140 };
141 
142 /**
143  * Returns results from a sorted range within a file. Each instance is given a file name and start
144  * and end offsets.
145  *
146  * This class is NOT responsible for file clean up / deletion. There are openSource() and
147  * closeSource() functions to ensure the FileIterator is not holding the file open when the file is
148  * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in
149  * use elsewhere.
150  */
151 template <typename Key, typename Value>
152 class FileIterator : public SortIteratorInterface<Key, Value> {
153 public:
154     typedef std::pair<typename Key::SorterDeserializeSettings,
155                       typename Value::SorterDeserializeSettings>
156         Settings;
157     typedef std::pair<Key, Value> Data;
158 
FileIterator(const std::string & fileName,std::streampos fileStartOffset,std::streampos fileEndOffset,const Settings & settings)159     FileIterator(const std::string& fileName,
160                  std::streampos fileStartOffset,
161                  std::streampos fileEndOffset,
162                  const Settings& settings)
163         : _settings(settings),
164           _done(false),
165           _fileName(fileName),
166           _fileStartOffset(fileStartOffset),
167           _fileEndOffset(fileEndOffset) {
168         uassert(16815,
169                 str::stream() << "unexpected empty file: " << _fileName,
170                 boost::filesystem::file_size(_fileName) != 0);
171     }
172 
openSource()173     void openSource() {
174         _file.open(_fileName.c_str(), std::ios::in | std::ios::binary);
175         uassert(16814,
176                 str::stream() << "error opening file \"" << _fileName << "\": "
177                               << myErrnoWithDescription(),
178                 _file.good());
179         _file.seekg(_fileStartOffset);
180         uassert(50979,
181                 str::stream() << "error seeking starting offset of '" << _fileStartOffset
182                               << "' in file \""
183                               << _fileName
184                               << "\": "
185                               << myErrnoWithDescription(),
186                 _file.good());
187     }
188 
closeSource()189     void closeSource() {
190         _file.close();
191         uassert(50969,
192                 str::stream() << "error closing file \"" << _fileName << "\": "
193                               << myErrnoWithDescription(),
194                 !_file.fail());
195     }
196 
more()197     bool more() {
198         if (!_done)
199             fillBufferIfNeeded();  // may change _done
200         return !_done;
201     }
202 
next()203     Data next() {
204         verify(!_done);
205         fillBufferIfNeeded();
206 
207         // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
208         // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
209         // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
210         // the Data constructor
211         auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
212         auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
213         return Data(std::move(first), std::move(second));
214     }
215 
216 private:
217     /**
218      * Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
219      */
fillBufferIfNeeded()220     void fillBufferIfNeeded() {
221         verify(!_done);
222 
223         if (!_bufferReader || _bufferReader->atEof())
224             fillBufferFromDisk();
225     }
226 
227     /**
228      * Tries to read from disk and places any results in _bufferReader. If there is no more data to
229      * read, then _done is set to true and the function returns immediately.
230      */
fillBufferFromDisk()231     void fillBufferFromDisk() {
232         int32_t rawSize;
233         read(&rawSize, sizeof(rawSize));
234         if (_done)
235             return;
236 
237         // negative size means compressed
238         const bool compressed = rawSize < 0;
239         int32_t blockSize = std::abs(rawSize);
240 
241         _buffer.reset(new char[blockSize]);
242         read(_buffer.get(), blockSize);
243         uassert(16816, "file too short?", !_done);
244 
245         auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
246         if (encryptionHooks->enabled()) {
247             std::unique_ptr<char[]> out(new char[blockSize]);
248             size_t outLen;
249             Status status =
250                 encryptionHooks->unprotectTmpData(reinterpret_cast<uint8_t*>(_buffer.get()),
251                                                   blockSize,
252                                                   reinterpret_cast<uint8_t*>(out.get()),
253                                                   blockSize,
254                                                   &outLen);
255             uassert(28841,
256                     str::stream() << "Failed to unprotect data: " << status.toString(),
257                     status.isOK());
258             blockSize = outLen;
259             _buffer.swap(out);
260         }
261 
262         if (!compressed) {
263             _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
264             return;
265         }
266 
267         dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
268 
269         size_t uncompressedSize;
270         uassert(17061,
271                 "couldn't get uncompressed length",
272                 snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
273 
274         std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
275         uassert(17062,
276                 "decompression failed",
277                 snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));
278 
279         // hold on to decompressed data and throw out compressed data at block exit
280         _buffer.swap(decompressionBuffer);
281         _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
282     }
283 
284     /**
285      * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
286      *
287      * Masserts on any file errors
288      */
read(void * out,size_t size)289     void read(void* out, size_t size) {
290         invariant(_file.is_open());
291 
292         const std::streampos offset = _file.tellg();
293         uassert(51049,
294                 str::stream() << "error reading file \"" << _fileName << "\": "
295                               << myErrnoWithDescription(),
296                 offset >= 0);
297 
298         if (offset >= _fileEndOffset) {
299             invariant(offset == _fileEndOffset);
300             _done = true;
301             return;
302         }
303 
304         _file.read(reinterpret_cast<char*>(out), size);
305         uassert(16817,
306                 str::stream() << "error reading file \"" << _fileName << "\": "
307                               << myErrnoWithDescription(),
308                 _file.good());
309         verify(_file.gcount() == static_cast<std::streamsize>(size));
310     }
311 
312     const Settings _settings;
313     bool _done;
314     std::unique_ptr<char[]> _buffer;
315     std::unique_ptr<BufReader> _bufferReader;
316     std::string _fileName;            // File containing the sorted data range.
317     std::streampos _fileStartOffset;  // File offset at which the sorted data range starts.
318     std::streampos _fileEndOffset;    // File offset at which the sorted data range ends.
319     std::ifstream _file;
320 };
321 
322 /**
323  * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
324  * ranges within the same file. This class is given the data source file name upon construction and
325  * is responsible for deleting the data source file upon destruction.
326  */
327 template <typename Key, typename Value, typename Comparator>
328 class MergeIterator : public SortIteratorInterface<Key, Value> {
329 public:
330     typedef SortIteratorInterface<Key, Value> Input;
331     typedef std::pair<Key, Value> Data;
332 
MergeIterator(const std::vector<std::shared_ptr<Input>> & iters,const std::string & itersSourceFileName,const SortOptions & opts,const Comparator & comp)333     MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
334                   const std::string& itersSourceFileName,
335                   const SortOptions& opts,
336                   const Comparator& comp)
337         : _opts(opts),
338           _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
339           _first(true),
340           _greater(comp),
341           _itersSourceFileName(itersSourceFileName) {
342         for (size_t i = 0; i < iters.size(); i++) {
343             iters[i]->openSource();
344             if (iters[i]->more()) {
345                 _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
346             } else {
347                 iters[i]->closeSource();
348             }
349         }
350 
351         if (_heap.empty()) {
352             _remaining = 0;
353             return;
354         }
355 
356         std::make_heap(_heap.begin(), _heap.end(), _greater);
357         std::pop_heap(_heap.begin(), _heap.end(), _greater);
358         _current = _heap.back();
359         _heap.pop_back();
360     }
361 
~MergeIterator()362     ~MergeIterator() {
363         // Clear the remaining Stream objects first, to close the file handles before deleting the
364         // file. Some systems will error closing the file if any file handles are still open.
365         _current.reset();
366         _heap.clear();
367         DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName));
368     }
369 
openSource()370     void openSource() {}
closeSource()371     void closeSource() {}
372 
more()373     bool more() {
374         if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
375             return true;
376 
377         _remaining = 0;
378         return false;
379     }
380 
next()381     Data next() {
382         verify(_remaining);
383 
384         _remaining--;
385 
386         if (_first) {
387             _first = false;
388             return _current->current();
389         }
390 
391         if (!_current->advance()) {
392             verify(!_heap.empty());
393             std::pop_heap(_heap.begin(), _heap.end(), _greater);
394             _current = _heap.back();
395             _heap.pop_back();
396         } else if (!_heap.empty() && _greater(_current, _heap.front())) {
397             std::pop_heap(_heap.begin(), _heap.end(), _greater);
398             std::swap(_current, _heap.back());
399             std::push_heap(_heap.begin(), _heap.end(), _greater);
400         }
401 
402         return _current->current();
403     }
404 
405 
406 private:
407     /**
408      * Data iterator over an Input stream.
409      *
410      * This class is responsible for closing the Input source upon destruction, unfortunately,
411      * because that is the path of least resistence to a design change requiring MergeIterator to
412      * handle eventual deletion of said Input source.
413      */
414     class Stream {
415     public:
Stream(size_t fileNum,const Data & first,std::shared_ptr<Input> rest)416         Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
417             : fileNum(fileNum), _current(first), _rest(rest) {}
418 
~Stream()419         ~Stream() {
420             _rest->closeSource();
421         }
422 
current() const423         const Data& current() const {
424             return _current;
425         }
more()426         bool more() {
427             return _rest->more();
428         }
advance()429         bool advance() {
430             if (!_rest->more())
431                 return false;
432 
433             _current = _rest->next();
434             return true;
435         }
436 
437         const size_t fileNum;
438 
439     private:
440         Data _current;
441         std::shared_ptr<Input> _rest;
442     };
443 
444     class STLComparator {  // uses greater rather than less-than to maintain a MinHeap
445     public:
STLComparator(const Comparator & comp)446         explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(unowned_ptr<const Stream> lhs,unowned_ptr<const Stream> rhs) const447         bool operator()(unowned_ptr<const Stream> lhs, unowned_ptr<const Stream> rhs) const {
448             // first compare data
449             dassertCompIsSane(_comp, lhs->current(), rhs->current());
450             int ret = _comp(lhs->current(), rhs->current());
451             if (ret)
452                 return ret > 0;
453 
454             // then compare fileNums to ensure stability
455             return lhs->fileNum > rhs->fileNum;
456         }
457 
458     private:
459         const Comparator _comp;
460     };
461 
462     SortOptions _opts;
463     unsigned long long _remaining;
464     bool _first;
465     std::shared_ptr<Stream> _current;
466     std::vector<std::shared_ptr<Stream>> _heap;  // MinHeap
467     STLComparator _greater;                      // named so calls make sense
468     std::string _itersSourceFileName;
469 };
470 
471 template <typename Key, typename Value, typename Comparator>
472 class NoLimitSorter : public Sorter<Key, Value> {
473 public:
474     typedef std::pair<Key, Value> Data;
475     typedef SortIteratorInterface<Key, Value> Iterator;
476     typedef std::pair<typename Key::SorterDeserializeSettings,
477                       typename Value::SorterDeserializeSettings>
478         Settings;
479 
NoLimitSorter(const SortOptions & opts,const Comparator & comp,const Settings & settings=Settings ())480     NoLimitSorter(const SortOptions& opts,
481                   const Comparator& comp,
482                   const Settings& settings = Settings())
483         : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
484         verify(_opts.limit == 0);
485         if (_opts.extSortAllowed) {
486             _fileName = _opts.tempDir + "/" + nextFileName();
487         }
488     }
489 
~NoLimitSorter()490     ~NoLimitSorter() {
491         if (!_done) {
492             // If done() was never called to return a MergeIterator, then this Sorter still owns
493             // file deletion.
494             DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
495         }
496     }
497 
add(const Key & key,const Value & val)498     void add(const Key& key, const Value& val) {
499         invariant(!_done);
500 
501         _data.push_back(std::make_pair(key, val));
502 
503         _memUsed += key.memUsageForSorter();
504         _memUsed += val.memUsageForSorter();
505 
506         if (_memUsed > _opts.maxMemoryUsageBytes)
507             spill();
508     }
509 
done()510     Iterator* done() {
511         invariant(!_done);
512 
513         if (_iters.empty()) {
514             sort();
515             return new InMemIterator<Key, Value>(_data);
516         }
517 
518         spill();
519         Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp);
520         _done = true;
521         return mergeIt;
522     }
523 
524 private:
525     class STLComparator {
526     public:
STLComparator(const Comparator & comp)527         explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(const Data & lhs,const Data & rhs) const528         bool operator()(const Data& lhs, const Data& rhs) const {
529             dassertCompIsSane(_comp, lhs, rhs);
530             return _comp(lhs, rhs) < 0;
531         }
532 
533     private:
534         const Comparator& _comp;
535     };
536 
sort()537     void sort() {
538         STLComparator less(_comp);
539         std::stable_sort(_data.begin(), _data.end(), less);
540 
541         // Does 2x more compares than stable_sort
542         // TODO test on windows
543         // std::sort(_data.begin(), _data.end(), comp);
544     }
545 
spill()546     void spill() {
547         invariant(!_done);
548 
549         if (_data.empty())
550             return;
551 
552         if (!_opts.extSortAllowed) {
553             // XXX This error message is only correct for aggregation, but it is also the
554             // only way this code could be hit at the moment. If the Sorter is used
555             // elsewhere where extSortAllowed could possibly be false, this message will
556             // need to be revisited.
557             uasserted(16819,
558                       str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
559                                     << " bytes, but did not opt in to external sorting.");
560         }
561 
562         sort();
563 
564         SortedFileWriter<Key, Value> writer(
565             _opts, _fileName, _nextSortedFileWriterOffset, _settings);
566         for (; !_data.empty(); _data.pop_front()) {
567             writer.addAlreadySorted(_data.front().first, _data.front().second);
568         }
569         Iterator* iteratorPtr = writer.done();
570         _nextSortedFileWriterOffset = writer.getFileEndOffset();
571 
572         _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
573 
574         _memUsed = 0;
575     }
576 
577     const Comparator _comp;
578     const Settings _settings;
579     SortOptions _opts;
580     std::string _fileName;
581     std::streampos _nextSortedFileWriterOffset = 0;
582     bool _done = false;
583     size_t _memUsed;
584     std::deque<Data> _data;                         // the "current" data
585     std::vector<std::shared_ptr<Iterator>> _iters;  // data that has already been spilled
586 };
587 
588 template <typename Key, typename Value, typename Comparator>
589 class LimitOneSorter : public Sorter<Key, Value> {
590     // Since this class is only used for limit==1, it omits all logic to
591     // spill to disk and only tracks memory usage if explicitly requested.
592 public:
593     typedef std::pair<Key, Value> Data;
594     typedef SortIteratorInterface<Key, Value> Iterator;
595 
LimitOneSorter(const SortOptions & opts,const Comparator & comp)596     LimitOneSorter(const SortOptions& opts, const Comparator& comp)
597         : _comp(comp), _haveData(false) {
598         verify(opts.limit == 1);
599     }
600 
add(const Key & key,const Value & val)601     void add(const Key& key, const Value& val) {
602         Data contender(key, val);
603 
604         if (_haveData) {
605             dassertCompIsSane(_comp, _best, contender);
606             if (_comp(_best, contender) <= 0)
607                 return;  // not good enough
608         } else {
609             _haveData = true;
610         }
611 
612         _best = contender;
613     }
614 
done()615     Iterator* done() {
616         if (_haveData) {
617             return new InMemIterator<Key, Value>(_best);
618         } else {
619             return new InMemIterator<Key, Value>();
620         }
621     }
622 
623 private:
624     const Comparator _comp;
625     Data _best;
626     bool _haveData;  // false at start, set to true on first call to add()
627 };
628 
629 template <typename Key, typename Value, typename Comparator>
630 class TopKSorter : public Sorter<Key, Value> {
631 public:
632     typedef std::pair<Key, Value> Data;
633     typedef SortIteratorInterface<Key, Value> Iterator;
634     typedef std::pair<typename Key::SorterDeserializeSettings,
635                       typename Value::SorterDeserializeSettings>
636         Settings;
637 
TopKSorter(const SortOptions & opts,const Comparator & comp,const Settings & settings=Settings ())638     TopKSorter(const SortOptions& opts,
639                const Comparator& comp,
640                const Settings& settings = Settings())
641         : _comp(comp),
642           _settings(settings),
643           _opts(opts),
644           _memUsed(0),
645           _haveCutoff(false),
646           _worstCount(0),
647           _medianCount(0) {
648         // This also *works* with limit==1 but LimitOneSorter should be used instead
649         verify(_opts.limit > 1);
650 
651         if (_opts.extSortAllowed) {
652             _fileName = _opts.tempDir + "/" + nextFileName();
653         }
654 
655         // Preallocate a fixed sized vector of the required size if we
656         // don't expect it to have a major impact on our memory budget.
657         // This is the common case with small limits.
658         if ((sizeof(Data) * opts.limit) < opts.maxMemoryUsageBytes / 10) {
659             _data.reserve(opts.limit);
660         }
661     }
662 
~TopKSorter()663     ~TopKSorter() {
664         if (!_done) {
665             // If done() was never called to return a MergeIterator, then this Sorter still owns
666             // file deletion.
667             DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
668         }
669     }
670 
add(const Key & key,const Value & val)671     void add(const Key& key, const Value& val) {
672         invariant(!_done);
673 
674         STLComparator less(_comp);
675         Data contender(key, val);
676 
677         if (_data.size() < _opts.limit) {
678             if (_haveCutoff && !less(contender, _cutoff))
679                 return;
680 
681             _data.push_back(contender);
682 
683             _memUsed += key.memUsageForSorter();
684             _memUsed += val.memUsageForSorter();
685 
686             if (_data.size() == _opts.limit)
687                 std::make_heap(_data.begin(), _data.end(), less);
688 
689             if (_memUsed > _opts.maxMemoryUsageBytes)
690                 spill();
691 
692             return;
693         }
694 
695         verify(_data.size() == _opts.limit);
696 
697         if (!less(contender, _data.front()))
698             return;  // not good enough
699 
700         // Remove the old worst pair and insert the contender, adjusting _memUsed
701 
702         _memUsed += key.memUsageForSorter();
703         _memUsed += val.memUsageForSorter();
704 
705         _memUsed -= _data.front().first.memUsageForSorter();
706         _memUsed -= _data.front().second.memUsageForSorter();
707 
708         std::pop_heap(_data.begin(), _data.end(), less);
709         _data.back() = contender;
710         std::push_heap(_data.begin(), _data.end(), less);
711 
712         if (_memUsed > _opts.maxMemoryUsageBytes)
713             spill();
714     }
715 
done()716     Iterator* done() {
717         if (_iters.empty()) {
718             sort();
719             return new InMemIterator<Key, Value>(_data);
720         }
721 
722         spill();
723         Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp);
724         _done = true;
725         return iterator;
726     }
727 
728 private:
729     class STLComparator {
730     public:
STLComparator(const Comparator & comp)731         explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(const Data & lhs,const Data & rhs) const732         bool operator()(const Data& lhs, const Data& rhs) const {
733             dassertCompIsSane(_comp, lhs, rhs);
734             return _comp(lhs, rhs) < 0;
735         }
736 
737     private:
738         const Comparator& _comp;
739     };
740 
sort()741     void sort() {
742         STLComparator less(_comp);
743 
744         if (_data.size() == _opts.limit) {
745             std::sort_heap(_data.begin(), _data.end(), less);
746         } else {
747             std::stable_sort(_data.begin(), _data.end(), less);
748         }
749     }
750 
    // Can only be called after _data is sorted (best first), since it reads _data.back() as the
    // worst kept pair and binary-searches _data. It also requires a non-empty _data; presumably
    // the caller (spill(), which returns early on empty _data) guarantees this — TODO confirm.
    //
    // Updates _worstSeen/_lastMedian and their counters from the current batch and may tighten
    // _cutoff, which add() uses to reject pairs early.
    void updateCutoff() {
        // Theory of operation: We want to be able to eagerly ignore values we know will not
        // be in the TopK result set by setting _cutoff to a value we know we have at least
        // K values equal to or better than. There are two values that we track to
        // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
        // one of these values becomes the new _cutoff, its associated counter is reset to 0
        // and a new value is chosen for that member the next time we spill.
        //
        // _worstSeen is the worst value we've seen, so all kept values are better than
        // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
        // reason to consider values worse than _worstSeen, so it can become the new _cutoff.
        // This technique is especially useful when the input is already roughly sorted (e.g.
        // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
        // that will exclude most later values, making the full TopK operation, including
        // the MergeIterator phase, O(K) in space and O(N + K*Log(K)) in time.
        //
        // _lastMedian was the median of the _data in the first spill() either overall or
        // following a promotion of _lastMedian to _cutoff. We count the number of kept
        // values that are better than or equal to _lastMedian in _medianCount and can
        // promote _lastMedian to _cutoff once _medianCount >= _opts.limit. Assuming
        // reasonable median selection (which should happen when the data is completely
        // unsorted), after the first K spilled values, we will keep roughly 50% of the
        // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
        // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
        // so the expected number of kept values is O(Log(N/K) * K). The final run time if
        // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
        // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
        //
        // This leaves a currently unoptimized worst case of data that is already roughly
        // sorted, but in the wrong direction, such that the desired results are all the
        // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
        // should be trivially detectable, as a future optimization it might be nice to
        // detect this case and reverse the direction of input (if possible) which would
        // turn this into the best case described above.
        //
        // Pedantic notes: The time complexities above (which count number of comparisons)
        // ignore the sorting of batches prior to spilling to disk since they make it more
        // confusing without changing the results. If you want to add them back in, add an
        // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
        // all space complexities measure disk space rather than memory since this class is
        // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.

        STLComparator less(_comp);  // less is "better" for TopK.

        // Pick a new _worstSeen or _lastMedian if needed: a counter of 0 means the previous
        // candidate was just promoted (or none exists yet).
        if (_worstCount == 0 || less(_worstSeen, _data.back())) {
            _worstSeen = _data.back();
        }
        if (_medianCount == 0) {
            size_t medianIndex = _data.size() / 2;  // chooses the higher if size() is even.
            _lastMedian = _data[medianIndex];
        }

        // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian.
        _worstCount += _data.size();  // everything is better or equal
        // upper_bound finds the first pair strictly worse than _lastMedian, so the distance to
        // it counts the pairs that are better than or equal to _lastMedian.
        typename std::vector<Data>::iterator firstWorseThanLastMedian =
            std::upper_bound(_data.begin(), _data.end(), _lastMedian, less);
        _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);


        // Promote _worstSeen or _lastMedian to _cutoff (only if it tightens the cutoff) and
        // reset the corresponding counter once it has accounted for at least `limit` values.
        if (_worstCount >= _opts.limit) {
            if (!_haveCutoff || less(_worstSeen, _cutoff)) {
                _cutoff = _worstSeen;
                _haveCutoff = true;
            }
            _worstCount = 0;
        }
        if (_medianCount >= _opts.limit) {
            if (!_haveCutoff || less(_lastMedian, _cutoff)) {
                _cutoff = _lastMedian;
                _haveCutoff = true;
            }
            _medianCount = 0;
        }
    }
828 
spill()829     void spill() {
830         invariant(!_done);
831 
832         if (_data.empty())
833             return;
834 
835         if (!_opts.extSortAllowed) {
836             // XXX This error message is only correct for aggregation, but it is also the
837             // only way this code could be hit at the moment. If the Sorter is used
838             // elsewhere where extSortAllowed could possibly be false, this message will
839             // need to be revisited.
840             uasserted(16820,
841                       str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
842                                     << " bytes, but did not opt in to external sorting.");
843         }
844 
845         // We should check readOnly before getting here.
846         invariant(!storageGlobalParams.readOnly);
847 
848         sort();
849         updateCutoff();
850 
851         SortedFileWriter<Key, Value> writer(
852             _opts, _fileName, _nextSortedFileWriterOffset, _settings);
853         for (size_t i = 0; i < _data.size(); i++) {
854             writer.addAlreadySorted(_data[i].first, _data[i].second);
855         }
856 
857         // clear _data and release backing array's memory
858         std::vector<Data>().swap(_data);
859 
860         Iterator* iteratorPtr = writer.done();
861         _nextSortedFileWriterOffset = writer.getFileEndOffset();
862         _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
863 
864         _memUsed = 0;
865     }
866 
867     const Comparator _comp;
868     const Settings _settings;
869     SortOptions _opts;
870     std::string _fileName;
871     std::streampos _nextSortedFileWriterOffset = 0;
872     bool _done = false;
873     size_t _memUsed;
874     std::vector<Data> _data;  // the "current" data. Organized as max-heap if size == limit.
875     std::vector<std::shared_ptr<Iterator>> _iters;  // data that has already been spilled
876 
877     // See updateCutoff() for a full description of how these members are used.
878     bool _haveCutoff;
879     Data _cutoff;         // We can definitely ignore values worse than this.
880     Data _worstSeen;      // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
881     size_t _worstCount;   // Number of docs better or equal to _worstSeen kept so far.
882     Data _lastMedian;     // Median of a batch. Reset when _medianCount >= _opts.limit.
883     size_t _medianCount;  // Number of docs better or equal to _lastMedian kept so far.
884 };
885 
886 }  // namespace sorter
887 
888 //
889 // SortedFileWriter
890 //
891 
892 template <typename Key, typename Value>
SortedFileWriter(const SortOptions & opts,const std::string & fileName,const std::streampos fileStartOffset,const Settings & settings)893 SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
894                                                const std::string& fileName,
895                                                const std::streampos fileStartOffset,
896                                                const Settings& settings)
897     : _settings(settings) {
898     namespace str = mongoutils::str;
899 
900     // This should be checked by consumers, but if we get here don't allow writes.
901     uassert(
902         16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
903 
904     uassert(17148,
905             "Attempting to use external sort without setting SortOptions::tempDir",
906             !opts.tempDir.empty());
907 
908     boost::filesystem::create_directories(opts.tempDir);
909 
910     _fileName = fileName;
911 
912     // We open the provided file in append mode so that SortedFileWriter instances can share the
913     // same file, used serially. We want to share files in order to stay below system open file
914     // limits.
915     _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out);
916     uassert(16818,
917             str::stream() << "error opening file \"" << _fileName << "\": "
918                           << sorter::myErrnoWithDescription(),
919             _file.good());
920     // The file descriptor is positioned at the end of a file when opened in append mode, but
921     // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass
922     // in the expected offset to this constructor.
923     _fileStartOffset = fileStartOffset;
924 
925     // throw on failure
926     _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
927 }
928 
929 template <typename Key, typename Value>
addAlreadySorted(const Key & key,const Value & val)930 void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
931     key.serializeForSorter(_buffer);
932     val.serializeForSorter(_buffer);
933 
934     if (_buffer.len() > 64 * 1024)
935         spill();
936 }
937 
938 template <typename Key, typename Value>
spill()939 void SortedFileWriter<Key, Value>::spill() {
940     namespace str = mongoutils::str;
941 
942     int32_t size = _buffer.len();
943     char* outBuffer = _buffer.buf();
944 
945     if (size == 0)
946         return;
947 
948     std::string compressed;
949     snappy::Compress(outBuffer, size, &compressed);
950     verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));
951 
952     const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9);
953     if (shouldCompress) {
954         size = compressed.size();
955         outBuffer = const_cast<char*>(compressed.data());
956     }
957 
958     std::unique_ptr<char[]> out;
959     auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
960     if (encryptionHooks->enabled()) {
961         size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer();
962         out.reset(new char[protectedSizeMax]);
963         size_t resultLen;
964         Status status = encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer),
965                                                         size,
966                                                         reinterpret_cast<uint8_t*>(out.get()),
967                                                         protectedSizeMax,
968                                                         &resultLen);
969         uassert(28842,
970                 str::stream() << "Failed to compress data: " << status.toString(),
971                 status.isOK());
972         outBuffer = out.get();
973         size = resultLen;
974     }
975 
976     // negative size means compressed
977     size = shouldCompress ? -size : size;
978     try {
979         _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
980         _file.write(outBuffer, std::abs(size));
981     } catch (const std::exception&) {
982         msgasserted(16821,
983                     str::stream() << "error writing to file \"" << _fileName << "\": "
984                                   << sorter::myErrnoWithDescription());
985     }
986 
987     _buffer.reset();
988 }
989 
990 template <typename Key, typename Value>
done()991 SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
992     spill();
993     std::streampos currentFileOffset = _file.tellp();
994     uassert(50980,
995             str::stream() << "error fetching current file descriptor offset in file \"" << _fileName
996                           << "\": "
997                           << sorter::myErrnoWithDescription(),
998             currentFileOffset >= 0);
999 
1000     // In case nothing was written to disk, use _fileStartOffset because tellp() may not be
1001     // initialized on all systems upon opening the file.
1002     _fileEndOffset = currentFileOffset < _fileStartOffset ? _fileStartOffset : currentFileOffset;
1003     _file.close();
1004 
1005     return new sorter::FileIterator<Key, Value>(
1006         _fileName, _fileStartOffset, _fileEndOffset, _settings);
1007 }
1008 
1009 //
1010 // Factory Functions
1011 //
1012 
1013 template <typename Key, typename Value>
1014 template <typename Comparator>
merge(const std::vector<std::shared_ptr<SortIteratorInterface>> & iters,const std::string & fileName,const SortOptions & opts,const Comparator & comp)1015 SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
1016     const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
1017     const std::string& fileName,
1018     const SortOptions& opts,
1019     const Comparator& comp) {
1020     return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp);
1021 }
1022 
1023 template <typename Key, typename Value>
1024 template <typename Comparator>
make(const SortOptions & opts,const Comparator & comp,const Settings & settings)1025 Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
1026                                              const Comparator& comp,
1027                                              const Settings& settings) {
1028     // This should be checked by consumers, but if it isn't try to fail early.
1029     uassert(16947,
1030             "Attempting to use external sort from mongos. This is not allowed.",
1031             !(isMongos() && opts.extSortAllowed));
1032 
1033     uassert(17149,
1034             "Attempting to use external sort without setting SortOptions::tempDir",
1035             !(opts.extSortAllowed && opts.tempDir.empty()));
1036 
1037     switch (opts.limit) {
1038         case 0:
1039             return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
1040         case 1:
1041             return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp);
1042         default:
1043             return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings);
1044     }
1045 }
1046 }
1047