1 /*
2  *
3  * Copyright 2020 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
20 /// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
21 /// handle and trigger all the CFStream events. The CFStream streams register
/// themselves with the run loop with functions grpc_apple_register_read_stream
/// and grpc_apple_register_write_stream. Pollsets are dummy and block on a
24 /// condition variable in pollset_work().
25 
26 #include <grpc/support/port_platform.h>
27 
28 #include "src/core/lib/iomgr/port.h"
29 
30 #ifdef GRPC_APPLE_EV
31 
32 #include <CoreFoundation/CoreFoundation.h>
33 
34 #include <list>
35 
36 #include "src/core/lib/gprpp/thd.h"
37 #include "src/core/lib/iomgr/ev_apple.h"
38 
39 grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
40 
// Emits a "(polling) "-prefixed debug log line when the apple_polling trace
// flag is enabled; compiles to nothing in NDEBUG builds. At least one argument
// must follow the format string.
//
// Both variants are wrapped in do { } while (0) so that the macro always
// expands to exactly one statement and can safely be used as the body of an
// unbraced if/else.
#ifndef NDEBUG
#define GRPC_POLLING_TRACE(format, ...)                      \
  do {                                                       \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
      gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__);  \
    }                                                        \
  } while (0)
#else
#define GRPC_POLLING_TRACE(...) \
  do {                          \
  } while (0)
#endif  // NDEBUG
49 
// Sentinel worker value (address 1) that asks pollset_kick() to kick every
// worker of the pollset instead of a single one.
#define GRPC_POLLSET_KICK_BROADCAST (reinterpret_cast<grpc_pollset_worker*>(1))
51 
52 struct GlobalRunLoopContext {
53   grpc_core::CondVar init_cv;
54   grpc_core::CondVar input_source_cv;
55 
56   grpc_core::Mutex mu;
57 
58   // Whether an input source registration is pending. Protected by mu.
59   bool input_source_registered = false;
60 
61   // The reference to the global run loop object. Protected by mu.
62   CFRunLoopRef run_loop;
63 
64   // Whether the pollset has been globally shut down. Protected by mu.
65   bool is_shutdown = false;
66 };
67 
68 struct GrpcAppleWorker {
69   // The condition varible to kick the worker. Works with the pollset's lock
70   // (GrpcApplePollset.mu).
71   grpc_core::CondVar cv;
72 
73   // Whether the worker is kicked. Protected by the pollset's lock
74   // (GrpcApplePollset.mu).
75   bool kicked = false;
76 };
77 
78 struct GrpcApplePollset {
79   grpc_core::Mutex mu;
80 
81   // Tracks the current workers in the pollset. Protected by mu.
82   std::list<GrpcAppleWorker*> workers;
83 
84   // Whether the pollset is shut down. Protected by mu.
85   bool is_shutdown = false;
86 
87   // Closure to call when shutdown is done. Protected by mu.
88   grpc_closure* shutdown_closure;
89 
90   // Whether there's an outstanding kick that was not processed. Protected by
91   // mu.
92   bool kicked_without_poller = false;
93 };
94 
95 static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
96 static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
97 
98 /// Register the stream with the dispatch queue. Callbacks of the stream will be
99 /// issued to the dispatch queue when a network event happens and will be
100 /// managed by Grand Central Dispatch.
grpc_apple_register_read_stream_queue(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)101 static void grpc_apple_register_read_stream_queue(
102     CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
103   CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
104 }
105 
106 /// Register the stream with the dispatch queue. Callbacks of the stream will be
107 /// issued to the dispatch queue when a network event happens and will be
108 /// managed by Grand Central Dispatch.
grpc_apple_register_write_stream_queue(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)109 static void grpc_apple_register_write_stream_queue(
110     CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
111   CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
112 }
113 
114 /// Register the stream with the global run loop. Callbacks of the stream will
115 /// be issued to the run loop when a network event happens and will be driven by
116 /// the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_read_stream_run_loop(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)117 static void grpc_apple_register_read_stream_run_loop(
118     CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
119   GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
120   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
121   CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
122                                   kCFRunLoopDefaultMode);
123   gGlobalRunLoopContext->input_source_registered = true;
124   gGlobalRunLoopContext->input_source_cv.Signal();
125 }
126 
127 /// Register the stream with the global run loop. Callbacks of the stream will
128 /// be issued to the run loop when a network event happens, and will be driven
129 /// by the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_write_stream_run_loop(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)130 static void grpc_apple_register_write_stream_run_loop(
131     CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
132   GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
133   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
134   CFWriteStreamScheduleWithRunLoop(
135       write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
136   gGlobalRunLoopContext->input_source_registered = true;
137   gGlobalRunLoopContext->input_source_cv.Signal();
138 }
139 
140 /// The default implementation of stream registration is to register the stream
141 /// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
142 /// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
143 /// CFStream streams are registered with the global run loop instead (see
144 /// pollset_global_init below).
145 static void (*grpc_apple_register_read_stream_impl)(
146     CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
147 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
148                                                      dispatch_queue_t) =
149     grpc_apple_register_write_stream_queue;
150 
grpc_apple_register_read_stream(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)151 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
152                                      dispatch_queue_t dispatch_queue) {
153   grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
154 }
155 
grpc_apple_register_write_stream(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)156 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
157                                       dispatch_queue_t dispatch_queue) {
158   grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
159 }
160 
161 /// Drive the run loop in a global singleton thread until the global run loop is
162 /// shutdown.
GlobalRunLoopFunc(void * arg)163 static void GlobalRunLoopFunc(void* arg) {
164   grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
165   gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
166   gGlobalRunLoopContext->init_cv.Signal();
167 
168   while (!gGlobalRunLoopContext->is_shutdown) {
169     // CFRunLoopRun() will return immediately if no stream is registered on it.
170     // So we wait on a conditional variable until a stream is registered;
171     // otherwise we'll be running a spinning loop.
172     while (!gGlobalRunLoopContext->input_source_registered) {
173       gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
174     }
175     gGlobalRunLoopContext->input_source_registered = false;
176     lock.Unlock();
177     CFRunLoopRun();
178     lock.Lock();
179   }
180   lock.Unlock();
181 }
182 
183 // pollset implementation
184 
pollset_global_init(void)185 static void pollset_global_init(void) {
186   gGlobalRunLoopContext = new GlobalRunLoopContext;
187 
188   grpc_apple_register_read_stream_impl =
189       grpc_apple_register_read_stream_run_loop;
190   grpc_apple_register_write_stream_impl =
191       grpc_apple_register_write_stream_run_loop;
192 
193   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
194   gGlobalRunLoopThread =
195       new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
196   gGlobalRunLoopThread->Start();
197   while (gGlobalRunLoopContext->run_loop == NULL)
198     gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
199 }
200 
pollset_global_shutdown(void)201 static void pollset_global_shutdown(void) {
202   {
203     grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
204     gGlobalRunLoopContext->is_shutdown = true;
205     CFRunLoopStop(gGlobalRunLoopContext->run_loop);
206   }
207   gGlobalRunLoopThread->Join();
208   delete gGlobalRunLoopThread;
209   delete gGlobalRunLoopContext;
210 }
211 
212 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
213 /// function. The lock may be temporarily released when waiting on the condition
214 /// variable but will be re-acquired before the function returns.
215 ///
216 /// The Apple pollset simply waits on a condition variable until it is kicked.
217 /// The network events are handled in the global run loop thread. Processing of
218 /// these events will eventually trigger the kick.
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)219 static grpc_error* pollset_work(grpc_pollset* pollset,
220                                 grpc_pollset_worker** worker,
221                                 grpc_millis deadline) {
222   GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64,
223                      pollset, worker, deadline);
224   GrpcApplePollset* apple_pollset =
225       reinterpret_cast<GrpcApplePollset*>(pollset);
226   GrpcAppleWorker actual_worker;
227   if (worker) {
228     *worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
229   }
230 
231   if (apple_pollset->kicked_without_poller) {
232     // Process the outstanding kick and reset the flag. Do not block.
233     apple_pollset->kicked_without_poller = false;
234   } else {
235     // Block until kicked, timed out, or the pollset shuts down.
236     apple_pollset->workers.push_front(&actual_worker);
237     auto it = apple_pollset->workers.begin();
238 
239     while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
240       if (actual_worker.cv.Wait(
241               &apple_pollset->mu,
242               grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
243         // timed out
244         break;
245       }
246     }
247 
248     apple_pollset->workers.erase(it);
249 
250     // If the pollset is shut down asynchronously and this is the last pending
251     // worker, the shutdown process is complete at this moment and the shutdown
252     // callback will be called.
253     if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
254       grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
255                               GRPC_ERROR_NONE);
256     }
257   }
258 
259   return GRPC_ERROR_NONE;
260 }
261 
262 /// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
263 /// before calling this function.
kick_worker(GrpcAppleWorker * worker)264 static void kick_worker(GrpcAppleWorker* worker) {
265   worker->kicked = true;
266   worker->cv.Signal();
267 }
268 
269 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
270 /// function. The kick action simply signals the condition variable of the
271 /// worker.
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)272 static grpc_error* pollset_kick(grpc_pollset* pollset,
273                                 grpc_pollset_worker* specific_worker) {
274   GrpcApplePollset* apple_pollset =
275       reinterpret_cast<GrpcApplePollset*>(pollset);
276 
277   GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker);
278 
279   if (specific_worker == nullptr) {
280     if (apple_pollset->workers.empty()) {
281       apple_pollset->kicked_without_poller = true;
282     } else {
283       GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
284       kick_worker(actual_worker);
285     }
286   } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
287     for (auto& actual_worker : apple_pollset->workers) {
288       kick_worker(actual_worker);
289     }
290   } else {
291     GrpcAppleWorker* actual_worker =
292         reinterpret_cast<GrpcAppleWorker*>(specific_worker);
293     kick_worker(actual_worker);
294   }
295 
296   return GRPC_ERROR_NONE;
297 }
298 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)299 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
300   GRPC_POLLING_TRACE("pollset init: %p", pollset);
301   GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
302   *mu = apple_pollset->mu.get();
303 }
304 
305 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
306 /// function.
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)307 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
308   GRPC_POLLING_TRACE("pollset shutdown: %p", pollset);
309 
310   GrpcApplePollset* apple_pollset =
311       reinterpret_cast<GrpcApplePollset*>(pollset);
312   apple_pollset->is_shutdown = true;
313   pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
314 
315   // If there is any worker blocked, shutdown will be done asynchronously.
316   if (apple_pollset->workers.empty()) {
317     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
318   } else {
319     apple_pollset->shutdown_closure = closure;
320   }
321 }
322 
pollset_destroy(grpc_pollset * pollset)323 static void pollset_destroy(grpc_pollset* pollset) {
324   GRPC_POLLING_TRACE("pollset destroy: %p", pollset);
325   GrpcApplePollset* apple_pollset =
326       reinterpret_cast<GrpcApplePollset*>(pollset);
327   apple_pollset->~GrpcApplePollset();
328 }
329 
pollset_size(void)330 size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
331 
332 grpc_pollset_vtable grpc_apple_pollset_vtable = {
333     pollset_global_init, pollset_global_shutdown,
334     pollset_init,        pollset_shutdown,
335     pollset_destroy,     pollset_work,
336     pollset_kick,        pollset_size};
337 
338 // pollset_set implementation
339 
pollset_set_create(void)340 grpc_pollset_set* pollset_set_create(void) { return nullptr; }
pollset_set_destroy(grpc_pollset_set * pollset_set)341 void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
pollset_set_add_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)342 void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
343                              grpc_pollset* pollset) {}
pollset_set_del_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)344 void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
345                              grpc_pollset* pollset) {}
pollset_set_add_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)346 void pollset_set_add_pollset_set(grpc_pollset_set* bag,
347                                  grpc_pollset_set* item) {}
pollset_set_del_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)348 void pollset_set_del_pollset_set(grpc_pollset_set* bag,
349                                  grpc_pollset_set* item) {}
350 
351 grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
352     pollset_set_create,          pollset_set_destroy,
353     pollset_set_add_pollset,     pollset_set_del_pollset,
354     pollset_set_add_pollset_set, pollset_set_del_pollset_set};
355 
356 #endif
357