1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 /**
32 * This is the implementation for the Sorter.
33 *
34 * It is intended to be included in other cpp files like this:
35 *
36 * #include <normal/include/files.h>
37 *
38 * #include "mongo/db/sorter/sorter.h"
39 *
40 * namespace mongo {
41 * // Your code
42 * }
43 *
44 * #include "mongo/db/sorter/sorter.cpp"
45 * MONGO_CREATE_SORTER(MyKeyType, MyValueType, MyComparatorType);
46 *
47 * Do this once for each unique set of parameters to MONGO_CREATE_SORTER.
48 */
49
50 #include "mongo/db/sorter/sorter.h"
51
52 #include <boost/filesystem/operations.hpp>
53 #include <snappy.h>
54 #include <vector>
55
56 #include "mongo/base/string_data.h"
57 #include "mongo/config.h"
58 #include "mongo/db/jsobj.h"
59 #include "mongo/db/service_context.h"
60 #include "mongo/db/storage/encryption_hooks.h"
61 #include "mongo/db/storage/storage_options.h"
62 #include "mongo/platform/atomic_word.h"
63 #include "mongo/s/is_mongos.h"
64 #include "mongo/util/assert_util.h"
65 #include "mongo/util/bufreader.h"
66 #include "mongo/util/destructor_guard.h"
67 #include "mongo/util/mongoutils/str.h"
68 #include "mongo/util/unowned_ptr.h"
69
70 namespace mongo {
71
72 namespace sorter {
73
74 using std::shared_ptr;
75 using namespace mongoutils;
76
77 // We need to use the "real" errno everywhere, not GetLastError() on Windows
myErrnoWithDescription()78 inline std::string myErrnoWithDescription() {
79 int errnoCopy = errno;
80 StringBuilder sb;
81 sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy);
82 return sb.str();
83 }
84
/**
 * Debug-build sanity check for a three-way comparator (result < 0, == 0, or > 0).
 *
 * Checks that swapping the operands flips the sign of the result, and that each operand compares
 * equal to itself. A violation trips an invariant. Outside MONGO_CONFIG_DEBUG_BUILD, and always
 * under MSVC, the body compiles away to nothing.
 */
template <typename Data, typename Comparator>
void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER)
    // MSVC++ already does similar verification in debug mode in addition to using
    // algorithms that do more comparisons. Doing our own verification in addition makes
    // debug builds considerably slower without any additional safety.

    // Swapped operands must yield the opposite sign.
    const int forward = comp(lhs, rhs);
    const int backward = comp(rhs, lhs);
    if (forward < 0) {
        invariant(backward > 0);
    } else if (forward > 0) {
        invariant(backward < 0);
    } else {
        invariant(backward == 0);
    }

    // Every value must be equal to itself.
    invariant(comp(lhs, lhs) == 0);
    invariant(comp(rhs, rhs) == 0);
#endif
}
107
108 /**
109 * Returns results from sorted in-memory storage.
110 */
111 template <typename Key, typename Value>
112 class InMemIterator : public SortIteratorInterface<Key, Value> {
113 public:
114 typedef std::pair<Key, Value> Data;
115
116 /// No data to iterate
InMemIterator()117 InMemIterator() {}
118
119 /// Only a single value
InMemIterator(const Data & singleValue)120 InMemIterator(const Data& singleValue) : _data(1, singleValue) {}
121
122 /// Any number of values
123 template <typename Container>
InMemIterator(const Container & input)124 InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
125
openSource()126 void openSource() {}
closeSource()127 void closeSource() {}
128
more()129 bool more() {
130 return !_data.empty();
131 }
next()132 Data next() {
133 Data out = _data.front();
134 _data.pop_front();
135 return out;
136 }
137
138 private:
139 std::deque<Data> _data;
140 };
141
/**
 * Returns results from a sorted range within a file. Each instance is given a file name and start
 * and end offsets.
 *
 * This class is NOT responsible for file clean up / deletion. There are openSource() and
 * closeSource() functions to ensure the FileIterator is not holding the file open when the file is
 * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in
 * use elsewhere.
 *
 * fillBufferFromDisk() decodes the range [fileStartOffset, fileEndOffset) as a sequence of blocks:
 * - Each block is an int32_t size header followed by that many payload bytes.
 * - A negative header marks a snappy-compressed payload.
 * - When encryption hooks are enabled, the payload goes through unprotectTmpData() before any
 *   decompression.
 * The decoded bytes are serialized pairs, each Key immediately followed by its Value.
 */
