1
2#' External R Session
3#'
4#' @description
5#' A permanent R session that runs in the background. This is an R6 class
6#' that extends the [processx::process] class.
7#'
8#' The process is started at the creation of the object, and then it can
9#' be used to evaluate R function calls, one at a time.
10#'
11#' @param func Function object to call in the background R process.
12#'   Please read the notes for the similar argument of [r()].
13#' @param args Arguments to pass to the function. Must be a list.
#' @param package Whether to keep the environment of `func` when passing
#'   it to the other process. Possible values are:
16#'   * `FALSE`: reset the environment to `.GlobalEnv`. This is the default.
17#'   * `TRUE`: keep the environment as is.
18#'   * `pkg`: set the environment to the `pkg` package namespace.
19#'
20#' @examplesIf FALSE
#' rs <- r_session$new()
22#'
23#' rs$run(function() 1 + 2)
24#'
25#' rs$call(function() Sys.sleep(1))
26#' rs$get_state()
27#'
28#' rs$poll_process(-1)
29#' rs$get_state()
30#' rs$read()
31#' @export
32
r_session <- R6::R6Class(
  "r_session",
  inherit = processx::process,

  public = list(

    #' @field status
    #' Status codes returned by `read()`.
    status = list(
      DONE        = 200L,
      STARTED     = 201L,
      ATTACH_DONE = 202L,
      MSG         = 301L,
      EXITED      = 500L,
      CRASHED     = 501L,
      CLOSED      = 502L
    ),

    #' @description
    #' Creates a new R background process. It can wait for the process to
    #' start up (`wait = TRUE`), or return immediately, i.e. before
    #' the process is actually ready to run. In the latter case you may call
    #' the `poll_process()` method to make sure it is ready.
    #'
    #' @param options A list of options created via [r_session_options()].
    #' @param wait Whether to wait for the R process to start and be ready
    #'   for running commands.
    #' @param wait_timeout Timeout for waiting for the R process to start,
    #'   in milliseconds.
    #' @return An `r_session` object.
    initialize = function(options = r_session_options(), wait = TRUE,
                          wait_timeout = 3000)
      rs_init(self, private, super, options, wait, wait_timeout),

    #' @description
    #' Similar to [r()], but runs the function in a permanent background
    #' R session. It throws an error if the function call generated an
    #' error in the child process.
    #' @return The return value of the R expression.
    run = function(func, args = list(), package = FALSE)
      rs_run(self, private, func, args, package),

    #' @description
    #' Similar to `$run()`, but returns the standard output and error of
    #' the child process as well. It does not throw on errors, but
    #' returns a non-`NULL` `error` member in the result list.
    #'
    #' @return A list with the following entries.
    #' * `result`: The value returned by `func`. On error this is `NULL`.
    #' * `stdout`: The standard output of the process while evaluating
    #'   the `func` call.
    #' * `stderr`: The standard error of the process while evaluating
    #'   the `func` call.
    #' * `error`: On error it contains an error object, that contains the
    #'   error thrown in the subprocess. Otherwise it is `NULL`.
    #' * `code`, `message`: These fields are used by call internally and
    #'   you can ignore them.
    run_with_output = function(func, args = list(), package = FALSE)
      rs_run_with_output(self, private, func, args, package),

    #' @description
    #' Starts running a function in the background R session, and
    #' returns immediately. To check if the function is done, call the
    #' `poll_process()` method.
    call = function(func, args = list(), package = FALSE)
      rs_call(self, private, func, args, package),

    #' @description
    #' Poll the R session with a timeout. If the session has finished the
    #' computation, it returns with `"ready"`. If the timeout
    #' is reached, it returns with `"timeout"`.
    #'
    #' @param timeout Timeout period in milliseconds.
    #' @return Character string `"ready"` or `"timeout"`.
    poll_process = function(timeout)
      rs_poll_process(self, private, timeout),

    #' @description
    #' Return the state of the R session.
    #'
    #' @return Possible values:
    #'   * `"starting"`: starting up,
    #'   * `"idle"`: ready to compute,
    #'   * `"busy"`: computing right now,
    #'   * `"finished"`: the R process has finished.
    get_state = function()
      rs_get_state(self, private),

    #' @description
    #' Returns the elapsed time since the R process has started, and the
    #' elapsed time since the current computation has started. The latter
    #' is `NA` if there is no active computation.
    #' @return Named vector of `POSIXct` objects. The names are `"total"`
    #'   and `"current"`.
    get_running_time = function()
      rs_get_running_time(self, private),

    #' @description
    #' Reads an event from the child process, if there is one available.
    #' Events might signal that the function call has finished, or they
    #' can be progress report events.
    #'
    #' This is a low level function that you only need to use if you
    #' want to process events (messages) from the R session manually.
    #'
    #' @return `NULL` if no events are available. Otherwise a named list,
    #'   which is also a `callr_session_result` object. The list always has
    #'   a `code` entry which is the type of the event. See also
    #'   `r_session$public_fields$status` for symbolic names of the
    #'   event types.
    #'   * `200`: (`DONE`) The computation is done, and the event includes
    #'      the result, in the same form as for the `run()` method.
    #'   * `201`: (`STARTED`) An R session that was in 'starting' state is
    #'      ready to go.
    #'   * `202`: (`ATTACH_DONE`) Used by the `attach()` method.
    #'   * `301`: (`MSG`) A message from the subprocess. The message is a
    #'      condition object with class `callr_message`. (It typically has
    #'      other classes, e.g. `cli_message` for output from the cli
    #'      package.)
    #'   * `500`: (`EXITED`) The R session finished cleanly. This means
    #'      that the evaluated expression quit R.
    #'   * `501`: (`CRASHED`) The R session crashed or was killed.
    #'   * `502`: (`CLOSED`) The R session closed its end of the connection
    #'      that callr uses for communication.

    read = function()
      rs_read(self, private),

    #' @description
    #' Terminate the current computation and the R process.
    #' The session object will be in `"finished"` state after this.
    #' @param grace Grace period in milliseconds, to wait for the
    #' subprocess to exit cleanly, after its standard input is closed.
    #' If the process is still running after this period, it will be
    #' killed.
    close = function(grace = 1000)
      rs_close(self, private, grace),

    #' @description
    #' The `traceback()` method can be used after an error in the R
    #' subprocess. It is equivalent to the [base::traceback()] call, in
    #' the subprocess.
    #' @return The same output as from [base::traceback()]
    traceback = function()
      rs_traceback(self, private),

    #' @description
    #' Interactive debugger to inspect the dumped frames in the subprocess,
    #' after an error. See more at [r_session_debug].
    debug = function()
      rs_debug(self, private),

    #' @description Experimental function that provides a REPL
    #' (Read-Eval-Print-Loop) to the subprocess.
    attach = function()
      rs_attach(self, private),

    #' @description
    #' Finalizer that is called when garbage collecting an `r_session`
    #' object, to clean up temporary files.
    finalize = function() {
      # Remove the per-call stdout/stderr capture files and every other
      # temporary file recorded in the options, then chain to the parent
      # finalizer if the parent class defines one.
      unlink(private$tmp_output_file)
      unlink(private$tmp_error_file)
      unlink(private$options$tmp_files, recursive = TRUE)
      if ("finalize" %in% ls(super)) super$finalize()
    },

    #' @description
    #' Print method for an `r_session`.
    #' @param ... Arguments are not used currently.
    print = function(...) {
      # One-line summary: liveness, session state (only while alive), pid.
      cat(
        sep = "",
        "R SESSION, ",
        if (self$is_alive()) {
          paste0("alive, ", self$get_state(), ", ")
        } else {
          "finished, "
        },
        "pid ", self$get_pid(), ".\n")
      invisible(self)
    }
  ),

  private = list(
    # Session options, as processed by rs_init()
    options = NULL,
    # One of "starting", "idle", "busy", "finished"
    state = NULL,
    # Time stamps of the process start and of the last rs_call()
    started_at = NULL,
    fun_started_at = as.POSIXct(NA),
    # Poll connection used for the status messages of the subprocess
    pipe = NULL,

    # Files capturing stdout / stderr of the current call, if redirected
    tmp_output_file = NULL,
    tmp_error_file = NULL,

    func_file = NULL,
    res_file = NULL,

    # Partially read message (header, chars received so far, chunks)
    buffer = NULL,
    read_buffer = function()
      rs__read_buffer(self, private),
    read_message = function()
      rs__read_message(self, private),

    get_result_and_output = function(std = FALSE)
      rs__get_result_and_output(self, private, std),
    report_back = function(code, text = "")
      rs__report_back(self, private, code, text),
    write_for_sure = function(text)
      rs__write_for_sure(self, private, text),
    parse_msg = function(msg)
      rs__parse_msg(self, private, msg),
    attach_wait = function()
      rs__attach_wait(self, private)
  )
)
248
## Start the background R process and, optionally, wait until it reports
## that it is ready.
##
## @param self,private,super R6 handles of the `r_session` object.
## @param options Session options, see `r_session_options()`.
## @param wait Whether to block until the child sends the `201` message.
## @param wait_timeout Start-up timeout in milliseconds. A negative value
##   is passed through to `poll_io()` unchanged on every poll.
## @return `self`, invisibly. Throws if the session fails to start or
##   does not report back within the timeout.
rs_init <- function(self, private, super, options, wait, wait_timeout) {

  options$func <- options$func %||% function() { }
  options$args <- list()
  options$load_hook <- session_load_hook(options$load_hook)

  options <- convert_and_check_my_args(options)
  options <- setup_context(options)
  options <- setup_r_binary_and_args(options, script_file = FALSE)

  private$options <- options

  prepare_client_files()
  with_envvar(
    options$env,
    do.call(super$initialize, c(list(options$bin, options$real_cmdargs,
      stdin = "|", stdout = "|", stderr = "|", poll_connection = TRUE),
      options$extra))
  )

  ## Make child report back when ready
  private$report_back(201, "ready to go")

  private$pipe <- self$get_poll_connection()

  private$started_at <- Sys.time()
  private$state <- "starting"

  if (wait) {
    no_deadline <- wait_timeout < 0
    have_until <- Sys.time() + as.difftime(wait_timeout / 1000, units = "secs")

    ## Milliseconds left until the deadline. It must never go negative
    ## by accident: a negative poll timeout means "wait forever", so an
    ## expired deadline is clamped to 0 (poll and return immediately).
    remaining <- function() {
      if (no_deadline) return(-1L)
      left <- as.double(have_until - Sys.time(), units = "secs") * 1000
      as.integer(max(0, left))
    }

    pr <- self$poll_io(wait_timeout)
    out <- ""
    err <- ""
    ## Keep draining stdout / stderr until the status pipe has something
    ## for us, or nothing is ready any more (i.e. we timed out).
    while (any(pr == "ready")) {
      if (pr["output"] == "ready") out <- paste0(out, self$read_output())
      if (pr["error"] == "ready") err <- paste0(err, self$read_error())
      if (pr["process"] == "ready") break
      pr <- self$poll_io(remaining())
    }

    if (pr["process"] == "ready") {
      msg <- self$read()
      out <- paste0(out, msg$stdout)
      err <- paste0(err, msg$stderr)
      if (msg$code != 201) {
        data <- list(
          status = self$get_exit_status(),
          stdout = out,
          stderr = err,
          timeout = FALSE
        )
        throw(new_callr_error(data, "Failed to start R session"))
      }
    } else {
      ## Timed out: show whatever the child printed so far, then fail
      cat("stdout:]\n", out, "\n")
      cat("stderr:]\n", err, "\n")
      throw(new_error("Could not start R session, timed out"))
    }
  }

  invisible(self)
}
313
## Read one event from the status pipe of the subprocess.
##
## Returns NULL if only part of a message has arrived so far. If nothing
## could be read and the pipe is finished, a 500 / 501 / 502 event is
## synthesized, depending on whether the process is still alive and on
## its exit status. The event is parsed by `private$parse_msg()`.
rs_read <- function(self, private) {
  ## Either continue a partially buffered message, or start a new one
  msg <- if (is.null(private$buffer)) {
    private$read_message()
  } else {
    private$read_buffer()
  }

  if (length(msg) == 0) {
    if (processx::processx_conn_is_incomplete(private$pipe)) return()

    fake_msg <- function(code, rest) {
      list(header = list(code = code, length = 0, rest = rest))
    }

    if (self$is_alive()) {
      # We do this in on.exit(), because parse_msg still reads the streams
      on.exit(self$kill(), add = TRUE)
      msg <- fake_msg(
        502, "R session closed the process connection, killed"
      )
    } else {
      es <- self$get_exit_status()
      msg <- if (identical(es, 0L)) {
        fake_msg(500, "R session finished cleanly")
      } else {
        fake_msg(501, paste0("R session crashed with exit code ", es))
      }
    }
  }

  if (length(msg)) private$parse_msg(msg)
}
345
## Try to complete the partial message stored in `private$buffer`.
##
## Reads at most as many characters as are still missing. Returns the
## complete message (`header` + `body`) and clears the buffer once all
## characters have arrived, otherwise stores the new chunk and returns
## NULL.
rs__read_buffer <- function(self, private) {
  pending <- private$buffer
  missing_chars <- pending$header$length - pending$got
  piece <- processx::processx_conn_read_chars(private$pipe, missing_chars)
  n_read <- nchar(piece)

  # Nothing arrived, nothing to update
  if (n_read == 0) return(NULL)

  pieces <- c(pending$chunks, list(piece))

  if (n_read == missing_chars) {
    # The message is complete now
    private$buffer <- NULL
    return(list(
      header = pending$header,
      body = paste(pieces, collapse = "")
    ))
  }

  # Still incomplete, remember what we have got so far
  private$buffer$got <- pending$got + n_read
  private$buffer$chunks <- pieces
  NULL
}
368
## Start reading a new message from the status pipe.
##
## The first line is the header (see `rs__parse_header()`), followed by
## `header$length` characters of body. If the body has not arrived
## completely, the partial message is stored in `private$buffer` and
## NULL is returned; `rs__read_buffer()` finishes it later.
rs__read_message <- function(self, private) {
  # A new message, we can surely read the first line
  first_line <- processx::processx_conn_read_lines(private$pipe, 1)
  if (length(first_line) == 0) return(NULL)

  header <- rs__parse_header(first_line)
  body <- if (header$length > 0) {
    processx::processx_conn_read_chars(private$pipe, header$length)
  } else {
    ""
  }

  n_read <- nchar(body)
  if (n_read >= header$length) {
    return(list(header = header, body = body))
  }

  # Partial message, keep it until the rest arrives
  private$buffer <- list(
    header = header,
    got = n_read,
    chunks = list(body)
  )
  NULL
}
395
## Parse a message header line of the form "<code> <length> <rest...>".
##
## @param line A single header line.
## @return List with integer `code`, integer `length` and the remaining
##   text in `rest` (empty string if there is none). Stops with an error
##   if the first two fields are not integers.
rs__parse_header <- function(line) {
  fields <- strsplit(line, " ", fixed = TRUE)[[1]]
  leading <- c(1L, 2L)

  # Non-numeric fields become NA here, they are rejected below
  numbers <- suppressWarnings(as.integer(fields[leading]))
  if (anyNA(numbers)) {
    stop("Internal callr error, invalid message header")
  }

  list(
    code = numbers[[1]],
    length = numbers[[2]],
    rest = paste(fields[-leading], collapse = " ")
  )
}
406
## Shut down the session: close the standard input of the subprocess,
## give it `grace` milliseconds to react, then kill it and close the
## remaining connections. Throws if the process is still alive after
## the kill.
rs_close <- function(self, private, grace) {
  # Closing stdin asks the subprocess to quit
  stdin_con <- self$get_input_connection()
  processx::processx_conn_close(stdin_con)
  self$poll_process(grace)

  # Kill it in any case
  self$kill()
  self$wait(1000)
  if (self$is_alive()) {
    throw(new_error("Could not kill background R session"))
  }

  private$state <- "finished"
  private$fun_started_at <- as.POSIXct(NA)

  # Release the status pipe first, then stdout and stderr
  processx::processx_conn_close(private$pipe)
  stdout_con <- self$get_output_connection()
  processx::processx_conn_close(stdout_con)
  stderr_con <- self$get_error_connection()
  processx::processx_conn_close(stderr_con)
}
419
## Start evaluating `func(args)` in the session, without waiting for the
## result. The function is saved to a temporary file, and the expression
## that loads and runs it is written to the standard input of the
## subprocess, followed by a request to report back with code 200.
## Throws unless the session is idle; sets the state to "busy".
rs_call <- function(self, private, func, args, package) {

  ## We only allow a new command if the R session is idle.
  ## This allows keeping a clean state
  ## TODO: do we need a state at all?
  refusals <- c(
    starting = "R session not ready yet",
    finished = "R session finished",
    busy = "R session busy"
  )
  if (private$state %in% names(refusals)) {
    throw(new_error(refusals[[private$state]]))
  }

  ## Save the function in a file
  private$options$func <- func
  private$options$args <- args
  private$options$package <- package
  private$options$func_file <- save_function_to_temp(private$options)
  private$options$result_file <- tempfile("callr-rs-result-")
  private$options$tmp_files <- c(
    private$options$tmp_files,
    private$options$func_file,
    private$options$result_file
  )

  ## Maybe we need to redirect stdout / stderr
  re_stdout <- NULL
  if (is.null(private$options$stdout)) {
    re_stdout <- tempfile("callr-rs-stdout-")
    private$tmp_output_file <- re_stdout
  }
  re_stderr <- NULL
  if (is.null(private$options$stderr)) {
    re_stderr <- tempfile("callr-rs-stderr-")
    private$tmp_error_file <- re_stderr
  }

  ## Run an expr that loads it, in the child process, with error handlers
  expr <- make_vanilla_script_expr(
    private$options$func_file,
    private$options$result_file,
    private$options$error,
    pre_hook = rs__prehook(re_stdout, re_stderr),
    post_hook = rs__posthook(re_stdout, re_stderr),
    messages = TRUE
  )

  ## Write this to stdin
  private$write_for_sure(paste0(deparse(expr), "\n"))
  private$fun_started_at <- Sys.time()

  ## Report back when done
  private$report_back(
    200,
    paste0("done ", basename(private$options$result_file))
  )

  private$state <- "busy"
}
470
## Run `func(args)` in the session and wait for it to finish.
##
## Starts the call with `self$call()`, then polls the status pipe and
## reads events until a final one arrives: code 200 (done) or a 5xx code
## (the session is gone). That event is returned as is. Events with code
## 301 are passed to `rs__handle_condition()`.
##
## On an interrupt in the main process the subprocess is interrupted as
## well; if it does not report back within a second, it is killed. The
## interrupt is then re-signalled and the "abort" restart is invoked.
rs_run_with_output <- function(self, private, func, args, package) {
  self$call(func, args, package)

  go <- TRUE
  res <- NULL

  while (go) {
    ## TODO: why is this in a tryCatch?
    res <- tryCatch(
      { processx::poll(list(private$pipe), -1)
        msg <- self$read()
        ## NULL means that only a partial message has arrived so far
        if (is.null(msg)) next
        if (msg$code == 200 || (msg$code >= 500 && msg$code < 600)) {
          return(msg)
        }
        if (msg$code == 301) {
          rs__handle_condition(msg$message)
        }
      },
      interrupt = function(e) {
        self$interrupt()
        ## The R process will catch the interrupt, and then save the
        ## error object to a file, but this might still take some time,
        ## so we need to poll here. If the bg process ignores
        ## interrupts, then we kill it.
        ps <- processx::poll(list(private$pipe), 1000)[[1]]
        if (ps == "timeout") {
          self$kill()
        } else {
          res <<- self$read()
          go <<- FALSE
        }
        ## Re-signal the interrupt for handlers further up the stack,
        ## then abort
        iconn <- structure(
          list(message = "Interrupted"),
          class = c("interrupt", "condition"))
        signalCondition(iconn)
        cat("\n")
        invokeRestart("abort")
    })
  }
  res
}
513
## Run `func(args)` in the session and return its value. If the call
## failed, the pending standard output and error of the subprocess are
## read, and the error from the subprocess is thrown.
rs_run <- function(self, private, func, args, package) {
  res <- rs_run_with_output(self, private, func, args, package)

  if (!is.null(res$error)) {
    res$stdout <- paste0(res$stdout, self$read_output())
    res$stderr <- paste0(res$stderr, self$read_error())
    throw(res$error)
  }

  res$result
}
524
## Current state of the session, as stored in the private `state` field.
rs_get_state <- function(self, private) {
  private[["state"]]
}
528
## Elapsed times of the session.
##
## @return Named `difftime` vector, in seconds, with entries
##   * `total`: time since the R process was started, `NA` once the
##     session has finished,
##   * `current`: time since the running computation was started, `NA`
##     if the session is not busy.
rs_get_running_time <- function(self, private) {
  now <- Sys.time()
  state <- private$state
  running <- !identical(state, "finished")
  busy <- identical(state, "busy")

  ## Always measure in seconds, so that the two entries share the same
  ## unit and can live in a single vector
  elapsed <- function(since) {
    as.double(difftime(now, since, units = "secs"))
  }

  as.difftime(
    c(
      total = if (running) elapsed(private$started_at) else NA_real_,
      current = if (busy) elapsed(private$fun_started_at) else NA_real_
    ),
    units = "secs"
  )
}
535
## Poll the poll connection of the subprocess, for at most `timeout`
## milliseconds, and return the poll result for that connection.
rs_poll_process <- function(self, private, timeout) {
  conn <- self$get_poll_connection()
  status <- processx::poll(list(conn), timeout)
  status[[1]]
}
539
## Print the traceback stored in the subprocess after an error. The
## stack is fetched from the `__callr_data__` environment of the
## subprocess, and its last 12 entries are dropped before printing.
rs_traceback <- function(self, private) {
  remote_stack <- self$run(function() {
    traceback(as.environment("tools:callr")$`__callr_data__`$.Traceback, 10)
  })

  ## TODO: get rid of magic number 12
  traceback(utils::head(remote_stack, -12))
}
546
## Interactive debugger for the frames dumped in the subprocess after an
## error.
##
## Reads commands in a loop. `.where`, `.help` and `.inspect <n>` are
## handled locally, everything else is evaluated in the subprocess, in
## the selected frame of the dump (or in `.GlobalEnv` for frame 0). The
## loop only ends with an interrupt or an error.
rs_debug <- function(self, private) {
  hasdump <- self$run(function() {
    ! is.null(as.environment("tools:callr")$`__callr_data__`$.Last.dump)
  })
  if (!hasdump) stop("Can't find dumped frames, nothing to debug")

  help <- function() {
    cat("Debugging in process ", self$get_pid(),
        ", press CTRL+C (ESC) to quit. Commands:\n", sep = "")
    cat("  .where       -- print stack trace\n",
        "  .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv\n",
        "  .help        -- print this message\n",
        "  <cmd>        -- run <cmd> in frame or .GlobalEnv\n\n", sep = "")
  }

  ## Frame number of an `.inspect <n>` command, or NA if it is missing
  ## or is not a non-negative integer
  parse_frame <- function(cmd) {
    tokens <- strsplit(cmd, " ", fixed = TRUE)[[1]]
    if (length(tokens) < 2) return(NA_integer_)
    num <- suppressWarnings(as.integer(tokens[[2]]))
    if (is.na(num) || num < 0L) NA_integer_ else num
  }

  ## Handle the debugger commands. Returns NULL if the command was
  ## handled here, otherwise the command itself, to be evaluated in the
  ## subprocess.
  translate_cmd <- function(cmd) {
    if (cmd == ".where") {
      traceback(tb)
      if (frame) cat("Inspecting frame", frame, "\n")
      NULL

    } else if (cmd == ".help") {
      help()
      NULL

    } else if (startsWith(cmd, ".inspect ") || cmd == ".inspect") {
      ## Literal prefix match: in a regular expression the leading dot
      ## would match any character
      newframe <- parse_frame(cmd)
      if (is.na(newframe)) {
        message("Cannot parse frame number")
      } else {
        frame <<- newframe
      }
      NULL

    } else {
      cmd
    }
  }

  help()
  tb <- self$traceback()
  frame <- 0L

  while (TRUE) {
    cat("\n")
    prompt <- paste0(
      "RS ", self$get_pid(),
      if (frame) paste0(" (frame ", frame, ")"), " > ")
    cmd <- rs__attach_get_input(prompt)
    cmd2 <- translate_cmd(cmd)
    if (is.null(cmd2)) next

    update_history(cmd)

    ret <- self$run_with_output(function(cmd, frame) {
      dump <- as.environment("tools:callr")$`__callr_data__`$.Last.dump
      envir <- if (!frame) .GlobalEnv else dump[[frame + 12L]]
      eval(parse(text = cmd), envir = envir)
    }, list(cmd = cmd, frame = frame))
    cat(ret$stdout)
    cat(ret$stderr)
    if (!is.null(ret$error)) print(ret$error)
    print(ret$result)
  }
  invisible()
}
613
## Experimental REPL for the subprocess.
##
## First prints the pending standard output and error of the subprocess
## (the latter via `bold()`), then reads commands in a loop: each one is
## written to the standard input of the subprocess, followed by a
## request for a 202 message, which `private$attach_wait()` waits for.
## An interrupt is forwarded to the subprocess and ends the loop.
rs_attach <- function(self, private) {
  out <- self$get_output_connection()
  err <- self$get_error_connection()

  repeat {
    x <- processx::processx_conn_read_chars(out)
    if (!nchar(x)) break
    cat(x)
  }
  repeat {
    x <- processx::processx_conn_read_chars(err)
    if (!nchar(x)) break
    cat(bold(x))
  }

  tryCatch(
    repeat {
      prompt <- paste0("RS ", self$get_pid(), " > ")
      cmd <- rs__attach_get_input(prompt)
      update_history(cmd)
      private$write_for_sure(paste0(cmd, "\n"))
      private$report_back(202, "done")
      private$attach_wait()
    },
    interrupt = function(e) {
      self$interrupt()
      invisible()
    }
  )
}
630
631## Internal functions ----------------------------------------------------
632
## Read a complete R expression from the console. Keeps asking for
## continuation lines, with a "+ " prompt, until
## `is_complete_expression()` accepts the text; the lines are joined
## with newlines.
rs__attach_get_input <- function(prompt) {
  code <- readline(prompt = prompt)
  repeat {
    if (is_complete_expression(code)) break
    code <- paste0(code, "\n", readline(prompt = "+ "))
  }
  code
}
640
## Wait until the subprocess has finished a command sent by
## `rs_attach()`.
##
## Polls the standard output, the standard error and the status pipe,
## and prints the output as it arrives (standard error via `bold()`).
## Returns when the 202 message arrives, or when a 5xx event shows that
## the session is gone.
rs__attach_wait <- function(self, private) {
  out <- self$get_output_connection()
  err <- self$get_error_connection()
  pro <- private$pipe
  while (TRUE) {
    pr <- processx::poll(list(out, err, pro), -1)
    if (pr[[1]] == "ready") {
      if (nchar(x <- processx::processx_conn_read_chars(out))) cat(x)
    }
    if (pr[[2]] == "ready") {
      if (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x))
    }
    if (pr[[3]] == "ready") {
      msg <- self$read()
      ## read() returns NULL while a message is still incomplete
      if (is.null(msg)) next
      if (msg$code == 202) break
      ## The session exited, crashed or closed the connection, the 202
      ## message will never arrive
      if (msg$code >= 500 && msg$code < 600) break
    }
  }
}
659
## Ask the subprocess to send a status message with `code` and `text`.
## The expression from `rs__status_expr()` (using file descriptor 3) is
## deparsed and written to the standard input of the subprocess.
rs__report_back <- function(self, private, code, text) {
  status_call <- rs__status_expr(code, text, fd = 3)
  private$write_for_sure(paste0(deparse(status_call), "\n"))
}
667
## Write `text` to the standard input of the subprocess. What
## `self$write_input()` returns is written again, after a 0.1 second
## pause, until it returns something of length zero.
rs__write_for_sure <- function(self, private, text) {
  repeat {
    text <- self$write_input(text)
    if (length(text) == 0) break
    Sys.sleep(.1)
  }
}
675
## Turn a raw message (`header` and optional `body`) into a
## `callr_session_result` object.
##
## A body that starts with "base64::" is base64 decoded and
## unserialized, otherwise the rest of the header line is the message.
## The handler for the message code is looked up in
## `rs__parse_msg_funcs`; unknown codes are an error.
rs__parse_msg <- function(self, private, msg) {
  code <- as.character(msg$header$code)

  payload <- msg$body
  is_encoded <- length(payload) && substr(payload, 1, 8) == "base64::"
  if (is_encoded) {
    encoded <- substr(payload, 9, nchar(payload))
    payload <- unserialize(processx::base64_decode(encoded))
  } else {
    payload <- msg$header$rest
  }

  known <- code %in% names(rs__parse_msg_funcs)
  if (!known) {
    throw(new_error("Unknown message code: `", code, "`"))
  }

  handler <- rs__parse_msg_funcs[[code]]
  structure(
    handler(self, private, msg$header$code, payload),
    class = "callr_session_result"
  )
}
693
## Message handlers, indexed by message code (as a string). Each handler
## updates the state of the session as needed, and returns a list that
## starts with `code` and `message`.
rs__parse_msg_funcs <- list(

  ## 200: the computation is done, collect its result and output
  "200" = function(self, private, code, message) {
    if (private$state != "busy") {
      throw(new_error("Got `done` message when session is not busy"))
    }
    private$state <- "idle"

    c(
      list(code = code, message = message),
      private$get_result_and_output()
    )
  },

  ## 201: the session has started up
  "201" = function(self, private, code, message) {
    if (private$state != "starting") {
      throw(new_error("Session already started, invalid `starting` message"))
    }
    private$state <- "idle"
    list(code = code, message = message)
  },

  ## 202: a command sent by attach() is done
  "202" = function(self, private, code, message) {
    private$state <- "idle"
    list(code = code, message = message)
  },

  ## 301: message from the subprocess
  "301" = function(self, private, code, message) {
    ## TODO: progress bar update, what to do here?
    list(code = code, message = message)
  },

  ## 500: the R session finished cleanly
  "500" = function(self, private, code, message) {
    private$state <- "finished"
    c(
      list(code = code, message = message),
      private$get_result_and_output(std = TRUE)
    )
  },

  ## 501: the R session crashed, the message becomes the error
  "501" = function(self, private, code, message) {
    private$state <- "finished"
    res <- private$get_result_and_output(std = TRUE)
    res$error <- structure(
      list(message = message),
      class = c("error", "condition")
    )
    c(list(code = code, message = message), res)
  }
)

