#' External R Session
#'
#' @description
#' A permanent R session that runs in the background. This is an R6 class
#' that extends the [processx::process] class.
#'
#' The process is started at the creation of the object, and then it can
#' be used to evaluate R function calls, one at a time.
#'
#' @param func Function object to call in the background R process.
#'   Please read the notes for the similar argument of [r()].
#' @param args Arguments to pass to the function. Must be a list.
#' @param package Whether to keep the environment of `func` when passing
#'   it to the other process. Possible values are:
#'   * `FALSE`: reset the environment to `.GlobalEnv`. This is the default.
#'   * `TRUE`: keep the environment as is.
#'   * `pkg`: set the environment to the `pkg` package namespace.
#'
#' @examplesIf FALSE
#' rs <- r_session$new()
#'
#' rs$run(function() 1 + 2)
#'
#' rs$call(function() Sys.sleep(1))
#' rs$get_state()
#'
#' rs$poll_process(-1)
#' rs$get_state()
#' rs$read()
#' @export

r_session <- R6::R6Class(
  "r_session",
  inherit = processx::process,

  public = list(

    #' @field status
    #' Status codes returned by `read()`.
    status = list(
      DONE        = 200L,
      STARTED     = 201L,
      ATTACH_DONE = 202L,
      MSG         = 301L,
      EXITED      = 500L,
      CRASHED     = 501L,
      CLOSED      = 502L
    ),

    #' @description
    #' Creates a new R background process. It can wait for the process to
    #' start up (`wait = TRUE`), or return immediately, i.e. before
    #' the process is actually ready to run. In the latter case you may call
    #' the `poll_process()` method to make sure it is ready.
    #'
    #' @param options A list of options created via [r_session_options()].
    #' @param wait Whether to wait for the R process to start and be ready
    #'   for running commands.
    #' @param wait_timeout Timeout for waiting for the R process to start,
    #'   in milliseconds.
    #' @return An `r_session` object.
    initialize = function(options = r_session_options(), wait = TRUE,
                          wait_timeout = 3000)
      rs_init(self, private, super, options, wait, wait_timeout),

    #' @description
    #' Similar to [r()], but runs the function in a permanent background
    #' R session. It throws an error if the function call generated an
    #' error in the child process.
    #' @return The return value of the R expression.
    run = function(func, args = list(), package = FALSE)
      rs_run(self, private, func, args, package),

    #' @description
    #' Similar to `$run()`, but returns the standard output and error of
    #' the child process as well. It does not throw on errors, but
    #' returns a non-`NULL` `error` member in the result list.
    #'
    #' @return A list with the following entries.
    #' * `result`: The value returned by `func`. On error this is `NULL`.
    #' * `stdout`: The standard output of the process while evaluating
    #'   the `func` call,
    #' * `stderr`: The standard error of the process while evaluating
    #'   the `func` call.
    #' * `error`: On error it contains an error object, that contains the
    #'   error thrown in the subprocess. Otherwise it is `NULL`.
    #' * `code`, `message`: These fields are used by call internally and
    #'   you can ignore them.
    run_with_output = function(func, args = list(), package = FALSE)
      rs_run_with_output(self, private, func, args, package),

    #' @description
    #' Starts running a function in the background R session, and
    #' returns immediately. To check if the function is done, call the
    #' `poll_process()` method.
    call = function(func, args = list(), package = FALSE)
      rs_call(self, private, func, args, package),

    #' @description
    #' Poll the R session with a timeout. If the session has finished the
    #' computation, it returns with `"ready"`. If the timeout
    #' is reached, it returns with `"timeout"`.
    #'
    #' @param timeout Timeout period in milliseconds.
    #' @return Character string `"ready"` or `"timeout"`.
    poll_process = function(timeout)
      rs_poll_process(self, private, timeout),

    #' @description
    #' Return the state of the R session.
    #'
    #' @return Possible values:
    #' * `"starting"`: starting up,
    #' * `"idle"`: ready to compute,
    #' * `"busy"`: computing right now,
    #' * `"finished"`: the R process has finished.
    get_state = function()
      rs_get_state(self, private),

    #' @description
    #' Returns the elapsed time since the R process has started, and the
    #' elapsed time since the current computation has started. The latter
    #' is `NA` if there is no active computation.
    #' @return Named vector of elapsed times, in seconds. The names are
    #'   `"total"` and `"current"`.
    get_running_time = function()
      rs_get_running_time(self, private),

    #' @description
    #' Reads an event from the child process, if there is one available.
    #' Events might signal that the function call has finished, or they
    #' can be progress report events.
    #'
    #' This is a low level function that you only need to use if you
    #' want to process events (messages) from the R session manually.
    #'
    #' @return `NULL` if no events are available. Otherwise a named list,
    #'   which is also a `callr_session_result` object. The list always has
    #'   a `code` entry which is the type of the event. See also
    #'   `r_session$public_fields$status` for symbolic names of the
    #'   event types.
    #'   * `200`: (`DONE`) The computation is done, and the event includes
    #'     the result, in the same form as for the `run()` method.
    #'   * `201`: (`STARTED`) An R session that was in 'starting' state is
    #'     ready to go.
    #'   * `202`: (`ATTACH_DONE`) Used by the `attach()` method.
    #'   * `301`: (`MSG`) A message from the subprocess. The message is a
    #'     condition object with class `callr_message`. (It typically has
    #'     other classes, e.g. `cli_message` for output from the cli
    #'     package.)
    #'   * `500`: (`EXITED`) The R session finished cleanly. This means
    #'     that the evaluated expression quit R.
    #'   * `501`: (`CRASHED`) The R session crashed or was killed.
    #'   * `502`: (`CLOSED`) The R session closed its end of the connection
    #'     that callr uses for communication.

    read = function()
      rs_read(self, private),

    #' @description
    #' Terminate the current computation and the R process.
    #' The session object will be in `"finished"` state after this.
    #' @param grace Grace period in milliseconds, to wait for the
    #'   subprocess to exit cleanly, after its standard input is closed.
    #'   If the process is still running after this period, it will be
    #'   killed.
    close = function(grace = 1000)
      rs_close(self, private, grace),

    #' @description
    #' The `traceback()` method can be used after an error in the R
    #' subprocess. It is equivalent to the [base::traceback()] call, in
    #' the subprocess.
    #' @return The same output as from [base::traceback()]
    traceback = function()
      rs_traceback(self, private),

    #' @description
    #' Interactive debugger to inspect the dumped frames in the subprocess,
    #' after an error. See more at [r_session_debug].
    debug = function()
      rs_debug(self, private),

    #' @description Experimental function that provides a REPL
    #' (Read-Eval-Print-Loop) to the subprocess.
    attach = function()
      rs_attach(self, private),

    #' @description
    #' Finalizer that is called when garbage collecting an `r_session`
    #' object, to clean up temporary files.
    finalize = function() {
      unlink(private$tmp_output_file)
      unlink(private$tmp_error_file)
      unlink(private$options$tmp_files, recursive = TRUE)
      if ("finalize" %in% ls(super)) super$finalize()
    },

    #' @description
    #' Print method for an `r_session`.
    #' @param ... Arguments are not used currently.
    print = function(...) {
      cat(
        sep = "",
        "R SESSION, ",
        if (self$is_alive()) {
          paste0("alive, ", self$get_state(), ", ")
        } else {
          "finished, "
        },
        "pid ", self$get_pid(), ".\n")
      invisible(self)
    }
  ),

  private = list(
    options = NULL,
    state = NULL,
    started_at = NULL,
    fun_started_at = as.POSIXct(NA),
    pipe = NULL,

    tmp_output_file = NULL,
    tmp_error_file = NULL,

    func_file = NULL,
    res_file = NULL,

    buffer = NULL,
    read_buffer = function()
      rs__read_buffer(self, private),
    read_message = function()
      rs__read_message(self, private),

    get_result_and_output = function(std = FALSE)
      rs__get_result_and_output(self, private, std),
    report_back = function(code, text = "")
      rs__report_back(self, private, code, text),
    write_for_sure = function(text)
      rs__write_for_sure(self, private, text),
    parse_msg = function(msg)
      rs__parse_msg(self, private, msg),
    attach_wait = function()
      rs__attach_wait(self, private)
  )
)

