1 /*******************************************************************************
2 * tlx/sort/strings/parallel_sample_sort.hpp
3 *
4 * Parallel Super Scalar String Sample Sort (pS5)
5 *
6 * See also Timo Bingmann, Andreas Eberle, and Peter Sanders. "Engineering
7 * parallel string sorting." Algorithmica 77.1 (2017): 235-286.
8 *
9 * Part of tlx - http://panthema.net/tlx
10 *
11 * Copyright (C) 2013-2019 Timo Bingmann <tb@panthema.net>
12 *
13 * All rights reserved. Published under the Boost Software License, Version 1.0
14 ******************************************************************************/
15
16 #ifndef TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
17 #define TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
18
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <random>
#include <vector>
26
27 #include <tlx/sort/strings/insertion_sort.hpp>
28 #include <tlx/sort/strings/sample_sort_tools.hpp>
29 #include <tlx/sort/strings/string_ptr.hpp>
30
31 #include <tlx/logger/core.hpp>
32 #include <tlx/math/clz.hpp>
33 #include <tlx/math/ctz.hpp>
34 #include <tlx/meta/enable_if.hpp>
35 #include <tlx/multi_timer.hpp>
36 #include <tlx/simple_vector.hpp>
37 #include <tlx/thread_pool.hpp>
38 #include <tlx/unused.hpp>
39
40 namespace tlx {
41 namespace sort_strings_detail {
42
43 class PS5SortStep;
44
45 /******************************************************************************/
46 //! Parallel Super Scalar String Sample Sort Parameter Struct
47
class PS5ParametersDefault
{
public:
    //! debug output of sort step enqueue/creation
    static const bool debug_steps = false;
    //! debug output of thread pool job execution
    static const bool debug_jobs = false;

    //! debug output of bucket sizes after classification
    static const bool debug_bucket_size = false;
    //! debug output of recursive descent decisions
    static const bool debug_recursion = false;
    //! debug output of LCP calculations
    static const bool debug_lcp = false;

    //! debug output of the final result check
    static const bool debug_result = false;

    //! enable/disable various sorting levels
    static const bool enable_parallel_sample_sort = true;
    static const bool enable_sequential_sample_sort = true;
    static const bool enable_sequential_mkqs = true;

    //! terminate sort after first parallel sample sort step
    static const bool use_only_first_sortstep = false;

    //! enable work freeing
    static const bool enable_work_sharing = true;

    //! whether the base sequential_threshold() on the remaining unsorted string
    //! set or on the whole string set.
    static const bool enable_rest_size = false;

    //! key type for sample sort: 32-bit or 64-bit
    typedef size_t key_type;

    //! depth of classification tree used in sample sorts
    static const unsigned TreeBits = 10;

    //! classification tree variant for sample sorts
    using Classify = SSClassifyTreeCalcUnrollInterleave<key_type, TreeBits>;

    //! threshold to run sequential small sorts
    static const size_t smallsort_threshold = 1024 * 1024;
    //! threshold to switch to insertion sort
    static const size_t inssort_threshold = 32;
};
89
90 /******************************************************************************/
91 //! Parallel Super Scalar String Sample Sort Context
92
93 template <typename Parameters>
94 class PS5Context : public Parameters
95 {
96 public:
97 //! total size of input
98 size_t total_size;
99
100 //! number of remaining strings to sort
101 std::atomic<size_t> rest_size;
102
103 //! counters
104 std::atomic<size_t> para_ss_steps, sequ_ss_steps, base_sort_steps;
105
106 //! timers for individual sorting steps
107 MultiTimer mtimer;
108
109 //! number of threads overall
110 size_t num_threads;
111
112 //! thread pool
113 ThreadPool threads_;
114
115 //! context constructor
PS5Context(size_t _thread_num)116 PS5Context(size_t _thread_num)
117 : para_ss_steps(0), sequ_ss_steps(0), base_sort_steps(0),
118 num_threads(_thread_num),
119 threads_(_thread_num)
120 { }
121
122 //! enqueue a new job in the thread pool
123 template <typename StringPtr>
124 void enqueue(PS5SortStep* sstep, const StringPtr& strptr, size_t depth);
125
126 //! return sequential sorting threshold
sequential_threshold()127 size_t sequential_threshold() {
128 size_t threshold = this->smallsort_threshold;
129 if (this->enable_rest_size) {
130 return std::max(threshold, rest_size / num_threads);
131 }
132 else {
133 return std::max(threshold, total_size / num_threads);
134 }
135 }
136
137 //! decrement number of unordered strings
donesize(size_t n)138 void donesize(size_t n) {
139 if (this->enable_rest_size)
140 rest_size -= n;
141 }
142 };
143
144 /******************************************************************************/
145 //! LCP calculation of Splitter Strings
146
//! Compute the LCP (in characters/bytes) of two keys.
//! Precondition: a != b — clz(0) is undefined, so callers must only pass
//! keys known to differ (e.g. first keys of distinct sample sort buckets).
template <typename KeyType>
static inline unsigned char
lcpKeyType(const KeyType& a, const KeyType& b) {
    // XOR both values and count the number of zero bytes
    return clz(a ^ b) / 8;
}
153
//! Return the number of meaningful (non-zero) leading characters of a key,
//! i.e. its depth including the terminating zero region.
//! NOTE(review): ctz(0) is undefined for an all-zero key — presumably
//! callers only pass keys with at least one non-zero byte; verify.
template <typename KeyType>
static inline unsigned char
lcpKeyDepth(const KeyType& a) {
    // count number of non-zero bytes
    return sizeof(KeyType) - (ctz(a) / 8);
}
160
161 //! return the d-th character in the (swapped) key
//! return the d-th character in the (swapped) key
template <typename KeyType>
static inline unsigned char
getCharAtDepth(const KeyType& a, unsigned char d) {
    // keys store characters most-significant first: shift the requested
    // byte down into the lowest position and truncate to one byte.
    const size_t shift = 8 * (sizeof(KeyType) - 1 - d);
    return static_cast<unsigned char>(a >> shift);
}
167
168 /******************************************************************************/
169 //! PS5SortStep Top-Level Class to Keep Track of Substeps
170
//! Base class of sorting steps that may spawn substeps: keeps an atomic
//! count of unfinished substeps; the substep whose completion drops the
//! count to zero invokes the virtual substep_all_done() callback (which in
//! the derived classes may delete the object).
class PS5SortStep
{
private:
    //! Number of substeps still running
    std::atomic<size_t> substep_working_;

    //! Pure virtual function called by substep when all substeps are done.
    virtual void substep_all_done() = 0;

protected:
    PS5SortStep() : substep_working_(0) { }

    virtual ~PS5SortStep() {
        assert(substep_working_ == 0);
    }

    //! Register new substep
    void substep_add() {
        ++substep_working_;
    }

public:
    //! Notify superstep that the currently substep is done.
    void substep_notify_done() {
        assert(substep_working_ > 0);
        // atomic fetch-sub: exactly one caller observes the zero crossing
        if (--substep_working_ == 0)
            substep_all_done();
    }
};
200
201 /******************************************************************************/
202 //! LCP Calculation for Finished Sample Sort Steps
203
//! Write the LCP values at all bucket boundaries of a completed sample sort
//! step: for each non-empty bucket, the LCP of its first string with the
//! last string of the previous non-empty bucket. Odd (equal) buckets reuse
//! the splitter key; even buckets reload the key from the string set.
template <size_t bktnum, typename Context, typename Classify,
          typename StringPtr, typename BktSizeType>
void ps5_sample_sort_lcp(const Context& ctx, const Classify& classifier,
                         const StringPtr& strptr, size_t depth,
                         const BktSizeType* bkt) {
    assert(!strptr.flipped());

    const typename StringPtr::StringSet& strset = strptr.active();
    typedef typename Context::key_type key_type;

    size_t b = 0;         // current bucket number
    key_type prevkey = 0; // previous key

    // the following while loops only check b < bktnum when b is odd,
    // because bktnum is always odd. We need a goto to jump into the loop,
    // as b == 0 start even.
    goto even_first;

    // find first non-empty bucket
    while (b < bktnum)
    {
        // odd bucket: = bkt
        if (bkt[b] != bkt[b + 1]) {
            prevkey = classifier.get_splitter(b / 2);
            assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
            break;
        }
        ++b;
even_first:
        // even bucket: <, << or > bkt
        if (bkt[b] != bkt[b + 1]) {
            prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
            break;
        }
        ++b;
    }
    ++b;

    // goto depends on whether the first non-empty bucket was odd or
    // even. the while loop below encodes this in the program counter.
    if (b < bktnum && b % 2 == 0) goto even_bucket;

    // find next non-empty bucket
    while (b < bktnum)
    {
        // odd bucket: = bkt
        if (bkt[b] != bkt[b + 1]) {
            key_type thiskey = classifier.get_splitter(b / 2);
            assert(thiskey == get_key_at<key_type>(strset, bkt[b], depth));

            int rlcp = lcpKeyType(prevkey, thiskey);
            strptr.set_lcp(bkt[b], depth + rlcp);
            // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));

            TLX_LOGC(ctx.debug_lcp)
                << "LCP at odd-bucket " << b
                << " [" << bkt[b] << "," << bkt[b + 1] << ")"
                << " is " << depth + rlcp;

            prevkey = thiskey;
            assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
        }
        ++b;
even_bucket:
        // even bucket: <, << or > bkt
        if (bkt[b] != bkt[b + 1]) {
            key_type thiskey = get_key_at<key_type>(strset, bkt[b], depth);

            int rlcp = lcpKeyType(prevkey, thiskey);
            strptr.set_lcp(bkt[b], depth + rlcp);
            // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));

            TLX_LOGC(ctx.debug_lcp)
                << "LCP at even-bucket " << b
                << " [" << bkt[b] << "," << bkt[b + 1] << ")"
                << " is " << depth + rlcp;

            prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
        }
        ++b;
    }
}
286
287 /******************************************************************************/
288 //! SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts
289
290 template <typename Context, typename StringPtr, typename BktSizeType>
291 class PS5SmallsortJob : public PS5SortStep
292 {
293 public:
294 Context& ctx_;
295
296 //! parent sort step
297 PS5SortStep* pstep_;
298
299 StringPtr strptr_;
300 size_t depth_;
301 MultiTimer mtimer_;
302
303 typedef typename Context::key_type key_type;
304 typedef typename StringPtr::StringSet StringSet;
305 typedef BktSizeType bktsize_type;
306
    //! Construct a small-sort job for the given string range at the given
    //! character depth; pstep (may be nullptr) is notified on completion.
    PS5SmallsortJob(Context& ctx, PS5SortStep* pstep,
                    const StringPtr& strptr, size_t depth)
        : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
        TLX_LOGC(ctx_.debug_steps)
            << "enqueue depth=" << depth_
            << " size=" << strptr_.size() << " flip=" << strptr_.flipped();
    }
314
    //! Destructor: fold this job's timers into the shared context timers.
    ~PS5SmallsortJob() {
        mtimer_.stop();
        ctx_.mtimer.add(mtimer_);
    }
319
320 simple_vector<uint8_t> bktcache_;
321 size_t bktcache_size_ = 0;
322
run()323 void run() {
324 mtimer_.start("sequ_ss");
325
326 size_t n = strptr_.size();
327
328 TLX_LOGC(ctx_.debug_jobs)
329 << "Process PS5SmallsortJob " << this << " of size " << n;
330
331 // create anonymous wrapper job
332 this->substep_add();
333
334 if (ctx_.enable_sequential_sample_sort && n >= ctx_.smallsort_threshold)
335 {
336 bktcache_.resize(n * sizeof(uint16_t));
337 sort_sample_sort(strptr_, depth_);
338 }
339 else
340 {
341 mtimer_.start("mkqs");
342 sort_mkqs_cache(strptr_, depth_);
343 }
344
345 // finish wrapper job, handler delete's this
346 this->substep_notify_done();
347 }
348
349 /*------------------------------------------------------------------------*/
350 //! Stack of Recursive Sample Sort Steps
351
    //! One level of the explicit sequential sample sort recursion stack.
    //! Construction performs the whole sorting step: splitter selection with
    //! oversampling, classification of all strings, bucket counting, prefix
    //! sums, and the out-of-place permutation into the shadow array. The
    //! caller then iterates the buckets via idx_.
    class SeqSampleSortStep
    {
    public:
        //! string range of this level (flips between original and shadow)
        StringPtr strptr_;
        //! next bucket index to process
        size_t idx_;
        //! character depth of this level
        size_t depth_;

        using StringSet = typename StringPtr::StringSet;
        using bktsize_type = BktSizeType;

        //! splitter classification tree
        typename Context::Classify classifier;

        static const size_t num_splitters = Context::Classify::num_splitters;
        static const size_t bktnum = 2 * num_splitters + 1;

        //! splitter LCPs; high bit set = splitter is nullptr-terminated
        unsigned char splitter_lcp[num_splitters + 1];
        //! bucket boundaries (exclusive prefix sum after permutation)
        bktsize_type bkt[bktnum + 1];

        SeqSampleSortStep(Context& ctx, const StringPtr& strptr, size_t depth,
                          uint16_t* bktcache)
            : strptr_(strptr), idx_(0), depth_(depth) {
            size_t n = strptr_.size();

            // step 1: select splitters with oversampling

            const size_t oversample_factor = 2;
            const size_t sample_size = oversample_factor * num_splitters;

            simple_vector<key_type> samples(sample_size);

            const StringSet& strset = strptr_.active();

            // seed PRNG from the sample buffer's address: cheap and varies
            // per allocation, sufficient for pivot sampling
            std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));

            for (size_t i = 0; i < sample_size; ++i)
                samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);

            std::sort(samples.begin(), samples.end());

            classifier.build(samples.data(), sample_size, splitter_lcp);

            // step 2: classify all strings

            classifier.classify(
                strset, strset.begin(), strset.end(), bktcache, depth_);

            // step 2.5: count bucket sizes

            bktsize_type bktsize[bktnum];
            memset(bktsize, 0, bktnum * sizeof(bktsize_type));

            for (size_t si = 0; si < n; ++si)
                ++bktsize[bktcache[si]];

            // step 3: inclusive prefix sum

            bkt[0] = bktsize[0];
            for (unsigned int i = 1; i < bktnum; ++i) {
                bkt[i] = bkt[i - 1] + bktsize[i];
            }
            assert(bkt[bktnum - 1] == n);
            bkt[bktnum] = n;

            // step 4: permute out-of-place

            const StringSet& strB = strptr_.active();
            // get alternative shadow pointer array
            const StringSet& sorted = strptr_.shadow();
            typename StringSet::Iterator sbegin = sorted.begin();

            // walk backwards through each bucket via --bkt[...]
            for (typename StringSet::Iterator str = strB.begin();
                 str != strB.end(); ++str, ++bktcache)
                *(sbegin + --bkt[*bktcache]) = std::move(*str);

            // bkt is afterwards the exclusive prefix sum of bktsize

            // statistics

            ++ctx.sequ_ss_steps;
        }

        //! write boundary LCPs; must run only after all buckets of this
        //! level are completely sorted and copied back
        void calculate_lcp(Context& ctx) {
            TLX_LOGC(ctx.debug_lcp) << "Calculate LCP after sample sort step";
            if (strptr_.with_lcp) {
                ps5_sample_sort_lcp<bktnum>(ctx, classifier, strptr_, depth_, bkt);
            }
        }
    };