template <typename Key, typename Value>
class FileIterator : public SortIteratorInterface<Key, Value> {
public:
    typedef std::pair<typename Key::SorterDeserializeSettings,
                      typename Value::SorterDeserializeSettings>
        Settings;
    typedef std::pair<Key, Value> Data;

    /**
     * Records the range to read. The file is not opened until openSource().
     * Throws (uassert 16815) if the file is empty. Exceptions from
     * boost::filesystem::file_size() propagate unchanged.
     */
    FileIterator(const std::string& fileName,
                 std::streampos fileStartOffset,
                 std::streampos fileEndOffset,
                 const Settings& settings)
        : _settings(settings),
          _done(false),
          _fileName(fileName),
          _fileStartOffset(fileStartOffset),
          _fileEndOffset(fileEndOffset) {
        uassert(16815,
                str::stream() << "unexpected empty file: " << _fileName,
                boost::filesystem::file_size(_fileName) != 0);
    }

    /**
     * Opens the file for binary reading and seeks to the start of this iterator's range.
     * Throws (uassert 16814 / 50979) if the open or the seek leaves the stream not good().
     */
    void openSource() {
        _file.open(_fileName.c_str(), std::ios::in | std::ios::binary);
        uassert(16814,
                str::stream() << "error opening file \"" << _fileName << "\": "
                              << myErrnoWithDescription(),
                _file.good());
        _file.seekg(_fileStartOffset);
        uassert(50979,
                str::stream() << "error seeking starting offset of '" << _fileStartOffset
                              << "' in file \""
                              << _fileName
                              << "\": "
                              << myErrnoWithDescription(),
                _file.good());
    }

    /**
     * Closes the file handle. Throws (uassert 50969) if the stream's fail() is set after closing.
     * NOTE(review): std::ifstream::close() is documented to set failbit when no file is open.
     * Calling this twice, or without a prior openSource(), therefore presumably throws. Confirm
     * that callers never do.
     */
    void closeSource() {
        _file.close();
        uassert(50969,
                str::stream() << "error closing file \"" << _fileName << "\": "
                              << myErrnoWithDescription(),
                !_file.fail());
    }

    /**
     * Returns whether another pair is available. Reads the next block from disk when the current
     * one is exhausted. The file must be open (read() invariants on it).
     */
    bool more() {
        if (!_done)
            fillBufferIfNeeded();  // may change _done
        return !_done;
    }

    /**
     * Deserializes and returns the next pair. Callers should check more() first.
     * verify(!_done) only catches calls made after the end has already been detected.
     */
    Data next() {
        verify(!_done);
        fillBufferIfNeeded();

        // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
        // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
        // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
        // the Data constructor
        auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
        auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
        return Data(std::move(first), std::move(second));
    }

private:
    /**
     * Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
     */
    void fillBufferIfNeeded() {
        verify(!_done);

        // Refill when no block has been loaded yet, or when the current block is fully consumed.
        if (!_bufferReader || _bufferReader->atEof())
            fillBufferFromDisk();
    }

