1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <functional>
18 #include <utility>
19 #include <vector>
20
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseAtomicNotificationQueue.h>
23 #include <folly/portability/GTest.h>
24
25 using namespace folly;
26 using namespace std;
27
/**
 * Test consumer that records every task it is handed.
 *
 * Each task is first copied into the externally owned `tasks` vector; if the
 * optional `fn` hook has been set, the task is then forwarded to it as an
 * rvalue.
 *
 * The consumer only holds a reference to the vector, so the vector must
 * outlive the consumer (and any copy or move of it).
 */
template <typename Task>
struct AtomicNotificationQueueConsumer {
  /// Binds the consumer to the vector that will receive the recorded tasks.
  explicit AtomicNotificationQueueConsumer(std::vector<Task>& sink)
      : tasks(sink) {}

  /// Records `value`, then hands it to `fn` when a hook is installed.
  void operator()(Task&& value) noexcept {
    // `value` is a named rvalue reference, i.e. an lvalue here, so this is a
    // copy; the task stays intact for the hook below.
    tasks.push_back(value);
    if (!fn) {
      return;
    }
    fn(std::move(value));
  }

  /// Optional hook invoked with each task after it has been recorded.
  std::function<void(Task&&)> fn;
  /// Destination for recorded tasks; not owned.
  std::vector<Task>& tasks;
};
43
// Checks how tryPutMessage() honours its size limit, and how it interacts
// with the unconditional putMessage() and with drain().
TEST(AtomicNotificationQueueTest, TryPutMessage) {
  vector<int> consumed;
  AtomicNotificationQueueConsumer<int> sink{consumed};
  EventBaseAtomicNotificationQueue<int, AtomicNotificationQueueConsumer<int>>
      queue{std::move(sink)};

  constexpr uint32_t kMaxSize = 10;

  // Pre-fill with the values 1..9, leaving the queue one short of kMaxSize.
  for (int message = 1; message < 10; ++message) {
    queue.putMessage(message);
  }

  // The tenth message still fits under the limit ...
  EXPECT_TRUE(queue.tryPutMessage(10, kMaxSize));
  EXPECT_EQ(queue.size(), 10);
  // ... the eleventh is rejected and leaves the size untouched.
  EXPECT_FALSE(queue.tryPutMessage(11, kMaxSize));
  EXPECT_EQ(queue.size(), 10);
  // putMessage() takes no limit, so the queue can grow past kMaxSize, after
  // which tryPutMessage() keeps refusing.
  queue.putMessage(11);
  EXPECT_EQ(queue.size(), 11);
  EXPECT_FALSE(queue.tryPutMessage(12, kMaxSize));

  // After drain() a bounded put succeeds again and is the only queued
  // message.
  queue.drain();
  EXPECT_TRUE(queue.tryPutMessage(0, kMaxSize));
  EXPECT_EQ(queue.size(), 1);
}
68
// Checks that tasks for which the consumer returns DISCARD never reach the
// output, and that the expected per-iteration batches are produced when the
// queue is limited with setMaxReadAtOnce(10).
TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
  struct TaskWithExpiry {
    int datum;
    bool isExpired;
  };

  // Records the datum of every non-expired task and reports expired tasks
  // back to the queue as DISCARD.
  struct Consumer {
    explicit Consumer(std::vector<int>& data) : data(data) {}
    AtomicNotificationQueueTaskStatus operator()(
        TaskWithExpiry&& task) noexcept {
      if (task.isExpired) {
        return AtomicNotificationQueueTaskStatus::DISCARD;
      }
      data.push_back(task.datum);
      return AtomicNotificationQueueTaskStatus::CONSUMED;
    }
    std::vector<int>& data;
  };
  vector<int> data;
  Consumer consumer{data};

  EventBaseAtomicNotificationQueue<TaskWithExpiry, Consumer> queue{
      std::move(consumer)};
  queue.setMaxReadAtOnce(10);

  // 16 tasks, 6 of them expired (1, 2, 7, 12, 14).
  const vector<TaskWithExpiry> tasks = {
      {0, false},
      {1, true},
      {2, true},
      {3, false},
      {4, false},
      {5, false},
      {6, false},
      {7, true},
      {8, false},
      {9, false},
      {10, false},
      {11, false},
      {12, true},
      {13, false},
      {14, true},
      {15, false},
  };

  EventBase eventBase;
  queue.startConsuming(&eventBase);

  for (const auto& task : tasks) {
    queue.putMessage(task);
  }

  eventBase.loopOnce();

  // The first iteration is expected to deliver ten live tasks; the expired
  // ones in between (1, 2, 7, 12) are skipped and, per these expected values,
  // do not use up the read-at-once budget.
  // Comparing whole vectors checks size and contents in one step and cannot
  // throw on a size mismatch the way element-wise at() would.
  const vector<int> expectedFirstBatch = {0, 3, 4, 5, 6, 8, 9, 10, 11, 13};
  EXPECT_EQ(data, expectedFirstBatch);

  data.clear();
  eventBase.loopOnce();

  // The second iteration picks up the remainder: 14 is expired, 15 is live.
  const vector<int> expectedSecondBatch = {15};
  EXPECT_EQ(data, expectedSecondBatch);
}
134
// Checks that putMessage() can build the task in place from constructor
// arguments and that the consumer receives the tasks in insertion order.
TEST(AtomicNotificationQueueTest, PutMessage) {
  struct Data {
    int datum;
    bool isExpired;

    explicit Data(int datum, bool isExpired)
        : datum(datum), isExpired(isExpired) {}

    bool operator==(const Data& data) const {
      return datum == data.datum && isExpired == data.isExpired;
    }
  };

  // Appends a copy of every task it receives to the referenced vector.
  struct Consumer {
    explicit Consumer(vector<Data>& data) : data(data) {}
    void operator()(Data&& task) noexcept { data.push_back(task); }
    vector<Data>& data;
  };

  const vector<Data> expected = {
      Data(10, false), Data(20, true), Data(-8, true), Data(0, false)};
  vector<Data> actual;
  Consumer consumer{actual};

  EventBaseAtomicNotificationQueue<Data, decltype(consumer)> queue{
      std::move(consumer)};
  // NOTE(review): 0 presumably means "no per-iteration read limit" - confirm
  // against the queue's documentation. The test expects all four messages to
  // be delivered by a single loopOnce().
  queue.setMaxReadAtOnce(0);

  EventBase eventBase;
  queue.startConsuming(&eventBase);

  // Pass the constructor arguments rather than a Data object; Data's
  // constructor is explicit, so the queue has to construct the task itself.
  for (const auto& item : expected) {
    queue.putMessage(item.datum, item.isExpired);
  }

  eventBase.loopOnce();

  // Fatal on mismatch: the loop below indexes `actual` with operator[], which
  // would read out of bounds if fewer tasks than expected were consumed.
  ASSERT_EQ(expected.size(), actual.size());
  for (size_t i = 0; i < expected.size(); ++i) {
    EXPECT_EQ(expected[i], actual[i]);
  }
}
180