440
441 size_t ss_front_ = 0;
442 std::vector<SeqSampleSortStep> ss_stack_;
443
    //! Sequential sample sort driver: simulates the recursion with an
    //! explicit stack of SeqSampleSortStep levels. Buckets are processed one
    //! by one; large buckets push a new level, small buckets go to mkqs,
    //! finished levels compute their LCPs and are popped. If other threads
    //! are idle, the bottom stack level is donated as thread-pool jobs.
    void sort_sample_sort(const StringPtr& strptr, size_t depth) {
        typedef SeqSampleSortStep Step;

        assert(ss_front_ == 0);
        assert(ss_stack_.size() == 0);

        uint16_t* bktcache = reinterpret_cast<uint16_t*>(bktcache_.data());

        // sort first level
        ss_stack_.emplace_back(ctx_, strptr, depth, bktcache);

        // step 5: "recursion"

        while (ss_stack_.size() > ss_front_)
        {
            // NOTE: s may be invalidated by emplace_back below, but all
            // values needed are read before the push (call arguments are
            // evaluated before the vector may reallocate).
            Step& s = ss_stack_.back();
            size_t i = s.idx_++; // process the bucket s.idx_

            if (i < Step::bktnum)
            {
                size_t bktsize = s.bkt[i + 1] - s.bkt[i];

                StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);

                // i is even -> bkt[i] is less-than bucket
                if (i % 2 == 0)
                {
                    if (bktsize == 0) {
                        // empty bucket
                    }
                    else if (bktsize < ctx_.smallsort_threshold)
                    {
                        assert(i / 2 <= Step::num_splitters);
                        if (i == Step::bktnum - 1)
                            TLX_LOGC(ctx_.debug_recursion)
                                << "Recurse[" << s.depth_ << "]: > bkt "
                                << i << " size " << bktsize << " no lcp";
                        else
                            TLX_LOGC(ctx_.debug_recursion)
                                << "Recurse[" << s.depth_ << "]: < bkt "
                                << i << " size " << bktsize << " lcp "
                                << int(s.splitter_lcp[i / 2] & 0x7F);

                        ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
                        sort_mkqs_cache(
                            sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
                    }
                    else
                    {
                        if (i == Step::bktnum - 1)
                            TLX_LOGC(ctx_.debug_recursion)
                                << "Recurse[" << s.depth_ << "]: > bkt "
                                << i << " size " << bktsize << " no lcp";
                        else
                            TLX_LOGC(ctx_.debug_recursion)
                                << "Recurse[" << s.depth_ << "]: < bkt "
                                << i << " size " << bktsize << " lcp "
                                << int(s.splitter_lcp[i / 2] & 0x7F);

                        ss_stack_.emplace_back(
                            ctx_, sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F), bktcache);
                    }
                }
                // i is odd -> bkt[i] is equal bucket
                else
                {
                    if (bktsize == 0) {
                        // empty bucket
                    }
                    else if (s.splitter_lcp[i / 2] & 0x80) {
                        // equal-bucket has nullptr-terminated key, done.
                        TLX_LOGC(ctx_.debug_recursion)
                            << "Recurse[" << s.depth_ << "]: = bkt "
                            << i << " size " << bktsize << " is done!";
                        StringPtr spb = sp.copy_back();

                        if (sp.with_lcp) {
                            spb.fill_lcp(
                                s.depth_ + lcpKeyDepth(s.classifier.get_splitter(i / 2)));
                        }
                        ctx_.donesize(bktsize);
                    }
                    else if (bktsize < ctx_.smallsort_threshold)
                    {
                        TLX_LOGC(ctx_.debug_recursion)
                            << "Recurse[" << s.depth_ << "]: = bkt "
                            << i << " size " << bktsize << " lcp keydepth!";

                        ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
                        sort_mkqs_cache(sp, s.depth_ + sizeof(key_type));
                    }
                    else
                    {
                        TLX_LOGC(ctx_.debug_recursion)
                            << "Recurse[" << s.depth_ << "]: = bkt "
                            << i << " size " << bktsize << " lcp keydepth!";

                        ss_stack_.emplace_back(
                            ctx_, sp, s.depth_ + sizeof(key_type), bktcache);
                    }
                }
            }
            else
            {
                // finished sort
                assert(ss_stack_.size() > ss_front_);

                // after full sort: calculate LCPs at this level
                ss_stack_.back().calculate_lcp(ctx_);

                ss_stack_.pop_back();
            }

            if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
                sample_sort_free_work();
            }
        }
    }
