1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/portability/GTest.h>
18
19 #include <folly/experimental/coro/Sleep.h>
20 #include <thrift/lib/cpp2/GeneratedCodeHelper.h>
21 #include <thrift/lib/cpp2/async/tests/util/Util.h>
22
23 namespace apache {
24 namespace thrift {
25
26 using namespace testutil::testservice;
27
28 struct SinkServiceTest
29 : public AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient> {};
30
31 struct SinkServiceTestAllocFn
32 : public AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient> {
SetUpapache::thrift::SinkServiceTestAllocFn33 void SetUp() override {
34 AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient>::SetUp();
35
36 // add the custom alloc function
37 folly::Function<BaseThriftServer::AllocIOBufFn> fn = [&](size_t size) {
38 void* p = folly::aligned_malloc(size, 4096);
39 CHECK(!!p);
40
41 handler_->addBuf(p);
42
43 auto iobuf = folly::IOBuf::takeOwnership(
44 p, size, size, [](void* p, void*) { folly::aligned_free(p); });
45
46 return iobuf;
47 };
48
49 server_->setAllocIOBufFn(std::move(fn));
50 }
51 };
52
waitNoLeak(TestSinkServiceAsyncClient & client)53 folly::coro::Task<bool> waitNoLeak(TestSinkServiceAsyncClient& client) {
54 auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds{2};
55 do {
56 bool unsubscribed = co_await client.co_isSinkUnSubscribed();
57 if (unsubscribed) {
58 co_return true;
59 }
60 } while (std::chrono::steady_clock::now() < deadline);
61 co_return false;
62 }
63
// Sends the integers 0..100 through the sink returned by range(0, 100) and
// expects the final response to be true.
TEST_F(SinkServiceTest, SimpleSink) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto makeNumbers = []() -> folly::coro::AsyncGenerator<int&&> {
          for (int value = 0; value <= 100; ++value) {
            co_yield std::move(value);
          }
        };

        auto sink = co_await client.co_range(0, 100);
        const bool finalResponse = co_await sink.sink(makeNumbers());
        EXPECT_TRUE(finalResponse);
      });
}
77
// The client-side generator throws std::runtime_error after two values;
// sinking it must surface a std::exception. A purge() call follows on the same
// client.
TEST_F(SinkServiceTest, SinkThrow) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // Yields two values, then fails.
        auto failingGenerator = []() -> folly::coro::AsyncGenerator<int&&> {
          co_yield 0;
          co_yield 1;
          throw std::runtime_error("test");
        };

        auto sink = co_await client.co_rangeThrow(0, 100);
        EXPECT_THROW(co_await sink.sink(failingGenerator()), std::exception);
        co_await client.co_purge();
      });
}
92
// The client-side generator throws a SinkException struct after two values.
// The sink call must fail with SinkThrew whose type is UNKNOWN and whose
// message names the SinkException type. A purge() call follows.
TEST_F(SinkServiceTest, SinkThrowStruct) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // Yields two values, then throws the Thrift-defined exception.
        auto failingGenerator = []() -> folly::coro::AsyncGenerator<int&&> {
          co_yield 0;
          co_yield 1;
          SinkException failure;
          failure.reason_ref() = "test";
          throw failure;
        };

        auto sink = co_await client.co_sinkThrow();
        bool sawSinkThrew = false;
        try {
          co_await sink.sink(failingGenerator());
        } catch (const SinkThrew& ex) {
          sawSinkThrew = true;
          EXPECT_EQ(TApplicationException::UNKNOWN, ex.getType());
          EXPECT_EQ(
              "testutil::testservice::SinkException: ::testutil::testservice::SinkException",
              ex.getMessage());
        }
        EXPECT_TRUE(sawSinkThrew);
        co_await client.co_purge();
      });
}
117
// Sends 0..100 through the sink returned by rangeFinalResponseThrow(0, 100)
// and expects the sink call to fail with an exception whose what() is
// "std::runtime_error: test".
TEST_F(SinkServiceTest, SinkFinalThrow) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto makeNumbers = []() -> folly::coro::AsyncGenerator<int&&> {
          for (int value = 0; value <= 100; ++value) {
            co_yield std::move(value);
          }
        };

        auto sink = co_await client.co_rangeFinalResponseThrow(0, 100);
        bool caught = false;
        try {
          co_await sink.sink(makeNumbers());
        } catch (const std::exception& ex) {
          caught = true;
          EXPECT_EQ("std::runtime_error: test", std::string(ex.what()));
        }
        EXPECT_TRUE(caught);
      });
}
136
// Sends 0..100 through the sink returned by sinkFinalThrow() and expects the
// sink call to fail with FinalException carrying the reason "test".
TEST_F(SinkServiceTest, SinkFinalThrowStruct) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto makeNumbers = []() -> folly::coro::AsyncGenerator<int&&> {
          for (int value = 0; value <= 100; ++value) {
            co_yield std::move(value);
          }
        };

        auto sink = co_await client.co_sinkFinalThrow();
        bool caught = false;
        try {
          co_await sink.sink(makeNumbers());
        } catch (const FinalException& ex) {
          caught = true;
          EXPECT_EQ("test", *ex.reason_ref());
        }
        EXPECT_TRUE(caught);
      });
}
155
// Offers 0..100 to the sink returned by rangeEarlyResponse(0, 100, 20) and
// expects the final response to equal 20.
TEST_F(SinkServiceTest, SinkEarlyFinalResponse) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto makeNumbers = []() -> folly::coro::AsyncGenerator<int&&> {
          for (int value = 0; value <= 100; ++value) {
            co_yield std::move(value);
          }
        };

        auto sink = co_await client.co_rangeEarlyResponse(0, 100, 20);
        const int finalResponse = co_await sink.sink(makeNumbers());
        EXPECT_EQ(20, finalResponse);
      });
}
170
// Calling the unimplemented() sink method must fail with a std::exception.
TEST_F(SinkServiceTest, SinkUnimplemented) {
  connectToServer(
      [](TestSinkServiceAsyncClient& sinkClient) -> folly::coro::Task<void> {
        EXPECT_THROW(co_await sinkClient.co_unimplemented(), std::exception);
      });
}
177
// Obtains a sink but never calls sink.sink(...) on it, then checks via
// waitNoLeak() that the service reports the sink as unsubscribed.
TEST_F(SinkServiceTest, SinkNotCalled) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // The returned sink is a discarded temporary: even though
        // sink.sink(...) is never invoked, it should be cancelled properly
        // once it goes out of scope.
        co_await client.co_unSubscribedSink();

        const bool unsubscribed = co_await waitNoLeak(client);
        EXPECT_TRUE(unsubscribed);
      });
}
187
// Calls initialThrow() and requires it to fail with MyException carrying the
// reason "reason". Afterwards issues purge() on the same client to show the
// connection is still usable.
TEST_F(SinkServiceTest, SinkInitialThrows) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // Track the throw explicitly: without this flag the test would pass
        // even if the call completed without raising anything.
        bool exceptionThrown = false;
        try {
          co_await client.co_initialThrow();
        } catch (const MyException& ex) {
          exceptionThrown = true;
          EXPECT_EQ("reason", *ex.reason_ref());
        }
        EXPECT_TRUE(exceptionThrown);
        // connection should still be alive after initial throw
        co_await client.co_purge();
      });
}
200
// Sends "initialThrow" through the raw channel API with a hand-written
// SinkClientCallback, once accepting the first response and once rejecting
// it. onFinalResponse is awaited in the accepting case and recorded as a test
// failure in the rejecting case.
TEST_F(SinkServiceTest, SinkInitialThrowsOnFinalResponseCalled) {
  class Callback : public SinkClientCallback {
   public:
    Callback(
        bool acceptFirstResponse,
        folly::coro::Baton& firstResponseSeen,
        folly::coro::Baton& finalResponseSeen)
        : onFirstResponseBool_(acceptFirstResponse),
          responseReceived_(firstResponseSeen),
          onFinalResponseCalled_(finalResponseSeen) {}

    // Posts responseReceived_ on every exit path. When configured to reject,
    // reports a sink error to the server callback and returns false.
    bool onFirstResponse(
        FirstResponsePayload&&,
        folly::EventBase*,
        SinkServerCallback* serverCallback) override {
      SCOPE_EXIT { responseReceived_.post(); };
      if (onFirstResponseBool_) {
        return true;
      }
      serverCallback->onSinkError(std::runtime_error("stop sink"));
      return false;
    }

    // A final response is only legitimate after the first response was
    // accepted; otherwise it is a test failure.
    void onFinalResponse(StreamPayload&&) override {
      if (!onFirstResponseBool_) {
        FAIL() << "onFinalResponse called when onFirstResponse returned false";
      } else {
        onFinalResponseCalled_.post();
      }
    }

    bool onFirstResponseBool_;
    folly::coro::Baton& responseReceived_;
    folly::coro::Baton& onFinalResponseCalled_;

    // Remaining callbacks are not exercised by this test.
    void onFirstResponseError(folly::exception_wrapper) override {}
    bool onSinkRequestN(uint64_t) override { return true; }
    void onFinalResponseError(folly::exception_wrapper) override {}
    void resetServerCallback(SinkServerCallback&) override {}
  };

  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // Serialized argument struct (no fields) reused for both requests.
        ThriftPresult<false> emptyArgs;
        auto serializedArgs =
            CompactSerializer::serialize<std::string>(emptyArgs);

        for (const bool acceptFirstResponse : {true, false}) {
          folly::coro::Baton responseReceived;
          folly::coro::Baton onFinalResponseCalled;
          Callback callback(
              acceptFirstResponse, responseReceived, onFinalResponseCalled);

          client.getChannelShared()->sendRequestSink(
              RpcOptions(),
              "initialThrow",
              SerializedRequest(folly::IOBuf::copyBuffer(serializedArgs)),
              std::make_shared<transport::THeader>(),
              &callback);

          co_await responseReceived;
          if (acceptFirstResponse) {
            co_await onFinalResponseCalled;
          }
        }
      });
}
260
// The client generator pauses for 500ms before yielding the value 20. The
// sink call on rangeChunkTimeout() must then fail with a
// TApplicationException of type TIMEOUT.
TEST_F(SinkServiceTest, SinkChunkTimeout) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        // Yields 0..100 with a single pause before the value 20.
        auto stallingGenerator = []() -> folly::coro::AsyncGenerator<int&&> {
          for (int value = 0; value <= 100; ++value) {
            if (value == 20) {
              co_await folly::coro::sleep(std::chrono::milliseconds{500});
            }
            co_yield std::move(value);
          }
        };

        auto sink = co_await client.co_rangeChunkTimeout();

        bool timedOut = false;
        try {
          // The sink call stays wrapped in its own nested coroutine, as in
          // the original form of this test.
          co_await [&]() -> folly::coro::Task<void> {
            co_await sink.sink(stallingGenerator());
          }();
        } catch (const TApplicationException& ex) {
          timedOut = true;
          EXPECT_EQ(TApplicationException::TIMEOUT, ex.getType());
        }
        EXPECT_TRUE(timedOut);
      });
}
285
// unSubscribedSinkSlowReturn() must fail with a std::exception on the client;
// afterwards waitNoLeak() must report the sink as unsubscribed.
TEST_F(SinkServiceTest, ClientTimeoutNoLeak) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        EXPECT_THROW(
            co_await client.co_unSubscribedSinkSlowReturn(), std::exception);

        const bool unsubscribed = co_await waitNoLeak(client);
        EXPECT_TRUE(unsubscribed);
      });
}
294
// Overwrites one never-used sink with another through assignment, lets the
// survivor go out of scope, and checks via waitNoLeak() that the service
// reports no subscribed sink.
TEST_F(SinkServiceTest, AssignmentNoLeak) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        {
          auto heldSink = co_await client.co_unSubscribedSink();
          // Assigning over the first sink replaces it without sink(...) ever
          // having been called on it.
          heldSink = co_await client.co_unSubscribedSink();
        }

        const bool unsubscribed = co_await waitNoLeak(client);
        EXPECT_TRUE(unsubscribed);
      });
}
305
// Requests ALLOC_PAGE_ALIGN through RpcOptions, sinks a single IOBuf holding
// the test string via alignment(), and expects a final response of 0.
TEST_F(SinkServiceTest, AlignedSink) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        RpcOptions rpcOptions;
        rpcOptions.setMemAllocType(RpcOptions::MemAllocType::ALLOC_PAGE_ALIGN);

        const std::string payload = "abcdefghijk";
        // Yields exactly one IOBuf containing a copy of the payload.
        auto makeChunk =
            [payload]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
          auto buf = folly::IOBuf::copyBuffer(payload);
          co_yield std::move(*buf);
        };

        auto sink = co_await client.co_alignment(rpcOptions, payload);
        const int32_t alignment = co_await sink.sink(makeChunk());
        EXPECT_EQ(alignment, 0);
      });
}
323
// With the plain fixture (no allocation function installed by SetUp),
// requesting ALLOC_CUSTOM must still yield a false final response from
// custom().
TEST_F(SinkServiceTest, CustomAllocSink) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        RpcOptions rpcOptions;
        rpcOptions.setMemAllocType(RpcOptions::MemAllocType::ALLOC_CUSTOM);

        const std::string payload = "abcdefghijk";
        // Yields exactly one IOBuf containing a copy of the payload.
        auto makeChunk =
            [payload]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
          auto buf = folly::IOBuf::copyBuffer(payload);
          co_yield std::move(*buf);
        };

        auto sink = co_await client.co_custom(rpcOptions, payload);
        const bool custom = co_await sink.sink(makeChunk());
        EXPECT_FALSE(custom);
      });
}
341
// With the plain fixture and ALLOC_DEFAULT requested, custom() must report a
// false final response.
TEST_F(SinkServiceTest, CustomAllocSinkDefault) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        RpcOptions rpcOptions;
        rpcOptions.setMemAllocType(RpcOptions::MemAllocType::ALLOC_DEFAULT);

        const std::string payload = "abcdefghijk";
        // Yields exactly one IOBuf containing a copy of the payload.
        auto makeChunk =
            [payload]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
          auto buf = folly::IOBuf::copyBuffer(payload);
          co_yield std::move(*buf);
        };

        auto sink = co_await client.co_custom(rpcOptions, payload);
        const bool custom = co_await sink.sink(makeChunk());
        EXPECT_FALSE(custom);
      });
}
359
neverStream()360 folly::coro::Task<void> neverStream() {
361 folly::coro::Baton baton;
362 folly::CancellationCallback cb{
363 co_await folly::coro::co_current_cancellation_token,
364 [&] { baton.post(); }};
365 co_await baton;
366 }
367
// The generator yields two values and then waits in neverStream(); the test
// expects the sink call to complete anyway with a final response of 2.
TEST_F(SinkServiceTest, SinkEarlyFinalResponseWithLongWait) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto twoThenWait = []() -> folly::coro::AsyncGenerator<int&&> {
          co_yield 0;
          co_yield 1;
          // this long wait should get cancelled by final response
          co_await neverStream();
        };

        // return final response once received two numbers
        auto sink = co_await client.co_rangeEarlyResponse(0, 5, 2);
        const int finalResponse = co_await sink.sink(twoThenWait());
        EXPECT_EQ(2, finalResponse);
      });
}
383
// Starts 100 threads that each connect, obtain a sink from range(0, 100) and
// drop it without sinking anything, then joins them all.
TEST_F(SinkServiceTest, SinkEarlyClose) {
  constexpr int kThreadCount = 100;

  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int t = 0; t < kThreadCount; ++t) {
    workers.emplace_back([this] {
      connectToServer(
          [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
            // The sink is intentionally left unused; it is released when
            // this coroutine finishes.
            auto sink = co_await client.co_range(0, 100);
          });
    });
  }

  for (std::thread& worker : workers) {
    worker.join();
  }
}
398
// The generator yields 0..5 and then waits in neverStream(); the sink call on
// rangeCancelAt(0, 100, 5) is expected to finish with a false final response.
TEST_F(SinkServiceTest, SinkServerCancellation) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto sixThenWait = []() -> folly::coro::AsyncGenerator<int&&> {
          // enter wait after the values 0..5, server should cancel
          for (int value = 0; value <= 5; ++value) {
            co_yield std::move(value);
          }
          co_await neverStream();
        };

        // client sends values 0..100, server initiates cancellation at value 5
        auto sink = co_await client.co_rangeCancelAt(0, 100, 5);
        const bool finalResponse = co_await sink.sink(sixThenWait());
        // server sends false as finalResponse after canceling sink
        EXPECT_FALSE(finalResponse);
      });
}
416
// Exercises client-initiated cancellation of sink.sink(...) in two situations:
//  1. the client generator is stuck (waiting in neverStream()) when the
//     cancellation is requested;
//  2. the generator has produced everything and requests cancellation itself
//     before finishing, on a sink from rangeSlowFinalResponse(0, 10).
// In both cases the sink call must throw folly::OperationCancelled.
TEST_F(SinkServiceTest, SinkClientCancellation) {
  // cancel when async generator get stuck
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto sink = co_await client.co_unSubscribedSink();
        folly::CancellationSource cancelSource;

        // Background task on the current executor: requests cancellation
        // after 200ms. cancelSource is captured by reference; the sink call
        // below is expected to stay suspended until that request is made.
        folly::coro::co_invoke([&cancelSource]() -> folly::coro::Task<void> {
          co_await folly::coro::sleep(std::chrono::milliseconds{200});
          cancelSource.requestCancellation();
        })
            .scheduleOn(co_await folly::coro::co_current_executor)
            .start();

        EXPECT_THROW(
            co_await folly::coro::co_withCancellation(
                cancelSource.getToken(),
                sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
                  co_await neverStream();
                  for (int i = 0; i <= 10; i++) {
                    co_yield std::move(i);
                  }
                }())),
            folly::OperationCancelled);
        // Assert the leak check instead of discarding its result, so a sink
        // that is still subscribed after cancellation fails the test.
        EXPECT_TRUE(co_await waitNoLeak(client));
      });

  // cancel when final response being slow
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        auto sink = co_await client.co_rangeSlowFinalResponse(0, 10);
        folly::CancellationSource cancelSource;

        EXPECT_THROW(
            co_await folly::coro::co_withCancellation(
                cancelSource.getToken(),
                sink.sink([&]() -> folly::coro::AsyncGenerator<int&&> {
                  for (int i = 0; i <= 10; i++) {
                    co_yield std::move(i);
                  }
                  // All values have been yielded; cancel before the
                  // generator completes.
                  cancelSource.requestCancellation();
                }())),
            folly::OperationCancelled);
      });
}
462
// With the fixture that installs a custom allocation function, requesting
// ALLOC_CUSTOM must make custom() report a true final response.
TEST_F(SinkServiceTestAllocFn, CustomAllocSink) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        RpcOptions rpcOptions;
        rpcOptions.setMemAllocType(RpcOptions::MemAllocType::ALLOC_CUSTOM);

        const std::string payload = "abcdefghijk";
        // Yields exactly one IOBuf containing a copy of the payload.
        auto makeChunk =
            [payload]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
          auto buf = folly::IOBuf::copyBuffer(payload);
          co_yield std::move(*buf);
        };

        auto sink = co_await client.co_custom(rpcOptions, payload);
        const bool custom = co_await sink.sink(makeChunk());
        EXPECT_TRUE(custom);
      });
}
480
// Even with the custom allocation function installed by the fixture,
// requesting ALLOC_DEFAULT must make custom() report a false final response.
TEST_F(SinkServiceTestAllocFn, CustomAllocSinkDefault) {
  connectToServer(
      [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
        RpcOptions rpcOptions;
        rpcOptions.setMemAllocType(RpcOptions::MemAllocType::ALLOC_DEFAULT);

        const std::string payload = "abcdefghijk";
        // Yields exactly one IOBuf containing a copy of the payload.
        auto makeChunk =
            [payload]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
          auto buf = folly::IOBuf::copyBuffer(payload);
          co_yield std::move(*buf);
        };

        auto sink = co_await client.co_custom(rpcOptions, payload);
        const bool custom = co_await sink.sink(makeChunk());
        EXPECT_FALSE(custom);
      });
}
498
499 } // namespace thrift
500 } // namespace apache
501