## Implementation of `r_session$new()`: start the background process, ask
## it to report back (code 201) once it is ready, and optionally wait for
## that report for at most `wait_timeout` milliseconds.

rs_init <- function(self, private, super, options, wait, wait_timeout) {

  options$func <- options$func %||% function() { }
  options$args <- list()
  options$load_hook <- session_load_hook(options$load_hook)

  options <- convert_and_check_my_args(options)
  options <- setup_context(options)
  options <- setup_r_binary_and_args(options, script_file = FALSE)

  private$options <- options

  prepare_client_files()
  with_envvar(
    options$env,
    do.call(super$initialize, c(list(options$bin, options$real_cmdargs,
      stdin = "|", stdout = "|", stderr = "|", poll_connection = TRUE),
      options$extra))
  )

  ## Make child report back when ready
  private$report_back(201, "ready to go")

  private$pipe <- self$get_poll_connection()

  private$started_at <- Sys.time()
  private$state <- "starting"

  if (wait) {
    timeout <- wait_timeout
    have_until <- Sys.time() + as.difftime(timeout / 1000, units = "secs")
    pr <- self$poll_io(timeout)
    out <- ""
    err <- ""
    while (any(pr == "ready")) {
      if (pr["output"] == "ready") out <- paste0(out, self$read_output())
      if (pr["error"] == "ready") err <- paste0(err, self$read_error())
      if (pr["process"] == "ready") break
      ## Never pass a negative timeout once the deadline has passed:
      ## a negative value means "wait indefinitely" for the poll.
      timeout <- as.double(have_until - Sys.time(), units = "secs") * 1000
      pr <- self$poll_io(as.integer(max(0, timeout)))
    }

    if (pr["process"] == "ready") {
      msg <- self$read()
      out <- paste0(out, msg$stdout)
      err <- paste0(err, msg$stderr)
      if (msg$code != 201) {
        data <- list(
          status = self$get_exit_status(),
          stdout = out,
          stderr = err,
          timeout = FALSE
        )
        throw(new_callr_error(data, "Failed to start R session"))
      }
    } else {
      cat("stdout:]\n", out, "\n")
      cat("stderr:]\n", err, "\n")
      throw(new_error("Could not start R session, timed out"))
    }
  }

  invisible(self)
}

