/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/tcp_custom.h"
22 
23 #include <limits.h>
24 #include <string.h>
25 
26 #include <grpc/slice_buffer.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 
31 #include "src/core/lib/address_utils/sockaddr_utils.h"
32 #include "src/core/lib/iomgr/error.h"
33 #include "src/core/lib/iomgr/iomgr_custom.h"
34 #include "src/core/lib/iomgr/port.h"
35 #include "src/core/lib/iomgr/tcp_client.h"
36 #include "src/core/lib/iomgr/tcp_server.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 
40 #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
41 
42 extern grpc_core::TraceFlag grpc_tcp_trace;
43 
44 grpc_socket_vtable* grpc_custom_socket_vtable = nullptr;
45 extern grpc_tcp_server_vtable custom_tcp_server_vtable;
46 extern grpc_tcp_client_vtable custom_tcp_client_vtable;
47 
grpc_custom_endpoint_init(grpc_socket_vtable * impl)48 void grpc_custom_endpoint_init(grpc_socket_vtable* impl) {
49   grpc_custom_socket_vtable = impl;
50   grpc_set_tcp_client_impl(&custom_tcp_client_vtable);
51   grpc_set_tcp_server_impl(&custom_tcp_server_vtable);
52 }
53 
54 struct custom_tcp_endpoint {
55   grpc_endpoint base;
56   gpr_refcount refcount;
57   grpc_custom_socket* socket;
58 
59   grpc_closure* read_cb = nullptr;
60   grpc_closure* write_cb = nullptr;
61 
62   grpc_slice_buffer* read_slices = nullptr;
63   grpc_slice_buffer* write_slices = nullptr;
64 
65   bool shutting_down;
66 
67   std::string peer_string;
68   std::string local_address;
69 };
tcp_free(grpc_custom_socket * s)70 static void tcp_free(grpc_custom_socket* s) {
71   custom_tcp_endpoint* tcp =
72       reinterpret_cast<custom_tcp_endpoint*>(s->endpoint);
73   delete tcp;
74   s->refs--;
75   if (s->refs == 0) {
76     grpc_custom_socket_vtable->destroy(s);
77     gpr_free(s);
78   }
79 }
80 
81 #ifndef NDEBUG
82 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
83 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
tcp_unref(custom_tcp_endpoint * tcp,const char * reason,const char * file,int line)84 static void tcp_unref(custom_tcp_endpoint* tcp, const char* reason,
85                       const char* file, int line) {
86   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
87     gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
88     gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
89             "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason,
90             val, val - 1);
91   }
92   if (gpr_unref(&tcp->refcount)) {
93     tcp_free(tcp->socket);
94   }
95 }
96 
tcp_ref(custom_tcp_endpoint * tcp,const char * reason,const char * file,int line)97 static void tcp_ref(custom_tcp_endpoint* tcp, const char* reason,
98                     const char* file, int line) {
99   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
100     gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
101     gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
102             "TCP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason,
103             val, val + 1);
104   }
105   gpr_ref(&tcp->refcount);
106 }
107 #else
108 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
109 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(custom_tcp_endpoint * tcp)110 static void tcp_unref(custom_tcp_endpoint* tcp) {
111   if (gpr_unref(&tcp->refcount)) {
112     tcp_free(tcp->socket);
113   }
114 }
115 
tcp_ref(custom_tcp_endpoint * tcp)116 static void tcp_ref(custom_tcp_endpoint* tcp) { gpr_ref(&tcp->refcount); }
117 #endif
118 
call_read_cb(custom_tcp_endpoint * tcp,grpc_error_handle error)119 static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error_handle error) {
120   grpc_closure* cb = tcp->read_cb;
121   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
122     gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp->socket, cb, cb->cb,
123             cb->cb_arg);
124     size_t i;
125     gpr_log(GPR_INFO, "read: error=%s", grpc_error_std_string(error).c_str());
126     for (i = 0; i < tcp->read_slices->count; i++) {
127       char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
128                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
129       gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string.c_str(),
130               dump);
131       gpr_free(dump);
132     }
133   }
134   TCP_UNREF(tcp, "read");
135   tcp->read_slices = nullptr;
136   tcp->read_cb = nullptr;
137   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
138 }
139 
custom_read_callback(grpc_custom_socket * socket,size_t nread,grpc_error_handle error)140 static void custom_read_callback(grpc_custom_socket* socket, size_t nread,
141                                  grpc_error_handle error) {
142   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
143   grpc_core::ExecCtx exec_ctx;
144   grpc_slice_buffer garbage;
145   custom_tcp_endpoint* tcp =
146       reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint);
147   if (error == GRPC_ERROR_NONE && nread == 0) {
148     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
149   }
150   if (error == GRPC_ERROR_NONE) {
151     // Successful read
152     if (nread < tcp->read_slices->length) {
153       /* TODO(murgatroid99): Instead of discarding the unused part of the read
154        * buffer, reuse it as the next read buffer. */
155       grpc_slice_buffer_init(&garbage);
156       grpc_slice_buffer_trim_end(tcp->read_slices,
157                                  tcp->read_slices->length - nread, &garbage);
158       grpc_slice_buffer_reset_and_unref_internal(&garbage);
159     }
160   } else {
161     grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
162   }
163   call_read_cb(tcp, error);
164 }
165 
endpoint_read(grpc_endpoint * ep,grpc_slice_buffer * read_slices,grpc_closure * cb,bool)166 static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
167                           grpc_closure* cb, bool /*urgent*/) {
168   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
169   GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
170   GPR_ASSERT(tcp->read_cb == nullptr);
171   tcp->read_cb = cb;
172   tcp->read_slices = read_slices;
173   grpc_slice_buffer_reset_and_unref_internal(read_slices);
174   TCP_REF(tcp, "read");
175   if (tcp->read_slices->length < GRPC_TCP_DEFAULT_READ_SLICE_SIZE) {
176     grpc_slice_buffer_add_indexed(
177         tcp->read_slices, GRPC_SLICE_MALLOC(GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
178   }
179   /* slices[0] should always exist here since we just added it if it did not */
180   char* buffer = reinterpret_cast<char*>(
181       GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]));
182   size_t len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]);
183   grpc_custom_socket_vtable->read(tcp->socket, buffer, len,
184                                   custom_read_callback);
185 }
186 
custom_write_callback(grpc_custom_socket * socket,grpc_error_handle error)187 static void custom_write_callback(grpc_custom_socket* socket,
188                                   grpc_error_handle error) {
189   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
190   grpc_core::ExecCtx exec_ctx;
191   custom_tcp_endpoint* tcp =
192       reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint);
193   grpc_closure* cb = tcp->write_cb;
194   tcp->write_cb = nullptr;
195   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
196     gpr_log(GPR_INFO, "write complete on %p: error=%s", tcp->socket,
197             grpc_error_std_string(error).c_str());
198   }
199   TCP_UNREF(tcp, "write");
200   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
201 }
202 
endpoint_write(grpc_endpoint * ep,grpc_slice_buffer * write_slices,grpc_closure * cb,void *)203 static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
204                            grpc_closure* cb, void* /*arg*/) {
205   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
206   GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
207 
208   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
209     size_t j;
210 
211     for (j = 0; j < write_slices->count; j++) {
212       char* data = grpc_dump_slice(write_slices->slices[j],
213                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
214       gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp->socket,
215               tcp->peer_string.c_str(), data);
216       gpr_free(data);
217     }
218   }
219 
220   if (tcp->shutting_down) {
221     grpc_core::ExecCtx::Run(
222         DEBUG_LOCATION, cb,
223         GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP socket is shutting down"));
224     return;
225   }
226 
227   GPR_ASSERT(tcp->write_cb == nullptr);
228   tcp->write_slices = write_slices;
229   GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
230   if (tcp->write_slices->count == 0) {
231     // No slices means we don't have to do anything
232     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
233     return;
234   }
235   tcp->write_cb = cb;
236   TCP_REF(tcp, "write");
237   grpc_custom_socket_vtable->write(tcp->socket, tcp->write_slices,
238                                    custom_write_callback);
239 }
240 
endpoint_add_to_pollset(grpc_endpoint * ep,grpc_pollset * pollset)241 static void endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
242   // No-op. We're ignoring pollsets currently
243   (void)ep;
244   (void)pollset;
245 }
246 
endpoint_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)247 static void endpoint_add_to_pollset_set(grpc_endpoint* ep,
248                                         grpc_pollset_set* pollset) {
249   // No-op. We're ignoring pollsets currently
250   (void)ep;
251   (void)pollset;
252 }
253 
endpoint_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)254 static void endpoint_delete_from_pollset_set(grpc_endpoint* ep,
255                                              grpc_pollset_set* pollset) {
256   // No-op. We're ignoring pollsets currently
257   (void)ep;
258   (void)pollset;
259 }
260 
endpoint_shutdown(grpc_endpoint * ep,grpc_error_handle why)261 static void endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
262   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
263   if (!tcp->shutting_down) {
264     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
265       gpr_log(GPR_INFO, "TCP %p shutdown why=%s", tcp->socket,
266               grpc_error_std_string(why).c_str());
267     }
268     tcp->shutting_down = true;
269     // grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->read_cb,
270     // GRPC_ERROR_REF(why));
271     // grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->write_cb,
272     // GRPC_ERROR_REF(why)); tcp->read_cb = nullptr; tcp->write_cb = nullptr;
273     grpc_custom_socket_vtable->shutdown(tcp->socket);
274   }
275   GRPC_ERROR_UNREF(why);
276 }
277 
custom_close_callback(grpc_custom_socket * socket)278 static void custom_close_callback(grpc_custom_socket* socket) {
279   socket->refs--;
280   if (socket->refs == 0) {
281     grpc_custom_socket_vtable->destroy(socket);
282     gpr_free(socket);
283   } else if (socket->endpoint) {
284     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
285     grpc_core::ExecCtx exec_ctx;
286     custom_tcp_endpoint* tcp =
287         reinterpret_cast<custom_tcp_endpoint*>(socket->endpoint);
288     TCP_UNREF(tcp, "destroy");
289   }
290 }
291 
endpoint_destroy(grpc_endpoint * ep)292 static void endpoint_destroy(grpc_endpoint* ep) {
293   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
294   grpc_custom_socket_vtable->close(tcp->socket, custom_close_callback);
295 }
296 
endpoint_get_peer(grpc_endpoint * ep)297 static absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
298   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
299   return tcp->peer_string;
300 }
301 
endpoint_get_local_address(grpc_endpoint * ep)302 static absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
303   custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
304   return tcp->local_address;
305 }
306 
endpoint_get_fd(grpc_endpoint *)307 static int endpoint_get_fd(grpc_endpoint* /*ep*/) { return -1; }
308 
endpoint_can_track_err(grpc_endpoint *)309 static bool endpoint_can_track_err(grpc_endpoint* /*ep*/) { return false; }
310 
311 static grpc_endpoint_vtable vtable = {endpoint_read,
312                                       endpoint_write,
313                                       endpoint_add_to_pollset,
314                                       endpoint_add_to_pollset_set,
315                                       endpoint_delete_from_pollset_set,
316                                       endpoint_shutdown,
317                                       endpoint_destroy,
318                                       endpoint_get_peer,
319                                       endpoint_get_local_address,
320                                       endpoint_get_fd,
321                                       endpoint_can_track_err};
322 
custom_tcp_endpoint_create(grpc_custom_socket * socket,const char * peer_string)323 grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket,
324                                           const char* peer_string) {
325   custom_tcp_endpoint* tcp = new custom_tcp_endpoint;
326   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
327   grpc_core::ExecCtx exec_ctx;
328 
329   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
330     gpr_log(GPR_INFO, "Creating TCP endpoint %p", socket);
331   }
332   socket->refs++;
333   socket->endpoint = reinterpret_cast<grpc_endpoint*>(tcp);
334   tcp->socket = socket;
335   tcp->base.vtable = &vtable;
336   gpr_ref_init(&tcp->refcount, 1);
337   tcp->peer_string = peer_string;
338   grpc_resolved_address resolved_local_addr;
339   resolved_local_addr.len = sizeof(resolved_local_addr.addr);
340   if (grpc_custom_socket_vtable->getsockname(
341           socket, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
342           reinterpret_cast<int*>(&resolved_local_addr.len)) !=
343       GRPC_ERROR_NONE) {
344     tcp->local_address = "";
345   } else {
346     tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
347   }
348   tcp->shutting_down = false;
349   return &tcp->base;
350 }
351