1 //  Copyright (C) 2011-2013 Tim Blechmann
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See
4 //  accompanying file LICENSE_1_0.txt or copy at
5 //  http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include <boost/lockfree/spsc_queue.hpp>
8 #include <boost/thread.hpp>
9 
10 #define BOOST_TEST_MAIN
11 #ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12 #include <boost/test/included/unit_test.hpp>
13 #else
14 #include <boost/test/unit_test.hpp>
15 #endif
16 
17 #include <iostream>
18 #include <memory>
19 
20 #include "test_helpers.hpp"
21 #include "test_common.hpp"
22 
23 using namespace boost;
24 using namespace boost::lockfree;
25 using namespace std;
26 
27 #ifndef BOOST_LOCKFREE_STRESS_TEST
28 static const boost::uint32_t nodes_per_thread = 100000;
29 #else
30 static const boost::uint32_t nodes_per_thread = 100000000;
31 #endif
32 
33 struct spsc_queue_tester
34 {
35     spsc_queue<int, capacity<128> > sf;
36 
37     boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
38 
39     static_hashed_set<int, 1<<16 > working_set;
40 
spsc_queue_testerspsc_queue_tester41     spsc_queue_tester(void):
42         spsc_queue_cnt(0), received_nodes(0)
43     {}
44 
addspsc_queue_tester45     void add(void)
46     {
47         for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
48             int id = generate_id<int>();
49             working_set.insert(id);
50 
51             while (sf.push(id) == false)
52             {}
53 
54             ++spsc_queue_cnt;
55         }
56         running = false;
57     }
58 
get_elementspsc_queue_tester59     bool get_element(void)
60     {
61         int data;
62         bool success = sf.pop(data);
63 
64         if (success) {
65             ++received_nodes;
66             --spsc_queue_cnt;
67             bool erased = working_set.erase(data);
68             assert(erased);
69             return true;
70         } else
71             return false;
72     }
73 
74     boost::lockfree::detail::atomic<bool> running;
75 
getspsc_queue_tester76     void get(void)
77     {
78         for(;;) {
79             bool success = get_element();
80             if (!running && !success)
81                 break;
82         }
83 
84         while ( get_element() );
85     }
86 
runspsc_queue_tester87     void run(void)
88     {
89         running = true;
90 
91         BOOST_REQUIRE(sf.empty());
92 
93         boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
94         boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
95         cout << "reader and writer threads created" << endl;
96 
97         writer.join();
98         cout << "writer threads joined. waiting for readers to finish" << endl;
99 
100         reader.join();
101 
102         BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
103         BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
104         BOOST_REQUIRE(sf.empty());
105         BOOST_REQUIRE(working_set.count_nodes() == 0);
106     }
107 };
108 
// Runs the single-element push/pop producer/consumer test.
BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
{
    typedef spsc_queue_tester fixture_type;

    // NOTE(review): the fixture is heap-allocated, presumably to keep its
    // large working_set off the stack -- confirm before changing.
    boost::shared_ptr<fixture_type> fixture(new fixture_type);
    fixture->run();
}
114 
115 struct spsc_queue_tester_buffering
116 {
117     spsc_queue<int, capacity<128> > sf;
118 
119     boost::lockfree::detail::atomic<long> spsc_queue_cnt;
120 
121     static_hashed_set<int, 1<<16 > working_set;
122     boost::lockfree::detail::atomic<size_t> received_nodes;
123 
spsc_queue_tester_bufferingspsc_queue_tester_buffering124     spsc_queue_tester_buffering(void):
125         spsc_queue_cnt(0), received_nodes(0)
126     {}
127 
128     static const size_t buf_size = 5;
129 
addspsc_queue_tester_buffering130     void add(void)
131     {
132         boost::array<int, buf_size> input_buffer;
133         for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
134             for (size_t i = 0; i != buf_size; ++i) {
135                 int id = generate_id<int>();
136                 working_set.insert(id);
137                 input_buffer[i] = id;
138             }
139 
140             size_t pushed = 0;
141 
142             do {
143                 pushed += sf.push(input_buffer.c_array() + pushed,
144                                   input_buffer.size()    - pushed);
145             } while (pushed != buf_size);
146 
147             spsc_queue_cnt+=buf_size;
148         }
149         running = false;
150     }
151 
get_elementsspsc_queue_tester_buffering152     bool get_elements(void)
153     {
154         boost::array<int, buf_size> output_buffer;
155 
156         size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
157 
158         if (popd) {
159             received_nodes += popd;
160             spsc_queue_cnt -= popd;
161 
162             for (size_t i = 0; i != popd; ++i) {
163                 bool erased = working_set.erase(output_buffer[i]);
164                 assert(erased);
165             }
166 
167             return true;
168         } else
169             return false;
170     }
171 
172     boost::lockfree::detail::atomic<bool> running;
173 
getspsc_queue_tester_buffering174     void get(void)
175     {
176         for(;;) {
177             bool success = get_elements();
178             if (!running && !success)
179                 break;
180         }
181 
182         while ( get_elements() );
183     }
184 
runspsc_queue_tester_buffering185     void run(void)
186     {
187         running = true;
188 
189         boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
190         boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
191         cout << "reader and writer threads created" << endl;
192 
193         writer.join();
194         cout << "writer threads joined. waiting for readers to finish" << endl;
195 
196         reader.join();
197 
198         BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
199         BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
200         BOOST_REQUIRE(sf.empty());
201         BOOST_REQUIRE(working_set.count_nodes() == 0);
202     }
203 };
204 
205 
// Runs the array-based (chunked) push/pop producer/consumer test.
BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
{
    typedef spsc_queue_tester_buffering fixture_type;

    // NOTE(review): the fixture is heap-allocated, presumably to keep its
    // large working_set off the stack -- confirm before changing.
    boost::shared_ptr<fixture_type> fixture(new fixture_type);
    fixture->run();
}
211 
212