1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <string.h>
27 #include <sys/socket.h>
28 #include <unistd.h>
29
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "src/core/lib/iomgr/pollset_set.h"
37 #include "src/core/lib/iomgr/socket_utils_posix.h"
38 #include "src/core/lib/iomgr/tcp_client.h"
39 #include "src/core/lib/iomgr/timer.h"
40 #include "test/core/util/resource_user_util.h"
41 #include "test/core/util/test_config.h"
42
43 static grpc_pollset_set* g_pollset_set;
44 static gpr_mu* g_mu;
45 static grpc_pollset* g_pollset;
46 static int g_connections_complete = 0;
47 static grpc_endpoint* g_connecting = nullptr;
48
test_deadline(void)49 static grpc_millis test_deadline(void) {
50 return grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
51 }
52
finish_connection()53 static void finish_connection() {
54 gpr_mu_lock(g_mu);
55 g_connections_complete++;
56 grpc_core::ExecCtx exec_ctx;
57 GPR_ASSERT(
58 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
59
60 gpr_mu_unlock(g_mu);
61 }
62
must_succeed(void *,grpc_error_handle error)63 static void must_succeed(void* /*arg*/, grpc_error_handle error) {
64 GPR_ASSERT(g_connecting != nullptr);
65 GPR_ASSERT(error == GRPC_ERROR_NONE);
66 grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
67 "must_succeed called"));
68 grpc_endpoint_destroy(g_connecting);
69 g_connecting = nullptr;
70 finish_connection();
71 }
72
must_fail(void *,grpc_error_handle error)73 static void must_fail(void* /*arg*/, grpc_error_handle error) {
74 GPR_ASSERT(g_connecting == nullptr);
75 GPR_ASSERT(error != GRPC_ERROR_NONE);
76 finish_connection();
77 }
78
test_succeeds(void)79 void test_succeeds(void) {
80 gpr_log(GPR_ERROR, "---- starting test_succeeds() ----");
81 grpc_resolved_address resolved_addr;
82 struct sockaddr_in* addr =
83 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
84 int svr_fd;
85 int r;
86 int connections_complete_before;
87 grpc_closure done;
88 grpc_core::ExecCtx exec_ctx;
89
90 memset(&resolved_addr, 0, sizeof(resolved_addr));
91 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
92 addr->sin_family = AF_INET;
93
94 /* create a phony server */
95 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
96 GPR_ASSERT(svr_fd >= 0);
97 GPR_ASSERT(
98 0 == bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len));
99 GPR_ASSERT(0 == listen(svr_fd, 1));
100
101 gpr_mu_lock(g_mu);
102 connections_complete_before = g_connections_complete;
103 gpr_mu_unlock(g_mu);
104
105 /* connect to it */
106 GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr,
107 (socklen_t*)&resolved_addr.len) == 0);
108 GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
109 grpc_tcp_client_connect(
110 &done, &g_connecting, grpc_slice_allocator_create_unlimited(),
111 g_pollset_set, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
112 /* await the connection */
113 do {
114 resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
115 r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
116 reinterpret_cast<socklen_t*>(&resolved_addr.len));
117 } while (r == -1 && errno == EINTR);
118 GPR_ASSERT(r >= 0);
119 close(r);
120
121 gpr_mu_lock(g_mu);
122
123 while (g_connections_complete == connections_complete_before) {
124 grpc_pollset_worker* worker = nullptr;
125 GPR_ASSERT(GRPC_LOG_IF_ERROR(
126 "pollset_work",
127 grpc_pollset_work(g_pollset, &worker,
128 grpc_timespec_to_millis_round_up(
129 grpc_timeout_seconds_to_deadline(5)))));
130 gpr_mu_unlock(g_mu);
131 grpc_core::ExecCtx::Get()->Flush();
132 gpr_mu_lock(g_mu);
133 }
134
135 gpr_mu_unlock(g_mu);
136 gpr_log(GPR_ERROR, "---- finished test_succeeds() ----");
137 }
138
test_fails(void)139 void test_fails(void) {
140 gpr_log(GPR_ERROR, "---- starting test_fails() ----");
141 grpc_resolved_address resolved_addr;
142 struct sockaddr_in* addr =
143 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
144 int connections_complete_before;
145 grpc_closure done;
146 grpc_core::ExecCtx exec_ctx;
147
148 memset(&resolved_addr, 0, sizeof(resolved_addr));
149 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
150 addr->sin_family = AF_INET;
151
152 gpr_mu_lock(g_mu);
153 connections_complete_before = g_connections_complete;
154 gpr_mu_unlock(g_mu);
155
156 /* connect to a broken address */
157 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
158 grpc_tcp_client_connect(
159 &done, &g_connecting, grpc_slice_allocator_create_unlimited(),
160 g_pollset_set, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
161 gpr_mu_lock(g_mu);
162
163 /* wait for the connection callback to finish */
164 while (g_connections_complete == connections_complete_before) {
165 grpc_pollset_worker* worker = nullptr;
166 grpc_millis polling_deadline = test_deadline();
167 switch (grpc_timer_check(&polling_deadline)) {
168 case GRPC_TIMERS_FIRED:
169 break;
170 case GRPC_TIMERS_NOT_CHECKED:
171 polling_deadline = 0;
172 ABSL_FALLTHROUGH_INTENDED;
173 case GRPC_TIMERS_CHECKED_AND_EMPTY:
174 GPR_ASSERT(GRPC_LOG_IF_ERROR(
175 "pollset_work",
176 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
177 break;
178 }
179 gpr_mu_unlock(g_mu);
180 grpc_core::ExecCtx::Get()->Flush();
181 gpr_mu_lock(g_mu);
182 }
183
184 gpr_mu_unlock(g_mu);
185 gpr_log(GPR_ERROR, "---- finished test_fails() ----");
186 }
187
test_fails_bad_addr_no_leak(void)188 void test_fails_bad_addr_no_leak(void) {
189 gpr_log(GPR_ERROR, "---- starting test_fails_bad_addr_no_leak() ----");
190 grpc_resolved_address resolved_addr;
191 struct sockaddr_in* addr =
192 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
193 int connections_complete_before;
194 grpc_closure done;
195 grpc_core::ExecCtx exec_ctx;
196 memset(&resolved_addr, 0, sizeof(resolved_addr));
197 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
198 // force `grpc_tcp_client_prepare_fd` to fail. contrived, but effective.
199 addr->sin_family = AF_IPX;
200 gpr_mu_lock(g_mu);
201 connections_complete_before = g_connections_complete;
202 gpr_mu_unlock(g_mu);
203 // connect to an invalid address.
204 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
205 grpc_tcp_client_connect(
206 &done, &g_connecting, grpc_slice_allocator_create_unlimited(),
207 g_pollset_set, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
208 gpr_mu_lock(g_mu);
209 while (g_connections_complete == connections_complete_before) {
210 grpc_pollset_worker* worker = nullptr;
211 grpc_millis polling_deadline = test_deadline();
212 switch (grpc_timer_check(&polling_deadline)) {
213 case GRPC_TIMERS_FIRED:
214 break;
215 case GRPC_TIMERS_NOT_CHECKED:
216 polling_deadline = 0;
217 ABSL_FALLTHROUGH_INTENDED;
218 case GRPC_TIMERS_CHECKED_AND_EMPTY:
219 GPR_ASSERT(GRPC_LOG_IF_ERROR(
220 "pollset_work",
221 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
222 break;
223 }
224 gpr_mu_unlock(g_mu);
225 grpc_core::ExecCtx::Get()->Flush();
226 gpr_mu_lock(g_mu);
227 }
228 gpr_mu_unlock(g_mu);
229 gpr_log(GPR_ERROR, "---- finished test_fails_bad_addr_no_leak() ----");
230 }
231
destroy_pollset(void * p,grpc_error_handle)232 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
233 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
234 }
235
main(int argc,char ** argv)236 int main(int argc, char** argv) {
237 grpc_closure destroyed;
238 grpc::testing::TestEnvironment env(argc, argv);
239 grpc_init();
240
241 {
242 grpc_core::ExecCtx exec_ctx;
243 g_pollset_set = grpc_pollset_set_create();
244 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
245 grpc_pollset_init(g_pollset, &g_mu);
246 grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
247
248 test_succeeds();
249 test_fails();
250 test_fails_bad_addr_no_leak();
251 grpc_pollset_set_destroy(g_pollset_set);
252 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
253 grpc_schedule_on_exec_ctx);
254 grpc_pollset_shutdown(g_pollset, &destroyed);
255 }
256
257 grpc_shutdown();
258 gpr_free(g_pollset);
259 return 0;
260 }
261
262 #else /* GRPC_POSIX_SOCKET_TCP_CLIENT */
263
main(int argc,char ** argv)264 int main(int argc, char** argv) { return 1; }
265
266 #endif /* GRPC_POSIX_SOCKET_CLIENT */
267