1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/activity.h"
16 
17 #include <gmock/gmock.h>
18 #include <gtest/gtest.h>
19 
20 #include "src/core/lib/promise/join.h"
21 #include "src/core/lib/promise/promise.h"
22 #include "src/core/lib/promise/seq.h"
23 #include "src/core/lib/promise/wait_set.h"
24 #include "test/core/promise/test_wakeup_schedulers.h"
25 
26 using testing::_;
27 using testing::Mock;
28 using testing::MockFunction;
29 using testing::SaveArg;
30 using testing::StrictMock;
31 
32 namespace grpc_core {
33 
34 // A simple Barrier type: stalls progress until it is 'cleared'.
35 class Barrier {
36  public:
37   struct Result {};
38 
Wait()39   Promise<Result> Wait() {
40     return [this]() -> Poll<Result> {
41       absl::MutexLock lock(&mu_);
42       if (cleared_) {
43         return Result{};
44       } else {
45         return wait_set_.AddPending(Activity::current()->MakeOwningWaker());
46       }
47     };
48   }
49 
Clear()50   void Clear() {
51     mu_.Lock();
52     cleared_ = true;
53     auto wakeup = wait_set_.TakeWakeupSet();
54     mu_.Unlock();
55     wakeup.Wakeup();
56   }
57 
58  private:
59   absl::Mutex mu_;
60   WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
61   bool cleared_ ABSL_GUARDED_BY(mu_) = false;
62 };
63 
64 // A simple Barrier type: stalls progress until it is 'cleared'.
65 // This variant supports only a single waiter.
66 class SingleBarrier {
67  public:
68   struct Result {};
69 
Wait()70   Promise<Result> Wait() {
71     return [this]() -> Poll<Result> {
72       absl::MutexLock lock(&mu_);
73       if (cleared_) {
74         return Result{};
75       } else {
76         waker_ = Activity::current()->MakeOwningWaker();
77         return Pending();
78       }
79     };
80   }
81 
Clear()82   void Clear() {
83     mu_.Lock();
84     cleared_ = true;
85     auto waker = std::move(waker_);
86     mu_.Unlock();
87     waker.Wakeup();
88   }
89 
90  private:
91   absl::Mutex mu_;
92   Waker waker_ ABSL_GUARDED_BY(mu_);
93   bool cleared_ ABSL_GUARDED_BY(mu_) = false;
94 };
95 
// An activity whose promise returns OK must report OK to its done callback.
TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto make_promise = [] { return [] { return absl::OkStatus(); }; };
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // The returned handle is deliberately discarded right away.
  MakeActivity(std::move(make_promise), NoWakeupScheduler(),
               std::move(report_done));
}
103 
// An activity whose promise returns CANCELLED must report that same status to
// its done callback.
TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto make_promise = [] { return [] { return absl::CancelledError(); }; };
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  // The returned handle is deliberately discarded right away.
  MakeActivity(std::move(make_promise), NoWakeupScheduler(),
               std::move(report_done));
}
111 
// An activity whose promise never completes, and whose handle is dropped
// straight away, is expected to finish with CANCELLED.
TEST(ActivityTest, DropImmediately) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto make_never_ready = [] {
    return []() -> Poll<absl::Status> { return Pending(); };
  };
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  // Discard the handle: nothing keeps the pending activity alive.
  MakeActivity(std::move(make_never_ready), NoWakeupScheduler(),
               std::move(report_done));
}
120 
// Explicitly cancelling a pending activity must report CANCELLED exactly at
// the Cancel() call; resetting the handle afterwards must not call on_done
// again (StrictMock would flag any further call).
TEST(ActivityTest, Cancel) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto make_never_ready = [] {
    return []() -> Poll<absl::Status> { return Pending(); };
  };
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  ActivityPtr activity =
      MakeActivity(std::move(make_never_ready), NoWakeupScheduler(),
                   std::move(report_done));

  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  activity->Cancel();
  Mock::VerifyAndClearExpectations(&on_done);

  activity.reset();
}
132 
133 template <typename B>
134 class BarrierTest : public testing::Test {
135  public:
136   using Type = B;
137 };
138 
139 using BarrierTestTypes = testing::Types<Barrier, SingleBarrier>;
140 TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
141 
// An activity waiting on a barrier must complete with OK once the barrier is
// cleared from outside the activity.
TYPED_TEST(BarrierTest, Barrier) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  ActivityPtr activity = MakeActivity(
      [&barrier] {
        return Seq(barrier.Wait(),
                   [](BarrierResult) { return absl::OkStatus(); });
      },
      InlineWakeupScheduler(), std::move(report_done));

  // No completion is allowed before this point (StrictMock); clearing the
  // barrier should let the activity proceed to return a result.
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  barrier.Clear();
}
157 
// Two activities chained through two barriers: activity1 waits on the first
// barrier and, when it resumes, clears the second barrier that activity2 is
// waiting on. Both use mock schedulers so each wakeup is captured as a
// callback and run explicitly by the test.
TYPED_TEST(BarrierTest, BarrierPing) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;
  using DoneMock = StrictMock<MockFunction<void(absl::Status)>>;

  BarrierType first_barrier;
  BarrierType second_barrier;
  DoneMock on_done1;
  DoneMock on_done2;
  MockCallbackScheduler scheduler1;
  MockCallbackScheduler scheduler2;

  ActivityPtr activity1 = MakeActivity(
      [&first_barrier, &second_barrier] {
        return Seq(first_barrier.Wait(), [&second_barrier](BarrierResult) {
          // Clear the second barrier whilst executing an activity.
          second_barrier.Clear();
          return absl::OkStatus();
        });
      },
      UseMockCallbackScheduler{&scheduler1},
      [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
  ActivityPtr activity2 = MakeActivity(
      [&second_barrier] {
        return Seq(second_barrier.Wait(),
                   [](BarrierResult) { return absl::OkStatus(); });
      },
      UseMockCallbackScheduler{&scheduler2},
      [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });

  // Since the second barrier triggers inside activity1's promise, activity2's
  // wakeup will be scheduled from a callback.
  std::function<void()> run_activity1;
  std::function<void()> run_activity2;

  // Step 1: clearing the first barrier only schedules activity1.
  EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&run_activity1));
  first_barrier.Clear();
  Mock::VerifyAndClearExpectations(&scheduler1);

  // Step 2: running activity1 completes it and schedules activity2.
  EXPECT_CALL(on_done1, Call(absl::OkStatus()));
  EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&run_activity2));
  run_activity1();
  Mock::VerifyAndClearExpectations(&on_done1);

  // Step 3: running activity2 completes it.
  EXPECT_CALL(on_done2, Call(absl::OkStatus()));
  run_activity2();
}
197 
// An activity that clears the barrier it is itself waiting on (from a sibling
// branch of the same Join) must still run to completion with OK, even with
// NoWakeupScheduler.
TYPED_TEST(BarrierTest, WakeSelf) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // The returned handle is deliberately discarded right away.
  MakeActivity(
      [&barrier] {
        auto wait_and_clear = Join(barrier.Wait(), [&barrier] {
          barrier.Clear();
          return 1;
        });
        return Seq(std::move(wait_and_clear),
                   [](std::tuple<BarrierResult, int>) {
                     return absl::OkStatus();
                   });
      },
      NoWakeupScheduler(), std::move(report_done));
}
216 
// Clearing a barrier after the activity that waited on it has been dropped
// must be harmless; the dropped activity is expected to report CANCELLED.
TYPED_TEST(BarrierTest, WakeAfterDestruction) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  {
    StrictMock<MockFunction<void(absl::Status)>> on_done;
    auto report_done = [&on_done](absl::Status status) {
      on_done.Call(std::move(status));
    };
    EXPECT_CALL(on_done, Call(absl::CancelledError()));
    // Discard the handle: the activity goes away while still waiting.
    MakeActivity(
        [&barrier] {
          return Seq(barrier.Wait(),
                     [](BarrierResult) { return absl::OkStatus(); });
        },
        InlineWakeupScheduler(), std::move(report_done));
  }
  // on_done no longer exists here, so this wakeup must not reach it.
  barrier.Clear();
}
233 
// ForceWakeup() must cause the activity to be polled again: the promise is
// pending on its first poll and returns OK on its second. A third poll is a
// test failure (abort).
TEST(ActivityTest, ForceWakeup) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  ActivityPtr activity = MakeActivity(
      [&run_count]() -> Poll<absl::Status> {
        const int attempt = ++run_count;
        if (attempt == 1) {
          return Pending{};
        }
        if (attempt == 2) {
          return absl::OkStatus();
        }
        abort();
      },
      InlineWakeupScheduler(), std::move(report_done));

  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  activity->ForceWakeup();
}
254 
// Context object handed to MakeActivity in the WithContext test below. It
// carries a pointer to a flag that the promise sets through
// GetContext<TestContext>().
struct TestContext {
  // Flag owned by the test body; written from inside the activity.
  bool* done;
};
// Empty ContextType specialization for TestContext.
// NOTE(review): presumably this is what makes GetContext<TestContext>()
// usable inside an activity -- confirm against the context header.
template <>
struct ContextType<TestContext> {};
260 
// A context object passed to MakeActivity must be reachable from inside the
// activity via GetContext<>: the promise factory flips a flag through it.
TEST(ActivityTest, WithContext) {
  bool done = false;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // The returned handle is deliberately discarded right away.
  MakeActivity(
      [] {
        TestContext* context = GetContext<TestContext>();
        *context->done = true;
        return Immediate(absl::OkStatus());
      },
      NoWakeupScheduler(), std::move(report_done), TestContext{&done});
  EXPECT_TRUE(done);
}
275 
// The activity handle may be reset from inside the activity's own promise.
// Here the second poll drops the handle and then stays pending, so the
// activity is expected to finish with CANCELLED.
TEST(ActivityTest, CanCancelDuringExecution) {
  ActivityPtr activity;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };

  activity = MakeActivity(
      [&activity, &run_count]() -> Poll<absl::Status> {
        const int attempt = ++run_count;
        if (attempt == 1) {
          return Pending{};
        }
        if (attempt == 2) {
          // Drop the owning handle while the activity is running.
          activity.reset();
          return Pending{};
        }
        abort();
      },
      InlineWakeupScheduler(), std::move(report_done));

  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  activity->ForceWakeup();
}
300 
// The activity handle may be reset from inside the activity's own promise
// even when that same poll completes successfully: the OK result must still
// be the status delivered to on_done.
TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
  ActivityPtr activity;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;
  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };

  activity = MakeActivity(
      [&activity, &run_count]() -> Poll<absl::Status> {
        const int attempt = ++run_count;
        if (attempt == 1) {
          return Pending{};
        }
        if (attempt == 2) {
          // Drop the owning handle, then complete in the same poll.
          activity.reset();
          return absl::OkStatus();
        }
        abort();
      },
      InlineWakeupScheduler(), std::move(report_done));

  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  activity->ForceWakeup();
}
325 
// Waking a default-constructed Waker must be safe.
TEST(WakerTest, CanWakeupEmptyWaker) {
  // Empty wakers should not do anything upon wakeup.
  Waker empty_waker;
  empty_waker.Wakeup();
}
330 
331 }  // namespace grpc_core
332 
main(int argc,char ** argv)333 int main(int argc, char** argv) {
334   ::testing::InitGoogleTest(&argc, argv);
335   return RUN_ALL_TESTS();
336 }
337