## 502: the connection was closed, handled the same way as a crash
rs__parse_msg_funcs[["502"]] <- rs__parse_msg_funcs[["501"]]
740
## Build the expression that makes the subprocess send a status message.
##
## The returned call, once evaluated in the subprocess, writes the line
## "<code> 0 <text>" to file descriptor `fd`, via `pxlib$write_fd()`
## from the `__callr_data__` environment.
rs__status_expr <- function(code, text = "", fd = 3L) {
  bquote(
    local({
      pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib
      code_ <- .(code); fd_ <- .(fd); text_ <- .(text)
      data <- paste0(code_, " 0 ", text_, "\n")
      pxlib$write_fd(as.integer(.(fd)), data)
    })
  )
}
752
## Build the expression that runs in the subprocess before a call, to
## redirect its standard output and/or error to files.
##
## For each non-NULL file name the expression calls
## `pxlib$set_stdout_file()` / `pxlib$set_stderr_file()` and stores the
## result as `.__stdout__` / `.__stderr__` in `__callr_data__`. A NULL
## file name contributes a `NULL` to the expression.
rs__prehook <- function(stdout, stderr) {
  redirect_expr <- function(file, slot, setter) {
    if (is.null(file)) return(NULL)
    substitute({
      assign(
        SLOT,
        as.environment("tools:callr")$`__callr_data__`$pxlib$SETTER(FILE),
        envir = as.environment("tools:callr")$`__callr_data__`)
    }, list(SLOT = slot, SETTER = as.name(setter), FILE = file))
  }

  parts <- list(
    o = redirect_expr(stdout, ".__stdout__", "set_stdout_file"),
    e = redirect_expr(stderr, ".__stderr__", "set_stderr_file")
  )
  substitute({ o; e }, parts)
}
771
## Build the expression that runs in the subprocess after a call. For
## each non-NULL argument it calls `pxlib$set_stdout()` /
## `pxlib$set_stderr()` with the `.__stdout__` / `.__stderr__` value
## stored in `__callr_data__`. A NULL argument contributes a `NULL` to
## the expression.
rs__posthook <- function(stdout, stderr) {
  restore_out <- NULL
  if (!is.null(stdout)) {
    restore_out <- quote({
      as.environment("tools:callr")$`__callr_data__`$
        pxlib$set_stdout(as.environment("tools:callr")$`__callr_data__`$
        .__stdout__)
    })
  }

  restore_err <- NULL
  if (!is.null(stderr)) {
    restore_err <- quote({
      as.environment("tools:callr")$`__callr_data__`$
        pxlib$set_stderr(as.environment("tools:callr")$`__callr_data__`$
        .__stderr__)
    })
  }

  substitute({ o; e }, list(o = restore_out, e = restore_err))
}
786
## Collect the result and the output of a finished call.
##
## Standard output and error come from the temporary redirection files
## if they exist; otherwise, if `std` is TRUE, from the connections of
## the process. The result (or error) is read with `get_result()`. All
## temporary files of the call are removed.
##
## @return List with entries `result`, `stdout`, `stderr` and `error`.
rs__get_result_and_output <- function(self, private, std) {

  ## Get stdout and stderr
  read_stream <- function(tmp_file, has_conn, read_conn) {
    if (!is.null(tmp_file) && file.exists(tmp_file)) {
      tryCatch(
        suppressWarnings(read_all(tmp_file)),
        error = function(e) ""
      )
    } else if (std && has_conn()) {
      tryCatch(read_conn(), error = function(err) NULL)
    }
  }

  stdout <- read_stream(
    private$tmp_output_file,
    self$has_output_connection,
    self$read_all_output
  )
  stderr <- read_stream(
    private$tmp_error_file,
    self$has_error_connection,
    self$read_all_error
  )

  unlink(c(private$tmp_output_file, private$tmp_error_file))
  private$tmp_output_file <- private$tmp_error_file <- NULL

  ## Get result or error from RDS
  outp <- list(
    status = 0,
    stdout = stdout %||% "",
    stderr = stderr %||% "",
    timeout = FALSE
  )
  res <- NULL
  err <- NULL
  remember <- function(e) err <<- e
  tryCatch(
    res <- get_result(outp, private$options),
    error = remember,
    interrupt = remember
  )
  unlink(private$options$tmp_files, recursive = TRUE)
  private$options$tmp_files <- NULL

  ## Assemble result
  list(result = res, stdout = stdout, stderr = stderr, error = err)
}
826
## Re-signal a condition that was received from the subprocess.
##
## The condition is signalled first, so regular handlers can catch it.
## If none of them takes over, the first function found in the
## `callr.condition_handler_<class>` options, going through the classes
## of the condition in order, is called with it. The
## `callr_r_session_muffle` restart is available while this happens.
rs__handle_condition <- function(cond) {

  run_option_handler <- function(x) {
    for (cls in class(x)) {
      handler <- getOption(paste0("callr.condition_handler_", cls))
      if (is.function(handler)) {
        handler(x)
        break
      }
    }
  }

  needs_muffle <- is.list(cond) && is.null(cond$muffle)
  if (needs_muffle) cond$muffle <- "callr_r_session_muffle"

  withRestarts(
    {
      signalCondition(cond)
      run_option_handler(cond)
    },
    callr_r_session_muffle = function() NULL
  )

  invisible()
}
850
851## Helper functions ------------------------------------------------------
852
#' Create options for an [r_session] object
#'
#' @param ... Options to override, named arguments.
#' @return Named list of options.
#'
#' The current options are:
#' * `libpath`: Library path for the subprocess. By default the same as the
#'   _current_ library path. (I.e. _not_ necessarily the library path of
#'   a fresh R session.)
#' * `repos`: `repos` option for the subprocess. By default the current
#'   value of the main process.
#' * `stdout`: Standard output of the sub-process. This can be `NULL` or
#'   a pipe: `"|"`. If it is a pipe then the output of the subprocess is
#'   not included in the responses, but you need to poll and read it
#'   manually. This is for experts.
#' * `stderr`: Similar to `stdout`, but for the standard error.
#' * `error`: See 'Error handling' in [r()].
#' * `cmdargs`: See the same argument of [r()]. (Its default might be
#'   different, though.)
#' * `system_profile`: See the same argument of [r()].
#' * `user_profile`: See the same argument of [r()].
#' * `env`: See the same argument of [r()].
#' * `load_hook`: `NULL`, or code (quoted) to run in the sub-process
#'   at start up. (I.e. not for every single `run()` call.)
#' * `extra`: List of extra arguments to pass to [processx::process].
#'
#' Call `r_session_options()` to see the default values.
#' `r_session_options()` might contain undocumented entries, you cannot
#' change these.
#'
#' @export
#' @examples
#' r_session_options()

