1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <mutex>
20 #include <thread>
21 
22 #include "absl/memory/memory.h"
23 #include "absl/strings/match.h"
24 #include "absl/strings/str_format.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30 #include <grpcpp/channel.h>
31 #include <grpcpp/client_context.h>
32 #include <grpcpp/create_channel.h>
33 #include <grpcpp/resource_quota.h>
34 #include <grpcpp/security/auth_metadata_processor.h>
35 #include <grpcpp/security/credentials.h>
36 #include <grpcpp/security/server_credentials.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
39 #include <grpcpp/server_context.h>
40 #include <grpcpp/support/string_ref.h>
41 #include <grpcpp/test/channel_test_peer.h>
42 
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/lib/gpr/env.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/security/credentials/credentials.h"
47 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
48 #include "src/proto/grpc/testing/echo.grpc.pb.h"
49 #include "test/core/util/port.h"
50 #include "test/core/util/test_config.h"
51 #include "test/cpp/end2end/interceptors_util.h"
52 #include "test/cpp/end2end/test_service_impl.h"
53 #include "test/cpp/util/string_ref_helper.h"
54 #include "test/cpp/util/test_credentials_provider.h"
55 
56 #ifdef GRPC_POSIX_SOCKET_EV
57 #include "src/core/lib/iomgr/ev_posix.h"
58 #endif  // GRPC_POSIX_SOCKET_EV
59 
60 #include <gtest/gtest.h>
61 
62 using grpc::testing::EchoRequest;
63 using grpc::testing::EchoResponse;
64 using grpc::testing::kTlsCredentialsType;
65 using std::chrono::system_clock;
66 
67 namespace grpc {
68 namespace testing {
69 namespace {
70 
CheckIsLocalhost(const std::string & addr)71 bool CheckIsLocalhost(const std::string& addr) {
72   const std::string kIpv6("ipv6:[::1]:");
73   const std::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
74   const std::string kIpv4("ipv4:127.0.0.1:");
75   return addr.substr(0, kIpv4.size()) == kIpv4 ||
76          addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77          addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79 
80 const int kClientChannelBackupPollIntervalMs = 200;
81 
82 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
83 
84 const char kFakeToken[] = "fake_token";
85 const char kFakeSelector[] = "fake_selector";
86 const char kExpectedFakeCredsDebugString[] =
87     "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
88     "AuthoritySelector:fake_selector}}";
89 
90 const char kWrongToken[] = "wrong_token";
91 const char kWrongSelector[] = "wrong_selector";
92 const char kExpectedWrongCredsDebugString[] =
93     "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
94     "AuthoritySelector:wrong_selector}}";
95 
96 const char kFakeToken1[] = "fake_token1";
97 const char kFakeSelector1[] = "fake_selector1";
98 const char kExpectedFakeCreds1DebugString[] =
99     "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
100     "AuthoritySelector:fake_selector1}}";
101 
102 const char kFakeToken2[] = "fake_token2";
103 const char kFakeSelector2[] = "fake_selector2";
104 const char kExpectedFakeCreds2DebugString[] =
105     "SecureCallCredentials{GoogleIAMCredentials{Token:present,"
106     "AuthoritySelector:fake_selector2}}";
107 
108 const char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
109     "SecureCallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
110     "value:Does not matter, will fail the key is invalid.}}";
111 const char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
112     "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
113     "value:With illegal \n value.}}";
114 const char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
115     "SecureCallCredentials{TestMetadataCredentials{key:meta_key,value:Does not "
116     "matter}}";
117 const char kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
118     "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
119     "value:Does not matter, will fail anyway (see 3rd param)}}";
120 const char
121     kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
122         [] = "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-"
123              "metadata,value:Dr Jekyll}}";
124 const char
125     kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
126         [] = "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-"
127              "metadata,value:Mr Hyde}}";
128 const char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
129     "SecureCallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
130     "value:Does not matter, will fail anyway (see 3rd param)}}";
131 const char kExpectedCompositeCallCredsDebugString[] =
132     "SecureCallCredentials{CompositeCallCredentials{TestMetadataCredentials{"
133     "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:"
134     "call-creds-key2,value:call-creds-val2}}}";
135 
136 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
137  public:
138   static const char kGoodMetadataKey[];
139   static const char kBadMetadataKey[];
140 
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)141   TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
142                                 const grpc::string_ref& metadata_value,
143                                 bool is_blocking, bool is_successful,
144                                 int delay_ms)
145       : metadata_key_(metadata_key.data(), metadata_key.length()),
146         metadata_value_(metadata_value.data(), metadata_value.length()),
147         is_blocking_(is_blocking),
148         is_successful_(is_successful),
149         delay_ms_(delay_ms) {}
150 
IsBlocking() const151   bool IsBlocking() const override { return is_blocking_; }
152 
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)153   Status GetMetadata(
154       grpc::string_ref service_url, grpc::string_ref method_name,
155       const grpc::AuthContext& channel_auth_context,
156       std::multimap<std::string, std::string>* metadata) override {
157     if (delay_ms_ != 0) {
158       gpr_sleep_until(
159           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
160                        gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
161     }
162     EXPECT_GT(service_url.length(), 0UL);
163     EXPECT_GT(method_name.length(), 0UL);
164     EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
165     EXPECT_TRUE(metadata != nullptr);
166     if (is_successful_) {
167       metadata->insert(std::make_pair(metadata_key_, metadata_value_));
168       return Status::OK;
169     } else {
170       return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
171     }
172   }
173 
DebugString()174   std::string DebugString() override {
175     return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
176                            metadata_key_.c_str(), metadata_value_.c_str());
177   }
178 
179  private:
180   std::string metadata_key_;
181   std::string metadata_value_;
182   bool is_blocking_;
183   bool is_successful_;
184   int delay_ms_;
185 };
186 
187 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
188     "TestPluginMetadata";
189 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
190     "test-plugin-metadata";
191 
192 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
193  public:
194   static const char kGoodGuy[];
195 
TestAuthMetadataProcessor(bool is_blocking)196   explicit TestAuthMetadataProcessor(bool is_blocking)
197       : is_blocking_(is_blocking) {}
198 
GetCompatibleClientCreds()199   std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
200     return grpc::MetadataCredentialsFromPlugin(
201         std::unique_ptr<MetadataCredentialsPlugin>(
202             new TestMetadataCredentialsPlugin(
203                 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
204                 is_blocking_, true, 0)));
205   }
206 
GetIncompatibleClientCreds()207   std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
208     return grpc::MetadataCredentialsFromPlugin(
209         std::unique_ptr<MetadataCredentialsPlugin>(
210             new TestMetadataCredentialsPlugin(
211                 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
212                 is_blocking_, true, 0)));
213   }
214 
215   // Interface implementation
IsBlocking() const216   bool IsBlocking() const override { return is_blocking_; }
217 
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)218   Status Process(const InputMetadata& auth_metadata, AuthContext* context,
219                  OutputMetadata* consumed_auth_metadata,
220                  OutputMetadata* response_metadata) override {
221     EXPECT_TRUE(consumed_auth_metadata != nullptr);
222     EXPECT_TRUE(context != nullptr);
223     EXPECT_TRUE(response_metadata != nullptr);
224     auto auth_md =
225         auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
226     EXPECT_NE(auth_md, auth_metadata.end());
227     string_ref auth_md_value = auth_md->second;
228     if (auth_md_value == kGoodGuy) {
229       context->AddProperty(kIdentityPropName, kGoodGuy);
230       context->SetPeerIdentityPropertyName(kIdentityPropName);
231       consumed_auth_metadata->insert(std::make_pair(
232           string(auth_md->first.data(), auth_md->first.length()),
233           string(auth_md->second.data(), auth_md->second.length())));
234       return Status::OK;
235     } else {
236       return Status(StatusCode::UNAUTHENTICATED,
237                     string("Invalid principal: ") +
238                         string(auth_md_value.data(), auth_md_value.length()));
239     }
240   }
241 
242  private:
243   static const char kIdentityPropName[];
244   bool is_blocking_;
245 };
246 
247 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
248 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
249 
250 class Proxy : public ::grpc::testing::EchoTestService::Service {
251  public:
Proxy(const std::shared_ptr<Channel> & channel)252   explicit Proxy(const std::shared_ptr<Channel>& channel)
253       : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
254 
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)255   Status Echo(ServerContext* server_context, const EchoRequest* request,
256               EchoResponse* response) override {
257     std::unique_ptr<ClientContext> client_context =
258         ClientContext::FromServerContext(*server_context);
259     return stub_->Echo(client_context.get(), *request, response);
260   }
261 
262  private:
263   std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_;
264 };
265 
266 class TestServiceImplDupPkg
267     : public ::grpc::testing::duplicate::EchoTestService::Service {
268  public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)269   Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
270               EchoResponse* response) override {
271     response->set_message("no package");
272     return Status::OK;
273   }
274 };
275 
276 class TestScenario {
277  public:
TestScenario(bool interceptors,bool proxy,bool inproc_stub,const std::string & creds_type,bool use_callback_server)278   TestScenario(bool interceptors, bool proxy, bool inproc_stub,
279                const std::string& creds_type, bool use_callback_server)
280       : use_interceptors(interceptors),
281         use_proxy(proxy),
282         inproc(inproc_stub),
283         credentials_type(creds_type),
284         callback_server(use_callback_server) {}
285   void Log() const;
286   bool use_interceptors;
287   bool use_proxy;
288   bool inproc;
289   const std::string credentials_type;
290   bool callback_server;
291 };
292 
operator <<(std::ostream & out,const TestScenario & scenario)293 static std::ostream& operator<<(std::ostream& out,
294                                 const TestScenario& scenario) {
295   return out << "TestScenario{use_interceptors="
296              << (scenario.use_interceptors ? "true" : "false")
297              << ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
298              << ", inproc=" << (scenario.inproc ? "true" : "false")
299              << ", server_type="
300              << (scenario.callback_server ? "callback" : "sync")
301              << ", credentials='" << scenario.credentials_type << "'}";
302 }
303 
Log() const304 void TestScenario::Log() const {
305   std::ostringstream out;
306   out << *this;
307   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
308 }
309 
310 class End2endTest : public ::testing::TestWithParam<TestScenario> {
311  protected:
SetUpTestCase()312   static void SetUpTestCase() { grpc_init(); }
TearDownTestCase()313   static void TearDownTestCase() { grpc_shutdown(); }
End2endTest()314   End2endTest()
315       : is_server_started_(false),
316         kMaxMessageSize_(8192),
317         special_service_("special"),
318         first_picked_port_(0) {
319     GetParam().Log();
320   }
321 
TearDown()322   void TearDown() override {
323     if (is_server_started_) {
324       server_->Shutdown();
325       if (proxy_server_) proxy_server_->Shutdown();
326     }
327     if (first_picked_port_ > 0) {
328       grpc_recycle_unused_port(first_picked_port_);
329     }
330   }
331 
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)332   void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
333     int port = grpc_pick_unused_port_or_die();
334     first_picked_port_ = port;
335     server_address_ << "localhost:" << port;
336     // Setup server
337     BuildAndStartServer(processor);
338   }
339 
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)340   void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
341     if (is_server_started_) {
342       server_->Shutdown();
343       BuildAndStartServer(processor);
344     }
345   }
346 
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)347   void BuildAndStartServer(
348       const std::shared_ptr<AuthMetadataProcessor>& processor) {
349     ServerBuilder builder;
350     ConfigureServerBuilder(&builder);
351     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
352         GetParam().credentials_type);
353     if (GetParam().credentials_type != kInsecureCredentialsType) {
354       server_creds->SetAuthMetadataProcessor(processor);
355     }
356     if (GetParam().use_interceptors) {
357       std::vector<
358           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
359           creators;
360       // Add 20 phony server interceptors
361       creators.reserve(20);
362       for (auto i = 0; i < 20; i++) {
363         creators.push_back(absl::make_unique<PhonyInterceptorFactory>());
364       }
365       builder.experimental().SetInterceptorCreators(std::move(creators));
366     }
367     builder.AddListeningPort(server_address_.str(), server_creds);
368     if (!GetParam().callback_server) {
369       builder.RegisterService(&service_);
370     } else {
371       builder.RegisterService(&callback_service_);
372     }
373     builder.RegisterService("foo.test.youtube.com", &special_service_);
374     builder.RegisterService(&dup_pkg_service_);
375 
376     builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
377     builder.SetSyncServerOption(
378         ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
379 
380     server_ = builder.BuildAndStart();
381     is_server_started_ = true;
382   }
383 
ConfigureServerBuilder(ServerBuilder * builder)384   virtual void ConfigureServerBuilder(ServerBuilder* builder) {
385     builder->SetMaxMessageSize(
386         kMaxMessageSize_);  // For testing max message size.
387   }
388 
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})389   void ResetChannel(
390       std::vector<
391           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
392           interceptor_creators = {}) {
393     if (!is_server_started_) {
394       StartServer(std::shared_ptr<AuthMetadataProcessor>());
395     }
396     EXPECT_TRUE(is_server_started_);
397     ChannelArguments args;
398     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
399         GetParam().credentials_type, &args);
400     if (!user_agent_prefix_.empty()) {
401       args.SetUserAgentPrefix(user_agent_prefix_);
402     }
403     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
404 
405     if (!GetParam().inproc) {
406       if (!GetParam().use_interceptors) {
407         channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
408                                                channel_creds, args);
409       } else {
410         channel_ = CreateCustomChannelWithInterceptors(
411             server_address_.str(), channel_creds, args,
412             interceptor_creators.empty() ? CreatePhonyClientInterceptors()
413                                          : std::move(interceptor_creators));
414       }
415     } else {
416       if (!GetParam().use_interceptors) {
417         channel_ = server_->InProcessChannel(args);
418       } else {
419         channel_ = server_->experimental().InProcessChannelWithInterceptors(
420             args, interceptor_creators.empty()
421                       ? CreatePhonyClientInterceptors()
422                       : std::move(interceptor_creators));
423       }
424     }
425   }
426 
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})427   void ResetStub(
428       std::vector<
429           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
430           interceptor_creators = {}) {
431     ResetChannel(std::move(interceptor_creators));
432     if (GetParam().use_proxy) {
433       proxy_service_ = absl::make_unique<Proxy>(channel_);
434       int port = grpc_pick_unused_port_or_die();
435       std::ostringstream proxyaddr;
436       proxyaddr << "localhost:" << port;
437       ServerBuilder builder;
438       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
439       builder.RegisterService(proxy_service_.get());
440 
441       builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
442       builder.SetSyncServerOption(
443           ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
444 
445       proxy_server_ = builder.BuildAndStart();
446 
447       channel_ =
448           grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
449     }
450 
451     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
452     PhonyInterceptor::Reset();
453   }
454 
455   bool is_server_started_;
456   std::shared_ptr<Channel> channel_;
457   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
458   std::unique_ptr<Server> server_;
459   std::unique_ptr<Server> proxy_server_;
460   std::unique_ptr<Proxy> proxy_service_;
461   std::ostringstream server_address_;
462   const int kMaxMessageSize_;
463   TestServiceImpl service_;
464   CallbackTestServiceImpl callback_service_;
465   TestServiceImpl special_service_;
466   TestServiceImplDupPkg dup_pkg_service_;
467   std::string user_agent_prefix_;
468   int first_picked_port_;
469 };
470 
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)471 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
472                     bool with_binary_metadata) {
473   EchoRequest request;
474   EchoResponse response;
475   request.set_message("Hello hello hello hello");
476 
477   for (int i = 0; i < num_rpcs; ++i) {
478     ClientContext context;
479     if (with_binary_metadata) {
480       char bytes[8] = {'\0', '\1', '\2', '\3',
481                        '\4', '\5', '\6', static_cast<char>(i)};
482       context.AddMetadata("custom-bin", std::string(bytes, 8));
483     }
484     context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
485     Status s = stub->Echo(&context, request, &response);
486     EXPECT_EQ(response.message(), request.message());
487     EXPECT_TRUE(s.ok());
488   }
489 }
490 
491 // This class is for testing scenarios where RPCs are cancelled on the server
492 // by calling ServerContext::TryCancel()
493 class End2endServerTryCancelTest : public End2endTest {
494  protected:
495   // Helper for testing client-streaming RPCs which are cancelled on the server.
496   // Depending on the value of server_try_cancel parameter, this will test one
497   // of the following three scenarios:
498   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
499   //   any messages from the client
500   //
501   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
502   //   messages from the client
503   //
504   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
505   //   the messages from the client
506   //
507   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)508   void TestRequestStreamServerCancel(
509       ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
510     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
511     ResetStub();
512     EchoRequest request;
513     EchoResponse response;
514     ClientContext context;
515 
516     // Send server_try_cancel value in the client metadata
517     context.AddMetadata(kServerTryCancelRequest,
518                         std::to_string(server_try_cancel));
519 
520     auto stream = stub_->RequestStream(&context, &response);
521 
522     int num_msgs_sent = 0;
523     while (num_msgs_sent < num_msgs_to_send) {
524       request.set_message("hello");
525       if (!stream->Write(request)) {
526         break;
527       }
528       num_msgs_sent++;
529     }
530     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
531 
532     stream->WritesDone();
533     Status s = stream->Finish();
534 
535     // At this point, we know for sure that RPC was cancelled by the server
536     // since we passed server_try_cancel value in the metadata. Depending on the
537     // value of server_try_cancel, the RPC might have been cancelled by the
538     // server at different stages. The following validates our expectations of
539     // number of messages sent in various cancellation scenarios:
540 
541     switch (server_try_cancel) {
542       case CANCEL_BEFORE_PROCESSING:
543       case CANCEL_DURING_PROCESSING:
544         // If the RPC is cancelled by server before / during messages from the
545         // client, it means that the client most likely did not get a chance to
546         // send all the messages it wanted to send. i.e num_msgs_sent <=
547         // num_msgs_to_send
548         EXPECT_LE(num_msgs_sent, num_msgs_to_send);
549         break;
550 
551       case CANCEL_AFTER_PROCESSING:
552         // If the RPC was cancelled after all messages were read by the server,
553         // the client did get a chance to send all its messages
554         EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
555         break;
556 
557       default:
558         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
559                 server_try_cancel);
560         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
561                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
562         break;
563     }
564 
565     EXPECT_FALSE(s.ok());
566     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
567     // Make sure that the server interceptors were notified
568     if (GetParam().use_interceptors) {
569       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
570     }
571   }
572 
573   // Helper for testing server-streaming RPCs which are cancelled on the server.
574   // Depending on the value of server_try_cancel parameter, this will test one
575   // of the following three scenarios:
576   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
577   //   any messages to the client
578   //
579   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
580   //   messages to the client
581   //
582   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
583   //   the messages to the client
584   //
585   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)586   void TestResponseStreamServerCancel(
587       ServerTryCancelRequestPhase server_try_cancel) {
588     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
589     ResetStub();
590     EchoRequest request;
591     EchoResponse response;
592     ClientContext context;
593 
594     // Send server_try_cancel in the client metadata
595     context.AddMetadata(kServerTryCancelRequest,
596                         std::to_string(server_try_cancel));
597 
598     request.set_message("hello");
599     auto stream = stub_->ResponseStream(&context, request);
600 
601     int num_msgs_read = 0;
602     while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
603       if (!stream->Read(&response)) {
604         break;
605       }
606       EXPECT_EQ(response.message(),
607                 request.message() + std::to_string(num_msgs_read));
608       num_msgs_read++;
609     }
610     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
611 
612     Status s = stream->Finish();
613 
614     // Depending on the value of server_try_cancel, the RPC might have been
615     // cancelled by the server at different stages. The following validates our
616     // expectations of number of messages read in various cancellation
617     // scenarios:
618     switch (server_try_cancel) {
619       case CANCEL_BEFORE_PROCESSING:
620         // Server cancelled before sending any messages. Which means the client
621         // wouldn't have read any
622         EXPECT_EQ(num_msgs_read, 0);
623         break;
624 
625       case CANCEL_DURING_PROCESSING:
626         // Server cancelled while writing messages. Client must have read less
627         // than or equal to the expected number of messages
628         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
629         break;
630 
631       case CANCEL_AFTER_PROCESSING:
632         // Even though the Server cancelled after writing all messages, the RPC
633         // may be cancelled before the Client got a chance to read all the
634         // messages.
635         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
636         break;
637 
638       default: {
639         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
640                 server_try_cancel);
641         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
642                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
643         break;
644       }
645     }
646 
647     EXPECT_FALSE(s.ok());
648     // Make sure that the server interceptors were notified
649     if (GetParam().use_interceptors) {
650       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
651     }
652   }
653 
654   // Helper for testing bidirectional-streaming RPCs which are cancelled on the
655   // server. Depending on the value of server_try_cancel parameter, this will
656   // test one of the following three scenarios:
657   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
658   //   writing any messages from/to the client
659   //
660   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
661   //   writing messages from/to the client
662   //
663   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
664   //   all the messages from/to the client
665   //
666   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)667   void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
668                                   int num_messages) {
669     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
670     ResetStub();
671     EchoRequest request;
672     EchoResponse response;
673     ClientContext context;
674 
675     // Send server_try_cancel in the client metadata
676     context.AddMetadata(kServerTryCancelRequest,
677                         std::to_string(server_try_cancel));
678 
679     auto stream = stub_->BidiStream(&context);
680 
681     int num_msgs_read = 0;
682     int num_msgs_sent = 0;
683     while (num_msgs_sent < num_messages) {
684       request.set_message("hello " + std::to_string(num_msgs_sent));
685       if (!stream->Write(request)) {
686         break;
687       }
688       num_msgs_sent++;
689 
690       if (!stream->Read(&response)) {
691         break;
692       }
693       num_msgs_read++;
694 
695       EXPECT_EQ(response.message(), request.message());
696     }
697     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
698     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
699 
700     stream->WritesDone();
701     Status s = stream->Finish();
702 
703     // Depending on the value of server_try_cancel, the RPC might have been
704     // cancelled by the server at different stages. The following validates our
705     // expectations of number of messages read in various cancellation
706     // scenarios:
707     switch (server_try_cancel) {
708       case CANCEL_BEFORE_PROCESSING:
709         EXPECT_EQ(num_msgs_read, 0);
710         break;
711 
712       case CANCEL_DURING_PROCESSING:
713         EXPECT_LE(num_msgs_sent, num_messages);
714         EXPECT_LE(num_msgs_read, num_msgs_sent);
715         break;
716 
717       case CANCEL_AFTER_PROCESSING:
718         EXPECT_EQ(num_msgs_sent, num_messages);
719 
720         // The Server cancelled after reading the last message and after writing
721         // the message to the client. However, the RPC cancellation might have
722         // taken effect before the client actually read the response.
723         EXPECT_LE(num_msgs_read, num_msgs_sent);
724         break;
725 
726       default:
727         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
728                 server_try_cancel);
729         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
730                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
731         break;
732     }
733 
734     EXPECT_FALSE(s.ok());
735     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
736     // Make sure that the server interceptors were notified
737     if (GetParam().use_interceptors) {
738       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
739     }
740   }
741 };
742 
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)743 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
744   ResetStub();
745   EchoRequest request;
746   EchoResponse response;
747   ClientContext context;
748 
749   context.AddMetadata(kServerTryCancelRequest,
750                       std::to_string(CANCEL_BEFORE_PROCESSING));
751   Status s = stub_->Echo(&context, request, &response);
752   EXPECT_FALSE(s.ok());
753   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
754 }
755 
756 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)757 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
758   TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
759 }
760 
761 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)762 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
763   TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
764 }
765 
766 // Server to cancel after reading all the requests but before returning to the
767 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)768 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
769   TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
770 }
771 
772 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)773 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
774   TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
775 }
776 
777 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)778 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
779   TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
780 }
781 
782 // Server to cancel after writing all the respones to the stream but before
783 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)784 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
785   TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
786 }
787 
788 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)789 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
790   TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
791 }
792 
793 // Server to cancel while reading/writing requests/responses on the stream in
794 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)795 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
796   TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
797 }
798 
799 // Server to cancel after reading/writing all requests/responses on the stream
800 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)801 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
802   TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
803 }
804 
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)805 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
806   // User-Agent is an HTTP header for HTTP transports only
807   if (GetParam().inproc) {
808     return;
809   }
810   user_agent_prefix_ = "custom_prefix";
811   ResetStub();
812   EchoRequest request;
813   EchoResponse response;
814   request.set_message("Hello hello hello hello");
815   request.mutable_param()->set_echo_metadata(true);
816 
817   ClientContext context;
818   Status s = stub_->Echo(&context, request, &response);
819   EXPECT_EQ(response.message(), request.message());
820   EXPECT_TRUE(s.ok());
821   const auto& trailing_metadata = context.GetServerTrailingMetadata();
822   auto iter = trailing_metadata.find("user-agent");
823   EXPECT_TRUE(iter != trailing_metadata.end());
824   std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
825   EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
826 }
827 
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)828 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
829   ResetStub();
830   std::vector<std::thread> threads;
831   threads.reserve(10);
832   for (int i = 0; i < 10; ++i) {
833     threads.emplace_back(SendRpc, stub_.get(), 10, true);
834   }
835   for (int i = 0; i < 10; ++i) {
836     threads[i].join();
837   }
838 }
839 
TEST_P(End2endTest,MultipleRpcs)840 TEST_P(End2endTest, MultipleRpcs) {
841   ResetStub();
842   std::vector<std::thread> threads;
843   threads.reserve(10);
844   for (int i = 0; i < 10; ++i) {
845     threads.emplace_back(SendRpc, stub_.get(), 10, false);
846   }
847   for (int i = 0; i < 10; ++i) {
848     threads[i].join();
849   }
850 }
851 
TEST_P(End2endTest,ManyStubs)852 TEST_P(End2endTest, ManyStubs) {
853   ResetStub();
854   ChannelTestPeer peer(channel_.get());
855   int registered_calls_pre = peer.registered_calls();
856   int registration_attempts_pre = peer.registration_attempts();
857   for (int i = 0; i < 1000; ++i) {
858     grpc::testing::EchoTestService::NewStub(channel_);
859   }
860   EXPECT_EQ(peer.registered_calls(), registered_calls_pre);
861   EXPECT_GT(peer.registration_attempts(), registration_attempts_pre);
862 }
863 
TEST_P(End2endTest,EmptyBinaryMetadata)864 TEST_P(End2endTest, EmptyBinaryMetadata) {
865   ResetStub();
866   EchoRequest request;
867   EchoResponse response;
868   request.set_message("Hello hello hello hello");
869   ClientContext context;
870   context.AddMetadata("custom-bin", "");
871   Status s = stub_->Echo(&context, request, &response);
872   EXPECT_EQ(response.message(), request.message());
873   EXPECT_TRUE(s.ok());
874 }
875 
TEST_P(End2endTest,ReconnectChannel)876 TEST_P(End2endTest, ReconnectChannel) {
877   if (GetParam().inproc) {
878     return;
879   }
880   int poller_slowdown_factor = 1;
881   // It needs 2 pollset_works to reconnect the channel with polling engine
882   // "poll"
883 #ifdef GRPC_POSIX_SOCKET_EV
884   grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
885   if (0 == strcmp(poller.get(), "poll")) {
886     poller_slowdown_factor = 2;
887   }
888 #endif  // GRPC_POSIX_SOCKET_EV
889   ResetStub();
890   SendRpc(stub_.get(), 1, false);
891   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
892   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
893   // reconnect the channel. Make it a factor of 5x
894   gpr_sleep_until(
895       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
896                    gpr_time_from_millis(kClientChannelBackupPollIntervalMs * 5 *
897                                             poller_slowdown_factor *
898                                             grpc_test_slowdown_factor(),
899                                         GPR_TIMESPAN)));
900   SendRpc(stub_.get(), 1, false);
901 }
902 
TEST_P(End2endTest,RequestStreamOneRequest)903 TEST_P(End2endTest, RequestStreamOneRequest) {
904   ResetStub();
905   EchoRequest request;
906   EchoResponse response;
907   ClientContext context;
908 
909   auto stream = stub_->RequestStream(&context, &response);
910   request.set_message("hello");
911   EXPECT_TRUE(stream->Write(request));
912   stream->WritesDone();
913   Status s = stream->Finish();
914   EXPECT_EQ(response.message(), request.message());
915   EXPECT_TRUE(s.ok());
916   EXPECT_TRUE(context.debug_error_string().empty());
917 }
918 
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)919 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
920   ResetStub();
921   EchoRequest request;
922   EchoResponse response;
923   ClientContext context;
924 
925   context.set_initial_metadata_corked(true);
926   auto stream = stub_->RequestStream(&context, &response);
927   request.set_message("hello");
928   stream->WriteLast(request, WriteOptions());
929   Status s = stream->Finish();
930   EXPECT_EQ(response.message(), request.message());
931   EXPECT_TRUE(s.ok());
932 }
933 
TEST_P(End2endTest,RequestStreamTwoRequests)934 TEST_P(End2endTest, RequestStreamTwoRequests) {
935   ResetStub();
936   EchoRequest request;
937   EchoResponse response;
938   ClientContext context;
939 
940   auto stream = stub_->RequestStream(&context, &response);
941   request.set_message("hello");
942   EXPECT_TRUE(stream->Write(request));
943   EXPECT_TRUE(stream->Write(request));
944   stream->WritesDone();
945   Status s = stream->Finish();
946   EXPECT_EQ(response.message(), "hellohello");
947   EXPECT_TRUE(s.ok());
948 }
949 
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)950 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
951   ResetStub();
952   EchoRequest request;
953   EchoResponse response;
954   ClientContext context;
955 
956   auto stream = stub_->RequestStream(&context, &response);
957   request.set_message("hello");
958   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
959   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
960   stream->WritesDone();
961   Status s = stream->Finish();
962   EXPECT_EQ(response.message(), "hellohello");
963   EXPECT_TRUE(s.ok());
964 }
965 
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)966 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
967   ResetStub();
968   EchoRequest request;
969   EchoResponse response;
970   ClientContext context;
971 
972   context.set_initial_metadata_corked(true);
973   auto stream = stub_->RequestStream(&context, &response);
974   request.set_message("hello");
975   EXPECT_TRUE(stream->Write(request));
976   stream->WriteLast(request, WriteOptions());
977   Status s = stream->Finish();
978   EXPECT_EQ(response.message(), "hellohello");
979   EXPECT_TRUE(s.ok());
980 }
981 
TEST_P(End2endTest,ResponseStream)982 TEST_P(End2endTest, ResponseStream) {
983   ResetStub();
984   EchoRequest request;
985   EchoResponse response;
986   ClientContext context;
987   request.set_message("hello");
988 
989   auto stream = stub_->ResponseStream(&context, request);
990   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
991     EXPECT_TRUE(stream->Read(&response));
992     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
993   }
994   EXPECT_FALSE(stream->Read(&response));
995 
996   Status s = stream->Finish();
997   EXPECT_TRUE(s.ok());
998 }
999 
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)1000 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
1001   ResetStub();
1002   EchoRequest request;
1003   EchoResponse response;
1004   ClientContext context;
1005   request.set_message("hello");
1006   context.AddMetadata(kServerUseCoalescingApi, "1");
1007 
1008   auto stream = stub_->ResponseStream(&context, request);
1009   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1010     EXPECT_TRUE(stream->Read(&response));
1011     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1012   }
1013   EXPECT_FALSE(stream->Read(&response));
1014 
1015   Status s = stream->Finish();
1016   EXPECT_TRUE(s.ok());
1017 }
1018 
1019 // This was added to prevent regression from issue:
1020 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1021 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1022   ResetStub();
1023   EchoRequest request;
1024   EchoResponse response;
1025   ClientContext context;
1026   request.set_message("hello");
1027   context.AddMetadata(kServerUseCoalescingApi, "1");
1028   // We will only send one message, forcing everything (init metadata, message,
1029   // trailing) to be coalesced together.
1030   context.AddMetadata(kServerResponseStreamsToSend, "1");
1031 
1032   auto stream = stub_->ResponseStream(&context, request);
1033   EXPECT_TRUE(stream->Read(&response));
1034   EXPECT_EQ(response.message(), request.message() + "0");
1035 
1036   EXPECT_FALSE(stream->Read(&response));
1037 
1038   Status s = stream->Finish();
1039   EXPECT_TRUE(s.ok());
1040 }
1041 
TEST_P(End2endTest,BidiStream)1042 TEST_P(End2endTest, BidiStream) {
1043   ResetStub();
1044   EchoRequest request;
1045   EchoResponse response;
1046   ClientContext context;
1047   std::string msg("hello");
1048 
1049   auto stream = stub_->BidiStream(&context);
1050 
1051   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1052     request.set_message(msg + std::to_string(i));
1053     EXPECT_TRUE(stream->Write(request));
1054     EXPECT_TRUE(stream->Read(&response));
1055     EXPECT_EQ(response.message(), request.message());
1056   }
1057 
1058   stream->WritesDone();
1059   EXPECT_FALSE(stream->Read(&response));
1060   EXPECT_FALSE(stream->Read(&response));
1061 
1062   Status s = stream->Finish();
1063   EXPECT_TRUE(s.ok());
1064 }
1065 
TEST_P(End2endTest,BidiStreamWithCoalescingApi)1066 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1067   ResetStub();
1068   EchoRequest request;
1069   EchoResponse response;
1070   ClientContext context;
1071   context.AddMetadata(kServerFinishAfterNReads, "3");
1072   context.set_initial_metadata_corked(true);
1073   std::string msg("hello");
1074 
1075   auto stream = stub_->BidiStream(&context);
1076 
1077   request.set_message(msg + "0");
1078   EXPECT_TRUE(stream->Write(request));
1079   EXPECT_TRUE(stream->Read(&response));
1080   EXPECT_EQ(response.message(), request.message());
1081 
1082   request.set_message(msg + "1");
1083   EXPECT_TRUE(stream->Write(request));
1084   EXPECT_TRUE(stream->Read(&response));
1085   EXPECT_EQ(response.message(), request.message());
1086 
1087   request.set_message(msg + "2");
1088   stream->WriteLast(request, WriteOptions());
1089   EXPECT_TRUE(stream->Read(&response));
1090   EXPECT_EQ(response.message(), request.message());
1091 
1092   EXPECT_FALSE(stream->Read(&response));
1093   EXPECT_FALSE(stream->Read(&response));
1094 
1095   Status s = stream->Finish();
1096   EXPECT_TRUE(s.ok());
1097 }
1098 
1099 // This was added to prevent regression from issue:
1100 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1101 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1102   ResetStub();
1103   EchoRequest request;
1104   EchoResponse response;
1105   ClientContext context;
1106   context.AddMetadata(kServerFinishAfterNReads, "1");
1107   context.set_initial_metadata_corked(true);
1108   std::string msg("hello");
1109 
1110   auto stream = stub_->BidiStream(&context);
1111 
1112   request.set_message(msg + "0");
1113   stream->WriteLast(request, WriteOptions());
1114   EXPECT_TRUE(stream->Read(&response));
1115   EXPECT_EQ(response.message(), request.message());
1116 
1117   EXPECT_FALSE(stream->Read(&response));
1118   EXPECT_FALSE(stream->Read(&response));
1119 
1120   Status s = stream->Finish();
1121   EXPECT_TRUE(s.ok());
1122 }
1123 
1124 // Talk to the two services with the same name but different package names.
1125 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1126 TEST_P(End2endTest, DiffPackageServices) {
1127   ResetStub();
1128   EchoRequest request;
1129   EchoResponse response;
1130   request.set_message("Hello");
1131 
1132   ClientContext context;
1133   Status s = stub_->Echo(&context, request, &response);
1134   EXPECT_EQ(response.message(), request.message());
1135   EXPECT_TRUE(s.ok());
1136 
1137   std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1138       grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1139   ClientContext context2;
1140   s = dup_pkg_stub->Echo(&context2, request, &response);
1141   EXPECT_EQ("no package", response.message());
1142   EXPECT_TRUE(s.ok());
1143 }
1144 
1145 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1146 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1147   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1148                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1149   while (!service->signal_client()) {
1150   }
1151   context->TryCancel();
1152 }
1153 
TEST_P(End2endTest,CancelRpcBeforeStart)1154 TEST_P(End2endTest, CancelRpcBeforeStart) {
1155   ResetStub();
1156   EchoRequest request;
1157   EchoResponse response;
1158   ClientContext context;
1159   request.set_message("hello");
1160   context.TryCancel();
1161   Status s = stub_->Echo(&context, request, &response);
1162   EXPECT_EQ("", response.message());
1163   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1164   if (GetParam().use_interceptors) {
1165     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1166   }
1167 }
1168 
TEST_P(End2endTest,CancelRpcAfterStart)1169 TEST_P(End2endTest, CancelRpcAfterStart) {
1170   ResetStub();
1171   EchoRequest request;
1172   EchoResponse response;
1173   ClientContext context;
1174   request.set_message("hello");
1175   request.mutable_param()->set_server_notify_client_when_started(true);
1176   request.mutable_param()->set_skip_cancelled_check(true);
1177   Status s;
1178   std::thread echo_thread([this, &s, &context, &request, &response] {
1179     s = stub_->Echo(&context, request, &response);
1180     EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1181   });
1182   if (!GetParam().callback_server) {
1183     service_.ClientWaitUntilRpcStarted();
1184   } else {
1185     callback_service_.ClientWaitUntilRpcStarted();
1186   }
1187 
1188   context.TryCancel();
1189 
1190   if (!GetParam().callback_server) {
1191     service_.SignalServerToContinue();
1192   } else {
1193     callback_service_.SignalServerToContinue();
1194   }
1195 
1196   echo_thread.join();
1197   EXPECT_EQ("", response.message());
1198   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1199   if (GetParam().use_interceptors) {
1200     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1201   }
1202 }
1203 
1204 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1205 TEST_P(End2endTest, ClientCancelsRequestStream) {
1206   ResetStub();
1207   EchoRequest request;
1208   EchoResponse response;
1209   ClientContext context;
1210   request.set_message("hello");
1211 
1212   auto stream = stub_->RequestStream(&context, &response);
1213   EXPECT_TRUE(stream->Write(request));
1214   EXPECT_TRUE(stream->Write(request));
1215 
1216   context.TryCancel();
1217 
1218   Status s = stream->Finish();
1219   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1220 
1221   EXPECT_EQ(response.message(), "");
1222   if (GetParam().use_interceptors) {
1223     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1224   }
1225 }
1226 
1227 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1228 TEST_P(End2endTest, ClientCancelsResponseStream) {
1229   ResetStub();
1230   EchoRequest request;
1231   EchoResponse response;
1232   ClientContext context;
1233   request.set_message("hello");
1234 
1235   auto stream = stub_->ResponseStream(&context, request);
1236 
1237   EXPECT_TRUE(stream->Read(&response));
1238   EXPECT_EQ(response.message(), request.message() + "0");
1239   EXPECT_TRUE(stream->Read(&response));
1240   EXPECT_EQ(response.message(), request.message() + "1");
1241 
1242   context.TryCancel();
1243 
1244   // The cancellation races with responses, so there might be zero or
1245   // one responses pending, read till failure
1246 
1247   if (stream->Read(&response)) {
1248     EXPECT_EQ(response.message(), request.message() + "2");
1249     // Since we have cancelled, we expect the next attempt to read to fail
1250     EXPECT_FALSE(stream->Read(&response));
1251   }
1252 
1253   Status s = stream->Finish();
1254   // The final status could be either of CANCELLED or OK depending on
1255   // who won the race.
1256   EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1257   if (GetParam().use_interceptors) {
1258     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1259   }
1260 }
1261 
1262 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1263 TEST_P(End2endTest, ClientCancelsBidi) {
1264   ResetStub();
1265   EchoRequest request;
1266   EchoResponse response;
1267   ClientContext context;
1268   std::string msg("hello");
1269 
1270   // Send server_try_cancel value in the client metadata
1271   context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1272 
1273   auto stream = stub_->BidiStream(&context);
1274 
1275   request.set_message(msg + "0");
1276   EXPECT_TRUE(stream->Write(request));
1277   EXPECT_TRUE(stream->Read(&response));
1278   EXPECT_EQ(response.message(), request.message());
1279 
1280   request.set_message(msg + "1");
1281   EXPECT_TRUE(stream->Write(request));
1282 
1283   context.TryCancel();
1284 
1285   // The cancellation races with responses, so there might be zero or
1286   // one responses pending, read till failure
1287 
1288   if (stream->Read(&response)) {
1289     EXPECT_EQ(response.message(), request.message());
1290     // Since we have cancelled, we expect the next attempt to read to fail
1291     EXPECT_FALSE(stream->Read(&response));
1292   }
1293 
1294   Status s = stream->Finish();
1295   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1296   if (GetParam().use_interceptors) {
1297     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1298   }
1299 }
1300 
TEST_P(End2endTest,RpcMaxMessageSize)1301 TEST_P(End2endTest, RpcMaxMessageSize) {
1302   ResetStub();
1303   EchoRequest request;
1304   EchoResponse response;
1305   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1306   request.mutable_param()->set_server_die(true);
1307 
1308   ClientContext context;
1309   Status s = stub_->Echo(&context, request, &response);
1310   EXPECT_FALSE(s.ok());
1311 }
1312 
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1313 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1314                       gpr_event* ev) {
1315   EchoResponse resp;
1316   gpr_event_set(ev, reinterpret_cast<void*>(1));
1317   while (stream->Read(&resp)) {
1318     gpr_log(GPR_INFO, "Read message");
1319   }
1320 }
1321 
1322 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1323 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1324   ResetStub();
1325   ClientContext context;
1326   gpr_event ev;
1327   gpr_event_init(&ev);
1328   auto stream = stub_->BidiStream(&context);
1329   std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1330   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1331   stream->WritesDone();
1332   reader_thread.join();
1333   Status s = stream->Finish();
1334   EXPECT_TRUE(s.ok());
1335 }
1336 
TEST_P(End2endTest,ChannelState)1337 TEST_P(End2endTest, ChannelState) {
1338   if (GetParam().inproc) {
1339     return;
1340   }
1341 
1342   ResetStub();
1343   // Start IDLE
1344   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1345 
1346   // Did not ask to connect, no state change.
1347   CompletionQueue cq;
1348   std::chrono::system_clock::time_point deadline =
1349       std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1350   channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1351   void* tag;
1352   bool ok = true;
1353   cq.Next(&tag, &ok);
1354   EXPECT_FALSE(ok);
1355 
1356   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1357   EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1358                                            gpr_inf_future(GPR_CLOCK_REALTIME)));
1359   auto state = channel_->GetState(false);
1360   EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1361 }
1362 
1363 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1364 TEST_P(End2endTest, ChannelStateTimeout) {
1365   if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1366       GetParam().inproc) {
1367     return;
1368   }
1369   int port = grpc_pick_unused_port_or_die();
1370   std::ostringstream server_address;
1371   server_address << "localhost:" << port;
1372   // Channel to non-existing server
1373   auto channel =
1374       grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1375   // Start IDLE
1376   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1377 
1378   auto state = GRPC_CHANNEL_IDLE;
1379   for (int i = 0; i < 10; i++) {
1380     channel->WaitForStateChange(
1381         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1382     state = channel->GetState(false);
1383   }
1384 }
1385 
TEST_P(End2endTest,ChannelStateOnLameChannel)1386 TEST_P(End2endTest, ChannelStateOnLameChannel) {
1387   if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1388       GetParam().inproc) {
1389     return;
1390   }
1391   // Channel using invalid target URI.  This creates a lame channel.
1392   auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
1393   // Channel should immediately report TRANSIENT_FAILURE.
1394   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
1395   // And state will never change.
1396   auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
1397   for (int i = 0; i < 10; ++i) {
1398     channel->WaitForStateChange(
1399         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1400     state = channel->GetState(false);
1401   }
1402 }
1403 
1404 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1405 TEST_P(End2endTest, NonExistingService) {
1406   ResetChannel();
1407   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1408   stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1409 
1410   EchoRequest request;
1411   EchoResponse response;
1412   request.set_message("Hello");
1413 
1414   ClientContext context;
1415   Status s = stub->Unimplemented(&context, request, &response);
1416   EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1417   EXPECT_EQ("", s.error_message());
1418 }
1419 
1420 // Ask the server to send back a serialized proto in trailer.
1421 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1422 TEST_P(End2endTest, BinaryTrailerTest) {
1423   ResetStub();
1424   EchoRequest request;
1425   EchoResponse response;
1426   ClientContext context;
1427 
1428   request.mutable_param()->set_echo_metadata(true);
1429   DebugInfo* info = request.mutable_param()->mutable_debug_info();
1430   info->add_stack_entries("stack_entry_1");
1431   info->add_stack_entries("stack_entry_2");
1432   info->add_stack_entries("stack_entry_3");
1433   info->set_detail("detailed debug info");
1434   std::string expected_string = info->SerializeAsString();
1435   request.set_message("Hello");
1436 
1437   Status s = stub_->Echo(&context, request, &response);
1438   EXPECT_FALSE(s.ok());
1439   auto trailers = context.GetServerTrailingMetadata();
1440   EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1441   auto iter = trailers.find(kDebugInfoTrailerKey);
1442   EXPECT_EQ(expected_string, iter->second);
1443   // Parse the returned trailer into a DebugInfo proto.
1444   DebugInfo returned_info;
1445   EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1446 }
1447 
TEST_P(End2endTest,ExpectErrorTest)1448 TEST_P(End2endTest, ExpectErrorTest) {
1449   ResetStub();
1450 
1451   std::vector<ErrorStatus> expected_status;
1452   expected_status.emplace_back();
1453   expected_status.back().set_code(13);  // INTERNAL
1454   // No Error message or details
1455 
1456   expected_status.emplace_back();
1457   expected_status.back().set_code(13);  // INTERNAL
1458   expected_status.back().set_error_message("text error message");
1459   expected_status.back().set_binary_error_details("text error details");
1460 
1461   expected_status.emplace_back();
1462   expected_status.back().set_code(13);  // INTERNAL
1463   expected_status.back().set_error_message("text error message");
1464   expected_status.back().set_binary_error_details(
1465       "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1466 
1467   for (auto iter = expected_status.begin(); iter != expected_status.end();
1468        ++iter) {
1469     EchoRequest request;
1470     EchoResponse response;
1471     ClientContext context;
1472     request.set_message("Hello");
1473     auto* error = request.mutable_param()->mutable_expected_error();
1474     error->set_code(iter->code());
1475     error->set_error_message(iter->error_message());
1476     error->set_binary_error_details(iter->binary_error_details());
1477 
1478     Status s = stub_->Echo(&context, request, &response);
1479     EXPECT_FALSE(s.ok());
1480     EXPECT_EQ(iter->code(), s.error_code());
1481     EXPECT_EQ(iter->error_message(), s.error_message());
1482     EXPECT_EQ(iter->binary_error_details(), s.error_details());
1483     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "created"));
1484 #ifndef NDEBUG
1485     // GRPC_ERROR_INT_FILE_LINE is for debug only
1486     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "file"));
1487     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "line"));
1488 #endif
1489     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "status"));
1490     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "13"));
1491   }
1492 }
1493 
1494 //////////////////////////////////////////////////////////////////////////
1495 // Test with and without a proxy.
1496 class ProxyEnd2endTest : public End2endTest {
1497  protected:
1498 };
1499 
TEST_P(ProxyEnd2endTest,SimpleRpc)1500 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1501   ResetStub();
1502   SendRpc(stub_.get(), 1, false);
1503 }
1504 
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1505 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1506   ResetStub();
1507   EchoRequest request;
1508   EchoResponse response;
1509 
1510   ClientContext context;
1511   Status s = stub_->Echo(&context, request, &response);
1512   EXPECT_TRUE(s.ok());
1513 }
1514 
TEST_P(ProxyEnd2endTest,MultipleRpcs)1515 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1516   ResetStub();
1517   std::vector<std::thread> threads;
1518   threads.reserve(10);
1519   for (int i = 0; i < 10; ++i) {
1520     threads.emplace_back(SendRpc, stub_.get(), 10, false);
1521   }
1522   for (int i = 0; i < 10; ++i) {
1523     threads[i].join();
1524   }
1525 }
1526 
1527 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1528 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1529   ResetStub();
1530   EchoRequest request;
1531   EchoResponse response;
1532   request.set_message("Hello");
1533   request.mutable_param()->set_skip_cancelled_check(true);
1534   // Let server sleep for 40 ms first to guarantee expiry.
1535   // 40 ms might seem a bit extreme but the timer manager would have been just
1536   // initialized (when ResetStub() was called) and there are some warmup costs
1537   // i.e the timer thread many not have even started. There might also be other
1538   // delays in the timer manager thread (in acquiring locks, timer data
1539   // structure manipulations, starting backup timer threads) that add to the
1540   // delays. 40ms is still not enough in some cases but this significantly
1541   // reduces the test flakes
1542   request.mutable_param()->set_server_sleep_us(40 * 1000);
1543 
1544   ClientContext context;
1545   std::chrono::system_clock::time_point deadline =
1546       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1547   context.set_deadline(deadline);
1548   Status s = stub_->Echo(&context, request, &response);
1549   EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1550 }
1551 
1552 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1553 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1554   ResetStub();
1555   EchoRequest request;
1556   EchoResponse response;
1557   request.set_message("Hello");
1558 
1559   ClientContext context;
1560   std::chrono::system_clock::time_point deadline =
1561       std::chrono::system_clock::now() + std::chrono::hours(1);
1562   context.set_deadline(deadline);
1563   Status s = stub_->Echo(&context, request, &response);
1564   EXPECT_EQ(response.message(), request.message());
1565   EXPECT_TRUE(s.ok());
1566 }
1567 
1568 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1569 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1570   ResetStub();
1571   EchoRequest request;
1572   EchoResponse response;
1573   request.set_message("Hello");
1574   request.mutable_param()->set_echo_deadline(true);
1575 
1576   ClientContext context;
1577   std::chrono::system_clock::time_point deadline =
1578       std::chrono::system_clock::now() + std::chrono::seconds(100);
1579   context.set_deadline(deadline);
1580   Status s = stub_->Echo(&context, request, &response);
1581   EXPECT_EQ(response.message(), request.message());
1582   EXPECT_TRUE(s.ok());
1583   gpr_timespec sent_deadline;
1584   Timepoint2Timespec(deadline, &sent_deadline);
1585   // We want to allow some reasonable error given:
1586   // - request_deadline() only has 1sec resolution so the best we can do is +-1
1587   // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1588   // can end up being off by 2 in one direction.
1589   EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1590   EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1591 }
1592 
1593 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1594 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1595   ResetStub();
1596   EchoRequest request;
1597   EchoResponse response;
1598   request.set_message("Hello");
1599   request.mutable_param()->set_echo_deadline(true);
1600 
1601   ClientContext context;
1602   Status s = stub_->Echo(&context, request, &response);
1603   EXPECT_EQ(response.message(), request.message());
1604   EXPECT_TRUE(s.ok());
1605   EXPECT_EQ(response.param().request_deadline(),
1606             gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1607 }
1608 
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1609 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1610   ResetStub();
1611   EchoRequest request;
1612   EchoResponse response;
1613   request.set_message("Hello");
1614 
1615   ClientContext context;
1616   Status s = stub_->Unimplemented(&context, request, &response);
1617   EXPECT_FALSE(s.ok());
1618   EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1619   EXPECT_EQ(s.error_message(), "");
1620   EXPECT_EQ(response.message(), "");
1621 }
1622 
1623 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1624 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1625   ResetStub();
1626   EchoRequest request;
1627   EchoResponse response;
1628   request.set_message("Hello");
1629   const int kCancelDelayUs = 10 * 1000;
1630   request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1631 
1632   ClientContext context;
1633   std::thread cancel_thread;
1634   if (!GetParam().callback_server) {
1635     cancel_thread = std::thread(
1636         [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1637         kCancelDelayUs);
1638     // Note: the unusual pattern above (and below) is caused by a conflict
1639     // between two sets of compiler expectations. clang allows const to be
1640     // captured without mention, so there is no need to capture kCancelDelayUs
1641     // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1642     // in our tests requires an explicit capture even for const. We square this
1643     // circle by passing the const value in as an argument to the lambda.
1644   } else {
1645     cancel_thread = std::thread(
1646         [&context, this](int delay) {
1647           CancelRpc(&context, delay, &callback_service_);
1648         },
1649         kCancelDelayUs);
1650   }
1651   Status s = stub_->Echo(&context, request, &response);
1652   cancel_thread.join();
1653   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1654   EXPECT_EQ(s.error_message(), "CANCELLED");
1655 }
1656 
1657 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1658 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1659   ResetStub();
1660   EchoRequest request;
1661   EchoResponse response;
1662   request.set_message("Hello");
1663   request.mutable_param()->set_server_cancel_after_us(1000);
1664 
1665   ClientContext context;
1666   Status s = stub_->Echo(&context, request, &response);
1667   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1668   EXPECT_TRUE(s.error_message().empty());
1669 }
1670 
1671 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1672 TEST_P(ProxyEnd2endTest, HugeResponse) {
1673   ResetStub();
1674   EchoRequest request;
1675   EchoResponse response;
1676   request.set_message("huge response");
1677   const size_t kResponseSize = 1024 * (1024 + 10);
1678   request.mutable_param()->set_response_message_length(kResponseSize);
1679 
1680   ClientContext context;
1681   std::chrono::system_clock::time_point deadline =
1682       std::chrono::system_clock::now() + std::chrono::seconds(20);
1683   context.set_deadline(deadline);
1684   Status s = stub_->Echo(&context, request, &response);
1685   EXPECT_EQ(kResponseSize, response.message().size());
1686   EXPECT_TRUE(s.ok());
1687 }
1688 
TEST_P(ProxyEnd2endTest,Peer)1689 TEST_P(ProxyEnd2endTest, Peer) {
1690   // Peer is not meaningful for inproc
1691   if (GetParam().inproc) {
1692     return;
1693   }
1694   ResetStub();
1695   EchoRequest request;
1696   EchoResponse response;
1697   request.set_message("hello");
1698   request.mutable_param()->set_echo_peer(true);
1699 
1700   ClientContext context;
1701   Status s = stub_->Echo(&context, request, &response);
1702   EXPECT_EQ(response.message(), request.message());
1703   EXPECT_TRUE(s.ok());
1704   EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1705   EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1706 }
1707 
1708 //////////////////////////////////////////////////////////////////////////
1709 class SecureEnd2endTest : public End2endTest {
1710  protected:
SecureEnd2endTest()1711   SecureEnd2endTest() {
1712     GPR_ASSERT(!GetParam().use_proxy);
1713     GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
1714   }
1715 };
1716 
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1717 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1718   ResetStub();
1719 
1720   EchoRequest request;
1721   EchoResponse response;
1722   request.set_message("Hello");
1723 
1724   ClientContext context;
1725   context.set_authority("foo.test.youtube.com");
1726   Status s = stub_->Echo(&context, request, &response);
1727   EXPECT_EQ(response.message(), request.message());
1728   EXPECT_TRUE(response.has_param());
1729   EXPECT_EQ("special", response.param().host());
1730   EXPECT_TRUE(s.ok());
1731 }
1732 
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1733 bool MetadataContains(
1734     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1735     const std::string& key, const std::string& value) {
1736   int count = 0;
1737 
1738   for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1739            metadata.begin();
1740        iter != metadata.end(); ++iter) {
1741     if (ToString(iter->first) == key && ToString(iter->second) == value) {
1742       count++;
1743     }
1744   }
1745   return count == 1;
1746 }
1747 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1748 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1749   auto* processor = new TestAuthMetadataProcessor(true);
1750   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1751   ResetStub();
1752   EchoRequest request;
1753   EchoResponse response;
1754   ClientContext context;
1755   context.set_credentials(processor->GetCompatibleClientCreds());
1756   request.set_message("Hello");
1757   request.mutable_param()->set_echo_metadata(true);
1758   request.mutable_param()->set_expected_client_identity(
1759       TestAuthMetadataProcessor::kGoodGuy);
1760   request.mutable_param()->set_expected_transport_security_type(
1761       GetParam().credentials_type);
1762 
1763   Status s = stub_->Echo(&context, request, &response);
1764   EXPECT_EQ(request.message(), response.message());
1765   EXPECT_TRUE(s.ok());
1766 
1767   // Metadata should have been consumed by the processor.
1768   EXPECT_FALSE(MetadataContains(
1769       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1770       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1771 }
1772 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1773 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1774   auto* processor = new TestAuthMetadataProcessor(true);
1775   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1776   ResetStub();
1777   EchoRequest request;
1778   EchoResponse response;
1779   ClientContext context;
1780   context.set_credentials(processor->GetIncompatibleClientCreds());
1781   request.set_message("Hello");
1782 
1783   Status s = stub_->Echo(&context, request, &response);
1784   EXPECT_FALSE(s.ok());
1785   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1786 }
1787 
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1788 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1789   ResetStub();
1790   EchoRequest request;
1791   EchoResponse response;
1792   ClientContext context;
1793   std::shared_ptr<CallCredentials> creds =
1794       GoogleIAMCredentials(kFakeToken, kFakeSelector);
1795   context.set_credentials(creds);
1796   request.set_message("Hello");
1797   request.mutable_param()->set_echo_metadata(true);
1798 
1799   Status s = stub_->Echo(&context, request, &response);
1800   EXPECT_EQ(request.message(), response.message());
1801   EXPECT_TRUE(s.ok());
1802   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1803                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1804                                kFakeToken));
1805   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1806                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1807                                kFakeSelector));
1808   EXPECT_EQ(context.credentials()->DebugString(),
1809             kExpectedFakeCredsDebugString);
1810 }
1811 
1812 class CredentialsInterceptor : public experimental::Interceptor {
1813  public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1814   explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1815       : info_(info) {}
1816 
Intercept(experimental::InterceptorBatchMethods * methods)1817   void Intercept(experimental::InterceptorBatchMethods* methods) override {
1818     if (methods->QueryInterceptionHookPoint(
1819             experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1820       std::shared_ptr<CallCredentials> creds =
1821           GoogleIAMCredentials(kFakeToken, kFakeSelector);
1822       info_->client_context()->set_credentials(creds);
1823     }
1824     methods->Proceed();
1825   }
1826 
1827  private:
1828   experimental::ClientRpcInfo* info_ = nullptr;
1829 };
1830 
1831 class CredentialsInterceptorFactory
1832     : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1833   CredentialsInterceptor* CreateClientInterceptor(
1834       experimental::ClientRpcInfo* info) override {
1835     return new CredentialsInterceptor(info);
1836   }
1837 };
1838 
TEST_P(SecureEnd2endTest,CallCredentialsInterception)1839 TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
1840   if (!GetParam().use_interceptors) {
1841     return;
1842   }
1843   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1844       interceptor_creators;
1845   interceptor_creators.push_back(
1846       absl::make_unique<CredentialsInterceptorFactory>());
1847   ResetStub(std::move(interceptor_creators));
1848   EchoRequest request;
1849   EchoResponse response;
1850   ClientContext context;
1851 
1852   request.set_message("Hello");
1853   request.mutable_param()->set_echo_metadata(true);
1854 
1855   Status s = stub_->Echo(&context, request, &response);
1856   EXPECT_EQ(request.message(), response.message());
1857   EXPECT_TRUE(s.ok());
1858   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1859                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1860                                kFakeToken));
1861   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1862                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1863                                kFakeSelector));
1864   EXPECT_EQ(context.credentials()->DebugString(),
1865             kExpectedFakeCredsDebugString);
1866 }
1867 
TEST_P(SecureEnd2endTest,CallCredentialsInterceptionWithSetCredentials)1868 TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
1869   if (!GetParam().use_interceptors) {
1870     return;
1871   }
1872   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1873       interceptor_creators;
1874   interceptor_creators.push_back(
1875       absl::make_unique<CredentialsInterceptorFactory>());
1876   ResetStub(std::move(interceptor_creators));
1877   EchoRequest request;
1878   EchoResponse response;
1879   ClientContext context;
1880   std::shared_ptr<CallCredentials> creds1 =
1881       GoogleIAMCredentials(kWrongToken, kWrongSelector);
1882   context.set_credentials(creds1);
1883   EXPECT_EQ(context.credentials(), creds1);
1884   EXPECT_EQ(context.credentials()->DebugString(),
1885             kExpectedWrongCredsDebugString);
1886   request.set_message("Hello");
1887   request.mutable_param()->set_echo_metadata(true);
1888 
1889   Status s = stub_->Echo(&context, request, &response);
1890   EXPECT_EQ(request.message(), response.message());
1891   EXPECT_TRUE(s.ok());
1892   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1893                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1894                                kFakeToken));
1895   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1896                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1897                                kFakeSelector));
1898   EXPECT_EQ(context.credentials()->DebugString(),
1899             kExpectedFakeCredsDebugString);
1900 }
1901 
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1902 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1903   ResetStub();
1904   EchoRequest request;
1905   EchoResponse response;
1906   ClientContext context;
1907   std::shared_ptr<CallCredentials> creds1 =
1908       GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
1909   context.set_credentials(creds1);
1910   EXPECT_EQ(context.credentials(), creds1);
1911   EXPECT_EQ(context.credentials()->DebugString(),
1912             kExpectedFakeCreds1DebugString);
1913   std::shared_ptr<CallCredentials> creds2 =
1914       GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
1915   context.set_credentials(creds2);
1916   EXPECT_EQ(context.credentials(), creds2);
1917   request.set_message("Hello");
1918   request.mutable_param()->set_echo_metadata(true);
1919 
1920   Status s = stub_->Echo(&context, request, &response);
1921   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1922                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1923                                kFakeToken2));
1924   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1925                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1926                                kFakeSelector2));
1927   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1928                                 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1929                                 kFakeToken1));
1930   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1931                                 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1932                                 kFakeSelector1));
1933   EXPECT_EQ(context.credentials()->DebugString(),
1934             kExpectedFakeCreds2DebugString);
1935   EXPECT_EQ(request.message(), response.message());
1936   EXPECT_TRUE(s.ok());
1937 }
1938 
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1939 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1940   ResetStub();
1941   EchoRequest request;
1942   EchoResponse response;
1943   ClientContext context;
1944   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1945       std::unique_ptr<MetadataCredentialsPlugin>(
1946           new TestMetadataCredentialsPlugin(
1947               TestMetadataCredentialsPlugin::kBadMetadataKey,
1948               "Does not matter, will fail the key is invalid.", false, true,
1949               0))));
1950   request.set_message("Hello");
1951 
1952   Status s = stub_->Echo(&context, request, &response);
1953   EXPECT_FALSE(s.ok());
1954   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1955   EXPECT_EQ(context.credentials()->DebugString(),
1956             kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
1957 }
1958 
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1959 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1960   ResetStub();
1961   EchoRequest request;
1962   EchoResponse response;
1963   ClientContext context;
1964   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1965       std::unique_ptr<MetadataCredentialsPlugin>(
1966           new TestMetadataCredentialsPlugin(
1967               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1968               "With illegal \n value.", false, true, 0))));
1969   request.set_message("Hello");
1970 
1971   Status s = stub_->Echo(&context, request, &response);
1972   EXPECT_FALSE(s.ok());
1973   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1974   EXPECT_EQ(context.credentials()->DebugString(),
1975             kExpectedAuthMetadataPluginValueFailureCredsDebugString);
1976 }
1977 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithDeadline)1978 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
1979   ResetStub();
1980   EchoRequest request;
1981   request.mutable_param()->set_skip_cancelled_check(true);
1982   EchoResponse response;
1983   ClientContext context;
1984   const int delay = 100;
1985   std::chrono::system_clock::time_point deadline =
1986       std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
1987   context.set_deadline(deadline);
1988   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1989       std::unique_ptr<MetadataCredentialsPlugin>(
1990           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
1991                                             true, delay))));
1992   request.set_message("Hello");
1993 
1994   Status s = stub_->Echo(&context, request, &response);
1995   if (!s.ok()) {
1996     EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED ||
1997                 s.error_code() == StatusCode::UNAVAILABLE);
1998   }
1999   EXPECT_EQ(context.credentials()->DebugString(),
2000             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2001 }
2002 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithCancel)2003 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
2004   ResetStub();
2005   EchoRequest request;
2006   request.mutable_param()->set_skip_cancelled_check(true);
2007   EchoResponse response;
2008   ClientContext context;
2009   const int delay = 100;
2010   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2011       std::unique_ptr<MetadataCredentialsPlugin>(
2012           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2013                                             true, delay))));
2014   request.set_message("Hello");
2015 
2016   std::thread cancel_thread([&] {
2017     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2018                                  gpr_time_from_millis(delay, GPR_TIMESPAN)));
2019     context.TryCancel();
2020   });
2021   Status s = stub_->Echo(&context, request, &response);
2022   if (!s.ok()) {
2023     EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED ||
2024                 s.error_code() == StatusCode::UNAVAILABLE);
2025   }
2026   cancel_thread.join();
2027   EXPECT_EQ(context.credentials()->DebugString(),
2028             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2029 }
2030 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)2031 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
2032   ResetStub();
2033   EchoRequest request;
2034   EchoResponse response;
2035   ClientContext context;
2036   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2037       std::unique_ptr<MetadataCredentialsPlugin>(
2038           new TestMetadataCredentialsPlugin(
2039               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2040               "Does not matter, will fail anyway (see 3rd param)", false, false,
2041               0))));
2042   request.set_message("Hello");
2043 
2044   Status s = stub_->Echo(&context, request, &response);
2045   EXPECT_FALSE(s.ok());
2046   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2047   EXPECT_EQ(s.error_message(),
2048             std::string("Getting metadata from plugin failed with error: ") +
2049                 kTestCredsPluginErrorMsg);
2050   EXPECT_EQ(context.credentials()->DebugString(),
2051             kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
2052 }
2053 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)2054 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
2055   auto* processor = new TestAuthMetadataProcessor(false);
2056   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2057   ResetStub();
2058   EchoRequest request;
2059   EchoResponse response;
2060   ClientContext context;
2061   context.set_credentials(processor->GetCompatibleClientCreds());
2062   request.set_message("Hello");
2063   request.mutable_param()->set_echo_metadata(true);
2064   request.mutable_param()->set_expected_client_identity(
2065       TestAuthMetadataProcessor::kGoodGuy);
2066   request.mutable_param()->set_expected_transport_security_type(
2067       GetParam().credentials_type);
2068 
2069   Status s = stub_->Echo(&context, request, &response);
2070   EXPECT_EQ(request.message(), response.message());
2071   EXPECT_TRUE(s.ok());
2072 
2073   // Metadata should have been consumed by the processor.
2074   EXPECT_FALSE(MetadataContains(
2075       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
2076       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
2077   EXPECT_EQ(
2078       context.credentials()->DebugString(),
2079       kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
2080 }
2081 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)2082 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
2083   auto* processor = new TestAuthMetadataProcessor(false);
2084   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2085   ResetStub();
2086   EchoRequest request;
2087   EchoResponse response;
2088   ClientContext context;
2089   context.set_credentials(processor->GetIncompatibleClientCreds());
2090   request.set_message("Hello");
2091 
2092   Status s = stub_->Echo(&context, request, &response);
2093   EXPECT_FALSE(s.ok());
2094   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
2095   EXPECT_EQ(
2096       context.credentials()->DebugString(),
2097       kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
2098 }
2099 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)2100 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
2101   ResetStub();
2102   EchoRequest request;
2103   EchoResponse response;
2104   ClientContext context;
2105   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2106       std::unique_ptr<MetadataCredentialsPlugin>(
2107           new TestMetadataCredentialsPlugin(
2108               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2109               "Does not matter, will fail anyway (see 3rd param)", true, false,
2110               0))));
2111   request.set_message("Hello");
2112 
2113   Status s = stub_->Echo(&context, request, &response);
2114   EXPECT_FALSE(s.ok());
2115   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2116   EXPECT_EQ(s.error_message(),
2117             std::string("Getting metadata from plugin failed with error: ") +
2118                 kTestCredsPluginErrorMsg);
2119   EXPECT_EQ(context.credentials()->DebugString(),
2120             kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
2121 }
2122 
TEST_P(SecureEnd2endTest,CompositeCallCreds)2123 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
2124   ResetStub();
2125   EchoRequest request;
2126   EchoResponse response;
2127   ClientContext context;
2128   const char kMetadataKey1[] = "call-creds-key1";
2129   const char kMetadataKey2[] = "call-creds-key2";
2130   const char kMetadataVal1[] = "call-creds-val1";
2131   const char kMetadataVal2[] = "call-creds-val2";
2132 
2133   context.set_credentials(grpc::CompositeCallCredentials(
2134       grpc::MetadataCredentialsFromPlugin(
2135           std::unique_ptr<MetadataCredentialsPlugin>(
2136               new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1,
2137                                                 true, true, 0))),
2138       grpc::MetadataCredentialsFromPlugin(
2139           std::unique_ptr<MetadataCredentialsPlugin>(
2140               new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2,
2141                                                 true, true, 0)))));
2142   request.set_message("Hello");
2143   request.mutable_param()->set_echo_metadata(true);
2144 
2145   Status s = stub_->Echo(&context, request, &response);
2146   EXPECT_TRUE(s.ok());
2147   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2148                                kMetadataKey1, kMetadataVal1));
2149   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2150                                kMetadataKey2, kMetadataVal2));
2151   EXPECT_EQ(context.credentials()->DebugString(),
2152             kExpectedCompositeCallCredsDebugString);
2153 }
2154 
TEST_P(SecureEnd2endTest,ClientAuthContext)2155 TEST_P(SecureEnd2endTest, ClientAuthContext) {
2156   ResetStub();
2157   EchoRequest request;
2158   EchoResponse response;
2159   request.set_message("Hello");
2160   request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
2161                                                   kTlsCredentialsType);
2162   request.mutable_param()->set_expected_transport_security_type(
2163       GetParam().credentials_type);
2164   ClientContext context;
2165   Status s = stub_->Echo(&context, request, &response);
2166   EXPECT_EQ(response.message(), request.message());
2167   EXPECT_TRUE(s.ok());
2168 
2169   std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
2170   std::vector<grpc::string_ref> tst =
2171       auth_ctx->FindPropertyValues("transport_security_type");
2172   ASSERT_EQ(1u, tst.size());
2173   EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
2174   if (GetParam().credentials_type == kTlsCredentialsType) {
2175     EXPECT_EQ("x509_subject_alternative_name",
2176               auth_ctx->GetPeerIdentityPropertyName());
2177     EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
2178     EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
2179     EXPECT_EQ("waterzooi.test.google.be",
2180               ToString(auth_ctx->GetPeerIdentity()[1]));
2181     EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2182     EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2183   }
2184 }
2185 
2186 class ResourceQuotaEnd2endTest : public End2endTest {
2187  public:
ResourceQuotaEnd2endTest()2188   ResourceQuotaEnd2endTest()
2189       : server_resource_quota_("server_resource_quota") {}
2190 
ConfigureServerBuilder(ServerBuilder * builder)2191   void ConfigureServerBuilder(ServerBuilder* builder) override {
2192     builder->SetResourceQuota(server_resource_quota_);
2193   }
2194 
2195  private:
2196   ResourceQuota server_resource_quota_;
2197 };
2198 
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)2199 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2200   ResetStub();
2201 
2202   EchoRequest request;
2203   EchoResponse response;
2204   request.set_message("Hello");
2205 
2206   ClientContext context;
2207   Status s = stub_->Echo(&context, request, &response);
2208   EXPECT_EQ(response.message(), request.message());
2209   EXPECT_TRUE(s.ok());
2210 }
2211 
2212 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2213 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2214                                               bool test_insecure,
2215                                               bool test_secure,
2216                                               bool test_inproc,
2217                                               bool test_callback_server) {
2218   std::vector<TestScenario> scenarios;
2219   std::vector<std::string> credentials_types;
2220 
2221   GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms,
2222                         kClientChannelBackupPollIntervalMs);
2223 #if TARGET_OS_IPHONE
2224   // Workaround Apple CFStream bug
2225   gpr_setenv("grpc_cfstream", "0");
2226 #endif
2227 
2228   if (test_secure) {
2229     credentials_types =
2230         GetCredentialsProvider()->GetSecureCredentialsTypeList();
2231   }
2232   auto insec_ok = [] {
2233     // Only allow insecure credentials type when it is registered with the
2234     // provider. User may create providers that do not have insecure.
2235     return GetCredentialsProvider()->GetChannelCredentials(
2236                kInsecureCredentialsType, nullptr) != nullptr;
2237   };
2238   if (test_insecure && insec_ok()) {
2239     credentials_types.push_back(kInsecureCredentialsType);
2240   }
2241 
2242   // Test callback with inproc or if the event-engine allows it
2243   GPR_ASSERT(!credentials_types.empty());
2244   for (const auto& cred : credentials_types) {
2245     scenarios.emplace_back(false, false, false, cred, false);
2246     scenarios.emplace_back(true, false, false, cred, false);
2247     if (test_callback_server) {
2248       // Note that these scenarios will be dynamically disabled if the event
2249       // engine doesn't run in the background
2250       scenarios.emplace_back(false, false, false, cred, true);
2251       scenarios.emplace_back(true, false, false, cred, true);
2252     }
2253     if (use_proxy) {
2254       scenarios.emplace_back(false, true, false, cred, false);
2255       scenarios.emplace_back(true, true, false, cred, false);
2256     }
2257   }
2258   if (test_inproc && insec_ok()) {
2259     scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2260     scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2261     if (test_callback_server) {
2262       scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2263                              true);
2264       scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2265     }
2266   }
2267   return scenarios;
2268 }
2269 
2270 INSTANTIATE_TEST_SUITE_P(
2271     End2end, End2endTest,
2272     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2273 
2274 INSTANTIATE_TEST_SUITE_P(
2275     End2endServerTryCancel, End2endServerTryCancelTest,
2276     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2277 
2278 INSTANTIATE_TEST_SUITE_P(
2279     ProxyEnd2end, ProxyEnd2endTest,
2280     ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)));
2281 
2282 INSTANTIATE_TEST_SUITE_P(
2283     SecureEnd2end, SecureEnd2endTest,
2284     ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)));
2285 
2286 INSTANTIATE_TEST_SUITE_P(
2287     ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2288     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2289 
2290 }  // namespace
2291 }  // namespace testing
2292 }  // namespace grpc
2293 
main(int argc,char ** argv)2294 int main(int argc, char** argv) {
2295   grpc::testing::TestEnvironment env(argc, argv);
2296   ::testing::InitGoogleTest(&argc, argv);
2297   int ret = RUN_ALL_TESTS();
2298   return ret;
2299 }
2300