## Implementation of `$read()`. Reads (or finishes reading) one message
## from the poll connection and parses it. Returns `NULL` if only a
## partial message is available. If nothing could be read and the
## connection is not incomplete, then a 502 / 500 / 501 message is
## synthesized, depending on whether the process is still alive and on
## its exit status.

rs_read <- function(self, private) {
  if (!is.null(private$buffer)) {
    # There is a partial message in the buffer, try to finish it.
    out <- private$read_buffer()
  } else {
    # A new message.
    out <- private$read_message()
  }
  if (!length(out)) {
    if (processx::processx_conn_is_incomplete(private$pipe)) return()
    if (self$is_alive()) {
      # We do this in on.exit(), because parse_msg still reads the streams
      on.exit(self$kill(), add = TRUE)
      out <- list(header = list(
        code = 502, length = 0,
        rest = "R session closed the process connection, killed"
      ))
    } else if (identical(es <- self$get_exit_status(), 0L)) {
      out <- list(header = list(
        code = 500, length = 0,
        rest = "R session finished cleanly"
      ))
    } else {
      out <- list(header = list(
        code = 501, length = 0,
        rest = paste0("R session crashed with exit code ", es)
      ))
    }
  }
  if (length(out)) private$parse_msg(out)
}

## Continue reading a message whose header is already in
## `private$buffer`. Returns the complete message (and clears the buffer)
## once all of the body has arrived, `NULL` otherwise.

rs__read_buffer <- function(self, private) {
  # There is a partial message in the buffer already, we need to
  # read some more
  need <- private$buffer$header$length - private$buffer$got
  chunk <- processx::processx_conn_read_chars(private$pipe, need)
  got <- nchar(chunk)
  if (got == 0) {
    # make this special case fast
    NULL
  } else if (got == need) {
    msg <- list(
      header = private$buffer$header,
      body = paste(c(private$buffer$chunks, list(chunk)), collapse = "")
    )
    private$buffer <- NULL
    msg
  } else {
    private$buffer$got <- private$buffer$got + got
    private$buffer$chunks <- c(private$buffer$chunks, list(chunk))
    NULL
  }
}

## Start reading a new message: a header line, then `header$length`
## characters of body. If the body is not fully available yet, the
## partial message is stored in `private$buffer` and `NULL` is returned.

rs__read_message <- function(self, private) {
  # A new message, we can surely read the first line
  out <- processx::processx_conn_read_lines(private$pipe, 1)
  if (length(out) == 0) return(NULL)

  header <- rs__parse_header(out)
  body <- ""
  if (header$length > 0) {
    body <- processx::processx_conn_read_chars(
      private$pipe,
      header$length
    )
  }
  got <- nchar(body)
  if (got < header$length) {
    # Partial message
    private$buffer <- list(
      header = header,
      got = got,
      chunks = list(body)
    )
    NULL
  } else {
    list(header = header, body = body)
  }
}

## Parse a header line of the form "<code> <length> <rest...>" into a
## list with integer `code` and `length`, and the remaining text in
## `rest`. Errors if `code` or `length` is not an integer.

