1 /***************************************************************************
2  *  tests/algo/test_parallel_sort.cpp
3  *
4  *  Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  *  Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de>
7  *  Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
8  *
9  *  Distributed under the Boost Software License, Version 1.0.
10  *  (See accompanying file LICENSE_1_0.txt or copy at
11  *  http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 //! \example algo/test_parallel_sort.cpp
15 //! This is an example of how to use the parallelized sorting algorithm.
//! Setting all the parameters is optional; just compiling with parallel mode
//! suffices.
18 
// Give STXXL_NOT_CONSIDER_SORT_MEMORY_OVERHEAD a default of 0 unless the build
// already defined it. The definition sits ahead of the STXXL includes below.
// NOTE(review): presumably the STXXL sort headers read this macro to decide
// whether the sorter's own memory overhead is accounted for -- confirm against
// the STXXL headers.
#if !defined(STXXL_NOT_CONSIDER_SORT_MEMORY_OVERHEAD)
#define STXXL_NOT_CONSIDER_SORT_MEMORY_OVERHEAD 0
#endif
22 
#include <algorithm>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <limits>

#include <stxxl/vector>
#include <stxxl/stream>
#include <stxxl/scan>
#include <stxxl/sort>
31 
using stxxl::unsigned_type;

//! Number of bytes in one MiB; converts the MiB command-line values to bytes.
const unsigned long long megabyte = 1024 * 1024;

//! Block size used for the external vector and for the stream sorter.
//! NOTE(review): my_type is named here before it is defined further down, so
//! this only compiles if the macro does not evaluate its argument -- confirm.
const int block_size = STXXL_DEFAULT_BLOCK_SIZE(my_type);

//! Nominal record size in bytes; sizes the padding array inside my_type.
#define RECORD_SIZE 20
//! Payload value stored in the comparator's min/max sentinel records.
#define MAGIC 123

//! Memory budget passed to both sorters; set in main() from the M argument.
unsigned_type run_size;
//! Set in main() to 1/16 of the memory budget; not read in the visible part
//! of this file.
unsigned_type buffer_size;
43 
44 struct my_type
45 {
46     typedef unsigned long long key_type;
47 
48     key_type m_key;
49     key_type m_load;
50     char m_data[RECORD_SIZE - 2 * sizeof(key_type)];
keymy_type51     key_type key() const { return m_key; }
52 
my_typemy_type53     my_type() { }
my_typemy_type54     my_type(key_type k) : m_key(k) { }
my_typemy_type55     my_type(key_type k, key_type l) : m_key(k), m_load(l) { }
56 
operator =my_type57     void operator = (const key_type& k) { m_key = k; }
operator =my_type58     void operator = (const my_type& mt)
59     {
60         m_key = mt.m_key;
61         m_load = mt.m_load;
62     }
63 };
64 
65 bool operator < (const my_type& a, const my_type& b);
66 
operator <(const my_type & a,const my_type & b)67 inline bool operator < (const my_type& a, const my_type& b)
68 {
69     return a.key() < b.key();
70 }
71 
operator ==(const my_type & a,const my_type & b)72 inline bool operator == (const my_type& a, const my_type& b)
73 {
74     return a.key() == b.key();
75 }
76 
operator <<(std::ostream & o,const my_type & obj)77 inline std::ostream& operator << (std::ostream& o, const my_type& obj)
78 {
79     o << obj.m_key << "/" << obj.m_load;
80     return o;
81 }
82 
83 struct cmp_less_key : public std::less<my_type>
84 {
min_valuecmp_less_key85     my_type min_value() const { return my_type(std::numeric_limits<my_type::key_type>::min(), MAGIC); }
max_valuecmp_less_key86     my_type max_value() const { return my_type(std::numeric_limits<my_type::key_type>::max(), MAGIC); }
87 };
88 
//! STXXL vector of my_type records, built with block_size-sized blocks; used
//! for both the sort input and the sort output.
typedef stxxl::vector<my_type, 4, stxxl::lru_pager<8>, block_size, STXXL_DEFAULT_ALLOC_STRATEGY> vector_type;
90 
checksum(vector_type & input)91 unsigned_type checksum(vector_type& input)
92 {
93     unsigned_type sum = 0;
94     for (vector_type::const_iterator i = input.begin(); i != input.end(); ++i)
95         sum += (unsigned_type)((*i).m_key);
96     return sum;
97 }
98 
linear_sort_normal(vector_type & input)99 void linear_sort_normal(vector_type& input)
100 {
101     unsigned_type sum1 = checksum(input);
102 
103     stxxl::stats_data stats_begin(*stxxl::stats::get_instance());
104     double start = stxxl::timestamp();
105 
106     stxxl::sort(input.begin(), input.end(), cmp_less_key(), run_size);
107 
108     double stop = stxxl::timestamp();
109     std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin;
110 
111     unsigned_type sum2 = checksum(input);
112 
113     std::cout << sum1 << " ?= " << sum2 << std::endl;
114 
115     STXXL_CHECK(stxxl::is_sorted<vector_type::const_iterator>(input.begin(), input.end()));
116 
117     std::cout << "Linear sorting normal took " << (stop - start) << " seconds." << std::endl;
118 }
119 
linear_sort_streamed(vector_type & input,vector_type & output)120 void linear_sort_streamed(vector_type& input, vector_type& output)
121 {
122     unsigned_type sum1 = checksum(input);
123 
124     stxxl::stats_data stats_begin(*stxxl::stats::get_instance());
125     double start = stxxl::timestamp();
126 
127     typedef stxxl::stream::streamify_traits<vector_type::iterator>::stream_type input_stream_type;
128 
129     input_stream_type input_stream = stxxl::stream::streamify(input.begin(), input.end());
130 
131     typedef cmp_less_key comparator_type;
132     comparator_type cl;
133 
134     typedef stxxl::stream::sort<input_stream_type, comparator_type, block_size> sort_stream_type;
135 
136     sort_stream_type sort_stream(input_stream, cl, run_size);
137 
138     vector_type::iterator o = stxxl::stream::materialize(sort_stream, output.begin(), output.end());
139     STXXL_CHECK(o == output.end());
140 
141     double stop = stxxl::timestamp();
142     std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin;
143 
144     unsigned_type sum2 = checksum(output);
145 
146     std::cout << sum1 << " ?= " << sum2 << std::endl;
147     if (sum1 != sum2)
148         STXXL_MSG("WRONG DATA");
149 
150     STXXL_CHECK(stxxl::is_sorted<vector_type::const_iterator>(output.begin(), output.end(), comparator_type()));
151 
152     std::cout << "Linear sorting streamed took " << (stop - start) << " seconds." << std::endl;
153 }
154 
main(int argc,const char ** argv)155 int main(int argc, const char** argv)
156 {
157     if (argc < 6) {
158         std::cout << "Usage: " << argv[0] << " [n in MiB] [p threads] [M in MiB] [sorting algorithm: m | q | qb | s] [merging algorithm: p | s | n]" << std::endl;
159         return -1;
160     }
161 
162     stxxl::config::get_instance();
163 
164 #if STXXL_PARALLEL_MULTIWAY_MERGE
165     STXXL_MSG("STXXL_PARALLEL_MULTIWAY_MERGE");
166 #endif
167     unsigned long megabytes_to_process = atoi(argv[1]);
168     int p = atoi(argv[2]);
169     unsigned_type memory_to_use = (unsigned_type)(atoi(argv[3]) * megabyte);
170     run_size = memory_to_use;
171     buffer_size = memory_to_use / 16;
172 #ifdef STXXL_PARALLEL_MODE
173     omp_set_num_threads(p);
174     __gnu_parallel::_Settings parallel_settings(__gnu_parallel::_Settings::get());
175 
176     parallel_settings.merge_splitting = __gnu_parallel::EXACT;
177     parallel_settings.merge_minimal_n = 10000;
178     parallel_settings.merge_oversampling = 10;
179 
180     parallel_settings.multiway_merge_algorithm = __gnu_parallel::LOSER_TREE;
181     parallel_settings.multiway_merge_splitting = __gnu_parallel::EXACT;
182     parallel_settings.multiway_merge_oversampling = 10;
183     parallel_settings.multiway_merge_minimal_n = 10000;
184     parallel_settings.multiway_merge_minimal_k = 2;
185     if (!strcmp(argv[4], "q"))                  //quicksort
186         parallel_settings.sort_algorithm = __gnu_parallel::QS;
187     else if (!strcmp(argv[4], "qb"))            //balanced quicksort
188         parallel_settings.sort_algorithm = __gnu_parallel::QS_BALANCED;
189     else if (!strcmp(argv[4], "m"))             //merge sort
190         parallel_settings.sort_algorithm = __gnu_parallel::MWMS;
191     else /*if(!strcmp(argv[4], "s"))*/          //sequential (default)
192     {
193         parallel_settings.sort_algorithm = __gnu_parallel::QS;
194         parallel_settings.sort_minimal_n = memory_to_use;
195     }
196 
197     if (!strcmp(argv[5], "p"))          //parallel
198     {
199         stxxl::SETTINGS::native_merge = false;
200         //parallel_settings.multiway_merge_minimal_n = 1024;	//leave as default
201     }
202     else if (!strcmp(argv[5], "s"))                                             //sequential
203     {
204         stxxl::SETTINGS::native_merge = false;
205         parallel_settings.multiway_merge_minimal_n = memory_to_use;             //too much to be called
206     }
207     else /*if(!strcmp(argv[5], "n"))*/                                          //native (default)
208         stxxl::SETTINGS::native_merge = true;
209 
210     parallel_settings.multiway_merge_minimal_k = 2;
211 
212     __gnu_parallel::_Settings::set(parallel_settings);
213     STXXL_CHECK(&__gnu_parallel::_Settings::get() != &parallel_settings);
214 
215     if (0)
216         printf("%d %p: mwms %d, q %d, qb %d",
217                __gnu_parallel::_Settings::get().sort_algorithm,
218                (void*)&__gnu_parallel::_Settings::get().sort_algorithm,
219                __gnu_parallel::MWMS,
220                __gnu_parallel::QS,
221                __gnu_parallel::QS_BALANCED);
222 #endif
223 
224     std::cout << "Sorting " << megabytes_to_process << " MiB of data ("
225               << (megabytes_to_process * megabyte / sizeof(my_type)) << " elements) using "
226               << (memory_to_use / megabyte) << " MiB of internal memory and "
227               << p << " thread(s), block size "
228               << block_size << ", element size " << sizeof(my_type) << std::endl;
229 
230     const stxxl::int64 n_records =
231         stxxl::int64(megabytes_to_process) * stxxl::int64(megabyte) / sizeof(my_type);
232     vector_type input(n_records);
233 
234     stxxl::stats_data stats_begin(*stxxl::stats::get_instance());
235     double generate_start = stxxl::timestamp();
236 
237     stxxl::generate(input.begin(), input.end(), stxxl::random_number64(), memory_to_use / STXXL_DEFAULT_BLOCK_SIZE(my_type));
238 
239     double generate_stop = stxxl::timestamp();
240     std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin;
241 
242     std::cout << "Generating took " << (generate_stop - generate_start) << " seconds." << std::endl;
243 
244     STXXL_CHECK(!stxxl::is_sorted<vector_type::const_iterator>(input.begin(), input.end()));
245 
246     {
247         vector_type output(n_records);
248 
249         linear_sort_streamed(input, output);
250         linear_sort_normal(input);
251     }
252 
253     return 0;
254 }
255 // vim: et:ts=4:sw=4
256