1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <functional>
18 #include <utility>
19 #include <vector>
20 
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseAtomicNotificationQueue.h>
23 #include <folly/portability/GTest.h>
24 
25 using namespace folly;
26 using namespace std;
27 
// Test consumer that keeps a log of every task handed to it.
//
// Each invocation appends a copy of the task to `tasks`; if `fn` holds a
// callable, the task is then moved into that callable.
template <typename Task>
struct AtomicNotificationQueueConsumer {
  // Binds the consumer to a caller-owned vector; only a reference is stored.
  explicit AtomicNotificationQueueConsumer(std::vector<Task>& sink)
      : tasks(sink) {}

  void operator()(Task&& value) noexcept {
    // `value` is a named reference here, so this stores a copy and leaves the
    // task intact for the callback below.
    tasks.push_back(value);
    if (!fn) {
      return;
    }
    fn(std::move(value));
  }

  // Optional callback run after the task is recorded. The constructor does not
  // set it, so it stays empty until a caller assigns one.
  std::function<void(Task&&)> fn;
  // Log of received tasks, in arrival order.
  std::vector<Task>& tasks;
};
43 
// Checks how tryPutMessage() reacts to the caller-supplied size limit: it is
// expected to succeed while the queue holds fewer than kMaxSize entries, to
// fail (leaving size() unchanged) once the limit is reached or exceeded, and
// to succeed again after drain(). putMessage() is expected to enqueue
// regardless of that limit.
TEST(AtomicNotificationQueueTest, TryPutMessage) {
  constexpr uint32_t kMaxSize = 10;

  vector<int> consumed;
  AtomicNotificationQueueConsumer<int> sink{consumed};
  EventBaseAtomicNotificationQueue<int, decltype(sink)> queue{
      std::move(sink)};

  // Enqueue the values 1..9, leaving the queue one entry short of kMaxSize.
  int next = 1;
  while (next <= 9) {
    queue.putMessage(std::move(next));
    ++next;
  }

  // The tenth entry still fits under the limit...
  EXPECT_TRUE(queue.tryPutMessage(10, kMaxSize));
  EXPECT_EQ(queue.size(), 10);
  // ...but an eleventh is refused and the size stays put.
  EXPECT_FALSE(queue.tryPutMessage(11, kMaxSize));
  EXPECT_EQ(queue.size(), 10);
  // putMessage() takes no limit argument and is expected to grow the queue.
  queue.putMessage(11);
  EXPECT_EQ(queue.size(), 11);
  EXPECT_FALSE(queue.tryPutMessage(12, kMaxSize));

  // Once drained, a limited put is expected to be accepted again.
  queue.drain();
  EXPECT_TRUE(queue.tryPutMessage(0, kMaxSize));
  EXPECT_EQ(queue.size(), 1);
}
68 
// Exercises a consumer that reports, per task, whether it consumed or
// discarded it. With setMaxReadAtOnce(10), the first loop iteration is
// expected to deliver exactly ten non-expired tasks (the expired ones seen
// along the way are skipped), and the second iteration is expected to deliver
// the single remaining non-expired task.
TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
  struct TaskWithExpiry {
    int datum;
    bool isExpired;
  };

  // Records the datum of every non-expired task and tells the queue which
  // tasks were discarded.
  struct Consumer {
    explicit Consumer(std::vector<int>& data) : data(data) {}
    AtomicNotificationQueueTaskStatus operator()(
        TaskWithExpiry&& task) noexcept {
      if (task.isExpired) {
        return AtomicNotificationQueueTaskStatus::DISCARD;
      }
      data.push_back(task.datum);
      return AtomicNotificationQueueTaskStatus::CONSUMED;
    }
    vector<int>& data;
  };
  vector<int> data;
  Consumer consumer{data};

  EventBaseAtomicNotificationQueue<TaskWithExpiry, Consumer> queue{
      std::move(consumer)};
  queue.setMaxReadAtOnce(10);

  const vector<TaskWithExpiry> tasks = {
      {0, false},
      {1, true},
      {2, true},
      {3, false},
      {4, false},
      {5, false},
      {6, false},
      {7, true},
      {8, false},
      {9, false},
      {10, false},
      {11, false},
      {12, true},
      {13, false},
      {14, true},
      {15, false},
  };

  EventBase eventBase;
  queue.startConsuming(&eventBase);

  for (const auto& t : tasks) {
    queue.putMessage(t);
  }

  eventBase.loopOnce();

  // Compare whole vectors rather than size-then-elements: a size mismatch can
  // then never lead to an out-of-range at() call, and a failure prints both
  // sequences.
  const vector<int> expectedMessages = {0, 3, 4, 5, 6, 8, 9, 10, 11, 13};
  EXPECT_EQ(data, expectedMessages);

  data.clear();
  eventBase.loopOnce();

  // Task 14 is expired, so only 15 should arrive in the second iteration.
  const vector<int> expectedRemainder = {15};
  EXPECT_EQ(data, expectedRemainder);
}
134 
// Checks that putMessage() can be called with the task's constructor
// arguments (here: an int and a bool for Data) and that the consumer then
// receives equal Data objects in the order they were enqueued.
TEST(AtomicNotificationQueueTest, PutMessage) {
  struct Data {
    int datum;
    bool isExpired;

    explicit Data(int datum, bool isExpired)
        : datum(datum), isExpired(isExpired) {}

    bool operator==(const Data& data) const {
      return datum == data.datum && isExpired == data.isExpired;
    }
  };

  // Appends a copy of every received task to the referenced vector.
  struct Consumer {
    explicit Consumer(vector<Data>& data) : data(data) {}
    void operator()(Data&& task) noexcept { data.push_back(task); }
    vector<Data>& data;
  };

  const vector<Data> expected = {
      Data(10, false),
      Data(20, true),
      Data(-8, true),
      Data(0, false),
  };
  vector<Data> actual;
  Consumer consumer{actual};

  EventBaseAtomicNotificationQueue<Data, decltype(consumer)> queue{
      std::move(consumer)};
  // NOTE(review): 0 presumably means "no per-iteration read limit"; the
  // expectations below require all four tasks to arrive in one loopOnce().
  // Confirm against the queue's documentation.
  queue.setMaxReadAtOnce(0);

  EventBase eventBase;
  queue.startConsuming(&eventBase);

  for (const auto& t : expected) {
    queue.putMessage(t.datum, t.isExpired);
  }

  eventBase.loopOnce();

  // Fatal on mismatch: the loop below indexes `actual` unchecked, so it must
  // not run unless both vectors have the same length.
  ASSERT_EQ(expected.size(), actual.size());
  for (size_t i = 0; i < expected.size(); ++i) {
    EXPECT_EQ(expected[i], actual[i]);
  }
}
180