1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24 
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/endpoint_cfstream.h"
27 
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31 
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/error_cfstream.h"
37 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/slice/slice_string_helpers.h"
41 
42 extern grpc_core::TraceFlag grpc_tcp_trace;
43 
// State for a gRPC endpoint backed by a CFReadStream/CFWriteStream pair.
//
// `base` must stay the first member: the endpoint entry points in this file
// recover the CFStreamEndpoint by reinterpret_cast-ing the grpc_endpoint*.
//
// Raw pointer members are default-initialized to nullptr so that a freshly
// `new`-ed endpoint never holds indeterminate pointers before the creation
// function has filled them in.
struct CFStreamEndpoint {
  grpc_endpoint base;
  // Lifetime of the endpoint; the object is freed when this drops to zero.
  gpr_refcount refcount;

  // Underlying CoreFoundation streams and the handle used to wait for
  // readable/writable notifications on them.
  CFReadStreamRef read_stream = nullptr;
  CFWriteStreamRef write_stream = nullptr;
  CFStreamHandle* stream_sync = nullptr;

  // Caller-supplied completion closures and buffers for the read/write that
  // is currently in flight; reset to nullptr once the closure is scheduled.
  grpc_closure* read_cb = nullptr;
  grpc_closure* write_cb = nullptr;
  grpc_slice_buffer* read_slices = nullptr;
  grpc_slice_buffer* write_slices = nullptr;

  // Closures handed to stream_sync; they run ReadAction / WriteAction.
  grpc_closure read_action;
  grpc_closure write_action;

