1 /*******************************************************************************
2  * tlx/sort/strings/parallel_sample_sort.hpp
3  *
4  * Parallel Super Scalar String Sample Sort (pS5)
5  *
6  * See also Timo Bingmann, Andreas Eberle, and Peter Sanders. "Engineering
7  * parallel string sorting." Algorithmica 77.1 (2017): 235-286.
8  *
9  * Part of tlx - http://panthema.net/tlx
10  *
11  * Copyright (C) 2013-2019 Timo Bingmann <tb@panthema.net>
12  *
13  * All rights reserved. Published under the Boost Software License, Version 1.0
14  ******************************************************************************/
15 
16 #ifndef TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
17 #define TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <cmath>
22 #include <cstdlib>
23 #include <cstring>
24 #include <random>
25 #include <vector>
26 
27 #include <tlx/sort/strings/insertion_sort.hpp>
28 #include <tlx/sort/strings/sample_sort_tools.hpp>
29 #include <tlx/sort/strings/string_ptr.hpp>
30 
31 #include <tlx/logger/core.hpp>
32 #include <tlx/math/clz.hpp>
33 #include <tlx/math/ctz.hpp>
34 #include <tlx/meta/enable_if.hpp>
35 #include <tlx/multi_timer.hpp>
36 #include <tlx/simple_vector.hpp>
37 #include <tlx/thread_pool.hpp>
38 #include <tlx/unused.hpp>
39 
40 namespace tlx {
41 namespace sort_strings_detail {
42 
43 class PS5SortStep;
44 
45 /******************************************************************************/
46 //! Parallel Super Scalar String Sample Sort Parameter Struct
47 
48 class PS5ParametersDefault
49 {
50 public:
51     static const bool debug_steps = false;
52     static const bool debug_jobs = false;
53 
54     static const bool debug_bucket_size = false;
55     static const bool debug_recursion = false;
56     static const bool debug_lcp = false;
57 
58     static const bool debug_result = false;
59 
60     //! enable/disable various sorting levels
61     static const bool enable_parallel_sample_sort = true;
62     static const bool enable_sequential_sample_sort = true;
63     static const bool enable_sequential_mkqs = true;
64 
65     //! terminate sort after first parallel sample sort step
66     static const bool use_only_first_sortstep = false;
67 
68     //! enable work freeing
69     static const bool enable_work_sharing = true;
70 
71     //! whether the base sequential_threshold() on the remaining unsorted string
72     //! set or on the whole string set.
73     static const bool enable_rest_size = false;
74 
75     //! key type for sample sort: 32-bit or 64-bit
76     typedef size_t key_type;
77 
78     //! depth of classification tree used in sample sorts
79     static const unsigned TreeBits = 10;
80 
81     //! classification tree variant for sample sorts
82     using Classify = SSClassifyTreeCalcUnrollInterleave<key_type, TreeBits>;
83 
84     //! threshold to run sequential small sorts
85     static const size_t smallsort_threshold = 1024 * 1024;
86     //! threshold to switch to insertion sort
87     static const size_t inssort_threshold = 32;
88 };
89 
90 /******************************************************************************/
91 //! Parallel Super Scalar String Sample Sort Context
92 
93 template <typename Parameters>
94 class PS5Context : public Parameters
95 {
96 public:
97     //! total size of input
98     size_t total_size;
99 
100     //! number of remaining strings to sort
101     std::atomic<size_t> rest_size;
102 
103     //! counters
104     std::atomic<size_t> para_ss_steps, sequ_ss_steps, base_sort_steps;
105 
106     //! timers for individual sorting steps
107     MultiTimer mtimer;
108 
109     //! number of threads overall
110     size_t num_threads;
111 
112     //! thread pool
113     ThreadPool threads_;
114 
115     //! context constructor
PS5Context(size_t _thread_num)116     PS5Context(size_t _thread_num)
117         : para_ss_steps(0), sequ_ss_steps(0), base_sort_steps(0),
118           num_threads(_thread_num),
119           threads_(_thread_num)
120     { }
121 
122     //! enqueue a new job in the thread pool
123     template <typename StringPtr>
124     void enqueue(PS5SortStep* sstep, const StringPtr& strptr, size_t depth);
125 
126     //! return sequential sorting threshold
sequential_threshold()127     size_t sequential_threshold() {
128         size_t threshold = this->smallsort_threshold;
129         if (this->enable_rest_size) {
130             return std::max(threshold, rest_size / num_threads);
131         }
132         else {
133             return std::max(threshold, total_size / num_threads);
134         }
135     }
136 
137     //! decrement number of unordered strings
donesize(size_t n)138     void donesize(size_t n) {
139         if (this->enable_rest_size)
140             rest_size -= n;
141     }
142 };
143 
144 /******************************************************************************/
145 //! LCP calculation of Splitter Strings
146 
//! Return the number of leading bytes in which the keys a and b agree,
//! computed as clz() of their XOR divided by eight.
template <typename KeyType>
static inline unsigned char
lcpKeyType(const KeyType& a, const KeyType& b) {
    // bits that differ between the two keys
    const auto differing_bits = a ^ b;
    // every eight leading zero bits are one fully matching byte
    return clz(differing_bits) / 8;
}
153 
//! Return the size of the key in bytes minus the number of whole trailing
//! zero bytes, computed as ctz() of the key divided by eight.
template <typename KeyType>
static inline unsigned char
lcpKeyDepth(const KeyType& a) {
    // whole zero bytes at the least significant end of the key
    const auto trailing_zero_bytes = ctz(a) / 8;
    return sizeof(KeyType) - trailing_zero_bytes;
}
160 
//! Return the d-th byte of the (swapped) key, where d = 0 selects the most
//! significant byte and d = sizeof(KeyType) - 1 the least significant one.
template <typename KeyType>
static inline unsigned char
getCharAtDepth(const KeyType& a, unsigned char d) {
    // number of bits the wanted byte sits above the least significant byte
    const size_t shift_bits = 8 * (sizeof(KeyType) - 1 - d);
    return static_cast<unsigned char>(a >> shift_bits);
}
167 
168 /******************************************************************************/
169 //! PS5SortStep Top-Level Class to Keep Track of Substeps
170 
//! Base class of all sort steps: counts the substeps that are still running
//! and invokes substep_all_done() when the last one reports completion.
class PS5SortStep
{
private:
    //! Number of substeps still running
    std::atomic<size_t> substep_working_;

    //! Hook implemented by derived steps; called by substep_notify_done()
    //! when the counter of running substeps drops to zero.
    virtual void substep_all_done() = 0;

protected:
    //! Start without any registered substeps.
    PS5SortStep() : substep_working_(0) { }

    //! A step may only be destroyed once no substep is running anymore.
    virtual ~PS5SortStep() {
        assert(substep_working_.load() == 0);
    }

    //! Register new substep
    void substep_add() {
        substep_working_.fetch_add(1);
    }

public:
    //! Notify this step that one of its substeps is done. The caller that
    //! finishes the last registered substep triggers substep_all_done().
    void substep_notify_done() {
        assert(substep_working_.load() > 0);
        // fetch_sub() returns the value before the decrement
        const size_t running_before = substep_working_.fetch_sub(1);
        if (running_before == 1)
            substep_all_done();
    }
};
200 
201 /******************************************************************************/
202 //! LCP Calculation for Finished Sample Sort Steps
203 
//! Write the LCP values at the boundaries between consecutive non-empty
//! buckets of a finished sample sort step.
//!
//! Bucket b covers the index range [bkt[b], bkt[b + 1]) of the active string
//! set. Even bucket numbers are "less-than" (or the final "greater-than")
//! buckets, odd bucket numbers are "equal" buckets whose key is splitter
//! b / 2 of the classifier. For every non-empty bucket after the first
//! non-empty one, the LCP of its first string is set to depth plus the number
//! of leading key bytes shared with the last key of the previous non-empty
//! bucket. LCP values inside the buckets are not touched here.
//!
//! \param ctx         sorting context, only used for debug output
//! \param classifier  classifier whose get_splitter() delivers the keys of
//!                    the equal buckets
//! \param strptr      string pointer of the step; must not be flipped
//! \param depth       character depth at which the keys were extracted
//! \param bkt         bucket boundaries, read at indices 0 to bktnum
template <size_t bktnum, typename Context, typename Classify,
          typename StringPtr, typename BktSizeType>
void ps5_sample_sort_lcp(const Context& ctx, const Classify& classifier,
                         const StringPtr& strptr, size_t depth,
                         const BktSizeType* bkt) {
    assert(!strptr.flipped());

    const typename StringPtr::StringSet& strset = strptr.active();
    typedef typename Context::key_type key_type;

    size_t b = 0;         // current bucket number
    key_type prevkey = 0; // last key of the previous non-empty bucket

    // The following while loops only check b < bktnum when b is odd, because
    // bktnum is always odd: each iteration handles one odd and then one even
    // bucket. We need a goto to jump into the middle of the loop, as b == 0
    // starts with an even bucket.
    goto even_first;

    // find first non-empty bucket and remember its last key in prevkey
    while (b < bktnum)
    {
        // odd bucket: = bkt, all keys are equal to the splitter
        if (bkt[b] != bkt[b + 1]) {
            prevkey = classifier.get_splitter(b / 2);
            assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
            break;
        }
        ++b;
even_first:
        // even bucket: <, << or > bkt, the last key must be read from the
        // last string of the bucket
        if (bkt[b] != bkt[b + 1]) {
            prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
            break;
        }
        ++b;
    }
    // step over the bucket found above (or past the end if all were empty)
    ++b;

    // goto depends on whether the first non-empty bucket was odd or
    // even. the while loop below encodes this in the program counter.
    if (b < bktnum && b % 2 == 0) goto even_bucket;

    // find next non-empty bucket and set the LCP of its first string
    while (b < bktnum)
    {
        // odd bucket: = bkt
        if (bkt[b] != bkt[b + 1]) {
            key_type thiskey = classifier.get_splitter(b / 2);
            assert(thiskey == get_key_at<key_type>(strset, bkt[b], depth));

            // number of key bytes shared with the previous bucket's last key
            int rlcp = lcpKeyType(prevkey, thiskey);
            strptr.set_lcp(bkt[b], depth + rlcp);
            // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));

            TLX_LOGC(ctx.debug_lcp)
                << "LCP at odd-bucket " << b
                << " [" << bkt[b] << "," << bkt[b + 1] << ")"
                << " is " << depth + rlcp;

            // first and last key of an equal bucket are the same
            prevkey = thiskey;
            assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
        }
        ++b;
even_bucket:
        // even bucket: <, << or > bkt
        if (bkt[b] != bkt[b + 1]) {
            // key of the first string in this bucket
            key_type thiskey = get_key_at<key_type>(strset, bkt[b], depth);

            int rlcp = lcpKeyType(prevkey, thiskey);
            strptr.set_lcp(bkt[b], depth + rlcp);
            // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));

            TLX_LOGC(ctx.debug_lcp)
                << "LCP at even-bucket " << b
                << " [" << bkt[b] << "," << bkt[b + 1] << ")"
                << " is " << depth + rlcp;

            // key of the last string in this bucket
            prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
        }
        ++b;
    }
}
286 
287 /******************************************************************************/
288 //! SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts
289 
290 template <typename Context, typename StringPtr, typename BktSizeType>
291 class PS5SmallsortJob : public PS5SortStep
292 {
293 public:
    //! sorting context shared by all jobs; receives this job's timers in
    //! the destructor
    Context& ctx_;

    //! parent sort step
    PS5SortStep* pstep_;

    //! string pointer describing the strings sorted by this job
    StringPtr strptr_;
    //! character depth at which sorting of strptr_ starts
    size_t depth_;
    //! timer of this job, switched between the phases "sequ_ss" and "mkqs"
    MultiTimer mtimer_;

    //! key type used by the sample sort and MKQS steps
    typedef typename Context::key_type key_type;
    //! string set type behind the string pointer
    typedef typename StringPtr::StringSet StringSet;
    //! integer type used for bucket boundaries
    typedef BktSizeType bktsize_type;
306 
PS5SmallsortJob(Context & ctx,PS5SortStep * pstep,const StringPtr & strptr,size_t depth)307     PS5SmallsortJob(Context& ctx, PS5SortStep* pstep,
308                     const StringPtr& strptr, size_t depth)
309         : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
310         TLX_LOGC(ctx_.debug_steps)
311             << "enqueue depth=" << depth_
312             << " size=" << strptr_.size() << " flip=" << strptr_.flipped();
313     }
314 
~PS5SmallsortJob()315     ~PS5SmallsortJob() {
316         mtimer_.stop();
317         ctx_.mtimer.add(mtimer_);
318     }
319 
    //! raw byte buffer for the bucket numbers written by the classifier;
    //! run() sizes it to one uint16_t per string and sort_sample_sort()
    //! reinterprets it as uint16_t array
    simple_vector<uint8_t> bktcache_;
    //! NOTE(review): not referenced in the visible part of this class --
    //! presumably a size counter for bktcache_, confirm against the rest of
    //! the file
    size_t bktcache_size_ = 0;
322 
run()323     void run() {
324         mtimer_.start("sequ_ss");
325 
326         size_t n = strptr_.size();
327 
328         TLX_LOGC(ctx_.debug_jobs)
329             << "Process PS5SmallsortJob " << this << " of size " << n;
330 
331         // create anonymous wrapper job
332         this->substep_add();
333 
334         if (ctx_.enable_sequential_sample_sort && n >= ctx_.smallsort_threshold)
335         {
336             bktcache_.resize(n * sizeof(uint16_t));
337             sort_sample_sort(strptr_, depth_);
338         }
339         else
340         {
341             mtimer_.start("mkqs");
342             sort_mkqs_cache(strptr_, depth_);
343         }
344 
345         // finish wrapper job, handler delete's this
346         this->substep_notify_done();
347     }
348 
349     /*------------------------------------------------------------------------*/
350     //! Stack of Recursive Sample Sort Steps
351 
352     class SeqSampleSortStep
353     {
354     public:
355         StringPtr strptr_;
356         size_t idx_;
357         size_t depth_;
358 
359         using StringSet = typename StringPtr::StringSet;
360         using bktsize_type = BktSizeType;
361 
362         typename Context::Classify classifier;
363 
364         static const size_t num_splitters = Context::Classify::num_splitters;
365         static const size_t bktnum = 2 * num_splitters + 1;
366 
367         unsigned char splitter_lcp[num_splitters + 1];
368         bktsize_type bkt[bktnum + 1];
369 
SeqSampleSortStep(Context & ctx,const StringPtr & strptr,size_t depth,uint16_t * bktcache)370         SeqSampleSortStep(Context& ctx, const StringPtr& strptr, size_t depth,
371                           uint16_t* bktcache)
372             : strptr_(strptr), idx_(0), depth_(depth) {
373             size_t n = strptr_.size();
374 
375             // step 1: select splitters with oversampling
376 
377             const size_t oversample_factor = 2;
378             const size_t sample_size = oversample_factor * num_splitters;
379 
380             simple_vector<key_type> samples(sample_size);
381 
382             const StringSet& strset = strptr_.active();
383 
384             std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
385 
386             for (size_t i = 0; i < sample_size; ++i)
387                 samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
388 
389             std::sort(samples.begin(), samples.end());
390 
391             classifier.build(samples.data(), sample_size, splitter_lcp);
392 
393             // step 2: classify all strings
394 
395             classifier.classify(
396                 strset, strset.begin(), strset.end(), bktcache, depth_);
397 
398             // step 2.5: count bucket sizes
399 
400             bktsize_type bktsize[bktnum];
401             memset(bktsize, 0, bktnum * sizeof(bktsize_type));
402 
403             for (size_t si = 0; si < n; ++si)
404                 ++bktsize[bktcache[si]];
405 
406             // step 3: inclusive prefix sum
407 
408             bkt[0] = bktsize[0];
409             for (unsigned int i = 1; i < bktnum; ++i) {
410                 bkt[i] = bkt[i - 1] + bktsize[i];
411             }
412             assert(bkt[bktnum - 1] == n);
413             bkt[bktnum] = n;
414 
415             // step 4: premute out-of-place
416 
417             const StringSet& strB = strptr_.active();
418             // get alternative shadow pointer array
419             const StringSet& sorted = strptr_.shadow();
420             typename StringSet::Iterator sbegin = sorted.begin();
421 
422             for (typename StringSet::Iterator str = strB.begin();
423                  str != strB.end(); ++str, ++bktcache)
424                 *(sbegin + --bkt[*bktcache]) = std::move(*str);
425 
426             // bkt is afterwards the exclusive prefix sum of bktsize
427 
428             // statistics
429 
430             ++ctx.sequ_ss_steps;
431         }
432 
calculate_lcp(Context & ctx)433         void calculate_lcp(Context& ctx) {
434             TLX_LOGC(ctx.debug_lcp) << "Calculate LCP after sample sort step";
435             if (strptr_.with_lcp) {
436                 ps5_sample_sort_lcp<bktnum>(ctx, classifier, strptr_, depth_, bkt);
437             }
438         }
439     };
440 
    //! index of the first level of ss_stack_ that still belongs to this job;
    //! levels below it were handed out by sample_sort_free_work()
    size_t ss_front_ = 0;
    //! explicit stack of sample sort levels replacing recursion
    std::vector<SeqSampleSortStep> ss_stack_;
443 
sort_sample_sort(const StringPtr & strptr,size_t depth)444     void sort_sample_sort(const StringPtr& strptr, size_t depth) {
445         typedef SeqSampleSortStep Step;
446 
447         assert(ss_front_ == 0);
448         assert(ss_stack_.size() == 0);
449 
450         uint16_t* bktcache = reinterpret_cast<uint16_t*>(bktcache_.data());
451 
452         // sort first level
453         ss_stack_.emplace_back(ctx_, strptr, depth, bktcache);
454 
455         // step 5: "recursion"
456 
457         while (ss_stack_.size() > ss_front_)
458         {
459             Step& s = ss_stack_.back();
460             size_t i = s.idx_++; // process the bucket s.idx_
461 
462             if (i < Step::bktnum)
463             {
464                 size_t bktsize = s.bkt[i + 1] - s.bkt[i];
465 
466                 StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
467 
468                 // i is even -> bkt[i] is less-than bucket
469                 if (i % 2 == 0)
470                 {
471                     if (bktsize == 0) {
472                         // empty bucket
473                     }
474                     else if (bktsize < ctx_.smallsort_threshold)
475                     {
476                         assert(i / 2 <= Step::num_splitters);
477                         if (i == Step::bktnum - 1)
478                             TLX_LOGC(ctx_.debug_recursion)
479                                 << "Recurse[" << s.depth_ << "]: > bkt "
480                                 << i << " size " << bktsize << " no lcp";
481                         else
482                             TLX_LOGC(ctx_.debug_recursion)
483                                 << "Recurse[" << s.depth_ << "]: < bkt "
484                                 << i << " size " << bktsize << " lcp "
485                                 << int(s.splitter_lcp[i / 2] & 0x7F);
486 
487                         ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
488                         sort_mkqs_cache(
489                             sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
490                     }
491                     else
492                     {
493                         if (i == Step::bktnum - 1)
494                             TLX_LOGC(ctx_.debug_recursion)
495                                 << "Recurse[" << s.depth_ << "]: > bkt "
496                                 << i << " size " << bktsize << " no lcp";
497                         else
498                             TLX_LOGC(ctx_.debug_recursion)
499                                 << "Recurse[" << s.depth_ << "]: < bkt "
500                                 << i << " size " << bktsize << " lcp "
501                                 << int(s.splitter_lcp[i / 2] & 0x7F);
502 
503                         ss_stack_.emplace_back(
504                             ctx_, sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F), bktcache);
505                     }
506                 }
507                 // i is odd -> bkt[i] is equal bucket
508                 else
509                 {
510                     if (bktsize == 0) {
511                         // empty bucket
512                     }
513                     else if (s.splitter_lcp[i / 2] & 0x80) {
514                         // equal-bucket has nullptr-terminated key, done.
515                         TLX_LOGC(ctx_.debug_recursion)
516                             << "Recurse[" << s.depth_ << "]: = bkt "
517                             << i << " size " << bktsize << " is done!";
518                         StringPtr spb = sp.copy_back();
519 
520                         if (sp.with_lcp) {
521                             spb.fill_lcp(
522                                 s.depth_ + lcpKeyDepth(s.classifier.get_splitter(i / 2)));
523                         }
524                         ctx_.donesize(bktsize);
525                     }
526                     else if (bktsize < ctx_.smallsort_threshold)
527                     {
528                         TLX_LOGC(ctx_.debug_recursion)
529                             << "Recurse[" << s.depth_ << "]: = bkt "
530                             << i << " size " << bktsize << " lcp keydepth!";
531 
532                         ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
533                         sort_mkqs_cache(sp, s.depth_ + sizeof(key_type));
534                     }
535                     else
536                     {
537                         TLX_LOGC(ctx_.debug_recursion)
538                             << "Recurse[" << s.depth_ << "]: = bkt "
539                             << i << " size " << bktsize << " lcp keydepth!";
540 
541                         ss_stack_.emplace_back(
542                             ctx_, sp, s.depth_ + sizeof(key_type), bktcache);
543                     }
544                 }
545             }
546             else
547             {
548                 // finished sort
549                 assert(ss_stack_.size() > ss_front_);
550 
551                 // after full sort: calculate LCPs at this level
552                 ss_stack_.back().calculate_lcp(ctx_);
553 
554                 ss_stack_.pop_back();
555             }
556 
557             if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
558                 sample_sort_free_work();
559             }
560         }
561     }
562 
sample_sort_free_work()563     void sample_sort_free_work() {
564         assert(ss_stack_.size() >= ss_front_);
565 
566         if (ss_stack_.size() == ss_front_) {
567             // ss_stack_ is empty, check other stack
568             return mkqs_free_work();
569         }
570 
571         // convert top level of stack into independent jobs
572         TLX_LOGC(ctx_.debug_jobs)
573             << "Freeing top level of PS5SmallsortJob's sample_sort stack";
574 
575         typedef SeqSampleSortStep Step;
576         Step& s = ss_stack_[ss_front_];
577 
578         while (s.idx_ < Step::bktnum)
579         {
580             size_t i = s.idx_++; // process the bucket s.idx_
581 
582             size_t bktsize = s.bkt[i + 1] - s.bkt[i];
583 
584             StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
585 
586             // i is even -> bkt[i] is less-than bucket
587             if (i % 2 == 0)
588             {
589                 if (bktsize == 0) {
590                     // empty bucket
591                 }
592                 else
593                 {
594                     if (i == Step::bktnum - 1)
595                         TLX_LOGC(ctx_.debug_recursion)
596                             << "Recurse[" << s.depth_ << "]: > bkt "
597                             << i << " size " << bktsize << " no lcp";
598                     else
599                         TLX_LOGC(ctx_.debug_recursion)
600                             << "Recurse[" << s.depth_ << "]: < bkt "
601                             << i << " size " << bktsize << " lcp "
602                             << int(s.splitter_lcp[i / 2] & 0x7F);
603 
604                     this->substep_add();
605                     ctx_.enqueue(this, sp,
606                                  s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
607                 }
608             }
609             // i is odd -> bkt[i] is equal bucket
610             else
611             {
612                 if (bktsize == 0) {
613                     // empty bucket
614                 }
615                 else if (s.splitter_lcp[i / 2] & 0x80) {
616                     // equal-bucket has nullptr-terminated key, done.
617                     TLX_LOGC(ctx_.debug_recursion)
618                         << "Recurse[" << s.depth_ << "]: = bkt "
619                         << i << " size " << bktsize << " is done!";
620                     StringPtr spb = sp.copy_back();
621 
622                     if (sp.with_lcp) {
623                         spb.fill_lcp(s.depth_ + lcpKeyDepth(
624                                          s.classifier.get_splitter(i / 2)));
625                     }
626                     ctx_.donesize(bktsize);
627                 }
628                 else
629                 {
630                     TLX_LOGC(ctx_.debug_recursion)
631                         << "Recurse[" << s.depth_ << "]: = bkt "
632                         << i << " size " << bktsize << " lcp keydepth!";
633 
634                     this->substep_add();
635                     ctx_.enqueue(this, sp, s.depth_ + sizeof(key_type));
636                 }
637             }
638         }
639 
640         // shorten the current stack
641         ++ss_front_;
642     }
643 
644     /*------------------------------------------------------------------------*/
645     //! Stack of Recursive MKQS Steps
646 
cmp(const key_type & a,const key_type & b)647     static inline int cmp(const key_type& a, const key_type& b) {
648         return (a > b) ? 1 : (a < b) ? -1 : 0;
649     }
650 
651     template <typename Type>
652     static inline size_t
med3(Type * A,size_t i,size_t j,size_t k)653     med3(Type* A, size_t i, size_t j, size_t k) {
654         if (A[i] == A[j]) return i;
655         if (A[k] == A[i] || A[k] == A[j]) return k;
656         if (A[i] < A[j]) {
657             if (A[j] < A[k]) return j;
658             if (A[i] < A[k]) return k;
659             return i;
660         }
661         else {
662             if (A[j] > A[k]) return j;
663             if (A[i] < A[k]) return i;
664             return k;
665         }
666     }
667 
668     //! Insertion sort the strings only based on the cached characters.
669     static inline void
insertion_sort_cache_block(const StringPtr & strptr,key_type * cache)670     insertion_sort_cache_block(const StringPtr& strptr, key_type* cache) {
671         const StringSet& strings = strptr.active();
672         size_t n = strptr.size();
673         size_t pi, pj;
674         for (pi = 1; --n > 0; ++pi) {
675             typename StringSet::String tmps = std::move(strings.at(pi));
676             key_type tmpc = cache[pi];
677             for (pj = pi; pj > 0; --pj) {
678                 if (cache[pj - 1] <= tmpc)
679                     break;
680                 strings.at(pj) = std::move(strings.at(pj - 1));
681                 cache[pj] = cache[pj - 1];
682             }
683             strings.at(pj) = std::move(tmps);
684             cache[pj] = tmpc;
685         }
686     }
687 
688     //! Insertion sort, but use cached characters if possible.
689     template <bool CacheDirty>
690     static inline void
insertion_sort_cache(const StringPtr & _strptr,key_type * cache,size_t depth)691     insertion_sort_cache(const StringPtr& _strptr, key_type* cache, size_t depth) {
692         StringPtr strptr = _strptr.copy_back();
693 
694         if (strptr.size() <= 1) return;
695         if (CacheDirty)
696             return insertion_sort(strptr, depth, /* memory */ 0);
697 
698         insertion_sort_cache_block(strptr, cache);
699 
700         size_t start = 0, bktsize = 1;
701         for (size_t i = 0; i < strptr.size() - 1; ++i) {
702             // group areas with equal cache values
703             if (cache[i] == cache[i + 1]) {
704                 ++bktsize;
705                 continue;
706             }
707             // calculate LCP between group areas
708             if (start != 0 && strptr.with_lcp) {
709                 int rlcp = lcpKeyType(cache[start - 1], cache[start]);
710                 strptr.set_lcp(start, depth + rlcp);
711                 // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
712             }
713             // sort group areas deeper if needed
714             if (bktsize > 1) {
715                 if (cache[start] & 0xFF) {
716                     // need deeper sort
717                     insertion_sort(
718                         strptr.sub(start, bktsize), depth + sizeof(key_type),
719                         /* memory */ 0);
720                 }
721                 else {
722                     // cache contains nullptr-termination
723                     strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
724                 }
725             }
726             bktsize = 1;
727             start = i + 1;
728         }
729         // tail of loop for last item
730         if (start != 0 && strptr.with_lcp) {
731             int rlcp = lcpKeyType(cache[start - 1], cache[start]);
732             strptr.set_lcp(start, depth + rlcp);
733             // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
734         }
735         if (bktsize > 1) {
736             if (cache[start] & 0xFF) {
737                 // need deeper sort
738                 insertion_sort(
739                     strptr.sub(start, bktsize), depth + sizeof(key_type),
740                     /* memory */ 0);
741             }
742             else {
743                 // cache contains nullptr-termination
744                 strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
745             }
746         }
747     }
748 
749     class MKQSStep
750     {
751     public:
752         StringPtr strptr_;
753         key_type* cache_;
754         size_t num_lt_, num_eq_, num_gt_, depth_;
755         size_t idx_;
756         unsigned char eq_recurse_;
757         // typename StringPtr::StringSet::Char dchar_eq_, dchar_gt_;
758         uint8_t lcp_lt_, lcp_eq_, lcp_gt_;
759 
MKQSStep(Context & ctx,const StringPtr & strptr,key_type * cache,size_t depth,bool CacheDirty)760         MKQSStep(Context& ctx, const StringPtr& strptr,
761                  key_type* cache, size_t depth, bool CacheDirty)
762             : strptr_(strptr), cache_(cache), depth_(depth), idx_(0) {
763             size_t n = strptr_.size();
764 
765             const StringSet& strset = strptr_.active();
766 
767             if (CacheDirty) {
768                 typename StringSet::Iterator it = strset.begin();
769                 for (size_t i = 0; i < n; ++i, ++it) {
770                     cache_[i] = get_key<key_type>(strset, *it, depth);
771                 }
772             }
773             // select median of 9
774             size_t p = med3(
775                 cache_,
776                 med3(cache_, 0, n / 8, n / 4),
777                 med3(cache_, n / 2 - n / 8, n / 2, n / 2 + n / 8),
778                 med3(cache_, n - 1 - n / 4, n - 1 - n / 8, n - 3));
779             // swap pivot to first position
780             std::swap(strset.at(0), strset.at(p));
781             std::swap(cache_[0], cache_[p]);
782             // save the pivot value
783             key_type pivot = cache_[0];
784             // for immediate LCP calculation
785             key_type max_lt = 0, min_gt = std::numeric_limits<key_type>::max();
786 
787             // indexes into array:
788             // 0 [pivot] 1 [===] leq [<<<] llt [???] rgt [>>>] req [===] n-1
789             size_t leq = 1, llt = 1, rgt = n - 1, req = n - 1;
790             while (true)
791             {
792                 while (llt <= rgt)
793                 {
794                     int r = cmp(cache[llt], pivot);
795                     if (r > 0) {
796                         min_gt = std::min(min_gt, cache[llt]);
797                         break;
798                     }
799                     else if (r == 0) {
800                         std::swap(strset.at(leq), strset.at(llt));
801                         std::swap(cache[leq], cache[llt]);
802                         leq++;
803                     }
804                     else {
805                         max_lt = std::max(max_lt, cache[llt]);
806                     }
807                     ++llt;
808                 }
809                 while (llt <= rgt)
810                 {
811                     int r = cmp(cache[rgt], pivot);
812                     if (r < 0) {
813                         max_lt = std::max(max_lt, cache[rgt]);
814                         break;
815                     }
816                     else if (r == 0) {
817                         std::swap(strset.at(req), strset.at(rgt));
818                         std::swap(cache[req], cache[rgt]);
819                         req--;
820                     }
821                     else {
822                         min_gt = std::min(min_gt, cache[rgt]);
823                     }
824                     --rgt;
825                 }
826                 if (llt > rgt)
827                     break;
828                 std::swap(strset.at(llt), strset.at(rgt));
829                 std::swap(cache[llt], cache[rgt]);
830                 ++llt;
831                 --rgt;
832             }
833             // calculate size of areas = < and >, save into struct
834             size_t num_leq = leq, num_req = n - 1 - req;
835             num_eq_ = num_leq + num_req;
836             num_lt_ = llt - leq;
837             num_gt_ = req - rgt;
838             assert(num_eq_ > 0);
839             assert(num_lt_ + num_eq_ + num_gt_ == n);
840 
841             // swap equal values from left to center
842             const size_t size1 = std::min(num_leq, num_lt_);
843             std::swap_ranges(strset.begin(), strset.begin() + size1,
844                              strset.begin() + llt - size1);
845             std::swap_ranges(cache, cache + size1, cache + llt - size1);
846 
847             // swap equal values from right to center
848             const size_t size2 = std::min(num_req, num_gt_);
849             std::swap_ranges(strset.begin() + llt, strset.begin() + llt + size2,
850                              strset.begin() + n - size2);
851             std::swap_ranges(cache + llt, cache + llt + size2,
852                              cache + n - size2);
853 
854             // No recursive sorting if pivot has a zero byte
855             eq_recurse_ = (pivot & 0xFF);
856 
857             // save LCP values for writing into LCP array after sorting further
858             if (strptr_.with_lcp && num_lt_ > 0) {
859                 assert(max_lt == *std::max_element(
860                            cache_ + 0, cache + num_lt_));
861 
862                 lcp_lt_ = lcpKeyType(max_lt, pivot);
863                 // dchar_eq_ = getCharAtDepth(pivot, lcp_lt_);
864                 TLX_LOGC(ctx.debug_lcp) << "LCP lt with pivot: " << depth_ + lcp_lt_;
865             }
866 
867             // calculate equal area lcp: +1 for the equal zero termination byte
868             lcp_eq_ = lcpKeyDepth(pivot);
869 
870             if (strptr_.with_lcp && num_gt_ > 0) {
871                 assert(min_gt == *std::min_element(
872                            cache_ + num_lt_ + num_eq_, cache_ + n));
873 
874                 lcp_gt_ = lcpKeyType(pivot, min_gt);
875                 // dchar_gt_ = getCharAtDepth(min_gt, lcp_gt_);
876                 TLX_LOGC(ctx.debug_lcp) << "LCP pivot with gt: " << depth_ + lcp_gt_;
877             }
878 
879             ++ctx.base_sort_steps;
880         }
881 
calculate_lcp()882         void calculate_lcp() {
883             if (strptr_.with_lcp && num_lt_ > 0) {
884                 strptr_.set_lcp(num_lt_, depth_ + lcp_lt_);
885                 // strptr_.set_cache(num_lt_, dchar_eq_);
886             }
887 
888             if (strptr_.with_lcp && num_gt_ > 0) {
889                 strptr_.set_lcp(num_lt_ + num_eq_, depth_ + lcp_gt_);
890                 // strptr_.set_cache(num_lt_ + num_eq_, dchar_gt_);
891             }
892         }
893     };
894 
    //! number of entries at the bottom of ms_stack_ which mkqs_free_work() has
    //! already turned into independent jobs. It serves as an artificial
    //! pop_front index: sort_mkqs_cache() stops once the stack has shrunk down
    //! to it, and substep_all_done() later walks these entries backwards to
    //! run calculate_lcp() on them.
    size_t ms_front_ = 0;
    //! explicit stack of multikey quicksort steps used by sort_mkqs_cache()
    //! instead of recursion
    std::vector<MKQSStep> ms_stack_;
897 
sort_mkqs_cache(const StringPtr & strptr,size_t depth)898     void sort_mkqs_cache(const StringPtr& strptr, size_t depth) {
899         assert(strcmp(mtimer_.running(), "mkqs") == 0);
900 
901         if (!ctx_.enable_sequential_mkqs ||
902             strptr.size() < ctx_.inssort_threshold) {
903             TLX_LOGC(ctx_.debug_jobs)
904                 << "insertion_sort() size "
905                 << strptr.size() << " depth " << depth;
906 
907             ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
908             insertion_sort(strptr.copy_back(), depth, /* memory */ 0);
909             ctx_.donesize(strptr.size());
910             return;
911         }
912 
913         TLX_LOGC(ctx_.debug_jobs)
914             << "sort_mkqs_cache() size " << strptr.size() << " depth " << depth;
915 
916         if (bktcache_.size() < strptr.size() * sizeof(key_type)) {
917             bktcache_.destroy();
918             bktcache_.resize(strptr.size() * sizeof(key_type));
919         }
920 
921         // reuse bktcache as keycache
922         key_type* cache = reinterpret_cast<key_type*>(bktcache_.data());
923 
924         assert(ms_front_ == 0);
925         assert(ms_stack_.size() == 0);
926 
927         // std::deque is much slower than std::vector, so we use an artificial
928         // pop_front variable.
929         ms_stack_.emplace_back(ctx_, strptr, cache, depth, true);
930 
931         while (ms_stack_.size() > ms_front_)
932         {
933             MKQSStep& ms = ms_stack_.back();
934             ++ms.idx_; // increment here, because stack may change
935 
936             // process the lt-subsequence
937             if (ms.idx_ == 1) {
938                 if (ms.num_lt_ == 0) {
939                     // empty subsequence
940                 }
941                 else if (ms.num_lt_ < ctx_.inssort_threshold) {
942                     ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
943                     insertion_sort_cache<false>(ms.strptr_.sub(0, ms.num_lt_),
944                                                 ms.cache_, ms.depth_);
945                     ctx_.donesize(ms.num_lt_);
946                 }
947                 else {
948                     ms_stack_.emplace_back(
949                         ctx_,
950                         ms.strptr_.sub(0, ms.num_lt_),
951                         ms.cache_, ms.depth_, false);
952                 }
953             }
954             // process the eq-subsequence
955             else if (ms.idx_ == 2) {
956                 StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
957 
958                 assert(ms.num_eq_ > 0);
959 
960                 if (!ms.eq_recurse_) {
961                     StringPtr spb = sp.copy_back();
962                     spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
963                     ctx_.donesize(spb.size());
964                 }
965                 else if (ms.num_eq_ < ctx_.inssort_threshold) {
966                     ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
967                     insertion_sort_cache<true>(sp, ms.cache_ + ms.num_lt_,
968                                                ms.depth_ + sizeof(key_type));
969                     ctx_.donesize(ms.num_eq_);
970                 }
971                 else {
972                     ms_stack_.emplace_back(
973                         ctx_, sp,
974                         ms.cache_ + ms.num_lt_,
975                         ms.depth_ + sizeof(key_type), true);
976                 }
977             }
978             // process the gt-subsequence
979             else if (ms.idx_ == 3) {
980                 StringPtr sp = ms.strptr_.sub(
981                     ms.num_lt_ + ms.num_eq_, ms.num_gt_);
982 
983                 if (ms.num_gt_ == 0) {
984                     // empty subsequence
985                 }
986                 else if (ms.num_gt_ < ctx_.inssort_threshold) {
987                     ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
988                     insertion_sort_cache<false>(
989                         sp, ms.cache_ + ms.num_lt_ + ms.num_eq_, ms.depth_);
990                     ctx_.donesize(ms.num_gt_);
991                 }
992                 else {
993                     ms_stack_.emplace_back(
994                         ctx_, sp,
995                         ms.cache_ + ms.num_lt_ + ms.num_eq_,
996                         ms.depth_, false);
997                 }
998             }
999             // calculate lcps
1000             else {
1001                 // finished sort
1002                 assert(ms_stack_.size() > ms_front_);
1003 
1004                 // calculate LCP after the three parts are sorted
1005                 ms_stack_.back().calculate_lcp();
1006 
1007                 ms_stack_.pop_back();
1008             }
1009 
1010             if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
1011                 sample_sort_free_work();
1012             }
1013         }
1014     }
1015 
mkqs_free_work()1016     void mkqs_free_work() {
1017         assert(ms_stack_.size() >= ms_front_);
1018 
1019         for (unsigned int fl = 0; fl < 8; ++fl)
1020         {
1021             if (ms_stack_.size() == ms_front_) {
1022                 return;
1023             }
1024 
1025             TLX_LOGC(ctx_.debug_jobs)
1026                 << "Freeing top level of PS5SmallsortJob's mkqs stack - size "
1027                 << ms_stack_.size();
1028 
1029             // convert top level of stack into independent jobs
1030 
1031             MKQSStep& ms = ms_stack_[ms_front_];
1032 
1033             if (ms.idx_ == 0 && ms.num_lt_ != 0)
1034             {
1035                 this->substep_add();
1036                 ctx_.enqueue(this, ms.strptr_.sub(0, ms.num_lt_), ms.depth_);
1037             }
1038             if (ms.idx_ <= 1) // st.num_eq > 0 always
1039             {
1040                 assert(ms.num_eq_ > 0);
1041 
1042                 StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
1043 
1044                 if (ms.eq_recurse_) {
1045                     this->substep_add();
1046                     ctx_.enqueue(this, sp, ms.depth_ + sizeof(key_type));
1047                 }
1048                 else {
1049                     StringPtr spb = sp.copy_back();
1050                     spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
1051                     ctx_.donesize(ms.num_eq_);
1052                 }
1053             }
1054             if (ms.idx_ <= 2 && ms.num_gt_ != 0)
1055             {
1056                 this->substep_add();
1057                 ctx_.enqueue(
1058                     this, ms.strptr_.sub(ms.num_lt_ + ms.num_eq_, ms.num_gt_),
1059                     ms.depth_);
1060             }
1061 
1062             // shorten the current stack
1063             ++ms_front_;
1064         }
1065     }
1066 
1067     /*------------------------------------------------------------------------*/
1068     // Called When PS5SmallsortJob is Finished
1069 
substep_all_done()1070     void substep_all_done() final {
1071         TLX_LOGC(ctx_.debug_recursion)
1072             << "SmallSort[" << depth_ << "] "
1073             << "all substeps done -> LCP calculation";
1074 
1075         while (ms_front_ > 0) {
1076             TLX_LOGC(ctx_.debug_lcp)
1077                 << "SmallSort[" << depth_ << "] ms_front_: " << ms_front_;
1078             ms_stack_[--ms_front_].calculate_lcp();
1079         }
1080 
1081         while (ss_front_ > 0) {
1082             TLX_LOGC(ctx_.debug_lcp)
1083                 << "SmallSort[" << depth_ << "] ss_front_: " << ss_front_;
1084             ss_stack_[--ss_front_].calculate_lcp(ctx_);
1085         }
1086 
1087         if (pstep_) pstep_->substep_notify_done();
1088         delete this;
1089     }
1090 };
1091 
1092 /******************************************************************************/
1093 //! PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs
1094 
1095 template <typename Context, typename StringPtr>
1096 class PS5BigSortStep : public PS5SortStep
1097 {
1098 public:
1099     typedef typename StringPtr::StringSet StringSet;
1100     typedef typename StringSet::Iterator StrIterator;
1101     typedef typename Context::key_type key_type;
1102 
1103     //! context
1104     Context& ctx_;
1105     //! parent sort step for notification
1106     PS5SortStep* pstep_;
1107 
1108     //! string pointers, size, and current sorting depth
1109     StringPtr strptr_;
1110     size_t depth_;
1111 
1112     //! number of parts into which the strings were split
1113     size_t parts_;
1114     //! size of all parts except the last
1115     size_t psize_;
1116     //! number of threads still working
1117     std::atomic<size_t> pwork_;
1118 
1119     //! classifier instance and variables (contains splitter tree
1120     typename Context::Classify classifier_;
1121 
1122     static const size_t treebits_ = Context::Classify::treebits;
1123     static const size_t num_splitters_ = Context::Classify::num_splitters;
1124     static const size_t bktnum_ = 2 * num_splitters_ + 1;
1125 
1126     //! LCPs of splitters, needed for recursive calls
1127     unsigned char splitter_lcp_[num_splitters_ + 1];
1128 
1129     //! individual bucket array of threads, keep bkt[0] for DistributeJob
1130     simple_vector<simple_vector<size_t> > bkt_;
1131     //! bucket ids cache, created by classifier and later counted
1132     simple_vector<simple_vector<uint16_t> > bktcache_;
1133 
1134     /*------------------------------------------------------------------------*/
1135     // Constructor
1136 
PS5BigSortStep(Context & ctx,PS5SortStep * pstep,const StringPtr & strptr,size_t depth)1137     PS5BigSortStep(Context& ctx, PS5SortStep* pstep,
1138                    const StringPtr& strptr, size_t depth)
1139         : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
1140         // calculate number of parts
1141         parts_ = strptr_.size() / ctx.sequential_threshold() * 2;
1142         if (parts_ == 0) parts_ = 1;
1143 
1144         bkt_.resize(parts_);
1145         bktcache_.resize(parts_);
1146 
1147         psize_ = (strptr.size() + parts_ - 1) / parts_;
1148 
1149         TLX_LOGC(ctx_.debug_steps)
1150             << "enqueue depth=" << depth_
1151             << " size=" << strptr_.size()
1152             << " parts=" << parts_
1153             << " psize=" << psize_
1154             << " flip=" << strptr_.flipped();
1155 
1156         ctx.threads_.enqueue([this]() { sample(); });
1157         ++ctx.para_ss_steps;
1158     }
1159 
~PS5BigSortStep()1160     virtual ~PS5BigSortStep() { }
1161 
1162     /*------------------------------------------------------------------------*/
1163     // Sample Step
1164 
sample()1165     void sample() {
1166         ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1167         TLX_LOGC(ctx_.debug_jobs) << "Process SampleJob @ " << this;
1168 
1169         const size_t oversample_factor = 2;
1170         size_t sample_size = oversample_factor * num_splitters_;
1171 
1172         const StringSet& strset = strptr_.active();
1173         size_t n = strset.size();
1174 
1175         simple_vector<key_type> samples(sample_size);
1176 
1177         std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
1178 
1179         for (size_t i = 0; i < sample_size; ++i)
1180             samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
1181 
1182         std::sort(samples.begin(), samples.end());
1183 
1184         classifier_.build(samples.data(), sample_size, splitter_lcp_);
1185 
1186         // create new jobs
1187         pwork_ = parts_;
1188         for (unsigned int p = 0; p < parts_; ++p) {
1189             ctx_.threads_.enqueue([this, p]() { count(p); });
1190         }
1191     }
1192 
1193     /*------------------------------------------------------------------------*/
1194     // Counting Step
1195 
count(unsigned int p)1196     void count(unsigned int p) {
1197         ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1198         TLX_LOGC(ctx_.debug_jobs) << "Process CountJob " << p << " @ " << this;
1199 
1200         const StringSet& strset = strptr_.active();
1201 
1202         StrIterator strB = strset.begin() + p * psize_;
1203         StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1204         if (strE < strB) strE = strB;
1205 
1206         bktcache_[p].resize(strE - strB);
1207         uint16_t* bktcache = bktcache_[p].data();
1208         classifier_.classify(strset, strB, strE, bktcache, depth_);
1209 
1210         bkt_[p].resize(bktnum_ + (p == 0 ? 1 : 0));
1211         size_t* bkt = bkt_[p].data();
1212         memset(bkt, 0, bktnum_ * sizeof(size_t));
1213 
1214         for (uint16_t* bc = bktcache; bc != bktcache + (strE - strB); ++bc)
1215             ++bkt[*bc];
1216 
1217         if (--pwork_ == 0)
1218             count_finished();
1219     }
1220 
count_finished()1221     void count_finished() {
1222         ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1223         TLX_LOGC(ctx_.debug_jobs) << "Finishing CountJob " << this << " with prefixsum";
1224 
1225         // abort sorting if we're measuring only the top level
1226         if (ctx_.use_only_first_sortstep)
1227             return;
1228 
1229         // inclusive prefix sum over bkt
1230         size_t sum = 0;
1231         for (unsigned int i = 0; i < bktnum_; ++i) {
1232             for (unsigned int p = 0; p < parts_; ++p) {
1233                 bkt_[p][i] = (sum += bkt_[p][i]);
1234             }
1235         }
1236         assert(sum == strptr_.size());
1237 
1238         // create new jobs
1239         pwork_ = parts_;
1240         for (unsigned int p = 0; p < parts_; ++p) {
1241             ctx_.threads_.enqueue([this, p]() { distribute(p); });
1242         }
1243     }
1244 
1245     /*------------------------------------------------------------------------*/
1246     // Distribute Step
1247 
distribute(unsigned int p)1248     void distribute(unsigned int p) {
1249         ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1250         TLX_LOGC(ctx_.debug_jobs) << "Process DistributeJob " << p << " @ " << this;
1251 
1252         const StringSet& strset = strptr_.active();
1253 
1254         StrIterator strB = strset.begin() + p * psize_;
1255         StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1256         if (strE < strB) strE = strB;
1257 
1258         // get alternative shadow pointer array
1259         const StringSet& sorted = strptr_.shadow();
1260         typename StringSet::Iterator sbegin = sorted.begin();
1261 
1262         uint16_t* bktcache = bktcache_[p].data();
1263         size_t* bkt = bkt_[p].data();
1264 
1265         for (StrIterator str = strB; str != strE; ++str, ++bktcache)
1266             *(sbegin + --bkt[*bktcache]) = std::move(*str);
1267 
1268         if (p != 0) // p = 0 is needed for recursion into bkts
1269             bkt_[p].destroy();
1270 
1271         bktcache_[p].destroy();
1272 
1273         if (--pwork_ == 0)
1274             distribute_finished();
1275     }
1276 
distribute_finished()1277     void distribute_finished() {
1278         TLX_LOGC(ctx_.debug_jobs)
1279             << "Finishing DistributeJob " << this << " with enqueuing subjobs";
1280 
1281         size_t* bkt = bkt_[0].data();
1282         assert(bkt);
1283 
1284         // first processor's bkt pointers are boundaries between bkts, just add sentinel:
1285         assert(bkt[0] == 0);
1286         bkt[bktnum_] = strptr_.size();
1287 
1288         // keep anonymous subjob handle while creating subjobs
1289         this->substep_add();
1290 
1291         size_t i = 0;
1292         while (i < bktnum_ - 1)
1293         {
1294             // i is even -> bkt[i] is less-than bucket
1295             size_t bktsize = bkt[i + 1] - bkt[i];
1296             if (bktsize == 0) {
1297                 // empty bucket
1298             }
1299             else if (bktsize == 1) { // just one string pointer, copyback
1300                 strptr_.flip(bkt[i], 1).copy_back();
1301                 ctx_.donesize(1);
1302             }
1303             else
1304             {
1305                 TLX_LOGC(ctx_.debug_recursion)
1306                     << "Recurse[" << depth_ << "]: < bkt " << bkt[i]
1307                     << " size " << bktsize << " lcp "
1308                     << int(splitter_lcp_[i / 2] & 0x7F);
1309                 this->substep_add();
1310                 ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1311                              depth_ + (splitter_lcp_[i / 2] & 0x7F));
1312             }
1313             ++i;
1314             // i is odd -> bkt[i] is equal bucket
1315             bktsize = bkt[i + 1] - bkt[i];
1316             if (bktsize == 0) {
1317                 // empty bucket
1318             }
1319             else if (bktsize == 1) { // just one string pointer, copyback
1320                 strptr_.flip(bkt[i], 1).copy_back();
1321                 ctx_.donesize(1);
1322             }
1323             else
1324             {
1325                 if (splitter_lcp_[i / 2] & 0x80) {
1326                     // equal-bucket has nullptr-terminated key, done.
1327                     TLX_LOGC(ctx_.debug_recursion)
1328                         << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1329                         << " size " << bktsize << " is done!";
1330                     StringPtr sp = strptr_.flip(bkt[i], bktsize).copy_back();
1331                     sp.fill_lcp(
1332                         depth_ + lcpKeyDepth(classifier_.get_splitter(i / 2)));
1333                     ctx_.donesize(bktsize);
1334                 }
1335                 else {
1336                     TLX_LOGC(ctx_.debug_recursion)
1337                         << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1338                         << " size " << bktsize << " lcp keydepth!";
1339                     this->substep_add();
1340                     ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1341                                  depth_ + sizeof(key_type));
1342                 }
1343             }
1344             ++i;
1345         }
1346 
1347         size_t bktsize = bkt[i + 1] - bkt[i];
1348 
1349         if (bktsize == 0) {
1350             // empty bucket
1351         }
1352         else if (bktsize == 1) { // just one string pointer, copyback
1353             strptr_.flip(bkt[i], 1).copy_back();
1354             ctx_.donesize(1);
1355         }
1356         else {
1357             TLX_LOGC(ctx_.debug_recursion)
1358                 << "Recurse[" << depth_ << "]: > bkt " << bkt[i]
1359                 << " size " << bktsize << " no lcp";
1360             this->substep_add();
1361             ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize), depth_);
1362         }
1363 
1364         this->substep_notify_done(); // release anonymous subjob handle
1365 
1366         if (!strptr_.with_lcp)
1367             bkt_[0].destroy();
1368     }
1369 
1370     /*------------------------------------------------------------------------*/
1371     // After Recursive Sorting
1372 
substep_all_done()1373     void substep_all_done() final {
1374         ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1375         if (strptr_.with_lcp) {
1376             TLX_LOGC(ctx_.debug_steps)
1377                 << "pSampleSortStep[" << depth_ << "]: all substeps done.";
1378 
1379             ps5_sample_sort_lcp<bktnum_>(
1380                 ctx_, classifier_, strptr_, depth_, bkt_[0].data());
1381             bkt_[0].destroy();
1382         }
1383 
1384         if (pstep_) pstep_->substep_notify_done();
1385         delete this;
1386     }
1387 };
1388 
1389 /******************************************************************************/
1390 // PS5Context::enqueue()
1391 
1392 template <typename Parameters>
1393 template <typename StringPtr>
enqueue(PS5SortStep * pstep,const StringPtr & strptr,size_t depth)1394 void PS5Context<Parameters>::enqueue(
1395     PS5SortStep* pstep, const StringPtr& strptr, size_t depth) {
1396     if (this->enable_parallel_sample_sort &&
1397         (strptr.size() > sequential_threshold() ||
1398          this->use_only_first_sortstep)) {
1399         new PS5BigSortStep<PS5Context, StringPtr>(*this, pstep, strptr, depth);
1400     }
1401     else {
1402         if (strptr.size() < (1LLU << 32)) {
1403             auto j = new PS5SmallsortJob<PS5Context, StringPtr, uint32_t>(
1404                 *this, pstep, strptr, depth);
1405             threads_.enqueue([j]() { j->run(); });
1406         }
1407         else {
1408             auto j = new PS5SmallsortJob<PS5Context, StringPtr, uint64_t>(
1409                 *this, pstep, strptr, depth);
1410             threads_.enqueue([j]() { j->run(); });
1411         }
1412     }
1413 }
1414 
1415 /******************************************************************************/
1416 // Externally Callable Sorting Methods
1417 
1418 //! Main Parallel Sample Sort Function. See below for more convenient wrappers.
1419 template <typename PS5Parameters, typename StringPtr>
parallel_sample_sort_base(const StringPtr & strptr,size_t depth)1420 void parallel_sample_sort_base(const StringPtr& strptr, size_t depth) {
1421 
1422     using Context = PS5Context<PS5Parameters>;
1423     Context ctx(std::thread::hardware_concurrency());
1424     ctx.total_size = strptr.size();
1425     ctx.rest_size = strptr.size();
1426     ctx.num_threads = ctx.threads_.size();
1427 
1428     MultiTimer timer;
1429     timer.start("sort");
1430 
1431     ctx.enqueue(/* pstep */ nullptr, strptr, depth);
1432     ctx.threads_.loop_until_empty();
1433 
1434     timer.stop();
1435 
1436     assert(!ctx.enable_rest_size || ctx.rest_size == 0);
1437 
1438     using BigSortStep = PS5BigSortStep<Context, StringPtr>;
1439 
1440     TLX_LOGC(ctx.debug_result)
1441         << "RESULT"
1442         << " sizeof(key_type)=" << sizeof(typename PS5Parameters::key_type)
1443         << " splitter_treebits=" << size_t(BigSortStep::treebits_)
1444         << " num_splitters=" << size_t(BigSortStep::num_splitters_)
1445         << " num_threads=" << ctx.num_threads
1446         << " enable_work_sharing=" << size_t(ctx.enable_work_sharing)
1447         << " use_restsize=" << size_t(ctx.enable_rest_size)
1448         << " tm_para_ss=" << ctx.mtimer.get("para_ss")
1449         << " tm_seq_ss=" << ctx.mtimer.get("sequ_ss")
1450         << " tm_mkqs=" << ctx.mtimer.get("mkqs")
1451         << " tm_inssort=" << ctx.mtimer.get("inssort")
1452         << " tm_total=" << ctx.mtimer.total()
1453         << " tm_idle="
1454         << (ctx.num_threads * timer.total()) - ctx.mtimer.total()
1455         << " steps_para_sample_sort=" << ctx.para_ss_steps
1456         << " steps_seq_sample_sort=" << ctx.sequ_ss_steps
1457         << " steps_base_sort=" << ctx.base_sort_steps;
1458 }
1459 
1460 //! Parallel Sample Sort Function for a generic StringSet, this allocates the
1461 //! shadow array for flipping.
1462 template <typename PS5Parameters, typename StringPtr>
1463 typename enable_if<!StringPtr::with_lcp, void>::type
parallel_sample_sort_params(const StringPtr & strptr,size_t depth,size_t memory=0)1464 parallel_sample_sort_params(
1465     const StringPtr& strptr, size_t depth, size_t memory = 0) {
1466     tlx::unused(memory);
1467 
1468     typedef typename StringPtr::StringSet StringSet;
1469     const StringSet& strset = strptr.active();
1470 
1471     typedef StringShadowPtr<StringSet> StringShadowPtr;
1472     typedef typename StringSet::Container Container;
1473 
1474     // allocate shadow pointer array
1475     Container shadow = strset.allocate(strset.size());
1476     StringShadowPtr new_strptr(strset, StringSet(shadow));
1477 
1478     parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1479 
1480     StringSet::deallocate(shadow);
1481 }
1482 
1483 //! Parallel Sample Sort Function for a generic StringSet with LCPs, this
1484 //! allocates the shadow array for flipping.
1485 template <typename PS5Parameters, typename StringPtr>
1486 typename enable_if<StringPtr::with_lcp, void>::type
parallel_sample_sort_params(const StringPtr & strptr,size_t depth,size_t memory=0)1487 parallel_sample_sort_params(
1488     const StringPtr& strptr, size_t depth, size_t memory = 0) {
1489     tlx::unused(memory);
1490 
1491     typedef typename StringPtr::StringSet StringSet;
1492     typedef typename StringPtr::LcpType LcpType;
1493     const StringSet& strset = strptr.active();
1494 
1495     typedef StringShadowLcpPtr<StringSet, LcpType> StringShadowLcpPtr;
1496     typedef typename StringSet::Container Container;
1497 
1498     // allocate shadow pointer array
1499     Container shadow = strset.allocate(strset.size());
1500     StringShadowLcpPtr new_strptr(strset, StringSet(shadow), strptr.lcp());
1501 
1502     parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1503 
1504     StringSet::deallocate(shadow);
1505 }
1506 
1507 //! Parallel Sample Sort Function with default parameter size for a generic
1508 //! StringSet.
1509 template <typename StringPtr>
parallel_sample_sort(const StringPtr & strptr,size_t depth,size_t memory)1510 void parallel_sample_sort(
1511     const StringPtr& strptr, size_t depth, size_t memory) {
1512     return parallel_sample_sort_params<PS5ParametersDefault>(
1513         strptr, depth, memory);
1514 }
1515 
1516 } // namespace sort_strings_detail
1517 } // namespace tlx
1518 
1519 #endif // !TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
1520 
1521 /******************************************************************************/
1522