1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <thread>
27
28 #include <google/protobuf/arena.h>
29
30 #include <grpc/impl/codegen/log.h>
31 #include <gtest/gtest.h>
32
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 #include <grpcpp/server.h>
37 #include <grpcpp/server_builder.h>
38 #include <grpcpp/server_context.h>
39 #include <grpcpp/support/client_callback.h>
40 #include <grpcpp/support/message_allocator.h>
41
42 #include "src/core/lib/iomgr/iomgr.h"
43 #include "src/proto/grpc/testing/echo.grpc.pb.h"
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/util/test_credentials_provider.h"
47
48 // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
49 // should be skipped based on a decision made at SetUp time. In particular, any
50 // callback tests can only be run if the iomgr can run in the background or if
51 // the transport is in-process.
52 #define MAYBE_SKIP_TEST \
53 do { \
54 if (do_not_test_) { \
55 return; \
56 } \
57 } while (0)
58
59 namespace grpc {
60 namespace testing {
61 namespace {
62
63 class CallbackTestServiceImpl
64 : public EchoTestService::ExperimentalCallbackService {
65 public:
CallbackTestServiceImpl()66 explicit CallbackTestServiceImpl() {}
67
SetAllocatorMutator(std::function<void (experimental::RpcAllocatorState * allocator_state,const EchoRequest * req,EchoResponse * resp)> mutator)68 void SetAllocatorMutator(
69 std::function<void(experimental::RpcAllocatorState* allocator_state,
70 const EchoRequest* req, EchoResponse* resp)>
71 mutator) {
72 allocator_mutator_ = std::move(mutator);
73 }
74
Echo(experimental::CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)75 experimental::ServerUnaryReactor* Echo(
76 experimental::CallbackServerContext* context, const EchoRequest* request,
77 EchoResponse* response) override {
78 response->set_message(request->message());
79 if (allocator_mutator_) {
80 allocator_mutator_(context->GetRpcAllocatorState(), request, response);
81 }
82 auto* reactor = context->DefaultReactor();
83 reactor->Finish(Status::OK);
84 return reactor;
85 }
86
87 private:
88 std::function<void(experimental::RpcAllocatorState* allocator_state,
89 const EchoRequest* req, EchoResponse* resp)>
90 allocator_mutator_;
91 };
92
93 enum class Protocol { INPROC, TCP };
94
95 class TestScenario {
96 public:
TestScenario(Protocol protocol,const std::string & creds_type)97 TestScenario(Protocol protocol, const std::string& creds_type)
98 : protocol(protocol), credentials_type(creds_type) {}
99 void Log() const;
100 Protocol protocol;
101 const std::string credentials_type;
102 };
103
operator <<(std::ostream & out,const TestScenario & scenario)104 static std::ostream& operator<<(std::ostream& out,
105 const TestScenario& scenario) {
106 return out << "TestScenario{protocol="
107 << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
108 << "," << scenario.credentials_type << "}";
109 }
110
Log() const111 void TestScenario::Log() const {
112 std::ostringstream out;
113 out << *this;
114 gpr_log(GPR_INFO, "%s", out.str().c_str());
115 }
116
117 class MessageAllocatorEnd2endTestBase
118 : public ::testing::TestWithParam<TestScenario> {
119 protected:
MessageAllocatorEnd2endTestBase()120 MessageAllocatorEnd2endTestBase() {
121 GetParam().Log();
122 if (GetParam().protocol == Protocol::TCP) {
123 if (!grpc_iomgr_run_in_background()) {
124 do_not_test_ = true;
125 return;
126 }
127 }
128 }
129
130 ~MessageAllocatorEnd2endTestBase() override = default;
131
CreateServer(experimental::MessageAllocator<EchoRequest,EchoResponse> * allocator)132 void CreateServer(
133 experimental::MessageAllocator<EchoRequest, EchoResponse>* allocator) {
134 ServerBuilder builder;
135
136 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
137 GetParam().credentials_type);
138 if (GetParam().protocol == Protocol::TCP) {
139 picked_port_ = grpc_pick_unused_port_or_die();
140 server_address_ << "localhost:" << picked_port_;
141 builder.AddListeningPort(server_address_.str(), server_creds);
142 }
143 callback_service_.SetMessageAllocatorFor_Echo(allocator);
144 builder.RegisterService(&callback_service_);
145
146 server_ = builder.BuildAndStart();
147 }
148
DestroyServer()149 void DestroyServer() {
150 if (server_) {
151 server_->Shutdown();
152 server_.reset();
153 }
154 }
155
ResetStub()156 void ResetStub() {
157 ChannelArguments args;
158 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
159 GetParam().credentials_type, &args);
160 switch (GetParam().protocol) {
161 case Protocol::TCP:
162 channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
163 channel_creds, args);
164 break;
165 case Protocol::INPROC:
166 channel_ = server_->InProcessChannel(args);
167 break;
168 default:
169 assert(false);
170 }
171 stub_ = EchoTestService::NewStub(channel_);
172 }
173
TearDown()174 void TearDown() override {
175 DestroyServer();
176 if (picked_port_ > 0) {
177 grpc_recycle_unused_port(picked_port_);
178 }
179 }
180
SendRpcs(int num_rpcs)181 void SendRpcs(int num_rpcs) {
182 std::string test_string("");
183 for (int i = 0; i < num_rpcs; i++) {
184 EchoRequest request;
185 EchoResponse response;
186 ClientContext cli_ctx;
187
188 test_string += std::string(1024, 'x');
189 request.set_message(test_string);
190 std::string val;
191 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
192
193 std::mutex mu;
194 std::condition_variable cv;
195 bool done = false;
196 stub_->experimental_async()->Echo(
197 &cli_ctx, &request, &response,
198 [&request, &response, &done, &mu, &cv, val](Status s) {
199 GPR_ASSERT(s.ok());
200
201 EXPECT_EQ(request.message(), response.message());
202 std::lock_guard<std::mutex> l(mu);
203 done = true;
204 cv.notify_one();
205 });
206 std::unique_lock<std::mutex> l(mu);
207 while (!done) {
208 cv.wait(l);
209 }
210 }
211 }
212
213 bool do_not_test_{false};
214 int picked_port_{0};
215 std::shared_ptr<Channel> channel_;
216 std::unique_ptr<EchoTestService::Stub> stub_;
217 CallbackTestServiceImpl callback_service_;
218 std::unique_ptr<Server> server_;
219 std::ostringstream server_address_;
220 };
221
222 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
223
TEST_P(NullAllocatorTest,SimpleRpc)224 TEST_P(NullAllocatorTest, SimpleRpc) {
225 MAYBE_SKIP_TEST;
226 CreateServer(nullptr);
227 ResetStub();
228 SendRpcs(1);
229 }
230
231 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
232 public:
233 class SimpleAllocator
234 : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
235 public:
236 class MessageHolderImpl
237 : public experimental::MessageHolder<EchoRequest, EchoResponse> {
238 public:
MessageHolderImpl(std::atomic_int * request_deallocation_count,std::atomic_int * messages_deallocation_count)239 MessageHolderImpl(std::atomic_int* request_deallocation_count,
240 std::atomic_int* messages_deallocation_count)
241 : request_deallocation_count_(request_deallocation_count),
242 messages_deallocation_count_(messages_deallocation_count) {
243 set_request(new EchoRequest);
244 set_response(new EchoResponse);
245 }
Release()246 void Release() override {
247 (*messages_deallocation_count_)++;
248 delete request();
249 delete response();
250 delete this;
251 }
FreeRequest()252 void FreeRequest() override {
253 (*request_deallocation_count_)++;
254 delete request();
255 set_request(nullptr);
256 }
257
ReleaseRequest()258 EchoRequest* ReleaseRequest() {
259 auto* ret = request();
260 set_request(nullptr);
261 return ret;
262 }
263
264 private:
265 std::atomic_int* const request_deallocation_count_;
266 std::atomic_int* const messages_deallocation_count_;
267 };
AllocateMessages()268 experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
269 override {
270 allocation_count++;
271 return new MessageHolderImpl(&request_deallocation_count,
272 &messages_deallocation_count);
273 }
274 int allocation_count = 0;
275 std::atomic_int request_deallocation_count{0};
276 std::atomic_int messages_deallocation_count{0};
277 };
278 };
279
TEST_P(SimpleAllocatorTest,SimpleRpc)280 TEST_P(SimpleAllocatorTest, SimpleRpc) {
281 MAYBE_SKIP_TEST;
282 const int kRpcCount = 10;
283 std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
284 CreateServer(allocator.get());
285 ResetStub();
286 SendRpcs(kRpcCount);
287 // messages_deallocaton_count is updated in Release after server side OnDone.
288 // Destroy server to make sure it has been updated.
289 DestroyServer();
290 EXPECT_EQ(kRpcCount, allocator->allocation_count);
291 EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
292 EXPECT_EQ(0, allocator->request_deallocation_count);
293 }
294
TEST_P(SimpleAllocatorTest,RpcWithEarlyFreeRequest)295 TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
296 MAYBE_SKIP_TEST;
297 const int kRpcCount = 10;
298 std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
299 auto mutator = [](experimental::RpcAllocatorState* allocator_state,
300 const EchoRequest* req, EchoResponse* resp) {
301 auto* info =
302 static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
303 EXPECT_EQ(req, info->request());
304 EXPECT_EQ(resp, info->response());
305 allocator_state->FreeRequest();
306 EXPECT_EQ(nullptr, info->request());
307 };
308 callback_service_.SetAllocatorMutator(mutator);
309 CreateServer(allocator.get());
310 ResetStub();
311 SendRpcs(kRpcCount);
312 // messages_deallocaton_count is updated in Release after server side OnDone.
313 // Destroy server to make sure it has been updated.
314 DestroyServer();
315 EXPECT_EQ(kRpcCount, allocator->allocation_count);
316 EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
317 EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
318 }
319
TEST_P(SimpleAllocatorTest,RpcWithReleaseRequest)320 TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
321 MAYBE_SKIP_TEST;
322 const int kRpcCount = 10;
323 std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
324 std::vector<EchoRequest*> released_requests;
325 auto mutator = [&released_requests](
326 experimental::RpcAllocatorState* allocator_state,
327 const EchoRequest* req, EchoResponse* resp) {
328 auto* info =
329 static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
330 EXPECT_EQ(req, info->request());
331 EXPECT_EQ(resp, info->response());
332 released_requests.push_back(info->ReleaseRequest());
333 EXPECT_EQ(nullptr, info->request());
334 };
335 callback_service_.SetAllocatorMutator(mutator);
336 CreateServer(allocator.get());
337 ResetStub();
338 SendRpcs(kRpcCount);
339 // messages_deallocaton_count is updated in Release after server side OnDone.
340 // Destroy server to make sure it has been updated.
341 DestroyServer();
342 EXPECT_EQ(kRpcCount, allocator->allocation_count);
343 EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
344 EXPECT_EQ(0, allocator->request_deallocation_count);
345 EXPECT_EQ(static_cast<unsigned>(kRpcCount), released_requests.size());
346 for (auto* req : released_requests) {
347 delete req;
348 }
349 }
350
351 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
352 public:
353 class ArenaAllocator
354 : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
355 public:
356 class MessageHolderImpl
357 : public experimental::MessageHolder<EchoRequest, EchoResponse> {
358 public:
MessageHolderImpl()359 MessageHolderImpl() {
360 set_request(
361 google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
362 set_response(
363 google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
364 }
Release()365 void Release() override { delete this; }
FreeRequest()366 void FreeRequest() override { GPR_ASSERT(0); }
367
368 private:
369 google::protobuf::Arena arena_;
370 };
AllocateMessages()371 experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
372 override {
373 allocation_count++;
374 return new MessageHolderImpl;
375 }
376 int allocation_count = 0;
377 };
378 };
379
TEST_P(ArenaAllocatorTest,SimpleRpc)380 TEST_P(ArenaAllocatorTest, SimpleRpc) {
381 MAYBE_SKIP_TEST;
382 const int kRpcCount = 10;
383 std::unique_ptr<ArenaAllocator> allocator(new ArenaAllocator);
384 CreateServer(allocator.get());
385 ResetStub();
386 SendRpcs(kRpcCount);
387 EXPECT_EQ(kRpcCount, allocator->allocation_count);
388 }
389
CreateTestScenarios(bool test_insecure)390 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
391 std::vector<TestScenario> scenarios;
392 std::vector<std::string> credentials_types{
393 GetCredentialsProvider()->GetSecureCredentialsTypeList()};
394 auto insec_ok = [] {
395 // Only allow insecure credentials type when it is registered with the
396 // provider. User may create providers that do not have insecure.
397 return GetCredentialsProvider()->GetChannelCredentials(
398 kInsecureCredentialsType, nullptr) != nullptr;
399 };
400 if (test_insecure && insec_ok()) {
401 credentials_types.push_back(kInsecureCredentialsType);
402 }
403 GPR_ASSERT(!credentials_types.empty());
404
405 Protocol parr[]{Protocol::INPROC, Protocol::TCP};
406 for (Protocol p : parr) {
407 for (const auto& cred : credentials_types) {
408 // TODO(vjpai): Test inproc with secure credentials when feasible
409 if (p == Protocol::INPROC &&
410 (cred != kInsecureCredentialsType || !insec_ok())) {
411 continue;
412 }
413 scenarios.emplace_back(p, cred);
414 }
415 }
416 return scenarios;
417 }
418
419 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
420 ::testing::ValuesIn(CreateTestScenarios(true)));
421 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
422 ::testing::ValuesIn(CreateTestScenarios(true)));
423 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
424 ::testing::ValuesIn(CreateTestScenarios(true)));
425
426 } // namespace
427 } // namespace testing
428 } // namespace grpc
429
main(int argc,char ** argv)430 int main(int argc, char** argv) {
431 grpc::testing::TestEnvironment env(argc, argv);
432 // The grpc_init is to cover the MAYBE_SKIP_TEST.
433 grpc_init();
434 ::testing::InitGoogleTest(&argc, argv);
435 int ret = RUN_ALL_TESTS();
436 grpc_shutdown();
437 return ret;
438 }
439