rs__parse_header <- function(line) {
  parts <- strsplit(line, " ", fixed = TRUE)[[1]]
  parts2 <- suppressWarnings(as.integer(parts[1:2]))
  rest <- paste(parts[-(1:2)], collapse = " ")
  header <- list(code = parts2[1], length = parts2[2], rest = rest)
  if (is.na(header$code) || is.na(header$length)) {
    stop("Internal callr error, invalid message header")
  }
  header
}

## Implementation of `$close()`: close the standard input of the
## subprocess, wait up to `grace` milliseconds, then kill it and close
## the remaining connections.

rs_close <- function(self, private, grace) {
  processx::processx_conn_close(self$get_input_connection())
  self$poll_process(grace)
  self$kill()
  self$wait(1000)
  if (self$is_alive()) throw(new_error("Could not kill background R session"))
  private$state <- "finished"
  private$fun_started_at <- as.POSIXct(NA)
  processx::processx_conn_close(private$pipe)
  processx::processx_conn_close(self$get_output_connection())
  processx::processx_conn_close(self$get_error_connection())
}

## Implementation of `$call()`: save the function to a temporary file,
## send the expression that runs it to the standard input of the
## subprocess, and ask the subprocess to report back (code 200) when done.

rs_call <- function(self, private, func, args, package) {

  ## We only allow a new command if the R session is idle.
  ## This allows keeping a clean state
  ## TODO: do we need a state at all?
  if (private$state == "starting") throw(new_error("R session not ready yet"))
  if (private$state == "finished") throw(new_error("R session finished"))
  if (private$state == "busy") throw(new_error("R session busy"))

  ## Save the function in a file
  private$options$func <- func
  private$options$args <- args
  private$options$package <- package
  private$options$func_file <- save_function_to_temp(private$options)
  private$options$result_file <- tempfile("callr-rs-result-")
  private$options$tmp_files <-
    c(private$options$tmp_files, private$options$func_file,
      private$options$result_file)

  ## Maybe we need to redirect stdout / stderr
  re_stdout <- if (is.null(private$options$stdout)) {
    private$tmp_output_file <- tempfile("callr-rs-stdout-")
  }
  re_stderr <- if (is.null(private$options$stderr)) {
    private$tmp_error_file <- tempfile("callr-rs-stderr-")
  }

  pre <- rs__prehook(re_stdout, re_stderr)
  post <- rs__posthook(re_stdout, re_stderr)

  ## Run an expr that loads it, in the child process, with error handlers
  expr <- make_vanilla_script_expr(private$options$func_file,
                                   private$options$result_file,
                                   private$options$error,
                                   pre_hook = pre, post_hook = post,
                                   messages = TRUE)
  cmd <- paste0(deparse(expr), "\n")

  ## Write this to stdin
  private$write_for_sure(cmd)
  private$fun_started_at <- Sys.time()

  ## Report back when done
  report_str <- paste0("done ", basename(private$options$result_file))
  private$report_back(200, report_str)

  private$state <- "busy"
}

## Implementation of `$run_with_output()`: start the call, then read
## events until a `200` (done) or a `5xx` (session gone) message arrives.
## `301` messages are passed to `rs__handle_condition()`.

rs_run_with_output <- function(self, private, func, args, package) {
  self$call(func, args, package)

  go <- TRUE
  res <- NULL

  while (go) {
    ## TODO: why is this in a tryCatch?
    res <- tryCatch(
      {
        processx::poll(list(private$pipe), -1)
        msg <- self$read()
        if (is.null(msg)) next
        if (msg$code == 200 || (msg$code >= 500 && msg$code < 600)) {
          return(msg)
        }
        if (msg$code == 301) {
          rs__handle_condition(msg$message)
        }
      },
      interrupt = function(e) {
        self$interrupt()
        ## The R process will catch the interrupt, and then save the
        ## error object to a file, but this might still take some time,
        ## so we need to poll here. If the bg process ignores
        ## interrupts, then we kill it.
        ps <- processx::poll(list(private$pipe), 1000)[[1]]
        if (ps == "timeout") {
          self$kill()
        } else {
          res <<- self$read()
          go <<- FALSE
        }
        iconn <- structure(
          list(message = "Interrupted"),
          class = c("interrupt", "condition"))
        signalCondition(iconn)
        cat("\n")
        invokeRestart("abort")
      })
  }
  res
}

## Implementation of `$run()`: like `$run_with_output()`, but returns the
## result only, and throws the error of the subprocess, if any.

