/***************************************************************************
 *  include/stxxl/bits/algo/stable_ksort.h
 *
 *  Part of the STXXL. See http://stxxl.sourceforge.net
 *
 *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
 *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
 *
 *  Distributed under the Boost Software License, Version 1.0.
 *  (See accompanying file LICENSE_1_0.txt or copy at
 *  http://www.boost.org/LICENSE_1_0.txt)
 **************************************************************************/

#ifndef STXXL_ALGO_STABLE_KSORT_HEADER
#define STXXL_ALGO_STABLE_KSORT_HEADER

// This is a first attempt: a distribution sort without sampling.
// TODO: rework stable_ksort when time permits.

#include <stxxl/bits/mng/block_manager.h>
#include <stxxl/bits/mng/buf_istream.h>
#include <stxxl/bits/mng/buf_ostream.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/algo/intksort.h>
#include <stxxl/bits/algo/sort_base.h>
#include <stxxl/bits/common/utils.h>

#ifndef STXXL_VERBOSE_STABLE_KSORT
#define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
#endif

STXXL_BEGIN_NAMESPACE

//! \addtogroup stlalgo
//! \{

/*! \internal
 */
namespace stable_ksort_local {

template <class Type, class TypeKey>
void classify_block(Type* begin, Type* end, TypeKey*& out,
                    int_type* bucket, typename Type::key_type offset,
                    unsigned shift)
{
    for (Type* p = begin; p < end; p++, out++)      // count & create references
    {
        out->ptr = p;
        typename Type::key_type key = p->key();
        int_type ibucket = (int_type)((key - offset) >> shift);
        out->key = key;
        bucket[ibucket]++;
    }
}
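
// Illustrative example (values assumed for exposition, not from the original
// source): with offset = 0 and shift = 62 on 64-bit keys, classify_block()
// routes each record to one of 2^2 = 4 buckets selected by the two most
// significant key bits, counts the occupancy of each bucket in bucket[], and
// appends a (key, pointer) reference for every record to out.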

template <typename Type>
struct type_key
{
    typedef typename Type::key_type key_type;
    key_type key;
    Type* ptr;

    type_key() { }
    type_key(key_type k, Type* p) : key(k), ptr(p)
    { }
};

template <typename Type>
bool operator < (const type_key<Type>& a, const type_key<Type>& b)
{
    return a.key < b.key;
}

template <typename Type>
bool operator > (const type_key<Type>& a, const type_key<Type>& b)
{
    return a.key > b.key;
}

template <typename BIDType, typename AllocStrategy>
class bid_sequence
{
public:
    typedef BIDType bid_type;
    typedef bid_type& reference;
    typedef AllocStrategy alloc_strategy;
    typedef typename simple_vector<bid_type>::size_type size_type;
    typedef typename simple_vector<bid_type>::iterator iterator;

protected:
    simple_vector<bid_type>* bids;
    alloc_strategy alloc_strategy_;

public:
    bid_sequence() : bids(NULL) { }
    bid_sequence(size_type size_)
    {
        bids = new simple_vector<bid_type>(size_);
        block_manager* mng = block_manager::get_instance();
        mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
    }
    void init(size_type size_)
    {
        bids = new simple_vector<bid_type>(size_);
        block_manager* mng = block_manager::get_instance();
        mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
    }
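    // accessing an index beyond the current size grows the array to
    // 2 * (i + 1) entries and allocates external memory blocks for the
    // newly added range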
    reference operator [] (size_type i)
    {
        size_type size_ = size();      // cache size in a register
        if (i < size_)
            return *(bids->begin() + i);

        block_manager* mng = block_manager::get_instance();
        simple_vector<bid_type>* larger_bids = new simple_vector<bid_type>((i + 1) * 2);
        std::copy(bids->begin(), bids->end(), larger_bids->begin());
        mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
        delete bids;
        bids = larger_bids;
        return *(larger_bids->begin() + i);
    }
    size_type size() { return bids->size(); }
    iterator begin() { return bids->begin(); }
    ~bid_sequence()
    {
        // guard against a default-constructed, never initialized sequence
        if (bids) {
            block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
            delete bids;
        }
    }
};

template <typename ExtIterator>
void distribute(
    bid_sequence<typename ExtIterator::vector_type::block_type::bid_type,
                 typename ExtIterator::vector_type::alloc_strategy_type>* bucket_bids,
    int64* bucket_sizes,
    const int_type nbuckets,
    const int_type lognbuckets,
    ExtIterator first,
    ExtIterator last,
    const int_type nread_buffers,
    const int_type nwrite_buffers)
{
    typedef typename ExtIterator::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator::block_type block_type;
    typedef typename ExtIterator::bids_container_iterator bids_container_iterator;

    typedef buf_istream<block_type, bids_container_iterator> buf_istream_type;

    int_type i = 0;

    // include the partial last block, if any
    buf_istream_type in(first.bid(), last.bid() + ((last.block_offset()) ? 1 : 0),
                        nread_buffers);

    buffered_writer<block_type> out(
        nbuckets + nwrite_buffers,
        nwrite_buffers);

    unsigned_type* bucket_block_offsets = new unsigned_type[nbuckets];
    unsigned_type* bucket_iblock = new unsigned_type[nbuckets];
    block_type** bucket_blocks = new block_type*[nbuckets];

    std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
    std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
    std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

    for (i = 0; i < nbuckets; i++)
        bucket_blocks[i] = out.get_free_block();

    ExtIterator cur = first - first.block_offset();

    // skip the elements of the first block that precede 'first'
    for ( ; cur != first; cur++)
        ++in;

    const int_type shift = sizeof(key_type) * 8 - lognbuckets;
    // search in the range [_begin,_end)
    STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
    for ( ; cur != last; cur++)
    {
        key_type cur_key = in.current().key();
        int_type ibucket = (int_type)(cur_key >> shift);

        int_type block_offset = bucket_block_offsets[ibucket];
        in >> (bucket_blocks[ibucket]->elem[block_offset++]);
        if (block_offset == block_type::size)
        {
            block_offset = 0;
            int_type iblock = bucket_iblock[ibucket]++;
            bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
        }
        bucket_block_offsets[ibucket] = block_offset;
    }
    for (i = 0; i < nbuckets; i++)
    {
        if (bucket_block_offsets[i])
        {
            out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
        }
        bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                          bucket_block_offsets[i];
        STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                   ", estimated size: " << ((last - first) / int64(nbuckets)));
    }

    delete[] bucket_blocks;
    delete[] bucket_block_offsets;
    delete[] bucket_iblock;
}

} // namespace stable_ksort_local

//! Sort records with integer keys
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param M amount of memory for internal use (in bytes)
//! \remark Elements must provide a method key() which returns the integer key.
//! \remark Not yet fully implemented, it assumes that the keys are uniformly
//!         distributed between [0, std::numeric_limits<key_type>::max()].
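//!
//! A minimal usage sketch (\c my_record below is a hypothetical example type,
//! not part of the library):
//! \code
//! struct my_record
//! {
//!     typedef stxxl::uint64 key_type;
//!     key_type m_key;
//!     char m_payload[24];
//!     key_type key() const { return m_key; }
//! };
//!
//! typedef stxxl::VECTOR_GENERATOR<my_record>::result vector_type;
//! vector_type v;
//! // ... fill v with records ...
//! stxxl::stable_ksort(v.begin(), v.end(), 512 * 1024 * 1024); // 512 MiB of RAM
//! \endcode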
template <typename ExtIterator>
void stable_ksort(ExtIterator first, ExtIterator last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,std::numeric_limits<key_type>::max()]");
    typedef typename ExtIterator::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator::block_type block_type;
    typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator::vector_type::alloc_strategy_type alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    first.flush();     // flush container

    double begin = timestamp();

    unsigned_type i = 0;
    config* cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned int lognbuckets = ilog2_floor(nmaxbuckets);
    const unsigned_type nbuckets = unsigned_type(1) << lognbuckets;
    const unsigned_type est_bucket_size = (unsigned_type)div_ceil((last - first) / nbuckets, block_type::size);      // in blocks

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
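    // the blocks not used as per-bucket output buffers (m - nbuckets of them)
    // are split between read and write buffers in proportion to the two
    // multiples above (2 : 2 here, i.e. evenly)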

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    bucket_bids_type* bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64* bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

    {
        // sort buckets
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;  // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;         // in number of records
        int64 max_bucket_size_act = 0;                                                    // actual max bucket size
        // establish output stream

        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }
        // knowing max(bucket_sizes[i]), we could now increase
        // write_buffers_multiple_bs and decrease max_bucket_size_bl
        const int_type max_bucket_size_act_bl = (int_type)div_ceil(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing the required number of blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
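        // the two in-memory bucket buffer sets (blocks1/blocks2 below) occupy
        // 2 * max_bucket_size_bl blocks; all remaining blocks serve as write
        // buffers of the output stream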
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        typedef buf_ostream<block_type, bids_container_iterator> buf_ostream_type;
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(request_queue::READ);

        if (first.block_offset())
        {
            // copy the part of the first block preceding 'first' to the
            // output unchanged
            block_type* block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        block_type* blocks1 = new block_type[max_bucket_size_bl];
        block_type* blocks2 = new block_type[max_bucket_size_bl];
        request_ptr* reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr* reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_* refs1 = new type_key_[(size_t)max_bucket_size_rec];
        type_key_* refs2 = new type_key_[(size_t)max_bucket_size_rec];

        // submit reading the first 2 buckets (Peter's scheme)
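        // (double buffering: while bucket k is classified and written out of
        // one buffer set, the read requests for bucket k + 1 are already in
        // flight in the other set; the sets are swapped at the end of each
        // iteration of the main loop below)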
        unsigned_type nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);

        nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);

        key_type offset = 0;
        const unsigned log_k1 = STXXL_MAX<unsigned>(ilog2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
        unsigned_type k1 = unsigned_type(1) << log_k1;
        int_type* bucket1 = new int_type[k1];
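        // k1 is chosen so that the type_key references of one of the k1
        // sub-buckets fit into the L2 cache on average
        // (max_bucket_size_rec * sizeof(type_key_) / k1 <= STXXL_L2_SIZE)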

        const unsigned int shift = (unsigned int)(sizeof(key_type) * 8 - lognbuckets);
        const unsigned int shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);

        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[k], block_type::size);
            const unsigned log_k1_k = STXXL_MAX<unsigned>(ilog2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
            assert(log_k1_k <= log_k1);
            k1 = (unsigned_type)(1) << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
            // classify first nbucket_blocks-1 blocks, they are full
            type_key_* ref_ptr = refs1;
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(),
                                                   ref_ptr, bucket1, offset1, shift1 /*,k1*/);
            }
            // last block might be non-full
            const unsigned_type last_block_size =
                (unsigned_type)(bucket_sizes[k] - (nbucket_blocks - 1) * block_type::size);
            reqs1[i]->wait();

            //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size,
                           ref_ptr, bucket1, offset1, shift1);

            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            type_key_* c = refs2;
            type_key_* d = refs1;
            for (i = 0; i < k1; i++)
            {
                type_key_* cEnd = refs2 + bucket1[i];
                type_key_* dEnd = refs1 + bucket1[i];

                const unsigned log_k2 = ilog2_floor(bucket1[i]) - 1;      // adaptive bucket size
                const unsigned_type k2 = unsigned_type(1) << log_k2;
                int_type* bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write out all
                for (type_key_* p = d; p < dEnd; p++)
                    out << (*(p->ptr));

                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }
            // submit next read
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = (unsigned_type)div_ceil(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // copy the part of the last block past 'last' to the output
            // unchanged
            block_type* block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
}

//! \}

STXXL_END_NAMESPACE

#endif // !STXXL_ALGO_STABLE_KSORT_HEADER