562
    //! Donate work to idle threads: convert the remaining buckets of the
    //! bottom-most (oldest, largest) sample sort stack level into independent
    //! thread-pool jobs and advance ss_front_ past it. Falls through to
    //! mkqs_free_work() if the sample sort stack is exhausted.
    void sample_sort_free_work() {
        assert(ss_stack_.size() >= ss_front_);

        if (ss_stack_.size() == ss_front_) {
            // ss_stack_ is empty, check other stack
            return mkqs_free_work();
        }

        // convert top level of stack into independent jobs
        TLX_LOGC(ctx_.debug_jobs)
            << "Freeing top level of PS5SmallsortJob's sample_sort stack";

        typedef SeqSampleSortStep Step;
        Step& s = ss_stack_[ss_front_];

        while (s.idx_ < Step::bktnum)
        {
            size_t i = s.idx_++; // process the bucket s.idx_

            size_t bktsize = s.bkt[i + 1] - s.bkt[i];

            StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);

            // i is even -> bkt[i] is less-than bucket
            if (i % 2 == 0)
            {
                if (bktsize == 0) {
                    // empty bucket
                }
                else
                {
                    if (i == Step::bktnum - 1)
                        TLX_LOGC(ctx_.debug_recursion)
                            << "Recurse[" << s.depth_ << "]: > bkt "
                            << i << " size " << bktsize << " no lcp";
                    else
                        TLX_LOGC(ctx_.debug_recursion)
                            << "Recurse[" << s.depth_ << "]: < bkt "
                            << i << " size " << bktsize << " lcp "
                            << int(s.splitter_lcp[i / 2] & 0x7F);

                    // register substep so this job is notified on completion
                    this->substep_add();
                    ctx_.enqueue(this, sp,
                                 s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
                }
            }
            // i is odd -> bkt[i] is equal bucket
            else
            {
                if (bktsize == 0) {
                    // empty bucket
                }
                else if (s.splitter_lcp[i / 2] & 0x80) {
                    // equal-bucket has nullptr-terminated key, done.
                    TLX_LOGC(ctx_.debug_recursion)
                        << "Recurse[" << s.depth_ << "]: = bkt "
                        << i << " size " << bktsize << " is done!";
                    StringPtr spb = sp.copy_back();

                    if (sp.with_lcp) {
                        spb.fill_lcp(s.depth_ + lcpKeyDepth(
                                         s.classifier.get_splitter(i / 2)));
                    }
                    ctx_.donesize(bktsize);
                }
                else
                {
                    TLX_LOGC(ctx_.debug_recursion)
                        << "Recurse[" << s.depth_ << "]: = bkt "
                        << i << " size " << bktsize << " lcp keydepth!";

                    this->substep_add();
                    ctx_.enqueue(this, sp, s.depth_ + sizeof(key_type));
                }
            }
        }

        // shorten the current stack
        ++ss_front_;
    }