rs_run <- function(self, private, func, args, package) {
  res <- rs_run_with_output(self, private, func, args, package)
  if (is.null(res$error)) {
    res$result
  } else {
    ## Read out whatever is left on the standard streams before throwing
    res$stdout <- paste0(res$stdout, self$read_output())
    res$stderr <- paste0(res$stderr, self$read_error())
    throw(res$error)
  }
}

## Implementation of `$get_state()`.

rs_get_state <- function(self, private) {
  private$state
}

## Implementation of `$get_running_time()`. Both entries are expressed
## in seconds, so that `c()` never has to combine different time units.
## `current` is `NA` while `private$fun_started_at` is `NA`.

rs_get_running_time <- function(self, private) {
  now <- Sys.time()
  c(total = difftime(now, private$started_at, units = "secs"),
    current = difftime(now, private$fun_started_at, units = "secs"))
}

## Implementation of `$poll_process()`.

rs_poll_process <- function(self, private, timeout) {
  processx::poll(list(self$get_poll_connection()), timeout)[[1]]
}

## Implementation of `$traceback()`: fetch the traceback stored in the
## subprocess, drop the last 12 entries, and print it.

rs_traceback <- function(self, private) {
  ## TODO: get rid of magic number 12
  traceback(utils::head(self$run(function() {
    traceback(as.environment("tools:callr")$`__callr_data__`$.Traceback, 10)
  }), -12))
}

## Implementation of `$debug()`: a REPL that evaluates commands in the
## frames dumped in the subprocess. It runs until it is interrupted.

rs_debug <- function(self, private) {
  hasdump <- self$run(function() {
    ! is.null(as.environment("tools:callr")$`__callr_data__`$.Last.dump)
  })
  if (!hasdump) stop("Can't find dumped frames, nothing to debug")

  help <- function() {
    cat("Debugging in process ", self$get_pid(),
        ", press CTRL+C (ESC) to quit. Commands:\n", sep = "")
    cat(" .where -- print stack trace\n",
        " .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv\n",
        " .help -- print this message\n",
        " <cmd> -- run <cmd> in frame or .GlobalEnv\n\n", sep = "")
  }

  ## Handle the debugger's own commands. Returns `NULL` if `cmd` was
  ## handled here, and `cmd` itself if it must be run in the subprocess.
  translate_cmd <- function(cmd) {
    if (cmd == ".where") {
      traceback(tb)
      if (frame) cat("Inspecting frame", frame, "\n")
      NULL

    } else if (cmd == ".help") {
      help()
      NULL

    } else if (grepl("^\\.inspect ", cmd)) {
      ## A missing or non-numeric frame number yields NA, quietly
      pieces <- strsplit(cmd, " ")[[1]]
      newframe <- if (length(pieces) >= 2) {
        suppressWarnings(as.integer(pieces[[2]]))
      } else {
        NA_integer_
      }
      if (is.na(newframe)) {
        message("Cannot parse frame number")
      } else {
        frame <<- newframe
      }
      NULL

    } else {
      cmd
    }
  }

  help()
  tb <- self$traceback()
  frame <- 0L

  while (TRUE) {
    cat("\n")
    prompt <- paste0(
      "RS ", self$get_pid(),
      if (frame) paste0(" (frame ", frame, ")"), " > ")
    cmd <- rs__attach_get_input(prompt)
    cmd2 <- translate_cmd(cmd)
    if (is.null(cmd2)) next

    update_history(cmd)

    ret <- self$run_with_output(function(cmd, frame) {
      dump <- as.environment("tools:callr")$`__callr_data__`$.Last.dump
      envir <- if (!frame) .GlobalEnv else dump[[frame + 12L]]
      eval(parse(text = cmd), envir = envir)
    }, list(cmd = cmd, frame = frame))
    cat(ret$stdout)
    cat(ret$stderr)
    if (!is.null(ret$error)) print(ret$error)
    print(ret$result)
  }
  invisible()
}

## Implementation of `$attach()`: print the pending output of the
## subprocess, then forward the commands typed by the user to its
## standard input, until interrupted.

rs_attach <- function(self, private) {
  out <- self$get_output_connection()
  err <- self$get_error_connection()
  while (nchar(x <- processx::processx_conn_read_chars(out))) cat(x)
  while (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x))
  tryCatch({
    while (TRUE) {
      cmd <- rs__attach_get_input(paste0("RS ", self$get_pid(), " > "))
      update_history(cmd)
      private$write_for_sure(paste0(cmd, "\n"))
      private$report_back(202, "done")
      private$attach_wait()
    } },
    interrupt = function(e) { self$interrupt(); invisible() }
  )
}

