#' Create a PSOCK Cluster of R Workers for Parallel Processing
#'
#' The `makeClusterPSOCK()` function creates a cluster of \R workers
#' for parallel processing. These \R workers may be background \R sessions
#' on the current machine, \R sessions on external machines (local or remote),
#' or a mix of such. For external workers, the default is to use SSH to connect
#' to those external machines. This function works similarly to
#' \code{\link[parallel:makeCluster]{makePSOCKcluster}()} of the
#' \pkg{parallel} package, but provides additional and more flexible options
#' for controlling the setup of the system calls that launch the background
#' \R workers, and how to connect to external machines.
#'
#' @param workers The hostnames of workers (as a character vector) or the number
#' of localhost workers (as a positive integer).
#'
#' @param makeNode A function that creates a `"SOCKnode"` or
#' `"SOCK0node"` object, which represents a connection to a worker.
#'
#' @param port The port number of the master used for communicating with all
#' the workers (via socket connections). If an integer vector of ports, then a
#' random one among those is chosen. If `"random"`, then a random port
#' is chosen from `11000:11999`, or from the range specified by
#' environment variable \env{R_PARALLELLY_RANDOM_PORTS}.
#' If `"auto"` (default), then the default (single) port is taken from
#' environment variable \env{R_PARALLEL_PORT}, otherwise `"random"` is
#' used.
#' _Note, do not use this argument to specify the port number used by
#' `rshcmd`, which typically is an SSH client. Instead, if the SSH daemon
#' runs on a different port than the default 22, specify the SSH port by
#' appending it to the hostname, e.g. `"remote.server.org:2200"` or via
#' SSH option `-p`, e.g. `rshopts = c("-p", "2200")`._
#'
#' @param \dots Optional arguments passed to
#' `makeNode(workers[i], ..., rank = i)` where `i = seq_along(workers)`.
#'
#' @param autoStop If TRUE, the cluster will be automatically stopped
#' using \code{\link[parallel:makeCluster]{stopCluster}()} when it is
#' garbage collected, unless already stopped. See also [autoStopCluster()].
#'
#' @param tries,delay Maximum number of attempts done to launch each node
#' with `makeNode()` and the delay (in seconds) in-between attempts.
#' If argument `port` specifies more than one port, e.g. `port = "random"`
#' then a random port will be drawn and validated at most `tries` times.
#' Arguments `tries` and `delay` are used only when `setup_strategy == "sequential"`.
#'
#' @param validate If TRUE (default), after the nodes have been created, they are all
#' validated that they work by inquiring about their session information,
#' which is saved in attribute `session_info` of each node.
#'
#' @param verbose If TRUE, informative messages are outputted.
#'
#' @return An object of class `c("RichSOCKcluster", "SOCKcluster", "cluster")`
#' consisting of a list of `"SOCKnode"` or `"SOCK0node"` workers (that also
#' inherit from `RichSOCKnode`).
#'
#' @example incl/makeClusterPSOCK.R
#'
#' @importFrom parallel stopCluster
#' @importFrom utils packageVersion
#' @export
makeClusterPSOCK <- function(workers, makeNode = makeNodePSOCK, port = c("auto", "random"), ..., autoStop = FALSE, tries = getOption2("parallelly.makeNodePSOCK.tries", 3L), delay = getOption2("parallelly.makeNodePSOCK.tries.delay", 15.0), validate = getOption2("parallelly.makeNodePSOCK.validate", TRUE), verbose = getOption2("parallelly.debug", FALSE)) {
  ## Outline:
  ##  1. validate and normalize the arguments,
  ##  2. ask makeNode(..., action = "options") for the launch options of
  ##     each node,
  ##  3. settle on one setup strategy ("parallel" or "sequential") that is
  ##     shared by all nodes,
  ##  4. launch the nodes using that strategy, and
  ##  5. optionally validate the nodes and enable automatic stopping.
  localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")

  ## A single number means "this many workers on the local machine"
  if (is.numeric(workers)) {
    if (length(workers) != 1L) {
      stopf("When numeric, argument 'workers' must be a single value: %s", length(workers))
    }
    workers <- as.integer(workers)
    if (is.na(workers) || workers < 1L) {
      stopf("Number of 'workers' must be one or greater: %s", workers)
    }
    workers <- rep(localhostHostname, times = workers)
  }

  tries <- as.integer(tries)
  stop_if_not(length(tries) == 1L, is.integer(tries), !is.na(tries), tries >= 1L)

  delay <- as.numeric(delay)
  stop_if_not(length(delay) == 1L, is.numeric(delay), !is.na(delay), delay >= 0)

  validate <- as.logical(validate)
  stop_if_not(length(validate) == 1L, is.logical(validate), !is.na(validate))

  ## If we are sure that each node requires a connection, then ...
  if (identical(makeNode, makeNodePSOCK)) {
    ## ... can we create that many workers?
    free <- freeConnections()
    ## One connection is held back when the nodes are to be validated
    if (validate) free <- free - 1L
    if (length(workers) > free) {
      stopf("Cannot create %d parallel PSOCK nodes. Each node needs one connection but there are only %d connections left out of the maximum %d available on this R installation", length(workers), free, availableConnections())
    }
  }

  verbose_prefix <- "[local output] "

  if (verbose) {
    mdebugf("%sWorkers: [n = %d] %s", verbose_prefix,
            length(workers), hpaste(sQuote(workers)))
  }

  if (length(port) == 0L) {
    stop("Argument 'port' must be of length one or more: 0")
  }
  port <- freePort(port)
  if (verbose) mdebugf("%sBase port: %d", verbose_prefix, port)


  n <- length(workers)
  nodeOptions <- vector("list", length = n)
  if (verbose) mdebugf("%sGetting setup options for %d cluster nodes ...", verbose_prefix, n)
  for (ii in seq_len(n)) {
    if (verbose) mdebugf("%s - Node %d of %d ...", verbose_prefix, ii, n)
    options <- makeNode(workers[[ii]], port = port, ..., rank = ii, action = "options", verbose = verbose)
    stop_if_not(inherits(options, "makeNodePSOCKOptions"))
    nodeOptions[[ii]] <- options
  }
  if (verbose) mdebugf("%sGetting setup options for %d cluster nodes ... done", verbose_prefix, n)

  ## Returns the setup strategy of each node (as a character vector), where
  ## a node without an explicit strategy counts as "sequential"
  getSetupStrategies <- function(nodeOptions) {
    values <- lapply(nodeOptions, FUN = function(options) {
      value <- options$setup_strategy
      if (is.null(value)) value <- "sequential"
      stop_if_not(is.character(value), length(value) == 1L)
      value
    })
    unlist(values, use.names = FALSE)
  }

  ## Is a 'parallel' setup strategy requested and possible?
  setup_strategy <- getSetupStrategies(nodeOptions)
  is_parallel <- (setup_strategy == "parallel")
  force_sequential <- FALSE
  if (any(is_parallel)) {
    if (verbose) mdebugf("%s - Parallel setup requested for some PSOCK nodes", verbose_prefix)

    if (!all(is_parallel)) {
      if (verbose) mdebugf("%s - Parallel setup requested only for some PSOCK nodes; will revert to a sequential setup for all", verbose_prefix)
      force_sequential <- TRUE
    } else {
      ## Force setup_strategy = "sequential"?
      affected <- affected_by_bug18119()
      if (!is.na(affected) && affected) {
        if (verbose) mdebugf("%s - Parallel setup requested but not supported on this version of R: %s", verbose_prefix, getRversion())
        force_sequential <- TRUE
      }
    }
  }

  if (force_sequential) {
    ## Force all nodes to be setup using the 'sequential' setup strategy.
    ## Only the nodes that asked for "parallel" need their options redone.
    setup_strategy <- "sequential"

    for (ii in which(is_parallel)) {
      if (verbose) mdebugf("%s - Node %d of %d ...", verbose_prefix, ii, n)
      args <- list(workers[[ii]], port = port, ..., rank = ii, action = "options", verbose = verbose)
      args$setup_strategy <- "sequential"
      options <- do.call(makeNode, args = args)
      stop_if_not(inherits(options, "makeNodePSOCKOptions"))
      nodeOptions[[ii]] <- options
    }
  }

  ## Sanity check: all nodes must now agree on a single setup strategy
  setup_strategy <- unique(getSetupStrategies(nodeOptions))
  stop_if_not(length(setup_strategy) == 1L)

  cl <- vector("list", length = length(nodeOptions))
  class(cl) <- c("RichSOCKcluster", "SOCKcluster", "cluster")

  ## The server socket is only created by the 'parallel' setup strategy.
  ## It is initialized here so that the cleanup code further down never
  ## resolves 'socket' to an unrelated object outside of this function.
  socket <- NULL

  ## If an error occurred, make sure to clean up before exiting, i.e.
  ## stop each node
  on.exit({
    nodes <- vapply(cl, FUN = inherits, c("SOCKnode", "SOCK0node"),
                    FUN.VALUE = FALSE)
    stopCluster(cl[nodes])
    cl <- NULL
  })

  if (setup_strategy == "parallel") {
    ## To please R CMD check on R (< 4.0.0); the assignments after stopf()
    ## are never reached, they only make the two names known to the checks
    if (getRversion() < "4.0.0") {
      stopf("Parallel setup of PSOCK cluster nodes is not supported in R %s", getRversion())
      socketAccept <- serverSocket <- function(...) NULL
    }

    sendCall <- importParallel("sendCall")
    recvResult <- importParallel("recvResult")

    ## AD HOC: Use (port, timeout, useXDR) from the options of the first node
    options <- nodeOptions[[1]]
    if (verbose) {
      mdebugf("%sSetting up PSOCK nodes in parallel", verbose_prefix)
      mstr(options)
    }
    port <- options[["port"]]
    connectTimeout <- options[["connectTimeout"]]
    timeout <- options[["timeout"]]
    useXDR <- options[["useXDR"]]
    nodeClass <- c("RichSOCKnode", if (useXDR) "SOCKnode" else "SOCK0node")
    cmd <- options[["cmd"]]

    if (verbose) {
      mdebugf("%sSystem call to launch all workers:", verbose_prefix)
      mdebugf("%s%s", verbose_prefix, cmd)
    }

    ## FIXME: Add argument, option, environment variable for this

    ## Start listening and start workers.
    if (verbose) mdebugf("%sStarting PSOCK main server", verbose_prefix)
    socket <- serverSocket(port = port)
    on.exit(if (!is.null(socket)) close(socket), add = TRUE)

    if (.Platform$OS.type == "windows") {
      for (ii in seq_along(cl)) {
        ## See parallel::newPSOCKnode() for the input = ""
        system(cmd, wait = FALSE, input = "")
      }
    } else {
      ## Asynchronous lists are defined by POSIX
      cmd <- paste(rep(cmd, times = length(cl)), collapse = " & ")
      system(cmd, wait = FALSE)
    }

    if (verbose) mdebugf("%sWorkers launched", verbose_prefix)

    ## Accept connections and send the first command as initial
    ## handshake.  The handshake makes TCP synchronization detect and
    ## err on half-opened connections, which arise during parallel setup
    ## of client-server connections (due to internal timeouts, limited
    ## length of the listen backlog queue, race in timing out on
    ## creating a connection and probably more).
    ##
    ## The handshake looks like a regular server command followed by
    ## client response, which is compatible with older versions of R.
    ready <- 0L
    pending <- list()
    on.exit({
      lapply(pending, FUN = function(x) close(x$con))
      cl <- NULL
    }, add = TRUE)

    if (verbose) mdebugf("%sWaiting for workers to connect back", verbose_prefix)

    t0 <- Sys.time()
    while (ready < length(cl)) {
      if (verbose) mdebugf("%s%d workers out of %d ready", verbose_prefix, ready, length(cl))

      cons <- lapply(pending, FUN = function(x) x$con)

      if (difftime(Sys.time(), t0, units = "secs") > connectTimeout + 5) {
        ## The workers will give up after connectTimeout, so there is
        ## no point waiting for them much longer.
        failed <- length(cl) - ready
        ## stop() pastes its arguments together, so the message has to be
        ## formatted with sprintf() before it is signalled
        msg <- sprintf(ngettext(failed,
                 "Cluster setup failed. %d worker of %d failed to connect.",
                 "Cluster setup failed. %d of %d workers failed to connect."),
                 failed, length(cl))
        stop(msg)
      }

      ## Element 1 is the server socket; the rest are the pending nodes
      a <- socketSelect(append(list(socket), cons), write = FALSE, timeout = connectTimeout)
      canAccept <- a[1]
      canReceive <- seq_along(pending)[a[-1]]

      if (canAccept) {
        con <- socketAccept(socket = socket, blocking = TRUE, open = "a+b", timeout = timeout)
        scon <- structure(list(con = con, host = localhostHostname, rank = ready), class = nodeClass)
        ## A failed handshake request is detected when the reply is read
        res <- tryCatch({
          sendCall(scon, eval, list(quote(Sys.getpid())))
        }, error = identity)
        pending <- append(pending, list(scon))
      }

      for (scon in pending[canReceive]) {
        pid <- tryCatch({
          recvResult(scon)
        }, error = identity)
        if (is.integer(pid)) {
          ready <- ready + 1L
          cl[[ready]] <- scon
        } else {
          close(scon$con)
        }
      }
      if (length(canReceive) > 0L) pending <- pending[-canReceive]
    } ## while()
  } else if (setup_strategy == "sequential") {
    retryPort <- getOption2("parallelly.makeNodePSOCK.tries.port", "same")
    for (ii in seq_along(cl)) {
      if (verbose) {
        mdebugf("%sCreating node %d of %d ...", verbose_prefix, ii, n)
        mdebugf("%s- setting up node", verbose_prefix)
      }

      options <- nodeOptions[[ii]]

      for (kk in seq_len(tries)) {
        if (verbose) {
          mdebugf("%s- attempt #%d of %d", verbose_prefix, kk, tries)
        }
        node <- tryCatch({
          makeNode(options, verbose = verbose)
        }, error = identity)
        ## Success or an error that is not a connection error?
        if (!inherits(node, "PSOCKConnectionError")) break

        if (kk < tries) {
          if (verbose) message(conditionMessage(node))

          ## Retry with another port?  This is decided by 'retryPort' alone
          ## and must not depend on whether verbose output is enabled.
          ## NOTE(review): confirm that the node launcher picks up a changed
          ## 'options$port' when it builds the worker's launch call.
          if (retryPort == "next") {
            ## Next port, but never beyond the largest valid port number
            options$port <- min(options$port + 1L, 65535L)
          } else if (retryPort == "available") {
            options$port <- freePort()
          }

          if (verbose) {
            mdebugf("%s- waiting %g seconds before trying again",
                    verbose_prefix, delay)
          }
          Sys.sleep(delay)
        }
      }

      if (inherits(node, "error")) {
        ex <- node
        if (inherits(node, "PSOCKConnectionError")) {
          if (verbose) {
            mdebugf("%s Failed %d attempts with %g seconds delay",
                    verbose_prefix, tries, delay)
          }
          ex$message <- sprintf("%s\n * Number of attempts: %d (%gs delay)",
                                conditionMessage(ex), tries, delay)
        } else {
          ## Report the error as coming from this function
          ex$call <- sys.call()
        }
        stop(ex)
      }
      cl[[ii]] <- node

      if (verbose) {
        mdebugf("%sCreating node %d of %d ... done", verbose_prefix, ii, n)
      }
    }
  }

  ## Cleanup: the server socket exists only after a parallel setup
  if (!is.null(socket)) try(close(socket), silent = TRUE)
  socket <- NULL

  if (validate) {
    ## Attaching session information for each worker.  This is done to assert
    ## that we have a working cluster already here.  It will also collect
    ## useful information otherwise not available, e.g. the PID.
    if (verbose) {
      mdebugf("%s- collecting session information", verbose_prefix)
    }
    for (ii in seq_along(cl)) {
      cl[ii] <- add_cluster_session_info(cl[ii])
    }
  }

  if (autoStop) cl <- autoStopCluster(cl)

  ## Success, remove automatic cleanup of nodes
  on.exit()

  cl
} ## makeClusterPSOCK()


