1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/portability/GTest.h>
18 
19 #include <folly/experimental/coro/Sleep.h>
20 #include <thrift/lib/cpp2/GeneratedCodeHelper.h>
21 #include <thrift/lib/cpp2/async/tests/util/Util.h>
22 
23 namespace apache {
24 namespace thrift {
25 
26 using namespace testutil::testservice;
27 
28 struct SinkServiceTest
29     : public AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient> {};
30 
31 struct SinkServiceTestAllocFn
32     : public AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient> {
SetUpapache::thrift::SinkServiceTestAllocFn33   void SetUp() override {
34     AsyncTestSetup<TestSinkService, TestSinkServiceAsyncClient>::SetUp();
35 
36     // add the custom alloc function
37     folly::Function<BaseThriftServer::AllocIOBufFn> fn = [&](size_t size) {
38       void* p = folly::aligned_malloc(size, 4096);
39       CHECK(!!p);
40 
41       handler_->addBuf(p);
42 
43       auto iobuf = folly::IOBuf::takeOwnership(
44           p, size, size, [](void* p, void*) { folly::aligned_free(p); });
45 
46       return iobuf;
47     };
48 
49     server_->setAllocIOBufFn(std::move(fn));
50   }
51 };
52 
waitNoLeak(TestSinkServiceAsyncClient & client)53 folly::coro::Task<bool> waitNoLeak(TestSinkServiceAsyncClient& client) {
54   auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds{2};
55   do {
56     bool unsubscribed = co_await client.co_isSinkUnSubscribed();
57     if (unsubscribed) {
58       co_return true;
59     }
60   } while (std::chrono::steady_clock::now() < deadline);
61   co_return false;
62 }
63 
TEST_F(SinkServiceTest,SimpleSink)64 TEST_F(SinkServiceTest, SimpleSink) {
65   connectToServer(
66       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
67         auto sink = co_await client.co_range(0, 100);
68         bool finalResponse =
69             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
70               for (int i = 0; i <= 100; i++) {
71                 co_yield std::move(i);
72               }
73             }());
74         EXPECT_TRUE(finalResponse);
75       });
76 }
77 
TEST_F(SinkServiceTest,SinkThrow)78 TEST_F(SinkServiceTest, SinkThrow) {
79   connectToServer(
80       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
81         auto sink = co_await client.co_rangeThrow(0, 100);
82         EXPECT_THROW(
83             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
84               co_yield 0;
85               co_yield 1;
86               throw std::runtime_error("test");
87             }()),
88             std::exception);
89         co_await client.co_purge();
90       });
91 }
92 
TEST_F(SinkServiceTest,SinkThrowStruct)93 TEST_F(SinkServiceTest, SinkThrowStruct) {
94   connectToServer(
95       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
96         auto sink = co_await client.co_sinkThrow();
97         bool exceptionThrown = false;
98         try {
99           co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
100             co_yield 0;
101             co_yield 1;
102             SinkException e;
103             e.reason_ref() = "test";
104             throw e;
105           }());
106         } catch (const SinkThrew& ex) {
107           exceptionThrown = true;
108           EXPECT_EQ(TApplicationException::UNKNOWN, ex.getType());
109           EXPECT_EQ(
110               "testutil::testservice::SinkException: ::testutil::testservice::SinkException",
111               ex.getMessage());
112         }
113         EXPECT_TRUE(exceptionThrown);
114         co_await client.co_purge();
115       });
116 }
117 
TEST_F(SinkServiceTest,SinkFinalThrow)118 TEST_F(SinkServiceTest, SinkFinalThrow) {
119   connectToServer(
120       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
121         auto sink = co_await client.co_rangeFinalResponseThrow(0, 100);
122         bool throwed = false;
123         try {
124           co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
125             for (int i = 0; i <= 100; i++) {
126               co_yield std::move(i);
127             }
128           }());
129         } catch (const std::exception& ex) {
130           throwed = true;
131           EXPECT_EQ("std::runtime_error: test", std::string(ex.what()));
132         }
133         EXPECT_TRUE(throwed);
134       });
135 }
136 
TEST_F(SinkServiceTest,SinkFinalThrowStruct)137 TEST_F(SinkServiceTest, SinkFinalThrowStruct) {
138   connectToServer(
139       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
140         auto sink = co_await client.co_sinkFinalThrow();
141         bool throwed = false;
142         try {
143           co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
144             for (int i = 0; i <= 100; i++) {
145               co_yield std::move(i);
146             }
147           }());
148         } catch (const FinalException& ex) {
149           throwed = true;
150           EXPECT_EQ("test", *ex.reason_ref());
151         }
152         EXPECT_TRUE(throwed);
153       });
154 }
155 
TEST_F(SinkServiceTest,SinkEarlyFinalResponse)156 TEST_F(SinkServiceTest, SinkEarlyFinalResponse) {
157   connectToServer(
158       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
159         auto sink = co_await client.co_rangeEarlyResponse(0, 100, 20);
160 
161         int finalResponse =
162             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
163               for (int i = 0; i <= 100; i++) {
164                 co_yield std::move(i);
165               }
166             }());
167         EXPECT_EQ(20, finalResponse);
168       });
169 }
170 
TEST_F(SinkServiceTest,SinkUnimplemented)171 TEST_F(SinkServiceTest, SinkUnimplemented) {
172   connectToServer(
173       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
174         EXPECT_THROW(co_await client.co_unimplemented(), std::exception);
175       });
176 }
177 
TEST_F(SinkServiceTest,SinkNotCalled)178 TEST_F(SinkServiceTest, SinkNotCalled) {
179   connectToServer(
180       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
181         // even though don't really call sink.sink(...),
182         // after sink get out of scope, the sink should be cancelled properly
183         co_await client.co_unSubscribedSink();
184         EXPECT_TRUE(co_await waitNoLeak(client));
185       });
186 }
187 
TEST_F(SinkServiceTest,SinkInitialThrows)188 TEST_F(SinkServiceTest, SinkInitialThrows) {
189   connectToServer(
190       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
191         try {
192           co_await client.co_initialThrow();
193         } catch (const MyException& ex) {
194           EXPECT_EQ("reason", *ex.reason_ref());
195         }
196         // connection should still be alive after initial throw
197         co_await client.co_purge();
198       });
199 }
200 
TEST_F(SinkServiceTest,SinkInitialThrowsOnFinalResponseCalled)201 TEST_F(SinkServiceTest, SinkInitialThrowsOnFinalResponseCalled) {
202   class Callback : public SinkClientCallback {
203    public:
204     Callback(
205         bool onFirstResponseBool,
206         folly::coro::Baton& responseReceived,
207         folly::coro::Baton& onFinalResponseCalled)
208         : onFirstResponseBool_(onFirstResponseBool),
209           responseReceived_(responseReceived),
210           onFinalResponseCalled_(onFinalResponseCalled) {}
211     bool onFirstResponse(
212         FirstResponsePayload&&,
213         folly::EventBase*,
214         SinkServerCallback* serverCallback) override {
215       SCOPE_EXIT { responseReceived_.post(); };
216       if (!onFirstResponseBool_) {
217         serverCallback->onSinkError(std::runtime_error("stop sink"));
218         return false;
219       }
220       return true;
221     }
222     void onFinalResponse(StreamPayload&&) override {
223       if (onFirstResponseBool_) {
224         onFinalResponseCalled_.post();
225       } else {
226         FAIL() << "onFinalResponse called when onFirstResponse returned false";
227       }
228     }
229     bool onFirstResponseBool_;
230     folly::coro::Baton& responseReceived_;
231     folly::coro::Baton& onFinalResponseCalled_;
232 
233     // unused
234     void onFirstResponseError(folly::exception_wrapper) override {}
235     bool onSinkRequestN(uint64_t) override { return true; }
236     void onFinalResponseError(folly::exception_wrapper) override {}
237     void resetServerCallback(SinkServerCallback&) override {}
238   };
239   connectToServer(
240       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
241         ThriftPresult<false> pargs;
242         auto req = CompactSerializer::serialize<std::string>(pargs);
243         for (auto onFirstResponseBool : {true, false}) {
244           folly::coro::Baton responseReceived, onFinalResponseCalled;
245           Callback callback(
246               onFirstResponseBool, responseReceived, onFinalResponseCalled);
247           client.getChannelShared()->sendRequestSink(
248               RpcOptions(),
249               "initialThrow",
250               SerializedRequest(folly::IOBuf::copyBuffer(req)),
251               std::make_shared<transport::THeader>(),
252               &callback);
253           co_await responseReceived;
254           if (onFirstResponseBool) {
255             co_await onFinalResponseCalled;
256           }
257         }
258       });
259 }
260 
TEST_F(SinkServiceTest,SinkChunkTimeout)261 TEST_F(SinkServiceTest, SinkChunkTimeout) {
262   connectToServer(
263       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
264         auto sink = co_await client.co_rangeChunkTimeout();
265 
266         bool exceptionThrown = false;
267         try {
268           co_await [&]() -> folly::coro::Task<void> {
269             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
270               for (int i = 0; i <= 100; i++) {
271                 if (i == 20) {
272                   co_await folly::coro::sleep(std::chrono::milliseconds{500});
273                 }
274                 co_yield std::move(i);
275               }
276             }());
277           }();
278         } catch (const TApplicationException& ex) {
279           exceptionThrown = true;
280           EXPECT_EQ(TApplicationException::TIMEOUT, ex.getType());
281         }
282         EXPECT_TRUE(exceptionThrown);
283       });
284 }
285 
TEST_F(SinkServiceTest,ClientTimeoutNoLeak)286 TEST_F(SinkServiceTest, ClientTimeoutNoLeak) {
287   connectToServer(
288       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
289         EXPECT_THROW(
290             co_await client.co_unSubscribedSinkSlowReturn(), std::exception);
291         EXPECT_TRUE(co_await waitNoLeak(client));
292       });
293 }
294 
TEST_F(SinkServiceTest,AssignmentNoLeak)295 TEST_F(SinkServiceTest, AssignmentNoLeak) {
296   connectToServer(
297       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
298         {
299           auto sink = co_await client.co_unSubscribedSink();
300           sink = co_await client.co_unSubscribedSink();
301         }
302         EXPECT_TRUE(co_await waitNoLeak(client));
303       });
304 }
305 
TEST_F(SinkServiceTest,AlignedSink)306 TEST_F(SinkServiceTest, AlignedSink) {
307   connectToServer(
308       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
309         {
310           apache::thrift::RpcOptions option;
311           option.setMemAllocType(
312               apache::thrift::RpcOptions::MemAllocType::ALLOC_PAGE_ALIGN);
313           std::string s = "abcdefghijk";
314           auto sink = co_await client.co_alignment(option, s);
315           int32_t alignment = co_await sink.sink(
316               [s]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
317                 co_yield std::move(*folly::IOBuf::copyBuffer(s));
318               }());
319           EXPECT_EQ(alignment, 0);
320         }
321       });
322 }
323 
TEST_F(SinkServiceTest,CustomAllocSink)324 TEST_F(SinkServiceTest, CustomAllocSink) {
325   connectToServer(
326       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
327         {
328           apache::thrift::RpcOptions option;
329           option.setMemAllocType(
330               apache::thrift::RpcOptions::MemAllocType::ALLOC_CUSTOM);
331           std::string s = "abcdefghijk";
332           auto sink = co_await client.co_custom(option, s);
333           bool custom = co_await sink.sink(
334               [s]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
335                 co_yield std::move(*folly::IOBuf::copyBuffer(s));
336               }());
337           EXPECT_FALSE(custom);
338         }
339       });
340 }
341 
TEST_F(SinkServiceTest,CustomAllocSinkDefault)342 TEST_F(SinkServiceTest, CustomAllocSinkDefault) {
343   connectToServer(
344       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
345         {
346           apache::thrift::RpcOptions option;
347           option.setMemAllocType(
348               apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT);
349           std::string s = "abcdefghijk";
350           auto sink = co_await client.co_custom(option, s);
351           bool custom = co_await sink.sink(
352               [s]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
353                 co_yield std::move(*folly::IOBuf::copyBuffer(s));
354               }());
355           EXPECT_FALSE(custom);
356         }
357       });
358 }
359 
neverStream()360 folly::coro::Task<void> neverStream() {
361   folly::coro::Baton baton;
362   folly::CancellationCallback cb{
363       co_await folly::coro::co_current_cancellation_token,
364       [&] { baton.post(); }};
365   co_await baton;
366 }
367 
TEST_F(SinkServiceTest,SinkEarlyFinalResponseWithLongWait)368 TEST_F(SinkServiceTest, SinkEarlyFinalResponseWithLongWait) {
369   connectToServer(
370       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
371         // return final response once received two numbers
372         auto sink = co_await client.co_rangeEarlyResponse(0, 5, 2);
373         int finalResponse =
374             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
375               co_yield 0;
376               co_yield 1;
377               // this long wait should get cancelled by final response
378               co_await neverStream();
379             }());
380         EXPECT_EQ(2, finalResponse);
381       });
382 }
383 
TEST_F(SinkServiceTest,SinkEarlyClose)384 TEST_F(SinkServiceTest, SinkEarlyClose) {
385   std::vector<std::thread> ths;
386   for (int i = 0; i < 100; i++) {
387     ths.push_back(std::thread([this]() {
388       connectToServer(
389           [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
390             auto sink = co_await client.co_range(0, 100);
391           });
392     }));
393   }
394   for (auto& th : ths) {
395     th.join();
396   }
397 }
398 
TEST_F(SinkServiceTest,SinkServerCancellation)399 TEST_F(SinkServiceTest, SinkServerCancellation) {
400   connectToServer(
401       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
402         // client sends values 0..100, server initiates cancellation at value 5
403         auto sink = co_await client.co_rangeCancelAt(0, 100, 5);
404         bool finalResponse =
405             co_await sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
406               // enter wait after 5 values, server should cancel
407               for (int i = 0; i <= 5; i++) {
408                 co_yield std::move(i);
409               }
410               co_await neverStream();
411             }());
412         // server sends false as finalResponse after canceling sink
413         EXPECT_FALSE(finalResponse);
414       });
415 }
416 
TEST_F(SinkServiceTest,SinkClientCancellation)417 TEST_F(SinkServiceTest, SinkClientCancellation) {
418   // cancel when async generator get stuck
419   connectToServer(
420       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
421         auto sink = co_await client.co_unSubscribedSink();
422         folly::CancellationSource cancelSource;
423 
424         folly::coro::co_invoke([&cancelSource]() -> folly::coro::Task<void> {
425           co_await folly::coro::sleep(std::chrono::milliseconds{200});
426           cancelSource.requestCancellation();
427         })
428             .scheduleOn(co_await folly::coro::co_current_executor)
429             .start();
430 
431         EXPECT_THROW(
432             co_await folly::coro::co_withCancellation(
433                 cancelSource.getToken(),
434                 sink.sink([]() -> folly::coro::AsyncGenerator<int&&> {
435                   co_await neverStream();
436                   for (int i = 0; i <= 10; i++) {
437                     co_yield std::move(i);
438                   }
439                 }())),
440             folly::OperationCancelled);
441         co_await waitNoLeak(client);
442       });
443 
444   // cancel when final response being slow
445   connectToServer(
446       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
447         auto sink = co_await client.co_rangeSlowFinalResponse(0, 10);
448         folly::CancellationSource cancelSource;
449 
450         EXPECT_THROW(
451             co_await folly::coro::co_withCancellation(
452                 cancelSource.getToken(),
453                 sink.sink([&]() -> folly::coro::AsyncGenerator<int&&> {
454                   for (int i = 0; i <= 10; i++) {
455                     co_yield std::move(i);
456                   }
457                   cancelSource.requestCancellation();
458                 }())),
459             folly::OperationCancelled);
460       });
461 }
462 
TEST_F(SinkServiceTestAllocFn,CustomAllocSink)463 TEST_F(SinkServiceTestAllocFn, CustomAllocSink) {
464   connectToServer(
465       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
466         {
467           apache::thrift::RpcOptions option;
468           option.setMemAllocType(
469               apache::thrift::RpcOptions::MemAllocType::ALLOC_CUSTOM);
470           std::string s = "abcdefghijk";
471           auto sink = co_await client.co_custom(option, s);
472           bool custom = co_await sink.sink(
473               [s]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
474                 co_yield std::move(*folly::IOBuf::copyBuffer(s));
475               }());
476           EXPECT_TRUE(custom);
477         }
478       });
479 }
480 
TEST_F(SinkServiceTestAllocFn,CustomAllocSinkDefault)481 TEST_F(SinkServiceTestAllocFn, CustomAllocSinkDefault) {
482   connectToServer(
483       [](TestSinkServiceAsyncClient& client) -> folly::coro::Task<void> {
484         {
485           apache::thrift::RpcOptions option;
486           option.setMemAllocType(
487               apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT);
488           std::string s = "abcdefghijk";
489           auto sink = co_await client.co_custom(option, s);
490           bool custom = co_await sink.sink(
491               [s]() -> folly::coro::AsyncGenerator<folly::IOBuf&&> {
492                 co_yield std::move(*folly::IOBuf::copyBuffer(s));
493               }());
494           EXPECT_FALSE(custom);
495         }
496       });
497 }
498 
499 } // namespace thrift
500 } // namespace apache
501