## Internal functions ----------------------------------------------------

## Read a complete R expression from the user, prompting with "+ " for
## continuation lines.

rs__attach_get_input <- function(prompt) {
  cmd <- readline(prompt = prompt)
  while (! is_complete_expression(cmd)) {
    cmd <- paste0(cmd, "\n", readline(prompt = "+ "))
  }
  cmd
}

## Relay the standard output and error of the subprocess until it
## reports that the attached command is done (code 202), or until the
## session is gone (a 5xx code).

rs__attach_wait <- function(self, private) {
  out <- self$get_output_connection()
  err <- self$get_error_connection()
  pro <- private$pipe
  while (TRUE) {
    pr <- processx::poll(list(out, err, pro), -1)
    if (pr[[1]] == "ready") {
      if (nchar(x <- processx::processx_conn_read_chars(out))) cat(x)
    }
    if (pr[[2]] == "ready") {
      if (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x))
    }
    if (pr[[3]] == "ready") {
      msg <- self$read()
      ## `$read()` returns `NULL` for a partial message, keep waiting
      if (is.null(msg)) next
      if (msg$code == 202) break
      ## The session has exited, crashed or closed the connection,
      ## there is nothing more to wait for
      if (msg$code >= 500 && msg$code < 600) break
    }
  }
}

## Make the subprocess write a status message with the given code and
## text to file descriptor 3.

rs__report_back <- function(self, private, code, text) {
  cmd <- paste0(
    deparse(rs__status_expr(code, text, fd = 3)),
    "\n"
  )
  private$write_for_sure(cmd)
}

## Write `text` to the standard input of the subprocess, retrying every
## 0.1 seconds until `$write_input()` returns nothing left to write.

rs__write_for_sure <- function(self, private, text) {
  repeat {
    text <- self$write_input(text)
    if (!length(text)) break
    Sys.sleep(.1)
  }
}

## Turn a raw message (header and optional body) into a
## `callr_session_result` object, using the handler that belongs to its
## code. A body that starts with "base64::" is decoded and unserialized,
## otherwise the rest of the header line is the message.

rs__parse_msg <- function(self, private, msg) {
  code <- as.character(msg$header$code)
  message <- msg$body
  if (length(message) && substr(message, 1, 8) == "base64::") {
    message <- substr(message, 9, nchar(message))
    message <- unserialize(processx::base64_decode(message))
  } else {
    message <- msg$header$rest
  }

  if (! code %in% names(rs__parse_msg_funcs)) {
    throw(new_error("Unknown message code: `", code, "`"))
  }
  structure(
    rs__parse_msg_funcs[[code]](self, private, msg$header$code, message),
    class = "callr_session_result")
}

## Message handlers, indexed by the message code

rs__parse_msg_funcs <- list()

## 200: the computation is done
rs__parse_msg_funcs[["200"]] <- function(self, private, code, message) {
  if (private$state != "busy") {
    throw(new_error("Got `done` message when session is not busy"))
  }
  private$state <- "idle"

  res <- private$get_result_and_output()
  c(list(code = code, message = message), res)
}

## 201: the session has started up
rs__parse_msg_funcs[["201"]] <- function(self, private, code, message) {
  if (private$state != "starting") {
    throw(new_error("Session already started, invalid `starting` message"))
  }
  private$state <- "idle"
  list(code = code, message = message)
}

## 202: a command sent by `$attach()` is done
rs__parse_msg_funcs[["202"]] <- function(self, private, code, message) {
  private$state <- "idle"
  list(code = code, message = message)
}

## 301: a message from the subprocess
rs__parse_msg_funcs[["301"]] <- function(self, private, code, message) {
  ## TODO: progress bar update, what to do here?
  list(code = code, message = message)
}

## 500: the session finished cleanly
rs__parse_msg_funcs[["500"]] <- function(self, private, code, message) {
  private$state <- "finished"
  res <- private$get_result_and_output(std = TRUE)
  c(list(code = code, message = message), res)
}

## 501: the session crashed
rs__parse_msg_funcs[["501"]] <- function(self, private, code, message) {
  private$state <- "finished"
  err <- structure(
    list(message = message),
    class = c("error", "condition"))
  res <- private$get_result_and_output(std = TRUE)
  res$error <- err
  c(list(code = code, message = message), res)
}

## 502: the session closed the connection, handled like a crash
rs__parse_msg_funcs[["502"]] <- rs__parse_msg_funcs[["501"]]

