1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/Portability.h>
18
19 #include <folly/Optional.h>
20 #include <folly/ScopeGuard.h>
21 #include <folly/executors/ManualExecutor.h>
22 #include <folly/experimental/coro/Baton.h>
23 #include <folly/experimental/coro/BlockingWait.h>
24 #include <folly/experimental/coro/Coroutine.h>
25 #include <folly/experimental/coro/Invoke.h>
26 #include <folly/fibers/FiberManager.h>
27 #include <folly/fibers/FiberManagerMap.h>
28 #include <folly/portability/GTest.h>
29
30 #include <memory>
31 #include <type_traits>
32
33 #if FOLLY_HAS_COROUTINES
34
35 static_assert(
36 std::is_same<
37 decltype(folly::coro::blockingWait(
38 std::declval<folly::coro::ready_awaitable<>>())),
39 void>::value,
40 "");
41 static_assert(
42 std::is_same<
43 decltype(folly::coro::blockingWait(
44 std::declval<folly::coro::ready_awaitable<int>>())),
45 int>::value,
46 "");
47 static_assert(
48 std::is_same<
49 decltype(folly::coro::blockingWait(
50 std::declval<folly::coro::ready_awaitable<int&>>())),
51 int&>::value,
52 "");
53 static_assert(
54 std::is_same<
55 decltype(folly::coro::blockingWait(
56 std::declval<folly::coro::ready_awaitable<int&&>>())),
57 int>::value,
58 "blockingWait() should convert rvalue-reference-returning awaitables "
59 "into a returned prvalue to avoid potential lifetime issues since "
60 "its possible the rvalue reference could have been to some temporary "
61 "object stored inside the Awaiter which would have been destructed "
62 "by the time blockingWait returns.");
63
64 class BlockingWaitTest : public testing::Test {};
65
TEST_F(BlockingWaitTest,SynchronousCompletionVoidResult)66 TEST_F(BlockingWaitTest, SynchronousCompletionVoidResult) {
67 folly::coro::blockingWait(folly::coro::ready_awaitable<>{});
68 }
69
TEST_F(BlockingWaitTest,SynchronousCompletionPRValueResult)70 TEST_F(BlockingWaitTest, SynchronousCompletionPRValueResult) {
71 EXPECT_EQ(
72 123, folly::coro::blockingWait(folly::coro::ready_awaitable<int>{123}));
73 EXPECT_EQ(
74 "hello",
75 folly::coro::blockingWait(
76 folly::coro::ready_awaitable<std::string>("hello")));
77 }
78
TEST_F(BlockingWaitTest,SynchronousCompletionLValueResult)79 TEST_F(BlockingWaitTest, SynchronousCompletionLValueResult) {
80 int value = 123;
81 int& result =
82 folly::coro::blockingWait(folly::coro::ready_awaitable<int&>{value});
83 EXPECT_EQ(&value, &result);
84 EXPECT_EQ(123, result);
85 }
86
TEST_F(BlockingWaitTest,SynchronousCompletionRValueResult)87 TEST_F(BlockingWaitTest, SynchronousCompletionRValueResult) {
88 auto p = std::make_unique<int>(123);
89 auto* ptr = p.get();
90
91 // Should return a prvalue which will lifetime-extend when assigned to an
92 // auto&& local variable.
93 auto&& result = folly::coro::blockingWait(
94 folly::coro::ready_awaitable<std::unique_ptr<int>&&>{std::move(p)});
95
96 EXPECT_EQ(ptr, result.get());
97 EXPECT_FALSE(p);
98 }
99
100 struct TrickyAwaitable {
101 struct Awaiter {
102 std::unique_ptr<int> value_;
103
await_readyTrickyAwaitable::Awaiter104 bool await_ready() const { return false; }
105
await_suspendTrickyAwaitable::Awaiter106 bool await_suspend(folly::coro::coroutine_handle<>) {
107 value_ = std::make_unique<int>(42);
108 return false;
109 }
110
await_resumeTrickyAwaitable::Awaiter111 std::unique_ptr<int>&& await_resume() { return std::move(value_); }
112 };
113
operator co_awaitTrickyAwaitable114 Awaiter operator co_await() { return {}; }
115 };
116
TEST_F(BlockingWaitTest,ReturnRvalueReferenceFromAwaiter)117 TEST_F(BlockingWaitTest, ReturnRvalueReferenceFromAwaiter) {
118 // This awaitable stores the result in the temporary Awaiter object that
119 // is placed on the coroutine frame as part of the co_await expression.
120 // It then returns an rvalue-reference to the value inside this temporary
121 // Awaiter object. This test is making sure that we copy/move the result
122 // before destructing the Awaiter object.
123 auto result = folly::coro::blockingWait(TrickyAwaitable{});
124 CHECK(result);
125 CHECK_EQ(42, *result);
126 }
127
TEST_F(BlockingWaitTest,AsynchronousCompletionOnAnotherThread)128 TEST_F(BlockingWaitTest, AsynchronousCompletionOnAnotherThread) {
129 folly::coro::Baton baton;
130 std::thread t{[&] { baton.post(); }};
131 SCOPE_EXIT { t.join(); };
132 folly::coro::blockingWait(baton);
133 }
134
135 template <typename T>
136 class SimplePromise {
137 public:
138 class WaitOperation {
139 public:
WaitOperation(folly::coro::Baton & baton,folly::Optional<T> & value)140 explicit WaitOperation(
141 folly::coro::Baton& baton, folly::Optional<T>& value) noexcept
142 : awaiter_(baton), value_(value) {}
143
await_ready()144 bool await_ready() { return awaiter_.await_ready(); }
145
146 template <typename Promise>
await_suspend(folly::coro::coroutine_handle<Promise> h)147 auto await_suspend(folly::coro::coroutine_handle<Promise> h) {
148 return awaiter_.await_suspend(h);
149 }
150
await_resume()151 T&& await_resume() {
152 awaiter_.await_resume();
153 return std::move(*value_);
154 }
155
156 private:
157 folly::coro::Baton::WaitOperation awaiter_;
158 folly::Optional<T>& value_;
159 };
160
161 SimplePromise() = default;
162
operator co_await()163 WaitOperation operator co_await() { return WaitOperation{baton_, value_}; }
164
165 template <typename... Args>
emplace(Args &&...args)166 void emplace(Args&&... args) {
167 value_.emplace(static_cast<Args&&>(args)...);
168 baton_.post();
169 }
170
171 private:
172 folly::coro::Baton baton_;
173 folly::Optional<T> value_;
174 };
175
TEST_F(BlockingWaitTest,WaitOnSimpleAsyncPromise)176 TEST_F(BlockingWaitTest, WaitOnSimpleAsyncPromise) {
177 SimplePromise<std::string> p;
178 std::thread t{[&] { p.emplace("hello coroutines!"); }};
179 SCOPE_EXIT { t.join(); };
180 auto result = folly::coro::blockingWait(p);
181 EXPECT_EQ("hello coroutines!", result);
182 }
183
184 struct MoveCounting {
185 int count_;
MoveCountingMoveCounting186 MoveCounting() noexcept : count_(0) {}
MoveCountingMoveCounting187 MoveCounting(MoveCounting&& other) noexcept : count_(other.count_ + 1) {}
188 MoveCounting& operator=(MoveCounting&& other) = delete;
189 };
190
TEST_F(BlockingWaitTest,WaitOnMoveOnlyAsyncPromise)191 TEST_F(BlockingWaitTest, WaitOnMoveOnlyAsyncPromise) {
192 SimplePromise<MoveCounting> p;
193 std::thread t{[&] { p.emplace(); }};
194 SCOPE_EXIT { t.join(); };
195 auto result = folly::coro::blockingWait(p);
196
197 // Number of move-constructions:
198 // 0. Value is in-place constructed in Optional<T>
199 // 0. await_resume() returns rvalue reference to Optional<T> value.
200 // 1. return_value() moves value into Try<T>
201 // 2. Value is moved from Try<T> to blockingWait() return value.
202 EXPECT_GE(2, result.count_);
203 }
204
TEST_F(BlockingWaitTest,moveCountingAwaitableReady)205 TEST_F(BlockingWaitTest, moveCountingAwaitableReady) {
206 folly::coro::ready_awaitable<MoveCounting> awaitable{MoveCounting{}};
207 auto result = folly::coro::blockingWait(awaitable);
208
209 // Moves:
210 // 1. Move value into ready_awaitable
211 // 2. Move value to await_resume() return-value
212 // 3. Move value to Try<T>
213 // 4. Move value to blockingWait() return-value
214 EXPECT_GE(4, result.count_);
215 }
216
TEST_F(BlockingWaitTest,WaitInFiber)217 TEST_F(BlockingWaitTest, WaitInFiber) {
218 SimplePromise<int> promise;
219 folly::EventBase evb;
220 auto& fm = folly::fibers::getFiberManager(evb);
221
222 auto future =
223 fm.addTaskFuture([&] { return folly::coro::blockingWait(promise); });
224
225 evb.loopOnce();
226 EXPECT_FALSE(future.isReady());
227
228 promise.emplace(42);
229
230 evb.loopOnce();
231 EXPECT_TRUE(future.isReady());
232 EXPECT_EQ(42, std::move(future).get());
233 }
234
TEST_F(BlockingWaitTest,WaitTaskInFiber)235 TEST_F(BlockingWaitTest, WaitTaskInFiber) {
236 SimplePromise<int> promise;
237 folly::EventBase evb;
238 auto& fm = folly::fibers::getFiberManager(evb);
239
240 auto future = fm.addTaskFuture([&] {
241 return folly::coro::blockingWait(
242 folly::coro::co_invoke([&]() -> folly::coro::Task<int> {
243 EXPECT_FALSE(folly::fibers::onFiber());
244 auto ret = co_await promise;
245 EXPECT_FALSE(folly::fibers::onFiber());
246 co_return ret;
247 }));
248 });
249
250 evb.loopOnce();
251 EXPECT_FALSE(future.isReady());
252
253 promise.emplace(42);
254
255 evb.loopOnce();
256 EXPECT_TRUE(future.isReady());
257 EXPECT_EQ(42, std::move(future).get());
258 }
259
260 struct ExpectedException {};
261
TEST_F(BlockingWaitTest,WaitTaskInFiberException)262 TEST_F(BlockingWaitTest, WaitTaskInFiberException) {
263 folly::EventBase evb;
264 auto& fm = folly::fibers::getFiberManager(evb);
265 EXPECT_TRUE(
266 fm.addTaskFuture([&] {
267 try {
268 folly::coro::blockingWait(
269 folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
270 folly::via(
271 co_await folly::coro::co_current_executor, []() {});
272 throw ExpectedException();
273 }));
274 return false;
275 } catch (const ExpectedException&) {
276 return true;
277 }
278 }).getVia(&evb));
279 }
280
TEST_F(BlockingWaitTest,WaitOnSemiFuture)281 TEST_F(BlockingWaitTest, WaitOnSemiFuture) {
282 int result = folly::coro::blockingWait(folly::makeSemiFuture(123));
283 CHECK_EQ(result, 123);
284 }
285
TEST_F(BlockingWaitTest,RequestContext)286 TEST_F(BlockingWaitTest, RequestContext) {
287 folly::RequestContext::create();
288 std::shared_ptr<folly::RequestContext> ctx1, ctx2;
289 ctx1 = folly::RequestContext::saveContext();
290 folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
291 EXPECT_EQ(ctx1.get(), folly::RequestContext::get());
292 folly::RequestContextScopeGuard guard;
293 ctx2 = folly::RequestContext::saveContext();
294 EXPECT_NE(ctx1, ctx2);
295 co_await folly::coro::co_reschedule_on_current_executor;
296 EXPECT_EQ(ctx2.get(), folly::RequestContext::get());
297 co_return;
298 }());
299 EXPECT_EQ(ctx1.get(), folly::RequestContext::get());
300 }
301
TEST_F(BlockingWaitTest,DrivableExecutor)302 TEST_F(BlockingWaitTest, DrivableExecutor) {
303 folly::ManualExecutor executor;
304 folly::coro::blockingWait(
305 [&]() -> folly::coro::Task<void> {
306 folly::Executor* taskExecutor =
307 co_await folly::coro::co_current_executor;
308 EXPECT_EQ(&executor, taskExecutor);
309 }(),
310 &executor);
311 }
312
TEST_F(BlockingWaitTest,ReleaseExecutorFromAnotherThread)313 TEST_F(BlockingWaitTest, ReleaseExecutorFromAnotherThread) {
314 auto fn = []() {
315 auto c1 = folly::makePromiseContract<folly::Executor::KeepAlive<>>();
316 auto c2 = folly::makePromiseContract<folly::Unit>();
317 std::thread t{[&] {
318 auto e = std::move(c1.second).get();
319 c2.first.setValue(folly::Unit{});
320 std::this_thread::sleep_for(std::chrono::microseconds(1));
321 e = {};
322 }};
323 folly::ManualExecutor executor;
324 folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
325 folly::Executor::KeepAlive<> taskExecutor =
326 co_await folly::coro::co_current_executor;
327 c1.first.setValue(std::move(taskExecutor));
328 co_await std::move(c2.second);
329 }());
330 t.join();
331 };
332 std::vector<std::thread> threads;
333 for (int i = 0; i < 100; ++i) {
334 threads.emplace_back(fn);
335 }
336 for (auto& t : threads) {
337 t.join();
338 }
339 }
340
341 #endif
342