1 #include "later.h"
2 #include <Rcpp.h>
3 #include <map>
4 #include <queue>
5 #include <memory>
6 #include "debug.h"
7 #include "utils.h"
8 #include "threadutils.h"
9
10 #include "callback_registry.h"
11 #include "callback_registry_table.h"
12
13 #include "interrupt.h"
14
15 using std::shared_ptr;
16
17 static size_t exec_callbacks_reentrancy_count = 0;
18
19 static CallbackRegistryTable callbackRegistryTable;
20
21
22 class ProtectCallbacks {
23 public:
ProtectCallbacks()24 ProtectCallbacks() {
25 exec_callbacks_reentrancy_count++;
26 }
~ProtectCallbacks()27 ~ProtectCallbacks() {
28 exec_callbacks_reentrancy_count--;
29 }
30 };
31
32 // Returns number of frames on the call stack. Basically just a wrapper for
33 // base::sys.nframe(). Note that this can report that an error occurred if the
34 // user sends an interrupt while the `sys.nframe()` function is running. I
35 // believe that the only reason that it should set errorOccurred is because of
36 // a user interrupt.
sys_nframe()37 int sys_nframe() {
38 ASSERT_MAIN_THREAD()
39 SEXP e, result;
40 int errorOccurred, value;
41
42 BEGIN_SUSPEND_INTERRUPTS {
43 PROTECT(e = Rf_lang1(Rf_install("sys.nframe")));
44 PROTECT(result = R_tryEval(e, R_BaseEnv, &errorOccurred));
45
46 if (errorOccurred) {
47 value = -1;
48 } else {
49 value = INTEGER(result)[0];
50 }
51
52 UNPROTECT(2);
53 } END_SUSPEND_INTERRUPTS;
54
55 return value;
56 }
57
58 // Returns true if execCallbacks is executing, or sys.nframes() returns 0.
at_top_level()59 bool at_top_level() {
60 ASSERT_MAIN_THREAD()
61 if (exec_callbacks_reentrancy_count != 0)
62 return false;
63
64 int nframe = sys_nframe();
65 if (nframe == -1) {
66 throw Rcpp::exception("Error occurred while calling sys.nframe()");
67 }
68 return nframe == 0;
69 }
70
71 // ============================================================================
72 // Current registry/event loop
73 // ============================================================================
74 //
75 // In the R code, the term "loop" is used. In the C++ code, the terms "loop"
76 // and "registry" are both used. "Loop" is usually used when interfacing with
77 // R-facing event loop, and "registry" is usually used when interfacing with
78 // the implementation, which uses a callback registry.
79 //
80 // The current registry is kept track of entirely in C++, and not in R
81 // (although it can be queried from R). This is because when running a loop
82 // with children, it sets the current loop as it runs each of the children,
83 // and to do so in R would require calling back into R for each child, which
84 // would impose more overhead.
85
86 static int current_registry;
87
88 // [[Rcpp::export]]
setCurrentRegistryId(int id)89 void setCurrentRegistryId(int id) {
90 ASSERT_MAIN_THREAD()
91 current_registry = id;
92 }
93
94 // [[Rcpp::export]]
getCurrentRegistryId()95 int getCurrentRegistryId() {
96 ASSERT_MAIN_THREAD()
97 return current_registry;
98 }
99
100 // Class for setting current registry and resetting when function exits, using
101 // RAII.
102 class CurrentRegistryGuard {
103 public:
CurrentRegistryGuard(int id)104 CurrentRegistryGuard(int id) {
105 ASSERT_MAIN_THREAD()
106 old_id = getCurrentRegistryId();
107 setCurrentRegistryId(id);
108 }
~CurrentRegistryGuard()109 ~CurrentRegistryGuard() {
110 setCurrentRegistryId(old_id);
111 }
112 private:
113 int old_id;
114 };
115
116
117 // ============================================================================
118 // Callback registry functions
119 // ============================================================================
120
getGlobalRegistry()121 shared_ptr<CallbackRegistry> getGlobalRegistry() {
122 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(GLOBAL_LOOP);
123 if (registry == nullptr) {
124 Rf_error("Global registry does not exist.");
125 }
126 return registry;
127 }
128
129 // This deletes a CallbackRegistry and deregisters it as a child of its
130 // parent. Any children of this registry are orphaned -- they no longer have a
131 // parent. (Maybe this should be an option?)
132 //
133 // [[Rcpp::export]]
deleteCallbackRegistry(int loop_id)134 bool deleteCallbackRegistry(int loop_id) {
135 ASSERT_MAIN_THREAD()
136 if (loop_id == GLOBAL_LOOP) {
137 Rf_error("Can't delete global loop.");
138 }
139 if (loop_id == getCurrentRegistryId()) {
140 Rf_error("Can't delete current loop.");
141 }
142
143 return callbackRegistryTable.remove(loop_id);
144 }
145
146
147 // This is called when the R loop handle is GC'd.
148 // [[Rcpp::export]]
notifyRRefDeleted(int loop_id)149 bool notifyRRefDeleted(int loop_id) {
150 ASSERT_MAIN_THREAD()
151 if (loop_id == GLOBAL_LOOP) {
152 Rf_error("Can't delete global loop.");
153 }
154 if (loop_id == getCurrentRegistryId()) {
155 Rf_error("Can't delete current loop.");
156 }
157
158 return callbackRegistryTable.notifyRRefDeleted(loop_id);
159 }
160
161
162 // [[Rcpp::export]]
createCallbackRegistry(int id,int parent_id)163 void createCallbackRegistry(int id, int parent_id) {
164 ASSERT_MAIN_THREAD()
165 callbackRegistryTable.create(id, parent_id);
166 }
167
168 // [[Rcpp::export]]
existsCallbackRegistry(int id)169 bool existsCallbackRegistry(int id) {
170 ASSERT_MAIN_THREAD()
171 return callbackRegistryTable.exists(id);
172 }
173
174 // [[Rcpp::export]]
list_queue_(int id)175 Rcpp::List list_queue_(int id) {
176 ASSERT_MAIN_THREAD()
177 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(id);
178 if (registry == nullptr) {
179 Rf_error("CallbackRegistry does not exist.");
180 }
181 return registry->list();
182 }
183
184
185 // Execute callbacks for a single event loop.
execCallbacksOne(bool runAll,shared_ptr<CallbackRegistry> callback_registry,Timestamp now)186 bool execCallbacksOne(
187 bool runAll,
188 shared_ptr<CallbackRegistry> callback_registry,
189 Timestamp now
190 ) {
191 ASSERT_MAIN_THREAD()
192 // execCallbacks can be called directly from C code, and the callbacks may
193 // include Rcpp code. (Should we also call wrap?)
194 Rcpp::RNGScope rngscope;
195 ProtectCallbacks pcscope;
196
197 // Set current loop for the duration of this function.
198 CurrentRegistryGuard current_registry_guard(callback_registry->getId());
199
200 do {
201 // We only take one at a time, because we don't want to lose callbacks if
202 // one of the callbacks throws an error
203 std::vector<Callback_sp> callbacks = callback_registry->take(1, now);
204 if (callbacks.size() == 0) {
205 break;
206 }
207 // This line may throw errors!
208 callbacks[0]->invoke_wrapped();
209 } while (runAll);
210
211 // I think there's no need to lock this since it's only modified from the
212 // main thread. But need to check.
213 std::vector<std::shared_ptr<CallbackRegistry> > children = callback_registry->children;
214 for (std::vector<std::shared_ptr<CallbackRegistry> >::iterator it = children.begin();
215 it != children.end();
216 ++it)
217 {
218 execCallbacksOne(true, *it, now);
219 }
220
221 return true;
222 }
223
224 // Execute callbacks for an event loop and its children.
225 // [[Rcpp::export]]
execCallbacks(double timeoutSecs,bool runAll,int loop_id)226 bool execCallbacks(double timeoutSecs, bool runAll, int loop_id) {
227 ASSERT_MAIN_THREAD()
228 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
229 if (registry == nullptr) {
230 Rf_error("CallbackRegistry does not exist.");
231 }
232
233 if (!registry->wait(timeoutSecs, true)) {
234 return false;
235 }
236
237 Timestamp now;
238 execCallbacksOne(runAll, registry, now);
239
240 // Call this now, in case any CallbackRegistries which have no R references
241 // have emptied.
242 callbackRegistryTable.pruneRegistries();
243 return true;
244 }
245
246
247 // This function is called from the input handler on Unix, or the Windows
248 // equivalent. It may throw exceptions.
249 //
250 // Invoke execCallbacks up to 20 times. At the first iteration where no work is
251 // done, terminate. We call this from the top level instead of just calling
252 // execCallbacks because the top level only gets called occasionally (every 10's
253 // of ms), so tasks that generate other tasks will execute surprisingly slowly.
254 //
255 // Example:
256 // promise_map(1:1000, function(i) {
257 // message(i)
258 // promise_resolve(i)
259 // })
execCallbacksForTopLevel()260 bool execCallbacksForTopLevel() {
261 bool any = false;
262 for (size_t i = 0; i < 20; i++) {
263 if (!execCallbacks(0, true, GLOBAL_LOOP))
264 return any;
265 any = true;
266 }
267 return any;
268 }
269
270 // [[Rcpp::export]]
idle(int loop_id)271 bool idle(int loop_id) {
272 ASSERT_MAIN_THREAD()
273 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
274 if (registry == nullptr) {
275 Rf_error("CallbackRegistry does not exist.");
276 }
277 return registry->empty();
278 }
279
280
281 static bool initialized = false;
282 // [[Rcpp::export]]
ensureInitialized()283 void ensureInitialized() {
284 if (initialized) {
285 return;
286 }
287 REGISTER_MAIN_THREAD()
288
289 // Note that the global registry is not created here, but in R, from the
290 // .onLoad function.
291 setCurrentRegistryId(GLOBAL_LOOP);
292
293 // Call the platform-specific initialization for the mechanism that runs the
294 // event loop when the console is idle.
295 ensureAutorunnerInitialized();
296 initialized = true;
297 }
298
299 // [[Rcpp::export]]
execLater(Rcpp::Function callback,double delaySecs,int loop_id)300 std::string execLater(Rcpp::Function callback, double delaySecs, int loop_id) {
301 ASSERT_MAIN_THREAD()
302 ensureInitialized();
303 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
304 if (registry == nullptr) {
305 Rf_error("CallbackRegistry does not exist.");
306 }
307 uint64_t callback_id = doExecLater(registry, callback, delaySecs, true);
308
309 // We have to convert it to a string in order to maintain 64-bit precision,
310 // since R doesn't support 64 bit integers.
311 return toString(callback_id);
312 }
313
314
315
cancel(uint64_t callback_id,int loop_id)316 bool cancel(uint64_t callback_id, int loop_id) {
317 ASSERT_MAIN_THREAD()
318 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
319 if (registry == nullptr) {
320 return false;
321 }
322 return registry->cancel(callback_id);
323 }
324
325 // [[Rcpp::export]]
cancel(std::string callback_id_s,int loop_id)326 bool cancel(std::string callback_id_s, int loop_id) {
327 ASSERT_MAIN_THREAD()
328 uint64_t callback_id;
329 std::istringstream iss(callback_id_s);
330 iss >> callback_id;
331
332 // If the input is good (just a number with no other text) then eof will be
333 // 1 and fail will be 0.
334 if (! (iss.eof() && !iss.fail())) {
335 return false;
336 }
337
338 return cancel(callback_id, loop_id);
339 }
340
341
342
343 // [[Rcpp::export]]
nextOpSecs(int loop_id)344 double nextOpSecs(int loop_id) {
345 ASSERT_MAIN_THREAD()
346 shared_ptr<CallbackRegistry> registry = callbackRegistryTable.getRegistry(loop_id);
347 if (registry == nullptr) {
348 Rf_error("CallbackRegistry does not exist.");
349 }
350
351 Optional<Timestamp> nextTime = registry->nextTimestamp();
352 if (!nextTime.has_value()) {
353 return R_PosInf;
354 } else {
355 Timestamp now;
356 return nextTime->diff_secs(now);
357 }
358 }
359
360 // Schedules a C function to execute on the global loop. Returns callback ID
361 // on success, or 0 on error.
execLaterNative(void (* func)(void *),void * data,double delaySecs)362 extern "C" uint64_t execLaterNative(void (*func)(void*), void* data, double delaySecs) {
363 return execLaterNative2(func, data, delaySecs, GLOBAL_LOOP);
364 }
365
366 // Schedules a C function to execute on a specific event loop. Returns
367 // callback ID on success, or 0 on error.
execLaterNative2(void (* func)(void *),void * data,double delaySecs,int loop_id)368 extern "C" uint64_t execLaterNative2(void (*func)(void*), void* data, double delaySecs, int loop_id) {
369 ensureInitialized();
370 return callbackRegistryTable.scheduleCallback(func, data, delaySecs, loop_id);
371 }
372
apiVersion()373 extern "C" int apiVersion() {
374 return LATER_DLL_API_VERSION;
375 }
376