## Create the expression that writes a "<code> 0 <text>" status line to
## the file descriptor `fd`, when evaluated in the subprocess.

rs__status_expr <- function(code, text = "", fd = 3L) {
  substitute(
    local({
      pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib
      code_ <- code; fd_ <- fd; text_ <- text
      data <- paste0(code_, " 0 ", text_, "\n")
      pxlib$write_fd(as.integer(fd_), data)
    }),
    list(code = code, fd = fd, text = text)
  )
}

## Create the expression that redirects the standard output and / or
## error of the subprocess to the given files, and stores the values
## returned by the redirection calls in `__callr_data__`.
## A `NULL` argument means no redirection for that stream.

rs__prehook <- function(stdout, stderr) {
  oexpr <- if (!is.null(stdout)) substitute({
    assign(
      ".__stdout__",
      as.environment("tools:callr")$`__callr_data__`$pxlib$
        set_stdout_file(`__fn__`),
      envir = as.environment("tools:callr")$`__callr_data__`)
  }, list(`__fn__` = stdout))
  eexpr <- if (!is.null(stderr)) substitute({
    assign(
      ".__stderr__",
      as.environment("tools:callr")$`__callr_data__`$pxlib$
        set_stderr_file(`__fn__`),
      envir = as.environment("tools:callr")$`__callr_data__`)
  }, list(`__fn__` = stderr))

  substitute({ o; e }, list(o = oexpr, e = eexpr))
}

## Create the expression that passes the values stored by the
## expression from `rs__prehook()` to `set_stdout()` / `set_stderr()`.

rs__posthook <- function(stdout, stderr) {
  oexpr <- if (!is.null(stdout)) substitute({
    as.environment("tools:callr")$`__callr_data__`$
      pxlib$set_stdout(as.environment("tools:callr")$`__callr_data__`$
      .__stdout__)
  })
  eexpr <- if (!is.null(stderr)) substitute({
    as.environment("tools:callr")$`__callr_data__`$
      pxlib$set_stderr(as.environment("tools:callr")$`__callr_data__`$
      .__stderr__)
  })

  substitute({ o; e }, list(o = oexpr, e = eexpr))
}

## Collect the result (or the error) of the last call, together with the
## standard output and error, and clean up the temporary files.
## If `std` is `TRUE` and there is no redirection file for a stream, then
## the stream is read from the connection of the process.

rs__get_result_and_output <- function(self, private, std) {

  ## Get stdout and stderr
  stdout <- if (!is.null(private$tmp_output_file) &&
             file.exists(private$tmp_output_file)) {
    tryCatch(suppressWarnings(read_all(private$tmp_output_file)),
             error = function(e) "")
  } else if (std && self$has_output_connection()) {
    tryCatch(self$read_all_output(), error = function(err) NULL)
  }
  stderr <- if (!is.null(private$tmp_error_file) &&
             file.exists(private$tmp_error_file)) {
    tryCatch(suppressWarnings(read_all(private$tmp_error_file)),
             error = function(e) "")
  } else if (std && self$has_error_connection()) {
    tryCatch(self$read_all_error(), error = function(err) NULL)
  }
  unlink(c(private$tmp_output_file, private$tmp_error_file))
  private$tmp_output_file <- private$tmp_error_file <- NULL

  ## Get result or error from RDS
  outp <- list(
    status = 0,
    stdout = stdout %||% "",
    stderr = stderr %||% "",
    timeout = FALSE
  )
  res <- err <- NULL
  tryCatch(
    res <- get_result(outp, private$options),
    error = function(e) err <<- e,
    interrupt = function(e) err <<- e
  )
  unlink(private$options$tmp_files, recursive = TRUE)
  private$options$tmp_files <- NULL

  ## Assemble result
  list(result = res, stdout = stdout, stderr = stderr, error = err)
}

## Re-signal a condition that was received from the subprocess. If no
## handler muffles it, then the first function found in the
## `callr.condition_handler_<class>` options, in the order of the classes
## of the condition, is called with it.

rs__handle_condition <- function(cond) {

  default_handler <- function(x) {
    classes <- class(x)
    for (cl in classes) {
      opt <- paste0("callr.condition_handler_", cl)
      if (!is.null(val <- getOption(opt)) && is.function(val)) {
        val(x)
        break
      }
    }
  }

  if (is.list(cond) && is.null(cond$muffle)) {
    cond$muffle <- "callr_r_session_muffle"
  }
  withRestarts({
    signalCondition(cond)
    default_handler(cond)
  }, callr_r_session_muffle = function() NULL)

  invisible()
}