#' @param worker The hostname or IP number of the machine where the worker
#' should run.
#'
#' @param master The hostname or IP number of the master / calling machine, as
#' known to the workers. If NULL (default), then the default is
#' `Sys.info()[["nodename"]]` unless `worker` is _localhost_ or
#' `revtunnel = TRUE` in case it is `"localhost"`.
#'
#' @param connectTimeout The maximum time (in seconds) allowed for each socket
#' connection between the master and a worker to be established (defaults to
#' 2 minutes). _See note below on current lack of support on Linux and
#' macOS systems._
#'
#' @param timeout The maximum time (in seconds) allowed to pass without the
#' master and a worker communicating with each other (defaults to 30 days).
#'
#' @param rscript,homogeneous The system command for launching \command{Rscript}
#' on the worker and whether it is installed in the same path as the calling
#' machine or not. For more details, see below.
#'
#' @param rscript_args Additional arguments to \command{Rscript} (as a character
#' vector). This argument can be used to customize the \R environment of the
#' workers before they launch.
#' For instance, use `rscript_args = c("-e", shQuote('setwd("/path/to")'))`
#' to set the working directory to \file{/path/to} on _all_ workers.
#'
#' @param rscript_envs A named character vector of environment variables to
#' set or unset on worker at startup, e.g.
#' `rscript_envs = c(FOO = "3.14", "HOME", "UNKNOWN", UNSETME = NA_character_)`.
#' If an element is not named, then the value of that element will be used as
#' the name of the variable and its value will be the value of `Sys.getenv()`
#' for that variable. Non-existing environment variables will be dropped.
#' These variables are set using `Sys.setenv()`.
#' A named element with value `NA_character_` will cause that variable to be
#' unset, which is done via `Sys.unsetenv()`.
#'
#' @param rscript_libs A character vector of \R library paths that will be
#' used for the library search path of the \R workers. An asterisk
#' (`"*"`) will be resolved to the default `.libPaths()` _on the
#' worker_. That is, to `prepend` a folder, instead of replacing the
#' existing ones, use `rscript_libs = c("new_folder", "*")`.
#' To pass down a non-default library path currently set _on the main \R
#' session_ to the workers, use `rscript_libs = .libPaths()`.
#'
#' @param rscript_startup An \R expression or a character vector of \R code,
#' or a list with a mix of these, that will be evaluated on the \R worker
#' prior to launching the worker's event loop.
#' For instance, use `rscript_startup = 'setwd("/path/to")'`
#' to set the working directory to \file{/path/to} on _all_ workers.
#'
#' @param rscript_sh The type of shell used where `rscript` is launched,
#' which should be `"sh"` if launched via a POSIX shell and `"cmd"` if
#' launched via an MS Windows shell. This controls how shell command-line
#' options are quoted, via
#' \code{\link[base:shQuote]{shQuote(..., type = rscript_sh)}}.
#' If `"auto"` (default), and the cluster node is launched locally, then it
#' is set to `"sh"` or `"cmd"` according to the current platform. If launched
#' remotely, then it is set to `"sh"` based on the assumption that remote
#' machines typically launch commands via SSH in a POSIX shell.
#'
#' @param default_packages A character vector or NULL that controls which R
#' packages are attached on each cluster node during startup. An asterisk
#' (`"*"`) resolves to `getOption("defaultPackages")` _on the current machine_.
#' If NULL, then the default set of R packages are attached.
#'
#' @param methods If TRUE (default), then the \pkg{methods} package is also
#' loaded. This argument exists for legacy reasons due to how
#' \command{Rscript} worked in R (< 3.5.0).
#'
#' @param useXDR If FALSE (default), the communication between master and
#' workers, which is binary, will use little-endian (faster), otherwise
#' big-endian ("XDR"; slower).
#'
#' @param socketOptions A character string that sets \R option
#' \option{socketOptions} on the worker.
#'
#' @param outfile Where to direct the \link[base:showConnections]{stdout} and
#' \link[base:showConnections]{stderr} connection output from the workers.
#' If NULL, then no redirection of output is done, which means that the
#' output is relayed in the terminal on the local computer. On Windows, the
#' output is only relayed when running \R from a terminal but not from a GUI.
#'
#' @param renice A numerical 'niceness' (priority) to set for the worker
#' processes.
#'
#' @param rank A unique one-based index for each worker (automatically set).
#'
#' @param rshcmd,rshopts The command (character vector) to be run on the master
#' to launch a process on another host and any additional arguments (character
#' vector). These arguments are only applied if `worker` is not
#' _localhost_. For more details, see below.
#'
#' @param rshlogfile (optional) If a filename, the output produced by the
#' `rshcmd` call is logged to this file, or if TRUE, then it is logged
#' to a temporary file. The log file name is available as an attribute
#' of the returned node object.
#' _Warning: This only works with SSH clients that support option
#' `-E out.log`_. For example, PuTTY's \command{plink} does _not_ support
#' this option, and any attempts to specify `rshlogfile` will cause the SSH
#' connection to fail.
#'
#' @param user (optional) The user name to be used when communicating with
#' another host.
#'
#' @param revtunnel If TRUE, a reverse SSH tunnel is set up for each worker such
#' that the worker \R process sets up a socket connection to its local port
#' `(port - rank + 1)` which then reaches the master on port `port`.
#' If FALSE, then the worker will try to connect directly to port `port` on
#' `master`. For more details, see below.
#'
#' @param manual If TRUE the workers will need to be run manually. The command
#' to run will be displayed.
#'
#' @param dryrun If TRUE, nothing is set up, but a message suggesting how to
#' launch the worker from the terminal is outputted. This is useful for
#' troubleshooting.
#'
#' @param quiet If TRUE, then no output will be produced other than that from
#' using `verbose = TRUE`.
#'
#' @param setup_strategy If `"parallel"` (default), the workers are set up
#' concurrently. If `"sequential"`, they are set up one after the other.
#'
#' @param action This is an internal argument.
#'
#' @return `makeNodePSOCK()` returns a `"SOCKnode"` or
#' `"SOCK0node"` object representing an established connection to a worker.
#'
#' @section Definition of _localhost_:
#' A hostname is considered to be _localhost_ if it equals:
#' \itemize{
#' \item `"localhost"`,
#' \item `"127.0.0.1"`, or
#' \item `Sys.info()[["nodename"]]`.
#' }
#' It is also considered _localhost_ if it appears on the same line
#' as the value of `Sys.info()[["nodename"]]` in file \file{/etc/hosts}.
#'
#' @section Default SSH client and options (arguments `rshcmd` and `rshopts`):
#' Arguments `rshcmd` and `rshopts` are only used when connecting
#' to an external host.
#'
#' The default method for connecting to an external host is via SSH and the
#' system executable for this is given by argument `rshcmd`. The default
#' is given by option \option{parallelly.makeNodePSOCK.rshcmd}. If that is not
#' set, then the default is to use \command{ssh} on Unix-like systems,
#' including macOS, as well as on Windows 10. On older MS Windows versions,
#' which do not have a built-in \command{ssh} client, the default is to use
#' (i) \command{plink} from the \href{https://www.putty.org/}{\command{PuTTY}}
#' project, and then (ii) the \command{ssh} client that is distributed with
#' RStudio.
#'
#' PuTTY puts itself on Windows' system \env{PATH} when installed, meaning this
#' function will find PuTTY automatically if installed. If not, to manually
#' specify PuTTY as the SSH client, specify the absolute pathname of
#' \file{plink.exe} in the first element and option \command{-ssh} in the
#' second as in `rshcmd = c("C:/Path/PuTTY/plink.exe", "-ssh")`.
#' This is because all elements of `rshcmd` are individually "shell"
#' quoted and element `rshcmd[1]` must be on the system \env{PATH}.
#'
#' Furthermore, when running \R from RStudio on Windows, the \command{ssh}
#' client that is distributed with RStudio will also be considered.
#' This client, which is from \href{https://osdn.net/projects/mingw/}{MinGW}
#' MSYS, is searched for in the folder given by the \env{RSTUDIO_MSYS_SSH}
#' environment variable - a variable that is (only) set when running RStudio.
#' To use this SSH client outside of RStudio, set \env{RSTUDIO_MSYS_SSH}
#' accordingly.
#'
#' You can override the default set of SSH clients that are searched for
#' by specifying them in argument `rshcmd` or via option
#' \option{parallelly.makeNodePSOCK.rshcmd} using the format `<...>`, e.g.
#' `rshcmd = c("<rstudio-ssh>", "<putty-plink>", "<ssh>")`. See
#' below for examples.
#'
#' If no SSH client is found, an informative error message is produced.
#'
#' Additional SSH options may be specified via argument `rshopts`, which
#' defaults to option \option{parallelly.makeNodePSOCK.rshopts}. For instance, a
#' private SSH key can be provided as
#' `rshopts = c("-i", "~/.ssh/my_private_key")`. PuTTY users should
#' specify a PuTTY PPK file, e.g.
#' `rshopts = c("-i", "C:/Users/joe/.ssh/my_keys.ppk")`.
#' Contrary to `rshcmd`, elements of `rshopts` are not quoted.
#'
#' @section Accessing external machines that prompt for a password:
#' _IMPORTANT: With one exception, it is not possible for these
#' functions to log in and launch \R workers on external machines that require
#' a password to be entered manually for authentication._
#' The only known exception is the PuTTY client on Windows for which one can
#' pass the password via command-line option `-pw`, e.g.
#' `rshopts = c("-pw", "MySecretPassword")`.
#'
#' Note, depending on whether you run \R in a terminal or via a GUI, you might
#' not even see the password prompt. It is also likely that you cannot enter
#' a password, because the connection is set up via a background system call.
#'
#' The poor man's workaround for a setup that requires a password is to manually
#' log into each of the external machines and launch the \R workers by hand.
#' For this approach, use `manual = TRUE` and follow the instructions
#' which include cut'n'pasteable commands on how to launch the worker from the
#' external machine.
#'
#' However, a much more convenient and less tedious method is to set up
#' key-based SSH authentication between your local machine and the external
#' machine(s), as explained below.
#'
#' @section Accessing external machines via key-based SSH authentication:
#' The best approach to automatically launch \R workers on external machines
#' over SSH is to set up key-based SSH authentication. This will allow you
#' to log into the external machine without having to enter a password.
#'
#' Key-based SSH authentication is taken care of by the SSH client and not \R.
#' To configure this, see the manuals of your SSH client or search the web
#' for "ssh key authentication".
#'
#' @section Reverse SSH tunneling:
#' The default is to use reverse SSH tunneling (`revtunnel = TRUE`) for
#' workers running on other machines. This avoids the complication of
#' otherwise having to configure port forwarding in firewalls, which often
#' requires a static IP address as well as privileges to edit the firewall
#' on your outgoing router, something most users don't have.
#' It also has the advantage of not having to know the internal and / or the
#' public IP address / hostname of the master.
#' Yet another advantage is that there will be no need for a DNS lookup by the
#' worker machines to the master, which may not be configured or is disabled
#' on some systems, e.g. compute clusters.
#'
#' @section Argument `rscript`:
#' If `homogeneous` is FALSE, then `rscript` defaults to `"Rscript"`, i.e. it
#' is assumed that the \command{Rscript} executable is available on the
#' \env{PATH} of the worker.
#' If `homogeneous` is TRUE, then `rscript` defaults to
#' `file.path(R.home("bin"), "Rscript")`, i.e. it is basically assumed that
#' the worker and the caller share the same file system and \R installation.
#'
#' When specified, argument `rscript` should be a character vector with one or
#' more elements. Any asterisk (`"*"`) will be resolved to the above default
#' `homogeneous`-dependent `Rscript` path.
#' All elements are automatically shell quoted using [base::shQuote()], except
#' those that are of format `<ENVVAR>=<VALUE>`, that is, the ones matching the
#' regular expression '\samp{^[[:alpha:]_][[:alnum:]_]*=.*}'.
#' Another exception is when `rscript` inherits from 'AsIs'.
#'
#' @section Default value of argument `homogeneous`:
#' The default value of `homogeneous` is TRUE if and only if any
#' of the following is fulfilled:
#' \itemize{
#' \item `worker` is _localhost_
#' \item `revtunnel` is FALSE and `master` is _localhost_
#' \item `worker` is neither an IP number nor a fully qualified domain
#' name (FQDN). A hostname is considered to be a FQDN if it contains
#' one or more periods
#' }
#' In all other cases, `homogeneous` defaults to FALSE.
#'
#' @section Connection time out:
#' Argument `connectTimeout` does _not_ work properly on Unix and
#' macOS due to a limitation in \R itself. For more details on this, please see
#' R-devel thread 'BUG?: On Linux setTimeLimit() fails to propagate timeout
#' error when it occurs (works on Windows)' on 2016-10-26
#' (\url{https://stat.ethz.ch/pipermail/r-devel/2016-October/073309.html}).
#' When used, the timeout will eventually trigger an error, but it won't happen
#' until the socket connection timeout `timeout` itself happens.
#'
#' @section Communication time out:
#' If there is no communication between the master and a worker within the
#' `timeout` limit, then the corresponding socket connection will be
#' closed automatically. This will eventually result in an error in code
#' trying to access the connection.
639#' 640#' @section Failing to set up local workers: 641#' When setting up a cluster of localhost workers, that is, workers running 642#' on the same machine as the master \R process, occasionally a connection 643#' to a worker ("cluster node") may fail to be set up. 644#' When this occurs, an informative error message with troubleshooting 645#' suggestions will be produced. 646#' The most common reason for such localhost failures is due to port 647#' clashes. Retrying will often resolve the problem. 648#' 649#' @section Failing to set up remote workers: 650#' A cluster of remote workers runs \R processes on external machines. These 651#' external \R processes are launched over, typically, SSH to the remote 652#' machine. For this to work, each of the remote machines needs to have 653#' \R installed, which preferably is of the same version as what is on the 654#' main machine. For this to work, it is required that one can SSH to the 655#' remote machines. Ideally, the SSH connections use authentication based 656#' on public-private SSH keys such that the set up of the remote workers can 657#' be fully automated (see above). If `makeClusterPSOCK()` fails to set 658#' up one or more remote \R workers, then an informative error message is 659#' produced. 660#' There are a few reasons for failing to set up remote workers. 
If this 661#' happens, start by asserting that you can SSH to the remote machine and 662#' launch \file{Rscript} by calling something like: 663#' \preformatted{ 664#' {local}$ ssh -l alice remote.server.org 665#' {remote}$ Rscript --version 666#' R scripting front-end version 3.6.1 (2019-07-05) 667#' {remote}$ logout 668#' {local}$ 669#' } 670#' When you have confirmed the above to work, then confirm that you can achieve 671#' the same in a single command-line call; 672#' \preformatted{ 673#' {local}$ ssh -l alice remote.server.org Rscript --version 674#' R scripting front-end version 3.6.1 (2019-07-05) 675#' {local}$ 676#' } 677#' The latter will assert that you have proper startup configuration also for 678#' _non-interactive_ shell sessions on the remote machine. 679#' 680#' Another reason for failing to setup remote workers could be that they are 681#' running an \R version that is not compatible with the version that your main 682#' \R session is running. For instance, if we run R (>= 3.6.0) locally and the 683#' workers run R (< 3.5.0), we will get: 684#' `Error in unserialize(node$con) : error reading from connection`. 685#' This is because R (>= 3.6.0) uses serialization format version 3 by default 686#' whereas R (< 3.5.0) only supports version 2. We can see the version of the 687#' \R workers by adding `rscript_args = c("-e", shQuote("getRversion()"))` when 688#' calling `makeClusterPSOCK()`. 
#'
#' @rdname makeClusterPSOCK
#' @importFrom tools pskill
#' @importFrom utils flush.console
#' @export
makeNodePSOCK <- function(
    worker = getOption2("parallelly.localhost.hostname", "localhost"),
    master = NULL,
    port,
    connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60),
    timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60),
    rscript = NULL,
    homogeneous = NULL,
    rscript_args = NULL,
    rscript_envs = NULL,
    rscript_libs = NULL,
    rscript_startup = NULL,
    rscript_sh = c("auto", "cmd", "sh"),
    default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"),
    methods = TRUE,
    socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"),
    useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE),
    outfile = "/dev/null",
    renice = NA_integer_,
    rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL),
    user = NULL,
    revtunnel = TRUE,
    rshlogfile = NULL,
    rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL),
    rank = 1L,
    manual = FALSE,
    dryrun = FALSE,
    quiet = FALSE,
    setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"),
    action = c("launch", "options"),
    verbose = FALSE) {
  ## Overview:
  ##  1. Validate and normalize all arguments.
  ##  2. Assemble the command line ('cmd') that starts the R worker, and,
  ##     for non-local workers, the remote-shell call ('local_cmd') wrapping it.
  ##  3. Collect everything in a 'makeNodePSOCKOptions' object, which is
  ##     returned as-is if action = "options", and otherwise passed on to
  ##     launchNodePSOCK().
  verbose <- as.logical(verbose)
  stop_if_not(length(verbose) == 1L, !is.na(verbose))

  ## Already a set of prepared options? Then launch right away
  if (inherits(worker, "makeNodePSOCKOptions")) {
    return(launchNodePSOCK(options = worker, verbose = verbose))
  }

  localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")
  localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1"))

  ## Could it be that the worker specifies the name of the localhost?
  ## Note, this approach preserves worker == "127.0.0.1" if that is given.
  if (!localMachine) {
    localMachine <- is_localhost(worker)
    if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost")
  }
  attr(worker, "localhost") <- localMachine

  stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, !anyNA(rscript_sh))
  rscript_sh <- rscript_sh[1]
  if (rscript_sh == "auto") {
    if (localMachine) {
      rscript_sh <- if (.Platform$OS.type == "windows") "cmd" else "sh"
    } else {
      ## Assume remote machine uses a POSIX shell
      rscript_sh <- "sh"
    }
  }

  manual <- as.logical(manual)
  stop_if_not(length(manual) == 1L, !is.na(manual))

  dryrun <- as.logical(dryrun)
  stop_if_not(length(dryrun) == 1L, !is.na(dryrun))

  setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel"))

  quiet <- as.logical(quiet)
  stop_if_not(length(quiet) == 1L, !is.na(quiet))

  ## Locate a default SSH client?
  if (identical(rshcmd, "")) rshcmd <- NULL
  if (!is.null(rshcmd)) {
    rshcmd <- as.character(rshcmd)
    stop_if_not(length(rshcmd) >= 1L)
  }

  if (identical(rshopts, "")) rshopts <- NULL
  rshopts <- as.character(rshopts)

  user <- as.character(user)
  stop_if_not(length(user) <= 1L)

  port <- as.integer(port)
  assertPort(port)

  revtunnel <- as.logical(revtunnel)
  stop_if_not(length(revtunnel) == 1L, !is.na(revtunnel))

  if (!is.null(rshlogfile)) {
    if (is.logical(rshlogfile)) {
      stop_if_not(!is.na(rshlogfile))
      if (rshlogfile) {
        rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log")
      } else {
        rshlogfile <- NULL
      }
    } else {
      rshlogfile <- as.character(rshlogfile)
      rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE)
    }
  }

  if (is.null(master)) {
    if (localMachine || revtunnel) {
      master <- localhostHostname
    } else {
      master <- Sys.info()[["nodename"]]
    }
  }
  stop_if_not(!is.null(master))

  timeout <- as.numeric(timeout)
  stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0)

  ## FIXME: This is really legacy code there. It stems from R (< 3.5.0), where
  ## 'Rscript' did *not* attach the 'methods' package by default, whereas 'R'
  ## did. Since R 3.5.0, 'R' and 'Rscript' attach the same set of packages.
  ## NOTE: 'methods' must be validated before 'default_packages' is evaluated,
  ## because the default value of the latter refers to the former.
  methods <- as.logical(methods)
  stop_if_not(length(methods) == 1L, !is.na(methods))

  if (!is.null(default_packages)) {
    default_packages <- as.character(default_packages)
    stop_if_not(!anyNA(default_packages))
    is_asterisk <- (default_packages == "*")
    if (any(is_asterisk)) {
      pkgs <- getOption("defaultPackages")
      if (length(pkgs) == 0) {
        ## Nothing to expand "*" to, so drop it. (The result used to be
        ## discarded, which left "*" in place and made the package-name
        ## validation below fail.)
        default_packages <- default_packages[!is_asterisk]
      } else {
        pkgs <- paste(pkgs, collapse = ",")
        default_packages[is_asterisk] <- pkgs
        default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE))
      }
    }
    default_packages <- unique(default_packages)
    pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name)
    invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE)
    if (length(invalid) > 0) {
      stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", ")))
    }
  }

  if (is.null(homogeneous)) {
    homogeneous <- {
      localMachine ||
      (!revtunnel && is_localhost(master)) ||
      (!is_ip_number(worker) && !is_fqdn(worker))
    }
  }
  homogeneous <- as.logical(homogeneous)
  stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous))

  ## Is a parallel setup strategy possible?
  if (setup_strategy == "parallel") {
    if (getRversion() < "4.0.0" ||
        manual || dryrun || !homogeneous || !localMachine) {
      setup_strategy <- "sequential"
    }
  }

  bin <- "Rscript"
  if (homogeneous) bin <- file.path(R.home("bin"), bin)
  if (is.null(rscript)) {
    rscript <- bin
  } else {
    if (!is.character(rscript)) rscript <- as.character(rscript)
    stop_if_not(length(rscript) >= 1L)
    rscript[rscript == "*"] <- bin
    bin <- rscript[1]
    if (homogeneous && !inherits(bin, "AsIs")) {
      bin <- Sys.which(bin)
      if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE)
      rscript[1] <- bin
    }
  }

  ## Is rscript[1] referring to Rscript, or R/Rterm?
  name <- sub("[.]exe$", "", basename(bin))
  is_Rscript <- (tolower(name) == "rscript")

  rscript_args <- as.character(rscript_args)

  if (length(rscript_startup) > 0L) {
    if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup)
    rscript_startup <- lapply(rscript_startup, FUN = function(init) {
      if (is.language(init)) {
        init <- deparse(init, width.cutoff = 500L)
        ## We cannot use newline between statements because
        ## it needs to be passed as a one line string via -e <code>
        init <- paste(init, collapse = ";")
      }
      init <- as.character(init)
      if (length(init) == 0L) return(NULL)
      ## Fail early on code that does not parse
      tryCatch({
        parse(text = init)
      }, error = function(ex) {
        stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex))
      })
      init
    })
    rscript_startup <- unlist(rscript_startup, use.names = FALSE)
  }

  if (!is.null(rscript_libs)) {
    rscript_libs <- as.character(rscript_libs)
    stop_if_not(!anyNA(rscript_libs))
  }

  useXDR <- as.logical(useXDR)
  stop_if_not(length(useXDR) == 1L, !is.na(useXDR))

  if (!is.null(socketOptions)) {
    stop_if_not(is.character(socketOptions), length(socketOptions) == 1L,
                !is.na(socketOptions), nzchar(socketOptions))
    if (socketOptions == "NULL") socketOptions <- NULL
  }

  stop_if_not(is.null(outfile) || is.character(outfile))

  renice <- as.integer(renice)
  stop_if_not(length(renice) == 1L)

  rank <- as.integer(rank)
  stop_if_not(length(rank) == 1L, !is.na(rank))

  action <- match.arg(action, choices = c("launch", "options"))

  verbose_prefix <- "[local output] "

  ## Shell quote the Rscript executable?
  ## (elements of the form 'NAME=value' are left unquoted)
  if (!inherits(rscript, "AsIs")) {
    idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE)
    rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh)
  }

  rscript_args_internal <- character(0L)

  ## Can we get the worker's PID during launch?
  if (localMachine && !dryrun) {
    res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh, verbose = verbose)
    pidfile <- res$pidfile
    rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal)
  } else {
    pidfile <- NULL
  }

  ## Add Rscript "label"?
  rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL)
  if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) {
    if (isTRUE(as.logical(rscript_label))) {
      script <- grep("[.]R$", commandArgs(), value = TRUE)[1]
      if (is.na(script)) script <- "UNKNOWN"
      rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]])
    }
    rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh), rscript_args_internal)
  }

  ## In contrast to default_packages=character(0), default_packages = NULL
  ## skips --default-packages/R_DEFAULT_PACKAGES completely.
  if (!is.null(default_packages)) {
    pkgs <- paste(unique(default_packages), collapse = ",")
    if (is_Rscript) {
      arg <- sprintf("--default-packages=%s", pkgs)
      rscript_args_internal <- c(arg, rscript_args_internal)
    } else {
      ## FIXME: Make 'rscript_envs' work this way so they are applied sooner
      ## in the R startup process, instead via -e 'Sys.setenv(FOO="1")'.
      arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs)

      ## Is the cluster node launched in a MS Windows machine?
      on_MSWindows <- (rscript_sh %in% c("cmd", "cmd2"))
      if (on_MSWindows) {
        ## On MS Windows, we have to use special '/path/to/R FOO=1 ...'
        rscript_args <- c(arg, rscript_args)
      } else {
        ## Everywhere else, we can use 'FOO=1 /path/to/R ...'
        rscript <- c(arg, rscript)
      }
    }
  }

  ## Port that the Rscript should use to connect back to the master
  if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) {
    rscript_port <- assertPort(port + (rank - 1L))
    if (verbose) {
      mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank-1L, rscript_port)
    }
  } else {
    rscript_port <- port
    if (verbose) {
      mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port)
    }
  }

  if (length(socketOptions) == 1L) {
    code <- sprintf("options(socketOptions = \"%s\")", socketOptions)
    rscript_expr <- c("-e", shQuote(code, type = rscript_sh))
    rscript_args_internal <- c(rscript_args_internal, rscript_expr)
  }

  if (length(rscript_startup) > 0L) {
    rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "")
    rscript_startup <- shQuote(rscript_startup, type = rscript_sh)
    rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value))
    rscript_startup <- unlist(rscript_startup, use.names = FALSE)
    rscript_args_internal <- c(rscript_args_internal, rscript_startup)
  }

  if (length(rscript_envs) > 0L) {
    names <- names(rscript_envs)
    ## Non-named elements are names of environment variables whose
    ## current values should be copied
    if (is.null(names)) {
      copy <- seq_along(rscript_envs)
    } else {
      copy <- which(nchar(names) == 0L)
    }
    if (length(copy) > 0L) {
      missing <- NULL
      for (idx in copy) {
        name <- rscript_envs[idx]
        if (!nzchar(name)) {
          stop("Argument 'rscript_envs' contains an empty non-named environment variable")
        }
        value <- Sys.getenv(name, NA_character_)
        if (!is.na(value)) {
          rscript_envs[idx] <- value
          names(rscript_envs)[idx] <- name
        } else {
          missing <- c(missing, name)
        }
      }
      if (length(missing) > 0L) {
        warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", "))
      }
      names <- names(rscript_envs)
      rscript_envs <- rscript_envs[nzchar(names)]
      names <- names(rscript_envs)
    }
    ## Any environment variables to unset?
    if (length(unset <- which(is.na(rscript_envs))) > 0L) {
      names <- names(rscript_envs[unset])
      code <- sprintf("\"%s\"", names)
      code <- paste(code, collapse = ", ")
      code <- paste0("Sys.unsetenv(c(", code, "))")
      tryCatch({
        parse(text = code)
      }, error = function(ex) {
        stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", "))
      })
      rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
      rscript_envs <- rscript_envs[-unset]
      names <- names(rscript_envs)
    }

    ## Any environment variables to set?
    if (length(names) > 0L) {
      code <- sprintf('"%s"="%s"', names, rscript_envs)
      code <- paste(code, collapse = ", ")
      code <- paste0("Sys.setenv(", code, ")")
      tryCatch({
        parse(text = code)
      }, error = function(ex) {
        stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ", "))
      })
      rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
    }
  }

  if (length(rscript_libs) > 0L) {
    ## Make sure to preserve backslashes, e.g. in Windows network drives.
    ## With fixed = TRUE both pattern and replacement are taken literally,
    ## so this turns every single backslash into two, which is what is
    ## needed for the path to survive inside an R string literal.
    rscript_libs <- gsub("\\", "\\\\", rscript_libs, fixed = TRUE)
    code <- paste0('"', rscript_libs, '"')
    code[rscript_libs == "*"] <- ".libPaths()"
    code <- paste(code, collapse = ",")
    code <- paste0('.libPaths(c(', code, '))')
    tryCatch({
      parse(text = code)
    }, error = function(ex) {
      stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", "))
    })
    rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
  }

  ## .{slave,work}RSOCK() command already specified?
  if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) {
    ## In R (>= 4.1.0), parallel:::.slaveRSOCK() was renamed to .workRSOCK()
    cmd <- "workRSOCK <- tryCatch(parallel:::.workRSOCK, error=function(e) parallel:::.slaveRSOCK); workRSOCK()"
    rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh))
  }

  ## Append or inject rscript_args_internal?
  ## (a single "*" in 'rscript_args' marks where the internal arguments go)
  idx <- which(rscript_args == "*")
  if (length(idx) == 0L) {
    rscript_args <- c(rscript_args, rscript_args_internal)
  } else if (length(idx) == 1L) {
    n <- length(rscript_args)
    if (idx == 1L) {
      rscript_args <- c(rscript_args_internal, rscript_args[-1])
    } else if (idx == n) {
      rscript_args <- c(rscript_args[-n], rscript_args_internal)
    } else {
      rscript_args <- c(rscript_args[1:(idx-1)], rscript_args_internal,
                        rscript_args[(idx+1):n])
    }
  } else {
    stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " ")))
  }

  rscript <- paste(rscript, collapse = " ")
  rscript_args <- paste(rscript_args, collapse = " ")
  envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR,
                    " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy)

  cmd <- paste(rscript, rscript_args, envvars)

  ## Renice?
  if (!is.na(renice) && renice > 0L) {
    cmd <- sprintf("nice --adjustment=%d %s", renice, cmd)
  }

  if (!localMachine) {
    ## Find default SSH client
    find <- is.null(rshcmd)
    if (find) {
      which <- NULL
      if (verbose) {
        mdebugf("%sWill search for all 'rshcmd' available\n",
                verbose_prefix)
      }
    } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) {
      find <- TRUE
      if (verbose) {
        mdebugf("%sWill search for specified 'rshcmd' types: %s\n",
                verbose_prefix, paste(sQuote(rshcmd), collapse = ", "))
      }
      which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd)
    }

    if (find) {
      rshcmd <- find_rshcmd(which = which,
                            must_work = !localMachine && !manual && !dryrun)
      if (verbose) {
        s <- unlist(lapply(rshcmd, FUN = function(r) {
          sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version")))
        }))
        s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n")
        mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s)
      }
      rshcmd <- rshcmd[[1]]
    } else {
      if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- "<unknown>"
      if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "<unknown>"
    }

    ## Holds a pathname with an optional set of command-line options
    stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L)

    s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version")))
    rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s)

    if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label)

    ## User?
    if (length(user) == 1L) rshopts <- c("-l", user, rshopts)

    ## Reverse tunneling?
    if (revtunnel) {
      ## WORKAROUND: The Windows 10 loopback resolution uses IPv6 by default
      ## and the server is not listening for "localhost". The solution is
      ## to use "127.0.0.1" instead, or force IPv4 by using ssh option '-4'.
      ## For more details, see
      ## https://github.com/PowerShell/Win32-OpenSSH/issues/1265#issuecomment-855234326
      if (is_localhost(master) && .Platform$OS.type == "windows" && (
           isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) ||
           basename(rshcmd[1]) == "ssh"
         )) {
        master <- "127.0.0.1"
      }
      rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts)
    }

    ## SSH log file?
    if (is.character(rshlogfile)) {
      rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts)
    }

    rshopts <- paste(rshopts, collapse = " ")

    ## Local commands
    rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker)
    local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh))
  } else {
    rshcmd_label <- NULL
    rsh_call <- NULL
    local_cmd <- cmd
  }
  stop_if_not(length(local_cmd) == 1L)

  options <- structure(list(
    local_cmd = local_cmd,
    worker = worker,
    rank = rank,
    rshlogfile = rshlogfile,
    port = port,
    connectTimeout = connectTimeout,
    timeout = timeout,
    useXDR = useXDR,
    pidfile = pidfile,
    setup_strategy = setup_strategy,
    ## For messages, warnings, and errors:
    outfile = outfile,
    rshcmd_label = rshcmd_label,
    rsh_call = rsh_call,
    cmd = cmd,
    localMachine = localMachine,
    manual = manual,
    dryrun = dryrun,
    quiet = quiet,
    rshcmd = rshcmd,
    revtunnel = revtunnel
  ), class = c("makeNodePSOCKOptions", "makeNodeOptions"))

  ## Return options?
  if (action == "options") return(options)

  launchNodePSOCK(options, verbose = verbose)
}


