1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/ProducerConsumerQueue.h>
18 
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22 #include <thread>
23 #include <vector>
24 
25 #include <glog/logging.h>
26 
27 #include <folly/portability/GTest.h>
28 
29 //////////////////////////////////////////////////////////////////////
30 
31 namespace {
32 
/**
 * Per-element-type parameters for the queue tests.
 *
 * limit() gives the number of elements one test run pushes through the
 * queue and generate() produces a single element to enqueue.  This primary
 * template is the generic case.
 */
template <class T>
struct TestTraits {
  // 2^24 elements per run.
  T limit() const {
    const int elementCount = 1 << 24;
    return elementCount;
  }

  // A value in [0, 26) derived from rand().
  T generate() const {
    const int value = rand() % 26;
    return value;
  }
};

/**
 * std::string elements: a shorter run of 2^22 elements, and every generated
 * element is the same string of twelve spaces.
 */
template <>
struct TestTraits<std::string> {
  unsigned int limit() const {
    return 1u << 22;
  }

  std::string generate() const {
    return std::string(12, ' ');
  }
};
44 
45 template <class QueueType, size_t Size, bool Pop = false>
46 struct PerfTest {
47   typedef typename QueueType::value_type T;
48 
PerfTest__anon31dadcc50111::PerfTest49   explicit PerfTest() : queue_(Size), done_(false) {}
50 
operator ()__anon31dadcc50111::PerfTest51   void operator()() {
52     using namespace std::chrono;
53     auto const startTime = system_clock::now();
54 
55     std::thread producer([this] { this->producer(); });
56     std::thread consumer([this] { this->consumer(); });
57 
58     producer.join();
59     done_ = true;
60     consumer.join();
61 
62     auto duration =
63         duration_cast<milliseconds>(system_clock::now() - startTime);
64     LOG(INFO) << "     done: " << duration.count() << "ms";
65   }
66 
producer__anon31dadcc50111::PerfTest67   void producer() {
68     // This is written differently than you might expect so that
69     // it does not run afoul of -Wsign-compare, regardless of the
70     // signedness of this loop's upper bound.
71     for (auto i = traits_.limit(); i > 0; --i) {
72       while (!queue_.write(traits_.generate())) {
73       }
74     }
75   }
76 
consumer__anon31dadcc50111::PerfTest77   void consumer() {
78     if /* constexpr */ (Pop) {
79       while (!done_) {
80         if (queue_.frontPtr()) {
81           queue_.popFront();
82         }
83       }
84     } else {
85       while (!done_) {
86         T data;
87         queue_.read(data);
88       }
89     }
90   }
91 
92   QueueType queue_;
93   std::atomic<bool> done_;
94   TestTraits<T> traits_;
95 };
96 
97 template <class TestType>
doTest(const char * name)98 void doTest(const char* name) {
99   LOG(INFO) << "  testing: " << name;
100   std::unique_ptr<TestType> const t(new TestType());
101   (*t)();
102 }
103 
104 template <class T, bool Pop = false>
perfTestType(const char * type)105 void perfTestType(const char* type) {
106   const size_t size = 0xfffe;
107 
108   LOG(INFO) << "Type: " << type;
109   doTest<PerfTest<folly::ProducerConsumerQueue<T>, size, Pop>>(
110       "ProducerConsumerQueue");
111 }
112 
113 template <class QueueType, size_t Size, bool Pop>
114 struct CorrectnessTest {
115   typedef typename QueueType::value_type T;
116 
CorrectnessTest__anon31dadcc50111::CorrectnessTest117   explicit CorrectnessTest() : queue_(Size), done_(false) {
118     const size_t testSize = traits_.limit();
119     testData_.reserve(testSize);
120     for (size_t i = 0; i < testSize; ++i) {
121       testData_.push_back(traits_.generate());
122     }
123   }
124 
operator ()__anon31dadcc50111::CorrectnessTest125   void operator()() {
126     std::thread producer([this] { this->producer(); });
127     std::thread consumer([this] { this->consumer(); });
128 
129     producer.join();
130     done_ = true;
131     consumer.join();
132   }
133 
producer__anon31dadcc50111::CorrectnessTest134   void producer() {
135     for (auto& data : testData_) {
136       while (!queue_.write(data)) {
137       }
138     }
139   }
140 
consumer__anon31dadcc50111::CorrectnessTest141   void consumer() {
142     if (Pop) {
143       consumerPop();
144     } else {
145       consumerRead();
146     }
147   }
148 
consumerPop__anon31dadcc50111::CorrectnessTest149   void consumerPop() {
150     for (auto expect : testData_) {
151     again:
152       T* data;
153       if (!(data = queue_.frontPtr())) {
154         if (done_) {
155           // Try one more read; unless there's a bug in the queue class
156           // there should still be more data sitting in the queue even
157           // though the producer thread exited.
158           if (!(data = queue_.frontPtr())) {
159             EXPECT_TRUE(0 && "Finished too early ...");
160             return;
161           }
162         } else {
163           goto again;
164         }
165         EXPECT_EQ(*data, expect);
166       } else {
167         EXPECT_EQ(*data, expect);
168         queue_.popFront();
169       }
170     }
171   }
172 
consumerRead__anon31dadcc50111::CorrectnessTest173   void consumerRead() {
174     for (auto expect : testData_) {
175     again:
176       T data;
177       if (!queue_.read(data)) {
178         if (done_) {
179           // Try one more read; unless there's a bug in the queue class
180           // there should still be more data sitting in the queue even
181           // though the producer thread exited.
182           if (!queue_.read(data)) {
183             EXPECT_TRUE(0 && "Finished too early ...");
184             return;
185           }
186         } else {
187           goto again;
188         }
189       }
190       EXPECT_EQ(data, expect);
191     }
192   }
193 
194   std::vector<T> testData_;
195   QueueType queue_;
196   TestTraits<T> traits_;
197   std::atomic<bool> done_;
198 };
199 
200 template <class T, bool Pop = false>
correctnessTestType(const std::string & type)201 void correctnessTestType(const std::string& type) {
202   LOG(INFO) << "Type: " << type;
203   doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>, 0xfffe, Pop>>(
204       "ProducerConsumerQueue");
205 }
206 
/**
 * Element type that counts how many of its instances are currently alive.
 *
 * Default and copy construction increment numInstances; destruction
 * decrements it.
 */
struct DtorChecker {
  // Number of DtorChecker objects currently alive.
  static unsigned int numInstances;

  DtorChecker() { ++numInstances; }
  DtorChecker(const DtorChecker& /* o */) { ++numInstances; }
  // Assignment neither creates nor destroys an instance, so the live count
  // is left untouched.  Declared explicitly because relying on the implicit
  // copy assignment is deprecated once a copy constructor or destructor is
  // user-declared.
  DtorChecker& operator=(const DtorChecker& /* o */) = default;
  ~DtorChecker() { --numInstances; }
};

unsigned int DtorChecker::numInstances = 0;
215 
216 } // namespace
217 
218 //////////////////////////////////////////////////////////////////////
219 
TEST(PCQ, QueueCorrectness) {
  // String elements: first consumed via frontPtr()/popFront(), then via
  // read().
  correctnessTestType<std::string, true>("string (front+pop)");
  correctnessTestType<std::string, false>("string");

  // Integral elements, consumed via read().
  correctnessTestType<int, false>("int");
  correctnessTestType<unsigned long long, false>("unsigned long long");
}
226 
TEST(PCQ, PerfTest) {
  // String elements: first consumed via frontPtr()/popFront(), then via
  // read().
  perfTestType<std::string, true>("string (front+pop)");
  perfTestType<std::string, false>("string");

  // Integral elements, consumed via read().
  perfTestType<int, false>("int");
  perfTestType<unsigned long long, false>("unsigned long long");
}
233 
TEST(PCQ, Destructor) {
  // Elements still held by a ProducerConsumerQueue when it goes out of
  // scope must be destroyed along with it.
  {
    folly::ProducerConsumerQueue<DtorChecker> pcq(1024);
    for (int written = 0; written != 10; ++written) {
      EXPECT_TRUE(pcq.write(DtorChecker()));
    }
    EXPECT_EQ(DtorChecker::numInstances, 10);

    {
      // `sink` is itself a live instance, so the count is checked only
      // after this scope has closed.
      DtorChecker sink;
      EXPECT_TRUE(pcq.read(sink));
      EXPECT_TRUE(pcq.read(sink));
    }
    EXPECT_EQ(DtorChecker::numInstances, 8);
  }
  EXPECT_EQ(DtorChecker::numInstances, 0);

  // Same check, but with the queue's write pointer wrapped around while the
  // read pointer has not.
  {
    folly::ProducerConsumerQueue<DtorChecker> pcq(4);
    for (int written = 0; written != 3; ++written) {
      EXPECT_TRUE(pcq.write(DtorChecker()));
    }
    EXPECT_EQ(DtorChecker::numInstances, 3);

    {
      DtorChecker sink;
      EXPECT_TRUE(pcq.read(sink));
    }
    EXPECT_EQ(DtorChecker::numInstances, 2);

    EXPECT_TRUE(pcq.write(DtorChecker()));
    EXPECT_EQ(DtorChecker::numInstances, 3);
  }
  EXPECT_EQ(DtorChecker::numInstances, 0);
}
274 
TEST(PCQ, EmptyFull) {
  folly::ProducerConsumerQueue<int> pcq(3);

  // Freshly constructed: empty and not full.
  EXPECT_TRUE(pcq.isEmpty());
  EXPECT_FALSE(pcq.isFull());

  // One element: neither empty nor full.
  EXPECT_TRUE(pcq.write(1));
  EXPECT_FALSE(pcq.isEmpty());
  EXPECT_FALSE(pcq.isFull());

  // Two elements: already full, even though the queue was built with 3.
  EXPECT_TRUE(pcq.write(2));
  EXPECT_FALSE(pcq.isEmpty());
  EXPECT_TRUE(pcq.isFull());

  // A further write is rejected and the size stays at two.
  EXPECT_FALSE(pcq.write(3));
  EXPECT_EQ(pcq.sizeGuess(), 2);
}
291 
TEST(PCQ, Capacity) {
  // The usable capacity is one less than the size passed to the
  // constructor.
  folly::ProducerConsumerQueue<int> pcq(3);
  EXPECT_EQ(pcq.capacity(), 2);
}
296