    /**
     * Tries to read from disk and places any results in _bufferReader. If there is no more data to
     * read, then _done is set to true and the function returns immediately.
     *
     * Reads exactly one block: the int32_t size header, then the payload. The payload is
     * decrypted (if encryption hooks are enabled) and then decompressed (if the header was
     * negative). _buffer owns the final bytes and _bufferReader reads from them.
     */
    void fillBufferFromDisk() {
        int32_t rawSize;
        read(&rawSize, sizeof(rawSize));
        if (_done)
            return;

        // negative size means compressed
        const bool compressed = rawSize < 0;
        // NOTE(review): std::abs(INT32_MIN) is undefined behavior. Only a corrupted header could
        // produce that value, but nothing here rejects it.
        int32_t blockSize = std::abs(rawSize);

        _buffer.reset(new char[blockSize]);
        read(_buffer.get(), blockSize);
        // If the header ended exactly at the range end there is no payload: treat as truncation.
        uassert(16816, "file too short?", !_done);

        auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
        if (encryptionHooks->enabled()) {
            // Decrypt into a same-sized buffer. outLen (presumably the plaintext length) replaces
            // blockSize for the steps below.
            std::unique_ptr<char[]> out(new char[blockSize]);
            size_t outLen;
            Status status =
                encryptionHooks->unprotectTmpData(reinterpret_cast<uint8_t*>(_buffer.get()),
                                                  blockSize,
                                                  reinterpret_cast<uint8_t*>(out.get()),
                                                  blockSize,
                                                  &outLen);
            uassert(28841,
                    str::stream() << "Failed to unprotect data: " << status.toString(),
                    status.isOK());
            blockSize = outLen;
            _buffer.swap(out);
        }

        if (!compressed) {
            _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
            return;
        }

        dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));

        size_t uncompressedSize;
        uassert(17061,
                "couldn't get uncompressed length",
                snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));

        std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
        uassert(17062,
                "decompression failed",
                snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));

        // hold on to decompressed data and throw out compressed data at block exit
        _buffer.swap(decompressionBuffer);
        _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
    }

    /**
     * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
     *
     * Throws (uassert) on stream errors, and verify()s that exactly `size` bytes were read.
     * NOTE(review): it does not check that offset + size stays within _fileEndOffset.
     */
    void read(void* out, size_t size) {
        invariant(_file.is_open());

        const std::streampos offset = _file.tellg();
        // A negative position means tellg() failed.
        uassert(51049,
                str::stream() << "error reading file \"" << _fileName << "\": "
                              << myErrnoWithDescription(),
                offset >= 0);

        // Landing exactly on the end offset is the normal end of the range. Landing beyond it
        // would mean an earlier read crossed the range boundary.
        if (offset >= _fileEndOffset) {
            invariant(offset == _fileEndOffset);
            _done = true;
            return;
        }

        _file.read(reinterpret_cast<char*>(out), size);
        uassert(16817,
                str::stream() << "error reading file \"" << _fileName << "\": "
                              << myErrnoWithDescription(),
                _file.good());
        verify(_file.gcount() == static_cast<std::streamsize>(size));
    }

    const Settings _settings;
    bool _done;                              // Set once the read position reaches _fileEndOffset.
    std::unique_ptr<char[]> _buffer;         // Bytes of the current (decoded) block.
    std::unique_ptr<BufReader> _bufferReader;  // Cursor over _buffer; null until the first block.
    std::string _fileName;                   // File containing the sorted data range.
    std::streampos _fileStartOffset;         // File offset at which the sorted data range starts.
    std::streampos _fileEndOffset;           // File offset at which the sorted data range ends.
    std::ifstream _file;
};
321
322 /**
323 * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
324 * ranges within the same file. This class is given the data source file name upon construction and
325 * is responsible for deleting the data source file upon destruction.
326 */
327 template <typename Key, typename Value, typename Comparator>
328 class MergeIterator : public SortIteratorInterface<Key, Value> {
329 public:
330 typedef SortIteratorInterface<Key, Value> Input;
331 typedef std::pair<Key, Value> Data;
332
MergeIterator(const std::vector<std::shared_ptr<Input>> & iters,const std::string & itersSourceFileName,const SortOptions & opts,const Comparator & comp)333 MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
334 const std::string& itersSourceFileName,
335 const SortOptions& opts,
336 const Comparator& comp)
337 : _opts(opts),
338 _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
339 _first(true),
340 _greater(comp),
341 _itersSourceFileName(itersSourceFileName) {
342 for (size_t i = 0; i < iters.size(); i++) {
343 iters[i]->openSource();
344 if (iters[i]->more()) {
345 _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
346 } else {
347 iters[i]->closeSource();
348 }
349 }
350
351 if (_heap.empty()) {
352 _remaining = 0;
353 return;
354 }
355
356 std::make_heap(_heap.begin(), _heap.end(), _greater);
357 std::pop_heap(_heap.begin(), _heap.end(), _greater);
358 _current = _heap.back();
359 _heap.pop_back();
360 }
361
~MergeIterator()362 ~MergeIterator() {
363 // Clear the remaining Stream objects first, to close the file handles before deleting the
364 // file. Some systems will error closing the file if any file handles are still open.
365 _current.reset();
366 _heap.clear();
367 DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName));
368 }
369
openSource()370 void openSource() {}
closeSource()371 void closeSource() {}
372
more()373 bool more() {
374 if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
375 return true;
376
377 _remaining = 0;
378 return false;
379 }
380
next()381 Data next() {
382 verify(_remaining);
383
384 _remaining--;
385
386 if (_first) {
387 _first = false;
388 return _current->current();
389 }
390
391 if (!_current->advance()) {
392 verify(!_heap.empty());
393 std::pop_heap(_heap.begin(), _heap.end(), _greater);
394 _current = _heap.back();
395 _heap.pop_back();
396 } else if (!_heap.empty() && _greater(_current, _heap.front())) {
397 std::pop_heap(_heap.begin(), _heap.end(), _greater);
398 std::swap(_current, _heap.back());
399 std::push_heap(_heap.begin(), _heap.end(), _greater);
400 }
401
402 return _current->current();
403 }
404
405
406 private:
407 /**
408 * Data iterator over an Input stream.
409 *
410 * This class is responsible for closing the Input source upon destruction, unfortunately,
411 * because that is the path of least resistence to a design change requiring MergeIterator to
412 * handle eventual deletion of said Input source.
413 */
414 class Stream {
415 public:
Stream(size_t fileNum,const Data & first,std::shared_ptr<Input> rest)416 Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
417 : fileNum(fileNum), _current(first), _rest(rest) {}
418
~Stream()419 ~Stream() {
420 _rest->closeSource();
421 }
422
current() const423 const Data& current() const {
424 return _current;
425 }
more()426 bool more() {
427 return _rest->more();
428 }
advance()429 bool advance() {
430 if (!_rest->more())
431 return false;
432
433 _current = _rest->next();
434 return true;
435 }
436
437 const size_t fileNum;
438
439 private:
440 Data _current;
441 std::shared_ptr<Input> _rest;
442 };
443
444 class STLComparator { // uses greater rather than less-than to maintain a MinHeap
445 public:
STLComparator(const Comparator & comp)446 explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(unowned_ptr<const Stream> lhs,unowned_ptr<const Stream> rhs) const447 bool operator()(unowned_ptr<const Stream> lhs, unowned_ptr<const Stream> rhs) const {
448 // first compare data
449 dassertCompIsSane(_comp, lhs->current(), rhs->current());
450 int ret = _comp(lhs->current(), rhs->current());
451 if (ret)
452 return ret > 0;
453
454 // then compare fileNums to ensure stability
455 return lhs->fileNum > rhs->fileNum;
456 }
457
458 private:
459 const Comparator _comp;
460 };
461
462 SortOptions _opts;
463 unsigned long long _remaining;
464 bool _first;
465 std::shared_ptr<Stream> _current;
466 std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
467 STLComparator _greater; // named so calls make sense
468 std::string _itersSourceFileName;
469 };
470
471 template <typename Key, typename Value, typename Comparator>
472 class NoLimitSorter : public Sorter<Key, Value> {
473 public:
474 typedef std::pair<Key, Value> Data;
475 typedef SortIteratorInterface<Key, Value> Iterator;
476 typedef std::pair<typename Key::SorterDeserializeSettings,
477 typename Value::SorterDeserializeSettings>
478 Settings;
479
NoLimitSorter(const SortOptions & opts,const Comparator & comp,const Settings & settings=Settings ())480 NoLimitSorter(const SortOptions& opts,
481 const Comparator& comp,
482 const Settings& settings = Settings())
483 : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
484 verify(_opts.limit == 0);
485 if (_opts.extSortAllowed) {
486 _fileName = _opts.tempDir + "/" + nextFileName();
487 }
488 }
489
~NoLimitSorter()490 ~NoLimitSorter() {
491 if (!_done) {
492 // If done() was never called to return a MergeIterator, then this Sorter still owns
493 // file deletion.
494 DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
495 }
496 }
497
add(const Key & key,const Value & val)498 void add(const Key& key, const Value& val) {
499 invariant(!_done);
500
501 _data.push_back(std::make_pair(key, val));
502
503 _memUsed += key.memUsageForSorter();
504 _memUsed += val.memUsageForSorter();
505
506 if (_memUsed > _opts.maxMemoryUsageBytes)
507 spill();
508 }
509
done()510 Iterator* done() {
511 invariant(!_done);
512
513 if (_iters.empty()) {
514 sort();
515 return new InMemIterator<Key, Value>(_data);
516 }
517
518 spill();
519 Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp);
520 _done = true;
521 return mergeIt;
522 }
523
524 private:
525 class STLComparator {
526 public:
STLComparator(const Comparator & comp)527 explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(const Data & lhs,const Data & rhs) const528 bool operator()(const Data& lhs, const Data& rhs) const {
529 dassertCompIsSane(_comp, lhs, rhs);
530 return _comp(lhs, rhs) < 0;
531 }
532
533 private:
534 const Comparator& _comp;
535 };
536
sort()537 void sort() {
538 STLComparator less(_comp);
539 std::stable_sort(_data.begin(), _data.end(), less);
540
541 // Does 2x more compares than stable_sort
542 // TODO test on windows
543 // std::sort(_data.begin(), _data.end(), comp);
544 }
545
spill()546 void spill() {
547 invariant(!_done);
548
549 if (_data.empty())
550 return;
551
552 if (!_opts.extSortAllowed) {
553 // XXX This error message is only correct for aggregation, but it is also the
554 // only way this code could be hit at the moment. If the Sorter is used
555 // elsewhere where extSortAllowed could possibly be false, this message will
556 // need to be revisited.
557 uasserted(16819,
558 str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
559 << " bytes, but did not opt in to external sorting.");
560 }
561
562 sort();
563
564 SortedFileWriter<Key, Value> writer(
565 _opts, _fileName, _nextSortedFileWriterOffset, _settings);
566 for (; !_data.empty(); _data.pop_front()) {
567 writer.addAlreadySorted(_data.front().first, _data.front().second);
568 }
569 Iterator* iteratorPtr = writer.done();
570 _nextSortedFileWriterOffset = writer.getFileEndOffset();
571
572 _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
573
574 _memUsed = 0;
575 }
576
577 const Comparator _comp;
578 const Settings _settings;
579 SortOptions _opts;
580 std::string _fileName;
581 std::streampos _nextSortedFileWriterOffset = 0;
582 bool _done = false;
583 size_t _memUsed;
584 std::deque<Data> _data; // the "current" data
585 std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
586 };
587
588 template <typename Key, typename Value, typename Comparator>
589 class LimitOneSorter : public Sorter<Key, Value> {
590 // Since this class is only used for limit==1, it omits all logic to
591 // spill to disk and only tracks memory usage if explicitly requested.
592 public:
593 typedef std::pair<Key, Value> Data;
594 typedef SortIteratorInterface<Key, Value> Iterator;
595
LimitOneSorter(const SortOptions & opts,const Comparator & comp)596 LimitOneSorter(const SortOptions& opts, const Comparator& comp)
597 : _comp(comp), _haveData(false) {
598 verify(opts.limit == 1);
599 }
600
add(const Key & key,const Value & val)601 void add(const Key& key, const Value& val) {
602 Data contender(key, val);
603
604 if (_haveData) {
605 dassertCompIsSane(_comp, _best, contender);
606 if (_comp(_best, contender) <= 0)
607 return; // not good enough
608 } else {
609 _haveData = true;
610 }
611
612 _best = contender;
613 }
614
done()615 Iterator* done() {
616 if (_haveData) {
617 return new InMemIterator<Key, Value>(_best);
618 } else {
619 return new InMemIterator<Key, Value>();
620 }
621 }
622
623 private:
624 const Comparator _comp;
625 Data _best;
626 bool _haveData; // false at start, set to true on first call to add()
627 };
628
629 template <typename Key, typename Value, typename Comparator>
630 class TopKSorter : public Sorter<Key, Value> {
631 public:
632 typedef std::pair<Key, Value> Data;
633 typedef SortIteratorInterface<Key, Value> Iterator;
634 typedef std::pair<typename Key::SorterDeserializeSettings,
635 typename Value::SorterDeserializeSettings>
636 Settings;
637
TopKSorter(const SortOptions & opts,const Comparator & comp,const Settings & settings=Settings ())638 TopKSorter(const SortOptions& opts,
639 const Comparator& comp,
640 const Settings& settings = Settings())
641 : _comp(comp),
642 _settings(settings),
643 _opts(opts),
644 _memUsed(0),
645 _haveCutoff(false),
646 _worstCount(0),
647 _medianCount(0) {
648 // This also *works* with limit==1 but LimitOneSorter should be used instead
649 verify(_opts.limit > 1);
650
651 if (_opts.extSortAllowed) {
652 _fileName = _opts.tempDir + "/" + nextFileName();
653 }
654
655 // Preallocate a fixed sized vector of the required size if we
656 // don't expect it to have a major impact on our memory budget.
657 // This is the common case with small limits.
658 if ((sizeof(Data) * opts.limit) < opts.maxMemoryUsageBytes / 10) {
659 _data.reserve(opts.limit);
660 }
661 }
662
~TopKSorter()663 ~TopKSorter() {
664 if (!_done) {
665 // If done() was never called to return a MergeIterator, then this Sorter still owns
666 // file deletion.
667 DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
668 }
669 }
670
add(const Key & key,const Value & val)671 void add(const Key& key, const Value& val) {
672 invariant(!_done);
673
674 STLComparator less(_comp);
675 Data contender(key, val);
676
677 if (_data.size() < _opts.limit) {
678 if (_haveCutoff && !less(contender, _cutoff))
679 return;
680
681 _data.push_back(contender);
682
683 _memUsed += key.memUsageForSorter();
684 _memUsed += val.memUsageForSorter();
685
686 if (_data.size() == _opts.limit)
687 std::make_heap(_data.begin(), _data.end(), less);
688
689 if (_memUsed > _opts.maxMemoryUsageBytes)
690 spill();
691
692 return;
693 }
694
695 verify(_data.size() == _opts.limit);
696
697 if (!less(contender, _data.front()))
698 return; // not good enough
699
700 // Remove the old worst pair and insert the contender, adjusting _memUsed
701
702 _memUsed += key.memUsageForSorter();
703 _memUsed += val.memUsageForSorter();
704
705 _memUsed -= _data.front().first.memUsageForSorter();
706 _memUsed -= _data.front().second.memUsageForSorter();
707
708 std::pop_heap(_data.begin(), _data.end(), less);
709 _data.back() = contender;
710 std::push_heap(_data.begin(), _data.end(), less);
711
712 if (_memUsed > _opts.maxMemoryUsageBytes)
713 spill();
714 }
715
done()716 Iterator* done() {
717 if (_iters.empty()) {
718 sort();
719 return new InMemIterator<Key, Value>(_data);
720 }
721
722 spill();
723 Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp);
724 _done = true;
725 return iterator;
726 }
727
728 private:
729 class STLComparator {
730 public:
STLComparator(const Comparator & comp)731 explicit STLComparator(const Comparator& comp) : _comp(comp) {}
operator ()(const Data & lhs,const Data & rhs) const732 bool operator()(const Data& lhs, const Data& rhs) const {
733 dassertCompIsSane(_comp, lhs, rhs);
734 return _comp(lhs, rhs) < 0;
735 }
736
737 private:
738 const Comparator& _comp;
739 };
740
sort()741 void sort() {
742 STLComparator less(_comp);
743
744 if (_data.size() == _opts.limit) {
745 std::sort_heap(_data.begin(), _data.end(), less);
746 } else {
747 std::stable_sort(_data.begin(), _data.end(), less);
748 }
749 }
750
751 // Can only be called after _data is sorted
updateCutoff()752 void updateCutoff() {
753 // Theory of operation: We want to be able to eagerly ignore values we know will not
754 // be in the TopK result set by setting _cutoff to a value we know we have at least
755 // K values equal to or better than. There are two values that we track to
756 // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
757 // one of these values becomes the new _cutoff, its associated counter is reset to 0
758 // and a new value is chosen for that member the next time we spill.
759 //
760 // _worstSeen is the worst value we've seen so that all kept values are better than
761 // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
762 // reason to consider values worse than _worstSeen so it can become the new _cutoff.
763 // This technique is especially useful when the input is already roughly sorted (eg
764 // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
765 // that will exclude most later values, making the full TopK operation including
766 // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time.
767 //
768 // _lastMedian was the median of the _data in the first spill() either overall or
769 // following a promotion of _lastMedian to _cutoff. We count the number of kept
770 // values that are better than or equal to _lastMedian in _medianCount and can
771 // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming
772 // reasonable median selection (which should happen when the data is completely
773 // unsorted), after the first K spilled values, we will keep roughly 50% of the
774 // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
775 // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
776 // so the expected number of kept values is O(Log(N/K) * K). The final run time if
777 // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
778 // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
779 //
780 // This leaves a currently unoptimized worst case of data that is already roughly
781 // sorted, but in the wrong direction, such that the desired results are all the
782 // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
783 // should be trivially detectable, as a future optimization it might be nice to
784 // detect this case and reverse the direction of input (if possible) which would
785 // turn this into the best case described above.
786 //
787 // Pedantic notes: The time complexities above (which count number of comparisons)
788 // ignore the sorting of batches prior to spilling to disk since they make it more
789 // confusing without changing the results. If you want to add them back in, add an
790 // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
791 // all space complexities measure disk space rather than memory since this class is
792 // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
793
794 STLComparator less(_comp); // less is "better" for TopK.
795
796 // Pick a new _worstSeen or _lastMedian if should.
797 if (_worstCount == 0 || less(_worstSeen, _data.back())) {
798 _worstSeen = _data.back();
799 }
800 if (_medianCount == 0) {
801 size_t medianIndex = _data.size() / 2; // chooses the higher if size() is even.
802 _lastMedian = _data[medianIndex];
803 }
804
805 // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian.
806 _worstCount += _data.size(); // everything is better or equal
807 typename std::vector<Data>::iterator firstWorseThanLastMedian =
808 std::upper_bound(_data.begin(), _data.end(), _lastMedian, less);
809 _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);
810
811
812 // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should.
813 if (_worstCount >= _opts.limit) {
814 if (!_haveCutoff || less(_worstSeen, _cutoff)) {
815 _cutoff = _worstSeen;
816 _haveCutoff = true;
817 }
818 _worstCount = 0;
819 }
820 if (_medianCount >= _opts.limit) {
821 if (!_haveCutoff || less(_lastMedian, _cutoff)) {
822 _cutoff = _lastMedian;
823 _haveCutoff = true;
824 }
825 _medianCount = 0;
826 }
827 }
828
spill()829 void spill() {
830 invariant(!_done);
831
832 if (_data.empty())
833 return;
834
835 if (!_opts.extSortAllowed) {
836 // XXX This error message is only correct for aggregation, but it is also the
837 // only way this code could be hit at the moment. If the Sorter is used
838 // elsewhere where extSortAllowed could possibly be false, this message will
839 // need to be revisited.
840 uasserted(16820,
841 str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
842 << " bytes, but did not opt in to external sorting.");
843 }
844
845 // We should check readOnly before getting here.
846 invariant(!storageGlobalParams.readOnly);
847
848 sort();
849 updateCutoff();
850
851 SortedFileWriter<Key, Value> writer(
852 _opts, _fileName, _nextSortedFileWriterOffset, _settings);
853 for (size_t i = 0; i < _data.size(); i++) {
854 writer.addAlreadySorted(_data[i].first, _data[i].second);
855 }
856
857 // clear _data and release backing array's memory
858 std::vector<Data>().swap(_data);
859
860 Iterator* iteratorPtr = writer.done();
861 _nextSortedFileWriterOffset = writer.getFileEndOffset();
862 _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
863
864 _memUsed = 0;
865 }
866
867 const Comparator _comp;
868 const Settings _settings;
869 SortOptions _opts;
870 std::string _fileName;
871 std::streampos _nextSortedFileWriterOffset = 0;
872 bool _done = false;
873 size_t _memUsed;
874 std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
875 std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
876
877 // See updateCutoff() for a full description of how these members are used.
878 bool _haveCutoff;
879 Data _cutoff; // We can definitely ignore values worse than this.
880 Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
881 size_t _worstCount; // Number of docs better or equal to _worstSeen kept so far.
882 Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit.
883 size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
884 };
885
886 } // namespace sorter
887
888 //
889 // SortedFileWriter
890 //
891
892 template <typename Key, typename Value>
SortedFileWriter(const SortOptions & opts,const std::string & fileName,const std::streampos fileStartOffset,const Settings & settings)893 SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
894 const std::string& fileName,
895 const std::streampos fileStartOffset,
896 const Settings& settings)
897 : _settings(settings) {
898 namespace str = mongoutils::str;
899
900 // This should be checked by consumers, but if we get here don't allow writes.
901 uassert(
902 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
903
904 uassert(17148,
905 "Attempting to use external sort without setting SortOptions::tempDir",
906 !opts.tempDir.empty());
907
908 boost::filesystem::create_directories(opts.tempDir);
909
910 _fileName = fileName;
911
912 // We open the provided file in append mode so that SortedFileWriter instances can share the
913 // same file, used serially. We want to share files in order to stay below system open file
914 // limits.
915 _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out);
916 uassert(16818,
917 str::stream() << "error opening file \"" << _fileName << "\": "
918 << sorter::myErrnoWithDescription(),
919 _file.good());
920 // The file descriptor is positioned at the end of a file when opened in append mode, but
921 // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass
922 // in the expected offset to this constructor.
923 _fileStartOffset = fileStartOffset;
924
925 // throw on failure
926 _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
927 }
928
929 template <typename Key, typename Value>
addAlreadySorted(const Key & key,const Value & val)930 void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
931 key.serializeForSorter(_buffer);
932 val.serializeForSorter(_buffer);
933
934 if (_buffer.len() > 64 * 1024)
935 spill();
936 }
937
938 template <typename Key, typename Value>
spill()939 void SortedFileWriter<Key, Value>::spill() {
940 namespace str = mongoutils::str;
941
942 int32_t size = _buffer.len();
943 char* outBuffer = _buffer.buf();
944
945 if (size == 0)
946 return;
947
948 std::string compressed;
949 snappy::Compress(outBuffer, size, &compressed);
950 verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));
951
952 const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9);
953 if (shouldCompress) {
954 size = compressed.size();
955 outBuffer = const_cast<char*>(compressed.data());
956 }
957
958 std::unique_ptr<char[]> out;
959 auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
960 if (encryptionHooks->enabled()) {
961 size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer();
962 out.reset(new char[protectedSizeMax]);
963 size_t resultLen;
964 Status status = encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer),
965 size,
966 reinterpret_cast<uint8_t*>(out.get()),
967 protectedSizeMax,
968 &resultLen);
969 uassert(28842,
970 str::stream() << "Failed to compress data: " << status.toString(),
971 status.isOK());
972 outBuffer = out.get();
973 size = resultLen;
974 }
975
976 // negative size means compressed
977 size = shouldCompress ? -size : size;
978 try {
979 _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
980 _file.write(outBuffer, std::abs(size));
981 } catch (const std::exception&) {
982 msgasserted(16821,
983 str::stream() << "error writing to file \"" << _fileName << "\": "
984 << sorter::myErrnoWithDescription());
985 }
986
987 _buffer.reset();
988 }
989
990 template <typename Key, typename Value>
done()991 SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
992 spill();
993 std::streampos currentFileOffset = _file.tellp();
994 uassert(50980,
995 str::stream() << "error fetching current file descriptor offset in file \"" << _fileName
996 << "\": "
997 << sorter::myErrnoWithDescription(),
998 currentFileOffset >= 0);
999
1000 // In case nothing was written to disk, use _fileStartOffset because tellp() may not be
1001 // initialized on all systems upon opening the file.
1002 _fileEndOffset = currentFileOffset < _fileStartOffset ? _fileStartOffset : currentFileOffset;
1003 _file.close();
1004
1005 return new sorter::FileIterator<Key, Value>(
1006 _fileName, _fileStartOffset, _fileEndOffset, _settings);
1007 }
1008
1009 //
1010 // Factory Functions
1011 //
1012
1013 template <typename Key, typename Value>
1014 template <typename Comparator>
merge(const std::vector<std::shared_ptr<SortIteratorInterface>> & iters,const std::string & fileName,const SortOptions & opts,const Comparator & comp)1015 SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
1016 const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
1017 const std::string& fileName,
1018 const SortOptions& opts,
1019 const Comparator& comp) {
1020 return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp);
1021 }
1022
1023 template <typename Key, typename Value>
1024 template <typename Comparator>
make(const SortOptions & opts,const Comparator & comp,const Settings & settings)1025 Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
1026 const Comparator& comp,
1027 const Settings& settings) {
1028 // This should be checked by consumers, but if it isn't try to fail early.
1029 uassert(16947,
1030 "Attempting to use external sort from mongos. This is not allowed.",
1031 !(isMongos() && opts.extSortAllowed));
1032
1033 uassert(17149,
1034 "Attempting to use external sort without setting SortOptions::tempDir",
1035 !(opts.extSortAllowed && opts.tempDir.empty()));
1036
1037 switch (opts.limit) {
1038 case 0:
1039 return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
1040 case 1:
1041 return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp);
1042 default:
1043 return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings);
1044 }
1045 }
1046 }
1047