643
644 /*------------------------------------------------------------------------*/
645 //! Stack of Recursive MKQS Steps
646
cmp(const key_type & a,const key_type & b)647 static inline int cmp(const key_type& a, const key_type& b) {
648 return (a > b) ? 1 : (a < b) ? -1 : 0;
649 }
650
651 template <typename Type>
652 static inline size_t
med3(Type * A,size_t i,size_t j,size_t k)653 med3(Type* A, size_t i, size_t j, size_t k) {
654 if (A[i] == A[j]) return i;
655 if (A[k] == A[i] || A[k] == A[j]) return k;
656 if (A[i] < A[j]) {
657 if (A[j] < A[k]) return j;
658 if (A[i] < A[k]) return k;
659 return i;
660 }
661 else {
662 if (A[j] > A[k]) return j;
663 if (A[i] < A[k]) return i;
664 return k;
665 }
666 }
667
668 //! Insertion sort the strings only based on the cached characters.
    //! Insertion sort the strings only based on the cached characters.
    //! Moves strings and their cached keys in lock-step; stable with respect
    //! to equal cached keys.
    static inline void
    insertion_sort_cache_block(const StringPtr& strptr, key_type* cache) {
        const StringSet& strings = strptr.active();
        size_t n = strptr.size();
        size_t pi, pj;
        // classic insertion sort; loop runs n-1 times (pi = 1 .. n-1)
        for (pi = 1; --n > 0; ++pi) {
            typename StringSet::String tmps = std::move(strings.at(pi));
            key_type tmpc = cache[pi];
            // shift larger cached keys right until insertion point found
            for (pj = pi; pj > 0; --pj) {
                if (cache[pj - 1] <= tmpc)
                    break;
                strings.at(pj) = std::move(strings.at(pj - 1));
                cache[pj] = cache[pj - 1];
            }
            strings.at(pj) = std::move(tmps);
            cache[pj] = tmpc;
        }
    }
687
688 //! Insertion sort, but use cached characters if possible.
    //! Insertion sort, but use cached characters if possible.
    //! If CacheDirty, the cache cannot be trusted and a full character-wise
    //! insertion sort is run instead. Otherwise sorts by cached keys, then
    //! recurses character-wise into groups of equal cached keys and writes
    //! LCPs at group boundaries.
    template <bool CacheDirty>
    static inline void
    insertion_sort_cache(const StringPtr& _strptr, key_type* cache, size_t depth) {
        // copy string pointers back to the original array first
        StringPtr strptr = _strptr.copy_back();

        if (strptr.size() <= 1) return;
        if (CacheDirty)
            return insertion_sort(strptr, depth, /* memory */ 0);

        insertion_sort_cache_block(strptr, cache);

        // scan for runs of equal cached keys: [start, start + bktsize)
        size_t start = 0, bktsize = 1;
        for (size_t i = 0; i < strptr.size() - 1; ++i) {
            // group areas with equal cache values
            if (cache[i] == cache[i + 1]) {
                ++bktsize;
                continue;
            }
            // calculate LCP between group areas
            if (start != 0 && strptr.with_lcp) {
                int rlcp = lcpKeyType(cache[start - 1], cache[start]);
                strptr.set_lcp(start, depth + rlcp);
                // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
            }
            // sort group areas deeper if needed
            if (bktsize > 1) {
                if (cache[start] & 0xFF) {
                    // need deeper sort
                    insertion_sort(
                        strptr.sub(start, bktsize), depth + sizeof(key_type),
                        /* memory */ 0);
                }
                else {
                    // cache contains nullptr-termination
                    strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
                }
            }
            bktsize = 1;
            start = i + 1;
        }
        // tail of loop for last item
        if (start != 0 && strptr.with_lcp) {
            int rlcp = lcpKeyType(cache[start - 1], cache[start]);
            strptr.set_lcp(start, depth + rlcp);
            // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
        }
        if (bktsize > 1) {
            if (cache[start] & 0xFF) {
                // need deeper sort
                insertion_sort(
                    strptr.sub(start, bktsize), depth + sizeof(key_type),
                    /* memory */ 0);
            }
            else {
                // cache contains nullptr-termination
                strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
            }
        }
    }