## Helper functions ------------------------------------------------------

#' Create options for an [r_session] object
#'
#' @param ... Options to override, named arguments.
#' @return Named list of options.
#'
#' The current options are:
#' * `libpath`: Library path for the subprocess. By default the same as the
#'   _current_ library path. (I.e. _not_ necessarily the library path of
#'   a fresh R session.)
#' * `repos`: `repos` option for the subprocess. By default the current
#'   value of the main process.
#' * `stdout`: Standard output of the sub-process. This can be `NULL` or
#'   a pipe: `"|"`. If it is a pipe then the output of the subprocess is
#'   not included in the responses, but you need to poll and read it
#'   manually. This is for experts.
#' * `stderr`: Similar to `stdout`, but for the standard error.
#' * `error`: See 'Error handling' in [r()].
#' * `cmdargs`: See the same argument of [r()]. (Its default might be
#'   different, though.)
#' * `system_profile`: See the same argument of [r()].
#' * `user_profile`: See the same argument of [r()].
#' * `env`: See the same argument of [r()].
#' * `load_hook`: `NULL`, or code (quoted) to run in the sub-process
#'   at start up. (I.e. not for every single `run()` call.)
#' * `extra`: List of extra arguments to pass to [processx::process].
#'
#' Call `r_session_options()` to see the default values.
#' `r_session_options()` might contain undocumented entries, you cannot
#' change these.
#'
#' @export
#' @examples
#' r_session_options()

r_session_options <- function(...) {
  update_options(r_session_options_default(), ...)
}

## The default values of the options of an `r_session`.

r_session_options_default <- function() {
  list(
    func = NULL,
    args = NULL,
    libpath = .libPaths(),
    repos = default_repos(),
    stdout = NULL,
    stderr = NULL,
    error = getOption("callr.error", "error"),
    cmdargs = c(
      if (os_platform() != "windows") "--no-readline",
      "--slave",
      "--no-save",
      "--no-restore"
    ),
    system_profile = FALSE,
    user_profile = "project",
    env = c(TERM = "dumb"),
    supervise = FALSE,
    load_hook = NULL,
    extra = list(),
    arch = "same"
  )
}

#' Interactive debugging of persistent R sessions
#'
#' The `r_session$debug()` method is an interactive debugger to inspect
#' the stack of the background process after an error.
#'
#' `$debug()` starts a REPL (Read-Eval-Print-Loop), that evaluates R
#' expressions in the subprocess. It is similar to [browser()] and
#' [debugger()] and also has some extra commands:
#'
#' * `.help` prints a short help message.
#' * `.where` prints the complete stack trace of the error. (The same as
#'   the `$traceback()` method.)
#' * `.inspect <n>` switches the "focus" to frame `<n>`. Frame 0 is the
#'   global environment, so `.inspect 0` will switch back to that.
#'
#' To exit the debugger, press the usual interrupt key, i.e. `CTRL+c` or
#' `ESC` in some GUIs.
#'
#' Here is an example session that uses `$debug()` (some output is omitted
#' for brevity):
#'
#' ```
#' # ----------------------------------------------------------------------
#' > rs <- r_session$new()
#' > rs$run(function() knitr::knit("no-such-file"))
#' Error in rs_run(self, private, func, args) :
#'  callr subprocess failed: cannot open the connection
#'
#' > rs$debug()
#' Debugging in process 87361, press CTRL+C (ESC) to quit. Commands:
#'  .where -- print stack trace
#'  .inspect <n> -- inspect a frame, 0 resets to .GlobalEnv
#'  .help -- print this message
#'  <cmd> -- run <cmd> in frame or .GlobalEnv
#'
#' 3: file(con, "r")
#' 2: readLines(input2, encoding = "UTF-8", warn = FALSE)
#' 1: knitr::knit("no-such-file") at #1
#'
#' RS 87361 > .inspect 1
#'
#' RS 87361 (frame 1) > ls()
#' [1] "encoding" "envir" "ext" "in.file" "input" "input.dir"
#' [7] "input2" "ocode" "oconc" "oenvir" "oopts" "optc"
#' [13] "optk" "otangle" "out.purl" "output" "quiet" "tangle"
#' [19] "text"
#'
#' RS 87361 (frame 1) > input
#' [1] "no-such-file"
#'
#' RS 87361 (frame 1) > file.exists(input)
#' [1] FALSE
#'
#' RS 87361 (frame 1) > # <CTRL + C>
#' # ----------------------------------------------------------------------
#' ```
#'
#' @name r_session_debug
NULL