1 /*
2   Copyright (c) 2018, 2020, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23 */
24 
25 #include <algorithm>
26 #include <thread>
27 
28 #include "gtest/gtest.h"
29 
30 #include "mysql/harness/mpmc_queue.h"
31 #include "mysql/harness/mpsc_queue.h"
32 
33 template <typename T>
34 class TestProducerConsumerQueue : public ::testing::Test {};
35 
36 using ProducerConsumerQueueTypes =
37     ::testing::Types<mysql_harness::WaitingMPMCQueue<int>,
38                      mysql_harness::WaitingMPSCQueue<int>>;
39 TYPED_TEST_SUITE(TestProducerConsumerQueue, ProducerConsumerQueueTypes);
40 
41 /**
42  * @test
43  *       ensure a simple push doesn't block
44  */
TYPED_TEST(TestProducerConsumerQueue,spsc_push)45 TYPED_TEST(TestProducerConsumerQueue, spsc_push) {
46   TypeParam q;
47 
48   q.push(1);
49 }
50 
51 /**
52  * @test
53  *       ensure a pop() returns the value that got pushed
54  */
TYPED_TEST(TestProducerConsumerQueue,spsc_pop)55 TYPED_TEST(TestProducerConsumerQueue, spsc_pop) {
56   TypeParam q;
57 
58   q.push(1);
59 
60   EXPECT_EQ(q.pop(), 1);
61 }
62 
63 /**
64  * @test
65  *       ensure try_pop doesn't block on empty queue
66  */
TYPED_TEST(TestProducerConsumerQueue,spsc_try_pop)67 TYPED_TEST(TestProducerConsumerQueue, spsc_try_pop) {
68   TypeParam q;
69 
70   q.push(1);
71 
72   int item = 0;
73   EXPECT_EQ(q.try_pop(item), true);
74   EXPECT_EQ(item, 1);
75 
76   SCOPED_TRACE("// queue is empty, item shouldn't change");
77   item = 0;
78   EXPECT_EQ(q.try_pop(item), false);
79   EXPECT_EQ(item, 0);
80 }
81 
82 class TestProducerConsumerQueueP
83     : public ::testing::TestWithParam<std::tuple<unsigned int, unsigned int>> {
84 };
85 
86 /**
87  * @test
88  *       ensure concurrent pop/push don't trash the queue
89  */
TEST_P(TestProducerConsumerQueueP,mpmc)90 TEST_P(TestProducerConsumerQueueP, mpmc) {
91   mysql_harness::WaitingMPMCQueue<int> q;
92   const unsigned int total_rounds = 16 * 1024;
93 
94   const unsigned int num_producers = std::get<0>(GetParam());
95   unsigned int rounds_for_consumers = total_rounds;
96   const unsigned int rounds_per_producer = total_rounds / num_producers;
97 
98   const unsigned int num_consumers = std::get<1>(GetParam());
99   unsigned int rounds_for_producers = total_rounds;
100   const unsigned int rounds_per_consumer = total_rounds / num_consumers;
101 
102   std::vector<std::thread> consumers;
103   std::vector<std::thread> producers;
104 
105   for (unsigned int i = 0; i < num_consumers; i++) {
106     unsigned int rounds_this_consumer =
107         std::min(rounds_per_consumer, rounds_for_consumers);
108 
109     consumers.emplace_back(
110         [&q](unsigned int rounds) {
111           while (rounds-- > 0) {
112             EXPECT_EQ(q.pop(), 42);
113           }
114         },
115         rounds_this_consumer);
116 
117     rounds_for_consumers -= rounds_this_consumer;
118   }
119 
120   for (unsigned int i = 0; i < num_producers; i++) {
121     unsigned int rounds_this_producer =
122         std::min(rounds_per_producer, rounds_for_producers);
123 
124     producers.emplace_back(
125         [&q](unsigned int rounds) {
126           while (rounds-- > 0) {
127             q.push(42);
128           }
129         },
130         rounds_this_producer);
131 
132     rounds_for_producers -= rounds_this_producer;
133   }
134 
135   // wait for all threads to shutdown
136   for (auto &producer : producers) {
137     producer.join();
138   }
139 
140   // ... and all consumers
141   for (auto &consumer : consumers) {
142     consumer.join();
143   }
144 
145   // the queue should be empty
146   int last_item = 0;
147 
148   EXPECT_EQ(q.try_pop(last_item), false);
149 }
150 
151 // ::testing::Combine() would be nice, but doesn't work with sun-cc
152 INSTANTIATE_TEST_SUITE_P(
153     ManyToMany, TestProducerConsumerQueueP,
154     ::testing::Values(
155         std::make_tuple(1, 1), std::make_tuple(1, 2), std::make_tuple(1, 4),
156         std::make_tuple(1, 8), std::make_tuple(1, 16), std::make_tuple(2, 1),
157         std::make_tuple(2, 2), std::make_tuple(2, 4), std::make_tuple(2, 8),
158         std::make_tuple(2, 16), std::make_tuple(4, 1), std::make_tuple(4, 2),
159         std::make_tuple(4, 4), std::make_tuple(4, 8), std::make_tuple(4, 16),
160         std::make_tuple(8, 1), std::make_tuple(8, 2), std::make_tuple(8, 4),
161         std::make_tuple(8, 8), std::make_tuple(8, 16), std::make_tuple(16, 1),
162         std::make_tuple(16, 2), std::make_tuple(16, 4), std::make_tuple(16, 8),
163         std::make_tuple(16, 16)),
164     [](testing::TestParamInfo<std::tuple<unsigned int, unsigned int>> p)
__anona34c51fc0302(testing::TestParamInfo<std::tuple<unsigned int, unsigned int>> p) 165         -> std::string {
166       return "p" + std::to_string(std::get<0>(p.param)) + "_" + "c" +
167              std::to_string(std::get<1>(p.param));
168     });
169 
170 class TestProducerConsumerQueueSCP
171     : public ::testing::TestWithParam<std::tuple<unsigned int, unsigned int>> {
172 };
173 
174 /**
175  * @test
176  *       ensure concurrent pop/push don't trash the queue
177  */
TEST_P(TestProducerConsumerQueueSCP,mpsc)178 TEST_P(TestProducerConsumerQueueSCP, mpsc) {
179   mysql_harness::WaitingMPSCQueue<int> q;
180   const unsigned int total_rounds = 16 * 1024;
181 
182   const unsigned int num_producers = std::get<0>(GetParam());
183   unsigned int rounds_for_consumers = total_rounds;
184   const unsigned int rounds_per_producer = total_rounds / num_producers;
185 
186   const unsigned int num_consumers = std::get<1>(GetParam());
187   unsigned int rounds_for_producers = total_rounds;
188   const unsigned int rounds_per_consumer = total_rounds / num_consumers;
189 
190   std::vector<std::thread> consumers;
191   std::vector<std::thread> producers;
192 
193   for (unsigned int i = 0; i < num_consumers; i++) {
194     unsigned int rounds_this_consumer =
195         std::min(rounds_per_consumer, rounds_for_consumers);
196 
197     consumers.emplace_back(
198         [&q](unsigned int rounds) {
199           while (rounds-- > 0) {
200             EXPECT_EQ(q.pop(), 42);
201           }
202         },
203         rounds_this_consumer);
204 
205     rounds_for_consumers -= rounds_this_consumer;
206   }
207 
208   for (unsigned int i = 0; i < num_producers; i++) {
209     unsigned int rounds_this_producer =
210         std::min(rounds_per_producer, rounds_for_producers);
211 
212     producers.emplace_back(
213         [&q](unsigned int rounds) {
214           while (rounds-- > 0) {
215             q.push(42);
216           }
217         },
218         rounds_this_producer);
219 
220     rounds_for_producers -= rounds_this_producer;
221   }
222 
223   // wait for all threads to shutdown
224   for (auto &producer : producers) {
225     producer.join();
226   }
227 
228   // ... and all consumers
229   for (auto &consumer : consumers) {
230     consumer.join();
231   }
232 
233   // the queue should be empty
234   int last_item = 0;
235 
236   EXPECT_EQ(q.try_pop(last_item), false);
237 }
238 
239 // ::testing::Combine() would be nice, but doesn't work with sun-cc
240 INSTANTIATE_TEST_SUITE_P(
241     ManyToSingle, TestProducerConsumerQueueSCP,
242     ::testing::Values(std::make_tuple(1, 1), std::make_tuple(2, 1),
243                       std::make_tuple(4, 1), std::make_tuple(8, 1),
244                       std::make_tuple(16, 1)),
245     [](testing::TestParamInfo<std::tuple<unsigned int, unsigned int>> p)
__anona34c51fc0602(testing::TestParamInfo<std::tuple<unsigned int, unsigned int>> p) 246         -> std::string {
247       return "p" + std::to_string(std::get<0>(p.param)) + "_" + "c" +
248              std::to_string(std::get<1>(p.param));
249     });
250 
main(int argc,char * argv[])251 int main(int argc, char *argv[]) {
252   ::testing::InitGoogleTest(&argc, argv);
253   return RUN_ALL_TESTS();
254 }
255