748
    //! One level of the explicit caching multikey quicksort stack.
    //! Construction performs a three-way (Bentley-McIlroy style) partition of
    //! the range around a median-of-9 pivot key, tracking the maximum
    //! less-than key and minimum greater-than key for immediate LCP
    //! calculation. The caller then processes the lt/eq/gt areas via idx_.
    class MKQSStep
    {
    public:
        StringPtr strptr_;
        //! cached keys, parallel to the string array
        key_type* cache_;
        //! sizes of the three partition areas
        size_t num_lt_, num_eq_, num_gt_, depth_;
        //! next subsequence to process (1 = lt, 2 = eq, 3 = gt)
        size_t idx_;
        //! whether the equal area needs deeper sorting (pivot not terminated)
        unsigned char eq_recurse_;
        // typename StringPtr::StringSet::Char dchar_eq_, dchar_gt_;
        //! boundary LCPs; lcp_lt_/lcp_gt_ are only valid if the respective
        //! area is non-empty (guarded in calculate_lcp)
        uint8_t lcp_lt_, lcp_eq_, lcp_gt_;

        MKQSStep(Context& ctx, const StringPtr& strptr,
                 key_type* cache, size_t depth, bool CacheDirty)
            : strptr_(strptr), cache_(cache), depth_(depth), idx_(0) {
            size_t n = strptr_.size();

            const StringSet& strset = strptr_.active();

            if (CacheDirty) {
                // (re)load the key cache at the current depth
                typename StringSet::Iterator it = strset.begin();
                for (size_t i = 0; i < n; ++i, ++it) {
                    cache_[i] = get_key<key_type>(strset, *it, depth);
                }
            }
            // select median of 9
            size_t p = med3(
                cache_,
                med3(cache_, 0, n / 8, n / 4),
                med3(cache_, n / 2 - n / 8, n / 2, n / 2 + n / 8),
                med3(cache_, n - 1 - n / 4, n - 1 - n / 8, n - 3));
            // swap pivot to first position
            std::swap(strset.at(0), strset.at(p));
            std::swap(cache_[0], cache_[p]);
            // save the pivot value
            key_type pivot = cache_[0];
            // for immediate LCP calculation
            key_type max_lt = 0, min_gt = std::numeric_limits<key_type>::max();

            // indexes into array:
            // 0 [pivot] 1 [===] leq [<<<] llt [???] rgt [>>>] req [===] n-1
            size_t leq = 1, llt = 1, rgt = n - 1, req = n - 1;
            while (true)
            {
                // advance left cursor; park equal keys at the left edge
                while (llt <= rgt)
                {
                    int r = cmp(cache[llt], pivot);
                    if (r > 0) {
                        min_gt = std::min(min_gt, cache[llt]);
                        break;
                    }
                    else if (r == 0) {
                        std::swap(strset.at(leq), strset.at(llt));
                        std::swap(cache[leq], cache[llt]);
                        leq++;
                    }
                    else {
                        max_lt = std::max(max_lt, cache[llt]);
                    }
                    ++llt;
                }
                // advance right cursor; park equal keys at the right edge
                while (llt <= rgt)
                {
                    int r = cmp(cache[rgt], pivot);
                    if (r < 0) {
                        max_lt = std::max(max_lt, cache[rgt]);
                        break;
                    }
                    else if (r == 0) {
                        std::swap(strset.at(req), strset.at(rgt));
                        std::swap(cache[req], cache[rgt]);
                        req--;
                    }
                    else {
                        min_gt = std::min(min_gt, cache[rgt]);
                    }
                    --rgt;
                }
                if (llt > rgt)
                    break;
                std::swap(strset.at(llt), strset.at(rgt));
                std::swap(cache[llt], cache[rgt]);
                ++llt;
                --rgt;
            }
            // calculate size of areas = < and >, save into struct
            size_t num_leq = leq, num_req = n - 1 - req;
            num_eq_ = num_leq + num_req;
            num_lt_ = llt - leq;
            num_gt_ = req - rgt;
            assert(num_eq_ > 0); // at least the pivot itself
            assert(num_lt_ + num_eq_ + num_gt_ == n);

            // swap equal values from left to center
            const size_t size1 = std::min(num_leq, num_lt_);
            std::swap_ranges(strset.begin(), strset.begin() + size1,
                             strset.begin() + llt - size1);
            std::swap_ranges(cache, cache + size1, cache + llt - size1);

            // swap equal values from right to center
            const size_t size2 = std::min(num_req, num_gt_);
            std::swap_ranges(strset.begin() + llt, strset.begin() + llt + size2,
                             strset.begin() + n - size2);
            std::swap_ranges(cache + llt, cache + llt + size2,
                             cache + n - size2);

            // No recursive sorting if pivot has a zero byte
            eq_recurse_ = (pivot & 0xFF);

            // save LCP values for writing into LCP array after sorting further
            if (strptr_.with_lcp && num_lt_ > 0) {
                assert(max_lt == *std::max_element(
                           cache_ + 0, cache + num_lt_));

                lcp_lt_ = lcpKeyType(max_lt, pivot);
                // dchar_eq_ = getCharAtDepth(pivot, lcp_lt_);
                TLX_LOGC(ctx.debug_lcp) << "LCP lt with pivot: " << depth_ + lcp_lt_;
            }

            // calculate equal area lcp: +1 for the equal zero termination byte
            lcp_eq_ = lcpKeyDepth(pivot);

            if (strptr_.with_lcp && num_gt_ > 0) {
                assert(min_gt == *std::min_element(
                           cache_ + num_lt_ + num_eq_, cache_ + n));

                lcp_gt_ = lcpKeyType(pivot, min_gt);
                // dchar_gt_ = getCharAtDepth(min_gt, lcp_gt_);
                TLX_LOGC(ctx.debug_lcp) << "LCP pivot with gt: " << depth_ + lcp_gt_;
            }

            ++ctx.base_sort_steps;
        }

        //! write LCPs at the lt/eq and eq/gt boundaries; must run after the
        //! three areas have been fully sorted
        void calculate_lcp() {
            if (strptr_.with_lcp && num_lt_ > 0) {
                strptr_.set_lcp(num_lt_, depth_ + lcp_lt_);
                // strptr_.set_cache(num_lt_, dchar_eq_);
            }

            if (strptr_.with_lcp && num_gt_ > 0) {
                strptr_.set_lcp(num_lt_ + num_eq_, depth_ + lcp_gt_);
                // strptr_.set_cache(num_lt_ + num_eq_, dchar_gt_);
            }
        }
    };
894
895 size_t ms_front_ = 0;
896 std::vector<MKQSStep> ms_stack_;
897
    //! Caching multikey quicksort driver: simulates the recursion with an
    //! explicit stack of MKQSStep partitions. Small ranges fall through to
    //! (cached) insertion sort; finished partitions compute boundary LCPs.
    void sort_mkqs_cache(const StringPtr& strptr, size_t depth) {
        assert(strcmp(mtimer_.running(), "mkqs") == 0);

        if (!ctx_.enable_sequential_mkqs ||
            strptr.size() < ctx_.inssort_threshold) {
            TLX_LOGC(ctx_.debug_jobs)
                << "insertion_sort() size "
                << strptr.size() << " depth " << depth;

            ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
            insertion_sort(strptr.copy_back(), depth, /* memory */ 0);
            ctx_.donesize(strptr.size());
            return;
        }

        TLX_LOGC(ctx_.debug_jobs)
            << "sort_mkqs_cache() size " << strptr.size() << " depth " << depth;

        // grow the shared buffer to hold one key per string
        if (bktcache_.size() < strptr.size() * sizeof(key_type)) {
            bktcache_.destroy();
            bktcache_.resize(strptr.size() * sizeof(key_type));
        }

        // reuse bktcache as keycache
        key_type* cache = reinterpret_cast<key_type*>(bktcache_.data());

        assert(ms_front_ == 0);
        assert(ms_stack_.size() == 0);

        // std::deque is much slower than std::vector, so we use an artificial
        // pop_front variable.
        ms_stack_.emplace_back(ctx_, strptr, cache, depth, true);

        while (ms_stack_.size() > ms_front_)
        {
            MKQSStep& ms = ms_stack_.back();
            ++ms.idx_; // increment here, because stack may change

            // process the lt-subsequence
            if (ms.idx_ == 1) {
                if (ms.num_lt_ == 0) {
                    // empty subsequence
                }
                else if (ms.num_lt_ < ctx_.inssort_threshold) {
                    ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
                    insertion_sort_cache<false>(ms.strptr_.sub(0, ms.num_lt_),
                                                ms.cache_, ms.depth_);
                    ctx_.donesize(ms.num_lt_);
                }
                else {
                    ms_stack_.emplace_back(
                        ctx_,
                        ms.strptr_.sub(0, ms.num_lt_),
                        ms.cache_, ms.depth_, false);
                }
            }
            // process the eq-subsequence
            else if (ms.idx_ == 2) {
                StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);

                assert(ms.num_eq_ > 0);

                if (!ms.eq_recurse_) {
                    // pivot was nullptr-terminated: equal area is done
                    StringPtr spb = sp.copy_back();
                    spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
                    ctx_.donesize(spb.size());
                }
                else if (ms.num_eq_ < ctx_.inssort_threshold) {
                    ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
                    insertion_sort_cache<true>(sp, ms.cache_ + ms.num_lt_,
                                               ms.depth_ + sizeof(key_type));
                    ctx_.donesize(ms.num_eq_);
                }
                else {
                    ms_stack_.emplace_back(
                        ctx_, sp,
                        ms.cache_ + ms.num_lt_,
                        ms.depth_ + sizeof(key_type), true);
                }
            }
            // process the gt-subsequence
            else if (ms.idx_ == 3) {
                StringPtr sp = ms.strptr_.sub(
                    ms.num_lt_ + ms.num_eq_, ms.num_gt_);

                if (ms.num_gt_ == 0) {
                    // empty subsequence
                }
                else if (ms.num_gt_ < ctx_.inssort_threshold) {
                    ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
                    insertion_sort_cache<false>(
                        sp, ms.cache_ + ms.num_lt_ + ms.num_eq_, ms.depth_);
                    ctx_.donesize(ms.num_gt_);
                }
                else {
                    ms_stack_.emplace_back(
                        ctx_, sp,
                        ms.cache_ + ms.num_lt_ + ms.num_eq_,
                        ms.depth_, false);
                }
            }
            // calculate lcps
            else {
                // finished sort
                assert(ms_stack_.size() > ms_front_);

                // calculate LCP after the three parts are sorted
                ms_stack_.back().calculate_lcp();

                ms_stack_.pop_back();
            }

            if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
                sample_sort_free_work();
            }
        }
    }
