1 //  Copyright (C) 2011 Tim Blechmann
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See
4 //  accompanying file LICENSE_1_0.txt or copy at
5 //  http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include <cassert>
8 #include "test_helpers.hpp"
9 
10 #include <boost/array.hpp>
11 #include <boost/thread.hpp>
12 
13 namespace impl {
14 
15 using boost::array;
16 using namespace boost;
17 using namespace std;
18 
19 template <bool Bounded = false>
20 struct queue_stress_tester
21 {
22     static const unsigned int buckets = 1<<13;
23 #ifndef BOOST_LOCKFREE_STRESS_TEST
24     static const long node_count =  5000;
25 #else
26     static const long node_count = 500000;
27 #endif
28     const int reader_threads;
29     const int writer_threads;
30 
31     boost::lockfree::detail::atomic<int> writers_finished;
32 
33     static_hashed_set<long, buckets> data;
34     static_hashed_set<long, buckets> dequeued;
35     array<std::set<long>, buckets> returned;
36 
37     boost::lockfree::detail::atomic<int> push_count, pop_count;
38 
queue_stress_testerimpl::queue_stress_tester39     queue_stress_tester(int reader, int writer):
40         reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0)
41     {}
42 
43     template <typename queue>
add_itemsimpl::queue_stress_tester44     void add_items(queue & stk)
45     {
46         for (long i = 0; i != node_count; ++i) {
47             long id = generate_id<long>();
48 
49             bool inserted = data.insert(id);
50             assert(inserted);
51 
52             if (Bounded)
53                 while(stk.bounded_push(id) == false)
54                     /*thread::yield()*/;
55             else
56                 while(stk.push(id) == false)
57                     /*thread::yield()*/;
58             ++push_count;
59         }
60         writers_finished += 1;
61     }
62 
63     boost::lockfree::detail::atomic<bool> running;
64 
65     template <typename queue>
consume_elementimpl::queue_stress_tester66     bool consume_element(queue & q)
67     {
68         long id;
69         bool ret = q.pop(id);
70 
71         if (!ret)
72             return false;
73 
74         bool erased = data.erase(id);
75         bool inserted = dequeued.insert(id);
76         assert(erased);
77         assert(inserted);
78         ++pop_count;
79         return true;
80     }
81 
82     template <typename queue>
get_itemsimpl::queue_stress_tester83     void get_items(queue & q)
84     {
85         for (;;) {
86             bool received_element = consume_element(q);
87             if (received_element)
88                 continue;
89 
90             if ( writers_finished.load() == writer_threads )
91                 break;
92         }
93 
94         while (consume_element(q));
95     }
96 
97     template <typename queue>
runimpl::queue_stress_tester98     void run(queue & stk)
99     {
100         BOOST_WARN(stk.is_lock_free());
101         writers_finished.store(0);
102 
103         thread_group writer;
104         thread_group reader;
105 
106         BOOST_REQUIRE(stk.empty());
107 
108         for (int i = 0; i != reader_threads; ++i)
109             reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk)));
110 
111         for (int i = 0; i != writer_threads; ++i)
112             writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk)));
113 
114         using namespace std;
115         cout << "threads created" << endl;
116 
117         writer.join_all();
118 
119         cout << "writer threads joined, waiting for readers" << endl;
120 
121         reader.join_all();
122 
123         cout << "reader threads joined" << endl;
124 
125         BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0);
126         BOOST_REQUIRE(stk.empty());
127 
128         BOOST_REQUIRE_EQUAL(push_count, pop_count);
129         BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count);
130     }
131 };
132 
133 }
134 
135 using impl::queue_stress_tester;
136