1 #include "later.h"
2 #include <Rcpp.h>
3 #include <map>
4 #include <queue>
5 #include <memory>
6 #include "debug.h"
7 #include "utils.h"
8 #include "threadutils.h"
9 
10 #include "callback_registry.h"
11 #include "callback_registry_table.h"
12 
13 #include "interrupt.h"
14 
15 using std::shared_ptr;
16 
17 static size_t exec_callbacks_reentrancy_count = 0;
18 
19 static CallbackRegistryTable callbackRegistryTable;
20 
21 
22 class ProtectCallbacks {
23 public:
ProtectCallbacks()24   ProtectCallbacks() {
25     exec_callbacks_reentrancy_count++;
26   }
~ProtectCallbacks()27   ~ProtectCallbacks() {
28     exec_callbacks_reentrancy_count--;
29   }
30 };
31 
32 // Returns number of frames on the call stack. Basically just a wrapper for
33 // base::sys.nframe(). Note that this can report that an error occurred if the
34 // user sends an interrupt while the `sys.nframe()` function is running. I
35 // believe that the only reason that it should set errorOccurred is because of
36 // a user interrupt.
sys_nframe()37 int sys_nframe() {
38   ASSERT_MAIN_THREAD()
39   SEXP e, result;
40   int errorOccurred, value;
41 
42   BEGIN_SUSPEND_INTERRUPTS {
43     PROTECT(e = Rf_lang1(Rf_install("sys.nframe")));
44     PROTECT(result = R_tryEval(e, R_BaseEnv, &errorOccurred));
45 
46     if (errorOccurred) {
47       value = -1;
48     } else {
49       value = INTEGER(result)[0];
50     }
51 
52     UNPROTECT(2);
53   } END_SUSPEND_INTERRUPTS;
54 
55   return value;
56 }
57 
58 // Returns true if execCallbacks is executing, or sys.nframes() returns 0.
at_top_level()59 bool at_top_level() {
60   ASSERT_MAIN_THREAD()
61   if (exec_callbacks_reentrancy_count != 0)
62     return false;
63 
64   int nframe = sys_nframe();
65   if (nframe == -1) {
66     throw Rcpp::exception("Error occurred while calling sys.nframe()");
67   }
68   return nframe == 0;
69 }
70 
71 // ============================================================================
72 // Current registry/event loop
73 // ============================================================================
74 //
75 // In the R code, the term "loop" is used. In the C++ code, the terms "loop"
76 // and "registry" are both used. "Loop" is usually used when interfacing with
77 // R-facing event loop, and "registry" is usually used when interfacing with
78 // the implementation, which uses a callback registry.
79 //
80 // The current registry is kept track of entirely in C++, and not in R
81 // (although it can be queried from R). This is because when running a loop
82 // with children, it sets the current loop as it runs each of the children,
83 // and to do so in R would require calling back into R for each child, which
84 // would impose more overhead.
85 
86 static int current_registry;
87 
88 // [[Rcpp::export]]
setCurrentRegistryId(int id)89 void setCurrentRegistryId(int id) {
90   ASSERT_MAIN_THREAD()
91   current_registry = id;
92 }
93 
94 // [[Rcpp::export]]
getCurrentRegistryId()95 int getCurrentRegistryId() {
96   ASSERT_MAIN_THREAD()
97   return current_registry;
98 }
99 
100 // Class for setting current registry and resetting when function exits, using
101 // RAII.
102 class CurrentRegistryGuard {
103 public:
CurrentRegistryGuard(int id)104   CurrentRegistryGuard(int id) {
105     ASSERT_MAIN_THREAD()
106     old_id = getCurrentRegistryId();
107     setCurrentRegistryId(id);
108   }
~CurrentRegistryGuard()109   ~CurrentRegistryGuard() {
110     setCurrentRegistryId(old_id);
111   }
112 private:
113   int old_id;
114 };
115 
116 
117 // ============================================================================
118 // Callback registry functions
119 // ============================================================================
120 
getGlobalRegistry()121 shared_ptr<CallbackRegistry> getGlobalRegistry() {
122   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(GLOBAL_LOOP);
123   if (registry == nullptr) {
124     Rf_error("Global registry does not exist.");
125   }
126   return registry;
127 }
128 
129 // This deletes a CallbackRegistry and deregisters it as a child of its
130 // parent. Any children of this registry are orphaned -- they no longer have a
131 // parent. (Maybe this should be an option?)
132 //
133 // [[Rcpp::export]]
deleteCallbackRegistry(int loop_id)134 bool deleteCallbackRegistry(int loop_id) {
135   ASSERT_MAIN_THREAD()
136   if (loop_id == GLOBAL_LOOP) {
137     Rf_error("Can't delete global loop.");
138   }
139   if (loop_id == getCurrentRegistryId()) {
140     Rf_error("Can't delete current loop.");
141   }
142 
143   return callbackRegistryTable.remove(loop_id);
144 }
145 
146 
147 // This is called when the R loop handle is GC'd.
148 // [[Rcpp::export]]
notifyRRefDeleted(int loop_id)149 bool notifyRRefDeleted(int loop_id) {
150   ASSERT_MAIN_THREAD()
151   if (loop_id == GLOBAL_LOOP) {
152     Rf_error("Can't delete global loop.");
153   }
154   if (loop_id == getCurrentRegistryId()) {
155     Rf_error("Can't delete current loop.");
156   }
157 
158   return callbackRegistryTable.notifyRRefDeleted(loop_id);
159 }
160 
161 
162 // [[Rcpp::export]]
createCallbackRegistry(int id,int parent_id)163 void createCallbackRegistry(int id, int parent_id) {
164   ASSERT_MAIN_THREAD()
165   callbackRegistryTable.create(id, parent_id);
166 }
167 
168 // [[Rcpp::export]]
existsCallbackRegistry(int id)169 bool existsCallbackRegistry(int id) {
170   ASSERT_MAIN_THREAD()
171   return callbackRegistryTable.exists(id);
172 }
173 
174 // [[Rcpp::export]]
list_queue_(int id)175 Rcpp::List list_queue_(int id) {
176   ASSERT_MAIN_THREAD()
177   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(id);
178   if (registry == nullptr) {
179     Rf_error("CallbackRegistry does not exist.");
180   }
181   return registry->list();
182 }
183 
184 
185 // Execute callbacks for a single event loop.
execCallbacksOne(bool runAll,shared_ptr<CallbackRegistry> callback_registry,Timestamp now)186 bool execCallbacksOne(
187   bool runAll,
188   shared_ptr<CallbackRegistry> callback_registry,
189   Timestamp now
190 ) {
191   ASSERT_MAIN_THREAD()
192   // execCallbacks can be called directly from C code, and the callbacks may
193   // include Rcpp code. (Should we also call wrap?)
194   Rcpp::RNGScope rngscope;
195   ProtectCallbacks pcscope;
196 
197   // Set current loop for the duration of this function.
198   CurrentRegistryGuard current_registry_guard(callback_registry->getId());
199 
200   do {
201     // We only take one at a time, because we don't want to lose callbacks if
202     // one of the callbacks throws an error
203     std::vector<Callback_sp> callbacks = callback_registry->take(1, now);
204     if (callbacks.size() == 0) {
205       break;
206     }
207     // This line may throw errors!
208     callbacks[0]->invoke_wrapped();
209   } while (runAll);
210 
211   // I think there's no need to lock this since it's only modified from the
212   // main thread. But need to check.
213   std::vector<std::shared_ptr<CallbackRegistry> > children = callback_registry->children;
214   for (std::vector<std::shared_ptr<CallbackRegistry> >::iterator it = children.begin();
215        it != children.end();
216        ++it)
217   {
218     execCallbacksOne(true, *it, now);
219   }
220 
221   return true;
222 }
223 
224 // Execute callbacks for an event loop and its children.
225 // [[Rcpp::export]]
execCallbacks(double timeoutSecs,bool runAll,int loop_id)226 bool execCallbacks(double timeoutSecs, bool runAll, int loop_id) {
227   ASSERT_MAIN_THREAD()
228   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
229   if (registry == nullptr) {
230     Rf_error("CallbackRegistry does not exist.");
231   }
232 
233   if (!registry->wait(timeoutSecs, true)) {
234     return false;
235   }
236 
237   Timestamp now;
238   execCallbacksOne(runAll, registry, now);
239 
240   // Call this now, in case any CallbackRegistries which have no R references
241   // have emptied.
242   callbackRegistryTable.pruneRegistries();
243   return true;
244 }
245 
246 
247 // This function is called from the input handler on Unix, or the Windows
248 // equivalent. It may throw exceptions.
249 //
250 // Invoke execCallbacks up to 20 times. At the first iteration where no work is
251 // done, terminate. We call this from the top level instead of just calling
252 // execCallbacks because the top level only gets called occasionally (every 10's
253 // of ms), so tasks that generate other tasks will execute surprisingly slowly.
254 //
255 // Example:
256 // promise_map(1:1000, function(i) {
257 //   message(i)
258 //   promise_resolve(i)
259 // })
execCallbacksForTopLevel()260 bool execCallbacksForTopLevel() {
261   bool any = false;
262   for (size_t i = 0; i < 20; i++) {
263     if (!execCallbacks(0, true, GLOBAL_LOOP))
264       return any;
265     any = true;
266   }
267   return any;
268 }
269 
270 // [[Rcpp::export]]
idle(int loop_id)271 bool idle(int loop_id) {
272   ASSERT_MAIN_THREAD()
273   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
274   if (registry == nullptr) {
275     Rf_error("CallbackRegistry does not exist.");
276   }
277   return registry->empty();
278 }
279 
280 
281 static bool initialized = false;
282 // [[Rcpp::export]]
ensureInitialized()283 void ensureInitialized() {
284   if (initialized) {
285     return;
286   }
287   REGISTER_MAIN_THREAD()
288 
289   // Note that the global registry is not created here, but in R, from the
290   // .onLoad function.
291   setCurrentRegistryId(GLOBAL_LOOP);
292 
293   // Call the platform-specific initialization for the mechanism that runs the
294   // event loop when the console is idle.
295   ensureAutorunnerInitialized();
296   initialized = true;
297 }
298 
299 // [[Rcpp::export]]
execLater(Rcpp::Function callback,double delaySecs,int loop_id)300 std::string execLater(Rcpp::Function callback, double delaySecs, int loop_id) {
301   ASSERT_MAIN_THREAD()
302   ensureInitialized();
303   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
304   if (registry == nullptr) {
305     Rf_error("CallbackRegistry does not exist.");
306   }
307   uint64_t callback_id = doExecLater(registry, callback, delaySecs, true);
308 
309   // We have to convert it to a string in order to maintain 64-bit precision,
310   // since R doesn't support 64 bit integers.
311   return toString(callback_id);
312 }
313 
314 
315 
cancel(uint64_t callback_id,int loop_id)316 bool cancel(uint64_t callback_id, int loop_id) {
317   ASSERT_MAIN_THREAD()
318   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
319   if (registry == nullptr) {
320     return false;
321   }
322   return registry->cancel(callback_id);
323 }
324 
325 // [[Rcpp::export]]
cancel(std::string callback_id_s,int loop_id)326 bool cancel(std::string callback_id_s, int loop_id) {
327   ASSERT_MAIN_THREAD()
328   uint64_t callback_id;
329   std::istringstream iss(callback_id_s);
330   iss >> callback_id;
331 
332   // If the input is good (just a number with no other text) then eof will be
333   // 1 and fail will be 0.
334   if (! (iss.eof() && !iss.fail())) {
335     return false;
336   }
337 
338   return cancel(callback_id, loop_id);
339 }
340 
341 
342 
343 // [[Rcpp::export]]
nextOpSecs(int loop_id)344 double nextOpSecs(int loop_id) {
345   ASSERT_MAIN_THREAD()
346   shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
347   if (registry == nullptr) {
348     Rf_error("CallbackRegistry does not exist.");
349   }
350 
351   Optional<Timestamp> nextTime = registry->nextTimestamp();
352   if (!nextTime.has_value()) {
353     return R_PosInf;
354   } else {
355     Timestamp now;
356     return nextTime->diff_secs(now);
357   }
358 }
359 
360 // Schedules a C function to execute on the global loop. Returns callback ID
361 // on success, or 0 on error.
execLaterNative(void (* func)(void *),void * data,double delaySecs)362 extern "C" uint64_t execLaterNative(void (*func)(void*), void* data, double delaySecs) {
363   return execLaterNative2(func, data, delaySecs, GLOBAL_LOOP);
364 }
365 
366 // Schedules a C function to execute on a specific event loop. Returns
367 // callback ID on success, or 0 on error.
execLaterNative2(void (* func)(void *),void * data,double delaySecs,int loop_id)368 extern "C" uint64_t execLaterNative2(void (*func)(void*), void* data, double delaySecs, int loop_id) {
369   ensureInitialized();
370   return callbackRegistryTable.scheduleCallback(func, data, delaySecs, loop_id);
371 }
372 
apiVersion()373 extern "C" int apiVersion() {
374   return LATER_DLL_API_VERSION;
375 }
376