1015
    //! Donate mkqs work to idle threads: convert up to 8 of the oldest
    //! partition levels into independent thread-pool jobs, skipping the
    //! subsequences already processed (tracked by idx_), and advance
    //! ms_front_ accordingly.
    void mkqs_free_work() {
        assert(ms_stack_.size() >= ms_front_);

        for (unsigned int fl = 0; fl < 8; ++fl)
        {
            if (ms_stack_.size() == ms_front_) {
                return;
            }

            TLX_LOGC(ctx_.debug_jobs)
                << "Freeing top level of PS5SmallsortJob's mkqs stack - size "
                << ms_stack_.size();

            // convert top level of stack into independent jobs

            MKQSStep& ms = ms_stack_[ms_front_];

            if (ms.idx_ == 0 && ms.num_lt_ != 0)
            {
                this->substep_add();
                ctx_.enqueue(this, ms.strptr_.sub(0, ms.num_lt_), ms.depth_);
            }
            if (ms.idx_ <= 1) // st.num_eq > 0 always
            {
                assert(ms.num_eq_ > 0);

                StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);

                if (ms.eq_recurse_) {
                    this->substep_add();
                    ctx_.enqueue(this, sp, ms.depth_ + sizeof(key_type));
                }
                else {
                    // equal area already done: just write back and fill LCPs
                    StringPtr spb = sp.copy_back();
                    spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
                    ctx_.donesize(ms.num_eq_);
                }
            }
            if (ms.idx_ <= 2 && ms.num_gt_ != 0)
            {
                this->substep_add();
                ctx_.enqueue(
                    this, ms.strptr_.sub(ms.num_lt_ + ms.num_eq_, ms.num_gt_),
                    ms.depth_);
            }

            // shorten the current stack
            ++ms_front_;
        }
    }
1066
1067 /*------------------------------------------------------------------------*/
1068 // Called When PS5SmallsortJob is Finished
1069
substep_all_done()1070 void substep_all_done() final {
1071 TLX_LOGC(ctx_.debug_recursion)
1072 << "SmallSort[" << depth_ << "] "
1073 << "all substeps done -> LCP calculation";
1074
1075 while (ms_front_ > 0) {
1076 TLX_LOGC(ctx_.debug_lcp)
1077 << "SmallSort[" << depth_ << "] ms_front_: " << ms_front_;
1078 ms_stack_[--ms_front_].calculate_lcp();
1079 }
1080
1081 while (ss_front_ > 0) {
1082 TLX_LOGC(ctx_.debug_lcp)
1083 << "SmallSort[" << depth_ << "] ss_front_: " << ss_front_;
1084 ss_stack_[--ss_front_].calculate_lcp(ctx_);
1085 }
1086
1087 if (pstep_) pstep_->substep_notify_done();
1088 delete this;
1089 }
1090 };
1091
1092 /******************************************************************************/
1093 //! PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs
1094
//! Fully parallel out-of-place super scalar string sample sort step. The
//! pipeline is: sample() builds the splitter tree, then parts_ parallel
//! count() jobs classify slices and build histograms, count_finished() runs
//! a sequential prefix sum, parts_ parallel distribute() jobs permute the
//! strings into the shadow array, and distribute_finished() enqueues
//! recursive sub-sorts per bucket. An atomic countdown (pwork_) hands off
//! between phases; the object deletes itself in substep_all_done().
template <typename Context, typename StringPtr>
class PS5BigSortStep : public PS5SortStep
{
public:
    typedef typename StringPtr::StringSet StringSet;
    typedef typename StringSet::Iterator StrIterator;
    typedef typename Context::key_type key_type;

    //! context
    Context& ctx_;
    //! parent sort step for notification
    PS5SortStep* pstep_;

    //! string pointers, size, and current sorting depth
    StringPtr strptr_;
    size_t depth_;

    //! number of parts into which the strings were split
    size_t parts_;
    //! size of all parts except the last
    size_t psize_;
    //! number of threads still working
    std::atomic<size_t> pwork_;

    //! classifier instance and variables (contains splitter tree)
    typename Context::Classify classifier_;

    static const size_t treebits_ = Context::Classify::treebits;
    static const size_t num_splitters_ = Context::Classify::num_splitters;
    //! number of buckets: a less-than and an equal bucket per splitter plus
    //! the final greater-than bucket
    static const size_t bktnum_ = 2 * num_splitters_ + 1;

    //! LCPs of splitters, needed for recursive calls
    unsigned char splitter_lcp_[num_splitters_ + 1];

    //! individual bucket array of threads, keep bkt[0] for DistributeJob
    simple_vector<simple_vector<size_t> > bkt_;
    //! bucket ids cache, created by classifier and later counted
    simple_vector<simple_vector<uint16_t> > bktcache_;

    /*------------------------------------------------------------------------*/
    // Constructor

    //! Split the input into parts and schedule the sampling phase on the
    //! thread pool. Construction returns immediately; all work is async.
    PS5BigSortStep(Context& ctx, PS5SortStep* pstep,
                   const StringPtr& strptr, size_t depth)
        : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
        // calculate number of parts: about two parts per
        // sequential_threshold() worth of strings, at least one
        parts_ = strptr_.size() / ctx.sequential_threshold() * 2;
        if (parts_ == 0) parts_ = 1;

        bkt_.resize(parts_);
        bktcache_.resize(parts_);

        // ceiling division: every part except possibly the last has psize_
        psize_ = (strptr.size() + parts_ - 1) / parts_;

        TLX_LOGC(ctx_.debug_steps)
            << "enqueue depth=" << depth_
            << " size=" << strptr_.size()
            << " parts=" << parts_
            << " psize=" << psize_
            << " flip=" << strptr_.flipped();