## Launches a worker from a 'makeNodePSOCKOptions' object, as created by
## makeNodePSOCK(), and waits for it to connect back on a server socket.
##
## Unless 'manual' or 'dryrun' is set, the worker is started via system().
## With 'dryrun', NULL is returned after the launch instructions have been
## displayed. Otherwise, a list of class c("RichSOCKnode", "SOCKnode") or
## c("RichSOCKnode", "SOCK0node") (depending on 'useXDR') is returned.
## If the socket connection cannot be established, an error of class
## 'PSOCKConnectionError' with troubleshooting details is signaled.
launchNodePSOCK <- function(options, verbose = FALSE) {
  stop_if_not(inherits(options, "makeNodePSOCKOptions"))

  local_cmd <- options[["local_cmd"]]
  worker <- options[["worker"]]
  rank <- options[["rank"]]
  rshlogfile <- options[["rshlogfile"]]
  port <- options[["port"]]
  connectTimeout <- options[["connectTimeout"]]
  timeout <- options[["timeout"]]
  pidfile <- options[["pidfile"]]
  ## For messages, warnings, and errors:
  useXDR <- options[["useXDR"]]
  outfile <- options[["outfile"]]
  rshcmd_label <- options[["rshcmd_label"]]
  rsh_call <- options[["rsh_call"]]
  cmd <- options[["cmd"]]
  localMachine <- options[["localMachine"]]
  manual <- options[["manual"]]
  dryrun <- options[["dryrun"]]
  quiet <- options[["quiet"]]
  rshcmd <- options[["rshcmd"]]
  revtunnel <- options[["revtunnel"]]
  setup_strategy <- options[["setup_strategy"]]

  if (setup_strategy == "parallel") {
    stop("INTERNAL ERROR: launchNodePSOCK() called with setup_strategy='parallel', which should never occur")
  }

  verbose <- as.logical(verbose)
  stop_if_not(length(verbose) == 1L, !is.na(verbose))

  verbose_prefix <- "[local output] "

  is_worker_output_visible <- is.null(outfile)

  if (manual || dryrun) {
    if (!quiet) {
      msg <- c("----------------------------------------------------------------------")
      if (localMachine) {
        msg <- c(msg, sprintf("Manually, start worker #%s on local machine %s with:", rank, sQuote(worker)), sprintf("\n %s\n", cmd))
      } else {
        msg <- c(msg, sprintf("Manually, (i) login into external machine %s:", sQuote(worker)),
                 sprintf("\n %s\n", rsh_call))
        msg <- c(msg, sprintf("and (ii) start worker #%s from there:", rank),
                 sprintf("\n %s\n", cmd))
        msg <- c(msg, sprintf("Alternatively, start worker #%s from the local machine by combining both step in a single call:", rank),
                 sprintf("\n %s\n", local_cmd))
      }
      msg <- paste(c(msg, ""), collapse = "\n")
      cat(msg)
      flush.console()
    }
    if (dryrun) return(NULL)
  } else {
    if (verbose) {
      mdebugf("%sStarting worker #%s on %s: %s", verbose_prefix, rank, sQuote(worker), local_cmd)
    }
    input <- if (.Platform$OS.type == "windows") "" else NULL
    res <- system(local_cmd, wait = FALSE, input = input)
    if (verbose) {
      mdebugf("%s- Exit code of system() call: %s", verbose_prefix, res)
    }
    if (res != 0) {
      warnf("system(%s) had a non-zero exit code: %d", local_cmd, res)
    }
  }

  if (verbose) {
    mdebugf("%sWaiting for worker #%s on %s to connect back", verbose_prefix, rank, sQuote(worker))
    if (is_worker_output_visible) {
      if (.Platform$OS.type == "windows") {
        mdebugf("%s- Detected 'outfile=NULL' on Windows: this will make the output from the background worker visible when running R from a terminal, but it will most likely not be visible when using a GUI.", verbose_prefix)
      } else {
        mdebugf("%s- Detected 'outfile=NULL': this will make the output from the background worker visible", verbose_prefix)
      }
    }
  }


  con <- local({
    ## Apply connection time limit "only to the rest of the current computation".
    ## NOTE: Regardless of transient = TRUE / FALSE, it still seems we need to
    ## undo it manually :/  /HB 2016-11-05
    setTimeLimit(elapsed = connectTimeout)
    on.exit(setTimeLimit(elapsed = Inf))

    localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")
    warnings <- list()
    tryCatch({
      ## Record, without unwinding, any warnings produced while connecting
      withCallingHandlers({
        socketConnection(localhostHostname, port = port, server = TRUE,
                         blocking = TRUE, open = "a+b", timeout = timeout)
      }, warning = function(w) {
        if (verbose) {
          mdebugf("%sDetected a warning from socketConnection(): %s", verbose_prefix, sQuote(conditionMessage(w)))
        }
        warnings <<- c(warnings, list(w))
      })
    }, error = function(ex) {
      setTimeLimit(elapsed = Inf)

      ## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      ## Post-mortem analysis
      ## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      machineType <- if (localMachine) "local" else "remote"
      msg <- sprintf("Failed to launch and connect to R worker on %s machine %s from local machine %s.\n", machineType, sQuote(worker), sQuote(Sys.info()[["nodename"]]))

      ## Inspect and report on the error message
      cmsg <- conditionMessage(ex)
      if (grepl(gettext("reached elapsed time limit"), cmsg)) {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s (which suggests that the connection timeout of %.0f seconds (argument 'connectTimeout') kicked in)\n", sQuote(cmsg), connectTimeout))
      } else {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s\n", sQuote(cmsg)))
      }

      ## Inspect and report on any warnings
      if (length(warnings) > 0) {
        msg <- c(msg, sprintf(" * In addition, socketConnection() produced %d warning(s):\n", length(warnings)))
        for (kk in seq_along(warnings)) {
          cmsg <- conditionMessage(warnings[[kk]])
          if (grepl("port [0-9]+ cannot be opened", cmsg)) {
            msg <- c(msg, sprintf(" - Warning #%d: %s (which suggests that this port is either already occupied by another process or blocked by the firewall on your local machine)\n", kk, sQuote(cmsg)))
          } else {
            msg <- c(msg, sprintf(" - Warning #%d: %s\n", kk, sQuote(cmsg)))
          }
        }
      }

      ## Report on how the local socket connect was setup
      msg <- c(msg, sprintf(" * The localhost socket connection that failed to connect to the R worker used port %d using a communication timeout of %.0f seconds and a connection timeout of %.0f seconds.\n", port, timeout, connectTimeout))

      ## Report on how the worker was launched
      msg <- c(msg, sprintf(" * Worker launch call: %s.\n", local_cmd))

      ## Do we know the PID of the worker? If so, try to kill it to avoid
      ## leaving a stray process behind
      ## Comment: readWorkerPID() must be done *after* socketConnection()
      ## on R 3.4.4, otherwise socketConnection() will fail. Not sure why.
      ## /HB 2019-01-24
      pid <- readWorkerPID(pidfile)
      if (!is.null(pid)) {
        if (verbose) mdebugf("Killing worker process (PID %d) if still alive", pid)
        ## WARNING: pid_kill() calls pid_exists() [twice] and on Windows
        ## pid_exists() uses system('tasklist') which can be very very slow
        ## /HB 2019-01-24
        success <- pid_kill(pid)
        if (verbose) mdebugf("Worker (PID %d) was successfully killed: %s", pid, success)
        msg <- c(msg, sprintf(" * Worker (PID %d) was successfully killed: %s\n", pid, success))
      } else if (localMachine) {
        msg <- c(msg, " * Failed to kill local worker because its PID could not be identified.\n")
      }

      ## Propose further troubleshooting methods
      suggestions <- NULL

      ## Enable verbose=TRUE?
      if (!verbose) {
        suggestions <- c(suggestions, "Set 'verbose=TRUE' to see more details.")
      }

      ## outfile=NULL?
      ## Only suggest 'outfile=NULL' when it is not already in use
      if (.Platform$OS.type == "windows") {
        if (!is_worker_output_visible) {
          suggestions <- c(suggestions, "On Windows, to see output from worker, set 'outfile=NULL' and run R from a terminal (not a GUI).")
        } else {
          suggestions <- c(suggestions, "On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).")
        }
      } else {
        if (!is_worker_output_visible) {
          suggestions <- c(suggestions, "Set 'outfile=NULL' to see output from worker.")
        }
      }

      ## Log file?
      if (is.character(rshlogfile)) {
        smsg <- sprintf("Inspect the content of log file %s for %s.", sQuote(rshlogfile), paste(sQuote(rshcmd), collapse = " "))
        lmsg <- tryCatch(readLines(rshlogfile, n = 15L, warn = FALSE), error = function(ex) NULL)
        if (length(lmsg) > 0) {
          lmsg <- sprintf(" %2d: %s", seq_along(lmsg), lmsg)
          smsg <- sprintf("%s The first %d lines are:\n%s", smsg, length(lmsg), paste(lmsg, collapse = "\n"))
        }
        suggestions <- c(suggestions, smsg)
      } else {
        suggestions <- c(suggestions, sprintf("Set 'rshlogfile=TRUE' to enable logging for %s.", paste(sQuote(rshcmd), collapse = " ")))
      }

      ## Special: Windows 10 ssh client may not support reverse tunneling. /2018-11-10
      ## https://github.com/PowerShell/Win32-OpenSSH/issues/1265
      if (!localMachine && revtunnel && isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        suggestions <- c(suggestions, sprintf("The 'rshcmd' (%s) used may not support reverse tunneling (revtunnel = TRUE). See ?parallelly::makeClusterPSOCK for alternatives.\n", rshcmd_label))
      }

      if (length(suggestions) > 0) {
        suggestions <- sprintf(" - Suggestion #%d: %s\n", seq_along(suggestions), suggestions)
        msg <- c(msg, " * Troubleshooting suggestions:\n", suggestions)
      }

      msg <- paste(msg, collapse = "")
      ex$message <- msg

      ## Re-signal as an PSOCKConnectionError error
      class(ex) <- c("PSOCKConnectionError", class(ex))

      ## Relay error and temporarily avoid truncating the error message
      ## in case it is too long
      local({
        oopts <- options(warning.length = 2000L)
        on.exit(options(oopts))
        stop(ex)
      })
    })
  })
  setTimeLimit(elapsed = Inf)

  if (verbose) {
    mdebugf("%sConnection with worker #%s on %s established", verbose_prefix, rank, sQuote(worker))
  }

  structure(list(con = con, host = worker, rank = rank, rshlogfile = rshlogfile),
            class = c("RichSOCKnode", if (useXDR) "SOCKnode" else "SOCK0node"))
} ## launchNodePSOCK()