r_session_options <- function(...) {
  defaults <- r_session_options_default()
  update_options(defaults, ...)
}
890
## Default values of the session options, see `r_session_options()`.
r_session_options_default <- function() {
  ## `--no-readline` is left out on Windows
  startup_flags <- c(
    if (os_platform() != "windows") "--no-readline",
    "--slave",
    "--no-save",
    "--no-restore"
  )

  list(
    func = NULL,
    args = NULL,
    libpath = .libPaths(),
    repos = default_repos(),
    stdout = NULL,
    stderr = NULL,
    error = getOption("callr.error", "error"),
    cmdargs = startup_flags,
    system_profile = FALSE,
    user_profile = "project",
    env = c(TERM = "dumb"),
    supervise = FALSE,
    load_hook = NULL,
    extra = list(),
    arch = "same"
  )
}
915
916#' Interactive debugging of persistent R sessions
917#'
918#' The `r_session$debug()` method is an interactive debugger to inspect
919#' the stack of the background process after an error.
920#'
921#' `$debug()` starts a REPL (Read-Eval-Print-Loop), that evaluates R
922#' expressions in the subprocess. It is similar to [browser()] and
923#' [debugger()] and also has some extra commands:
924#'
925#' * `.help` prints a short help message.
#' * `.where` prints the complete stack trace of the error. (The same as
#'   the `$traceback()` method.)
928#' * `.inspect <n>` switches the "focus" to frame `<n>`. Frame 0 is the
929#'   global environment, so `.inspect 0` will switch back to that.
930#'
931#' To exit the debugger, press the usual interrupt key, i.e. `CTRL+c` or
932#' `ESC` in some GUIs.
933#'
934#' Here is an example session that uses `$debug()` (some output is omitted
935#' for brevity):
936#'
937#' ```
938#' # ----------------------------------------------------------------------
939#' > rs <- r_session$new()
940#' > rs$run(function() knitr::knit("no-such-file"))
941#' Error in rs_run(self, private, func, args) :
942#'  callr subprocess failed: cannot open the connection
943#'
944#' > rs$debug()
945#' Debugging in process 87361, press CTRL+C (ESC) to quit. Commands:
946#'   .where       -- print stack trace
947#'   .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv
948#'   .help        -- print this message
949#'   <cmd>        -- run <cmd> in frame or .GlobalEnv
950#'
951#' 3: file(con, "r")
952#' 2: readLines(input2, encoding = "UTF-8", warn = FALSE)
953#' 1: knitr::knit("no-such-file") at #1
954#'
955#' RS 87361 > .inspect 1
956#'
957#' RS 87361 (frame 1) > ls()
958#'  [1] "encoding"  "envir"     "ext"       "in.file"   "input"     "input.dir"
959#'  [7] "input2"    "ocode"     "oconc"     "oenvir"    "oopts"     "optc"
960#' [13] "optk"      "otangle"   "out.purl"  "output"    "quiet"     "tangle"
961#' [19] "text"
962#'
963#' RS 87361 (frame 1) > input
964#' [1] "no-such-file"
965#'
966#' RS 87361 (frame 1) > file.exists(input)
967#' [1] FALSE
968#'
969#' RS 87361 (frame 1) > # <CTRL + C>
970#' # ----------------------------------------------------------------------
971#' ```
972#'
973#' @name r_session_debug
974NULL
975