1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24
25 #include <errno.h>
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "absl/strings/str_cat.h"
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/time.h>
35
36 #include "src/core/lib/address_utils/sockaddr_utils.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/executor.h"
41 #include "src/core/lib/iomgr/iomgr_internal.h"
42 #include "src/core/lib/iomgr/sockaddr.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_client_posix.h"
46 #include "src/core/lib/iomgr/tcp_posix.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/iomgr/unix_sockets_posix.h"
49 #include "src/core/lib/slice/slice_internal.h"
50
51 extern grpc_core::TraceFlag grpc_tcp_trace;
52
53 struct async_connect {
54 gpr_mu mu;
55 grpc_fd* fd;
56 grpc_timer alarm;
57 grpc_closure on_alarm;
58 int refs;
59 grpc_closure write_closure;
60 grpc_pollset_set* interested_parties;
61 std::string addr_str;
62 grpc_endpoint** ep;
63 grpc_closure* closure;
64 grpc_channel_args* channel_args;
65 grpc_slice_allocator* slice_allocator;
66 };
67
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)68 static grpc_error_handle prepare_socket(const grpc_resolved_address* addr,
69 int fd,
70 const grpc_channel_args* channel_args) {
71 grpc_error_handle err = GRPC_ERROR_NONE;
72
73 GPR_ASSERT(fd >= 0);
74
75 err = grpc_set_socket_nonblocking(fd, 1);
76 if (err != GRPC_ERROR_NONE) goto error;
77 err = grpc_set_socket_cloexec(fd, 1);
78 if (err != GRPC_ERROR_NONE) goto error;
79 if (!grpc_is_unix_socket(addr)) {
80 err = grpc_set_socket_low_latency(fd, 1);
81 if (err != GRPC_ERROR_NONE) goto error;
82 err = grpc_set_socket_reuse_addr(fd, 1);
83 if (err != GRPC_ERROR_NONE) goto error;
84 err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
85 true /* is_client */);
86 if (err != GRPC_ERROR_NONE) goto error;
87 }
88 err = grpc_set_socket_no_sigpipe_if_possible(fd);
89 if (err != GRPC_ERROR_NONE) goto error;
90
91 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE,
92 channel_args);
93 if (err != GRPC_ERROR_NONE) goto error;
94
95 goto done;
96
97 error:
98 if (fd >= 0) {
99 close(fd);
100 }
101 done:
102 return err;
103 }
104
tc_on_alarm(void * acp,grpc_error_handle error)105 static void tc_on_alarm(void* acp, grpc_error_handle error) {
106 int done;
107 async_connect* ac = static_cast<async_connect*>(acp);
108 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
109 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
110 ac->addr_str.c_str(), grpc_error_std_string(error).c_str());
111 }
112 gpr_mu_lock(&ac->mu);
113 if (ac->fd != nullptr) {
114 grpc_fd_shutdown(
115 ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
116 }
117 done = (--ac->refs == 0);
118 gpr_mu_unlock(&ac->mu);
119 if (done) {
120 gpr_mu_destroy(&ac->mu);
121 if (ac->slice_allocator != nullptr) {
122 grpc_slice_allocator_destroy(ac->slice_allocator);
123 }
124 grpc_channel_args_destroy(ac->channel_args);
125 delete ac;
126 }
127 }
128
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str,grpc_slice_allocator * slice_allocator)129 grpc_endpoint* grpc_tcp_client_create_from_fd(
130 grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str,
131 grpc_slice_allocator* slice_allocator) {
132 return grpc_tcp_create(fd, channel_args, addr_str, slice_allocator);
133 }
134
on_writable(void * acp,grpc_error_handle error)135 static void on_writable(void* acp, grpc_error_handle error) {
136 async_connect* ac = static_cast<async_connect*>(acp);
137 int so_error = 0;
138 socklen_t so_error_size;
139 int err;
140 int done;
141 grpc_endpoint** ep = ac->ep;
142 grpc_closure* closure = ac->closure;
143 grpc_fd* fd;
144
145 (void)GRPC_ERROR_REF(error);
146
147 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
148 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s",
149 ac->addr_str.c_str(), grpc_error_std_string(error).c_str());
150 }
151
152 gpr_mu_lock(&ac->mu);
153 GPR_ASSERT(ac->fd);
154 fd = ac->fd;
155 ac->fd = nullptr;
156 gpr_mu_unlock(&ac->mu);
157
158 grpc_timer_cancel(&ac->alarm);
159
160 gpr_mu_lock(&ac->mu);
161 if (error != GRPC_ERROR_NONE) {
162 error =
163 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
164 goto finish;
165 }
166
167 do {
168 so_error_size = sizeof(so_error);
169 err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
170 &so_error_size);
171 } while (err < 0 && errno == EINTR);
172 if (err < 0) {
173 error = GRPC_OS_ERROR(errno, "getsockopt");
174 goto finish;
175 }
176
177 switch (so_error) {
178 case 0:
179 grpc_pollset_set_del_fd(ac->interested_parties, fd);
180 *ep = grpc_tcp_client_create_from_fd(
181 fd, ac->channel_args, ac->addr_str.c_str(), ac->slice_allocator);
182 ac->slice_allocator = nullptr;
183 fd = nullptr;
184 break;
185 case ENOBUFS:
186 /* We will get one of these errors if we have run out of
187 memory in the kernel for the data structures allocated
188 when you connect a socket. If this happens it is very
189 likely that if we wait a little bit then try again the
190 connection will work (since other programs or this
191 program will close their network connections and free up
192 memory). This does _not_ indicate that there is anything
193 wrong with the server we are connecting to, this is a
194 local problem.
195
196 If you are looking at this code, then chances are that
197 your program or another program on the same computer
198 opened too many network connections. The "easy" fix:
199 don't do that! */
200 gpr_log(GPR_ERROR, "kernel out of buffers");
201 gpr_mu_unlock(&ac->mu);
202 grpc_fd_notify_on_write(fd, &ac->write_closure);
203 return;
204 case ECONNREFUSED:
205 /* This error shouldn't happen for anything other than connect(). */
206 error = GRPC_OS_ERROR(so_error, "connect");
207 break;
208 default:
209 /* We don't really know which syscall triggered the problem here,
210 so punt by reporting getsockopt(). */
211 error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
212 break;
213 }
214
215 finish:
216 if (fd != nullptr) {
217 grpc_pollset_set_del_fd(ac->interested_parties, fd);
218 grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
219 fd = nullptr;
220 }
221 done = (--ac->refs == 0);
222 gpr_mu_unlock(&ac->mu);
223 if (error != GRPC_ERROR_NONE) {
224 std::string str;
225 bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
226 GPR_ASSERT(ret);
227 std::string description =
228 absl::StrCat("Failed to connect to remote host: ", str);
229 error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, description);
230 error =
231 grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
232 }
233 if (done) {
234 // This is safe even outside the lock, because "done", the sentinel, is
235 // populated *inside* the lock.
236 gpr_mu_destroy(&ac->mu);
237 if (ac->slice_allocator != nullptr) {
238 grpc_slice_allocator_destroy(ac->slice_allocator);
239 ac->slice_allocator = nullptr;
240 }
241 grpc_channel_args_destroy(ac->channel_args);
242 delete ac;
243 }
244 // Push async connect closure to the executor since this may actually be
245 // called during the shutdown process, in which case a deadlock could form
246 // between the core shutdown mu and the connector mu (b/188239051)
247 grpc_core::Executor::Run(closure, error);
248 }
249
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)250 grpc_error_handle grpc_tcp_client_prepare_fd(
251 const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
252 grpc_resolved_address* mapped_addr, int* fd) {
253 grpc_dualstack_mode dsmode;
254 grpc_error_handle error;
255 *fd = -1;
256 /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
257 v6. */
258 if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
259 /* addr is v4 mapped to v6 or v6. */
260 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
261 }
262 error =
263 grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
264 if (error != GRPC_ERROR_NONE) {
265 return error;
266 }
267 if (dsmode == GRPC_DSMODE_IPV4) {
268 /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
269 if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
270 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
271 }
272 }
273 if ((error = prepare_socket(mapped_addr, *fd, channel_args)) !=
274 GRPC_ERROR_NONE) {
275 return error;
276 }
277 return GRPC_ERROR_NONE;
278 }
279
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep,grpc_slice_allocator * slice_allocator)280 void grpc_tcp_client_create_from_prepared_fd(
281 grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
282 const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
283 grpc_millis deadline, grpc_endpoint** ep,
284 grpc_slice_allocator* slice_allocator) {
285 int err;
286 do {
287 err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
288 addr->len);
289 } while (err < 0 && errno == EINTR);
290
291 std::string name = absl::StrCat("tcp-client:", grpc_sockaddr_to_uri(addr));
292 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
293
294 if (err >= 0) {
295 *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args,
296 grpc_sockaddr_to_uri(addr).c_str(),
297 slice_allocator);
298 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
299 return;
300 }
301 if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
302 grpc_slice_allocator_destroy(slice_allocator);
303 grpc_error_handle error = GRPC_OS_ERROR(errno, "connect");
304 error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
305 grpc_sockaddr_to_uri(addr));
306 grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
307 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
308 return;
309 }
310
311 grpc_pollset_set_add_fd(interested_parties, fdobj);
312
313 async_connect* ac = new async_connect();
314 ac->closure = closure;
315 ac->ep = ep;
316 ac->fd = fdobj;
317 ac->interested_parties = interested_parties;
318 ac->addr_str = grpc_sockaddr_to_uri(addr);
319 gpr_mu_init(&ac->mu);
320 ac->refs = 2;
321 ac->slice_allocator = slice_allocator;
322 GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
323 grpc_schedule_on_exec_ctx);
324 ac->channel_args = grpc_channel_args_copy(channel_args);
325
326 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
327 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
328 ac->addr_str.c_str(), fdobj);
329 }
330
331 gpr_mu_lock(&ac->mu);
332 GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
333 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
334 grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
335 gpr_mu_unlock(&ac->mu);
336 }
337
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_slice_allocator * slice_allocator,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)338 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
339 grpc_slice_allocator* slice_allocator,
340 grpc_pollset_set* interested_parties,
341 const grpc_channel_args* channel_args,
342 const grpc_resolved_address* addr,
343 grpc_millis deadline) {
344 grpc_resolved_address mapped_addr;
345 int fd = -1;
346 grpc_error_handle error;
347 *ep = nullptr;
348 if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
349 &fd)) != GRPC_ERROR_NONE) {
350 grpc_slice_allocator_destroy(slice_allocator);
351 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
352 return;
353 }
354 grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
355 channel_args, &mapped_addr, deadline,
356 ep, slice_allocator);
357 }
358
359 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
360 #endif
361