## Checks if a given worker is the same as the localhost. It is, iff:
##
## * worker == "localhost"
## * worker == "127.0.0.1"
## * worker == hostname
## * worker and hostname appears on the same line in /etc/hosts
##
## This should cover cases such as:
## * Calling is_localhost("n3") from machine n3
## * Calling is_localhost("n3.myserver.org") from machine n3[.myserver.org]
##
## Results are memoized in the enclosing environment. Calling with
## worker = NULL and hostname = NULL resets the memoization and returns NA.
##
## References:
## * https://en.wikipedia.org/wiki/Hostname
#' @importFrom utils file_test
is_localhost <- local({
  localhosts <- c("localhost", "127.0.0.1")
  non_localhosts <- character(0L)

  function(worker, hostname = Sys.info()[["nodename"]], pathnames = "/etc/hosts") {
    ## INTERNAL: Clear list of known local hosts?
    if (is.null(worker) && is.null(hostname)) {
      localhosts <<- c("localhost", "127.0.0.1")
      non_localhosts <<- character(0L)
      return(NA)
    }

    stop_if_not(length(worker) == 1, length(hostname) == 1)

    ## Already known to a localhost or not to one?
    if (worker %in% localhosts) return(TRUE)
    if (worker %in% non_localhosts) return(FALSE)

    if (worker == hostname) {
      ## Add worker to the list of known local hosts.
      localhosts <<- unique(c(localhosts, worker))
      return(TRUE)
    }

    alias <- getOption2("parallelly.localhost.hostname")
    if (is.character(alias) && worker == alias) {
      ## Add worker to the list of known local hosts.
      localhosts <<- unique(c(localhosts, worker))
      return(TRUE)
    }

    ## Scan known "hosts" files
    pathnames <- pathnames[file_test("-f", pathnames)]
    if (length(pathnames) == 0L) return(FALSE)

    ## Search for (hostname, worker) and (worker, hostname)
    ## occurring on the same line and are separated by one or
    ## more whitespace symbols (but nothing else).
    ## Dots are escaped so that, e.g. 'n3.myserver.org' is matched
    ## literally and not with '.' as a regular-expression wildcard.
    hostname_re <- gsub(".", "[.]", hostname, fixed = TRUE)
    worker_re <- gsub(".", "[.]", worker, fixed = TRUE)
    pattern <- sprintf("^((|.*[[:space:]])%s[[:space:]]+%s([[:space:]]+|)|(|.*[[:space:]])%s[[:space:]]+%s([[:space:]]+|))$", hostname_re, worker_re, worker_re, hostname_re)

    for (pathname in pathnames) {
      bfr <- readLines(pathname, warn = FALSE)
      if (any(grepl(pattern, bfr, ignore.case = TRUE))) {
        ## Add worker to the list of known local hosts.
        localhosts <<- unique(c(localhosts, worker))
        return(TRUE)
      }
    }

    ## Add worker to the list of known non-local hosts.
    non_localhosts <<- unique(c(non_localhosts, worker))

    FALSE
  }
}) ## is_localhost()


