1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/portability/GFlags.h>
18 #include <folly/portability/GTest.h>
19 #include <folly/test/TestUtils.h>
20
21 #include <thrift/lib/cpp2/transport/core/testutil/MockCallback.h>
22 #include <thrift/lib/cpp2/transport/core/testutil/TransportCompatibilityTest.h>
23 #include <thrift/lib/cpp2/transport/rocket/test/util/TestUtil.h>
24
25 DECLARE_string(transport); // ConnectionManager depends on this flag.
26
27 namespace apache {
28 namespace thrift {
29
30 using namespace testutil::testservice;
31 using namespace apache::thrift::transport;
32
33 class TransportUpgradeCompatibilityTest : public testing::TestWithParam<bool> {
34 public:
TransportUpgradeCompatibilityTest()35 TransportUpgradeCompatibilityTest() {
36 FLAGS_transport = "header";
37
38 compatibilityTest_ = std::make_unique<TransportCompatibilityTest>();
39 compatibilityTest_->setTransportUpgrade(GetParam());
40 compatibilityTest_->startServer();
41 }
42
43 protected:
44 std::unique_ptr<TransportCompatibilityTest> compatibilityTest_;
45 };
46
47 INSTANTIATE_TEST_CASE_P(
48 NoUpgrade, TransportUpgradeCompatibilityTest, testing::Values(false));
49 INSTANTIATE_TEST_CASE_P(
50 Upgrade, TransportUpgradeCompatibilityTest, testing::Values(true));
51
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Simple)52 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Simple) {
53 compatibilityTest_->TestRequestResponse_Simple();
54 }
55
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Sync)56 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Sync) {
57 compatibilityTest_->TestRequestResponse_Sync();
58 }
59
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Destruction)60 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Destruction) {
61 compatibilityTest_->TestRequestResponse_Destruction();
62 }
63
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_MultipleClients)64 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_MultipleClients) {
65 compatibilityTest_->TestRequestResponse_MultipleClients();
66 }
67
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_ExpectedException)68 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ExpectedException) {
69 compatibilityTest_->TestRequestResponse_ExpectedException();
70 }
71
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_UnexpectedException)72 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_UnexpectedException) {
73 compatibilityTest_->TestRequestResponse_UnexpectedException();
74 }
75
76 // Warning: This test may be flaky due to use of timeouts.
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Timeout)77 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Timeout) {
78 compatibilityTest_->TestRequestResponse_Timeout();
79 }
80
TEST_P(TransportUpgradeCompatibilityTest,DefaultTimeoutValueTest)81 TEST_P(TransportUpgradeCompatibilityTest, DefaultTimeoutValueTest) {
82 compatibilityTest_->connectToServer([](auto client) {
83 // Opts with no timeout value
84 RpcOptions opts;
85
86 // Ok to sleep for 100msec
87 auto cb = std::make_unique<MockCallback>(false, false);
88 client->sleep(opts, std::move(cb), 100);
89
90 /* Sleep to give time for all callbacks to be completed */
91 /* sleep override */
92 std::this_thread::sleep_for(std::chrono::milliseconds(200));
93
94 auto* channel = dynamic_cast<ClientChannel*>(client->getChannel());
95 EXPECT_TRUE(channel);
96 channel->getEventBase()->runInEventBaseThreadAndWait([&]() {
97 channel->setTimeout(1); // 1ms
98 });
99
100 // Now it should timeout
101 cb = std::make_unique<MockCallback>(false, true);
102 client->sleep(opts, std::move(cb), 100);
103
104 /* Sleep to give time for all callbacks to be completed */
105 /* sleep override */
106 std::this_thread::sleep_for(std::chrono::milliseconds(200));
107 });
108 }
109
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Header)110 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Header) {
111 compatibilityTest_->TestRequestResponse_Header();
112 }
113
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Header_Load)114 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Header_Load) {
115 compatibilityTest_->TestRequestResponse_Header_Load();
116 }
117
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Header_ExpectedException)118 TEST_P(
119 TransportUpgradeCompatibilityTest,
120 RequestResponse_Header_ExpectedException) {
121 compatibilityTest_->TestRequestResponse_Header_ExpectedException();
122 }
123
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Header_UnexpectedException)124 TEST_P(
125 TransportUpgradeCompatibilityTest,
126 RequestResponse_Header_UnexpectedException) {
127 compatibilityTest_->TestRequestResponse_Header_UnexpectedException();
128 }
129
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_IsOverloaded)130 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_IsOverloaded) {
131 compatibilityTest_->TestRequestResponse_IsOverloaded();
132 }
133
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Connection_CloseNow)134 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Connection_CloseNow) {
135 compatibilityTest_->connectToServer([](auto client) {
136 // It should not reach to server: no EXPECT_CALL for add_(3)
137
138 // Observe the behavior if the connection is closed already
139 auto channel = static_cast<ClientChannel*>(client->getChannel());
140 channel->getEventBase()->runInEventBaseThreadAndWait(
141 [&]() { channel->closeNow(); });
142
143 try {
144 client->future_add(3).get();
145 EXPECT_TRUE(false) << "future_add should have thrown";
146 } catch (TTransportException& ex) {
147 EXPECT_EQ(TTransportException::UNKNOWN, ex.getType());
148 EXPECT_PRED_FORMAT2(IsSubstring, "Channel is !good()", ex.what());
149 }
150 });
151 }
152
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_ServerQueueTimeout)153 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ServerQueueTimeout) {
154 compatibilityTest_->TestRequestResponse_ServerQueueTimeout();
155 }
156
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_ResponseSizeTooBig)157 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ResponseSizeTooBig) {
158 compatibilityTest_->TestRequestResponse_ResponseSizeTooBig();
159 }
160
161 // TODO(T90625074)
TEST_P(TransportUpgradeCompatibilityTest,DISABLED_RequestResponse_Checksumming)162 TEST_P(
163 TransportUpgradeCompatibilityTest, DISABLED_RequestResponse_Checksumming) {
164 // Checksum not implemented for header transport
165 if (!GetParam()) {
166 return;
167 }
168 compatibilityTest_->TestRequestResponse_Checksumming();
169 }
170
TEST_P(TransportUpgradeCompatibilityTest,Oneway_Simple)171 TEST_P(TransportUpgradeCompatibilityTest, Oneway_Simple) {
172 compatibilityTest_->TestOneway_Simple();
173 }
174
TEST_P(TransportUpgradeCompatibilityTest,Oneway_WithDelay)175 TEST_P(TransportUpgradeCompatibilityTest, Oneway_WithDelay) {
176 compatibilityTest_->TestOneway_WithDelay();
177 }
178
TEST_P(TransportUpgradeCompatibilityTest,Oneway_UnexpectedException)179 TEST_P(TransportUpgradeCompatibilityTest, Oneway_UnexpectedException) {
180 compatibilityTest_->TestOneway_UnexpectedException();
181 }
182
TEST_P(TransportUpgradeCompatibilityTest,Oneway_Connection_CloseNow)183 TEST_P(TransportUpgradeCompatibilityTest, Oneway_Connection_CloseNow) {
184 compatibilityTest_->TestOneway_Connection_CloseNow();
185 }
186
TEST_P(TransportUpgradeCompatibilityTest,Oneway_ServerQueueTimeout)187 TEST_P(TransportUpgradeCompatibilityTest, Oneway_ServerQueueTimeout) {
188 compatibilityTest_->TestOneway_ServerQueueTimeout();
189 }
190
TEST_P(TransportUpgradeCompatibilityTest,Oneway_Checksumming)191 TEST_P(TransportUpgradeCompatibilityTest, Oneway_Checksumming) {
192 // Checksum not implemented for header transport
193 if (!GetParam()) {
194 return;
195 }
196 compatibilityTest_->TestOneway_Checksumming();
197 }
198
TEST_P(TransportUpgradeCompatibilityTest,Oneway_Sampled_Checksumming)199 TEST_P(TransportUpgradeCompatibilityTest, Oneway_Sampled_Checksumming) {
200 // Checksum not implemented for header transport
201 if (!GetParam()) {
202 return;
203 }
204 compatibilityTest_->TestOneway_Checksumming(true);
205 }
206
TEST_P(TransportUpgradeCompatibilityTest,RequestContextIsPreserved)207 TEST_P(TransportUpgradeCompatibilityTest, RequestContextIsPreserved) {
208 compatibilityTest_->TestRequestContextIsPreserved();
209 }
210
TEST_P(TransportUpgradeCompatibilityTest,BadPayload)211 TEST_P(TransportUpgradeCompatibilityTest, BadPayload) {
212 compatibilityTest_->TestBadPayload();
213 }
214
TEST_P(TransportUpgradeCompatibilityTest,EvbSwitch)215 TEST_P(TransportUpgradeCompatibilityTest, EvbSwitch) {
216 compatibilityTest_->TestEvbSwitch();
217 }
218
TEST_P(TransportUpgradeCompatibilityTest,EvbSwitch_Failure)219 TEST_P(TransportUpgradeCompatibilityTest, EvbSwitch_Failure) {
220 compatibilityTest_->TestEvbSwitch_Failure();
221 }
222
223 class CloseCallbackTest : public CloseCallback {
224 public:
channelClosed()225 void channelClosed() override {
226 EXPECT_FALSE(closed_);
227 closed_ = true;
228 }
isClosed()229 bool isClosed() { return closed_; }
230
231 private:
232 bool closed_{false};
233 };
234
TEST_P(TransportUpgradeCompatibilityTest,CloseCallback)235 TEST_P(TransportUpgradeCompatibilityTest, CloseCallback) {
236 compatibilityTest_->connectToServer(
237 [this](std::unique_ptr<TestServiceAsyncClient> client) {
238 EXPECT_CALL(*compatibilityTest_->handler_.get(), sumTwoNumbers_(1, 2))
239 .Times(1);
240
241 auto closeCb = std::make_unique<CloseCallbackTest>();
242 auto channel = static_cast<ClientChannel*>(client->getChannel());
243 auto evb = channel->getEventBase();
244 evb->runInEventBaseThreadAndWait(
245 [&]() { channel->setCloseCallback(closeCb.get()); });
246 // send a request so that transport upgrade kicks in (if enabled)
247 EXPECT_EQ(3, client->future_sumTwoNumbers(1, 2).get());
248
249 EXPECT_FALSE(closeCb->isClosed());
250 evb->runInEventBaseThreadAndWait([&]() { channel->closeNow(); });
251 EXPECT_TRUE(closeCb->isClosed());
252 });
253 }
254
TEST_P(TransportUpgradeCompatibilityTest,ConnectionStats)255 TEST_P(TransportUpgradeCompatibilityTest, ConnectionStats) {
256 compatibilityTest_->TestConnectionStats();
257 }
258
TEST_P(TransportUpgradeCompatibilityTest,ObserverSendReceiveRequests)259 TEST_P(TransportUpgradeCompatibilityTest, ObserverSendReceiveRequests) {
260 compatibilityTest_->TestObserverSendReceiveRequests();
261 }
262
TEST_P(TransportUpgradeCompatibilityTest,ConnectionContext)263 TEST_P(TransportUpgradeCompatibilityTest, ConnectionContext) {
264 compatibilityTest_->TestConnectionContext();
265 }
266
TEST_P(TransportUpgradeCompatibilityTest,ClientIdentityHook)267 TEST_P(TransportUpgradeCompatibilityTest, ClientIdentityHook) {
268 compatibilityTest_->TestClientIdentityHook();
269 }
270
271 } // namespace thrift
272 } // namespace apache
273