1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <mutex>
20 #include <thread>
21
22 #include "absl/memory/memory.h"
23 #include "absl/strings/match.h"
24 #include "absl/strings/str_format.h"
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30 #include <grpcpp/channel.h>
31 #include <grpcpp/client_context.h>
32 #include <grpcpp/create_channel.h>
33 #include <grpcpp/resource_quota.h>
34 #include <grpcpp/security/auth_metadata_processor.h>
35 #include <grpcpp/security/credentials.h>
36 #include <grpcpp/security/server_credentials.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
39 #include <grpcpp/server_context.h>
40 #include <grpcpp/support/string_ref.h>
41 #include <grpcpp/test/channel_test_peer.h>
42
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/lib/gpr/env.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/security/credentials/credentials.h"
47 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
48 #include "src/proto/grpc/testing/echo.grpc.pb.h"
49 #include "test/core/util/port.h"
50 #include "test/core/util/test_config.h"
51 #include "test/cpp/end2end/interceptors_util.h"
52 #include "test/cpp/end2end/test_service_impl.h"
53 #include "test/cpp/util/string_ref_helper.h"
54 #include "test/cpp/util/test_credentials_provider.h"
55
56 #ifdef GRPC_POSIX_SOCKET_EV
57 #include "src/core/lib/iomgr/ev_posix.h"
58 #endif // GRPC_POSIX_SOCKET_EV
59
60 #include <gtest/gtest.h>
61
62 using grpc::testing::EchoRequest;
63 using grpc::testing::EchoResponse;
64 using grpc::testing::kTlsCredentialsType;
65 using std::chrono::system_clock;
66
67 namespace grpc {
68 namespace testing {
69 namespace {
70
CheckIsLocalhost(const std::string & addr)71 bool CheckIsLocalhost(const std::string& addr) {
72 const std::string kIpv6("ipv6:[::1]:");
73 const std::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
74 const std::string kIpv4("ipv4:127.0.0.1:");
75 return addr.substr(0, kIpv4.size()) == kIpv4 ||
76 addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77 addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79
80 const int kClientChannelBackupPollIntervalMs = 200;
81
82 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
83
84 const char kFakeToken[] = "fake_token";
85 const char kFakeSelector[] = "fake_selector";
86 const char kExpectedFakeCredsDebugString[] =
87 "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
88 "AuthoritySelector:fake_selector}}";
89
90 const char kWrongToken[] = "wrong_token";
91 const char kWrongSelector[] = "wrong_selector";
92 const char kExpectedWrongCredsDebugString[] =
93 "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
94 "AuthoritySelector:wrong_selector}}";
95
96 const char kFakeToken1[] = "fake_token1";
97 const char kFakeSelector1[] = "fake_selector1";
98 const char kExpectedFakeCreds1DebugString[] =
99 "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
100 "AuthoritySelector:fake_selector1}}";
101
102 const char kFakeToken2[] = "fake_token2";
103 const char kFakeSelector2[] = "fake_selector2";
104 const char kExpectedFakeCreds2DebugString[] =
105 "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
106 "AuthoritySelector:fake_selector2}}";
107
108 const char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
109 "SecureCallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
110 "value:Does not matter, will fail the key is invalid.}}";
111 const char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
112 "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
113 "value:With illegal \n value.}}";
114 const char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
115 "SecureCallCredentials{TestMetadataCredentials{key:meta_key,value:Does not "
116 "matter}}";
117 const char kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
118 "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
119 "value:Does not matter, will fail anyway (see 3rd param)}}";
120 const char
121 kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
122 [] = "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-"
123 "metadata,value:Dr Jekyll}}";
124 const char
125 kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
126 [] = "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-"
127 "metadata,value:Mr Hyde}}";
128 const char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
129 "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
130 "value:Does not matter, will fail anyway (see 3rd param)}}";
131 const char kExpectedCompositeCallCredsDebugString[] =
132 "SecureCallCredentials{CompositeCallCredentials{TestMetadataCredentials{"
133 "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:"
134 "call-creds-key2,value:call-creds-val2}}}";
135
136 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
137 public:
138 static const char kGoodMetadataKey[];
139 static const char kBadMetadataKey[];
140
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)141 TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
142 const grpc::string_ref& metadata_value,
143 bool is_blocking, bool is_successful,
144 int delay_ms)
145 : metadata_key_(metadata_key.data(), metadata_key.length()),
146 metadata_value_(metadata_value.data(), metadata_value.length()),
147 is_blocking_(is_blocking),
148 is_successful_(is_successful),
149 delay_ms_(delay_ms) {}
150
IsBlocking() const151 bool IsBlocking() const override { return is_blocking_; }
152
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)153 Status GetMetadata(
154 grpc::string_ref service_url, grpc::string_ref method_name,
155 const grpc::AuthContext& channel_auth_context,
156 std::multimap<std::string, std::string>* metadata) override {
157 if (delay_ms_ != 0) {
158 gpr_sleep_until(
159 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
160 gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
161 }
162 EXPECT_GT(service_url.length(), 0UL);
163 EXPECT_GT(method_name.length(), 0UL);
164 EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
165 EXPECT_TRUE(metadata != nullptr);
166 if (is_successful_) {
167 metadata->insert(std::make_pair(metadata_key_, metadata_value_));
168 return Status::OK;
169 } else {
170 return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
171 }
172 }
173
DebugString()174 std::string DebugString() override {
175 return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
176 metadata_key_.c_str(), metadata_value_.c_str());
177 }
178
179 private:
180 std::string metadata_key_;
181 std::string metadata_value_;
182 bool is_blocking_;
183 bool is_successful_;
184 int delay_ms_;
185 };
186
187 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
188 "TestPluginMetadata";
189 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
190 "test-plugin-metadata";
191
192 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
193 public:
194 static const char kGoodGuy[];
195
TestAuthMetadataProcessor(bool is_blocking)196 explicit TestAuthMetadataProcessor(bool is_blocking)
197 : is_blocking_(is_blocking) {}
198
GetCompatibleClientCreds()199 std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
200 return grpc::MetadataCredentialsFromPlugin(
201 std::unique_ptr<MetadataCredentialsPlugin>(
202 new TestMetadataCredentialsPlugin(
203 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
204 is_blocking_, true, 0)));
205 }
206
GetIncompatibleClientCreds()207 std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
208 return grpc::MetadataCredentialsFromPlugin(
209 std::unique_ptr<MetadataCredentialsPlugin>(
210 new TestMetadataCredentialsPlugin(
211 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
212 is_blocking_, true, 0)));
213 }
214
215 // Interface implementation
IsBlocking() const216 bool IsBlocking() const override { return is_blocking_; }
217
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)218 Status Process(const InputMetadata& auth_metadata, AuthContext* context,
219 OutputMetadata* consumed_auth_metadata,
220 OutputMetadata* response_metadata) override {
221 EXPECT_TRUE(consumed_auth_metadata != nullptr);
222 EXPECT_TRUE(context != nullptr);
223 EXPECT_TRUE(response_metadata != nullptr);
224 auto auth_md =
225 auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
226 EXPECT_NE(auth_md, auth_metadata.end());
227 string_ref auth_md_value = auth_md->second;
228 if (auth_md_value == kGoodGuy) {
229 context->AddProperty(kIdentityPropName, kGoodGuy);
230 context->SetPeerIdentityPropertyName(kIdentityPropName);
231 consumed_auth_metadata->insert(std::make_pair(
232 string(auth_md->first.data(), auth_md->first.length()),
233 string(auth_md->second.data(), auth_md->second.length())));
234 return Status::OK;
235 } else {
236 return Status(StatusCode::UNAUTHENTICATED,
237 string("Invalid principal: ") +
238 string(auth_md_value.data(), auth_md_value.length()));
239 }
240 }
241
242 private:
243 static const char kIdentityPropName[];
244 bool is_blocking_;
245 };
246
247 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
248 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
249
250 class Proxy : public ::grpc::testing::EchoTestService::Service {
251 public:
Proxy(const std::shared_ptr<Channel> & channel)252 explicit Proxy(const std::shared_ptr<Channel>& channel)
253 : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
254
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)255 Status Echo(ServerContext* server_context, const EchoRequest* request,
256 EchoResponse* response) override {
257 std::unique_ptr<ClientContext> client_context =
258 ClientContext::FromServerContext(*server_context);
259 return stub_->Echo(client_context.get(), *request, response);
260 }
261
262 private:
263 std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_;
264 };
265
266 class TestServiceImplDupPkg
267 : public ::grpc::testing::duplicate::EchoTestService::Service {
268 public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)269 Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
270 EchoResponse* response) override {
271 response->set_message("no package");
272 return Status::OK;
273 }
274 };
275
276 class TestScenario {
277 public:
TestScenario(bool interceptors,bool proxy,bool inproc_stub,const std::string & creds_type,bool use_callback_server)278 TestScenario(bool interceptors, bool proxy, bool inproc_stub,
279 const std::string& creds_type, bool use_callback_server)
280 : use_interceptors(interceptors),
281 use_proxy(proxy),
282 inproc(inproc_stub),
283 credentials_type(creds_type),
284 callback_server(use_callback_server) {}
285 void Log() const;
286 bool use_interceptors;
287 bool use_proxy;
288 bool inproc;
289 const std::string credentials_type;
290 bool callback_server;
291 };
292
operator <<(std::ostream & out,const TestScenario & scenario)293 static std::ostream& operator<<(std::ostream& out,
294 const TestScenario& scenario) {
295 return out << "TestScenario{use_interceptors="
296 << (scenario.use_interceptors ? "true" : "false")
297 << ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
298 << ", inproc=" << (scenario.inproc ? "true" : "false")
299 << ", server_type="
300 << (scenario.callback_server ? "callback" : "sync")
301 << ", credentials='" << scenario.credentials_type << "'}";
302 }
303
Log() const304 void TestScenario::Log() const {
305 std::ostringstream out;
306 out << *this;
307 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
308 }
309
310 class End2endTest : public ::testing::TestWithParam<TestScenario> {
311 protected:
SetUpTestCase()312 static void SetUpTestCase() { grpc_init(); }
TearDownTestCase()313 static void TearDownTestCase() { grpc_shutdown(); }
End2endTest()314 End2endTest()
315 : is_server_started_(false),
316 kMaxMessageSize_(8192),
317 special_service_("special"),
318 first_picked_port_(0) {
319 GetParam().Log();
320 }
321
TearDown()322 void TearDown() override {
323 if (is_server_started_) {
324 server_->Shutdown();
325 if (proxy_server_) proxy_server_->Shutdown();
326 }
327 if (first_picked_port_ > 0) {
328 grpc_recycle_unused_port(first_picked_port_);
329 }
330 }
331
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)332 void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
333 int port = grpc_pick_unused_port_or_die();
334 first_picked_port_ = port;
335 server_address_ << "localhost:" << port;
336 // Setup server
337 BuildAndStartServer(processor);
338 }
339
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)340 void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
341 if (is_server_started_) {
342 server_->Shutdown();
343 BuildAndStartServer(processor);
344 }
345 }
346
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)347 void BuildAndStartServer(
348 const std::shared_ptr<AuthMetadataProcessor>& processor) {
349 ServerBuilder builder;
350 ConfigureServerBuilder(&builder);
351 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
352 GetParam().credentials_type);
353 if (GetParam().credentials_type != kInsecureCredentialsType) {
354 server_creds->SetAuthMetadataProcessor(processor);
355 }
356 if (GetParam().use_interceptors) {
357 std::vector<
358 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
359 creators;
360 // Add 20 phony server interceptors
361 creators.reserve(20);
362 for (auto i = 0; i < 20; i++) {
363 creators.push_back(absl::make_unique<PhonyInterceptorFactory>());
364 }
365 builder.experimental().SetInterceptorCreators(std::move(creators));
366 }
367 builder.AddListeningPort(server_address_.str(), server_creds);
368 if (!GetParam().callback_server) {
369 builder.RegisterService(&service_);
370 } else {
371 builder.RegisterService(&callback_service_);
372 }
373 builder.RegisterService("foo.test.youtube.com", &special_service_);
374 builder.RegisterService(&dup_pkg_service_);
375
376 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
377 builder.SetSyncServerOption(
378 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
379
380 server_ = builder.BuildAndStart();
381 is_server_started_ = true;
382 }
383
ConfigureServerBuilder(ServerBuilder * builder)384 virtual void ConfigureServerBuilder(ServerBuilder* builder) {
385 builder->SetMaxMessageSize(
386 kMaxMessageSize_); // For testing max message size.
387 }
388
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})389 void ResetChannel(
390 std::vector<
391 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
392 interceptor_creators = {}) {
393 if (!is_server_started_) {
394 StartServer(std::shared_ptr<AuthMetadataProcessor>());
395 }
396 EXPECT_TRUE(is_server_started_);
397 ChannelArguments args;
398 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
399 GetParam().credentials_type, &args);
400 if (!user_agent_prefix_.empty()) {
401 args.SetUserAgentPrefix(user_agent_prefix_);
402 }
403 args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
404
405 if (!GetParam().inproc) {
406 if (!GetParam().use_interceptors) {
407 channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
408 channel_creds, args);
409 } else {
410 channel_ = CreateCustomChannelWithInterceptors(
411 server_address_.str(), channel_creds, args,
412 interceptor_creators.empty() ? CreatePhonyClientInterceptors()
413 : std::move(interceptor_creators));
414 }
415 } else {
416 if (!GetParam().use_interceptors) {
417 channel_ = server_->InProcessChannel(args);
418 } else {
419 channel_ = server_->experimental().InProcessChannelWithInterceptors(
420 args, interceptor_creators.empty()
421 ? CreatePhonyClientInterceptors()
422 : std::move(interceptor_creators));
423 }
424 }
425 }
426
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})427 void ResetStub(
428 std::vector<
429 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
430 interceptor_creators = {}) {
431 ResetChannel(std::move(interceptor_creators));
432 if (GetParam().use_proxy) {
433 proxy_service_ = absl::make_unique<Proxy>(channel_);
434 int port = grpc_pick_unused_port_or_die();
435 std::ostringstream proxyaddr;
436 proxyaddr << "localhost:" << port;
437 ServerBuilder builder;
438 builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
439 builder.RegisterService(proxy_service_.get());
440
441 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
442 builder.SetSyncServerOption(
443 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
444
445 proxy_server_ = builder.BuildAndStart();
446
447 channel_ =
448 grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
449 }
450
451 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
452 PhonyInterceptor::Reset();
453 }
454
455 bool is_server_started_;
456 std::shared_ptr<Channel> channel_;
457 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
458 std::unique_ptr<Server> server_;
459 std::unique_ptr<Server> proxy_server_;
460 std::unique_ptr<Proxy> proxy_service_;
461 std::ostringstream server_address_;
462 const int kMaxMessageSize_;
463 TestServiceImpl service_;
464 CallbackTestServiceImpl callback_service_;
465 TestServiceImpl special_service_;
466 TestServiceImplDupPkg dup_pkg_service_;
467 std::string user_agent_prefix_;
468 int first_picked_port_;
469 };
470
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)471 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
472 bool with_binary_metadata) {
473 EchoRequest request;
474 EchoResponse response;
475 request.set_message("Hello hello hello hello");
476
477 for (int i = 0; i < num_rpcs; ++i) {
478 ClientContext context;
479 if (with_binary_metadata) {
480 char bytes[8] = {'\0', '\1', '\2', '\3',
481 '\4', '\5', '\6', static_cast<char>(i)};
482 context.AddMetadata("custom-bin", std::string(bytes, 8));
483 }
484 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
485 Status s = stub->Echo(&context, request, &response);
486 EXPECT_EQ(response.message(), request.message());
487 EXPECT_TRUE(s.ok());
488 }
489 }
490
491 // This class is for testing scenarios where RPCs are cancelled on the server
492 // by calling ServerContext::TryCancel()
493 class End2endServerTryCancelTest : public End2endTest {
494 protected:
495 // Helper for testing client-streaming RPCs which are cancelled on the server.
496 // Depending on the value of server_try_cancel parameter, this will test one
497 // of the following three scenarios:
498 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
499 // any messages from the client
500 //
501 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
502 // messages from the client
503 //
504 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
505 // the messages from the client
506 //
507 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)508 void TestRequestStreamServerCancel(
509 ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
510 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
511 ResetStub();
512 EchoRequest request;
513 EchoResponse response;
514 ClientContext context;
515
516 // Send server_try_cancel value in the client metadata
517 context.AddMetadata(kServerTryCancelRequest,
518 std::to_string(server_try_cancel));
519
520 auto stream = stub_->RequestStream(&context, &response);
521
522 int num_msgs_sent = 0;
523 while (num_msgs_sent < num_msgs_to_send) {
524 request.set_message("hello");
525 if (!stream->Write(request)) {
526 break;
527 }
528 num_msgs_sent++;
529 }
530 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
531
532 stream->WritesDone();
533 Status s = stream->Finish();
534
535 // At this point, we know for sure that RPC was cancelled by the server
536 // since we passed server_try_cancel value in the metadata. Depending on the
537 // value of server_try_cancel, the RPC might have been cancelled by the
538 // server at different stages. The following validates our expectations of
539 // number of messages sent in various cancellation scenarios:
540
541 switch (server_try_cancel) {
542 case CANCEL_BEFORE_PROCESSING:
543 case CANCEL_DURING_PROCESSING:
544 // If the RPC is cancelled by server before / during messages from the
545 // client, it means that the client most likely did not get a chance to
546 // send all the messages it wanted to send. i.e num_msgs_sent <=
547 // num_msgs_to_send
548 EXPECT_LE(num_msgs_sent, num_msgs_to_send);
549 break;
550
551 case CANCEL_AFTER_PROCESSING:
552 // If the RPC was cancelled after all messages were read by the server,
553 // the client did get a chance to send all its messages
554 EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
555 break;
556
557 default:
558 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
559 server_try_cancel);
560 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
561 server_try_cancel <= CANCEL_AFTER_PROCESSING);
562 break;
563 }
564
565 EXPECT_FALSE(s.ok());
566 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
567 // Make sure that the server interceptors were notified
568 if (GetParam().use_interceptors) {
569 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
570 }
571 }
572
573 // Helper for testing server-streaming RPCs which are cancelled on the server.
574 // Depending on the value of server_try_cancel parameter, this will test one
575 // of the following three scenarios:
576 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
577 // any messages to the client
578 //
579 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
580 // messages to the client
581 //
582 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
583 // the messages to the client
584 //
585 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)586 void TestResponseStreamServerCancel(
587 ServerTryCancelRequestPhase server_try_cancel) {
588 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
589 ResetStub();
590 EchoRequest request;
591 EchoResponse response;
592 ClientContext context;
593
594 // Send server_try_cancel in the client metadata
595 context.AddMetadata(kServerTryCancelRequest,
596 std::to_string(server_try_cancel));
597
598 request.set_message("hello");
599 auto stream = stub_->ResponseStream(&context, request);
600
601 int num_msgs_read = 0;
602 while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
603 if (!stream->Read(&response)) {
604 break;
605 }
606 EXPECT_EQ(response.message(),
607 request.message() + std::to_string(num_msgs_read));
608 num_msgs_read++;
609 }
610 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
611
612 Status s = stream->Finish();
613
614 // Depending on the value of server_try_cancel, the RPC might have been
615 // cancelled by the server at different stages. The following validates our
616 // expectations of number of messages read in various cancellation
617 // scenarios:
618 switch (server_try_cancel) {
619 case CANCEL_BEFORE_PROCESSING:
620 // Server cancelled before sending any messages. Which means the client
621 // wouldn't have read any
622 EXPECT_EQ(num_msgs_read, 0);
623 break;
624
625 case CANCEL_DURING_PROCESSING:
626 // Server cancelled while writing messages. Client must have read less
627 // than or equal to the expected number of messages
628 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
629 break;
630
631 case CANCEL_AFTER_PROCESSING:
632 // Even though the Server cancelled after writing all messages, the RPC
633 // may be cancelled before the Client got a chance to read all the
634 // messages.
635 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
636 break;
637
638 default: {
639 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
640 server_try_cancel);
641 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
642 server_try_cancel <= CANCEL_AFTER_PROCESSING);
643 break;
644 }
645 }
646
647 EXPECT_FALSE(s.ok());
648 // Make sure that the server interceptors were notified
649 if (GetParam().use_interceptors) {
650 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
651 }
652 }
653
654 // Helper for testing bidirectional-streaming RPCs which are cancelled on the
655 // server. Depending on the value of server_try_cancel parameter, this will
656 // test one of the following three scenarios:
657 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
658 // writing any messages from/to the client
659 //
660 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
661 // writing messages from/to the client
662 //
663 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
664 // all the messages from/to the client
665 //
666 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)667 void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
668 int num_messages) {
669 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
670 ResetStub();
671 EchoRequest request;
672 EchoResponse response;
673 ClientContext context;
674
675 // Send server_try_cancel in the client metadata
676 context.AddMetadata(kServerTryCancelRequest,
677 std::to_string(server_try_cancel));
678
679 auto stream = stub_->BidiStream(&context);
680
681 int num_msgs_read = 0;
682 int num_msgs_sent = 0;
683 while (num_msgs_sent < num_messages) {
684 request.set_message("hello " + std::to_string(num_msgs_sent));
685 if (!stream->Write(request)) {
686 break;
687 }
688 num_msgs_sent++;
689
690 if (!stream->Read(&response)) {
691 break;
692 }
693 num_msgs_read++;
694
695 EXPECT_EQ(response.message(), request.message());
696 }
697 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
698 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
699
700 stream->WritesDone();
701 Status s = stream->Finish();
702
703 // Depending on the value of server_try_cancel, the RPC might have been
704 // cancelled by the server at different stages. The following validates our
705 // expectations of number of messages read in various cancellation
706 // scenarios:
707 switch (server_try_cancel) {
708 case CANCEL_BEFORE_PROCESSING:
709 EXPECT_EQ(num_msgs_read, 0);
710 break;
711
712 case CANCEL_DURING_PROCESSING:
713 EXPECT_LE(num_msgs_sent, num_messages);
714 EXPECT_LE(num_msgs_read, num_msgs_sent);
715 break;
716
717 case CANCEL_AFTER_PROCESSING:
718 EXPECT_EQ(num_msgs_sent, num_messages);
719
720 // The Server cancelled after reading the last message and after writing
721 // the message to the client. However, the RPC cancellation might have
722 // taken effect before the client actually read the response.
723 EXPECT_LE(num_msgs_read, num_msgs_sent);
724 break;
725
726 default:
727 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
728 server_try_cancel);
729 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
730 server_try_cancel <= CANCEL_AFTER_PROCESSING);
731 break;
732 }
733
734 EXPECT_FALSE(s.ok());
735 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
736 // Make sure that the server interceptors were notified
737 if (GetParam().use_interceptors) {
738 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
739 }
740 }
741 };
742
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)743 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
744 ResetStub();
745 EchoRequest request;
746 EchoResponse response;
747 ClientContext context;
748
749 context.AddMetadata(kServerTryCancelRequest,
750 std::to_string(CANCEL_BEFORE_PROCESSING));
751 Status s = stub_->Echo(&context, request, &response);
752 EXPECT_FALSE(s.ok());
753 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
754 }
755
756 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)757 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
758 TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
759 }
760
761 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)762 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
763 TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
764 }
765
766 // Server to cancel after reading all the requests but before returning to the
767 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)768 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
769 TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
770 }
771
772 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)773 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
774 TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
775 }
776
777 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)778 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
779 TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
780 }
781
782 // Server to cancel after writing all the respones to the stream but before
783 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)784 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
785 TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
786 }
787
788 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)789 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
790 TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
791 }
792
793 // Server to cancel while reading/writing requests/responses on the stream in
794 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)795 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
796 TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
797 }
798
799 // Server to cancel after reading/writing all requests/responses on the stream
800 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)801 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
802 TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
803 }
804
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)805 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
806 // User-Agent is an HTTP header for HTTP transports only
807 if (GetParam().inproc) {
808 return;
809 }
810 user_agent_prefix_ = "custom_prefix";
811 ResetStub();
812 EchoRequest request;
813 EchoResponse response;
814 request.set_message("Hello hello hello hello");
815 request.mutable_param()->set_echo_metadata(true);
816
817 ClientContext context;
818 Status s = stub_->Echo(&context, request, &response);
819 EXPECT_EQ(response.message(), request.message());
820 EXPECT_TRUE(s.ok());
821 const auto& trailing_metadata = context.GetServerTrailingMetadata();
822 auto iter = trailing_metadata.find("user-agent");
823 EXPECT_TRUE(iter != trailing_metadata.end());
824 std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
825 EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
826 }
827
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)828 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
829 ResetStub();
830 std::vector<std::thread> threads;
831 threads.reserve(10);
832 for (int i = 0; i < 10; ++i) {
833 threads.emplace_back(SendRpc, stub_.get(), 10, true);
834 }
835 for (int i = 0; i < 10; ++i) {
836 threads[i].join();
837 }
838 }
839
TEST_P(End2endTest,MultipleRpcs)840 TEST_P(End2endTest, MultipleRpcs) {
841 ResetStub();
842 std::vector<std::thread> threads;
843 threads.reserve(10);
844 for (int i = 0; i < 10; ++i) {
845 threads.emplace_back(SendRpc, stub_.get(), 10, false);
846 }
847 for (int i = 0; i < 10; ++i) {
848 threads[i].join();
849 }
850 }
851
TEST_P(End2endTest,ManyStubs)852 TEST_P(End2endTest, ManyStubs) {
853 ResetStub();
854 ChannelTestPeer peer(channel_.get());
855 int registered_calls_pre = peer.registered_calls();
856 int registration_attempts_pre = peer.registration_attempts();
857 for (int i = 0; i < 1000; ++i) {
858 grpc::testing::EchoTestService::NewStub(channel_);
859 }
860 EXPECT_EQ(peer.registered_calls(), registered_calls_pre);
861 EXPECT_GT(peer.registration_attempts(), registration_attempts_pre);
862 }
863
TEST_P(End2endTest,EmptyBinaryMetadata)864 TEST_P(End2endTest, EmptyBinaryMetadata) {
865 ResetStub();
866 EchoRequest request;
867 EchoResponse response;
868 request.set_message("Hello hello hello hello");
869 ClientContext context;
870 context.AddMetadata("custom-bin", "");
871 Status s = stub_->Echo(&context, request, &response);
872 EXPECT_EQ(response.message(), request.message());
873 EXPECT_TRUE(s.ok());
874 }
875
TEST_P(End2endTest,ReconnectChannel)876 TEST_P(End2endTest, ReconnectChannel) {
877 if (GetParam().inproc) {
878 return;
879 }
880 int poller_slowdown_factor = 1;
881 // It needs 2 pollset_works to reconnect the channel with polling engine
882 // "poll"
883 #ifdef GRPC_POSIX_SOCKET_EV
884 grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
885 if (0 == strcmp(poller.get(), "poll")) {
886 poller_slowdown_factor = 2;
887 }
888 #endif // GRPC_POSIX_SOCKET_EV
889 ResetStub();
890 SendRpc(stub_.get(), 1, false);
891 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
892 // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
893 // reconnect the channel. Make it a factor of 5x
894 gpr_sleep_until(
895 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
896 gpr_time_from_millis(kClientChannelBackupPollIntervalMs * 5 *
897 poller_slowdown_factor *
898 grpc_test_slowdown_factor(),
899 GPR_TIMESPAN)));
900 SendRpc(stub_.get(), 1, false);
901 }
902
TEST_P(End2endTest,RequestStreamOneRequest)903 TEST_P(End2endTest, RequestStreamOneRequest) {
904 ResetStub();
905 EchoRequest request;
906 EchoResponse response;
907 ClientContext context;
908
909 auto stream = stub_->RequestStream(&context, &response);
910 request.set_message("hello");
911 EXPECT_TRUE(stream->Write(request));
912 stream->WritesDone();
913 Status s = stream->Finish();
914 EXPECT_EQ(response.message(), request.message());
915 EXPECT_TRUE(s.ok());
916 EXPECT_TRUE(context.debug_error_string().empty());
917 }
918
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)919 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
920 ResetStub();
921 EchoRequest request;
922 EchoResponse response;
923 ClientContext context;
924
925 context.set_initial_metadata_corked(true);
926 auto stream = stub_->RequestStream(&context, &response);
927 request.set_message("hello");
928 stream->WriteLast(request, WriteOptions());
929 Status s = stream->Finish();
930 EXPECT_EQ(response.message(), request.message());
931 EXPECT_TRUE(s.ok());
932 }
933
TEST_P(End2endTest,RequestStreamTwoRequests)934 TEST_P(End2endTest, RequestStreamTwoRequests) {
935 ResetStub();
936 EchoRequest request;
937 EchoResponse response;
938 ClientContext context;
939
940 auto stream = stub_->RequestStream(&context, &response);
941 request.set_message("hello");
942 EXPECT_TRUE(stream->Write(request));
943 EXPECT_TRUE(stream->Write(request));
944 stream->WritesDone();
945 Status s = stream->Finish();
946 EXPECT_EQ(response.message(), "hellohello");
947 EXPECT_TRUE(s.ok());
948 }
949
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)950 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
951 ResetStub();
952 EchoRequest request;
953 EchoResponse response;
954 ClientContext context;
955
956 auto stream = stub_->RequestStream(&context, &response);
957 request.set_message("hello");
958 EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
959 EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
960 stream->WritesDone();
961 Status s = stream->Finish();
962 EXPECT_EQ(response.message(), "hellohello");
963 EXPECT_TRUE(s.ok());
964 }
965
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)966 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
967 ResetStub();
968 EchoRequest request;
969 EchoResponse response;
970 ClientContext context;
971
972 context.set_initial_metadata_corked(true);
973 auto stream = stub_->RequestStream(&context, &response);
974 request.set_message("hello");
975 EXPECT_TRUE(stream->Write(request));
976 stream->WriteLast(request, WriteOptions());
977 Status s = stream->Finish();
978 EXPECT_EQ(response.message(), "hellohello");
979 EXPECT_TRUE(s.ok());
980 }
981
TEST_P(End2endTest,ResponseStream)982 TEST_P(End2endTest, ResponseStream) {
983 ResetStub();
984 EchoRequest request;
985 EchoResponse response;
986 ClientContext context;
987 request.set_message("hello");
988
989 auto stream = stub_->ResponseStream(&context, request);
990 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
991 EXPECT_TRUE(stream->Read(&response));
992 EXPECT_EQ(response.message(), request.message() + std::to_string(i));
993 }
994 EXPECT_FALSE(stream->Read(&response));
995
996 Status s = stream->Finish();
997 EXPECT_TRUE(s.ok());
998 }
999
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)1000 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
1001 ResetStub();
1002 EchoRequest request;
1003 EchoResponse response;
1004 ClientContext context;
1005 request.set_message("hello");
1006 context.AddMetadata(kServerUseCoalescingApi, "1");
1007
1008 auto stream = stub_->ResponseStream(&context, request);
1009 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1010 EXPECT_TRUE(stream->Read(&response));
1011 EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1012 }
1013 EXPECT_FALSE(stream->Read(&response));
1014
1015 Status s = stream->Finish();
1016 EXPECT_TRUE(s.ok());
1017 }
1018
1019 // This was added to prevent regression from issue:
1020 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1021 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1022 ResetStub();
1023 EchoRequest request;
1024 EchoResponse response;
1025 ClientContext context;
1026 request.set_message("hello");
1027 context.AddMetadata(kServerUseCoalescingApi, "1");
1028 // We will only send one message, forcing everything (init metadata, message,
1029 // trailing) to be coalesced together.
1030 context.AddMetadata(kServerResponseStreamsToSend, "1");
1031
1032 auto stream = stub_->ResponseStream(&context, request);
1033 EXPECT_TRUE(stream->Read(&response));
1034 EXPECT_EQ(response.message(), request.message() + "0");
1035
1036 EXPECT_FALSE(stream->Read(&response));
1037
1038 Status s = stream->Finish();
1039 EXPECT_TRUE(s.ok());
1040 }
1041
TEST_P(End2endTest,BidiStream)1042 TEST_P(End2endTest, BidiStream) {
1043 ResetStub();
1044 EchoRequest request;
1045 EchoResponse response;
1046 ClientContext context;
1047 std::string msg("hello");
1048
1049 auto stream = stub_->BidiStream(&context);
1050
1051 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1052 request.set_message(msg + std::to_string(i));
1053 EXPECT_TRUE(stream->Write(request));
1054 EXPECT_TRUE(stream->Read(&response));
1055 EXPECT_EQ(response.message(), request.message());
1056 }
1057
1058 stream->WritesDone();
1059 EXPECT_FALSE(stream->Read(&response));
1060 EXPECT_FALSE(stream->Read(&response));
1061
1062 Status s = stream->Finish();
1063 EXPECT_TRUE(s.ok());
1064 }
1065
TEST_P(End2endTest,BidiStreamWithCoalescingApi)1066 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1067 ResetStub();
1068 EchoRequest request;
1069 EchoResponse response;
1070 ClientContext context;
1071 context.AddMetadata(kServerFinishAfterNReads, "3");
1072 context.set_initial_metadata_corked(true);
1073 std::string msg("hello");
1074
1075 auto stream = stub_->BidiStream(&context);
1076
1077 request.set_message(msg + "0");
1078 EXPECT_TRUE(stream->Write(request));
1079 EXPECT_TRUE(stream->Read(&response));
1080 EXPECT_EQ(response.message(), request.message());
1081
1082 request.set_message(msg + "1");
1083 EXPECT_TRUE(stream->Write(request));
1084 EXPECT_TRUE(stream->Read(&response));
1085 EXPECT_EQ(response.message(), request.message());
1086
1087 request.set_message(msg + "2");
1088 stream->WriteLast(request, WriteOptions());
1089 EXPECT_TRUE(stream->Read(&response));
1090 EXPECT_EQ(response.message(), request.message());
1091
1092 EXPECT_FALSE(stream->Read(&response));
1093 EXPECT_FALSE(stream->Read(&response));
1094
1095 Status s = stream->Finish();
1096 EXPECT_TRUE(s.ok());
1097 }
1098
1099 // This was added to prevent regression from issue:
1100 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1101 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1102 ResetStub();
1103 EchoRequest request;
1104 EchoResponse response;
1105 ClientContext context;
1106 context.AddMetadata(kServerFinishAfterNReads, "1");
1107 context.set_initial_metadata_corked(true);
1108 std::string msg("hello");
1109
1110 auto stream = stub_->BidiStream(&context);
1111
1112 request.set_message(msg + "0");
1113 stream->WriteLast(request, WriteOptions());
1114 EXPECT_TRUE(stream->Read(&response));
1115 EXPECT_EQ(response.message(), request.message());
1116
1117 EXPECT_FALSE(stream->Read(&response));
1118 EXPECT_FALSE(stream->Read(&response));
1119
1120 Status s = stream->Finish();
1121 EXPECT_TRUE(s.ok());
1122 }
1123
1124 // Talk to the two services with the same name but different package names.
1125 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1126 TEST_P(End2endTest, DiffPackageServices) {
1127 ResetStub();
1128 EchoRequest request;
1129 EchoResponse response;
1130 request.set_message("Hello");
1131
1132 ClientContext context;
1133 Status s = stub_->Echo(&context, request, &response);
1134 EXPECT_EQ(response.message(), request.message());
1135 EXPECT_TRUE(s.ok());
1136
1137 std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1138 grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1139 ClientContext context2;
1140 s = dup_pkg_stub->Echo(&context2, request, &response);
1141 EXPECT_EQ("no package", response.message());
1142 EXPECT_TRUE(s.ok());
1143 }
1144
1145 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1146 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1147 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1148 gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1149 while (!service->signal_client()) {
1150 }
1151 context->TryCancel();
1152 }
1153
TEST_P(End2endTest,CancelRpcBeforeStart)1154 TEST_P(End2endTest, CancelRpcBeforeStart) {
1155 ResetStub();
1156 EchoRequest request;
1157 EchoResponse response;
1158 ClientContext context;
1159 request.set_message("hello");
1160 context.TryCancel();
1161 Status s = stub_->Echo(&context, request, &response);
1162 EXPECT_EQ("", response.message());
1163 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1164 if (GetParam().use_interceptors) {
1165 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1166 }
1167 }
1168
TEST_P(End2endTest,CancelRpcAfterStart)1169 TEST_P(End2endTest, CancelRpcAfterStart) {
1170 ResetStub();
1171 EchoRequest request;
1172 EchoResponse response;
1173 ClientContext context;
1174 request.set_message("hello");
1175 request.mutable_param()->set_server_notify_client_when_started(true);
1176 request.mutable_param()->set_skip_cancelled_check(true);
1177 Status s;
1178 std::thread echo_thread([this, &s, &context, &request, &response] {
1179 s = stub_->Echo(&context, request, &response);
1180 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1181 });
1182 if (!GetParam().callback_server) {
1183 service_.ClientWaitUntilRpcStarted();
1184 } else {
1185 callback_service_.ClientWaitUntilRpcStarted();
1186 }
1187
1188 context.TryCancel();
1189
1190 if (!GetParam().callback_server) {
1191 service_.SignalServerToContinue();
1192 } else {
1193 callback_service_.SignalServerToContinue();
1194 }
1195
1196 echo_thread.join();
1197 EXPECT_EQ("", response.message());
1198 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1199 if (GetParam().use_interceptors) {
1200 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1201 }
1202 }
1203
1204 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1205 TEST_P(End2endTest, ClientCancelsRequestStream) {
1206 ResetStub();
1207 EchoRequest request;
1208 EchoResponse response;
1209 ClientContext context;
1210 request.set_message("hello");
1211
1212 auto stream = stub_->RequestStream(&context, &response);
1213 EXPECT_TRUE(stream->Write(request));
1214 EXPECT_TRUE(stream->Write(request));
1215
1216 context.TryCancel();
1217
1218 Status s = stream->Finish();
1219 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1220
1221 EXPECT_EQ(response.message(), "");
1222 if (GetParam().use_interceptors) {
1223 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1224 }
1225 }
1226
1227 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1228 TEST_P(End2endTest, ClientCancelsResponseStream) {
1229 ResetStub();
1230 EchoRequest request;
1231 EchoResponse response;
1232 ClientContext context;
1233 request.set_message("hello");
1234
1235 auto stream = stub_->ResponseStream(&context, request);
1236
1237 EXPECT_TRUE(stream->Read(&response));
1238 EXPECT_EQ(response.message(), request.message() + "0");
1239 EXPECT_TRUE(stream->Read(&response));
1240 EXPECT_EQ(response.message(), request.message() + "1");
1241
1242 context.TryCancel();
1243
1244 // The cancellation races with responses, so there might be zero or
1245 // one responses pending, read till failure
1246
1247 if (stream->Read(&response)) {
1248 EXPECT_EQ(response.message(), request.message() + "2");
1249 // Since we have cancelled, we expect the next attempt to read to fail
1250 EXPECT_FALSE(stream->Read(&response));
1251 }
1252
1253 Status s = stream->Finish();
1254 // The final status could be either of CANCELLED or OK depending on
1255 // who won the race.
1256 EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1257 if (GetParam().use_interceptors) {
1258 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1259 }
1260 }
1261
1262 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1263 TEST_P(End2endTest, ClientCancelsBidi) {
1264 ResetStub();
1265 EchoRequest request;
1266 EchoResponse response;
1267 ClientContext context;
1268 std::string msg("hello");
1269
1270 // Send server_try_cancel value in the client metadata
1271 context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1272
1273 auto stream = stub_->BidiStream(&context);
1274
1275 request.set_message(msg + "0");
1276 EXPECT_TRUE(stream->Write(request));
1277 EXPECT_TRUE(stream->Read(&response));
1278 EXPECT_EQ(response.message(), request.message());
1279
1280 request.set_message(msg + "1");
1281 EXPECT_TRUE(stream->Write(request));
1282
1283 context.TryCancel();
1284
1285 // The cancellation races with responses, so there might be zero or
1286 // one responses pending, read till failure
1287
1288 if (stream->Read(&response)) {
1289 EXPECT_EQ(response.message(), request.message());
1290 // Since we have cancelled, we expect the next attempt to read to fail
1291 EXPECT_FALSE(stream->Read(&response));
1292 }
1293
1294 Status s = stream->Finish();
1295 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1296 if (GetParam().use_interceptors) {
1297 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1298 }
1299 }
1300
TEST_P(End2endTest,RpcMaxMessageSize)1301 TEST_P(End2endTest, RpcMaxMessageSize) {
1302 ResetStub();
1303 EchoRequest request;
1304 EchoResponse response;
1305 request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1306 request.mutable_param()->set_server_die(true);
1307
1308 ClientContext context;
1309 Status s = stub_->Echo(&context, request, &response);
1310 EXPECT_FALSE(s.ok());
1311 }
1312
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1313 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1314 gpr_event* ev) {
1315 EchoResponse resp;
1316 gpr_event_set(ev, reinterpret_cast<void*>(1));
1317 while (stream->Read(&resp)) {
1318 gpr_log(GPR_INFO, "Read message");
1319 }
1320 }
1321
1322 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1323 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1324 ResetStub();
1325 ClientContext context;
1326 gpr_event ev;
1327 gpr_event_init(&ev);
1328 auto stream = stub_->BidiStream(&context);
1329 std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1330 gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1331 stream->WritesDone();
1332 reader_thread.join();
1333 Status s = stream->Finish();
1334 EXPECT_TRUE(s.ok());
1335 }
1336
TEST_P(End2endTest,ChannelState)1337 TEST_P(End2endTest, ChannelState) {
1338 if (GetParam().inproc) {
1339 return;
1340 }
1341
1342 ResetStub();
1343 // Start IDLE
1344 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1345
1346 // Did not ask to connect, no state change.
1347 CompletionQueue cq;
1348 std::chrono::system_clock::time_point deadline =
1349 std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1350 channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1351 void* tag;
1352 bool ok = true;
1353 cq.Next(&tag, &ok);
1354 EXPECT_FALSE(ok);
1355
1356 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1357 EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1358 gpr_inf_future(GPR_CLOCK_REALTIME)));
1359 auto state = channel_->GetState(false);
1360 EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1361 }
1362
1363 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1364 TEST_P(End2endTest, ChannelStateTimeout) {
1365 if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1366 GetParam().inproc) {
1367 return;
1368 }
1369 int port = grpc_pick_unused_port_or_die();
1370 std::ostringstream server_address;
1371 server_address << "localhost:" << port;
1372 // Channel to non-existing server
1373 auto channel =
1374 grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1375 // Start IDLE
1376 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1377
1378 auto state = GRPC_CHANNEL_IDLE;
1379 for (int i = 0; i < 10; i++) {
1380 channel->WaitForStateChange(
1381 state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1382 state = channel->GetState(false);
1383 }
1384 }
1385
TEST_P(End2endTest,ChannelStateOnLameChannel)1386 TEST_P(End2endTest, ChannelStateOnLameChannel) {
1387 if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1388 GetParam().inproc) {
1389 return;
1390 }
1391 // Channel using invalid target URI. This creates a lame channel.
1392 auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
1393 // Channel should immediately report TRANSIENT_FAILURE.
1394 EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
1395 // And state will never change.
1396 auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
1397 for (int i = 0; i < 10; ++i) {
1398 channel->WaitForStateChange(
1399 state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1400 state = channel->GetState(false);
1401 }
1402 }
1403
1404 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1405 TEST_P(End2endTest, NonExistingService) {
1406 ResetChannel();
1407 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1408 stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1409
1410 EchoRequest request;
1411 EchoResponse response;
1412 request.set_message("Hello");
1413
1414 ClientContext context;
1415 Status s = stub->Unimplemented(&context, request, &response);
1416 EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1417 EXPECT_EQ("", s.error_message());
1418 }
1419
1420 // Ask the server to send back a serialized proto in trailer.
1421 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1422 TEST_P(End2endTest, BinaryTrailerTest) {
1423 ResetStub();
1424 EchoRequest request;
1425 EchoResponse response;
1426 ClientContext context;
1427
1428 request.mutable_param()->set_echo_metadata(true);
1429 DebugInfo* info = request.mutable_param()->mutable_debug_info();
1430 info->add_stack_entries("stack_entry_1");
1431 info->add_stack_entries("stack_entry_2");
1432 info->add_stack_entries("stack_entry_3");
1433 info->set_detail("detailed debug info");
1434 std::string expected_string = info->SerializeAsString();
1435 request.set_message("Hello");
1436
1437 Status s = stub_->Echo(&context, request, &response);
1438 EXPECT_FALSE(s.ok());
1439 auto trailers = context.GetServerTrailingMetadata();
1440 EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1441 auto iter = trailers.find(kDebugInfoTrailerKey);
1442 EXPECT_EQ(expected_string, iter->second);
1443 // Parse the returned trailer into a DebugInfo proto.
1444 DebugInfo returned_info;
1445 EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1446 }
1447
TEST_P(End2endTest,ExpectErrorTest)1448 TEST_P(End2endTest, ExpectErrorTest) {
1449 ResetStub();
1450
1451 std::vector<ErrorStatus> expected_status;
1452 expected_status.emplace_back();
1453 expected_status.back().set_code(13); // INTERNAL
1454 // No Error message or details
1455
1456 expected_status.emplace_back();
1457 expected_status.back().set_code(13); // INTERNAL
1458 expected_status.back().set_error_message("text error message");
1459 expected_status.back().set_binary_error_details("text error details");
1460
1461 expected_status.emplace_back();
1462 expected_status.back().set_code(13); // INTERNAL
1463 expected_status.back().set_error_message("text error message");
1464 expected_status.back().set_binary_error_details(
1465 "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1466
1467 for (auto iter = expected_status.begin(); iter != expected_status.end();
1468 ++iter) {
1469 EchoRequest request;
1470 EchoResponse response;
1471 ClientContext context;
1472 request.set_message("Hello");
1473 auto* error = request.mutable_param()->mutable_expected_error();
1474 error->set_code(iter->code());
1475 error->set_error_message(iter->error_message());
1476 error->set_binary_error_details(iter->binary_error_details());
1477
1478 Status s = stub_->Echo(&context, request, &response);
1479 EXPECT_FALSE(s.ok());
1480 EXPECT_EQ(iter->code(), s.error_code());
1481 EXPECT_EQ(iter->error_message(), s.error_message());
1482 EXPECT_EQ(iter->binary_error_details(), s.error_details());
1483 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "created"));
1484 #ifndef NDEBUG
1485 // GRPC_ERROR_INT_FILE_LINE is for debug only
1486 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "file"));
1487 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "line"));
1488 #endif
1489 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "status"));
1490 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "13"));
1491 }
1492 }
1493
1494 //////////////////////////////////////////////////////////////////////////
1495 // Test with and without a proxy.
1496 class ProxyEnd2endTest : public End2endTest {
1497 protected:
1498 };
1499
TEST_P(ProxyEnd2endTest,SimpleRpc)1500 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1501 ResetStub();
1502 SendRpc(stub_.get(), 1, false);
1503 }
1504
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1505 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1506 ResetStub();
1507 EchoRequest request;
1508 EchoResponse response;
1509
1510 ClientContext context;
1511 Status s = stub_->Echo(&context, request, &response);
1512 EXPECT_TRUE(s.ok());
1513 }
1514
TEST_P(ProxyEnd2endTest,MultipleRpcs)1515 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1516 ResetStub();
1517 std::vector<std::thread> threads;
1518 threads.reserve(10);
1519 for (int i = 0; i < 10; ++i) {
1520 threads.emplace_back(SendRpc, stub_.get(), 10, false);
1521 }
1522 for (int i = 0; i < 10; ++i) {
1523 threads[i].join();
1524 }
1525 }
1526
1527 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1528 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1529 ResetStub();
1530 EchoRequest request;
1531 EchoResponse response;
1532 request.set_message("Hello");
1533 request.mutable_param()->set_skip_cancelled_check(true);
1534 // Let server sleep for 40 ms first to guarantee expiry.
1535 // 40 ms might seem a bit extreme but the timer manager would have been just
1536 // initialized (when ResetStub() was called) and there are some warmup costs
1537 // i.e the timer thread many not have even started. There might also be other
1538 // delays in the timer manager thread (in acquiring locks, timer data
1539 // structure manipulations, starting backup timer threads) that add to the
1540 // delays. 40ms is still not enough in some cases but this significantly
1541 // reduces the test flakes
1542 request.mutable_param()->set_server_sleep_us(40 * 1000);
1543
1544 ClientContext context;
1545 std::chrono::system_clock::time_point deadline =
1546 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1547 context.set_deadline(deadline);
1548 Status s = stub_->Echo(&context, request, &response);
1549 EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1550 }
1551
1552 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1553 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1554 ResetStub();
1555 EchoRequest request;
1556 EchoResponse response;
1557 request.set_message("Hello");
1558
1559 ClientContext context;
1560 std::chrono::system_clock::time_point deadline =
1561 std::chrono::system_clock::now() + std::chrono::hours(1);
1562 context.set_deadline(deadline);
1563 Status s = stub_->Echo(&context, request, &response);
1564 EXPECT_EQ(response.message(), request.message());
1565 EXPECT_TRUE(s.ok());
1566 }
1567
1568 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1569 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1570 ResetStub();
1571 EchoRequest request;
1572 EchoResponse response;
1573 request.set_message("Hello");
1574 request.mutable_param()->set_echo_deadline(true);
1575
1576 ClientContext context;
1577 std::chrono::system_clock::time_point deadline =
1578 std::chrono::system_clock::now() + std::chrono::seconds(100);
1579 context.set_deadline(deadline);
1580 Status s = stub_->Echo(&context, request, &response);
1581 EXPECT_EQ(response.message(), request.message());
1582 EXPECT_TRUE(s.ok());
1583 gpr_timespec sent_deadline;
1584 Timepoint2Timespec(deadline, &sent_deadline);
1585 // We want to allow some reasonable error given:
1586 // - request_deadline() only has 1sec resolution so the best we can do is +-1
1587 // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1588 // can end up being off by 2 in one direction.
1589 EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1590 EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1591 }
1592
1593 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1594 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1595 ResetStub();
1596 EchoRequest request;
1597 EchoResponse response;
1598 request.set_message("Hello");
1599 request.mutable_param()->set_echo_deadline(true);
1600
1601 ClientContext context;
1602 Status s = stub_->Echo(&context, request, &response);
1603 EXPECT_EQ(response.message(), request.message());
1604 EXPECT_TRUE(s.ok());
1605 EXPECT_EQ(response.param().request_deadline(),
1606 gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1607 }
1608
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1609 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1610 ResetStub();
1611 EchoRequest request;
1612 EchoResponse response;
1613 request.set_message("Hello");
1614
1615 ClientContext context;
1616 Status s = stub_->Unimplemented(&context, request, &response);
1617 EXPECT_FALSE(s.ok());
1618 EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1619 EXPECT_EQ(s.error_message(), "");
1620 EXPECT_EQ(response.message(), "");
1621 }
1622
1623 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1624 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1625 ResetStub();
1626 EchoRequest request;
1627 EchoResponse response;
1628 request.set_message("Hello");
1629 const int kCancelDelayUs = 10 * 1000;
1630 request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1631
1632 ClientContext context;
1633 std::thread cancel_thread;
1634 if (!GetParam().callback_server) {
1635 cancel_thread = std::thread(
1636 [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1637 kCancelDelayUs);
1638 // Note: the unusual pattern above (and below) is caused by a conflict
1639 // between two sets of compiler expectations. clang allows const to be
1640 // captured without mention, so there is no need to capture kCancelDelayUs
1641 // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1642 // in our tests requires an explicit capture even for const. We square this
1643 // circle by passing the const value in as an argument to the lambda.
1644 } else {
1645 cancel_thread = std::thread(
1646 [&context, this](int delay) {
1647 CancelRpc(&context, delay, &callback_service_);
1648 },
1649 kCancelDelayUs);
1650 }
1651 Status s = stub_->Echo(&context, request, &response);
1652 cancel_thread.join();
1653 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1654 EXPECT_EQ(s.error_message(), "CANCELLED");
1655 }
1656
1657 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1658 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1659 ResetStub();
1660 EchoRequest request;
1661 EchoResponse response;
1662 request.set_message("Hello");
1663 request.mutable_param()->set_server_cancel_after_us(1000);
1664
1665 ClientContext context;
1666 Status s = stub_->Echo(&context, request, &response);
1667 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1668 EXPECT_TRUE(s.error_message().empty());
1669 }
1670
1671 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1672 TEST_P(ProxyEnd2endTest, HugeResponse) {
1673 ResetStub();
1674 EchoRequest request;
1675 EchoResponse response;
1676 request.set_message("huge response");
1677 const size_t kResponseSize = 1024 * (1024 + 10);
1678 request.mutable_param()->set_response_message_length(kResponseSize);
1679
1680 ClientContext context;
1681 std::chrono::system_clock::time_point deadline =
1682 std::chrono::system_clock::now() + std::chrono::seconds(20);
1683 context.set_deadline(deadline);
1684 Status s = stub_->Echo(&context, request, &response);
1685 EXPECT_EQ(kResponseSize, response.message().size());
1686 EXPECT_TRUE(s.ok());
1687 }
1688
TEST_P(ProxyEnd2endTest,Peer)1689 TEST_P(ProxyEnd2endTest, Peer) {
1690 // Peer is not meaningful for inproc
1691 if (GetParam().inproc) {
1692 return;
1693 }
1694 ResetStub();
1695 EchoRequest request;
1696 EchoResponse response;
1697 request.set_message("hello");
1698 request.mutable_param()->set_echo_peer(true);
1699
1700 ClientContext context;
1701 Status s = stub_->Echo(&context, request, &response);
1702 EXPECT_EQ(response.message(), request.message());
1703 EXPECT_TRUE(s.ok());
1704 EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1705 EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1706 }
1707
1708 //////////////////////////////////////////////////////////////////////////
1709 class SecureEnd2endTest : public End2endTest {
1710 protected:
SecureEnd2endTest()1711 SecureEnd2endTest() {
1712 GPR_ASSERT(!GetParam().use_proxy);
1713 GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
1714 }
1715 };
1716
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1717 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1718 ResetStub();
1719
1720 EchoRequest request;
1721 EchoResponse response;
1722 request.set_message("Hello");
1723
1724 ClientContext context;
1725 context.set_authority("foo.test.youtube.com");
1726 Status s = stub_->Echo(&context, request, &response);
1727 EXPECT_EQ(response.message(), request.message());
1728 EXPECT_TRUE(response.has_param());
1729 EXPECT_EQ("special", response.param().host());
1730 EXPECT_TRUE(s.ok());
1731 }
1732
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1733 bool MetadataContains(
1734 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1735 const std::string& key, const std::string& value) {
1736 int count = 0;
1737
1738 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1739 metadata.begin();
1740 iter != metadata.end(); ++iter) {
1741 if (ToString(iter->first) == key && ToString(iter->second) == value) {
1742 count++;
1743 }
1744 }
1745 return count == 1;
1746 }
1747
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1748 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1749 auto* processor = new TestAuthMetadataProcessor(true);
1750 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1751 ResetStub();
1752 EchoRequest request;
1753 EchoResponse response;
1754 ClientContext context;
1755 context.set_credentials(processor->GetCompatibleClientCreds());
1756 request.set_message("Hello");
1757 request.mutable_param()->set_echo_metadata(true);
1758 request.mutable_param()->set_expected_client_identity(
1759 TestAuthMetadataProcessor::kGoodGuy);
1760 request.mutable_param()->set_expected_transport_security_type(
1761 GetParam().credentials_type);
1762
1763 Status s = stub_->Echo(&context, request, &response);
1764 EXPECT_EQ(request.message(), response.message());
1765 EXPECT_TRUE(s.ok());
1766
1767 // Metadata should have been consumed by the processor.
1768 EXPECT_FALSE(MetadataContains(
1769 context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1770 std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1771 }
1772
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1773 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1774 auto* processor = new TestAuthMetadataProcessor(true);
1775 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1776 ResetStub();
1777 EchoRequest request;
1778 EchoResponse response;
1779 ClientContext context;
1780 context.set_credentials(processor->GetIncompatibleClientCreds());
1781 request.set_message("Hello");
1782
1783 Status s = stub_->Echo(&context, request, &response);
1784 EXPECT_FALSE(s.ok());
1785 EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1786 }
1787
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1788 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1789 ResetStub();
1790 EchoRequest request;
1791 EchoResponse response;
1792 ClientContext context;
1793 std::shared_ptr<CallCredentials> creds =
1794 GoogleIAMCredentials(kFakeToken, kFakeSelector);
1795 context.set_credentials(creds);
1796 request.set_message("Hello");
1797 request.mutable_param()->set_echo_metadata(true);
1798
1799 Status s = stub_->Echo(&context, request, &response);
1800 EXPECT_EQ(request.message(), response.message());
1801 EXPECT_TRUE(s.ok());
1802 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1803 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1804 kFakeToken));
1805 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1806 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1807 kFakeSelector));
1808 EXPECT_EQ(context.credentials()->DebugString(),
1809 kExpectedFakeCredsDebugString);
1810 }
1811
1812 class CredentialsInterceptor : public experimental::Interceptor {
1813 public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1814 explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1815 : info_(info) {}
1816
Intercept(experimental::InterceptorBatchMethods * methods)1817 void Intercept(experimental::InterceptorBatchMethods* methods) override {
1818 if (methods->QueryInterceptionHookPoint(
1819 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1820 std::shared_ptr<CallCredentials> creds =
1821 GoogleIAMCredentials(kFakeToken, kFakeSelector);
1822 info_->client_context()->set_credentials(creds);
1823 }
1824 methods->Proceed();
1825 }
1826
1827 private:
1828 experimental::ClientRpcInfo* info_ = nullptr;
1829 };
1830
1831 class CredentialsInterceptorFactory
1832 : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1833 CredentialsInterceptor* CreateClientInterceptor(
1834 experimental::ClientRpcInfo* info) override {
1835 return new CredentialsInterceptor(info);
1836 }
1837 };
1838
TEST_P(SecureEnd2endTest,CallCredentialsInterception)1839 TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
1840 if (!GetParam().use_interceptors) {
1841 return;
1842 }
1843 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1844 interceptor_creators;
1845 interceptor_creators.push_back(
1846 absl::make_unique<CredentialsInterceptorFactory>());
1847 ResetStub(std::move(interceptor_creators));
1848 EchoRequest request;
1849 EchoResponse response;
1850 ClientContext context;
1851
1852 request.set_message("Hello");
1853 request.mutable_param()->set_echo_metadata(true);
1854
1855 Status s = stub_->Echo(&context, request, &response);
1856 EXPECT_EQ(request.message(), response.message());
1857 EXPECT_TRUE(s.ok());
1858 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1859 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1860 kFakeToken));
1861 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1862 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1863 kFakeSelector));
1864 EXPECT_EQ(context.credentials()->DebugString(),
1865 kExpectedFakeCredsDebugString);
1866 }
1867
TEST_P(SecureEnd2endTest,CallCredentialsInterceptionWithSetCredentials)1868 TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
1869 if (!GetParam().use_interceptors) {
1870 return;
1871 }
1872 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1873 interceptor_creators;
1874 interceptor_creators.push_back(
1875 absl::make_unique<CredentialsInterceptorFactory>());
1876 ResetStub(std::move(interceptor_creators));
1877 EchoRequest request;
1878 EchoResponse response;
1879 ClientContext context;
1880 std::shared_ptr<CallCredentials> creds1 =
1881 GoogleIAMCredentials(kWrongToken, kWrongSelector);
1882 context.set_credentials(creds1);
1883 EXPECT_EQ(context.credentials(), creds1);
1884 EXPECT_EQ(context.credentials()->DebugString(),
1885 kExpectedWrongCredsDebugString);
1886 request.set_message("Hello");
1887 request.mutable_param()->set_echo_metadata(true);
1888
1889 Status s = stub_->Echo(&context, request, &response);
1890 EXPECT_EQ(request.message(), response.message());
1891 EXPECT_TRUE(s.ok());
1892 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1893 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1894 kFakeToken));
1895 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1896 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1897 kFakeSelector));
1898 EXPECT_EQ(context.credentials()->DebugString(),
1899 kExpectedFakeCredsDebugString);
1900 }
1901
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1902 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1903 ResetStub();
1904 EchoRequest request;
1905 EchoResponse response;
1906 ClientContext context;
1907 std::shared_ptr<CallCredentials> creds1 =
1908 GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
1909 context.set_credentials(creds1);
1910 EXPECT_EQ(context.credentials(), creds1);
1911 EXPECT_EQ(context.credentials()->DebugString(),
1912 kExpectedFakeCreds1DebugString);
1913 std::shared_ptr<CallCredentials> creds2 =
1914 GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
1915 context.set_credentials(creds2);
1916 EXPECT_EQ(context.credentials(), creds2);
1917 request.set_message("Hello");
1918 request.mutable_param()->set_echo_metadata(true);
1919
1920 Status s = stub_->Echo(&context, request, &response);
1921 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1922 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1923 kFakeToken2));
1924 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1925 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1926 kFakeSelector2));
1927 EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1928 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1929 kFakeToken1));
1930 EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1931 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1932 kFakeSelector1));
1933 EXPECT_EQ(context.credentials()->DebugString(),
1934 kExpectedFakeCreds2DebugString);
1935 EXPECT_EQ(request.message(), response.message());
1936 EXPECT_TRUE(s.ok());
1937 }
1938
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1939 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1940 ResetStub();
1941 EchoRequest request;
1942 EchoResponse response;
1943 ClientContext context;
1944 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1945 std::unique_ptr<MetadataCredentialsPlugin>(
1946 new TestMetadataCredentialsPlugin(
1947 TestMetadataCredentialsPlugin::kBadMetadataKey,
1948 "Does not matter, will fail the key is invalid.", false, true,
1949 0))));
1950 request.set_message("Hello");
1951
1952 Status s = stub_->Echo(&context, request, &response);
1953 EXPECT_FALSE(s.ok());
1954 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1955 EXPECT_EQ(context.credentials()->DebugString(),
1956 kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
1957 }
1958
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1959 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1960 ResetStub();
1961 EchoRequest request;
1962 EchoResponse response;
1963 ClientContext context;
1964 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1965 std::unique_ptr<MetadataCredentialsPlugin>(
1966 new TestMetadataCredentialsPlugin(
1967 TestMetadataCredentialsPlugin::kGoodMetadataKey,
1968 "With illegal \n value.", false, true, 0))));
1969 request.set_message("Hello");
1970
1971 Status s = stub_->Echo(&context, request, &response);
1972 EXPECT_FALSE(s.ok());
1973 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1974 EXPECT_EQ(context.credentials()->DebugString(),
1975 kExpectedAuthMetadataPluginValueFailureCredsDebugString);
1976 }
1977
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithDeadline)1978 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
1979 ResetStub();
1980 EchoRequest request;
1981 request.mutable_param()->set_skip_cancelled_check(true);
1982 EchoResponse response;
1983 ClientContext context;
1984 const int delay = 100;
1985 std::chrono::system_clock::time_point deadline =
1986 std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
1987 context.set_deadline(deadline);
1988 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1989 std::unique_ptr<MetadataCredentialsPlugin>(
1990 new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
1991 true, delay))));
1992 request.set_message("Hello");
1993
1994 Status s = stub_->Echo(&context, request, &response);
1995 if (!s.ok()) {
1996 EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED ||
1997 s.error_code() == StatusCode::UNAVAILABLE);
1998 }
1999 EXPECT_EQ(context.credentials()->DebugString(),
2000 kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2001 }
2002
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithCancel)2003 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
2004 ResetStub();
2005 EchoRequest request;
2006 request.mutable_param()->set_skip_cancelled_check(true);
2007 EchoResponse response;
2008 ClientContext context;
2009 const int delay = 100;
2010 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2011 std::unique_ptr<MetadataCredentialsPlugin>(
2012 new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2013 true, delay))));
2014 request.set_message("Hello");
2015
2016 std::thread cancel_thread([&] {
2017 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2018 gpr_time_from_millis(delay, GPR_TIMESPAN)));
2019 context.TryCancel();
2020 });
2021 Status s = stub_->Echo(&context, request, &response);
2022 if (!s.ok()) {
2023 EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED ||
2024 s.error_code() == StatusCode::UNAVAILABLE);
2025 }
2026 cancel_thread.join();
2027 EXPECT_EQ(context.credentials()->DebugString(),
2028 kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2029 }
2030
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)2031 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
2032 ResetStub();
2033 EchoRequest request;
2034 EchoResponse response;
2035 ClientContext context;
2036 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2037 std::unique_ptr<MetadataCredentialsPlugin>(
2038 new TestMetadataCredentialsPlugin(
2039 TestMetadataCredentialsPlugin::kGoodMetadataKey,
2040 "Does not matter, will fail anyway (see 3rd param)", false, false,
2041 0))));
2042 request.set_message("Hello");
2043
2044 Status s = stub_->Echo(&context, request, &response);
2045 EXPECT_FALSE(s.ok());
2046 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2047 EXPECT_EQ(s.error_message(),
2048 std::string("Getting metadata from plugin failed with error: ") +
2049 kTestCredsPluginErrorMsg);
2050 EXPECT_EQ(context.credentials()->DebugString(),
2051 kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
2052 }
2053
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)2054 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
2055 auto* processor = new TestAuthMetadataProcessor(false);
2056 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2057 ResetStub();
2058 EchoRequest request;
2059 EchoResponse response;
2060 ClientContext context;
2061 context.set_credentials(processor->GetCompatibleClientCreds());
2062 request.set_message("Hello");
2063 request.mutable_param()->set_echo_metadata(true);
2064 request.mutable_param()->set_expected_client_identity(
2065 TestAuthMetadataProcessor::kGoodGuy);
2066 request.mutable_param()->set_expected_transport_security_type(
2067 GetParam().credentials_type);
2068
2069 Status s = stub_->Echo(&context, request, &response);
2070 EXPECT_EQ(request.message(), response.message());
2071 EXPECT_TRUE(s.ok());
2072
2073 // Metadata should have been consumed by the processor.
2074 EXPECT_FALSE(MetadataContains(
2075 context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
2076 std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
2077 EXPECT_EQ(
2078 context.credentials()->DebugString(),
2079 kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
2080 }
2081
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)2082 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
2083 auto* processor = new TestAuthMetadataProcessor(false);
2084 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2085 ResetStub();
2086 EchoRequest request;
2087 EchoResponse response;
2088 ClientContext context;
2089 context.set_credentials(processor->GetIncompatibleClientCreds());
2090 request.set_message("Hello");
2091
2092 Status s = stub_->Echo(&context, request, &response);
2093 EXPECT_FALSE(s.ok());
2094 EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
2095 EXPECT_EQ(
2096 context.credentials()->DebugString(),
2097 kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
2098 }
2099
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)2100 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
2101 ResetStub();
2102 EchoRequest request;
2103 EchoResponse response;
2104 ClientContext context;
2105 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2106 std::unique_ptr<MetadataCredentialsPlugin>(
2107 new TestMetadataCredentialsPlugin(
2108 TestMetadataCredentialsPlugin::kGoodMetadataKey,
2109 "Does not matter, will fail anyway (see 3rd param)", true, false,
2110 0))));
2111 request.set_message("Hello");
2112
2113 Status s = stub_->Echo(&context, request, &response);
2114 EXPECT_FALSE(s.ok());
2115 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2116 EXPECT_EQ(s.error_message(),
2117 std::string("Getting metadata from plugin failed with error: ") +
2118 kTestCredsPluginErrorMsg);
2119 EXPECT_EQ(context.credentials()->DebugString(),
2120 kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
2121 }
2122
TEST_P(SecureEnd2endTest,CompositeCallCreds)2123 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
2124 ResetStub();
2125 EchoRequest request;
2126 EchoResponse response;
2127 ClientContext context;
2128 const char kMetadataKey1[] = "call-creds-key1";
2129 const char kMetadataKey2[] = "call-creds-key2";
2130 const char kMetadataVal1[] = "call-creds-val1";
2131 const char kMetadataVal2[] = "call-creds-val2";
2132
2133 context.set_credentials(grpc::CompositeCallCredentials(
2134 grpc::MetadataCredentialsFromPlugin(
2135 std::unique_ptr<MetadataCredentialsPlugin>(
2136 new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1,
2137 true, true, 0))),
2138 grpc::MetadataCredentialsFromPlugin(
2139 std::unique_ptr<MetadataCredentialsPlugin>(
2140 new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2,
2141 true, true, 0)))));
2142 request.set_message("Hello");
2143 request.mutable_param()->set_echo_metadata(true);
2144
2145 Status s = stub_->Echo(&context, request, &response);
2146 EXPECT_TRUE(s.ok());
2147 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2148 kMetadataKey1, kMetadataVal1));
2149 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2150 kMetadataKey2, kMetadataVal2));
2151 EXPECT_EQ(context.credentials()->DebugString(),
2152 kExpectedCompositeCallCredsDebugString);
2153 }
2154
TEST_P(SecureEnd2endTest,ClientAuthContext)2155 TEST_P(SecureEnd2endTest, ClientAuthContext) {
2156 ResetStub();
2157 EchoRequest request;
2158 EchoResponse response;
2159 request.set_message("Hello");
2160 request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
2161 kTlsCredentialsType);
2162 request.mutable_param()->set_expected_transport_security_type(
2163 GetParam().credentials_type);
2164 ClientContext context;
2165 Status s = stub_->Echo(&context, request, &response);
2166 EXPECT_EQ(response.message(), request.message());
2167 EXPECT_TRUE(s.ok());
2168
2169 std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
2170 std::vector<grpc::string_ref> tst =
2171 auth_ctx->FindPropertyValues("transport_security_type");
2172 ASSERT_EQ(1u, tst.size());
2173 EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
2174 if (GetParam().credentials_type == kTlsCredentialsType) {
2175 EXPECT_EQ("x509_subject_alternative_name",
2176 auth_ctx->GetPeerIdentityPropertyName());
2177 EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
2178 EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
2179 EXPECT_EQ("waterzooi.test.google.be",
2180 ToString(auth_ctx->GetPeerIdentity()[1]));
2181 EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2182 EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2183 }
2184 }
2185
2186 class ResourceQuotaEnd2endTest : public End2endTest {
2187 public:
ResourceQuotaEnd2endTest()2188 ResourceQuotaEnd2endTest()
2189 : server_resource_quota_("server_resource_quota") {}
2190
ConfigureServerBuilder(ServerBuilder * builder)2191 void ConfigureServerBuilder(ServerBuilder* builder) override {
2192 builder->SetResourceQuota(server_resource_quota_);
2193 }
2194
2195 private:
2196 ResourceQuota server_resource_quota_;
2197 };
2198
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)2199 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2200 ResetStub();
2201
2202 EchoRequest request;
2203 EchoResponse response;
2204 request.set_message("Hello");
2205
2206 ClientContext context;
2207 Status s = stub_->Echo(&context, request, &response);
2208 EXPECT_EQ(response.message(), request.message());
2209 EXPECT_TRUE(s.ok());
2210 }
2211
2212 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2213 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2214 bool test_insecure,
2215 bool test_secure,
2216 bool test_inproc,
2217 bool test_callback_server) {
2218 std::vector<TestScenario> scenarios;
2219 std::vector<std::string> credentials_types;
2220
2221 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms,
2222 kClientChannelBackupPollIntervalMs);
2223 #if TARGET_OS_IPHONE
2224 // Workaround Apple CFStream bug
2225 gpr_setenv("grpc_cfstream", "0");
2226 #endif
2227
2228 if (test_secure) {
2229 credentials_types =
2230 GetCredentialsProvider()->GetSecureCredentialsTypeList();
2231 }
2232 auto insec_ok = [] {
2233 // Only allow insecure credentials type when it is registered with the
2234 // provider. User may create providers that do not have insecure.
2235 return GetCredentialsProvider()->GetChannelCredentials(
2236 kInsecureCredentialsType, nullptr) != nullptr;
2237 };
2238 if (test_insecure && insec_ok()) {
2239 credentials_types.push_back(kInsecureCredentialsType);
2240 }
2241
2242 // Test callback with inproc or if the event-engine allows it
2243 GPR_ASSERT(!credentials_types.empty());
2244 for (const auto& cred : credentials_types) {
2245 scenarios.emplace_back(false, false, false, cred, false);
2246 scenarios.emplace_back(true, false, false, cred, false);
2247 if (test_callback_server) {
2248 // Note that these scenarios will be dynamically disabled if the event
2249 // engine doesn't run in the background
2250 scenarios.emplace_back(false, false, false, cred, true);
2251 scenarios.emplace_back(true, false, false, cred, true);
2252 }
2253 if (use_proxy) {
2254 scenarios.emplace_back(false, true, false, cred, false);
2255 scenarios.emplace_back(true, true, false, cred, false);
2256 }
2257 }
2258 if (test_inproc && insec_ok()) {
2259 scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2260 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2261 if (test_callback_server) {
2262 scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2263 true);
2264 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2265 }
2266 }
2267 return scenarios;
2268 }
2269
2270 INSTANTIATE_TEST_SUITE_P(
2271 End2end, End2endTest,
2272 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2273
2274 INSTANTIATE_TEST_SUITE_P(
2275 End2endServerTryCancel, End2endServerTryCancelTest,
2276 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2277
2278 INSTANTIATE_TEST_SUITE_P(
2279 ProxyEnd2end, ProxyEnd2endTest,
2280 ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)));
2281
2282 INSTANTIATE_TEST_SUITE_P(
2283 SecureEnd2end, SecureEnd2endTest,
2284 ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)));
2285
2286 INSTANTIATE_TEST_SUITE_P(
2287 ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2288 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2289
2290 } // namespace
2291 } // namespace testing
2292 } // namespace grpc
2293
main(int argc,char ** argv)2294 int main(int argc, char** argv) {
2295 grpc::testing::TestEnvironment env(argc, argv);
2296 ::testing::InitGoogleTest(&argc, argv);
2297 int ret = RUN_ALL_TESTS();
2298 return ret;
2299 }
2300