## Checks if a worker is specified by its IP number.
is_ip_number <- function(worker) {
  ## Returns TRUE if 'worker' consists of exactly four dot-separated
  ## decimal numbers, each in [0, 255], and FALSE otherwise.
  octets <- strsplit(worker, split = ".", fixed = TRUE)[[1]]
  if (length(octets) != 4L) return(FALSE)
  ## Require plain decimal digits before coercing. Without this check,
  ## as.integer() produces a "NAs introduced by coercion" warning for
  ## four-label hostnames such as 'n1.remote.server.org', and it accepts
  ## non-decimal spellings such as '0x1'.
  if (!all(grepl("^[0-9]{1,3}$", octets))) return(FALSE)
  octets <- as.integer(octets)
  all(0L <= octets & octets <= 255L)
}

## Checks if a worker is specified as a fully qualified domain name (FQDN)
## NOTE: This is a heuristic; any name containing a period is considered
## to be an FQDN.
is_fqdn <- function(worker) {
  grepl(".", worker, fixed = TRUE)
}


#' Search for SSH clients on the current system
#'
#' @param which A character vector specifying which types of SSH clients
#' to search for. If NULL, a default set of clients supported by the
#' current platform is searched for.
#'
#' @param first If TRUE, the first client found is returned, otherwise
#' all located clients are returned.
#'
#' @param must_work If TRUE and no client was found, then an error
#' is produced, otherwise only a warning.
#'
#' @return A named list of pathnames to all located SSH clients.
#' The pathnames may be followed by zero or more command-line options,
#' i.e. the elements of the returned list are character vectors of length
#' one or more.
#' If `first = TRUE`, only the first one is returned.
#' Attribute `version` contains the output from querying the
#' executable for its version (via command-line option `-V`).
#'
#' @keywords internal
find_rshcmd <- function(which = NULL, first = FALSE, must_work = TRUE) {
  ## Runs '<bin> <args>' and returns everything it printed (stdout and
  ## stderr) as a single string, with lines separated by "; "
  query_version <- function(bin, args = "-V") {
    v <- suppressWarnings(system2(bin, args = args, stdout = TRUE, stderr = TRUE))
    v <- paste(v, collapse = "; ")
    stop_if_not(length(v) == 1L)
    v
  }

  ## Locates 'ssh' in the folder given by environment variable
  ## RSTUDIO_MSYS_SSH.  Returns NULL if that folder or 'ssh' is not found.
  find_rstudio_ssh <- function() {
    path <- Sys.getenv("RSTUDIO_MSYS_SSH")
    if (!file_test("-d", path)) return(NULL)
    path <- normalizePath(path)
    path_org <- Sys.getenv("PATH")
    ## Make sure the temporarily modified PATH is always restored
    on.exit(Sys.setenv(PATH = path_org), add = TRUE)

    ## Set PATH to only look in RSTUDIO_MSYS_SSH to avoid
    ## picking up other clients with the same name
    ## Comment: In RStudio, RSTUDIO_MSYS_SSH is appended
    ## to the PATH, see PATH in 'Tools -> Shell ...'.
    Sys.setenv(PATH = path)
    bin <- Sys.which("ssh")
    if (!nzchar(bin)) return(NULL)
    attr(bin, "type") <- "rstudio-ssh"
    attr(bin, "version") <- query_version(bin, args = "-V")
    bin
  }

  ## Locates PuTTY's 'plink' on the PATH.  The result is the executable
  ## followed by option '-ssh', or NULL if 'plink' is not found.
  find_putty_plink <- function() {
    bin <- Sys.which("plink")
    if (!nzchar(bin)) return(NULL)
    res <- c(bin, "-ssh")
    attr(res, "type") <- "putty-plink"
    attr(res, "version") <- query_version(bin, args = "-V")
    res
  }

  ## Locates 'ssh' on the PATH, or returns NULL if not found.
  ## Attribute 'OpenSSH_for_Windows' is set to TRUE if the version
  ## string reports that client.
  find_ssh <- function() {
    bin <- Sys.which("ssh")
    if (!nzchar(bin)) return(NULL)
    attr(bin, "type") <- "ssh"
    v <- query_version(bin, args = "-V")
    attr(bin, "version") <- v
    if (any(grepl("OpenSSH_for_Windows", v)))
      attr(bin, "OpenSSH_for_Windows") <- TRUE
    bin
  }

  if (!is.null(which)) stop_if_not(is.character(which), length(which) >= 1L, !anyNA(which))
  stop_if_not(is.logical(first), length(first) == 1L, !is.na(first))
  stop_if_not(is.logical(must_work), length(must_work) == 1L, !is.na(must_work))

  ## Default set of clients to search for on this platform
  if (is.null(which)) {
    if (.Platform$OS.type == "windows") {
      which <- c("ssh", "putty-plink", "rstudio-ssh")
    } else {
      which <- c("ssh")
    }
  }

  res <- list()
  for (name in which) {
    pathname <- switch(name,
      "ssh"         = find_ssh(),
      "putty-plink" = find_putty_plink(),
      "rstudio-ssh" = find_rstudio_ssh(),
      stopf("Unknown 'rshcmd' type: %s", sQuote(name))
    )

    if (!is.null(pathname)) {
      ## With first = TRUE, the client itself is returned, not a list
      if (first) return(pathname)
      res[[name]] <- pathname
    }
  }

  if (length(res) > 0) return(res)

  msg <- sprintf("Failed to locate a default SSH client (checked: %s). Please specify one via argument 'rshcmd'.", paste(sQuote(which), collapse = ", ")) #nolint
  if (must_work) stop(msg)

  ## Nothing was found; fall back to plain 'ssh' with a warning
  pathname <- "ssh"
  msg <- sprintf("%s Will still try with %s.", msg, sQuote(paste(pathname, collapse = " ")))
  warning(msg)
  pathname
}


