1 // Copyright (C) 2011 Tim Blechmann 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See 4 // accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 7 #include <cassert> 8 #include "test_helpers.hpp" 9 10 #include <boost/array.hpp> 11 #include <boost/thread.hpp> 12 13 namespace impl { 14 15 using boost::array; 16 using namespace boost; 17 using namespace std; 18 19 template <bool Bounded = false> 20 struct queue_stress_tester 21 { 22 static const unsigned int buckets = 1<<13; 23 #ifndef BOOST_LOCKFREE_STRESS_TEST 24 static const long node_count = 5000; 25 #else 26 static const long node_count = 500000; 27 #endif 28 const int reader_threads; 29 const int writer_threads; 30 31 boost::lockfree::detail::atomic<int> writers_finished; 32 33 static_hashed_set<long, buckets> data; 34 static_hashed_set<long, buckets> dequeued; 35 array<std::set<long>, buckets> returned; 36 37 boost::lockfree::detail::atomic<int> push_count, pop_count; 38 queue_stress_testerimpl::queue_stress_tester39 queue_stress_tester(int reader, int writer): 40 reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0) 41 {} 42 43 template <typename queue> add_itemsimpl::queue_stress_tester44 void add_items(queue & stk) 45 { 46 for (long i = 0; i != node_count; ++i) { 47 long id = generate_id<long>(); 48 49 bool inserted = data.insert(id); 50 assert(inserted); 51 52 if (Bounded) 53 while(stk.bounded_push(id) == false) 54 /*thread::yield()*/; 55 else 56 while(stk.push(id) == false) 57 /*thread::yield()*/; 58 ++push_count; 59 } 60 writers_finished += 1; 61 } 62 63 boost::lockfree::detail::atomic<bool> running; 64 65 template <typename queue> consume_elementimpl::queue_stress_tester66 bool consume_element(queue & q) 67 { 68 long id; 69 bool ret = q.pop(id); 70 71 if (!ret) 72 return false; 73 74 bool erased = data.erase(id); 75 bool inserted = dequeued.insert(id); 76 assert(erased); 77 assert(inserted); 78 ++pop_count; 79 return true; 80 } 81 82 template <typename queue> get_itemsimpl::queue_stress_tester83 void get_items(queue & q) 84 { 85 for (;;) { 86 bool received_element = consume_element(q); 87 if (received_element) 88 continue; 89 90 if ( writers_finished.load() == writer_threads ) 91 break; 92 } 93 94 while (consume_element(q)); 95 } 96 97 template <typename queue> runimpl::queue_stress_tester98 void run(queue & stk) 99 { 100 BOOST_WARN(stk.is_lock_free()); 101 writers_finished.store(0); 102 103 thread_group writer; 104 thread_group reader; 105 106 BOOST_REQUIRE(stk.empty()); 107 108 for (int i = 0; i != reader_threads; ++i) 109 reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk))); 110 111 for (int i = 0; i != writer_threads; ++i) 112 writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk))); 113 114 using namespace std; 115 cout << "threads created" << endl; 116 117 writer.join_all(); 118 119 cout << "writer threads joined, waiting for readers" << endl; 120 121 reader.join_all(); 122 123 cout << "reader threads joined" << endl; 124 125 BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0); 126 BOOST_REQUIRE(stk.empty()); 127 128 BOOST_REQUIRE_EQUAL(push_count, pop_count); 129 BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count); 130 } 131 }; 132 133 } 134 135 using impl::queue_stress_tester; 136