        ctx.threads_.enqueue([this]() { sample(); });
        ++ctx.para_ss_steps;
    }

    virtual ~PS5BigSortStep() { }

    /*------------------------------------------------------------------------*/
    // Sample Step

    //! Draw an oversampled set of keys, sort them, and build the splitter
    //! classifier tree. Then launch one count() job per part.
    void sample() {
        ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
        TLX_LOGC(ctx_.debug_jobs) << "Process SampleJob @ " << this;

        const size_t oversample_factor = 2;
        size_t sample_size = oversample_factor * num_splitters_;

        const StringSet& strset = strptr_.active();
        size_t n = strset.size();

        simple_vector<key_type> samples(sample_size);

        // seed the cheap LCG from the sample buffer's address; splitter
        // quality does not require reproducible randomness
        std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));

        for (size_t i = 0; i < sample_size; ++i)
            samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);

        std::sort(samples.begin(), samples.end());

        classifier_.build(samples.data(), sample_size, splitter_lcp_);

        // create new jobs
        pwork_ = parts_;
        for (unsigned int p = 0; p < parts_; ++p) {
            ctx_.threads_.enqueue([this, p]() { count(p); });
        }
    }

    /*------------------------------------------------------------------------*/
    // Counting Step

    //! Classify the p-th slice of strings into bucket ids (cached for the
    //! later distribute phase) and accumulate a per-part bucket histogram.
    //! The last part to finish continues with count_finished().
    void count(unsigned int p) {
        ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
        TLX_LOGC(ctx_.debug_jobs) << "Process CountJob " << p << " @ " << this;

        const StringSet& strset = strptr_.active();

        StrIterator strB = strset.begin() + p * psize_;
        StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
        // guard against an empty trailing part
        if (strE < strB) strE = strB;

        bktcache_[p].resize(strE - strB);
        uint16_t* bktcache = bktcache_[p].data();
        classifier_.classify(strset, strB, strE, bktcache, depth_);

        // part 0 gets one extra slot for the sentinel written in
        // distribute_finished()
        bkt_[p].resize(bktnum_ + (p == 0 ? 1 : 0));
        size_t* bkt = bkt_[p].data();
        memset(bkt, 0, bktnum_ * sizeof(size_t));

        // histogram over the cached bucket ids
        for (uint16_t* bc = bktcache; bc != bktcache + (strE - strB); ++bc)
            ++bkt[*bc];

        // the thread that brings the atomic counter to zero runs the
        // sequential prefix-sum phase
        if (--pwork_ == 0)
            count_finished();
    }

    //! Sequential phase: inclusive prefix sum over all per-part histograms
    //! (ordered bucket-major, part-minor), turning counts into end offsets.
    //! Then launch one distribute() job per part.
    void count_finished() {
        ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
        TLX_LOGC(ctx_.debug_jobs) << "Finishing CountJob " << this << " with prefixsum";

        // abort sorting if we're measuring only the top level
        if (ctx_.use_only_first_sortstep)
            return;

        // inclusive prefix sum over bkt
        size_t sum = 0;
        for (unsigned int i = 0; i < bktnum_; ++i) {
            for (unsigned int p = 0; p < parts_; ++p) {
                bkt_[p][i] = (sum += bkt_[p][i]);
            }
        }
        assert(sum == strptr_.size());

        // create new jobs
        pwork_ = parts_;
        for (unsigned int p = 0; p < parts_; ++p) {
            ctx_.threads_.enqueue([this, p]() { distribute(p); });
        }
    }

    /*------------------------------------------------------------------------*/
    // Distribute Step

    //! Permute the p-th slice into the shadow array using the cached bucket
    //! ids: each placement decrements this part's bucket offset, so parts
    //! fill disjoint ranges. Last finisher runs distribute_finished().
    void distribute(unsigned int p) {
        ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
        TLX_LOGC(ctx_.debug_jobs) << "Process DistributeJob " << p << " @ " << this;

        const StringSet& strset = strptr_.active();

        StrIterator strB = strset.begin() + p * psize_;
        StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
        // guard against an empty trailing part
        if (strE < strB) strE = strB;

        // get alternative shadow pointer array
        const StringSet& sorted = strptr_.shadow();
        typename StringSet::Iterator sbegin = sorted.begin();

        uint16_t* bktcache = bktcache_[p].data();
        size_t* bkt = bkt_[p].data();

        // place strings back-to-front within each bucket range
        for (StrIterator str = strB; str != strE; ++str, ++bktcache)
            *(sbegin + --bkt[*bktcache]) = std::move(*str);

        if (p != 0) // p = 0 is needed for recursion into bkts
            bkt_[p].destroy();

        bktcache_[p].destroy();

        if (--pwork_ == 0)
            distribute_finished();
    }

    //! After distribution, part 0's offsets are exactly the bucket
    //! boundaries. Walk the buckets and either copy back trivial ones or
    //! enqueue recursive sub-sorts; equal buckets recurse sizeof(key_type)
    //! deeper unless the splitter key was nullptr-terminated.
    void distribute_finished() {
        TLX_LOGC(ctx_.debug_jobs)
            << "Finishing DistributeJob " << this << " with enqueuing subjobs";

        size_t* bkt = bkt_[0].data();
        assert(bkt);

        // first processor's bkt pointers are boundaries between bkts, just add sentinel:
        assert(bkt[0] == 0);
        bkt[bktnum_] = strptr_.size();

        // keep anonymous subjob handle while creating subjobs, so that
        // substep_all_done() cannot fire before we are done enqueuing
        this->substep_add();

        size_t i = 0;
        while (i < bktnum_ - 1)
        {
            // i is even -> bkt[i] is less-than bucket
            size_t bktsize = bkt[i + 1] - bkt[i];
            if (bktsize == 0) {
                // empty bucket
            }
            else if (bktsize == 1) { // just one string pointer, copyback
                strptr_.flip(bkt[i], 1).copy_back();
                ctx_.donesize(1);
            }
            else
            {
                TLX_LOGC(ctx_.debug_recursion)
                    << "Recurse[" << depth_ << "]: < bkt " << bkt[i]
                    << " size " << bktsize << " lcp "
                    << int(splitter_lcp_[i / 2] & 0x7F);
                this->substep_add();
                // lower 7 bits of splitter_lcp_ hold the LCP to skip
                ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
                             depth_ + (splitter_lcp_[i / 2] & 0x7F));
            }
            ++i;
            // i is odd -> bkt[i] is equal bucket
            bktsize = bkt[i + 1] - bkt[i];
            if (bktsize == 0) {
                // empty bucket
            }
            else if (bktsize == 1) { // just one string pointer, copyback
                strptr_.flip(bkt[i], 1).copy_back();
                ctx_.donesize(1);
            }
            else
            {
                // top bit of splitter_lcp_ flags a nullptr-terminated key
                if (splitter_lcp_[i / 2] & 0x80) {
                    // equal-bucket has nullptr-terminated key, done.
                    TLX_LOGC(ctx_.debug_recursion)
                        << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
                        << " size " << bktsize << " is done!";
                    StringPtr sp = strptr_.flip(bkt[i], bktsize).copy_back();
                    sp.fill_lcp(
                        depth_ + lcpKeyDepth(classifier_.get_splitter(i / 2)));
                    ctx_.donesize(bktsize);
                }
                else {
                    TLX_LOGC(ctx_.debug_recursion)
                        << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
                        << " size " << bktsize << " lcp keydepth!";
                    this->substep_add();
                    ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
                                 depth_ + sizeof(key_type));
                }
            }
            ++i;
        }

        // last bucket: greater than all splitters, no LCP known
        size_t bktsize = bkt[i + 1] - bkt[i];

        if (bktsize == 0) {
            // empty bucket
        }
        else if (bktsize == 1) { // just one string pointer, copyback
            strptr_.flip(bkt[i], 1).copy_back();
            ctx_.donesize(1);
        }
        else {
            TLX_LOGC(ctx_.debug_recursion)
                << "Recurse[" << depth_ << "]: > bkt " << bkt[i]
                << " size " << bktsize << " no lcp";
            this->substep_add();
            ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize), depth_);
        }

        this->substep_notify_done(); // release anonymous subjob handle

        // bkt_[0] must survive until substep_all_done() when LCPs are needed
        if (!strptr_.with_lcp)
            bkt_[0].destroy();
    }

    /*------------------------------------------------------------------------*/
    // After Recursive Sorting

    //! All recursive sub-sorts finished: compute the LCPs at the bucket
    //! boundaries (if requested), notify the parent, and self-destruct.
    void substep_all_done() final {
        ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
        if (strptr_.with_lcp) {
            TLX_LOGC(ctx_.debug_steps)
                << "pSampleSortStep[" << depth_ << "]: all substeps done.";

            ps5_sample_sort_lcp<bktnum_>(
                ctx_, classifier_, strptr_, depth_, bkt_[0].data());
            bkt_[0].destroy();
        }

        if (pstep_) pstep_->substep_notify_done();
        delete this;
    }
};
1388
1389 /******************************************************************************/
1390 // PS5Context::enqueue()
1391
1392 template <typename Parameters>
1393 template <typename StringPtr>
enqueue(PS5SortStep * pstep,const StringPtr & strptr,size_t depth)1394 void PS5Context<Parameters>::enqueue(
1395 PS5SortStep* pstep, const StringPtr& strptr, size_t depth) {
1396 if (this->enable_parallel_sample_sort &&
1397 (strptr.size() > sequential_threshold() ||
1398 this->use_only_first_sortstep)) {
1399 new PS5BigSortStep<PS5Context, StringPtr>(*this, pstep, strptr, depth);
1400 }
1401 else {
1402 if (strptr.size() < (1LLU << 32)) {
1403 auto j = new PS5SmallsortJob<PS5Context, StringPtr, uint32_t>(
1404 *this, pstep, strptr, depth);
1405 threads_.enqueue([j]() { j->run(); });
1406 }
1407 else {
1408 auto j = new PS5SmallsortJob<PS5Context, StringPtr, uint64_t>(
1409 *this, pstep, strptr, depth);
1410 threads_.enqueue([j]() { j->run(); });
1411 }
1412 }
1413 }
1414
1415 /******************************************************************************/
1416 // Externally Callable Sorting Methods
1417
1418 //! Main Parallel Sample Sort Function. See below for more convenient wrappers.
1419 template <typename PS5Parameters, typename StringPtr>
parallel_sample_sort_base(const StringPtr & strptr,size_t depth)1420 void parallel_sample_sort_base(const StringPtr& strptr, size_t depth) {
1421
1422 using Context = PS5Context<PS5Parameters>;
1423 Context ctx(std::thread::hardware_concurrency());
1424 ctx.total_size = strptr.size();
1425 ctx.rest_size = strptr.size();
1426 ctx.num_threads = ctx.threads_.size();
1427
1428 MultiTimer timer;
1429 timer.start("sort");
1430
1431 ctx.enqueue(/* pstep */ nullptr, strptr, depth);
1432 ctx.threads_.loop_until_empty();
1433
1434 timer.stop();
1435
1436 assert(!ctx.enable_rest_size || ctx.rest_size == 0);
1437
1438 using BigSortStep = PS5BigSortStep<Context, StringPtr>;
1439
1440 TLX_LOGC(ctx.debug_result)
1441 << "RESULT"
1442 << " sizeof(key_type)=" << sizeof(typename PS5Parameters::key_type)
1443 << " splitter_treebits=" << size_t(BigSortStep::treebits_)
1444 << " num_splitters=" << size_t(BigSortStep::num_splitters_)
1445 << " num_threads=" << ctx.num_threads
1446 << " enable_work_sharing=" << size_t(ctx.enable_work_sharing)
1447 << " use_restsize=" << size_t(ctx.enable_rest_size)
1448 << " tm_para_ss=" << ctx.mtimer.get("para_ss")
1449 << " tm_seq_ss=" << ctx.mtimer.get("sequ_ss")
1450 << " tm_mkqs=" << ctx.mtimer.get("mkqs")
1451 << " tm_inssort=" << ctx.mtimer.get("inssort")
1452 << " tm_total=" << ctx.mtimer.total()
1453 << " tm_idle="
1454 << (ctx.num_threads * timer.total()) - ctx.mtimer.total()
1455 << " steps_para_sample_sort=" << ctx.para_ss_steps
1456 << " steps_seq_sample_sort=" << ctx.sequ_ss_steps
1457 << " steps_base_sort=" << ctx.base_sort_steps;
1458 }
1459
1460 //! Parallel Sample Sort Function for a generic StringSet, this allocates the
1461 //! shadow array for flipping.
1462 template <typename PS5Parameters, typename StringPtr>
1463 typename enable_if<!StringPtr::with_lcp, void>::type
parallel_sample_sort_params(const StringPtr & strptr,size_t depth,size_t memory=0)1464 parallel_sample_sort_params(
1465 const StringPtr& strptr, size_t depth, size_t memory = 0) {
1466 tlx::unused(memory);
1467
1468 typedef typename StringPtr::StringSet StringSet;
1469 const StringSet& strset = strptr.active();
1470
1471 typedef StringShadowPtr<StringSet> StringShadowPtr;
1472 typedef typename StringSet::Container Container;
1473
1474 // allocate shadow pointer array
1475 Container shadow = strset.allocate(strset.size());
1476 StringShadowPtr new_strptr(strset, StringSet(shadow));
1477
1478 parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1479
1480 StringSet::deallocate(shadow);
1481 }
1482
1483 //! Parallel Sample Sort Function for a generic StringSet with LCPs, this
1484 //! allocates the shadow array for flipping.
1485 template <typename PS5Parameters, typename StringPtr>
1486 typename enable_if<StringPtr::with_lcp, void>::type
parallel_sample_sort_params(const StringPtr & strptr,size_t depth,size_t memory=0)1487 parallel_sample_sort_params(
1488 const StringPtr& strptr, size_t depth, size_t memory = 0) {
1489 tlx::unused(memory);
1490
1491 typedef typename StringPtr::StringSet StringSet;
1492 typedef typename StringPtr::LcpType LcpType;
1493 const StringSet& strset = strptr.active();
1494
1495 typedef StringShadowLcpPtr<StringSet, LcpType> StringShadowLcpPtr;
1496 typedef typename StringSet::Container Container;
1497
1498 // allocate shadow pointer array
1499 Container shadow = strset.allocate(strset.size());
1500 StringShadowLcpPtr new_strptr(strset, StringSet(shadow), strptr.lcp());
1501
1502 parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1503
1504 StringSet::deallocate(shadow);
1505 }
1506
1507 //! Parallel Sample Sort Function with default parameter size for a generic
1508 //! StringSet.
1509 template <typename StringPtr>
parallel_sample_sort(const StringPtr & strptr,size_t depth,size_t memory)1510 void parallel_sample_sort(
1511 const StringPtr& strptr, size_t depth, size_t memory) {
1512 return parallel_sample_sort_params<PS5ParametersDefault>(
1513 strptr, depth, memory);
1514 }
1515
1516 } // namespace sort_strings_detail
1517 } // namespace tlx
1518
1519 #endif // !TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
1520
1521 /******************************************************************************/
1522