1 /***************************************************************************
2  *  tools/benchmarks/monotonic_pq.cpp
3  *
4  *  Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *  Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de>
8  *  Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
9  *
10  *  Distributed under the Boost Software License, Version 1.0.
11  *  (See accompanying file LICENSE_1_0.txt or copy at
12  *  http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #include <queue>
16 #include <limits>
17 
18 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
19 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
20 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
21 
22 #define TINY_PQ 0
23 #define MANUAL_PQ 0
24 
25 #define SIDE_PQ 1       // compare with second, in-memory PQ (needs a lot of memory)
26 
27 #include <stxxl/priority_queue>
28 #include <stxxl/stats>
29 #include <stxxl/timer>
30 
31 const stxxl::unsigned_type mega = 1024 * 1024;
32 
33 #define RECORD_SIZE 16
34 #define LOAD 0
35 
36 typedef stxxl::uint64 my_key_type;
37 
38 #define MAGIC 123
39 
40 struct my_type
41 {
42     typedef my_key_type key_type;
43 
44     key_type key;
45 #if LOAD
46     key_type load;
47     char data[RECORD_SIZE - 2 * sizeof(key_type)];
48 #else
49     char data[RECORD_SIZE - sizeof(key_type)];
50 #endif
51 
my_typemy_type52     my_type() { }
my_typemy_type53     my_type(key_type k) : key(k) { }
54 #if LOAD
my_typemy_type55     my_type(key_type k, key_type l) : key(k), load(l) { }
56 #endif
57 
operator =my_type58     void operator = (const key_type& k) { key = k; }
59 #if LOAD
operator =my_type60     void operator = (const my_type& mt)
61     {
62         key = mt.key;
63         load = mt.load;
64     }
operator ==my_type65     bool operator == (const my_type& mt) { return (key == mt.key) && (load = mt.load); }
66 #else
operator =my_type67     void operator = (const my_type& mt) { key = mt.key; }
operator ==my_type68     bool operator == (const my_type& mt) { return key == mt.key; }
69 #endif
70 };
71 
operator <<(std::ostream & o,const my_type & obj)72 std::ostream& operator << (std::ostream& o, const my_type& obj)
73 {
74     o << obj.key;
75 #if LOAD
76     o << "/" << obj.load;
77 #endif
78     return o;
79 }
80 
81 //STXXL priority queue is a _maximum_ PQ. "Greater" comparator makes this a "minimum" PQ again.
82 
83 struct my_cmp /*: public std::binary_function<my_type, my_type, bool>*/ // greater
84 {
85     typedef my_type first_argument_type;
86     typedef my_type second_argument_type;
87     typedef bool result_type;
88 
operator ()my_cmp89     bool operator () (const my_type& a, const my_type& b) const
90     {
91         return a.key > b.key;
92     }
93 
min_valuemy_cmp94     my_type min_value() const
95     {
96 #if LOAD
97         return my_type(std::numeric_limits<my_type::key_type>::max(), MAGIC);
98 #else
99         return my_type(std::numeric_limits<my_type::key_type>::max());
100 #endif
101     }
max_valuemy_cmp102     my_type max_value() const
103     {
104 #if LOAD
105         return my_type(std::numeric_limits<my_type::key_type>::min(), MAGIC);
106 #else
107         return my_type(std::numeric_limits<my_type::key_type>::min());
108 #endif
109     }
110 };
111 
main(int argc,char * argv[])112 int main(int argc, char* argv[])
113 {
114     if (argc < 3)
115     {
116         std::cout << "Usage: " << argv[0] << " [n in MiB]"
117             #if defined(STXXL_PARALLEL)
118                   << " [p threads]"
119             #endif
120                   << std::endl;
121         return -1;
122     }
123 
124     STXXL_MSG("----------------------------------------");
125 
126     stxxl::config::get_instance();
127     std::string Flags = std::string("")
128 #if STXXL_CHECK_ORDER_IN_SORTS
129                         + " STXXL_CHECK_ORDER_IN_SORTS"
130 #endif
131 #ifdef NDEBUG
132                         + " NDEBUG"
133 #endif
134 #if TINY_PQ
135                         + " TINY_PQ"
136 #endif
137 #if MANUAL_PQ
138                         + " MANUAL_PQ"
139 #endif
140 #if SIDE_PQ
141                         + " SIDE_PQ"
142 #endif
143 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
144                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL"
145 #endif
146 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
147                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL"
148 #endif
149 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
150                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER"
151 #endif
152     ;
153     STXXL_MSG("Flags:" << Flags);
154 
155     unsigned long megabytes = atoi(argv[1]);
156 #if defined(STXXL_PARALLEL_MODE)
157     int num_threads = atoi(argv[2]);
158     STXXL_MSG("Threads: " << num_threads);
159 
160     omp_set_num_threads(num_threads);
161     __gnu_parallel::_Settings parallel_settings(__gnu_parallel::_Settings::get());
162     parallel_settings.sort_algorithm = __gnu_parallel::QS_BALANCED;
163     parallel_settings.sort_splitting = __gnu_parallel::SAMPLING;
164     parallel_settings.sort_minimal_n = 1000;
165     parallel_settings.sort_mwms_oversampling = 10;
166 
167     parallel_settings.merge_splitting = __gnu_parallel::SAMPLING;
168     parallel_settings.merge_minimal_n = 1000;
169     parallel_settings.merge_oversampling = 10;
170 
171     parallel_settings.multiway_merge_algorithm = __gnu_parallel::LOSER_TREE;
172     parallel_settings.multiway_merge_splitting = __gnu_parallel::EXACT;
173     parallel_settings.multiway_merge_oversampling = 10;
174     parallel_settings.multiway_merge_minimal_n = 1000;
175     parallel_settings.multiway_merge_minimal_k = 2;
176     __gnu_parallel::_Settings::set(parallel_settings);
177 #endif
178 
179     const stxxl::unsigned_type mem_for_queue = 512 * mega;
180     const stxxl::unsigned_type mem_for_pools = 512 * mega;
181 
182 #if TINY_PQ
183     stxxl::STXXL_UNUSED(mem_for_queue);
184     const unsigned BufferSize1 = 32;               // equalize procedure call overheads etc.
185     const unsigned N = (1 << 9) / sizeof(my_type); // minimal sequence length
186     const unsigned IntKMAX = 8;                    // maximal arity for internal mergersq
187     const unsigned IntLevels = 2;                  // number of internal levels
188     const unsigned BlockSize = (4 * mega);
189     const unsigned ExtKMAX = 8;                    // maximal arity for external mergers
190     const unsigned ExtLevels = 2;                  // number of external levels
191     typedef stxxl::priority_queue<
192             stxxl::priority_queue_config<
193                 my_type,
194                 my_cmp,
195                 BufferSize1,
196                 N,
197                 IntKMAX,
198                 IntLevels,
199                 BlockSize,
200                 ExtKMAX,
201                 ExtLevels
202                 >
203             > pq_type;
204 #elif MANUAL_PQ
205     stxxl::STXXL_UNUSED(mem_for_queue);
206     const unsigned BufferSize1 = 32;                    // equalize procedure call overheads etc.
207     const unsigned N = (1 << 20) / sizeof(my_type);     // minimal sequence length
208     const unsigned IntKMAX = 16;                        // maximal arity for internal mergersq
209     const unsigned IntLevels = 2;                       // number of internal levels
210     const unsigned BlockSize = (4 * mega);
211     const unsigned ExtKMAX = 32;                        // maximal arity for external mergers
212     const unsigned ExtLevels = 2;                       // number of external levels
213     typedef stxxl::priority_queue<
214             stxxl::priority_queue_config<
215                 my_type,
216                 my_cmp,
217                 BufferSize1,
218                 N,
219                 IntKMAX,
220                 IntLevels,
221                 BlockSize,
222                 ExtKMAX,
223                 ExtLevels
224                 >
225             > pq_type;
226 #else
227     const stxxl::uint64 volume = stxxl::uint64(200000) * mega;     // in bytes
228     typedef stxxl::PRIORITY_QUEUE_GENERATOR<my_type, my_cmp, mem_for_queue, volume / sizeof(my_type) / 1024 + 1> gen;
229     typedef gen::result pq_type;
230 //         BufferSize1 = Config::BufferSize1,
231 //         N = Config::N,
232 //         IntKMAX = Config::IntKMAX,
233 //         IntLevels = Config::IntLevels,
234 //         ExtLevels = Config::ExtLevels,
235 //         Levels = Config::IntLevels + Config::ExtLevels,
236 //         BlockSize = Config::BlockSize,
237 //         ExtKMAX = Config::ExtKMAX
238 
239 /*  STXXL_MSG ( "Blocks fitting into internal memory m: "<<gen::m );
240   STXXL_MSG ( "X : "<<gen::X );  //maximum number of internal elements //X = B * (settings::k - m) / settings::E,
241   STXXL_MSG ( "Expected internal memory consumption: "<< (gen::EConsumption / 1048576) << " MiB");*/
242 #endif
243     STXXL_MSG("Internal arity: " << pq_type::IntKMAX);
244     STXXL_MSG("N : " << pq_type::N); //X / (AI * AI)
245     STXXL_MSG("External arity: " << pq_type::ExtKMAX);
246     STXXL_MSG("Block size B: " << pq_type::BlockSize);
247     //EConsumption = X * settings::E + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
248 
249     STXXL_MSG("Data type size: " << sizeof(my_type));
250     STXXL_MSG("");
251 
252     stxxl::stats_data sd_start(*stxxl::stats::get_instance());
253     stxxl::timer Timer;
254     Timer.start();
255 
256     pq_type p(mem_for_pools / 2, mem_for_pools / 2);
257     stxxl::int64 nelements = stxxl::int64(megabytes * mega / sizeof(my_type)), i;
258 
259     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
260     STXXL_MSG("Peak number of elements (n): " << nelements);
261     STXXL_MSG("Max number of elements to contain: " << (stxxl::uint64(pq_type::N) * pq_type::IntKMAX * pq_type::IntKMAX * pq_type::ExtKMAX * pq_type::ExtKMAX));
262     srand(5);
263     my_cmp cmp;
264     my_key_type r, sum_input = 0, sum_output = 0;
265     my_type least(0), last_least(0);
266 
267     const my_key_type modulo = 0x10000000;
268 
269 #if SIDE_PQ
270     std::priority_queue<my_type, std::vector<my_type>, my_cmp> side_pq;
271 #endif
272 
273     my_type side_pq_least;
274 
275     STXXL_MSG("op-sequence(monotonic pq): ( push, pop, push ) * n");
276     for (i = 0; i < nelements; ++i)
277     {
278         if ((i % mega) == 0)
279             STXXL_MSG(
280                 std::fixed << std::setprecision(2) << std::setw(5)
281                            << (100.0 * (double)i / (double)nelements) << "% "
282                            << "Inserting element " << i << " top() == " << least.key << " @ "
283                            << std::setprecision(3) << Timer.seconds() << " s"
284                            << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield));
285 
286         //monotone priority queue
287         r = least.key + rand() % modulo;
288         sum_input += r;
289         p.push(my_type(r));
290 #if SIDE_PQ
291         side_pq.push(my_type(r));
292 #endif
293 
294         least = p.top();
295         sum_output += least.key;
296         p.pop();
297 #if SIDE_PQ
298         side_pq_least = side_pq.top();
299         side_pq.pop();
300         if (!(side_pq_least == least))
301             STXXL_MSG("Wrong result at  " << i << "  " << side_pq_least.key << " != " << least.key);
302 #endif
303 
304         if (cmp(last_least, least))
305         {
306             STXXL_MSG("Wrong order at  " << i << "  " << last_least.key << " > " << least.key);
307         }
308         else
309             last_least = least;
310 
311         r = least.key + rand() % modulo;
312         sum_input += r;
313         p.push(my_type(r));
314 #if SIDE_PQ
315         side_pq.push(my_type(r));
316 #endif
317     }
318     Timer.stop();
319     STXXL_MSG("Time spent for filling: " << Timer.seconds() << " s");
320 
321     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
322     stxxl::stats_data sd_middle(*stxxl::stats::get_instance());
323     std::cout << sd_middle - sd_start;
324     Timer.reset();
325     Timer.start();
326 
327     STXXL_MSG("op-sequence(monotonic pq): ( pop, push, pop ) * n");
328     for (i = 0; i < (nelements); ++i)
329     {
330         assert(!p.empty());
331 
332         least = p.top();
333         sum_output += least.key;
334         p.pop();
335 #if SIDE_PQ
336         side_pq_least = side_pq.top();
337         side_pq.pop();
338         if (!(side_pq_least == least))
339         {
340             STXXL_VERBOSE1("" << side_pq_least << " != " << least);
341         }
342 #endif
343         if (cmp(last_least, least))
344         {
345             STXXL_MSG("Wrong result at " << i << "  " << last_least.key << " > " << least.key);
346         }
347         else
348             last_least = least;
349 
350         r = least.key + rand() % modulo;
351         sum_input += r;
352         p.push(my_type(r));
353 #if SIDE_PQ
354         side_pq.push(my_type(r));
355 #endif
356 
357         least = p.top();
358         sum_output += least.key;
359         p.pop();
360 #if SIDE_PQ
361         side_pq_least = side_pq.top();
362         side_pq.pop();
363         if (!(side_pq_least == least))
364         {
365             STXXL_VERBOSE1("" << side_pq_least << " != " << least);
366         }
367 #endif
368         if (cmp(last_least, least))
369         {
370             STXXL_MSG("Wrong result at " << i << "  " << last_least.key << " > " << least.key);
371         }
372         else
373             last_least = least;
374 
375         if ((i % mega) == 0)
376             STXXL_MSG(
377                 std::fixed << std::setprecision(2) << std::setw(5)
378                            << (100.0 * (double)i / (double)nelements) << "% "
379                            << "Popped element " << i << " == " << least.key << " @ "
380                            << std::setprecision(3) << Timer.seconds() << " s"
381                            << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield));
382     }
383     STXXL_MSG("Last element " << i << " popped");
384     Timer.stop();
385 
386     if (sum_input != sum_output)
387         STXXL_MSG("WRONG sum! " << sum_input << " - " << sum_output << " = " << (sum_output - sum_input) << " / " << (sum_input - sum_output));
388 
389     STXXL_MSG("Time spent for removing elements: " << Timer.seconds() << " s");
390     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
391     std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - sd_middle;
392     std::cout << *stxxl::stats::get_instance();
393 
394     assert(sum_input == sum_output);
395 }
396