1 #ifndef _WIN32
2
3 #include <Rcpp.h>
4 #include <Rinternals.h>
5 #include <R_ext/eventloop.h>
6 #include <unistd.h>
7 #include <queue>
8
9 #include "later.h"
10 #include "callback_registry.h"
11 #include "timer_posix.h"
12 #include "threadutils.h"
13 #include "debug.h"
14
15 using namespace Rcpp;
16
17 #define LATER_ACTIVITY 20
18 #define LATER_DUMMY_ACTIVITY 21
19
20 extern void* R_GlobalContext;
21 extern void* R_TopLevelContext;
22
23 // Whether we have initialized the input handler.
24 int initialized = 0;
25
26 // The handles to the read and write ends of a pipe. We use this pipe
27 // to signal R's input handler callback mechanism that we want to be
28 // called back.
29 int pipe_in = -1;
30 int pipe_out = -1;
31
32 int dummy_pipe_in = -1;
33 int dummy_pipe_out = -1;
34
35 // Whether the file descriptor is ready for reading, i.e., whether
36 // the input handler callback is scheduled to be called. We use this
37 // to avoid unnecessarily writing to the pipe.
38 bool hot = false;
39 // This mutex protects reading/writing of `hot` and of reading from/writing to
40 // the pipe.
41 Mutex m(tct_mtx_plain);
42
43 // The buffer we're using for the pipe. This doesn't have to be large,
44 // in theory it only ever holds zero or one byte.
45 size_t BUF_SIZE = 256;
46 void *buf;
47
set_fd(bool ready)48 void set_fd(bool ready) {
49 Guard g(&m);
50
51 if (ready != hot) {
52 if (ready) {
53 ssize_t cbytes = write(pipe_in, "a", 1);
54 (void)cbytes; // squelch compiler warning
55 hot = true;
56 } else {
57 if (read(pipe_out, buf, BUF_SIZE) < 0) {
58 // TODO: This sets a warning but it doesn't get displayed until
59 // after the next R command is executed. Can we make it sooner?
60 Rf_warning("Failed to read out of pipe for later package");
61 }
62 hot = false;
63 }
64 }
65 }
66
67 namespace {
fd_on()68 void fd_on() {
69 set_fd(true);
70 }
71
72 Timer timer(fd_on);
73 } // namespace
74
75 class ResetTimerOnExit {
76 public:
ResetTimerOnExit()77 ResetTimerOnExit() {
78 }
~ResetTimerOnExit()79 ~ResetTimerOnExit() {
80 ASSERT_MAIN_THREAD()
81 // Find the next event in the registry and, if there is one, set the timer.
82 Optional<Timestamp> nextEvent = getGlobalRegistry()->nextTimestamp();
83 if (nextEvent.has_value()) {
84 timer.set(*nextEvent);
85 }
86 }
87 };
88
async_input_handler(void * data)89 static void async_input_handler(void *data) {
90 ASSERT_MAIN_THREAD()
91 set_fd(false);
92
93 if (!at_top_level()) {
94 // It's not safe to run arbitrary callbacks when other R code
95 // is already running. Wait until we're back at the top level.
96
97 // jcheng 2017-08-02: We can't just leave the file descriptor hot and let
98 // async_input_handler get invoked as fast as possible. Previously we did
99 // this, but on POSIX systems, it interferes with R_SocketWait.
100 // https://github.com/r-lib/later/issues/4
101 // Instead, we set the file descriptor to cold, and tell the timer to fire
102 // again in a few milliseconds. This should give enough breathing room that
103 // we don't interfere with the sockets too much.
104 timer.set(Timestamp(0.032));
105 return;
106 }
107
108 // jcheng 2017-08-01: While callbacks are executing, make the file descriptor
109 // not-ready so that our input handler is not even called back by R.
110 // Previously we'd let the input handler run but return quickly, but this
111 // seemed to cause R_SocketWait to hang (encountered while working with the
112 // future package, trying to call value(future) with plan(multisession)).
113 ResetTimerOnExit resetTimerOnExit_scope;
114
115 // This try-catch is meant to be similar to the BEGIN_RCPP and VOID_END_RCPP
116 // macros. They are needed for two reasons: first, if an exception occurs in
117 // any of the callbacks, destructors will still execute; and second, if an
118 // exception (including R-level error) occurs in a callback and it reaches
119 // the top level in an R input handler, R appears to be unable to handle it
120 // properly.
121 // https://github.com/r-lib/later/issues/12
122 // https://github.com/RcppCore/Rcpp/issues/753
123 // https://github.com/r-lib/later/issues/31
124 try {
125 execCallbacksForTopLevel();
126 }
127 catch(Rcpp::internal::InterruptedException &e) {
128 DEBUG_LOG("async_input_handler: caught Rcpp::internal::InterruptedException", LOG_INFO);
129 REprintf("later: interrupt occurred while executing callback.\n");
130 }
131 catch(std::exception& e){
132 DEBUG_LOG("async_input_handler: caught exception", LOG_INFO);
133 std::string msg = "later: exception occurred while executing callback: \n";
134 msg += e.what();
135 msg += "\n";
136 REprintf(msg.c_str());
137 }
138 catch( ... ){
139 REprintf("later: c++ exception (unknown reason) occurred while executing callback.\n");
140 }
141 }
142
143 InputHandler* inputHandlerHandle;
144 InputHandler* dummyInputHandlerHandle;
145
146 // If the real input handler has been removed, the dummy input handler removes
147 // itself. The real input handler cannot remove both; otherwise a segfault
148 // could occur.
remove_dummy_handler(void * data)149 static void remove_dummy_handler(void *data) {
150 ASSERT_MAIN_THREAD()
151 removeInputHandler(&R_InputHandlers, dummyInputHandlerHandle);
152 if (dummy_pipe_in > 0) {
153 close(dummy_pipe_in);
154 dummy_pipe_in = -1;
155 }
156 if (dummy_pipe_out > 0) {
157 close(dummy_pipe_out);
158 dummy_pipe_out = -1;
159 }
160 }
161
162 // Callback to run in child process after forking.
child_proc_after_fork()163 void child_proc_after_fork() {
164 ASSERT_MAIN_THREAD()
165 if (initialized) {
166 removeInputHandler(&R_InputHandlers, inputHandlerHandle);
167
168 if (pipe_in > 0) {
169 close(pipe_in);
170 pipe_in = -1;
171 }
172 if (pipe_out > 0) {
173 close(pipe_out);
174 pipe_out = -1;
175 }
176
177 removeInputHandler(&R_InputHandlers, dummyInputHandlerHandle);
178 if (dummy_pipe_in > 0) {
179 close(dummy_pipe_in);
180 dummy_pipe_in = -1;
181 }
182 if (dummy_pipe_out > 0) {
183 close(dummy_pipe_out);
184 dummy_pipe_out = -1;
185 }
186
187 initialized = 0;
188 }
189 }
190
ensureAutorunnerInitialized()191 void ensureAutorunnerInitialized() {
192 if (!initialized) {
193 buf = malloc(BUF_SIZE);
194
195 int pipes[2];
196 if (pipe(pipes)) {
197 free(buf);
198 Rf_error("Failed to create pipe");
199 return;
200 }
201 pipe_out = pipes[0];
202 pipe_in = pipes[1];
203
204 inputHandlerHandle = addInputHandler(R_InputHandlers, pipe_out, async_input_handler, LATER_ACTIVITY);
205
206 // If the R process is forked, make sure that the child process doesn't mess
207 // with the pipes. This also means that functions scheduled in the child
208 // process with `later()` will only work if `run_now()` is called. In this
209 // situation, there's also the danger that a function will be scheduled by
210 // the parent process and then will be executed in the child process (in
211 // addition to in the parent process).
212 // https://github.com/r-lib/later/issues/140
213 pthread_atfork(NULL, NULL, child_proc_after_fork);
214
215 // Need to add a dummy input handler to avoid segfault when the "real"
216 // input handler removes the subsequent input handler in the linked list.
217 // See https://github.com/rstudio/httpuv/issues/78
218 int dummy_pipes[2];
219 if (pipe(dummy_pipes)) {
220 Rf_error("Failed to create pipe");
221 return;
222 }
223 dummy_pipe_out = dummy_pipes[0];
224 dummy_pipe_in = dummy_pipes[1];
225 dummyInputHandlerHandle = addInputHandler(R_InputHandlers, dummy_pipe_out, remove_dummy_handler, LATER_DUMMY_ACTIVITY);
226
227 initialized = 1;
228 }
229 }
230
deInitialize()231 void deInitialize() {
232 ASSERT_MAIN_THREAD()
233 if (initialized) {
234 removeInputHandler(&R_InputHandlers, inputHandlerHandle);
235 if (pipe_in > 0) {
236 close(pipe_in);
237 pipe_in = -1;
238 }
239 if (pipe_out > 0) {
240 close(pipe_out);
241 pipe_out = -1;
242 }
243 initialized = 0;
244
245 // Trigger remove_dummy_handler()
246 // Store `ret` because otherwise it raises a significant warning.
247 ssize_t ret = write(dummy_pipe_in, "a", 1);
248 }
249 }
250
doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry,Rcpp::Function callback,double delaySecs,bool resetTimer)251 uint64_t doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry, Rcpp::Function callback, double delaySecs, bool resetTimer) {
252 ASSERT_MAIN_THREAD()
253 uint64_t callback_id = callbackRegistry->add(callback, delaySecs);
254
255 // The timer needs to be reset only if we're using the global loop, because
256 // this usage of the timer is relevant only when the event loop is driven by
257 // R's input handler (at the idle console), and only the global loop is by
258 // that.
259 if (resetTimer)
260 timer.set(*(callbackRegistry->nextTimestamp()));
261
262 return callback_id;
263 }
264
doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry,void (* callback)(void *),void * data,double delaySecs,bool resetTimer)265 uint64_t doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry, void (*callback)(void*), void* data, double delaySecs, bool resetTimer) {
266 uint64_t callback_id = callbackRegistry->add(callback, data, delaySecs);
267
268 if (resetTimer)
269 timer.set(*(callbackRegistry->nextTimestamp()));
270
271 return callback_id;
272 }
273
274 #endif // ifndef _WIN32
275