  std::string peer_string;
  std::string local_address;
  grpc_resource_user* resource_user = nullptr;
  grpc_resource_user_slice_allocator slice_allocator;
};
// Releases everything the endpoint owns and then deletes the endpoint itself.
// Called once the endpoint's refcount has dropped to zero.
static void CFStreamFree(CFStreamEndpoint* ep) {
  // Drop the resource user first, then the retained CoreFoundation streams.
  grpc_resource_user_unref(ep->resource_user);

  CFRelease(ep->read_stream);
  CFRelease(ep->write_stream);

  // Give back the reference held on the stream notification handle.
  CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");

  delete ep;
}
72 
73 #ifndef NDEBUG
74 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
75 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
// Debug-build unref: optionally logs the refcount transition (attributed to
// the caller's file/line) and frees the endpoint when the last reference is
// dropped.
static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
                          const char* file, int line) {
  if (grpc_tcp_trace.enabled()) {
    // The count is read without a barrier; it is only used for logging.
    const gpr_atm before = gpr_atm_no_barrier_load(&ep->refcount.count);
    const gpr_atm after = before - 1;
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
            reason, before, after);
  }
  if (!gpr_unref(&ep->refcount)) {
    return;  // Other references remain.
  }
  CFStreamFree(ep);
}
// Debug-build ref: optionally logs the refcount transition (attributed to the
// caller's file/line) and then takes a reference on the endpoint.
static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
                        const char* file, int line) {
  if (grpc_tcp_trace.enabled()) {
    // The count is read without a barrier; it is only used for logging.
    const gpr_atm before = gpr_atm_no_barrier_load(&ep->refcount.count);
    const gpr_atm after = before + 1;
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
            reason, before, after);
  }
  gpr_ref(&ep->refcount);
}
98 #else
99 #define EP_REF(ep, reason) CFStreamRef((ep))
100 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
// Release-build unref: drops one reference and frees the endpoint when it was
// the last one.
static void CFStreamUnref(CFStreamEndpoint* ep) {
  if (!gpr_unref(&ep->refcount)) {
    return;  // Other references remain.
  }
  CFStreamFree(ep);
}
// Release-build ref: takes one reference on the endpoint.
static void CFStreamRef(CFStreamEndpoint* ep) {
  gpr_ref(&ep->refcount);
}
107 #endif
108 
// Tags `src_error` with GRPC_STATUS_UNAVAILABLE and with the endpoint's peer
// string as the target address, returning the annotated error.
static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
                                         CFStreamEndpoint* ep) {
  grpc_error* with_status = grpc_error_set_int(
      src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
  grpc_slice peer = grpc_slice_from_copied_string(ep->peer_string.c_str());
  return grpc_error_set_str(with_status, GRPC_ERROR_STR_TARGET_ADDRESS, peer);
}
117 
// Completes the pending read: clears the endpoint's read state and schedules
// the caller's read closure with `error`. When TCP tracing is enabled, the
// error and a hex/ASCII dump of every slice in the read buffer are logged
// first.
static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
            ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
    gpr_log(GPR_DEBUG, "read: error=%s", grpc_error_string(error));

    const grpc_slice_buffer* incoming = ep->read_slices;
    for (size_t idx = 0; idx < incoming->count; ++idx) {
      char* hex_ascii = grpc_dump_slice(incoming->slices[idx],
                                        GPR_DUMP_HEX | GPR_DUMP_ASCII);
      gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string.c_str(),
              hex_ascii);
      gpr_free(hex_ascii);
    }
  }
  // Detach the closure before scheduling it so the endpoint no longer
  // reports a read in progress.
  grpc_closure* pending = ep->read_cb;
  ep->read_cb = nullptr;
  ep->read_slices = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pending, error);
}
139 
// Completes the pending write: clears the endpoint's write state and
// schedules the caller's write closure with `error`. When TCP tracing is
// enabled, the closure and the error are logged first.
static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
            ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
    gpr_log(GPR_DEBUG, "write: error=%s", grpc_error_string(error));
  }
  // Detach the closure before scheduling it so the endpoint no longer
  // reports a write in progress.
  grpc_closure* pending = ep->write_cb;
  ep->write_cb = nullptr;
  ep->write_slices = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pending, error);
}
152 
// Closure body run when the read stream is reported readable (or when the
// notification fails). Reads into the single pre-allocated slice and then
// completes the pending read with the outcome.
//
// `arg` is the CFStreamEndpoint; `error` is the notification result.
static void ReadAction(void* arg, grpc_error* error) {
  CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
  GPR_ASSERT(ep->read_cb != nullptr);
  if (error) {
    // The notification itself failed: discard the buffer and forward a new
    // reference to the error.
    grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
    CallReadCb(ep, GRPC_ERROR_REF(error));
    EP_UNREF(ep, "read");
    return;
  }

  GPR_ASSERT(ep->read_slices->count == 1);
  grpc_slice target = ep->read_slices->slices[0];
  const size_t capacity = GRPC_SLICE_LENGTH(target);
  const CFIndex bytes_read =
      CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(target), capacity);

  grpc_error* result = GRPC_ERROR_NONE;
  switch (bytes_read) {
    case -1: {
      // Stream error: report the CFError if one is available.
      grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
      CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
      if (stream_error == nullptr) {
        result = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
      } else {
        result = CFStreamAnnotateError(
            GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
        CFRelease(stream_error);
      }
      break;
    }
    case 0:
      // Zero bytes read is reported to the caller as a closed socket.
      grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
      result = CFStreamAnnotateError(
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep);
      break;
    default:
      // Data arrived: shrink the buffer to the bytes actually read.
      if (bytes_read < static_cast<CFIndex>(capacity)) {
        grpc_slice_buffer_trim_end(ep->read_slices, capacity - bytes_read,
                                   nullptr);
      }
      break;
  }
  CallReadCb(ep, result);
  EP_UNREF(ep, "read");
}
194 
// Closure body run when the write stream is reported writable (or when the
// notification fails). Writes the first pending slice; if data remains, it
// re-arms the write notification, otherwise it completes the pending write.
//
// `arg` is the CFStreamEndpoint; `error` is the notification result.
static void WriteAction(void* arg, grpc_error* error) {
  CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
  GPR_ASSERT(ep->write_cb != nullptr);
  if (error) {
    // The notification itself failed: discard the buffer and forward a new
    // reference to the error.
    grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
    CallWriteCb(ep, GRPC_ERROR_REF(error));
    EP_UNREF(ep, "write");
    return;
  }

  grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
  size_t slice_len = GRPC_SLICE_LENGTH(slice);
  CFIndex write_size = CFWriteStreamWrite(
      ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
  if (write_size == -1) {
    // Stream error: report the CFError if one is available.
    grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
    CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
    if (stream_error != nullptr) {
      error = CFStreamAnnotateError(
          GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
      CFRelease(stream_error);
    } else {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
    }
    CallWriteCb(ep, error);
    EP_UNREF(ep, "write");
  } else {
    // Trace BEFORE re-arming the notification or dropping the "write" ref:
    // either step may lead to `ep` being released, and the trace reads
    // ep->peer_string.
    if (grpc_tcp_trace.enabled()) {
      grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
      char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
      gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string.c_str(),
              dump);
      gpr_free(dump);
      grpc_slice_unref_internal(trace_slice);
    }

    // Partial write: push the unwritten tail back to the front of the buffer.
    if (write_size < static_cast<CFIndex>(slice_len)) {
      grpc_slice_buffer_undo_take_first(
          ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
    }
    if (ep->write_slices->length > 0) {
      // More data pending; wait until the stream is writable again.
      ep->stream_sync->NotifyOnWrite(&ep->write_action);
    } else {
      CallWriteCb(ep, GRPC_ERROR_NONE);
      EP_UNREF(ep, "write");
      // `ep` must not be touched past this point.
    }
  }
  grpc_slice_unref_internal(slice);
}
244 
// Callback registered with the endpoint's slice allocator. On success it
// waits for the read stream to become readable; on failure it completes the
// pending read with the allocation error.
//
// `arg` is the CFStreamEndpoint; `error` is the allocation result.
static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
  CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
  if (error == GRPC_ERROR_NONE) {
    ep->stream_sync->NotifyOnRead(&ep->read_action);
  } else {
    grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
    // `error` is only borrowed from the caller of this callback, so forward a
    // new reference, as ReadAction and WriteAction do for their errors.
    CallReadCb(ep, GRPC_ERROR_REF(error));
    EP_UNREF(ep, "read");
  }
}
255 
// Endpoint read entry point. Records `cb` and `slices` as the pending read,
// takes a "read" reference, and requests one read slice of
// GRPC_TCP_DEFAULT_READ_SLICE_SIZE bytes. Only one read may be in flight at a
// time. `urgent` is not used by this implementation.
static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
                         grpc_closure* cb, bool urgent) {
  CFStreamEndpoint* stream_ep = reinterpret_cast<CFStreamEndpoint*>(ep);
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu",
            stream_ep, slices, cb, slices->length);
  }
  GPR_ASSERT(stream_ep->read_cb == nullptr);
  stream_ep->read_cb = cb;
  stream_ep->read_slices = slices;
  grpc_slice_buffer_reset_and_unref_internal(slices);
  EP_REF(stream_ep, "read");

  // NOTE(review): a false return presumably means the allocation is still
  // pending and will be reported through the slice allocator's callback --
  // confirm against the resource quota API.
  const bool slices_ready = grpc_resource_user_alloc_slices(
      &stream_ep->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
      stream_ep->read_slices);
  if (!slices_ready) {
    return;
  }
  stream_ep->stream_sync->NotifyOnRead(&stream_ep->read_action);
}
274 
// Endpoint write entry point. Records `cb` and `slices` as the pending write,
// takes a "write" reference, and waits for the write stream to become
// writable. Only one write may be in flight at a time. `arg` is not used by
// this implementation.
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
                          grpc_closure* cb, void* arg) {
  CFStreamEndpoint* stream_ep = reinterpret_cast<CFStreamEndpoint*>(ep);
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
            stream_ep, slices, cb, slices->length);
  }
  GPR_ASSERT(stream_ep->write_cb == nullptr);

  stream_ep->write_cb = cb;
  stream_ep->write_slices = slices;

  EP_REF(stream_ep, "write");
  stream_ep->stream_sync->NotifyOnWrite(&stream_ep->write_action);
}
288 
// Endpoint shutdown entry point. Closes both CoreFoundation streams, shuts
// down the stream notification handle with `why`, and shuts down the
// resource user. Logs before and after when TCP tracing is enabled.
void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
  CFStreamEndpoint* stream_ep = reinterpret_cast<CFStreamEndpoint*>(ep);
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", stream_ep, why);
  }

  CFReadStreamClose(stream_ep->read_stream);
  CFWriteStreamClose(stream_ep->write_stream);

  stream_ep->stream_sync->Shutdown(why);
  grpc_resource_user_shutdown(stream_ep->resource_user);

  if (grpc_tcp_trace.enabled()) {
    // Only the pointer value of `why` is printed here.
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", stream_ep,
            why);
  }
}
302 
// Endpoint destroy entry point. Drops the "destroy" reference; the endpoint
// is freed once no other references remain.
void CFStreamDestroy(grpc_endpoint* ep) {
  CFStreamEndpoint* stream_ep = reinterpret_cast<CFStreamEndpoint*>(ep);
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", stream_ep);
  }
  EP_UNREF(stream_ep, "destroy");
}
310 
// Returns the resource user associated with this endpoint.
grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
  return reinterpret_cast<CFStreamEndpoint*>(ep)->resource_user;
}
315 
// Returns a view of the endpoint's peer string. The view refers to storage
// owned by the endpoint.
absl::string_view CFStreamGetPeer(grpc_endpoint* ep) {
  return reinterpret_cast<CFStreamEndpoint*>(ep)->peer_string;
}
320 
// Returns a view of the endpoint's local address string (empty when the
// local address could not be determined at creation). The view refers to
// storage owned by the endpoint.
absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
  return reinterpret_cast<CFStreamEndpoint*>(ep)->local_address;
}
325 
// This endpoint does not expose a file descriptor; always returns 0.
// NOTE(review): 0 is itself a valid descriptor number -- confirm whether
// callers expect a different "no fd" sentinel.
int CFStreamGetFD(grpc_endpoint* ep) {
  (void)ep;
  return 0;
}
327 
// Error tracking is not supported by this endpoint; always returns false.
bool CFStreamCanTrackErr(grpc_endpoint* ep) {
  (void)ep;
  return false;
}
329 
// No-op: this endpoint does nothing with pollsets.
void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {
  (void)ep;
  (void)pollset;
}
// No-op: this endpoint does nothing with pollset sets.
void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {
  (void)ep;
  (void)pollset;
}
// No-op: this endpoint does nothing with pollset sets.
void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
                                  grpc_pollset_set* pollset) {
  (void)ep;
  (void)pollset;
}
334 
// Endpoint vtable wiring the CFStream implementations above into the generic
// grpc_endpoint interface. Entries are positional, so their order must match
// the field order of grpc_endpoint_vtable.
static const grpc_endpoint_vtable vtable = {
    CFStreamRead,
    CFStreamWrite,
    CFStreamAddToPollset,
    CFStreamAddToPollsetSet,
    CFStreamDeleteFromPollsetSet,
    CFStreamShutdown,
    CFStreamDestroy,
    CFStreamGetResourceUser,
    CFStreamGetPeer,
    CFStreamGetLocalAddress,
    CFStreamGetFD,
    CFStreamCanTrackErr,
};
347 
// Creates a gRPC endpoint on top of an existing CFReadStream/CFWriteStream
// pair.
//
// The endpoint retains both streams and takes a reference on `stream_sync`.
// `peer_string` is copied and is also used as the resource user's name. The
// returned endpoint starts with a single reference.
//
// The local address is derived from the read stream's native socket handle;
// if the handle is unavailable or getsockname() fails, the local address is
// left empty.
grpc_endpoint* grpc_cfstream_endpoint_create(
    CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
    const char* peer_string, grpc_resource_quota* resource_quota,
    CFStreamHandle* stream_sync) {
  CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "CFStream endpoint:%p create readStream:%p writeStream: %p",
            ep_impl, read_stream, write_stream);
  }
  ep_impl->base.vtable = &vtable;
  gpr_ref_init(&ep_impl->refcount, 1);
  ep_impl->read_stream = read_stream;
  ep_impl->write_stream = write_stream;
  CFRetain(read_stream);
  CFRetain(write_stream);
  ep_impl->stream_sync = stream_sync;
  CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");

  ep_impl->peer_string = peer_string;

  // Look up the underlying socket so the local address can be resolved. The
  // property may be missing (NULL), in which case CFDataGetBytes must not be
  // called and sockfd stays at an invalid value.
  CFSocketNativeHandle sockfd = -1;
  CFDataRef native_handle = static_cast<CFDataRef>(CFReadStreamCopyProperty(
      ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
  if (native_handle != nullptr) {
    if (CFDataGetLength(native_handle) >=
        static_cast<CFIndex>(sizeof(CFSocketNativeHandle))) {
      CFDataGetBytes(native_handle,
                     CFRangeMake(0, sizeof(CFSocketNativeHandle)),
                     reinterpret_cast<UInt8*>(&sockfd));
    }
    CFRelease(native_handle);
  }

  grpc_resolved_address resolved_local_addr;
  resolved_local_addr.len = sizeof(resolved_local_addr.addr);
  if (sockfd < 0 ||
      getsockname(sockfd, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
                  &resolved_local_addr.len) < 0) {
    ep_impl->local_address = "";
  } else {
    ep_impl->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
  }

  // No read or write is in flight yet.
  ep_impl->read_cb = nullptr;
  ep_impl->write_cb = nullptr;
  ep_impl->read_slices = nullptr;
  ep_impl->write_slices = nullptr;
  GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
                    static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
                    static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
  ep_impl->resource_user =
      grpc_resource_user_create(resource_quota, peer_string);
  grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
                                          ep_impl->resource_user,
                                          CFStreamReadAllocationDone, ep_impl);

  return &ep_impl->base;
}
400 
401 #endif /* GRPC_CFSTREAM_ENDPOINT */
402