1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <fcntl.h>
22 #include <netinet/in.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/socket.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29
30 #include <functional>
31 #include <set>
32 #include <thread>
33
34 #include <gmock/gmock.h>
35
36 #include "absl/memory/memory.h"
37 #include "absl/strings/str_cat.h"
38
39 #include <grpc/grpc.h>
40 #include <grpc/grpc_security.h>
41 #include <grpc/slice.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/string_util.h>
45 #include <grpc/support/time.h>
46 #include <grpcpp/impl/codegen/service_type.h>
47 #include <grpcpp/server_builder.h>
48
49 #include "src/core/lib/gpr/useful.h"
50 #include "src/core/lib/gprpp/host_port.h"
51 #include "src/core/lib/gprpp/thd.h"
52 #include "src/core/lib/iomgr/error.h"
53 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
54 #include "src/core/lib/security/credentials/credentials.h"
55 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
56 #include "src/core/lib/slice/slice_string_helpers.h"
57 #include "test/core/end2end/cq_verifier.h"
58 #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
59 #include "test/core/util/memory_counters.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/test_config.h"
62
63 namespace {
64
65 const int kFakeHandshakeServerMaxConcurrentStreams = 40;
66
drain_cq(grpc_completion_queue * cq)67 void drain_cq(grpc_completion_queue* cq) {
68 grpc_event ev;
69 do {
70 ev = grpc_completion_queue_next(
71 cq, grpc_timeout_milliseconds_to_deadline(5000), nullptr);
72 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
73 }
74
create_secure_channel_for_test(const char * server_addr,const char * fake_handshake_server_addr,int reconnect_backoff_ms)75 grpc_channel* create_secure_channel_for_test(
76 const char* server_addr, const char* fake_handshake_server_addr,
77 int reconnect_backoff_ms) {
78 grpc_alts_credentials_options* alts_options =
79 grpc_alts_credentials_client_options_create();
80 grpc_channel_credentials* channel_creds =
81 grpc_alts_credentials_create_customized(alts_options,
82 fake_handshake_server_addr,
83 true /* enable_untrusted_alts */);
84 grpc_alts_credentials_options_destroy(alts_options);
85 // The main goal of these tests are to stress concurrent ALTS handshakes,
86 // so we prevent subchnannel sharing.
87 std::vector<grpc_arg> new_args;
88 new_args.push_back(grpc_channel_arg_integer_create(
89 const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
90 if (reconnect_backoff_ms != 0) {
91 new_args.push_back(grpc_channel_arg_integer_create(
92 const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"),
93 reconnect_backoff_ms));
94 }
95 grpc_channel_args* channel_args =
96 grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size());
97 grpc_channel* channel = grpc_secure_channel_create(channel_creds, server_addr,
98 channel_args, nullptr);
99 grpc_channel_args_destroy(channel_args);
100 grpc_channel_credentials_release(channel_creds);
101 return channel;
102 }
103
104 class FakeHandshakeServer {
105 public:
FakeHandshakeServer(bool check_num_concurrent_rpcs)106 explicit FakeHandshakeServer(bool check_num_concurrent_rpcs) {
107 int port = grpc_pick_unused_port_or_die();
108 address_ = grpc_core::JoinHostPort("localhost", port);
109 if (check_num_concurrent_rpcs) {
110 service_ = grpc::gcp::
111 CreateFakeHandshakerService(kFakeHandshakeServerMaxConcurrentStreams /* expected max concurrent rpcs */);
112 } else {
113 service_ = grpc::gcp::CreateFakeHandshakerService(
114 0 /* expected max concurrent rpcs unset */);
115 }
116 grpc::ServerBuilder builder;
117 builder.AddListeningPort(address_.c_str(),
118 grpc::InsecureServerCredentials());
119 builder.RegisterService(service_.get());
120 // TODO(apolcyn): when removing the global concurrent handshake limiting
121 // queue, set MAX_CONCURRENT_STREAMS on this server.
122 server_ = builder.BuildAndStart();
123 gpr_log(GPR_INFO, "Fake handshaker server listening on %s",
124 address_.c_str());
125 }
126
~FakeHandshakeServer()127 ~FakeHandshakeServer() {
128 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
129 }
130
address()131 const char* address() { return address_.c_str(); }
132
133 private:
134 std::string address_;
135 std::unique_ptr<grpc::Service> service_;
136 std::unique_ptr<grpc::Server> server_;
137 };
138
139 class TestServer {
140 public:
TestServer()141 explicit TestServer()
142 : fake_handshake_server_(true /* check num concurrent rpcs */) {
143 grpc_alts_credentials_options* alts_options =
144 grpc_alts_credentials_server_options_create();
145 grpc_server_credentials* server_creds =
146 grpc_alts_server_credentials_create_customized(
147 alts_options, fake_handshake_server_.address(),
148 true /* enable_untrusted_alts */);
149 grpc_alts_credentials_options_destroy(alts_options);
150 server_ = grpc_server_create(nullptr, nullptr);
151 server_cq_ = grpc_completion_queue_create_for_next(nullptr);
152 grpc_server_register_completion_queue(server_, server_cq_, nullptr);
153 int port = grpc_pick_unused_port_or_die();
154 server_addr_ = grpc_core::JoinHostPort("localhost", port);
155 GPR_ASSERT(grpc_server_add_secure_http2_port(server_, server_addr_.c_str(),
156 server_creds));
157 grpc_server_credentials_release(server_creds);
158 grpc_server_start(server_);
159 gpr_log(GPR_DEBUG, "Start TestServer %p. listen on %s", this,
160 server_addr_.c_str());
161 server_thd_ = absl::make_unique<std::thread>(PollUntilShutdown, this);
162 }
163
~TestServer()164 ~TestServer() {
165 gpr_log(GPR_DEBUG, "Begin dtor of TestServer %p", this);
166 grpc_server_shutdown_and_notify(server_, server_cq_, this);
167 server_thd_->join();
168 grpc_server_destroy(server_);
169 grpc_completion_queue_shutdown(server_cq_);
170 drain_cq(server_cq_);
171 grpc_completion_queue_destroy(server_cq_);
172 }
173
address()174 const char* address() { return server_addr_.c_str(); }
175
PollUntilShutdown(const TestServer * self)176 static void PollUntilShutdown(const TestServer* self) {
177 grpc_event ev = grpc_completion_queue_next(
178 self->server_cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
179 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
180 GPR_ASSERT(ev.tag == self);
181 gpr_log(GPR_DEBUG, "TestServer %p stop polling", self);
182 }
183
184 private:
185 grpc_server* server_;
186 grpc_completion_queue* server_cq_;
187 std::unique_ptr<std::thread> server_thd_;
188 std::string server_addr_;
189 // Give this test server its own ALTS handshake server
190 // so that we avoid competing for ALTS handshake server resources (e.g.
191 // available HTTP2 streams on a globally shared handshaker subchannel)
192 // with clients that are trying to do mutual ALTS handshakes
193 // with this server (which could "deadlock" mutual handshakes).
194 // TODO(apolcyn): remove this workaround from this test and have
195 // clients/servers share a single fake handshake server if
196 // the underlying issue needs to be fixed.
197 FakeHandshakeServer fake_handshake_server_;
198 };
199
200 class ConnectLoopRunner {
201 public:
ConnectLoopRunner(const char * server_address,const char * fake_handshake_server_addr,int per_connect_deadline_seconds,size_t loops,grpc_connectivity_state expected_connectivity_states,int reconnect_backoff_ms)202 explicit ConnectLoopRunner(
203 const char* server_address, const char* fake_handshake_server_addr,
204 int per_connect_deadline_seconds, size_t loops,
205 grpc_connectivity_state expected_connectivity_states,
206 int reconnect_backoff_ms)
207 : server_address_(grpc_core::UniquePtr<char>(gpr_strdup(server_address))),
208 fake_handshake_server_addr_(
209 grpc_core::UniquePtr<char>(gpr_strdup(fake_handshake_server_addr))),
210 per_connect_deadline_seconds_(per_connect_deadline_seconds),
211 loops_(loops),
212 expected_connectivity_states_(expected_connectivity_states),
213 reconnect_backoff_ms_(reconnect_backoff_ms) {
214 thd_ = absl::make_unique<std::thread>(ConnectLoop, this);
215 }
216
~ConnectLoopRunner()217 ~ConnectLoopRunner() { thd_->join(); }
218
ConnectLoop(const ConnectLoopRunner * self)219 static void ConnectLoop(const ConnectLoopRunner* self) {
220 for (size_t i = 0; i < self->loops_; i++) {
221 gpr_log(GPR_DEBUG, "runner:%p connect_loop begin loop %ld", self, i);
222 grpc_completion_queue* cq =
223 grpc_completion_queue_create_for_next(nullptr);
224 grpc_channel* channel = create_secure_channel_for_test(
225 self->server_address_.get(), self->fake_handshake_server_addr_.get(),
226 self->reconnect_backoff_ms_);
227 // Connect, forcing an ALTS handshake
228 gpr_timespec connect_deadline =
229 grpc_timeout_seconds_to_deadline(self->per_connect_deadline_seconds_);
230 grpc_connectivity_state state =
231 grpc_channel_check_connectivity_state(channel, 1);
232 ASSERT_EQ(state, GRPC_CHANNEL_IDLE);
233 while (state != self->expected_connectivity_states_) {
234 if (self->expected_connectivity_states_ ==
235 GRPC_CHANNEL_TRANSIENT_FAILURE) {
236 ASSERT_NE(state, GRPC_CHANNEL_READY); // sanity check
237 } else {
238 ASSERT_EQ(self->expected_connectivity_states_, GRPC_CHANNEL_READY);
239 }
240 grpc_channel_watch_connectivity_state(
241 channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, nullptr);
242 grpc_event ev =
243 grpc_completion_queue_next(cq, connect_deadline, nullptr);
244 ASSERT_EQ(ev.type, GRPC_OP_COMPLETE)
245 << "connect_loop runner:" << std::hex << self
246 << " got ev.type:" << ev.type << " i:" << i;
247 ASSERT_TRUE(ev.success);
248 grpc_connectivity_state prev_state = state;
249 state = grpc_channel_check_connectivity_state(channel, 1);
250 if (self->expected_connectivity_states_ ==
251 GRPC_CHANNEL_TRANSIENT_FAILURE &&
252 prev_state == GRPC_CHANNEL_CONNECTING &&
253 state == GRPC_CHANNEL_CONNECTING) {
254 // Detect a race in state checking: if the watch_connectivity_state
255 // completed from prior state "connecting", this could be because the
256 // channel momentarily entered state "transient failure", which is
257 // what we want. However, if the channel immediately re-enters
258 // "connecting" state, then the new state check might still result in
259 // "connecting". A continuous repeat of this can cause this loop to
260 // never terminate in time. So take this scenario to indicate that the
261 // channel momentarily entered transient failure.
262 break;
263 }
264 }
265 grpc_channel_destroy(channel);
266 grpc_completion_queue_shutdown(cq);
267 drain_cq(cq);
268 grpc_completion_queue_destroy(cq);
269 gpr_log(GPR_DEBUG, "runner:%p connect_loop finished loop %ld", self, i);
270 }
271 }
272
273 private:
274 grpc_core::UniquePtr<char> server_address_;
275 grpc_core::UniquePtr<char> fake_handshake_server_addr_;
276 int per_connect_deadline_seconds_;
277 size_t loops_;
278 grpc_connectivity_state expected_connectivity_states_;
279 std::unique_ptr<std::thread> thd_;
280 int reconnect_backoff_ms_;
281 };
282
283 // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
284 // handshake server).
TEST(AltsConcurrentConnectivityTest,TestBasicClientServerHandshakes)285 TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
286 FakeHandshakeServer fake_handshake_server(
287 true /* check num concurrent rpcs */);
288 TestServer test_server;
289 {
290 ConnectLoopRunner runner(
291 test_server.address(), fake_handshake_server.address(),
292 5 /* per connect deadline seconds */, 10 /* loops */,
293 GRPC_CHANNEL_READY /* expected connectivity states */,
294 0 /* reconnect_backoff_ms unset */);
295 }
296 }
297
298 /* Run a bunch of concurrent ALTS handshakes on concurrent channels
299 * (using the fake, in-process handshake server). */
TEST(AltsConcurrentConnectivityTest,TestConcurrentClientServerHandshakes)300 TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
301 FakeHandshakeServer fake_handshake_server(
302 true /* check num concurrent rpcs */);
303 // Test
304 {
305 TestServer test_server;
306 gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
307 size_t num_concurrent_connects = 50;
308 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
309 gpr_log(GPR_DEBUG,
310 "start performing concurrent expected-to-succeed connects");
311 for (size_t i = 0; i < num_concurrent_connects; i++) {
312 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
313 test_server.address(), fake_handshake_server.address(),
314 15 /* per connect deadline seconds */, 5 /* loops */,
315 GRPC_CHANNEL_READY /* expected connectivity states */,
316 0 /* reconnect_backoff_ms unset */));
317 }
318 connect_loop_runners.clear();
319 gpr_log(GPR_DEBUG,
320 "done performing concurrent expected-to-succeed connects");
321 if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
322 gpr_log(GPR_DEBUG, "Test took longer than expected.");
323 abort();
324 }
325 }
326 }
327
328 class FakeTcpServer {
329 public:
330 enum ProcessReadResult {
331 CONTINUE_READING,
332 CLOSE_SOCKET,
333 };
334
335 enum class AcceptMode {
336 kWaitForClientToSendFirstBytes, // useful for emulating ALTS based
337 // grpc servers
338 kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g.
339 // ALTS handshake servers)
340 };
341
FakeTcpServer(AcceptMode accept_mode,const std::function<ProcessReadResult (int,int,int)> & process_read_cb)342 explicit FakeTcpServer(
343 AcceptMode accept_mode,
344 const std::function<ProcessReadResult(int, int, int)>& process_read_cb)
345 : accept_mode_(accept_mode), process_read_cb_(process_read_cb) {
346 port_ = grpc_pick_unused_port_or_die();
347 accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
348 address_ = absl::StrCat("[::]:", port_);
349 GPR_ASSERT(accept_socket_ != -1);
350 if (accept_socket_ == -1) {
351 gpr_log(GPR_ERROR, "Failed to create socket: %d", errno);
352 abort();
353 }
354 int val = 1;
355 if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val,
356 sizeof(val)) != 0) {
357 gpr_log(GPR_ERROR,
358 "Failed to set SO_REUSEADDR on socket bound to [::1]:%d : %d",
359 port_, errno);
360 abort();
361 }
362 if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
363 gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
364 abort();
365 }
366 sockaddr_in6 addr;
367 memset(&addr, 0, sizeof(addr));
368 addr.sin6_family = AF_INET6;
369 addr.sin6_port = htons(port_);
370 (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
371 if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
372 sizeof(addr)) != 0) {
373 gpr_log(GPR_ERROR, "Failed to bind socket to [::1]:%d : %d", port_,
374 errno);
375 abort();
376 }
377 if (listen(accept_socket_, 100)) {
378 gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
379 port_, errno);
380 abort();
381 }
382 gpr_event_init(&stop_ev_);
383 run_server_loop_thd_ = absl::make_unique<std::thread>(RunServerLoop, this);
384 }
385
~FakeTcpServer()386 ~FakeTcpServer() {
387 gpr_log(GPR_DEBUG,
388 "FakeTcpServer stop and "
389 "join server thread");
390 gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
391 run_server_loop_thd_->join();
392 gpr_log(GPR_DEBUG,
393 "FakeTcpServer join server "
394 "thread complete");
395 }
396
address()397 const char* address() { return address_.c_str(); }
398
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)399 static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(
400 int bytes_received_size, int read_error, int s) {
401 if (bytes_received_size < 0 && read_error != EAGAIN &&
402 read_error != EWOULDBLOCK) {
403 gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
404 errno);
405 abort();
406 }
407 if (bytes_received_size >= 0) {
408 gpr_log(GPR_DEBUG,
409 "Fake TCP server received %d bytes from peer socket: %d. Close "
410 "the "
411 "connection.",
412 bytes_received_size, s);
413 return CLOSE_SOCKET;
414 }
415 return CONTINUE_READING;
416 }
417
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)418 static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
419 int read_error, int s) {
420 if (bytes_received_size < 0 && read_error != EAGAIN &&
421 read_error != EWOULDBLOCK) {
422 gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
423 errno);
424 abort();
425 }
426 if (bytes_received_size == 0) {
427 // The peer has shut down the connection.
428 gpr_log(GPR_DEBUG,
429 "Fake TCP server received 0 bytes from peer socket: %d. Close "
430 "the "
431 "connection.",
432 s);
433 return CLOSE_SOCKET;
434 }
435 return CONTINUE_READING;
436 }
437
438 class FakeTcpServerPeer {
439 public:
FakeTcpServerPeer(int fd)440 explicit FakeTcpServerPeer(int fd) : fd_(fd) {}
441
~FakeTcpServerPeer()442 ~FakeTcpServerPeer() { close(fd_); }
443
MaybeContinueSendingSettings()444 void MaybeContinueSendingSettings() {
445 // https://tools.ietf.org/html/rfc7540#section-4.1
446 const std::vector<uint8_t> kEmptyHttp2SettingsFrame = {
447 0x00, 0x00, 0x00, // length
448 0x04, // settings type
449 0x00, // flags
450 0x00, 0x00, 0x00, 0x00 // stream identifier
451 };
452 if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) {
453 int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
454 int bytes_sent =
455 send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
456 bytes_to_send, 0);
457 if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
458 gpr_log(GPR_ERROR,
459 "Fake TCP server encountered unexpected error:%d |%s| "
460 "sending %d bytes on fd:%d",
461 errno, strerror(errno), bytes_to_send, fd_);
462 GPR_ASSERT(0);
463 } else if (bytes_sent > 0) {
464 total_bytes_sent_ += bytes_sent;
465 GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
466 }
467 }
468 }
469
fd()470 int fd() { return fd_; }
471
472 private:
473 int fd_;
474 int total_bytes_sent_ = 0;
475 };
476
477 // Run a loop that periodically, every 10 ms:
478 // 1) Checks if there are any new TCP connections to accept.
479 // 2) Checks if any data has arrived yet on established connections,
480 // and reads from them if so, processing the sockets as configured.
RunServerLoop(FakeTcpServer * self)481 static void RunServerLoop(FakeTcpServer* self) {
482 std::set<std::unique_ptr<FakeTcpServerPeer>> peers;
483 while (!gpr_event_get(&self->stop_ev_)) {
484 int p = accept(self->accept_socket_, nullptr, nullptr);
485 if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
486 gpr_log(GPR_ERROR, "Failed to accept connection: %d", errno);
487 abort();
488 }
489 if (p != -1) {
490 gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
491 if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
492 gpr_log(GPR_ERROR,
493 "Failed to set O_NONBLOCK on peer socket:%d errno:%d", p,
494 errno);
495 abort();
496 }
497 peers.insert(absl::make_unique<FakeTcpServerPeer>(p));
498 }
499 auto it = peers.begin();
500 while (it != peers.end()) {
501 FakeTcpServerPeer* peer = (*it).get();
502 if (self->accept_mode_ == AcceptMode::kEagerlySendSettings) {
503 peer->MaybeContinueSendingSettings();
504 }
505 char buf[100];
506 int bytes_received_size = recv(peer->fd(), buf, 100, 0);
507 ProcessReadResult r =
508 self->process_read_cb_(bytes_received_size, errno, peer->fd());
509 if (r == CLOSE_SOCKET) {
510 it = peers.erase(it);
511 } else {
512 GPR_ASSERT(r == CONTINUE_READING);
513 it++;
514 }
515 }
516 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
517 gpr_time_from_millis(10, GPR_TIMESPAN)));
518 }
519 close(self->accept_socket_);
520 }
521
522 private:
523 int accept_socket_;
524 int port_;
525 gpr_event stop_ev_;
526 std::string address_;
527 std::unique_ptr<std::thread> run_server_loop_thd_;
528 const AcceptMode accept_mode_;
529 std::function<ProcessReadResult(int, int, int)> process_read_cb_;
530 };
531
532 /* This test is intended to make sure that ALTS handshakes we correctly
533 * fail fast when the security handshaker gets an error while reading
534 * from the remote peer, after having earlier sent the first bytes of the
535 * ALTS handshake to the peer, i.e. after getting into the middle of a
536 * handshake. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting)537 TEST(AltsConcurrentConnectivityTest,
538 TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
539 // Don't enforce the number of concurrent rpcs for the fake handshake
540 // server in this test, because this test will involve handshake RPCs
541 // getting cancelled. Because there isn't explicit synchronization between
542 // an ALTS handshake client's RECV_STATUS op completing after call
543 // cancellation, and the corresponding fake handshake server's sync
544 // method handler returning, enforcing a limit on the number of active
545 // RPCs at the fake handshake server would be inherently racey.
546 FakeHandshakeServer fake_handshake_server(
547 false /* check num concurrent rpcs */);
548 // The fake_backend_server emulates a secure (ALTS based) gRPC backend. So
549 // it waits for the client to send the first bytes.
550 FakeTcpServer fake_backend_server(
551 FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
552 FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
553 {
554 gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
555 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
556 size_t num_concurrent_connects = 100;
557 gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
558 for (size_t i = 0; i < num_concurrent_connects; i++) {
559 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
560 fake_backend_server.address(), fake_handshake_server.address(),
561 10 /* per connect deadline seconds */, 3 /* loops */,
562 GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
563 0 /* reconnect_backoff_ms unset */));
564 }
565 connect_loop_runners.clear();
566 gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
567 if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
568 gpr_log(GPR_ERROR,
569 "Exceeded test deadline. ALTS handshakes might not be failing "
570 "fast when the peer endpoint closes the connection abruptly");
571 abort();
572 }
573 }
574 }
575
576 /* This test is intended to make sure that ALTS handshakes correctly
577 * fail fast when the ALTS handshake server fails incoming handshakes fast. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting)578 TEST(AltsConcurrentConnectivityTest,
579 TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
580 // The fake_handshake_server emulates a broken ALTS handshaker, which
581 // is an insecure server. So send settings to the client eagerly.
582 FakeTcpServer fake_handshake_server(
583 FakeTcpServer::AcceptMode::kEagerlySendSettings,
584 FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
585 // The fake_backend_server emulates a secure (ALTS based) server, so wait
586 // for the client to send the first bytes.
587 FakeTcpServer fake_backend_server(
588 FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
589 FakeTcpServer::CloseSocketUponCloseFromPeer);
590 {
591 gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
592 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
593 size_t num_concurrent_connects = 100;
594 gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
595 for (size_t i = 0; i < num_concurrent_connects; i++) {
596 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
597 fake_backend_server.address(), fake_handshake_server.address(),
598 20 /* per connect deadline seconds */, 2 /* loops */,
599 GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
600 0 /* reconnect_backoff_ms unset */));
601 }
602 connect_loop_runners.clear();
603 gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
604 if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
605 gpr_log(GPR_ERROR,
606 "Exceeded test deadline. ALTS handshakes might not be failing "
607 "fast when the handshake server closes new connections");
608 abort();
609 }
610 }
611 }
612
613 /* This test is intended to make sure that ALTS handshakes correctly
614 * fail fast when the ALTS handshake server is non-responsive, in which case
615 * the overall connection deadline kicks in. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting)616 TEST(AltsConcurrentConnectivityTest,
617 TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
618 // fake_handshake_server emulates an insecure server, so send settings first.
619 // It will be unresponsive for the rest of the connection, though.
620 FakeTcpServer fake_handshake_server(
621 FakeTcpServer::AcceptMode::kEagerlySendSettings,
622 FakeTcpServer::CloseSocketUponCloseFromPeer);
623 // fake_backend_server emulates an ALTS based server, so wait for the client
624 // to send the first bytes.
625 FakeTcpServer fake_backend_server(
626 FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
627 FakeTcpServer::CloseSocketUponCloseFromPeer);
628 {
629 gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
630 std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
631 size_t num_concurrent_connects = 100;
632 gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
633 for (size_t i = 0; i < num_concurrent_connects; i++) {
634 connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
635 fake_backend_server.address(), fake_handshake_server.address(),
636 10 /* per connect deadline seconds */, 2 /* loops */,
637 GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
638 100 /* reconnect_backoff_ms */));
639 }
640 connect_loop_runners.clear();
641 gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
642 if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
643 gpr_log(GPR_ERROR,
644 "Exceeded test deadline. ALTS handshakes might not be failing "
645 "fast when the handshake server is non-response timeout occurs");
646 abort();
647 }
648 }
649 }
650
651 } // namespace
652
main(int argc,char ** argv)653 int main(int argc, char** argv) {
654 ::testing::InitGoogleTest(&argc, argv);
655 grpc::testing::TestEnvironment env(argc, argv);
656 grpc_init();
657 auto result = RUN_ALL_TESTS();
658 grpc_shutdown();
659 return result;
660 }
661