1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/portability/GFlags.h>
18 #include <folly/portability/GTest.h>
19 #include <folly/test/TestUtils.h>
20 
21 #include <thrift/lib/cpp2/transport/core/testutil/MockCallback.h>
22 #include <thrift/lib/cpp2/transport/core/testutil/TransportCompatibilityTest.h>
23 #include <thrift/lib/cpp2/transport/rocket/test/util/TestUtil.h>
24 
25 DECLARE_string(transport); // ConnectionManager depends on this flag.
26 
27 namespace apache {
28 namespace thrift {
29 
30 using namespace testutil::testservice;
31 using namespace apache::thrift::transport;
32 
33 class TransportUpgradeCompatibilityTest : public testing::TestWithParam<bool> {
34  public:
TransportUpgradeCompatibilityTest()35   TransportUpgradeCompatibilityTest() {
36     FLAGS_transport = "header";
37 
38     compatibilityTest_ = std::make_unique<TransportCompatibilityTest>();
39     compatibilityTest_->setTransportUpgrade(GetParam());
40     compatibilityTest_->startServer();
41   }
42 
43  protected:
44   std::unique_ptr<TransportCompatibilityTest> compatibilityTest_;
45 };
46 
47 INSTANTIATE_TEST_CASE_P(
48     NoUpgrade, TransportUpgradeCompatibilityTest, testing::Values(false));
49 INSTANTIATE_TEST_CASE_P(
50     Upgrade, TransportUpgradeCompatibilityTest, testing::Values(true));
51 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Simple().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Simple) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Simple();
}
55 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Sync().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Sync) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Sync();
}
59 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Destruction().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Destruction) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Destruction();
}
63 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_MultipleClients().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_MultipleClients) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_MultipleClients();
}
67 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_ExpectedException().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ExpectedException) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_ExpectedException();
}
71 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_UnexpectedException().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_UnexpectedException) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_UnexpectedException();
}
75 
76 // Warning: This test may be flaky due to use of timeouts.
TEST_P(TransportUpgradeCompatibilityTest,RequestResponse_Timeout)77 TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Timeout) {
78   compatibilityTest_->TestRequestResponse_Timeout();
79 }
80 
// Sends one sleep() request using default RpcOptions, then lowers the client
// channel's timeout to 1ms on its event base thread and sends a second sleep()
// request. The two MockCallback instances are built with different flags
// (false, false) vs (false, true); per the inline comments the second request
// is expected to time out.
// Note: relies on fixed wall-clock sleeps to let the callbacks complete.
TEST_P(TransportUpgradeCompatibilityTest, DefaultTimeoutValueTest) {
  compatibilityTest_->connectToServer([](auto client) {
    // Opts with no timeout value
    RpcOptions opts;

    // Ok to sleep for 100msec
    auto cb = std::make_unique<MockCallback>(false, false);
    client->sleep(opts, std::move(cb), 100);

    /* Sleep to give time for all callbacks to be completed */
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    auto* channel = dynamic_cast<ClientChannel*>(client->getChannel());
    // Fatal assertion: the pointer is dereferenced right below, so a failed
    // cast must end this callback instead of dereferencing nullptr.
    ASSERT_TRUE(channel != nullptr)
        << "client channel is not a ClientChannel";
    channel->getEventBase()->runInEventBaseThreadAndWait([&]() {
      channel->setTimeout(1); // 1ms
    });

    // Now it should timeout
    cb = std::make_unique<MockCallback>(false, true);
    client->sleep(opts, std::move(cb), 100);

    /* Sleep to give time for all callbacks to be completed */
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
  });
}
109 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Header().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Header) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Header();
}
113 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Header_Load().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Header_Load) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Header_Load();
}
117 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Header_ExpectedException().
TEST_P(
    TransportUpgradeCompatibilityTest,
    RequestResponse_Header_ExpectedException) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Header_ExpectedException();
}
123 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_Header_UnexpectedException().
TEST_P(
    TransportUpgradeCompatibilityTest,
    RequestResponse_Header_UnexpectedException) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_Header_UnexpectedException();
}
129 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_IsOverloaded().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_IsOverloaded) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_IsOverloaded();
}
133 
// Closes the client channel on its event base thread, then issues
// future_add(3) and checks that it throws a TTransportException of type
// UNKNOWN whose message contains "Channel is !good()".
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_Connection_CloseNow) {
  compatibilityTest_->connectToServer([](auto client) {
    // It should not reach to server: no EXPECT_CALL for add_(3)

    // Observe the behavior if the connection is closed already
    auto* channel = static_cast<ClientChannel*>(client->getChannel());
    channel->getEventBase()->runInEventBaseThreadAndWait(
        [&]() { channel->closeNow(); });

    try {
      client->future_add(3).get();
      // Reaching this line means no exception was raised; record a
      // non-fatal failure.
      ADD_FAILURE() << "future_add should have thrown";
    } catch (const TTransportException& ex) {
      // Caught by const reference: the exception is only inspected.
      EXPECT_EQ(TTransportException::UNKNOWN, ex.getType());
      EXPECT_PRED_FORMAT2(IsSubstring, "Channel is !good()", ex.what());
    }
  });
}
152 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_ServerQueueTimeout().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ServerQueueTimeout) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_ServerQueueTimeout();
}
156 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestResponse_ResponseSizeTooBig().
TEST_P(TransportUpgradeCompatibilityTest, RequestResponse_ResponseSizeTooBig) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestResponse_ResponseSizeTooBig();
}
160 
161 // TODO(T90625074)
TEST_P(TransportUpgradeCompatibilityTest,DISABLED_RequestResponse_Checksumming)162 TEST_P(
163     TransportUpgradeCompatibilityTest, DISABLED_RequestResponse_Checksumming) {
164   // Checksum not implemented for header transport
165   if (!GetParam()) {
166     return;
167   }
168   compatibilityTest_->TestRequestResponse_Checksumming();
169 }
170 
// Delegates to the shared TransportCompatibilityTest scenario
// TestOneway_Simple().
TEST_P(TransportUpgradeCompatibilityTest, Oneway_Simple) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestOneway_Simple();
}
174 
// Delegates to the shared TransportCompatibilityTest scenario
// TestOneway_WithDelay().
TEST_P(TransportUpgradeCompatibilityTest, Oneway_WithDelay) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestOneway_WithDelay();
}
178 
// Delegates to the shared TransportCompatibilityTest scenario
// TestOneway_UnexpectedException().
TEST_P(TransportUpgradeCompatibilityTest, Oneway_UnexpectedException) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestOneway_UnexpectedException();
}
182 
// Delegates to the shared TransportCompatibilityTest scenario
// TestOneway_Connection_CloseNow().
TEST_P(TransportUpgradeCompatibilityTest, Oneway_Connection_CloseNow) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestOneway_Connection_CloseNow();
}
186 
// Delegates to the shared TransportCompatibilityTest scenario
// TestOneway_ServerQueueTimeout().
TEST_P(TransportUpgradeCompatibilityTest, Oneway_ServerQueueTimeout) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestOneway_ServerQueueTimeout();
}
190 
// Runs the shared TestOneway_Checksumming() scenario, but only for the
// instantiation whose parameter is true; otherwise the body does nothing.
TEST_P(TransportUpgradeCompatibilityTest, Oneway_Checksumming) {
  // Checksum not implemented for header transport
  const bool upgradeEnabled = GetParam();
  if (upgradeEnabled) {
    compatibilityTest_->TestOneway_Checksumming();
  }
}
198 
// Runs the shared TestOneway_Checksumming(true) scenario, but only for the
// instantiation whose parameter is true; otherwise the body does nothing.
TEST_P(TransportUpgradeCompatibilityTest, Oneway_Sampled_Checksumming) {
  // Checksum not implemented for header transport
  const bool upgradeEnabled = GetParam();
  if (upgradeEnabled) {
    compatibilityTest_->TestOneway_Checksumming(true);
  }
}
206 
// Delegates to the shared TransportCompatibilityTest scenario
// TestRequestContextIsPreserved().
TEST_P(TransportUpgradeCompatibilityTest, RequestContextIsPreserved) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestRequestContextIsPreserved();
}
210 
// Delegates to the shared TransportCompatibilityTest scenario
// TestBadPayload().
TEST_P(TransportUpgradeCompatibilityTest, BadPayload) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestBadPayload();
}
214 
// Delegates to the shared TransportCompatibilityTest scenario
// TestEvbSwitch().
TEST_P(TransportUpgradeCompatibilityTest, EvbSwitch) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestEvbSwitch();
}
218 
// Delegates to the shared TransportCompatibilityTest scenario
// TestEvbSwitch_Failure().
TEST_P(TransportUpgradeCompatibilityTest, EvbSwitch_Failure) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestEvbSwitch_Failure();
}
222 
223 class CloseCallbackTest : public CloseCallback {
224  public:
channelClosed()225   void channelClosed() override {
226     EXPECT_FALSE(closed_);
227     closed_ = true;
228   }
isClosed()229   bool isClosed() { return closed_; }
230 
231  private:
232   bool closed_{false};
233 };
234 
// Installs a CloseCallbackTest on the client channel, performs one
// sumTwoNumbers(1, 2) request, and then checks that the callback is reported
// as closed only after closeNow() has run on the channel's event base.
TEST_P(TransportUpgradeCompatibilityTest, CloseCallback) {
  compatibilityTest_->connectToServer(
      [this](std::unique_ptr<TestServiceAsyncClient> client) {
        EXPECT_CALL(*compatibilityTest_->handler_.get(), sumTwoNumbers_(1, 2))
            .Times(1);

        auto closeObserver = std::make_unique<CloseCallbackTest>();
        auto clientChannel = static_cast<ClientChannel*>(client->getChannel());
        auto eventBase = clientChannel->getEventBase();

        // The callback is registered from the channel's event base thread.
        eventBase->runInEventBaseThreadAndWait([&]() {
          clientChannel->setCloseCallback(closeObserver.get());
        });

        // send a request so that transport upgrade kicks in (if enabled)
        EXPECT_EQ(3, client->future_sumTwoNumbers(1, 2).get());

        // Not closed yet: nothing has shut the channel down so far.
        EXPECT_FALSE(closeObserver->isClosed());

        eventBase->runInEventBaseThreadAndWait(
            [&]() { clientChannel->closeNow(); });
        EXPECT_TRUE(closeObserver->isClosed());
      });
}
254 
// Delegates to the shared TransportCompatibilityTest scenario
// TestConnectionStats().
TEST_P(TransportUpgradeCompatibilityTest, ConnectionStats) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestConnectionStats();
}
258 
// Delegates to the shared TransportCompatibilityTest scenario
// TestObserverSendReceiveRequests().
TEST_P(TransportUpgradeCompatibilityTest, ObserverSendReceiveRequests) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestObserverSendReceiveRequests();
}
262 
// Delegates to the shared TransportCompatibilityTest scenario
// TestConnectionContext().
TEST_P(TransportUpgradeCompatibilityTest, ConnectionContext) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestConnectionContext();
}
266 
// Delegates to the shared TransportCompatibilityTest scenario
// TestClientIdentityHook().
TEST_P(TransportUpgradeCompatibilityTest, ClientIdentityHook) {
  auto& scenarios = *compatibilityTest_;
  scenarios.TestClientIdentityHook();
}
270 
271 } // namespace thrift
272 } // namespace apache
273