1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/ProducerConsumerQueue.h>
18
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22 #include <thread>
23 #include <vector>
24
25 #include <glog/logging.h>
26
27 #include <folly/portability/GTest.h>
28
29 //////////////////////////////////////////////////////////////////////
30
31 namespace {
32
// Per-element-type knobs for the queue tests: how many items a run pushes
// through the queue and how each item is produced.
template <class T>
struct TestTraits {
  // Number of items a test run produces (2^24), converted to T.
  T limit() const {
    constexpr int kItemCount = 1 << 24;
    return kItemCount;
  }

  // Pseudo-random value in [0, 26), converted to T.
  T generate() const {
    const int value = rand() % 26;
    return value;
  }
};
38
39 template <>
40 struct TestTraits<std::string> {
limit__anon31dadcc50111::TestTraits41 unsigned int limit() const { return 1 << 22; }
generate__anon31dadcc50111::TestTraits42 std::string generate() const { return std::string(12, ' '); }
43 };
44
45 template <class QueueType, size_t Size, bool Pop = false>
46 struct PerfTest {
47 typedef typename QueueType::value_type T;
48
PerfTest__anon31dadcc50111::PerfTest49 explicit PerfTest() : queue_(Size), done_(false) {}
50
operator ()__anon31dadcc50111::PerfTest51 void operator()() {
52 using namespace std::chrono;
53 auto const startTime = system_clock::now();
54
55 std::thread producer([this] { this->producer(); });
56 std::thread consumer([this] { this->consumer(); });
57
58 producer.join();
59 done_ = true;
60 consumer.join();
61
62 auto duration =
63 duration_cast<milliseconds>(system_clock::now() - startTime);
64 LOG(INFO) << " done: " << duration.count() << "ms";
65 }
66
producer__anon31dadcc50111::PerfTest67 void producer() {
68 // This is written differently than you might expect so that
69 // it does not run afoul of -Wsign-compare, regardless of the
70 // signedness of this loop's upper bound.
71 for (auto i = traits_.limit(); i > 0; --i) {
72 while (!queue_.write(traits_.generate())) {
73 }
74 }
75 }
76
consumer__anon31dadcc50111::PerfTest77 void consumer() {
78 if /* constexpr */ (Pop) {
79 while (!done_) {
80 if (queue_.frontPtr()) {
81 queue_.popFront();
82 }
83 }
84 } else {
85 while (!done_) {
86 T data;
87 queue_.read(data);
88 }
89 }
90 }
91
92 QueueType queue_;
93 std::atomic<bool> done_;
94 TestTraits<T> traits_;
95 };
96
97 template <class TestType>
doTest(const char * name)98 void doTest(const char* name) {
99 LOG(INFO) << " testing: " << name;
100 std::unique_ptr<TestType> const t(new TestType());
101 (*t)();
102 }
103
104 template <class T, bool Pop = false>
perfTestType(const char * type)105 void perfTestType(const char* type) {
106 const size_t size = 0xfffe;
107
108 LOG(INFO) << "Type: " << type;
109 doTest<PerfTest<folly::ProducerConsumerQueue<T>, size, Pop>>(
110 "ProducerConsumerQueue");
111 }
112
113 template <class QueueType, size_t Size, bool Pop>
114 struct CorrectnessTest {
115 typedef typename QueueType::value_type T;
116
CorrectnessTest__anon31dadcc50111::CorrectnessTest117 explicit CorrectnessTest() : queue_(Size), done_(false) {
118 const size_t testSize = traits_.limit();
119 testData_.reserve(testSize);
120 for (size_t i = 0; i < testSize; ++i) {
121 testData_.push_back(traits_.generate());
122 }
123 }
124
operator ()__anon31dadcc50111::CorrectnessTest125 void operator()() {
126 std::thread producer([this] { this->producer(); });
127 std::thread consumer([this] { this->consumer(); });
128
129 producer.join();
130 done_ = true;
131 consumer.join();
132 }
133
producer__anon31dadcc50111::CorrectnessTest134 void producer() {
135 for (auto& data : testData_) {
136 while (!queue_.write(data)) {
137 }
138 }
139 }
140
consumer__anon31dadcc50111::CorrectnessTest141 void consumer() {
142 if (Pop) {
143 consumerPop();
144 } else {
145 consumerRead();
146 }
147 }
148
consumerPop__anon31dadcc50111::CorrectnessTest149 void consumerPop() {
150 for (auto expect : testData_) {
151 again:
152 T* data;
153 if (!(data = queue_.frontPtr())) {
154 if (done_) {
155 // Try one more read; unless there's a bug in the queue class
156 // there should still be more data sitting in the queue even
157 // though the producer thread exited.
158 if (!(data = queue_.frontPtr())) {
159 EXPECT_TRUE(0 && "Finished too early ...");
160 return;
161 }
162 } else {
163 goto again;
164 }
165 EXPECT_EQ(*data, expect);
166 } else {
167 EXPECT_EQ(*data, expect);
168 queue_.popFront();
169 }
170 }
171 }
172
consumerRead__anon31dadcc50111::CorrectnessTest173 void consumerRead() {
174 for (auto expect : testData_) {
175 again:
176 T data;
177 if (!queue_.read(data)) {
178 if (done_) {
179 // Try one more read; unless there's a bug in the queue class
180 // there should still be more data sitting in the queue even
181 // though the producer thread exited.
182 if (!queue_.read(data)) {
183 EXPECT_TRUE(0 && "Finished too early ...");
184 return;
185 }
186 } else {
187 goto again;
188 }
189 }
190 EXPECT_EQ(data, expect);
191 }
192 }
193
194 std::vector<T> testData_;
195 QueueType queue_;
196 TestTraits<T> traits_;
197 std::atomic<bool> done_;
198 };
199
200 template <class T, bool Pop = false>
correctnessTestType(const std::string & type)201 void correctnessTestType(const std::string& type) {
202 LOG(INFO) << "Type: " << type;
203 doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>, 0xfffe, Pop>>(
204 "ProducerConsumerQueue");
205 }
206
// Instance counter: every construction (default or copy) bumps
// numInstances and every destruction decrements it, so a test can tell how
// many objects are currently alive.
struct DtorChecker {
  static unsigned int numInstances;

  DtorChecker() {
    numInstances += 1;
  }

  DtorChecker(const DtorChecker& /* other */) {
    numInstances += 1;
  }

  ~DtorChecker() {
    numInstances -= 1;
  }
};

unsigned int DtorChecker::numInstances = 0;
215
216 } // namespace
217
218 //////////////////////////////////////////////////////////////////////
219
// Runs the producer/consumer correctness check for several element types.
TEST(PCQ, QueueCorrectness) {
  // Consumer drains with frontPtr()/popFront().
  correctnessTestType<std::string, /* Pop = */ true>("string (front+pop)");

  // Consumer drains with read() (Pop defaults to false).
  correctnessTestType<std::string>("string");
  correctnessTestType<int>("int");
  correctnessTestType<unsigned long long>("unsigned long long");
}
226
// Runs the timed producer/consumer loop for several element types; the
// elapsed time is only logged, nothing is asserted here.
TEST(PCQ, PerfTest) {
  // Consumer drains with frontPtr()/popFront().
  perfTestType<std::string, /* Pop = */ true>("string (front+pop)");

  // Consumer drains with read() (Pop defaults to false).
  perfTestType<std::string>("string");
  perfTestType<int>("int");
  perfTestType<unsigned long long>("unsigned long long");
}
233
// Verifies, via DtorChecker's live-instance counter, that elements still
// sitting in a ProducerConsumerQueue are destroyed together with the queue.
TEST(PCQ, Destructor) {
  // Case 1: plenty of room, no index wrap-around.
  {
    folly::ProducerConsumerQueue<DtorChecker> queue(1024);
    for (int written = 0; written != 10; ++written) {
      EXPECT_TRUE(queue.write(DtorChecker()));
    }

    EXPECT_EQ(DtorChecker::numInstances, 10);

    // `sink` counts as an instance only while this inner scope is open.
    {
      DtorChecker sink;
      EXPECT_TRUE(queue.read(sink));
      EXPECT_TRUE(queue.read(sink));
    }

    EXPECT_EQ(DtorChecker::numInstances, 8);
  }

  // The queue is gone, and so must be the eight orphaned elements.
  EXPECT_EQ(DtorChecker::numInstances, 0);

  // Case 2: same check when the queue write pointer has wrapped, but the
  // read one hasn't.
  {
    folly::ProducerConsumerQueue<DtorChecker> queue(4);
    for (int written = 0; written != 3; ++written) {
      EXPECT_TRUE(queue.write(DtorChecker()));
    }
    EXPECT_EQ(DtorChecker::numInstances, 3);

    {
      DtorChecker sink;
      EXPECT_TRUE(queue.read(sink));
    }
    EXPECT_EQ(DtorChecker::numInstances, 2);

    EXPECT_TRUE(queue.write(DtorChecker()));
    EXPECT_EQ(DtorChecker::numInstances, 3);
  }
  EXPECT_EQ(DtorChecker::numInstances, 0);
}
274
// Walks a size-3 queue from empty to full and checks isEmpty()/isFull()
// after each step.
TEST(PCQ, EmptyFull) {
  folly::ProducerConsumerQueue<int> queue(3);

  // Freshly constructed: empty, not full.
  EXPECT_TRUE(queue.isEmpty());
  EXPECT_FALSE(queue.isFull());

  // One element: neither empty nor full.
  EXPECT_TRUE(queue.write(1));
  EXPECT_FALSE(queue.isEmpty());
  EXPECT_FALSE(queue.isFull());

  // Two elements: already full.
  EXPECT_TRUE(queue.write(2));
  EXPECT_FALSE(queue.isEmpty());
  EXPECT_TRUE(queue.isFull()); // Tricky: full after 2 writes, not 3.

  // A third write is rejected and the size stays at two.
  EXPECT_FALSE(queue.write(3));
  EXPECT_EQ(queue.sizeGuess(), 2);
}
291
// capacity() of a queue constructed with size 3 is expected to be 2.
TEST(PCQ, Capacity) {
  folly::ProducerConsumerQueue<int> queue(3);

  // PCQ max size is buffer size - 1.
  EXPECT_EQ(queue.capacity(), 2);
}
296