1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24 
25 #include <errno.h>
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include "absl/strings/str_cat.h"
31 
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/time.h>
35 
36 #include "src/core/lib/address_utils/sockaddr_utils.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/executor.h"
41 #include "src/core/lib/iomgr/iomgr_internal.h"
42 #include "src/core/lib/iomgr/sockaddr.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_client_posix.h"
46 #include "src/core/lib/iomgr/tcp_posix.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/iomgr/unix_sockets_posix.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 
51 extern grpc_core::TraceFlag grpc_tcp_trace;
52 
53 struct async_connect {
54   gpr_mu mu;
55   grpc_fd* fd;
56   grpc_timer alarm;
57   grpc_closure on_alarm;
58   int refs;
59   grpc_closure write_closure;
60   grpc_pollset_set* interested_parties;
61   std::string addr_str;
62   grpc_endpoint** ep;
63   grpc_closure* closure;
64   grpc_channel_args* channel_args;
65   grpc_slice_allocator* slice_allocator;
66 };
67 
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)68 static grpc_error_handle prepare_socket(const grpc_resolved_address* addr,
69                                         int fd,
70                                         const grpc_channel_args* channel_args) {
71   grpc_error_handle err = GRPC_ERROR_NONE;
72 
73   GPR_ASSERT(fd >= 0);
74 
75   err = grpc_set_socket_nonblocking(fd, 1);
76   if (err != GRPC_ERROR_NONE) goto error;
77   err = grpc_set_socket_cloexec(fd, 1);
78   if (err != GRPC_ERROR_NONE) goto error;
79   if (!grpc_is_unix_socket(addr)) {
80     err = grpc_set_socket_low_latency(fd, 1);
81     if (err != GRPC_ERROR_NONE) goto error;
82     err = grpc_set_socket_reuse_addr(fd, 1);
83     if (err != GRPC_ERROR_NONE) goto error;
84     err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
85                                            true /* is_client */);
86     if (err != GRPC_ERROR_NONE) goto error;
87   }
88   err = grpc_set_socket_no_sigpipe_if_possible(fd);
89   if (err != GRPC_ERROR_NONE) goto error;
90 
91   err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE,
92                                           channel_args);
93   if (err != GRPC_ERROR_NONE) goto error;
94 
95   goto done;
96 
97 error:
98   if (fd >= 0) {
99     close(fd);
100   }
101 done:
102   return err;
103 }
104 
tc_on_alarm(void * acp,grpc_error_handle error)105 static void tc_on_alarm(void* acp, grpc_error_handle error) {
106   int done;
107   async_connect* ac = static_cast<async_connect*>(acp);
108   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
109     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
110             ac->addr_str.c_str(), grpc_error_std_string(error).c_str());
111   }
112   gpr_mu_lock(&ac->mu);
113   if (ac->fd != nullptr) {
114     grpc_fd_shutdown(
115         ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
116   }
117   done = (--ac->refs == 0);
118   gpr_mu_unlock(&ac->mu);
119   if (done) {
120     gpr_mu_destroy(&ac->mu);
121     if (ac->slice_allocator != nullptr) {
122       grpc_slice_allocator_destroy(ac->slice_allocator);
123     }
124     grpc_channel_args_destroy(ac->channel_args);
125     delete ac;
126   }
127 }
128 
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str,grpc_slice_allocator * slice_allocator)129 grpc_endpoint* grpc_tcp_client_create_from_fd(
130     grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str,
131     grpc_slice_allocator* slice_allocator) {
132   return grpc_tcp_create(fd, channel_args, addr_str, slice_allocator);
133 }
134 
on_writable(void * acp,grpc_error_handle error)135 static void on_writable(void* acp, grpc_error_handle error) {
136   async_connect* ac = static_cast<async_connect*>(acp);
137   int so_error = 0;
138   socklen_t so_error_size;
139   int err;
140   int done;
141   grpc_endpoint** ep = ac->ep;
142   grpc_closure* closure = ac->closure;
143   grpc_fd* fd;
144 
145   (void)GRPC_ERROR_REF(error);
146 
147   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
148     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s",
149             ac->addr_str.c_str(), grpc_error_std_string(error).c_str());
150   }
151 
152   gpr_mu_lock(&ac->mu);
153   GPR_ASSERT(ac->fd);
154   fd = ac->fd;
155   ac->fd = nullptr;
156   gpr_mu_unlock(&ac->mu);
157 
158   grpc_timer_cancel(&ac->alarm);
159 
160   gpr_mu_lock(&ac->mu);
161   if (error != GRPC_ERROR_NONE) {
162     error =
163         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
164     goto finish;
165   }
166 
167   do {
168     so_error_size = sizeof(so_error);
169     err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
170                      &so_error_size);
171   } while (err < 0 && errno == EINTR);
172   if (err < 0) {
173     error = GRPC_OS_ERROR(errno, "getsockopt");
174     goto finish;
175   }
176 
177   switch (so_error) {
178     case 0:
179       grpc_pollset_set_del_fd(ac->interested_parties, fd);
180       *ep = grpc_tcp_client_create_from_fd(
181           fd, ac->channel_args, ac->addr_str.c_str(), ac->slice_allocator);
182       ac->slice_allocator = nullptr;
183       fd = nullptr;
184       break;
185     case ENOBUFS:
186       /* We will get one of these errors if we have run out of
187          memory in the kernel for the data structures allocated
188          when you connect a socket.  If this happens it is very
189          likely that if we wait a little bit then try again the
190          connection will work (since other programs or this
191          program will close their network connections and free up
192          memory).  This does _not_ indicate that there is anything
193          wrong with the server we are connecting to, this is a
194          local problem.
195 
196          If you are looking at this code, then chances are that
197          your program or another program on the same computer
198          opened too many network connections.  The "easy" fix:
199          don't do that! */
200       gpr_log(GPR_ERROR, "kernel out of buffers");
201       gpr_mu_unlock(&ac->mu);
202       grpc_fd_notify_on_write(fd, &ac->write_closure);
203       return;
204     case ECONNREFUSED:
205       /* This error shouldn't happen for anything other than connect(). */
206       error = GRPC_OS_ERROR(so_error, "connect");
207       break;
208     default:
209       /* We don't really know which syscall triggered the problem here,
210          so punt by reporting getsockopt(). */
211       error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
212       break;
213   }
214 
215 finish:
216   if (fd != nullptr) {
217     grpc_pollset_set_del_fd(ac->interested_parties, fd);
218     grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
219     fd = nullptr;
220   }
221   done = (--ac->refs == 0);
222   gpr_mu_unlock(&ac->mu);
223   if (error != GRPC_ERROR_NONE) {
224     std::string str;
225     bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
226     GPR_ASSERT(ret);
227     std::string description =
228         absl::StrCat("Failed to connect to remote host: ", str);
229     error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, description);
230     error =
231         grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
232   }
233   if (done) {
234     // This is safe even outside the lock, because "done", the sentinel, is
235     // populated *inside* the lock.
236     gpr_mu_destroy(&ac->mu);
237     if (ac->slice_allocator != nullptr) {
238       grpc_slice_allocator_destroy(ac->slice_allocator);
239       ac->slice_allocator = nullptr;
240     }
241     grpc_channel_args_destroy(ac->channel_args);
242     delete ac;
243   }
244   // Push async connect closure to the executor since this may actually be
245   // called during the shutdown process, in which case a deadlock could form
246   // between the core shutdown mu and the connector mu (b/188239051)
247   grpc_core::Executor::Run(closure, error);
248 }
249 
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)250 grpc_error_handle grpc_tcp_client_prepare_fd(
251     const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
252     grpc_resolved_address* mapped_addr, int* fd) {
253   grpc_dualstack_mode dsmode;
254   grpc_error_handle error;
255   *fd = -1;
256   /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
257      v6. */
258   if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
259     /* addr is v4 mapped to v6 or v6. */
260     memcpy(mapped_addr, addr, sizeof(*mapped_addr));
261   }
262   error =
263       grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
264   if (error != GRPC_ERROR_NONE) {
265     return error;
266   }
267   if (dsmode == GRPC_DSMODE_IPV4) {
268     /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
269     if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
270       memcpy(mapped_addr, addr, sizeof(*mapped_addr));
271     }
272   }
273   if ((error = prepare_socket(mapped_addr, *fd, channel_args)) !=
274       GRPC_ERROR_NONE) {
275     return error;
276   }
277   return GRPC_ERROR_NONE;
278 }
279 
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep,grpc_slice_allocator * slice_allocator)280 void grpc_tcp_client_create_from_prepared_fd(
281     grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
282     const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
283     grpc_millis deadline, grpc_endpoint** ep,
284     grpc_slice_allocator* slice_allocator) {
285   int err;
286   do {
287     err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
288                   addr->len);
289   } while (err < 0 && errno == EINTR);
290 
291   std::string name = absl::StrCat("tcp-client:", grpc_sockaddr_to_uri(addr));
292   grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
293 
294   if (err >= 0) {
295     *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args,
296                                          grpc_sockaddr_to_uri(addr).c_str(),
297                                          slice_allocator);
298     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
299     return;
300   }
301   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
302     grpc_slice_allocator_destroy(slice_allocator);
303     grpc_error_handle error = GRPC_OS_ERROR(errno, "connect");
304     error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
305                                grpc_sockaddr_to_uri(addr));
306     grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
307     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
308     return;
309   }
310 
311   grpc_pollset_set_add_fd(interested_parties, fdobj);
312 
313   async_connect* ac = new async_connect();
314   ac->closure = closure;
315   ac->ep = ep;
316   ac->fd = fdobj;
317   ac->interested_parties = interested_parties;
318   ac->addr_str = grpc_sockaddr_to_uri(addr);
319   gpr_mu_init(&ac->mu);
320   ac->refs = 2;
321   ac->slice_allocator = slice_allocator;
322   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
323                     grpc_schedule_on_exec_ctx);
324   ac->channel_args = grpc_channel_args_copy(channel_args);
325 
326   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
327     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
328             ac->addr_str.c_str(), fdobj);
329   }
330 
331   gpr_mu_lock(&ac->mu);
332   GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
333   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
334   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
335   gpr_mu_unlock(&ac->mu);
336 }
337 
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_slice_allocator * slice_allocator,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)338 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
339                         grpc_slice_allocator* slice_allocator,
340                         grpc_pollset_set* interested_parties,
341                         const grpc_channel_args* channel_args,
342                         const grpc_resolved_address* addr,
343                         grpc_millis deadline) {
344   grpc_resolved_address mapped_addr;
345   int fd = -1;
346   grpc_error_handle error;
347   *ep = nullptr;
348   if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
349                                           &fd)) != GRPC_ERROR_NONE) {
350     grpc_slice_allocator_destroy(slice_allocator);
351     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
352     return;
353   }
354   grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
355                                           channel_args, &mapped_addr, deadline,
356                                           ep, slice_allocator);
357 }
358 
359 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
360 #endif
361