1 #ifndef _WIN32
2 
#include <Rcpp.h>
#include <Rinternals.h>
#include <R_ext/eventloop.h>
#include <unistd.h>
#include <cerrno>
#include <queue>

#include "later.h"
#include "callback_registry.h"
#include "timer_posix.h"
#include "threadutils.h"
#include "debug.h"
14 
15 using namespace Rcpp;
16 
17 #define LATER_ACTIVITY 20
18 #define LATER_DUMMY_ACTIVITY 21
19 
20 extern void* R_GlobalContext;
21 extern void* R_TopLevelContext;
22 
23 // Whether we have initialized the input handler.
24 int initialized = 0;
25 
26 // The handles to the read and write ends of a pipe. We use this pipe
27 // to signal R's input handler callback mechanism that we want to be
28 // called back.
29 int pipe_in  = -1;
30 int pipe_out = -1;
31 
32 int dummy_pipe_in  = -1;
33 int dummy_pipe_out = -1;
34 
35 // Whether the file descriptor is ready for reading, i.e., whether
36 // the input handler callback is scheduled to be called. We use this
37 // to avoid unnecessarily writing to the pipe.
38 bool hot = false;
39 // This mutex protects reading/writing of `hot` and of reading from/writing to
40 // the pipe.
41 Mutex m(tct_mtx_plain);
42 
43 // The buffer we're using for the pipe. This doesn't have to be large,
44 // in theory it only ever holds zero or one byte.
45 size_t BUF_SIZE = 256;
46 void *buf;
47 
set_fd(bool ready)48 void set_fd(bool ready) {
49   Guard g(&m);
50 
51   if (ready != hot) {
52     if (ready) {
53       ssize_t cbytes = write(pipe_in, "a", 1);
54       (void)cbytes; // squelch compiler warning
55       hot = true;
56     } else {
57       if (read(pipe_out, buf, BUF_SIZE) < 0) {
58         // TODO: This sets a warning but it doesn't get displayed until
59         // after the next R command is executed. Can we make it sooner?
60         Rf_warning("Failed to read out of pipe for later package");
61       }
62       hot = false;
63     }
64   }
65 }
66 
67 namespace {
fd_on()68 void fd_on() {
69   set_fd(true);
70 }
71 
72 Timer timer(fd_on);
73 } // namespace
74 
75 class ResetTimerOnExit {
76 public:
ResetTimerOnExit()77   ResetTimerOnExit() {
78   }
~ResetTimerOnExit()79   ~ResetTimerOnExit() {
80     ASSERT_MAIN_THREAD()
81     // Find the next event in the registry and, if there is one, set the timer.
82     Optional<Timestamp> nextEvent = getGlobalRegistry()->nextTimestamp();
83     if (nextEvent.has_value()) {
84       timer.set(*nextEvent);
85     }
86   }
87 };
88 
async_input_handler(void * data)89 static void async_input_handler(void *data) {
90   ASSERT_MAIN_THREAD()
91   set_fd(false);
92 
93   if (!at_top_level()) {
94     // It's not safe to run arbitrary callbacks when other R code
95     // is already running. Wait until we're back at the top level.
96 
97     // jcheng 2017-08-02: We can't just leave the file descriptor hot and let
98     // async_input_handler get invoked as fast as possible. Previously we did
99     // this, but on POSIX systems, it interferes with R_SocketWait.
100     // https://github.com/r-lib/later/issues/4
101     // Instead, we set the file descriptor to cold, and tell the timer to fire
102     // again in a few milliseconds. This should give enough breathing room that
103     // we don't interfere with the sockets too much.
104     timer.set(Timestamp(0.032));
105     return;
106   }
107 
108   // jcheng 2017-08-01: While callbacks are executing, make the file descriptor
109   // not-ready so that our input handler is not even called back by R.
110   // Previously we'd let the input handler run but return quickly, but this
111   // seemed to cause R_SocketWait to hang (encountered while working with the
112   // future package, trying to call value(future) with plan(multisession)).
113   ResetTimerOnExit resetTimerOnExit_scope;
114 
115   // This try-catch is meant to be similar to the BEGIN_RCPP and VOID_END_RCPP
116   // macros. They are needed for two reasons: first, if an exception occurs in
117   // any of the callbacks, destructors will still execute; and second, if an
118   // exception (including R-level error) occurs in a callback and it reaches
119   // the top level in an R input handler, R appears to be unable to handle it
120   // properly.
121   // https://github.com/r-lib/later/issues/12
122   // https://github.com/RcppCore/Rcpp/issues/753
123   // https://github.com/r-lib/later/issues/31
124   try {
125     execCallbacksForTopLevel();
126   }
127   catch(Rcpp::internal::InterruptedException &e) {
128     DEBUG_LOG("async_input_handler: caught Rcpp::internal::InterruptedException", LOG_INFO);
129     REprintf("later: interrupt occurred while executing callback.\n");
130   }
131   catch(std::exception& e){
132     DEBUG_LOG("async_input_handler: caught exception", LOG_INFO);
133     std::string msg = "later: exception occurred while executing callback: \n";
134     msg += e.what();
135     msg += "\n";
136     REprintf(msg.c_str());
137   }
138   catch( ... ){
139     REprintf("later: c++ exception (unknown reason) occurred while executing callback.\n");
140   }
141 }
142 
143 InputHandler* inputHandlerHandle;
144 InputHandler* dummyInputHandlerHandle;
145 
146 // If the real input handler has been removed, the dummy input handler removes
147 // itself. The real input handler cannot remove both; otherwise a segfault
148 // could occur.
remove_dummy_handler(void * data)149 static void remove_dummy_handler(void *data) {
150   ASSERT_MAIN_THREAD()
151   removeInputHandler(&R_InputHandlers, dummyInputHandlerHandle);
152   if (dummy_pipe_in  > 0) {
153     close(dummy_pipe_in);
154     dummy_pipe_in  = -1;
155   }
156   if (dummy_pipe_out > 0) {
157     close(dummy_pipe_out);
158     dummy_pipe_out = -1;
159   }
160 }
161 
162 // Callback to run in child process after forking.
child_proc_after_fork()163 void child_proc_after_fork() {
164   ASSERT_MAIN_THREAD()
165   if (initialized) {
166     removeInputHandler(&R_InputHandlers, inputHandlerHandle);
167 
168     if (pipe_in  > 0) {
169       close(pipe_in);
170       pipe_in = -1;
171     }
172     if (pipe_out > 0) {
173       close(pipe_out);
174       pipe_out = -1;
175     }
176 
177     removeInputHandler(&R_InputHandlers, dummyInputHandlerHandle);
178     if (dummy_pipe_in  > 0) {
179       close(dummy_pipe_in);
180       dummy_pipe_in  = -1;
181     }
182     if (dummy_pipe_out > 0) {
183       close(dummy_pipe_out);
184       dummy_pipe_out = -1;
185     }
186 
187     initialized = 0;
188   }
189 }
190 
ensureAutorunnerInitialized()191 void ensureAutorunnerInitialized() {
192   if (!initialized) {
193     buf = malloc(BUF_SIZE);
194 
195     int pipes[2];
196     if (pipe(pipes)) {
197       free(buf);
198       Rf_error("Failed to create pipe");
199       return;
200     }
201     pipe_out = pipes[0];
202     pipe_in = pipes[1];
203 
204     inputHandlerHandle = addInputHandler(R_InputHandlers, pipe_out, async_input_handler, LATER_ACTIVITY);
205 
206    // If the R process is forked, make sure that the child process doesn't mess
207    // with the pipes. This also means that functions scheduled in the child
208    // process with `later()` will only work if `run_now()` is called. In this
209    // situation, there's also the danger that a function will be scheduled by
210    // the parent process and then will be executed in the child process (in
211    // addition to in the parent process).
212    // https://github.com/r-lib/later/issues/140
213    pthread_atfork(NULL, NULL, child_proc_after_fork);
214 
215     // Need to add a dummy input handler to avoid segfault when the "real"
216     // input handler removes the subsequent input handler in the linked list.
217     // See https://github.com/rstudio/httpuv/issues/78
218     int dummy_pipes[2];
219     if (pipe(dummy_pipes)) {
220       Rf_error("Failed to create pipe");
221       return;
222     }
223     dummy_pipe_out = dummy_pipes[0];
224     dummy_pipe_in  = dummy_pipes[1];
225     dummyInputHandlerHandle = addInputHandler(R_InputHandlers, dummy_pipe_out, remove_dummy_handler, LATER_DUMMY_ACTIVITY);
226 
227     initialized = 1;
228   }
229 }
230 
deInitialize()231 void deInitialize() {
232   ASSERT_MAIN_THREAD()
233   if (initialized) {
234     removeInputHandler(&R_InputHandlers, inputHandlerHandle);
235     if (pipe_in  > 0) {
236       close(pipe_in);
237       pipe_in = -1;
238     }
239     if (pipe_out > 0) {
240       close(pipe_out);
241       pipe_out = -1;
242     }
243     initialized = 0;
244 
245     // Trigger remove_dummy_handler()
246     // Store `ret` because otherwise it raises a significant warning.
247     ssize_t ret = write(dummy_pipe_in, "a", 1);
248   }
249 }
250 
doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry,Rcpp::Function callback,double delaySecs,bool resetTimer)251 uint64_t doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry, Rcpp::Function callback, double delaySecs, bool resetTimer) {
252   ASSERT_MAIN_THREAD()
253   uint64_t callback_id = callbackRegistry->add(callback, delaySecs);
254 
255   // The timer needs to be reset only if we're using the global loop, because
256   // this usage of the timer is relevant only when the event loop is driven by
257   // R's input handler (at the idle console), and only the global loop is by
258   // that.
259   if (resetTimer)
260     timer.set(*(callbackRegistry->nextTimestamp()));
261 
262   return callback_id;
263 }
264 
doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry,void (* callback)(void *),void * data,double delaySecs,bool resetTimer)265 uint64_t doExecLater(std::shared_ptr<CallbackRegistry> callbackRegistry, void (*callback)(void*), void* data, double delaySecs, bool resetTimer) {
266   uint64_t callback_id = callbackRegistry->add(callback, data, delaySecs);
267 
268   if (resetTimer)
269     timer.set(*(callbackRegistry->nextTimestamp()));
270 
271   return callback_id;
272 }
273 
274 #endif // ifndef _WIN32
275