## Collects information on the current R session: R version and OS type,
## Sys.info(), library paths, working directory and process ID.
## If 'pkgs' is TRUE, element 'pkgs' lists the installed packages as
## "<package>_<version>" per library path, otherwise it is NULL.
#' @importFrom utils installed.packages
session_info <- function(pkgs = getOption2("parallelly.makeNodePSOCK.sessionInfo.pkgs", FALSE)) {
  libs <- .libPaths()
  info <- list(
    r = c(R.version, os.type = .Platform$OS.type),
    system = as.list(Sys.info()),
    libs = libs,
    pkgs = if (isTRUE(pkgs)) {
      structure(lapply(libs, FUN = function(lib.loc) {
        pkgs <- installed.packages(lib.loc = lib.loc)
        if (length(pkgs) == 0) return(NULL)
        paste0(pkgs[, "Package"], "_", pkgs[, "Version"])
      }), names = libs)
    },
    pwd = getwd(),
    process = list(pid = Sys.getpid())
  )
  info
}


## Queries each node of cluster 'cl' for its session information and
## stores it in element 'session_info' of the node.  Nodes that are NULL
## or already have session information are left untouched.
## Returns the updated cluster.
#' @importFrom utils capture.output
#' @importFrom parallel clusterCall
add_cluster_session_info <- local({
  ## The function called on the workers is a copy of session_info() whose
  ## 'pkgs' default is a constant and whose environment is the 'utils'
  ## namespace (where installed.packages() lives).
  ## NOTE(review): presumably this keeps the function from depending on
  ## this package being available on the worker - confirm.
  get_session_info <- session_info
  formals(get_session_info)$pkgs <- FALSE
  environment(get_session_info) <- getNamespace("utils")

  function(cl) {
    stop_if_not(inherits(cl, "cluster"))

    for (ii in seq_along(cl)) {
      node <- cl[[ii]]
      if (is.null(node)) next  ## Happens with dryrun = TRUE

      ## Session information already collected?
      if (!is.null(node$session_info)) next

      pkgs <- getOption2("parallelly.makeNodePSOCK.sessionInfo.pkgs", FALSE)
      node$session_info <- clusterCall(cl[ii], fun = get_session_info, pkgs = pkgs)[[1]]

      ## Sanity check, iff possible: the PID reported by the worker should
      ## equal the last space-separated token of the printed node
      if (inherits(node, "SOCK0node") || inherits(node, "SOCKnode")) {
        pid <- capture.output(print(node))
        pid <- as.integer(gsub(".* ", "", pid))
        stop_if_not(node$session_info$process$pid == pid)
      }

      cl[[ii]] <- node
    }

    cl
  }
}) ## add_cluster_session_info()


