1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/activity.h"
16
17 #include <gmock/gmock.h>
18 #include <gtest/gtest.h>
19
20 #include "src/core/lib/promise/join.h"
21 #include "src/core/lib/promise/promise.h"
22 #include "src/core/lib/promise/seq.h"
23 #include "src/core/lib/promise/wait_set.h"
24 #include "test/core/promise/test_wakeup_schedulers.h"
25
26 using testing::_;
27 using testing::Mock;
28 using testing::MockFunction;
29 using testing::SaveArg;
30 using testing::StrictMock;
31
32 namespace grpc_core {
33
34 // A simple Barrier type: stalls progress until it is 'cleared'.
35 class Barrier {
36 public:
37 struct Result {};
38
Wait()39 Promise<Result> Wait() {
40 return [this]() -> Poll<Result> {
41 absl::MutexLock lock(&mu_);
42 if (cleared_) {
43 return Result{};
44 } else {
45 return wait_set_.AddPending(Activity::current()->MakeOwningWaker());
46 }
47 };
48 }
49
Clear()50 void Clear() {
51 mu_.Lock();
52 cleared_ = true;
53 auto wakeup = wait_set_.TakeWakeupSet();
54 mu_.Unlock();
55 wakeup.Wakeup();
56 }
57
58 private:
59 absl::Mutex mu_;
60 WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
61 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
62 };
63
64 // A simple Barrier type: stalls progress until it is 'cleared'.
65 // This variant supports only a single waiter.
66 class SingleBarrier {
67 public:
68 struct Result {};
69
Wait()70 Promise<Result> Wait() {
71 return [this]() -> Poll<Result> {
72 absl::MutexLock lock(&mu_);
73 if (cleared_) {
74 return Result{};
75 } else {
76 waker_ = Activity::current()->MakeOwningWaker();
77 return Pending();
78 }
79 };
80 }
81
Clear()82 void Clear() {
83 mu_.Lock();
84 cleared_ = true;
85 auto waker = std::move(waker_);
86 mu_.Unlock();
87 waker.Wakeup();
88 }
89
90 private:
91 absl::Mutex mu_;
92 Waker waker_ ABSL_GUARDED_BY(mu_);
93 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
94 };
95
TEST(ActivityTest,ImmediatelyCompleteWithSuccess)96 TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
97 StrictMock<MockFunction<void(absl::Status)>> on_done;
98 EXPECT_CALL(on_done, Call(absl::OkStatus()));
99 MakeActivity(
100 [] { return [] { return absl::OkStatus(); }; }, NoWakeupScheduler(),
101 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
102 }
103
TEST(ActivityTest,ImmediatelyCompleteWithFailure)104 TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
105 StrictMock<MockFunction<void(absl::Status)>> on_done;
106 EXPECT_CALL(on_done, Call(absl::CancelledError()));
107 MakeActivity(
108 [] { return [] { return absl::CancelledError(); }; }, NoWakeupScheduler(),
109 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
110 }
111
TEST(ActivityTest,DropImmediately)112 TEST(ActivityTest, DropImmediately) {
113 StrictMock<MockFunction<void(absl::Status)>> on_done;
114 EXPECT_CALL(on_done, Call(absl::CancelledError()));
115 MakeActivity(
116 [] { return []() -> Poll<absl::Status> { return Pending(); }; },
117 NoWakeupScheduler(),
118 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
119 }
120
TEST(ActivityTest,Cancel)121 TEST(ActivityTest, Cancel) {
122 StrictMock<MockFunction<void(absl::Status)>> on_done;
123 auto activity = MakeActivity(
124 [] { return []() -> Poll<absl::Status> { return Pending(); }; },
125 NoWakeupScheduler(),
126 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
127 EXPECT_CALL(on_done, Call(absl::CancelledError()));
128 activity->Cancel();
129 Mock::VerifyAndClearExpectations(&on_done);
130 activity.reset();
131 }
132
133 template <typename B>
134 class BarrierTest : public testing::Test {
135 public:
136 using Type = B;
137 };
138
139 using BarrierTestTypes = testing::Types<Barrier, SingleBarrier>;
140 TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
141
TYPED_TEST(BarrierTest,Barrier)142 TYPED_TEST(BarrierTest, Barrier) {
143 typename TestFixture::Type b;
144 StrictMock<MockFunction<void(absl::Status)>> on_done;
145 auto activity = MakeActivity(
146 [&b] {
147 return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
148 return absl::OkStatus();
149 });
150 },
151 InlineWakeupScheduler(),
152 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
153 // Clearing the barrier should let the activity proceed to return a result.
154 EXPECT_CALL(on_done, Call(absl::OkStatus()));
155 b.Clear();
156 }
157
TYPED_TEST(BarrierTest,BarrierPing)158 TYPED_TEST(BarrierTest, BarrierPing) {
159 typename TestFixture::Type b1;
160 typename TestFixture::Type b2;
161 StrictMock<MockFunction<void(absl::Status)>> on_done1;
162 StrictMock<MockFunction<void(absl::Status)>> on_done2;
163 MockCallbackScheduler scheduler1;
164 MockCallbackScheduler scheduler2;
165 auto activity1 = MakeActivity(
166 [&b1, &b2] {
167 return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
168 // Clear the barrier whilst executing an activity
169 b2.Clear();
170 return absl::OkStatus();
171 });
172 },
173 UseMockCallbackScheduler{&scheduler1},
174 [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
175 auto activity2 = MakeActivity(
176 [&b2] {
177 return Seq(b2.Wait(), [](typename TestFixture::Type::Result) {
178 return absl::OkStatus();
179 });
180 },
181 UseMockCallbackScheduler{&scheduler2},
182 [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
183 // Since barrier triggers inside activity1 promise, activity2 wakeup will be
184 // scheduled from a callback.
185 std::function<void()> cb1;
186 std::function<void()> cb2;
187 EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&cb1));
188 b1.Clear();
189 Mock::VerifyAndClearExpectations(&scheduler1);
190 EXPECT_CALL(on_done1, Call(absl::OkStatus()));
191 EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&cb2));
192 cb1();
193 Mock::VerifyAndClearExpectations(&on_done1);
194 EXPECT_CALL(on_done2, Call(absl::OkStatus()));
195 cb2();
196 }
197
TYPED_TEST(BarrierTest,WakeSelf)198 TYPED_TEST(BarrierTest, WakeSelf) {
199 typename TestFixture::Type b;
200 StrictMock<MockFunction<void(absl::Status)>> on_done;
201 EXPECT_CALL(on_done, Call(absl::OkStatus()));
202 MakeActivity(
203 [&b] {
204 return Seq(Join(b.Wait(),
205 [&b] {
206 b.Clear();
207 return 1;
208 }),
209 [](std::tuple<typename TestFixture::Type::Result, int>) {
210 return absl::OkStatus();
211 });
212 },
213 NoWakeupScheduler(),
214 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
215 }
216
TYPED_TEST(BarrierTest,WakeAfterDestruction)217 TYPED_TEST(BarrierTest, WakeAfterDestruction) {
218 typename TestFixture::Type b;
219 {
220 StrictMock<MockFunction<void(absl::Status)>> on_done;
221 EXPECT_CALL(on_done, Call(absl::CancelledError()));
222 MakeActivity(
223 [&b] {
224 return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
225 return absl::OkStatus();
226 });
227 },
228 InlineWakeupScheduler(),
229 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
230 }
231 b.Clear();
232 }
233
TEST(ActivityTest,ForceWakeup)234 TEST(ActivityTest, ForceWakeup) {
235 StrictMock<MockFunction<void(absl::Status)>> on_done;
236 int run_count = 0;
237 auto activity = MakeActivity(
238 [&run_count]() -> Poll<absl::Status> {
239 ++run_count;
240 switch (run_count) {
241 case 1:
242 return Pending{};
243 case 2:
244 return absl::OkStatus();
245 default:
246 abort();
247 }
248 },
249 InlineWakeupScheduler(),
250 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
251 EXPECT_CALL(on_done, Call(absl::OkStatus()));
252 activity->ForceWakeup();
253 }
254
255 struct TestContext {
256 bool* done;
257 };
258 template <>
259 struct ContextType<TestContext> {};
260
TEST(ActivityTest,WithContext)261 TEST(ActivityTest, WithContext) {
262 bool done = false;
263 StrictMock<MockFunction<void(absl::Status)>> on_done;
264 EXPECT_CALL(on_done, Call(absl::OkStatus()));
265 MakeActivity(
266 [] {
267 *GetContext<TestContext>()->done = true;
268 return Immediate(absl::OkStatus());
269 },
270 NoWakeupScheduler(),
271 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
272 TestContext{&done});
273 EXPECT_TRUE(done);
274 }
275
TEST(ActivityTest,CanCancelDuringExecution)276 TEST(ActivityTest, CanCancelDuringExecution) {
277 ActivityPtr activity;
278 StrictMock<MockFunction<void(absl::Status)>> on_done;
279 int run_count = 0;
280
281 activity = MakeActivity(
282 [&activity, &run_count]() -> Poll<absl::Status> {
283 ++run_count;
284 switch (run_count) {
285 case 1:
286 return Pending{};
287 case 2:
288 activity.reset();
289 return Pending{};
290 default:
291 abort();
292 }
293 },
294 InlineWakeupScheduler(),
295 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
296
297 EXPECT_CALL(on_done, Call(absl::CancelledError()));
298 activity->ForceWakeup();
299 }
300
TEST(ActivityTest,CanCancelDuringSuccessfulExecution)301 TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
302 ActivityPtr activity;
303 StrictMock<MockFunction<void(absl::Status)>> on_done;
304 int run_count = 0;
305
306 activity = MakeActivity(
307 [&activity, &run_count]() -> Poll<absl::Status> {
308 ++run_count;
309 switch (run_count) {
310 case 1:
311 return Pending{};
312 case 2:
313 activity.reset();
314 return absl::OkStatus();
315 default:
316 abort();
317 }
318 },
319 InlineWakeupScheduler(),
320 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
321
322 EXPECT_CALL(on_done, Call(absl::OkStatus()));
323 activity->ForceWakeup();
324 }
325
TEST(WakerTest,CanWakeupEmptyWaker)326 TEST(WakerTest, CanWakeupEmptyWaker) {
327 // Empty wakers should not do anything upon wakeup.
328 Waker().Wakeup();
329 }
330
331 } // namespace grpc_core
332
main(int argc,char ** argv)333 int main(int argc, char** argv) {
334 ::testing::InitGoogleTest(&argc, argv);
335 return RUN_ALL_TESTS();
336 }
337