## Gets the Windows build version, e.g. '10.0.17134.523' (Windows 10 v1803)
## and '10.0.17763.253' (Windows 10 v1809).
## Returns a numeric_version, or NULL if not on Windows or if the output
## of 'ver' cannot be parsed.
windows_build_version <- local({
  if (.Platform$OS.type != "windows") return(function() NULL)
  function() {
    res <- shell("ver", intern = TRUE)
    if (length(res) == 0) return(NULL)
    res <- grep("Microsoft", res, value = TRUE)
    if (length(res) == 0) return(NULL)
    res <- gsub(".*Version ([0-9.]+).*", "\\1", res)
    tryCatch({
      numeric_version(res)
    }, error = function(ex) NULL)
  }
})


## Sets up what is needed for a worker to report its process ID (PID).
##
## Returns an empty list if option 'parallelly.makeNodePSOCK.autoKill' is
## not TRUE.  Otherwise, returns a list with elements:
##  - pidfile: a temporary file the worker should write its PID to
##  - rscript_pid_args: Rscript command-line arguments ('-e <code>') that
##    make the worker write that file
##
## The first time a given 'rscript' command is seen (or if force = TRUE),
## a test call is made to check whether the PID file can be produced.
## A successful outcome is cached per 'rscript' command so the test is
## not repeated.  Note that the list is returned also when the test fails.
useWorkerPID <- local({
  parent_pid <- NULL
  .cache <- list()

  makeResult <- function(rank, rscript_sh) {
    if (is.null(parent_pid)) parent_pid <<- Sys.getpid()
    pidfile <- tempfile(pattern = sprintf("worker.rank=%d.parallelly.parent=%d.",
                                      rank, parent_pid), fileext = ".pid")
    pidfile <- normalizePath(pidfile, winslash = "/", mustWork = FALSE)
    pidcode <- sprintf('try(suppressWarnings(cat(Sys.getpid(),file="%s")), silent = TRUE)', pidfile)
    rscript_pid_args <- c("-e", shQuote(pidcode, type = rscript_sh))
    list(pidfile = pidfile, rscript_pid_args = rscript_pid_args)
  }

  function(rscript, rank, rscript_sh, force = FALSE, verbose = FALSE) {
    autoKill <- getOption2("parallelly.makeNodePSOCK.autoKill", TRUE)
    if (!isTRUE(as.logical(autoKill))) return(list())

    result <- makeResult(rank, rscript_sh = rscript_sh)

    ## Already cached?
    key <- paste(rscript, collapse = "\t")
    if (!force && isTRUE(.cache[[key]])) return(result)

    ## Launch R such that it writes the PID file and then reports
    ## whether that file exists
    test_cmd <- paste(c(
      rscript,
      result$rscript_pid_args,
      "-e", shQuote(sprintf('file.exists("%s")', result$pidfile), type = rscript_sh)
    ), collapse = " ")
    if (verbose) {
      mdebugf("Testing if worker's PID can be inferred: %s", sQuote(test_cmd))
    }

    input <- NULL

    ## AD HOC: 'singularity exec ... Rscript' requires input="". If not,
    ## they will be terminated because they try to read from non-existing
    ## standard input. /HB 2019-02-14
    if (any(grepl("singularity", rscript, ignore.case = TRUE))) input <- ""

    res <- system(test_cmd, intern = TRUE, input = input)
    status <- attr(res, "status")
    suppressWarnings(file.remove(result$pidfile))

    .cache[[key]] <<- (is.null(status) || status == 0L) && any(grepl("TRUE", res))
    if (verbose) mdebugf("- Possible to infer worker's PID: %s", .cache[[key]])

    result
  }
})


## Reads the PID that a worker has written to file 'pidfile'.
##
## Waits for the file to appear, sleeping 'wait' seconds between checks,
## then attempts to read it up to 'maxTries' times (at least once).  The
## file is removed after reading.
##
## Returns the PID as an integer, or NULL if 'pidfile' is NULL, the file
## never appeared or is empty, the content is not an integer (with a
## warning), or the PID equals that of the current process (with a warning).
readWorkerPID <- function(pidfile, wait = 0.5, maxTries = 8L, verbose = FALSE) {
  if (is.null(pidfile)) return(NULL)

  if (verbose) mdebug("Attempting to infer PID for worker process ...")
  pid <- NULL

  ## Wait for PID file
  tries <- 0L
  while (!file.exists(pidfile) && tries <= maxTries) {
    Sys.sleep(wait)
    tries <- tries + 1L
  }

  if (file.exists(pidfile)) {
    pid0 <- NULL
    ## seq_len() rather than 1:maxTries, which is c(1, 0) for maxTries = 0.
    ## The file exists, so always make at least one attempt to read it.
    for (attempt in seq_len(max(1L, maxTries))) {
      pid0 <- tryCatch(readLines(pidfile, warn = FALSE), error = identity)
      if (!inherits(pid0, "error")) break
      pid0 <- NULL
      Sys.sleep(wait)
    }
    file.remove(pidfile)

    if (length(pid0) > 0L) {
      ## Use last one, if more than one ("should not happen").
      ## A non-integer is reported by our own warning below, so the
      ## generic coercion warning is suppressed.
      pid <- suppressWarnings(as.integer(pid0[length(pid0)]))
      if (verbose) mdebugf(" - pid: %s", pid)
      if (is.na(pid)) {
        warnf("Worker PID is a non-integer: %s", paste(pid0, collapse = ", "))
        pid <- NULL
      } else if (pid == Sys.getpid()) {
        warnf("Hmm... worker PID and parent PID are the same: %s", pid)
        pid <- NULL
      }
    }
  }

  if (verbose) mdebug("Attempting to infer PID for worker process ... done")

  pid
} ## readWorkerPID()



## Summarizes a node as a one-row data frame with columns 'host',
## 'r_version', 'platform', 'pwd', and 'pid'.  Anything that is not
## available, including individual fields missing from the node's
## 'session_info', is reported as NA, so all five columns are always
## present.
#' @export
summary.RichSOCKnode <- function(object, ...) {
  ## Assigning NULL to a list element would drop it (and thereby the
  ## column), so fall back to the NA default instead
  value_or <- function(value, default) {
    if (is.null(value)) default else value
  }

  res <- list(
    host      = NA_character_,
    r_version = NA_character_,
    platform  = NA_character_,
    pwd       = NA_character_,
    pid       = NA_integer_
  )
  res$host <- value_or(object[["host"]], res$host)
  session_info <- object[["session_info"]]
  if (!is.null(session_info)) {
    res$r_version <- value_or(session_info[["r"]][["version.string"]], res$r_version)
    res$platform  <- value_or(session_info[["r"]][["platform"]], res$platform)
    res$pwd       <- value_or(session_info[["pwd"]], res$pwd)
    res$pid       <- value_or(session_info[["process"]][["pid"]], res$pid)
  }
  as.data.frame(res, stringsAsFactors = FALSE)
}

## Summarizes a cluster as a data frame with one row per node, see
## summary.RichSOCKnode().  NULL nodes produce rows of NAs.
#' @export
summary.RichSOCKcluster <- function(object, ...) {
  res <- lapply(object, FUN = function(node) {
    ## A NULL node has no class to dispatch on
    if (is.null(node)) return(summary.RichSOCKnode(node))
    summary(node)
  })
  res <- do.call(rbind, res)
  rownames(res) <- NULL
  res
}

## Prints a one-line description of the cluster: the number of nodes and,
## per unique combination of host, R version and platform, how many nodes
## there are (most common first).  Returns 'x' invisibly.
#' @export
print.RichSOCKcluster <- function (x, ...) {
  info <- summary(x)
  txt <- sprintf("host %s", sQuote(info[["host"]]))
  specs <- sprintf("(%s, platform %s)", info[["r_version"]], info[["platform"]])
  specs[is.na(info[["r_version"]])] <- "(R version and platform not queried)"
  txt <- paste(txt, specs, sep = " ")

  ## Count nodes per unique description
  counts <- table(txt)
  counts <- counts[order(counts, decreasing = TRUE)]
  w <- ifelse(counts == 1L, "node is", "nodes are")
  txt <- sprintf("%d %s on %s", counts, w, names(counts))
  txt <- paste(txt, collapse = ", ")
  txt <- sprintf("Socket cluster with %d nodes where %s", length(x), txt)

  ## Report on autoStop?
  if (!is.null(attr(x, "gcMe"))) {
    txt <- sprintf("%s. This cluster is registered to be automatically stopped by the garbage collector", txt)
  }

  cat(txt, "\n", sep = "")
  invisible(x)
}